本周读到了一篇文章《Python has had async for 10 years – why isn’t it more popular?》,深有感触,文章提出了一个值得深思的问题:Python 早在 2015 年就引入了 asyncawait 关键字,距今已近十年,为什么异步编程在 Python 社区中的普及仍不及预期?

这篇文章中指出了三大困境,也让我这个从Python2.7时代走过来的开发者深有共鸣。今天就结合我的实践经历,聊聊Python异步编程的过去、现在和未来。

异步编程的三座大山

困境一:适用场景受限

asyncio 在网络 I/O 方面表现出色,但对其他类型的 I/O 支持有限。即使是看似异步的文件操作,底层仍然依赖线程池来模拟异步行为。

1
2
3
# 看似异步,实际仍使用线程池
async with aiofiles.open('large_file.txt', mode='r') as f:
contents = await f.read() # 底层依然是线程池实现

生产环境中出于安全考虑,往往限制使用 io_uring 等系统级异步机制,进一步缩小了 asyncio 的适用范围。

困境二:GIL 的根本限制

无论异步代码执行得多么高效,都无法突破 GIL(全局解释器锁)的天花板。async/await 虽能优化 I/O 密集型任务,但对 CPU 密集型操作帮助有限。

更麻烦的是错误处理——忘记 await 可能导致协程静默失败,这种隐形问题调试起来很棘手。

1
2
3
4
5
6
7
8
# 单次调用性能相同
sync_result = get_data_sync() # 耗时 1 秒
async_result = await get_data_async() # 同样耗时 1 秒

# 只有在并发场景下异步才显示优势
results = await asyncio.gather(*[
get_data_async() for _ in range(10)
]) # 总耗时约 1 秒(而非 10 秒)

困境三:双 API 的维护负担

为了兼容性,开发者往往需要同时维护同步和异步两套 API,大大增加了开发和维护成本。

1
2
3
4
5
6
7
8
9
10
11
12
# 同步版本
@property
def user_records(self) -> list[Record]:
return fetch_from_database(self.user_id)

# 异步版本 - API 设计不一致
async def get_user_records(self) -> list[Record]:
return await async_fetch_from_database(self.user_id)

# 使用时的心理负担:需要记住哪些方法需要 await
user.name # 普通属性
records = await user.get_user_records() # 异步方法

从魔法方法无法异步化,到属性访问方式的不一致,再到代码重复和测试复杂度增加,都让开发者望而却步。

异步编程的演进之路

什么是异步编程?

异步编程是一种非阻塞的编程范式,允许程序在等待某些操作(如I/O)完成时继续执行其他任务,而不是干等着。

同步 VS 异步的直观对比:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import time
import asyncio

# 同步版本 - 顺序执行
def sync_demo():
def task(name, delay):
time.sleep(delay)
print(f"{name} 完成")

start = time.time()
task("任务1", 1)
task("任务2", 1)
task("任务3", 1)
print(f"同步总耗时: {time.time() - start:.2f}秒")

# 异步版本 - 并发执行
async def async_demo():
async def task(name, delay):
await asyncio.sleep(delay)
print(f"{name} 完成")

start = time.time()
await asyncio.gather(
task("任务1", 1),
task("任务2", 1),
task("任务3", 1)
)
print(f"异步总耗时: {time.time() - start:.2f}秒")

# 运行对比
sync_demo() # 输出:总耗时约3秒
asyncio.run(async_demo()) # 输出:总耗时约1秒

三种实现方式的变迁

方式一:回调函数(Callback)——原始方案

兼容性好,但代码难以维护(坊间流传一个词:回调地狱)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import requests
from concurrent.futures import ThreadPoolExecutor
import time

def callback_example():
"""回调函数示例 - 早期的异步解决方案"""

def download_complete(future):
"""下载完成时的回调函数"""
try:
result = future.result()
print(f"下载完成,状态码: {result.status_code}")
print(f"获取数据长度: {len(result.text)} 字符")
except Exception as e:
print(f"下载失败: {e}")

print("开始回调函数示例...")
start_time = time.time()

with ThreadPoolExecutor(max_workers=3) as executor:
# 提交多个任务
futures = []
urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/2',
'https://httpbin.org/delay/1'
]

for url in urls:
future = executor.submit(requests.get, url)
future.add_done_callback(download_complete)
futures.append(future)

# 等待所有任务完成
for future in futures:
future.result()

print(f"回调函数示例总耗时: {time.time() - start_time:.2f}秒")

# 运行示例
callback_example()

方式二:生成器(Generator)—过渡方案

虽然避免了回调地狱,代码变的更清晰了,但需要手动调度,实现变的更复杂了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import time
from types import coroutine

def generator_example():
"""生成器实现协程 - Python 3.4时代的解决方案"""

@coroutine
def async_sleep(delay):
"""模拟异步sleep"""
start = time.time()
while time.time() - start < delay:
yield
return f"休眠了{delay}秒"

def run_coroutines():
"""运行多个协程"""
tasks = [async_sleep(1), async_sleep(2), async_sleep(1)]
results = []

while tasks:
current = tasks.pop(0)
try:
next(current)
tasks.append(current) # 如果还没完成,放回队列
except StopIteration as e:
results.append(e.value) # 协程完成,获取返回值

return results

print("开始生成器协程示例...")
start_time = time.time()
results = run_coroutines()
print(f"生成器协程结果: {results}")
print(f"生成器示例总耗时: {time.time() - start_time:.2f}秒")

# 运行示例
generator_example()

方式三:Async/Await——现代方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
import time

async def modern_async_example():
"""现代async/await示例"""

async def fetch_data(url, delay):
"""模拟异步数据获取"""
print(f"开始获取 {url}")
await asyncio.sleep(delay)
print(f"完成获取 {url}")
return f"{url} 的数据(延迟{delay}秒)"

print("开始现代async/await示例...")
start_time = time.time()

# 并发执行多个异步任务
results = await asyncio.gather(
fetch_data("https://api1.com", 1),
fetch_data("https://api2.com", 2),
fetch_data("https://api3.com", 1),
fetch_data("https://api4.com", 1)
)

print("所有任务完成结果:")
for result in results:
print(f" - {result}")

print(f"现代async/await总耗时: {time.time() - start_time:.2f}秒")

# 运行示例
asyncio.run(modern_async_example())

asyncio:现代异步解决方案

异步编程的核心优势在于:当一个协程等待 I/O 操作时,事件循环可以切换执行其他协程,从而提高整体吞吐量

asyncio的核心概念

协程(Coroutine):异步函数

  • 是异步编程的基础模块,使用async def定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio

# 定义一个简单的协程
async def say_hello(name):
print(f"开始向 {name} 问好")
await asyncio.sleep(1) # 模拟耗时操作
print(f"你好, {name}!")
return f"{name} 的问候完成"

# 运行协程的三种方式
async def main():
# 方式1: 直接await
result1 = await say_hello("Alice")

# 方式2: 使用asyncio.create_task
task = asyncio.create_task(say_hello("Bob"))
result2 = await task

# 方式3: 使用asyncio.gather并发执行
results = await asyncio.gather(
say_hello("Charlie"),
say_hello("David"),
say_hello("Eve")
)

return results

# 运行主函数
results = asyncio.run(main())
print(results)

事件循环(Event Loop)

  • 异步引擎,也是asyncio的核心,负责调度和执行协程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio

async def demo_event_loop():
print("事件循环demo")

# 获取当前事件循环
loop = asyncio.get_running_loop()
print(f"当前时间: {loop.time()}")

# 安排回调函数
def callback(name):
print(f"回调函数被调用: {name}")

# 安排定时回调
loop.call_later(2, callback, "2秒后")
loop.call_soon(callback, "立即")

await asyncio.sleep(3)
print("演示结束")

asyncio.run(demo_event_loop())

任务(Task)

  • 协程的包装器,用于并发执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async def task_management():
"""任务管理演示"""

async def worker(name, seconds):
print(f"{name} 开始工作")
await asyncio.sleep(seconds)
print(f"{name} 工作完成")
return f"{name} 工作了{seconds}秒"

# 创建多个任务
tasks = [
asyncio.create_task(worker("工人1", 2)),
asyncio.create_task(worker("工人2", 1)),
asyncio.create_task(worker("工人3", 3))
]

print("所有任务已创建,开始并发执行...")

# 等待所有任务完成
results = await asyncio.gather(*tasks)
print(f"所有任务完成: {results}")

# 任务状态检查
for task in tasks:
print(f"任务 {task.get_name()}: 完成={task.done()}, 结果={task.result()}")

asyncio.run(task_management())

Future对象

  • 代表一个尚未完成的计算结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio

async def future_demo():
# 创建Future对象
future = asyncio.Future()
print(f"Future状态: {future.done()}")

# 设置结果(通常在别处)
def set_result():
print("设置Future结果")
future.set_result("完成!")

# 2秒后设置结果
asyncio.get_event_loop().call_later(2, set_result)

# 等待Future完成
result = await future
print(f"Future状态: {future.done()}")
print(f"结果: {result}")

return result

asyncio.run(future_demo())

低级API vs 高级API对比

更全面的低级API的用法请:点进前往

更全面的高级API的用法请:点进前往

类别 API名称 说明 使用场景
低级API loop.create_future() 创建Future对象 需要精细控制时
loop.call_soon() 立即调度回调 优先级高的任务
loop.call_later() 延迟调度 定时任务
loop.run_in_executor() 在线程池中运行 CPU密集型任务
高级API asyncio.run() 运行协程 程序入口点
asyncio.create_task() 创建任务 并发执行协程
asyncio.gather() 并发运行多个任务 等待多个任务完成
asyncio.wait() 更灵活的任务等待 需要超时或优先完成时
asyncio.sleep() 异步等待 模拟I/O操作

asyncio高级特性

  • 信号量控制并发数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio

async def limited_concurrency():
semaphore = asyncio.Semaphore(3) # 最大并发数3

async def limited_task(i):
async with semaphore:
print(f"任务 {i} 开始")
await asyncio.sleep(1)
print(f"任务 {i} 完成")
return i

# 创建10个任务,但最多同时执行3个
tasks = [limited_task(i) for i in range(10)]
results = await asyncio.gather(*tasks)
print(f"所有任务完成: {results}")

asyncio.run(limited_concurrency())
  • 超时与取消控制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
async def timeout_cancel_demo():
"""超时和取消demo"""

async def long_running_task(name, seconds):
try:
print(f"{name} 开始运行")
await asyncio.sleep(seconds)
print(f"{name} 正常完成")
return f"{name} 成功"
except asyncio.CancelledError:
print(f"{name} 被取消")
raise

# 超时控制
try:
result = await asyncio.wait_for(
long_running_task("任务A", 5),
timeout=2.0
)
print(f"任务A结果: {result}")
except asyncio.TimeoutError:
print("任务A超时")

# 手动取消
task_b = asyncio.create_task(long_running_task("任务B", 3))
await asyncio.sleep(1)
task_b.cancel()
try:
await task_b
except asyncio.CancelledError:
print("任务B已取消")

asyncio.run(timeout_cancel_demo())
  • 队列处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import random

async def queue_example():
queue = asyncio.Queue(maxsize=3)

async def producer():
for i in range(5):
await asyncio.sleep(random.random())
await queue.put(f"消息 {i}")
print(f"生产: 消息 {i}")

async def consumer():
while True:
item = await queue.get()
await asyncio.sleep(random.random() * 2)
print(f"消费: {item}")
queue.task_done()

# 启动生产者和消费者
producer_task = asyncio.create_task(producer())
consumer_task = asyncio.create_task(consumer())

await producer_task
await queue.join() # 等待所有任务完成
consumer_task.cancel()

asyncio.run(queue_example())

常用场景的代码框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
import aiohttp

async def advanced_patterns():
# 1. 限制并发数
semaphore = asyncio.Semaphore(5)

async def limited_request(url):
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()

# 2. 超时控制
try:
result = await asyncio.wait_for(
limited_request('https://httpbin.org/delay/5'),
timeout=3.0
)
except asyncio.TimeoutError:
print("请求超时")

# 3. 任务取消
task = asyncio.create_task(limited_request('https://httpbin.org/delay/10'))
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("任务被取消")

实践中的挑战与解决方案

识别适合异步的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
import time

# 适合异步:网络 I/O 密集型
async def good_async_example():
async def fetch_api_data(api_url):
# 模拟网络请求
await asyncio.sleep(1)
return f"数据来自 {api_url}"

start = time.time()
# 并发执行多个网络请求
results = await asyncio.gather(
fetch_api_data("api1"),
fetch_api_data("api2"),
fetch_api_data("api3")
)
print(f"异步网络请求耗时: {time.time() - start:.2f}秒") # 约1秒

# 不适合异步:CPU 密集型
def bad_async_example():
def cpu_intensive_task(n):
total = 0
for i in range(n):
total += i * i
return total

# 即使用异步包装,由于 GIL 限制,性能提升有限
async def async_cpu_task(n):
return cpu_intensive_task(n)

asyncio.run(good_async_example())

混合使用异步与执行器

针对 GIL 限制,我们可以结合异步和执行器来解决不同类型的任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
import concurrent.futures
import time

async def hybrid_solution():
def cpu_task(n):
"""CPU 密集型任务"""
return sum(i * i for i in range(n))

def blocking_io_task():
"""阻塞 I/O 任务"""
time.sleep(1)
return "阻塞任务完成"

loop = asyncio.get_running_loop()

# CPU 密集型任务使用进程池
with concurrent.futures.ProcessPoolExecutor() as process_pool:
cpu_futures = [
loop.run_in_executor(process_pool, cpu_task, 100000)
for _ in range(4)
]

# 阻塞 I/O 使用线程池
with concurrent.futures.ThreadPoolExecutor() as thread_pool:
io_futures = [
loop.run_in_executor(thread_pool, blocking_io_task)
for _ in range(4)
]

# 并发执行所有任务
start_time = time.time()
cpu_results = await asyncio.gather(*cpu_futures)
io_results = await asyncio.gather(*io_futures)

print(f"混合方案总耗时: {time.time() - start_time:.2f}秒")

asyncio.run(hybrid_solution())

统一API设计模式

针对双 API 维护问题,这里提供一个实用的解决方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from typing import Union, Awaitable, overload
import asyncio

class UnifiedDataService:
"""统一的数据服务,支持同步和异步调用"""

@overload
def get_data(self, key: str, *, async_mode: bool = False) -> str: ...

@overload
def get_data(self, key: str, *, async_mode: bool = True) -> Awaitable[str]: ...

def get_data(self, key: str, *, async_mode: bool = False) -> Union[str, Awaitable[str]]:
"""根据 async_mode 参数决定返回同步还是异步结果"""
if async_mode:
return self._async_get_data(key)
else:
return self._sync_get_data(key)

def _sync_get_data(self, key: str) -> str:
return f"同步获取: {key}"

async def _async_get_data(self, key: str) -> str:
await asyncio.sleep(0.1) # 模拟异步操作
return f"异步获取: {key}"

# 使用示例
service = UnifiedDataService()

# 同步调用
sync_result = service.get_data("key1", async_mode=False)
print(sync_result)

# 异步调用
async def test_async():
async_result = await service.get_data("key1", async_mode=True)
print(async_result)

asyncio.run(test_async())

持续关注:Python 3.14 的革命性变化

正如文章中所预言的,Python 3.14 带来了两个具有历史意义的并发编程突破,有望彻底改变当前的格局。

PEP 779: Free-Threading 的历史性突破

核心改变:移除 GIL 限制,实现真正的多线程并行执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 在 Free-Threading 模式下,这些操作可以真正并行
import threading
import time

def cpu_intensive_task(name, n):
total = sum(i * i for i in range(n))
print(f"任务 {name} 完成: {total}")

# 以前受 GIL 限制,现在可以真正并行
threads = []
start_time = time.time()

for i in range(4):
thread = threading.Thread(
target=cpu_intensive_task,
args=[f"Thread-{i}", 1000000]
)
threads.append(thread)
thread.start()

for thread in threads:
thread.join()

print(f"总耗时: {time.time() - start_time:.2f} 秒")
# Free-Threading 模式下,4个线程可以真正并行

重大意义:

  • 性能革命:CPU 密集型任务可以充分利用多核处理器
  • 降低复杂性:某些场景下线程可能比协程更简单易用
  • 向后兼容:现有异步代码仍然有效

PEP 734: 多解释器支持

核心特性:每个解释器拥有独立的全局状态,提供更安全的并行执行环境。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 伪代码示例(PEP 734 实现)
import interpreters

# 创建独立的解释器实例
interp1 = interpreters.create()
interp2 = interpreters.create()

# 在不同解释器中执行代码,完全隔离
code1 = """
def cpu_task():
return sum(i*i for i in range(1000000))
result = cpu_task()
"""

code2 = """
async def io_task():
await asyncio.sleep(1)
return "IO task completed"
result = asyncio.run(io_task())
"""

# 并行执行,每个解释器独立运行
interp1.exec(code1)
interp2.exec(code2)

对异步编程的影响

竞争与互补的新格局

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 场景分析:不同工作负载的最优选择

# I/O 密集型 - 异步仍然是最佳选择
async def io_heavy_work():
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
return await asyncio.gather(*tasks)

# CPU 密集型 - Free-Threading 提供新选择
def cpu_heavy_work():
with ThreadPoolExecutor() as executor:
futures = [executor.submit(compute, data) for data in datasets]
return [f.result() for f in futures]

# 混合工作负载 - 多解释器方案
def hybrid_work():
io_interp = interpreters.create()
cpu_interp = interpreters.create()
# 不同类型的任务在不同解释器中执行

项目选型的一些建议

立即可行的方案

  1. I/O 密集型项目:继续使用 FastAPI + aiohttp 异步方案
  2. CPU 密集型项目:采用多进程方案或等待 Free-Threading 稳定
  3. 混合负载项目:考虑 asyncio + 执行器的混合方案

Web 框架推荐

  • 新项目:FastAPI(异步优先)> Django 4.1+(异步支持)
  • 现有项目:渐进式引入异步,在 I/O 边界使用

HTTP 客户端推荐

  • 异步场景:aiohttp > httpx(异步模式)
  • 同步场景:httpx(同步模式)> requests

异步编程的未来可期

异步编程就是一把双刃剑——用对了场景能大幅提升性能,用错了反而增加复杂度,开篇引入的那边文章就是很好的揭示了这一点。

  • 当下:在适合的场景(Web API、爬虫、实时应用)继续使用异步编程
  • 未来:关注 Python 3.14+ 的发展,准备拥抱多样化的并发解决方案

技术选型没有银弹,只有最适合的方案。随着 Python 并发能力的不断增强,我相信,未来的异步编程会更加简单、强大和普及。


参考资料


本站由 BluesSen 使用 Stellar 1.33.1 主题创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。

本站总访问量