
本文深入探讨了在Python asyncio中调度嵌套异步函数时遇到的并发挑战。通过分析传统`await`操作的阻塞特性,揭示了其在复杂流处理场景中的局限性。文章提出并详细阐述了基于`asyncio.Queue`和`asyncio.Event`的生产者-消费者模式,作为实现任务间解耦和真正并发执行的有效策略,从而显著提升异步应用的响应性和效率。
在异步编程中,我们经常需要处理数据流,其中一个任务负责生成数据,另一个任务负责处理数据。Python的asyncio库提供了强大的工具来构建并发应用程序,但在调度嵌套异步函数时,如果不正确地理解await关键字的行为,可能会导致程序并非按预期并发执行,而是串行阻塞。
考虑一个常见的场景:我们有一个字符流生成器,它逐步产生字符;一个句子生成器,它从字符流中收集字符并形成完整的句子;以及一个句子处理器,它对每个句子执行耗时操作。
以下是初始实现的代码结构:
import asyncio
async def stream():
char_string = "Hi. Hello. Hello."
for char in char_string:
await asyncio.sleep(0.1) # 模拟耗时操作
print("got char:", char)
yield char
async def sentences_generator():
sentence = ""
async for char in stream():
sentence += char
if char in [".", "!", "?"]:
print("got sentence: ", sentence)
yield sentence
sentence = ""
async def process_sentence(sentence: str):
print("waiting for processing sentence: ", sentence)
await asyncio.sleep(len(sentence)*0.1) # 模拟耗时操作
print("sentence processed!")
async def main():
i = 0
async for sentence in sentences_generator():
print("processing sentence: ", i)
await process_sentence(sentence) # 这里的await是关键
i += 1
asyncio.run(main())运行上述代码,其输出大致如下:
got char: H got char: i got char: . got sentence: Hi. processing sentence: 0 waiting for processing sentence: Hi. sentence processed! got char: got char: H got char: e got char: l got char: l got char: o got char: . got sentence: Hello. processing sentence: 1 waiting for processing sentence: Hello. sentence processed!
从输出可以看出,当process_sentence函数被await时,main协程会暂停,直到process_sentence完全执行完毕。这意味着在process_sentence处理第一个句子期间,stream和sentences_generator无法继续生成新的字符和句子。这并非我们期望的并发行为,我们希望在处理一个句子的同时,上游的字符流能够继续生成,从而提高整体吞吐量。
造成这种现象的根本原因在于await关键字的语义。当一个协程await另一个协程时,它会暂停自身的执行,并将控制权交给被await的协程。只有当被await的协程完成或自身也await了其他操作时,控制权才可能回到调用者。因此,在上述例子中,main函数中的await process_sentence(sentence)会完全阻塞main函数,直到当前句子处理完毕,才能继续从sentences_generator中获取下一个句子。
为了实现真正的并发,即在process_sentence处理句子的同时,sentences_generator和stream能够继续生成数据,我们可以采用经典的生产者-消费者模式。
在这种模式中:
asyncio提供了asyncio.Queue来实现这种异步安全的共享队列。此外,为了优雅地处理生产者完成后的消费者关闭问题,我们还可以引入asyncio.Event来发出生产者完成的信号。
Sitekick
一个AI登陆页面自动构建器
121
查看详情
以下是使用生产者-消费者模式重构后的代码:
import asyncio
async def stream():
char_string = "Hi. Hello. Thank you." # 更改了字符串以展示更长的流
for char in char_string:
await asyncio.sleep(0.1)
print("got char:", char)
yield char
async def sentences_generator(q: asyncio.Queue[str], flag: asyncio.Event):
"""
生产者:从字符流生成句子,并放入队列。
当字符流结束时,设置Event标志通知消费者。
"""
sentence = ""
async for char in stream():
sentence += char
if char in [".", "!", "?"]:
print("got sentence: ", sentence)
await q.put(sentence) # 将生成的句子放入队列
sentence = ""
flag.set() # 生产者完成所有句子的生成,设置Event
async def process_sentence(q: asyncio.Queue[str], flag: asyncio.Event):
"""
消费者:从队列中取出句子进行处理。
当队列为空且生产者已完成时,消费者停止。
"""
global i # 用于计数处理的句子
while True:
# 检查是否应该停止:队列为空且生产者已完成
if q.empty() and flag.is_set():
break
try:
# 尝试从队列获取项,如果队列为空,会等待
item = await asyncio.wait_for(q.get(), timeout=1.0) # 增加超时,避免无限等待
except asyncio.TimeoutError:
# 如果超时且生产者已完成,则退出
if flag.is_set():
break
continue # 否则继续等待
print("processing sentence: ", i)
print("waiting for processing sentence: ", item)
await asyncio.sleep(len(item) * 0.1)
print("sentence processed!")
i += 1
async def main():
global i
i = 1 # 初始化句子计数器
event = asyncio.Event() # 用于生产者通知消费者完成
queue = asyncio.Queue[str]() # 共享队列
# 创建生产者和消费者任务
producer_task = sentences_generator(queue, event)
consumer_task = process_sentence(queue, event)
# 并发运行生产者和消费者任务
await asyncio.gather(producer_task, consumer_task)
asyncio.run(main())代码解析:
sentences_generator (生产者):
process_sentence (消费者):
main 函数:
预期输出(部分):
got char: H got char: i got char: . got sentence: Hi. got char: got char: H got char: e got char: l got char: l got char: o got char: . got sentence: Hello. processing sentence: 1 waiting for processing sentence: Hi. got char: got char: T got char: h got char: a got char: n got char: k got char: got char: y got char: o got char: u got char: . got sentence: Thank you. sentence processed! processing sentence: 2 waiting for processing sentence: Hello. sentence processed! processing sentence: 3 waiting for processing sentence: Thank you. sentence processed!
从新的输出中可以看到,当process_sentence正在处理"Hi."时,stream和sentences_generator已经继续生成了"Hello."甚至"Thank you."。这种交错的输出表明生产者和消费者正在并发地工作,显著提高了程序的效率和响应性。
通过采用生产者-消费者模式并结合asyncio.Queue和asyncio.Event,我们可以有效地管理异步任务间的依赖关系,实现更高效、更具响应性的并发数据流处理。这对于构建复杂的异步系统,如网络服务、数据管道等,是至关重要的技术。
以上就是优化Asyncio嵌套函数调度:使用生产者-消费者模式实现并发流处理的详细内容,更多请关注其它相关文章!
# 我们可以
# 石家庄网站搜索优化服务
# seo优化排名官方版
# 浦东seo优化报价
# 新乡搜狗关键词排名技术
# 投资公司网站建设流程
# 上海营销推广企业排名榜
# seo实战进展
# 罗湖手机网站优化效果好
# 宿松网站建设制作外包
# 卖车位营销推广关键词
# 的是
# 几种
# 并将
# python
# 它是
# 会在
# 重构
# 浮点
# 多个
# 为空
# 异步任务
# stream
# ai
# 工具
# 处理器
# go
相关栏目:
【
Google疑问12 】
【
Facebook疑问10 】
【
优化推广96088 】
【
技术知识133117 】
【
IDC资讯59369 】
【
网络运营7196 】
【
IT资讯61894 】
相关推荐:
Lar*el Eloquent中通过Join查询关联数据表:解决多行子查询问题
Cassandra中复合主键、二级索引与ORDER BY排序的限制与解决方案
夸克浏览器资源嗅探怎么用 夸克浏览器网页资源下载技巧【教程】
解决异步Python机器人中同步操作的阻塞问题
Word如何将文字快速转成表格 Word文本转换成表格功能使用技巧【效率】
优化2xN网格最大路径和的动态规划算法实践
电脑视频号|直播|如何分享屏幕
J*aScript中高效处理用户输入:从Keyup事件到表单提交的优化实践
雨课堂官网在线登录 网页版雨课堂登录链接
极兔快递官网查询入口手机版 手机极兔快递登录查询入口官方
《虎扑》取消评分记录方法
b站网页版入口 哔哩哔哩官方网站直接进入
如何在CSS中使用伪类:valid实现表单验证提示_结合:valid改变边框颜色
中大网校app做题记录清除方法
Flash AS3.0简易相册制作
疯狂小鸟微信小游戏入口 疯狂小鸟网页版秒玩
海棠阅读网页版_进入海棠网页版在线阅读中心
cad加载的线型看不见怎么办_cad线型不可见问题解决方法
word表格如何按某一列内容进行排序_Word表格按列排序方法
红手指专业版app注册教程
高效调试PHP大型嵌套数组:JSON序列化与可视化工具实践
使用Python和NLTK从文本中高效提取名词的实用教程
VS Code源代码管理(SCM)视图的进阶使用技巧
Win10显卡驱动安装失败怎么办 Win10使用DDU彻底卸载驱动【解决】
TikTok视频播放中断怎么办 TikTok播放异常修复方法
c++中的const关键字用法大全_c++ const正确使用指南
食品生产用水只要符合国家规定的生活饮用水卫生标准就可以吗
如何外贸网站设计-能留住客户提升用户体验!
《东方财富》条件单关闭方法
126邮箱网页在线登录2025_126邮箱网页版入口官方地址
TikTok视频播放不流畅怎么办 TikTok视频播放优化方法
cad视图选项卡不见了怎么办_cad视图标签恢复显示方法
Go语言中方法接收器的选择:值类型还是指针类型?
《撕歌》会员开通方法
SQLAlchemy 2.0 与 Pydantic 模型类型安全集成指南
拷贝漫画2025网页版入口 拷贝漫画官网免费看全集
《图怪兽》退出登录方法
性能与资源监视器快捷打开
Lar*el怎么实现全文搜索_Lar*el Scout集成Algolia教程
Excel宏怎么删除_Excel中删除宏的详细操作流程
Firefox OS应用开发:解决XMLHttpRequest跨域请求阻塞问题
win11如何开启单声道音频 Win11为听障用户合并左右声道【辅助】
掌握CSS :has() 选择器:父选择器、嵌套限制与常见陷阱解析
QQ邮箱官方登录页_腾讯出品安全稳定的邮箱服务
优酷官网登录入口电脑版 优酷官网网址入口
向日葵客户端怎么进行语音通话_向日葵客户端语音通话功能使用方法
电脑从睡眠中被自动唤醒怎么办_Windows唤醒源事件查看与禁用【解决】
Dash应用中自定义HTML页面标题与网站图标(F*icon)的实用指南
Sublime怎么快速复制文件路径_Sublime右键菜单增强技巧
iPhone14无法连接蓝牙设备如何解决
2025-11-29
运城市盐湖区信雨科技有限公司是一家深耕海外推广领域十年的专业服务商,作为谷歌推广与Facebook广告全球合作伙伴,聚焦外贸企业出海痛点,以数字化营销为核心,提供一站式海外营销解决方案。公司凭借十年行业沉淀与平台官方资源加持,打破传统外贸获客壁垒,助力企业高效开拓全球市场,成为中小企业出海的可靠合作伙伴。