This week I read an article, "Python has had async for 10 years – why isn't it more popular?", and it struck a chord. It raises a question worth pondering: Python introduced the `async` and `await` keywords back in 2015, nearly ten years ago, so why has asynchronous programming still not caught on in the Python community as much as expected?
The article points out three major pain points, and as a developer who has been around since the Python 2.7 days, they resonate with me. In this post I will draw on my own experience to talk about the past, present, and future of asynchronous programming in Python.
## The three big hurdles of async programming

### Pain point 1: a limited range of suitable workloads

asyncio excels at network I/O, but its support for other kinds of I/O is limited. Even seemingly asynchronous file operations still fall back to a thread pool under the hood to emulate async behavior.
```python
import aiofiles

# aiofiles exposes an async API, but the actual reads run in a thread pool
async with aiofiles.open('large_file.txt', mode='r') as f:
    contents = await f.read()
```
In production, security policies often rule out system-level asynchronous mechanisms such as io_uring, which narrows asyncio's applicable range even further.
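To make the "thread pool under the hood" point concrete, here is a minimal sketch of my own (not from the original article): wrapping a blocking file read with `asyncio.to_thread` is essentially what libraries like aiofiles do internally with their own worker threads. The file name is just a placeholder.

```python
import asyncio

def blocking_read(path: str) -> str:
    # An ordinary, blocking file read
    with open(path, mode='r') as f:
        return f.read()

async def read_file_async(path: str) -> str:
    # asyncio.to_thread (Python 3.9+) runs the blocking call in a worker thread,
    # keeping the event loop responsive -- but no true async file I/O happens
    return await asyncio.to_thread(blocking_read, path)

async def main():
    contents = await read_file_async('large_file.txt')  # hypothetical file name
    print(f"read {len(contents)} characters")

# asyncio.run(main())
```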
### Pain point 2: the fundamental limit of the GIL

No matter how efficiently asynchronous code runs, it cannot break through the ceiling imposed by the GIL (Global Interpreter Lock). async/await can optimize I/O-bound work, but it offers little help for CPU-bound operations.

Error handling is even trickier: forgetting an `await` can make a coroutine fail silently, and this kind of invisible problem is painful to debug.
```python
# get_data_sync / get_data_async are placeholder functions for illustration
sync_result = get_data_sync()              # synchronous call
async_result = await get_data_async()      # asynchronous call -- must be awaited

# Run ten coroutines concurrently
results = await asyncio.gather(*[
    get_data_async() for _ in range(10)
])
```
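The "forgotten await" trap mentioned above is easy to reproduce. Here is a minimal sketch of my own (not from the article): calling a coroutine function without `await` only creates a coroutine object, its body never runs, and Python merely emits a "coroutine ... was never awaited" RuntimeWarning.

```python
import asyncio

async def save_record(data: str) -> None:
    # Imagine this writes to a database
    print(f"saved {data}")

async def main():
    save_record("order-1")        # BUG: missing await -- nothing is saved,
                                  # only a RuntimeWarning shows up later
    await save_record("order-2")  # correct: the coroutine actually runs

asyncio.run(main())
```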
### Pain point 3: the burden of maintaining two APIs

For compatibility, developers often end up maintaining both a synchronous and an asynchronous version of the same API, which significantly increases development and maintenance cost.
```python
class User:
    # Synchronous access through a property
    @property
    def user_records(self) -> list[Record]:
        return fetch_from_database(self.user_id)

    # Asynchronous access needs a separate method
    async def get_user_records(self) -> list[Record]:
        return await async_fetch_from_database(self.user_id)

# Attribute access stays synchronous, while fetching records must be awaited
user.name
records = await user.get_user_records()
```
From magic methods that cannot be made asynchronous, to inconsistent attribute access, to duplicated code and extra test complexity, all of this keeps developers away.
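As a quick illustration of the "magic methods cannot be async" point, here is a sketch of my own (not from the article): you can write `async def __init__`, but instantiating the class then fails, because the object protocol has no place to await the coroutine. The usual workaround is an async factory method.

```python
import asyncio

class AsyncResource:
    async def __init__(self):            # syntactically legal, but unusable
        await asyncio.sleep(0)

try:
    AsyncResource()
except TypeError as e:
    print(e)  # __init__() should return None, not 'coroutine'

# The common workaround: an async factory classmethod instead of __init__
class Resource:
    @classmethod
    async def create(cls) -> "Resource":
        self = cls()
        await asyncio.sleep(0)           # stand-in for real async setup
        return self
```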
## The evolution of async programming

### What is asynchronous programming?

Asynchronous programming is a non-blocking paradigm: while waiting for certain operations (such as I/O) to complete, the program keeps doing other work instead of sitting idle.
An intuitive comparison of synchronous vs asynchronous:
```python
import time
import asyncio

def sync_demo():
    def task(name, delay):
        time.sleep(delay)                      # blocks the whole program
        print(f"{name} done")

    start = time.time()
    task("task 1", 1)
    task("task 2", 1)
    task("task 3", 1)
    print(f"Sync total: {time.time() - start:.2f}s")   # roughly 3 seconds

async def async_demo():
    async def task(name, delay):
        await asyncio.sleep(delay)             # yields control while waiting
        print(f"{name} done")

    start = time.time()
    await asyncio.gather(
        task("task 1", 1),
        task("task 2", 1),
        task("task 3", 1)
    )
    print(f"Async total: {time.time() - start:.2f}s")  # roughly 1 second

sync_demo()
asyncio.run(async_demo())
```
### Three implementation styles over the years

#### Style 1: callbacks (the original approach)

Callbacks have excellent compatibility, but the code quickly becomes hard to maintain (hence the well-known term "callback hell").
```python
import time
import requests
from concurrent.futures import ThreadPoolExecutor

def callback_example():
    """Callback example -- the early answer to asynchrony"""

    def download_complete(future):
        """Callback invoked when a download finishes"""
        try:
            result = future.result()
            print(f"Download finished, status code: {result.status_code}")
            print(f"Received {len(result.text)} characters")
        except Exception as e:
            print(f"Download failed: {e}")

    print("Starting callback example...")
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = []
        urls = [
            'https://httpbin.org/delay/1',
            'https://httpbin.org/delay/2',
            'https://httpbin.org/delay/1'
        ]
        for url in urls:
            future = executor.submit(requests.get, url)
            future.add_done_callback(download_complete)
            futures.append(future)

        # Wait for everything to finish
        for future in futures:
            future.result()

    print(f"Callback example total: {time.time() - start_time:.2f}s")

callback_example()
```
#### Style 2: generators (the transitional approach)

Generators avoided callback hell and made the code clearer, but they required manual scheduling, which made the implementation itself more complex.
```python
import time
from types import coroutine

def generator_example():
    """Generator-based coroutines -- the Python 3.4-era solution"""

    @coroutine
    def async_sleep(delay):
        """Simulate an asynchronous sleep"""
        start = time.time()
        while time.time() - start < delay:
            yield                      # hand control back to the scheduler
        return f"slept for {delay}s"

    def run_coroutines():
        """A tiny hand-rolled scheduler that drives several coroutines"""
        tasks = [async_sleep(1), async_sleep(2), async_sleep(1)]
        results = []
        while tasks:
            current = tasks.pop(0)
            try:
                next(current)
                tasks.append(current)  # not finished yet, requeue it
            except StopIteration as e:
                results.append(e.value)
        return results

    print("Starting generator coroutine example...")
    start_time = time.time()
    results = run_coroutines()
    print(f"Generator coroutine results: {results}")
    print(f"Generator example total: {time.time() - start_time:.2f}s")

generator_example()
```
#### Style 3: async/await (the modern approach)

```python
import asyncio
import time

async def modern_async_example():
    """A modern async/await example"""

    async def fetch_data(url, delay):
        """Simulate asynchronously fetching data"""
        print(f"Start fetching {url}")
        await asyncio.sleep(delay)
        print(f"Finished fetching {url}")
        return f"data from {url} (delay {delay}s)"

    print("Starting modern async/await example...")
    start_time = time.time()

    results = await asyncio.gather(
        fetch_data("https://api1.com", 1),
        fetch_data("https://api2.com", 2),
        fetch_data("https://api3.com", 1),
        fetch_data("https://api4.com", 1)
    )

    print("Results of all tasks:")
    for result in results:
        print(f" - {result}")
    print(f"Modern async/await total: {time.time() - start_time:.2f}s")

asyncio.run(modern_async_example())
```
## asyncio: the modern async solution

The core advantage of asynchronous programming: while one coroutine is waiting on I/O, the event loop can switch to other coroutines, improving overall throughput.
### Core asyncio concepts

**Coroutine**: an asynchronous function, declared with `async def`.
```python
import asyncio

async def say_hello(name):
    print(f"Starting to greet {name}")
    await asyncio.sleep(1)
    print(f"Hello, {name}!")
    return f"greeting for {name} done"

async def main():
    # Await a coroutine directly (runs sequentially)
    result1 = await say_hello("Alice")

    # Wrap a coroutine in a Task so it is scheduled right away
    task = asyncio.create_task(say_hello("Bob"))
    result2 = await task

    # Run several coroutines concurrently
    results = await asyncio.gather(
        say_hello("Charlie"),
        say_hello("David"),
        say_hello("Eve")
    )
    return results

results = asyncio.run(main())
print(results)
```
**Event loop**: the asynchronous engine at the heart of asyncio, responsible for scheduling and running coroutines.
```python
import asyncio

async def demo_event_loop():
    print("Event loop demo")
    loop = asyncio.get_running_loop()
    print(f"Current loop time: {loop.time()}")

    def callback(name):
        print(f"Callback invoked: {name}")

    loop.call_later(2, callback, "after 2 seconds")  # delayed callback
    loop.call_soon(callback, "right away")           # immediate callback

    await asyncio.sleep(3)
    print("Demo finished")

asyncio.run(demo_event_loop())
```
**Task**: a coroutine wrapped so that the event loop can schedule and run it concurrently.
```python
import asyncio

async def task_management():
    """Task management demo"""

    async def worker(name, seconds):
        print(f"{name} starts working")
        await asyncio.sleep(seconds)
        print(f"{name} finished")
        return f"{name} worked for {seconds}s"

    tasks = [
        asyncio.create_task(worker("worker 1", 2)),
        asyncio.create_task(worker("worker 2", 1)),
        asyncio.create_task(worker("worker 3", 3))
    ]

    print("All tasks created, running concurrently...")
    results = await asyncio.gather(*tasks)
    print(f"All tasks done: {results}")

    for task in tasks:
        print(f"Task {task.get_name()}: done={task.done()}, result={task.result()}")

asyncio.run(task_management())
```
**Future**: a low-level awaitable that represents a result which will be set later.
```python
import asyncio

async def future_demo():
    future = asyncio.Future()
    print(f"Future done: {future.done()}")

    def set_result():
        print("Setting the Future's result")
        future.set_result("done!")

    # Schedule the result to be set 2 seconds from now
    asyncio.get_running_loop().call_later(2, set_result)

    result = await future
    print(f"Future done: {future.done()}")
    print(f"Result: {result}")
    return result

asyncio.run(future_demo())
```
### Low-level vs high-level API

For a more complete guide to the low-level API: see here
For a more complete guide to the high-level API: see here
| Category | API | Description | When to use |
| --- | --- | --- | --- |
| Low-level | `loop.create_future()` | Create a Future object | When fine-grained control is needed |
| Low-level | `loop.call_soon()` | Schedule a callback immediately | High-priority work |
| Low-level | `loop.call_later()` | Schedule a callback with a delay | Timed tasks |
| Low-level | `loop.run_in_executor()` | Run blocking code in a thread or process pool | CPU-bound or blocking work |
| High-level | `asyncio.run()` | Run a coroutine | Program entry point |
| High-level | `asyncio.create_task()` | Create a Task | Running coroutines concurrently |
| High-level | `asyncio.gather()` | Run several awaitables concurrently | Waiting for multiple tasks to finish |
| High-level | `asyncio.wait()` | More flexible task waiting | When timeouts or first-completed semantics are needed |
| High-level | `asyncio.sleep()` | Asynchronous sleep | Simulating I/O |
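The table mentions `asyncio.wait`, which none of the later examples actually use, so here is a minimal sketch of my own showing its first-completed semantics:

```python
import asyncio

async def fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)        # stand-in for a real I/O operation
    return f"{name} finished"

async def main():
    tasks = [
        asyncio.create_task(fetch("fast", 0.5)),
        asyncio.create_task(fetch("slow", 2.0)),
    ]
    # Return as soon as the first task completes (or after the 1s timeout)
    done, pending = await asyncio.wait(
        tasks, timeout=1.0, return_when=asyncio.FIRST_COMPLETED
    )
    for task in done:
        print(task.result())          # "fast finished"
    for task in pending:
        task.cancel()                 # clean up whatever is still running

asyncio.run(main())
```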
### Advanced asyncio features
Limiting concurrency with a semaphore:

```python
import asyncio

async def limited_concurrency():
    # A semaphore caps how many tasks run at the same time
    semaphore = asyncio.Semaphore(3)

    async def limited_task(i):
        async with semaphore:
            print(f"Task {i} started")
            await asyncio.sleep(1)
            print(f"Task {i} finished")
            return i

    tasks = [limited_task(i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(f"All tasks done: {results}")

asyncio.run(limited_concurrency())
```
Timeouts and cancellation:

```python
import asyncio

async def timeout_cancel_demo():
    """Timeout and cancellation demo"""

    async def long_running_task(name, seconds):
        try:
            print(f"{name} started")
            await asyncio.sleep(seconds)
            print(f"{name} completed normally")
            return f"{name} succeeded"
        except asyncio.CancelledError:
            print(f"{name} was cancelled")
            raise

    # Timeout: give up after 2 seconds
    try:
        result = await asyncio.wait_for(
            long_running_task("Task A", 5),
            timeout=2.0
        )
        print(f"Task A result: {result}")
    except asyncio.TimeoutError:
        print("Task A timed out")

    # Cancellation: stop a running task explicitly
    task_b = asyncio.create_task(long_running_task("Task B", 3))
    await asyncio.sleep(1)
    task_b.cancel()
    try:
        await task_b
    except asyncio.CancelledError:
        print("Task B cancelled")

asyncio.run(timeout_cancel_demo())
```
Producer/consumer with an async queue:

```python
import asyncio
import random

async def queue_example():
    queue = asyncio.Queue(maxsize=3)

    async def producer():
        for i in range(5):
            await asyncio.sleep(random.random())
            await queue.put(f"message {i}")
            print(f"produced: message {i}")

    async def consumer():
        while True:
            item = await queue.get()
            await asyncio.sleep(random.random() * 2)
            print(f"consumed: {item}")
            queue.task_done()

    producer_task = asyncio.create_task(producer())
    consumer_task = asyncio.create_task(consumer())

    await producer_task        # wait until everything has been produced
    await queue.join()         # wait until everything has been consumed
    consumer_task.cancel()     # the consumer loops forever, so cancel it

asyncio.run(queue_example())
```
### Code skeletons for common scenarios

```python
import asyncio
import aiohttp

async def advanced_patterns():
    # Limit the number of in-flight HTTP requests
    semaphore = asyncio.Semaphore(5)

    async def limited_request(url):
        async with semaphore:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    return await response.text()

    # Timeout pattern
    try:
        result = await asyncio.wait_for(
            limited_request('https://httpbin.org/delay/5'),
            timeout=3.0
        )
    except asyncio.TimeoutError:
        print("Request timed out")

    # Cancellation pattern
    task = asyncio.create_task(limited_request('https://httpbin.org/delay/10'))
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

# asyncio.run(advanced_patterns())
```
## Challenges in practice and how to handle them

### Identify the workloads that actually suit async

```python
import asyncio
import time

# Good fit: I/O-bound work with many concurrent waits
async def good_async_example():
    async def fetch_api_data(api_url):
        await asyncio.sleep(1)          # simulates a network round trip
        return f"data from {api_url}"

    start = time.time()
    results = await asyncio.gather(
        fetch_api_data("api1"),
        fetch_api_data("api2"),
        fetch_api_data("api3")
    )
    print(f"Async network requests took: {time.time() - start:.2f}s")

# Poor fit: CPU-bound work gains nothing from async
def bad_async_example():
    def cpu_intensive_task(n):
        total = 0
        for i in range(n):
            total += i * i
        return total

    async def async_cpu_task(n):
        # Wrapping a CPU-bound function in a coroutine does not make it faster:
        # it still blocks the event loop for the whole computation
        return cpu_intensive_task(n)

asyncio.run(good_async_example())
```
### Mix async with executors

To work around the GIL, we can combine asyncio with executors and route each kind of work to the right place:
```python
import asyncio
import concurrent.futures
import time

async def hybrid_solution():
    def cpu_task(n):
        """CPU-bound task"""
        return sum(i * i for i in range(n))

    def blocking_io_task():
        """Blocking I/O task"""
        time.sleep(1)
        return "blocking task done"

    loop = asyncio.get_running_loop()
    start_time = time.time()

    # CPU-bound work goes to a process pool (separate interpreters, separate GILs)
    with concurrent.futures.ProcessPoolExecutor() as process_pool:
        cpu_futures = [
            loop.run_in_executor(process_pool, cpu_task, 100000)
            for _ in range(4)
        ]

        # Blocking I/O goes to a thread pool
        with concurrent.futures.ThreadPoolExecutor() as thread_pool:
            io_futures = [
                loop.run_in_executor(thread_pool, blocking_io_task)
                for _ in range(4)
            ]

            cpu_results = await asyncio.gather(*cpu_futures)
            io_results = await asyncio.gather(*io_futures)

    print(f"Hybrid solution total: {time.time() - start_time:.2f}s")

if __name__ == "__main__":
    asyncio.run(hybrid_solution())
```
### A unified API design pattern

To ease the dual-API maintenance burden, here is a practical approach:
```python
import asyncio
from typing import Awaitable, Literal, Union, overload

class UnifiedDataService:
    """A unified data service that supports both sync and async calls"""

    # Literal overloads let type checkers pick the right return type
    @overload
    def get_data(self, key: str, *, async_mode: Literal[False] = False) -> str: ...
    @overload
    def get_data(self, key: str, *, async_mode: Literal[True]) -> Awaitable[str]: ...

    def get_data(self, key: str, *, async_mode: bool = False) -> Union[str, Awaitable[str]]:
        """Return a sync result or an awaitable, depending on async_mode"""
        if async_mode:
            return self._async_get_data(key)
        return self._sync_get_data(key)

    def _sync_get_data(self, key: str) -> str:
        return f"sync fetch: {key}"

    async def _async_get_data(self, key: str) -> str:
        await asyncio.sleep(0.1)
        return f"async fetch: {key}"

service = UnifiedDataService()

# Synchronous call
sync_result = service.get_data("key1", async_mode=False)
print(sync_result)

# Asynchronous call
async def test_async():
    async_result = await service.get_data("key1", async_mode=True)
    print(async_result)

asyncio.run(test_async())
```
## Watching closely: the revolutionary changes in Python 3.14

As the article predicts, Python 3.14 brings two historic breakthroughs for concurrent programming that could reshape the current landscape.
### PEP 779: the historic free-threading breakthrough

**Core change**: the free-threaded build removes the GIL bottleneck, enabling true multi-threaded parallel execution.
```python
import threading
import time

def cpu_intensive_task(name, n):
    total = sum(i * i for i in range(n))
    print(f"Task {name} done: {total}")

threads = []
start_time = time.time()

# On a free-threaded (no-GIL) build these threads can truly run in parallel
for i in range(4):
    thread = threading.Thread(
        target=cpu_intensive_task,
        args=[f"Thread-{i}", 1000000]
    )
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"Total time: {time.time() - start_time:.2f}s")
```
Why it matters:

- Performance revolution: CPU-bound tasks can finally use all the cores of a modern processor (see the runtime check after this list)
- Lower complexity: in some scenarios plain threads may become simpler to use than coroutines
- Backward compatibility: existing async code keeps working
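The threading example above runs on any Python version; whether it actually runs in parallel depends on the build. Here is a minimal sketch of my own (assuming a free-threaded build; `sys._is_gil_enabled()` only exists on Python 3.13+, hence the `hasattr` guard) to check at runtime:

```python
import sys

# sys._is_gil_enabled() was added together with the free-threaded builds (3.13+)
if hasattr(sys, "_is_gil_enabled"):
    if sys._is_gil_enabled():
        print("GIL is enabled: threads still take turns for CPU-bound work")
    else:
        print("Free-threaded build: CPU-bound threads can run in parallel")
else:
    print("This interpreter predates free-threading support")
```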
### PEP 734: multiple interpreters in the standard library

**Core feature**: each interpreter has its own independent global state, which provides a safer environment for parallel execution.
```python
# PEP 734 lands in Python 3.14 as the concurrent.interpreters module
from concurrent import interpreters

# Create two independent interpreters, each with its own global state
interp1 = interpreters.create()
interp2 = interpreters.create()

code1 = """
def cpu_task():
    return sum(i*i for i in range(1000000))

result = cpu_task()
"""

code2 = """
import asyncio

async def io_task():
    await asyncio.sleep(1)
    return "IO task completed"

result = asyncio.run(io_task())
"""

interp1.exec(code1)
interp2.exec(code2)
```
### What this means for async programming

A new landscape of competition and complementarity:
```python
# Conceptual sketch only: urls, compute, datasets and the imports are placeholders

# I/O-bound work: asyncio remains the natural choice
async def io_heavy_work():
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        return await asyncio.gather(*tasks)

# CPU-bound work: free-threaded builds make plain thread pools attractive
def cpu_heavy_work():
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(compute, data) for data in datasets]
        return [f.result() for f in futures]

# Mixed workloads: sub-interpreters offer an isolation boundary
def hybrid_work():
    io_interp = interpreters.create()
    cpu_interp = interpreters.create()
    # ... dispatch I/O work and CPU work to separate interpreters
```
### Some advice on technology choices

**Approaches you can adopt right now**:

- **I/O-bound projects**: keep using the FastAPI + aiohttp async stack
- **CPU-bound projects**: use multiprocessing, or wait for free-threading to stabilize
- **Mixed workloads**: consider the asyncio + executor hybrid approach
**Web framework recommendations**:

- New projects: FastAPI (async-first) > Django 4.1+ (async support)
- Existing projects: introduce async incrementally, at the I/O boundaries
**HTTP client recommendations** (a short sketch follows this list):

- Async scenarios: aiohttp > httpx (async mode)
- Sync scenarios: httpx (sync mode) > requests
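One reason httpx appears on both lines is that it exposes the same request API in sync and async form. A minimal sketch of my own (the endpoint is just an example):

```python
import asyncio
import httpx

# Synchronous usage: feels like a drop-in for requests users
def fetch_sync(url: str) -> int:
    with httpx.Client() as client:
        response = client.get(url)
        return response.status_code

# Asynchronous usage: same method names, just add async/await
async def fetch_async(url: str) -> int:
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.status_code

print(fetch_sync("https://httpbin.org/get"))
print(asyncio.run(fetch_async("https://httpbin.org/get")))
```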
## The future of async programming looks bright

Asynchronous programming is a double-edged sword: applied to the right problems it delivers major performance gains; applied to the wrong ones it only adds complexity. The article that opened this post illustrates exactly that.
- **Today**: keep using async where it fits (web APIs, crawlers, real-time applications)
- **Next**: follow the development of Python 3.14+ and be ready to embrace a more diverse set of concurrency options
There is no silver bullet in technology selection, only the option that fits best. As Python's concurrency capabilities keep improving, I believe asynchronous programming will become simpler, more powerful, and more widely adopted.
**References**:

- Python has had async for 10 years – why isn't it more popular?