深入理解Python协程:从 async/await 到 asyncio 再到 async with
异步编程并非万能解决方案,但其在处理 I/O 密集型任务时,性能提升优势显著。需深入理解同步与异步的本质差异,警惕阻塞问题。只有合理运用相关工具,才能让程序兼具高效性与优雅性,反之则可能适得其反。
前言
在 Python 3.8 以后的版本中,异步编程变得越来越重要。
本文将系统介绍
Python
标准库中的异步编程工具,带领大家掌握async/await
语法和asyncio
的使用。
概念
协程(Coroutine)
- 类比:协程就像一位 “可暂停的厨师”。
- 同步函数:厨师必须做完一道菜才能做下一道。
- 异步函数:厨师在煮菜时可以去切菜,同时处理多道菜。
- 关键点:async 定义协程,await 标记可暂停点。
事件循环(Event Loop)
- 类比:事件循环是一位 “餐厅经理”,负责分配厨师的工作。
- 当厨师(协程)在等待食材(I/O 操作)时,经理会安排他去做其他事。
- 一旦食材准备好,经理会通知厨师继续原来的工作。
任务(Task)
- 类比:任务是餐厅经理分配的 “具体工作”。
- asyncio.create_task() 相当于经理给厨师派发任务。
- 任务会被放入队列,由事件循环调度执行。
并发执行
- 类比:多个厨师(协程)同时在厨房工作,但共享同一套厨具(CPU)。
- asyncio.gather() 相当于经理同时安排多个厨师处理不同的菜。
一、模拟耗时场景
通过
time.sleep()
来模拟耗时I/O
操作场景。如读取多文件或处理多数据等耗时任务。
import time
import random
def process_item(item):
# 模拟耗时操作
print(f"处理中:{item}")
process_time = random.uniform(0.5, 2.0)
time.sleep(process_time)
return f"处理完成:{item},耗时 {process_time:.2f} 秒"
def process_all_items():
items = ["任务A", "任务B", "任务C", "任务D"]
results = []
for item in items:
result = process_item(item)
results.append(result)
return results
if __name__ == "__main__":
start = time.time()
results = process_all_items()
end = time.time()
print("\n".join(results))
print(f"总耗时:{end - start:.2f} 秒")
上述代码输出内容:
处理中:任务A
处理中:任务B
处理中:任务C
处理中:任务D
处理完成:任务A,耗时 1.97 秒
处理完成:任务B,耗时 1.28 秒
处理完成:任务C,耗时 0.66 秒
处理完成:任务D,耗时 1.80 秒
总耗时:5.72 秒
这段代码的问题显而易见:任务执行存在强顺序依赖,后一个任务必须等待前一个任务完成才能启动。
若存在 4
个任务且单任务平均耗时 1
秒,整体执行耗时将趋近于 4
秒。
二、认识 async/await 关键字
Python
通过引入async/await
语法原生支持异步编程范式。在
函数定义前添加 async 关键字即可将其声明为协程(coroutine)
。而
await
关键字则用于暂停
当前协程并等待目标协程执行完毕。
下面我们将对前述代码进行异步化改造:
import asyncio
import random
import time
async def process_item(item):
print(f"处理中:{item}")
# async 定义的函数变成了协程
process_time = random.uniform(0.5, 2.0)
# time.sleep() 换成 asyncio.sleep()
await asyncio.sleep(process_time) # await 等待异步操作完成
return f"处理完成:{item},耗时 {process_time:.2f} 秒"
async def process_all_items():
items = ["任务A", "任务B", "任务C", "任务D"]
# 创建任务列表
tasks = [
asyncio.create_task(process_item(item))
for item in items
]
print("开始处理")
results = await asyncio.gather(*tasks)
return results
async def main():
start = time.time()
results = await process_all_items()
end = time.time()
print("\n".join(results))
print(f"总耗时:{end - start:.2f} 秒")
if __name__ == "__main__":
asyncio.run(main())
上述代码输出内容:
开始处理
处理中:任务A
处理中:任务B
处理中:任务C
处理中:任务D
处理完成:任务A,耗时 1.97 秒
处理完成:任务B,耗时 0.80 秒
处理完成:任务C,耗时 0.83 秒
处理完成:任务D,耗时 1.46 秒
总耗时:1.97 秒
从
核心语法和运行机制
角度,详细解析代码执行流程:
- 被
async
关键字修饰的函数,调用时不会立即执行函数体,而是返回一个协程对象
,这是异步编程的基础单元 await
作为异步等待的核心指令,只能在async
函数内部使用,其作用是暂停当前协程执行
,直至目标协程完成后再继续推进asyncio.create_task()
用于将协程封装为可调度任务
,通过事件循环机制实现高效调度执行asyncio.gather()
允许并发
运行多个任务,并阻塞
当前协程,直到所有任务全部执行完毕asyncio.run()
负责创建事件循环,启动并运行main()
主协程,直至整个异步任务流程结束
三、添加超时控制
在实际应用中,为避免异步操作长时间阻塞导致系统资源浪费。
可使用
asyncio.wait_for()
为协程执行设置超时控制机制:
import asyncio
import random
import time
async def process_item(item):
process_time = random.uniform(0.5, 2.0)
try:
# 设置1秒超时
await asyncio.wait_for(
asyncio.sleep(process_time),
timeout=1.0
)
return f"处理完成:{item},耗时 {process_time:.2f} 秒"
except asyncio.TimeoutError:
return f"处理超时:{item}"
async def main():
items = ["任务A", "任务B", "任务C", "任务D"]
tasks = [
asyncio.create_task(process_item(item))
for item in items
]
start = time.time()
results = await asyncio.gather(*tasks, return_exceptions=True)
end = time.time()
print("\n".join(results))
print(f"总耗时:{end - start:.2f} 秒")
if __name__ == "__main__":
asyncio.run(main())
上述代码输出内容:
处理超时:任务A
处理完成:任务B,耗时 0.94 秒
处理超时:任务C
处理完成:任务D,耗时 0.78 秒
总耗时:1.00 秒
四、使用异步上下文管理器
在
Python
异步编程中,async with
语句可实现资源的异步管理,其原理与with
语句类似但适用于异步场景。若要让类支持异步上下文管理,
需实现以下核心方法:
__aenter__()
:定义进入异步
上下文时执行的操作(如异步资源获取),需返回awaitable对象__aexit__()
:定义退出异步
上下文时的清理逻辑(如释放资源、处理异常),同样需为awaitable
对象- 通过
async with
配合异步上下文管理器,可确保异步操作中资源的安全释放,避免因异常或任务取消导致的资源泄漏。
import asyncio
import random
class AsyncResource:
async def __aenter__(self):
# 异步初始化资源
print("正在初始化资源...")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 异步清理资源
print("正在清理资源...")
await asyncio.sleep(0.1)
async def process(self, item):
# 异步处理任务
print(f"正在处理任务:{item}")
process_time = random.uniform(0.5, 2.0)
await asyncio.sleep(process_time)
return f"处理完成:{item},耗时 {process_time:.2f} 秒"
async def main():
items = ["任务A", "任务B", "任务C"]
async with AsyncResource() as resource:
tasks = [
asyncio.create_task(resource.process(item))
for item in items
]
results = await asyncio.gather(*tasks)
print("\n".join(results))
if __name__ == "__main__":
asyncio.run(main())
上述代码输出内容:
正在初始化资源...
正在处理任务:任务A
正在处理任务:任务B
正在处理任务:任务C
正在清理资源...
处理完成:任务A,耗时 1.31 秒
处理完成:任务B,耗时 0.77 秒
处理完成:任务C,耗时 0.84 秒
五、使用事件循环执行阻塞操作
在异步编程中,我们可能会遇到一些无法避免的阻塞操作(比如调用传统的同步API)。
这时,
asyncio.get_running_loop()
和run_in_executor
就显得特别重要。
在异步编程场景中,若需处理不可避免的阻塞操作(如调用传统同步 API)。
asyncio.get_running_loop()
与run_in_executor
组合将发挥关键作用:
- 通过
asyncio.get_running_loop()
可获取当前运行的事件循环- 配合
loop.run_in_executor()
方法,能将阻塞操作委托给线程池或进程池执行,避免阻塞事件循环导致整个异步流程卡顿。
这种方式实现了同步阻塞操作的异步化转换
,确保I/O
密集型任务与CPU
密集型任务的高效协同。
import asyncio
import time
import requests # 一个同步的HTTP客户端库
async def blocking_operation():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 在线程池中执行阻塞操作
result = await loop.run_in_executor(
None, # 使用默认的线程池执行器
requests.get, # 要执行的阻塞函数
'http://httpbin.org/delay/1' # 函数参数
)
return result.status_code
async def non_blocking_operation():
await asyncio.sleep(1)
return "非阻塞操作完成"
async def main():
# 同时执行阻塞和非阻塞操作
tasks = [
asyncio.create_task(blocking_operation()),
asyncio.create_task(non_blocking_operation())
]
start = time.time()
results = await asyncio.gather(*tasks)
end = time.time()
print(f"操作结果:{results}")
print(f"总耗时:{end - start:.2f} 秒")
if __name__ == "__main__":
asyncio.run(main())
上述代码输出内容:
操作结果:[200, '非阻塞操作完成']
总耗时:1.99 秒
核心原理与阻塞风险
该示例直观呈现了异步程序处理同步操作的关键逻辑:
- 阻塞本质:
requests.get() 作为同步 API,会直接阻塞当前线程;
而事件循环默认运行在主线程,若在协程中直接调用同步操作,将导致整个事件循环卡顿。 - 连锁影响:
事件循环阻塞会使其他异步任务无法调度执行
,彻底丧失异步编程的并发优势。
异步化解决方案
run_in_executor 的核心价值在于:
- 线程隔离:将阻塞操作委托给
线程池执行
,主线程的事件循环可继续调度其他任务; - 结果回调:线程池任务完成后,结果会通过
事件循环回调机制
返回,实现非阻塞式处理。
最佳实践指南
场景 | 推荐方案 | 优势说明 |
---|---|---|
普通 I/O 操作 |
优先使用异步原生库 (如 aiohttp) |
避免线程切换开销,性能更优 |
必须调用同步库 | 使用 run_in_executor 委托线程池 |
防止事件循环阻塞,维持程序响应性 |
CPU 密集型任务 |
用 run_in_executor 委托进程池 |
利用多进程突破 CPU 单核瓶颈 |
六、终止异步操作
在异步编程中,合理处理任务取消是构建健壮系统的关键环节。
下场景常需主动终止异步操作:
- 用户主动中断(如点击取消按钮)
- 操作超时自动终止
- 资源不足时的任务降级
- 系统关闭时的优雅退出
import asyncio
import random
async def long_operation(name):
try:
print(f"{name} 开始执行")
while True: # 模拟一个持续运行的操作
await asyncio.sleep(0.5)
print(f"{name} 正在执行...")
except asyncio.CancelledError:
print(f"{name} 被取消了")
raise # 重要:继续传播取消信号
async def main():
# 创建三个任务
task1 = asyncio.create_task(long_operation("任务1"))
task2 = asyncio.create_task(long_operation("任务2"))
task3 = asyncio.create_task(long_operation("任务3"))
# 等待1秒后取消task1
await asyncio.sleep(1)
task1.cancel()
# 等待2秒后取消其余任务
await asyncio.sleep(1)
task2.cancel()
task3.cancel()
try:
# 等待所有任务完成或被取消
await asyncio.gather(task1, task2, task3, return_exceptions=True)
except asyncio.CancelledError:
print("某个任务被取消了")
if __name__ == "__main__":
asyncio.run(main())
上述代码输出内容:
任务1 开始执行
任务2 开始执行
任务3 开始执行
任务1 正在执行...
任务2 正在执行...
任务3 正在执行...
任务1 被取消了
任务2 正在执行...
任务3 正在执行...
任务2 正在执行...
任务3 正在执行...
任务2 被取消了
任务3 被取消了
最佳实践建议:
- 在任务实现中通过
asyncio.CancelledError
异常捕获取消信号 - 使用
asyncio.wait_for()
设置操作超时上限 - 通过
task.cancel()
发送取消请求而非直接销毁任务 - 在
async with
上下文管理器中封装资源清理逻辑 - 使用
asyncio.gather()
的return_exceptions=True
参数避免取消操作传播
七、为什么需要 async/await?
协程(Coroutine)是一种特殊的函数。
它可以在执行过程中暂停,并在之后从暂停的地方继续执行。
当我们使用
async
定义一个函数时,我们实际上是在定义一个协程:
import asyncio
# 这是一个普通函数
def normal_function():
return "Hello"
# 这是一个协程
async def coroutine_function():
await asyncio.sleep(1)
return "Hello"
# 让我们看看它们的区别
print(normal_function) # <function normal_function at 0x1052cc040>
print(coroutine_function) # <function coroutine_function at 0x1054b9790>
# 调用它们的结果不同
print(normal_function()) # 直接返回: "Hello"
print(coroutine_function()) # RuntimeWarning: coroutine 'coroutine_function' was never awaited
# <coroutine object coroutine_function at 0x105962e40>
八、await 如何与事件循环协作
协程(Coroutine)的核心在于它可以在执行过程中主动交出控制权,让其他代码有机会执行。
让我们通过一个详细的例子来理解这个过程:
import asyncio
async def task1():
print("任务1:开始")
print("任务1:准备休眠")
await asyncio.sleep(2) # 关键点1:交出控制权
print("任务1:休眠结束")
async def task2():
print("任务2:开始")
print("任务2:准备休眠")
await asyncio.sleep(1) # 关键点2:交出控制权
print("任务2:休眠结束")
async def main():
# 同时执行两个任务
await asyncio.gather(task1(), task2())
asyncio.run(main())
上述代码输出内容:
任务1:开始
任务1:准备休眠
任务2:开始
任务2:准备休眠
任务2:休眠结束 # 1秒后
任务1:休眠结束 # 2秒后
让我们详细解释执行过程:
- 当程序遇到
await asyncio.sleep(2)
时:
- 这个
sleep
操作被注册到事件循环中 Python
记录当前的执行位置task1
主动交出控制权(task1 并没有停止运行,而是被暂停了,等待之后恢复)
- 事件循环
接管控制权
后:
- 寻找其他可以执行的协程(这里是 task2)
- 开始执行
task2
,直到遇到await asyncio.sleep(1)
task2
也交出控制权,被暂停
- 事件循环
继续
工作:
- 管理一个计时器,追踪这两个
sleep
操作 - 1秒后,发现
task2
的sleep
时间到了 - 恢复
task2
的执行,打印"任务2:休眠结束" - 2秒到时,
恢复 task1 的执行
,打印"任务1:休眠结束"
这就像是一个指挥家(事件循环)在指挥一个管弦乐队(多个协程):
- 当某个乐器(协程)需要休息时,它举手示意(await)
- 指挥家看到后,立即指挥其他乐器演奏
- 当休息时间到了,指挥家会示意这个乐器继续演奏
代码验证:
import asyncio
import time
async def report_time(name, sleep_time):
print(f"{time.strftime('%H:%M:%S')} - {name}开始")
await asyncio.sleep(sleep_time)
print(f"{time.strftime('%H:%M:%S')} - {name}结束")
async def main():
# 同时执行多个任务
await asyncio.gather(
report_time("任务A", 2),
report_time("任务B", 1),
report_time("任务C", 3)
)
asyncio.run(main())
上述代码输出内容:
00:19:26 - 任务A开始
00:19:26 - 任务B开始
00:19:26 - 任务C开始
00:19:27 - 任务B结束
00:19:28 - 任务A结束
00:19:29 - 任务C结束
这种机制的优势在于:
单线程执行,无线程切换开销
- 原理:
Python 异步编程基于事件循环(Event Loop),在单线程内通过协程的暂停和恢复实现并发。
- 对比:线程切换需要保存 / 恢复上下文,而协程切换由程序控制,开销极低。
协程主动交出控制权
- 关键点:
await 是协程的暂停点,遇到 await 时当前协程挂起,事件循环调度其他协程执行。
- 对比:线程由操作系统强制调度(抢占式),而协程是协作式的。
代码可读性与错误处理
- 线性代码结构,避免了回调地狱(Callback Hell)。
- 直接使用 try/except 捕获异常,无需嵌套处理。
理解了这个机制,我们就能更好地使用异步编程:
- 在
await
的时候,其他协程有机会执行
- 耗时操作应该是真正的
异步操作(比如 asyncio.sleep )
- 不要在协程中使用阻塞操作,那样会
卡住整个事件循环,影响到主进程
九、总结
异步编程并非万能解决方案,但其在处理
I/O
密集型任务时,性能提升优势显著。需深入理解同步与异步的本质差异,警惕阻塞问题。
只有合理运用相关工具,才能让程序兼具高效性与优雅性,反之则可能适得其反。
更多推荐
所有评论(0)