前言

在 Python 3.8 以后的版本中,异步编程变得越来越重要。

本文将系统介绍 Python 标准库中的异步编程工具,带领大家掌握 async/await 语法和 asyncio 的使用。


概念

协程(Coroutine)

  • 类比:协程就像一位 “可暂停的厨师”。
  • 同步函数:厨师必须做完一道菜才能做下一道
  • 异步函数:厨师在煮菜时可以去切菜,同时处理多道菜
  • 关键点:async 定义协程,await 标记可暂停点。

事件循环(Event Loop)

  • 类比:事件循环是一位 “餐厅经理”,负责分配厨师的工作。
  • 当厨师(协程)在等待食材(I/O 操作)时,经理会安排他去做其他事
  • 一旦食材准备好,经理会通知厨师继续原来的工作。

任务(Task)

  • 类比:任务是餐厅经理分配的 “具体工作”。
  • asyncio.create_task() 相当于经理给厨师派发任务。
  • 任务会被放入队列,由事件循环调度执行

并发执行

  • 类比:多个厨师(协程)同时在厨房工作,但共享同一套厨具CPU)。
  • asyncio.gather() 相当于经理同时安排多个厨师处理不同的菜

一、模拟耗时场景

通过time.sleep()来模拟耗时 I/O 操作场景。

如读取多文件或处理多数据等耗时任务。

import time
import random

def process_item(item):
    # 模拟耗时操作
    print(f"处理中:{item}")
    process_time = random.uniform(0.5, 2.0)
    time.sleep(process_time)
    return f"处理完成:{item},耗时 {process_time:.2f} 秒"

def process_all_items():
    items = ["任务A", "任务B", "任务C", "任务D"]
    results = []
    for item in items:
        result = process_item(item)
        results.append(result)
    return results

if __name__ == "__main__":
    start = time.time()
    results = process_all_items()
    end = time.time()
    
    print("\n".join(results))
    print(f"总耗时:{end - start:.2f} 秒")

上述代码输出内容:

处理中:任务A
处理中:任务B
处理中:任务C
处理中:任务D
处理完成:任务A,耗时 1.97 秒
处理完成:任务B,耗时 1.28 秒
处理完成:任务C,耗时 0.66 秒
处理完成:任务D,耗时 1.80 秒
总耗时:5.72 秒

这段代码的问题显而易见:任务执行存在强顺序依赖,后一个任务必须等待前一个任务完成才能启动。
若存在 4 个任务且单任务平均耗时 1 秒,整体执行耗时将趋近于 4 秒。


二、认识 async/await 关键字

Python 通过引入 async/await 语法原生支持异步编程范式。

函数定义前添加 async 关键字即可将其声明为协程(coroutine)

await 关键字则用于暂停当前协程并等待目标协程执行完毕。

下面我们将对前述代码进行异步化改造:

import asyncio
import random
import time

async def process_item(item):
    print(f"处理中:{item}")
    # async 定义的函数变成了协程
    process_time = random.uniform(0.5, 2.0)
    # time.sleep() 换成 asyncio.sleep()
    await asyncio.sleep(process_time)  # await 等待异步操作完成
    return f"处理完成:{item},耗时 {process_time:.2f} 秒"

async def process_all_items():
    items = ["任务A", "任务B", "任务C", "任务D"]
    # 创建任务列表
    tasks = [
        asyncio.create_task(process_item(item))
        for item in items
    ]
    print("开始处理")
    results = await asyncio.gather(*tasks)
    return results

async def main():
    start = time.time()
    results = await process_all_items()
    end = time.time()
    
    print("\n".join(results))
    print(f"总耗时:{end - start:.2f} 秒")

if __name__ == "__main__":
    asyncio.run(main())

上述代码输出内容:

开始处理
处理中:任务A
处理中:任务B
处理中:任务C
处理中:任务D
处理完成:任务A,耗时 1.97 秒
处理完成:任务B,耗时 0.80 秒
处理完成:任务C,耗时 0.83 秒
处理完成:任务D,耗时 1.46 秒
总耗时:1.97 秒

核心语法和运行机制角度,详细解析代码执行流程:

  1. async 关键字修饰的函数,调用时不会立即执行函数体,而是返回一个协程对象,这是异步编程的基础单元
  2. await 作为异步等待的核心指令,只能在 async 函数内部使用,其作用是暂停当前协程执行,直至目标协程完成后再继续推进
  3. asyncio.create_task() 用于将协程封装为可调度任务,通过事件循环机制实现高效调度执行
  4. asyncio.gather() 允许并发运行多个任务,并阻塞当前协程,直到所有任务全部执行完毕
  5. asyncio.run() 负责创建事件循环,启动并运行 main() 主协程,直至整个异步任务流程结束

三、添加超时控制

在实际应用中,为避免异步操作长时间阻塞导致系统资源浪费。

可使用 asyncio.wait_for() 为协程执行设置超时控制机制:

import asyncio
import random
import time

async def process_item(item):
    process_time = random.uniform(0.5, 2.0)
    try:
        # 设置1秒超时
        await asyncio.wait_for(
            asyncio.sleep(process_time),
            timeout=1.0
        )
        return f"处理完成:{item},耗时 {process_time:.2f} 秒"
    except asyncio.TimeoutError:
        return f"处理超时:{item}"

async def main():
    items = ["任务A", "任务B", "任务C", "任务D"]
    tasks = [
        asyncio.create_task(process_item(item))
        for item in items
    ]
    
    start = time.time()
    results = await asyncio.gather(*tasks, return_exceptions=True)
    end = time.time()
    
    print("\n".join(results))
    print(f"总耗时:{end - start:.2f} 秒")

if __name__ == "__main__":
    asyncio.run(main())

上述代码输出内容:

处理超时:任务A
处理完成:任务B,耗时 0.94 秒
处理超时:任务C
处理完成:任务D,耗时 0.78 秒
总耗时:1.00 秒

四、使用异步上下文管理器

Python 异步编程中,async with 语句可实现资源的异步管理,其原理与 with 语句类似但适用于异步场景。

若要让类支持异步上下文管理,需实现以下核心方法:

  • __aenter__():定义进入异步上下文时执行的操作(如异步资源获取),需返回awaitable对象
  • __aexit__():定义退出异步上下文时的清理逻辑(如释放资源、处理异常),同样需为 awaitable 对象
  • 通过 async with 配合异步上下文管理器,可确保异步操作中资源的安全释放,避免因异常或任务取消导致的资源泄漏。
import asyncio
import random

class AsyncResource:
    async def __aenter__(self):
        # 异步初始化资源
        print("正在初始化资源...")
        await asyncio.sleep(0.1)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 异步清理资源
        print("正在清理资源...")
        await asyncio.sleep(0.1)
    
    async def process(self, item):
        # 异步处理任务
        print(f"正在处理任务:{item}")
        process_time = random.uniform(0.5, 2.0)
        await asyncio.sleep(process_time)
        return f"处理完成:{item},耗时 {process_time:.2f} 秒"

async def main():
    items = ["任务A", "任务B", "任务C"]
    
    async with AsyncResource() as resource:
        tasks = [
            asyncio.create_task(resource.process(item))
            for item in items
        ]
        results = await asyncio.gather(*tasks)
    
    print("\n".join(results))

if __name__ == "__main__":
    asyncio.run(main())

上述代码输出内容:

正在初始化资源...
正在处理任务:任务A
正在处理任务:任务B
正在处理任务:任务C
正在清理资源...
处理完成:任务A,耗时 1.31 秒
处理完成:任务B,耗时 0.77 秒
处理完成:任务C,耗时 0.84 秒

五、使用事件循环执行阻塞操作

在异步编程中,我们可能会遇到一些无法避免的阻塞操作(比如调用传统的同步API)。

这时,asyncio.get_running_loop()run_in_executor 就显得特别重要

在异步编程场景中,若需处理不可避免的阻塞操作(如调用传统同步 API)。
asyncio.get_running_loop()run_in_executor 组合将发挥关键作用:

  • 通过 asyncio.get_running_loop() 可获取当前运行的事件循环
  • 配合 loop.run_in_executor() 方法,能将阻塞操作委托给线程池或进程池执行,避免阻塞事件循环导致整个异步流程卡顿。
    这种方式实现了同步阻塞操作的异步化转换,确保 I/O 密集型任务与 CPU 密集型任务的高效协同。
import asyncio
import time
import requests  # 一个同步的HTTP客户端库

async def blocking_operation():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()

    # 在线程池中执行阻塞操作
    result = await loop.run_in_executor(
        None,  # 使用默认的线程池执行器
        requests.get,  # 要执行的阻塞函数
        'http://httpbin.org/delay/1'  # 函数参数
    )
    return result.status_code

async def non_blocking_operation():
    await asyncio.sleep(1)
    return "非阻塞操作完成"

async def main():
    # 同时执行阻塞和非阻塞操作
    tasks = [
        asyncio.create_task(blocking_operation()),
        asyncio.create_task(non_blocking_operation())
    ]
    
    start = time.time()
    results = await asyncio.gather(*tasks)
    end = time.time()
    
    print(f"操作结果:{results}")
    print(f"总耗时:{end - start:.2f} 秒")

if __name__ == "__main__":
    asyncio.run(main())
		

上述代码输出内容:

操作结果:[200, '非阻塞操作完成']
总耗时:1.99 秒
核心原理与阻塞风险

该示例直观呈现了异步程序处理同步操作的关键逻辑:

  • 阻塞本质:requests.get() 作为同步 API,会直接阻塞当前线程;而事件循环默认运行在主线程,若在协程中直接调用同步操作,将导致整个事件循环卡顿。
  • 连锁影响:事件循环阻塞会使其他异步任务无法调度执行,彻底丧失异步编程的并发优势。
异步化解决方案

run_in_executor 的核心价值在于:

  • 线程隔离:将阻塞操作委托给线程池执行,主线程的事件循环可继续调度其他任务;
  • 结果回调:线程池任务完成后,结果会通过事件循环回调机制返回,实现非阻塞式处理。
最佳实践指南
场景 推荐方案 优势说明
普通 I/O 操作 优先使用异步原生库(如 aiohttp) 避免线程切换开销,性能更优
必须调用同步库 使用 run_in_executor 委托线程池 防止事件循环阻塞,维持程序响应性
CPU 密集型任务 run_in_executor 委托进程池 利用多进程突破 CPU 单核瓶颈

六、终止异步操作

在异步编程中,合理处理任务取消是构建健壮系统的关键环节。

下场景常需主动终止异步操作:

  • 用户主动中断(如点击取消按钮)
  • 操作超时自动终止
  • 资源不足时的任务降级
  • 系统关闭时的优雅退出
import asyncio
import random

async def long_operation(name):
    try:
        print(f"{name} 开始执行")
        while True:  # 模拟一个持续运行的操作
            await asyncio.sleep(0.5)
            print(f"{name} 正在执行...")
    except asyncio.CancelledError:
        print(f"{name} 被取消了")
        raise  # 重要:继续传播取消信号

async def main():
    # 创建三个任务
    task1 = asyncio.create_task(long_operation("任务1"))
    task2 = asyncio.create_task(long_operation("任务2"))
    task3 = asyncio.create_task(long_operation("任务3"))
    
    # 等待1秒后取消task1
    await asyncio.sleep(1)
    task1.cancel()
    
    # 等待2秒后取消其余任务
    await asyncio.sleep(1)
    task2.cancel()
    task3.cancel()
    
    try:
        # 等待所有任务完成或被取消
        await asyncio.gather(task1, task2, task3, return_exceptions=True)
    except asyncio.CancelledError:
        print("某个任务被取消了")

if __name__ == "__main__":
    asyncio.run(main())

上述代码输出内容:

任务1 开始执行
任务2 开始执行
任务3 开始执行
任务1 正在执行...
任务2 正在执行...
任务3 正在执行...
任务1 被取消了
任务2 正在执行...
任务3 正在执行...
任务2 正在执行...
任务3 正在执行...
任务2 被取消了
任务3 被取消了

最佳实践建议:

  1. 在任务实现中通过 asyncio.CancelledError 异常捕获取消信号
  2. 使用 asyncio.wait_for() 设置操作超时上限
  3. 通过 task.cancel() 发送取消请求而非直接销毁任务
  4. async with 上下文管理器中封装资源清理逻辑
  5. 使用 asyncio.gather()return_exceptions=True 参数避免取消操作传播

七、为什么需要 async/await?

协程(Coroutine)是一种特殊的函数。

它可以在执行过程中暂停,并在之后从暂停的地方继续执行。

当我们使用 async 定义一个函数时,我们实际上是在定义一个协程:

import asyncio

# 这是一个普通函数
def normal_function():
    return "Hello"

# 这是一个协程
async def coroutine_function():
    await asyncio.sleep(1)
    return "Hello"

# 让我们看看它们的区别
print(normal_function)      # <function normal_function at 0x1052cc040>
print(coroutine_function)   # <function coroutine_function at 0x1054b9790>

# 调用它们的结果不同
print(normal_function())    # 直接返回: "Hello"
print(coroutine_function()) # RuntimeWarning: coroutine 'coroutine_function' was never awaited
# <coroutine object coroutine_function at 0x105962e40>

八、await 如何与事件循环协作

协程(Coroutine)的核心在于它可以在执行过程中主动交出控制权,让其他代码有机会执行。

让我们通过一个详细的例子来理解这个过程:

import asyncio

async def task1():
    print("任务1:开始")
    print("任务1:准备休眠")
    await asyncio.sleep(2)  # 关键点1:交出控制权
    print("任务1:休眠结束")

async def task2():
    print("任务2:开始")
    print("任务2:准备休眠")
    await asyncio.sleep(1)  # 关键点2:交出控制权
    print("任务2:休眠结束")

async def main():
    # 同时执行两个任务
    await asyncio.gather(task1(), task2())

asyncio.run(main())

上述代码输出内容:

任务1:开始
任务1:准备休眠
任务2:开始
任务2:准备休眠
任务2:休眠结束    # 1秒后
任务1:休眠结束    # 2秒后

让我们详细解释执行过程:

  1. 当程序遇到 await asyncio.sleep(2) 时:
  • 这个 sleep 操作被注册到事件循环中
  • Python 记录当前的执行位置
  • task1 主动交出控制权(task1 并没有停止运行,而是被暂停了,等待之后恢复)
  1. 事件循环接管控制权后:
  • 寻找其他可以执行的协程(这里是 task2)
  • 开始执行 task2,直到遇到 await asyncio.sleep(1)
  • task2 也交出控制权,被暂停
  1. 事件循环继续工作:
  • 管理一个计时器,追踪这两个 sleep 操作
  • 1秒后,发现 task2sleep 时间到了
  • 恢复 task2 的执行,打印"任务2:休眠结束"
  • 2秒到时,恢复 task1 的执行,打印"任务1:休眠结束"

这就像是一个指挥家(事件循环)在指挥一个管弦乐队(多个协程):

  1. 当某个乐器(协程)需要休息时,它举手示意(await
  2. 指挥家看到后,立即指挥其他乐器演奏
  3. 当休息时间到了,指挥家会示意这个乐器继续演奏

代码验证:

import asyncio
import time

async def report_time(name, sleep_time):
    print(f"{time.strftime('%H:%M:%S')} - {name}开始")
    await asyncio.sleep(sleep_time)
    print(f"{time.strftime('%H:%M:%S')} - {name}结束")

async def main():
    # 同时执行多个任务
    await asyncio.gather(
        report_time("任务A", 2),
        report_time("任务B", 1),
        report_time("任务C", 3)
    )

asyncio.run(main())

上述代码输出内容:

00:19:26 - 任务A开始
00:19:26 - 任务B开始
00:19:26 - 任务C开始
00:19:27 - 任务B结束
00:19:28 - 任务A结束
00:19:29 - 任务C结束

这种机制的优势在于:

单线程执行,无线程切换开销

  • 原理:Python 异步编程基于事件循环(Event Loop),在单线程内通过协程的暂停和恢复实现并发。
  • 对比:线程切换需要保存 / 恢复上下文,而协程切换由程序控制,开销极低。

协程主动交出控制权

  • 关键点:await 是协程的暂停点,遇到 await 时当前协程挂起,事件循环调度其他协程执行。
  • 对比:线程由操作系统强制调度(抢占式),而协程是协作式的。

代码可读性与错误处理

  • 线性代码结构,避免了回调地狱(Callback Hell)
  • 直接使用 try/except 捕获异常,无需嵌套处理。

理解了这个机制,我们就能更好地使用异步编程:

  • await 的时候,其他协程有机会执行
  • 耗时操作应该是真正的异步操作(比如 asyncio.sleep )
  • 不要在协程中使用阻塞操作,那样会卡住整个事件循环,影响到主进程

九、总结

异步编程并非万能解决方案,但其在处理 I/O 密集型任务时,性能提升优势显著。

需深入理解同步与异步的本质差异,警惕阻塞问题。

只有合理运用相关工具,才能让程序兼具高效性与优雅性,反之则可能适得其反。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐