优化Asyncio嵌套函数调度:使用生产者-消费者模式实现并发流处理


优化asyncio嵌套函数调度:使用生产者-消费者模式实现并发流处理

本文深入探讨了在Python asyncio中调度嵌套异步函数时遇到的并发挑战。通过分析传统`await`操作的阻塞特性,揭示了其在复杂流处理场景中的局限性。文章提出并详细阐述了基于`asyncio.Queue`和`asyncio.Event`的生产者-消费者模式,作为实现任务间解耦和真正并发执行的有效策略,从而显著提升异步应用的响应性和效率。

在异步编程中,我们经常需要处理数据流,其中一个任务负责生成数据,另一个任务负责处理数据。Python的asyncio库提供了强大的工具来构建并发应用程序,但在调度嵌套异步函数时,如果不正确地理解await关键字的行为,可能会导致程序并非按预期并发执行,而是串行阻塞。

传统await的阻塞特性及其局限性

考虑一个常见的场景:我们有一个字符流生成器,它逐步产生字符;一个句子生成器,它从字符流中收集字符并形成完整的句子;以及一个句子处理器,它对每个句子执行耗时操作。

以下是初始实现的代码结构:

import asyncio

async def stream():
    char_string = "Hi. Hello. Hello."
    for char in char_string:
        await asyncio.sleep(0.1)  # 模拟耗时操作
        print("got char:", char)
        yield char

async def sentences_generator():
    sentence = ""
    async for char in stream():
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence: ", sentence)
            yield sentence
            sentence = ""

async def process_sentence(sentence: str):
    print("waiting for processing sentence: ", sentence)
    await asyncio.sleep(len(sentence)*0.1) # 模拟耗时操作
    print("sentence processed!")

async def main():
    i = 0
    async for sentence in sentences_generator():
        print("processing sentence: ", i)
        await process_sentence(sentence) # 这里的await是关键
        i += 1

asyncio.run(main())

运行上述代码,其输出大致如下:

got char: H
got char: i
got char: .
got sentence:  Hi.
processing sentence:  0
waiting for processing sentence:  Hi.
sentence processed!
got char:  
got char: H
got char: e
got char: l
got char: l
got char: o
got char: .
got sentence:   Hello.
processing sentence:  1
waiting for processing sentence:   Hello.
sentence processed!

从输出可以看出,当process_sentence函数被await时,main协程会暂停,直到process_sentence完全执行完毕。这意味着在process_sentence处理第一个句子期间,stream和sentences_generator无法继续生成新的字符和句子。这并非我们期望的并发行为,我们希望在处理一个句子的同时,上游的字符流能够继续生成,从而提高整体吞吐量。

造成这种现象的根本原因在于await关键字的语义。当一个协程await另一个协程时,它会暂停自身的执行,并将控制权交给被await的协程。只有当被await的协程完成或自身也await了其他操作时,控制权才可能回到调用者。因此,在上述例子中,main函数中的await process_sentence(sentence)会完全阻塞main函数,直到当前句子处理完毕,才能继续从sentences_generator中获取下一个句子。

解决方案:基于asyncio.Queue的生产者-消费者模式

为了实现真正的并发,即在process_sentence处理句子的同时,sentences_generator和stream能够继续生成数据,我们可以采用经典的生产者-消费者模式。

在这种模式中:

  1. 生产者(Producer):负责生成数据(例如,sentences_generator生成句子),并将数据放入一个共享队列中。
  2. 消费者(Consumer):负责从共享队列中取出数据(例如,process_sentence处理句子),并独立执行其任务。

asyncio提供了asyncio.Queue来实现这种异步安全的共享队列。此外,为了优雅地处理生产者完成后的消费者关闭问题,我们还可以引入asyncio.Event来发出生产者完成的信号。

Sitekick Sitekick

一个AI登陆页面自动构建器

Sitekick 121 查看详情 Sitekick

以下是使用生产者-消费者模式重构后的代码:

import asyncio

async def stream():
    char_string = "Hi. Hello. Thank you." # 更改了字符串以展示更长的流
    for char in char_string:
        await asyncio.sleep(0.1)
        print("got char:", char)
        yield char

async def sentences_generator(q: asyncio.Queue[str], flag: asyncio.Event):
    """
    生产者:从字符流生成句子,并放入队列。
    当字符流结束时,设置Event标志通知消费者。
    """
    sentence = ""
    async for char in stream():
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence: ", sentence)
            await q.put(sentence) # 将生成的句子放入队列
            sentence = ""
    flag.set() # 生产者完成所有句子的生成,设置Event

async def process_sentence(q: asyncio.Queue[str], flag: asyncio.Event):
    """
    消费者:从队列中取出句子进行处理。
    当队列为空且生产者已完成时,消费者停止。
    """
    global i # 用于计数处理的句子
    while True:
        # 检查是否应该停止:队列为空且生产者已完成
        if q.empty() and flag.is_set():
            break
        try:
            # 尝试从队列获取项,如果队列为空,会等待
            item = await asyncio.wait_for(q.get(), timeout=1.0) # 增加超时,避免无限等待
        except asyncio.TimeoutError:
            # 如果超时且生产者已完成,则退出
            if flag.is_set():
                break
            continue # 否则继续等待

        print("processing sentence: ", i)
        print("waiting for processing sentence: ", item)
        await asyncio.sleep(len(item) * 0.1)
        print("sentence processed!")
        i += 1

async def main():
    global i
    i = 1 # 初始化句子计数器
    event = asyncio.Event() # 用于生产者通知消费者完成
    queue = asyncio.Queue[str]() # 共享队列

    # 创建生产者和消费者任务
    producer_task = sentences_generator(queue, event)
    consumer_task = process_sentence(queue, event)

    # 并发运行生产者和消费者任务
    await asyncio.gather(producer_task, consumer_task)

asyncio.run(main())

代码解析:

  1. sentences_generator (生产者):

    • 接收一个asyncio.Queue实例q和一个asyncio.Event实例flag。
    • 它继续从stream()生成字符并构建句子。
    • 一旦一个完整的句子形成,它不再直接yield句子,而是使用await q.put(sentence)将句子异步地放入队列。
    • 当stream()耗尽所有字符,即生产者完成其所有工作时,它调用flag.set()来通知消费者没有更多的句子会生成。
  2. process_sentence (消费者):

    • 同样接收q和flag。
    • 在一个无限循环中运行,直到满足退出条件。
    • 使用await q.get()异步地从队列中取出句子。如果队列为空,q.get()会暂停当前协程,直到有新的项可用。
    • 为了更健壮地处理消费者退出,我们添加了asyncio.wait_for和超时机制。当队列为空且flag.is_set()为真时(表示生产者已完成且队列中不再有新数据),消费者将退出循环。
    • 取出句子后,它执行模拟的耗时处理await asyncio.sleep(...)。
  3. main 函数:

    • 初始化asyncio.Event和asyncio.Queue。
    • 使用asyncio.gather(producer_task, consumer_task)同时启动生产者和消费者两个独立的协程。asyncio.gather会等待所有传入的协程完成。由于生产者会在完成后设置事件,而消费者会在队列清空且事件设置后退出,因此gather最终会完成。

预期输出(部分):

got char: H
got char: i
got char: .
got sentence:  Hi.
got char:  
got char: H
got char: e
got char: l
got char: l
got char: o
got char: .
got sentence:   Hello.
processing sentence:  1
waiting for processing sentence:  Hi.
got char:  
got char: T
got char: h
got char: a
got char: n
got char: k
got char:  
got char: y
got char: o
got char: u
got char: .
got sentence:   Thank you.
sentence processed!
processing sentence:  2
waiting for processing sentence:   Hello.
sentence processed!
processing sentence:  3
waiting for processing sentence:   Thank you.
sentence processed!

从新的输出中可以看到,当process_sentence正在处理"Hi."时,stream和sentences_generator已经继续生成了"Hello."甚至"Thank you."。这种交错的输出表明生产者和消费者正在并发地工作,显著提高了程序的效率和响应性。

注意事项与总结

  1. asyncio.Queue的重要性: 它是实现任务间安全通信和解耦的关键。生产者将数据放入队列,消费者从队列取出数据,两者无需直接等待对方完成,只需通过队列进行协调。
  2. asyncio.Event的用途: 在生产者-消费者模式中,asyncio.Event常用于信号通知。生产者完成所有工作后,设置Event,消费者在队列为空时检查此Event,以判断是否可以安全退出,避免消费者在生产者已无数据生成后无限期等待。
  3. 优雅地关闭消费者: 确保消费者能够识别生产者完成的信号并优雅地退出,是构建健壮异步应用的重要一环。除了asyncio.Event,也可以考虑使用特殊的“哨兵值”(Sentinel Value)放入队列来指示生产者结束。
  4. asyncio.gather: 用于同时运行多个协程,并等待它们全部完成。它是协调多个独立或半独立任务的强大工具。
  5. 理解并发与并行: asyncio实现的是并发(concurrency),而非真正的并行(parallelism)。这意味着在单个CPU核心上,任务仍然是交替执行的,但通过await的非阻塞特性,可以在等待I/O操作(如asyncio.sleep)时切换到其他任务,从而提高资源利用率。

通过采用生产者-消费者模式并结合asyncio.Queue和asyncio.Event,我们可以有效地管理异步任务间的依赖关系,实现更高效、更具响应性的并发数据流处理。这对于构建复杂的异步系统,如网络服务、数据管道等,是至关重要的技术。

以上就是优化Asyncio嵌套函数调度:使用生产者-消费者模式实现并发流处理的详细内容,更多请关注其它相关文章!


# 我们可以  # 石家庄网站搜索优化服务  # seo优化排名官方版  # 浦东seo优化报价  # 新乡搜狗关键词排名技术  # 投资公司网站建设流程  # 上海营销推广企业排名榜  # seo实战进展  # 罗湖手机网站优化效果好  # 宿松网站建设制作外包  # 卖车位营销推广关键词  # 的是  # 几种  # 并将  # python  # 它是  # 会在  # 重构  # 浮点  # 多个  # 为空  # 异步任务  # stream  # ai  # 工具  # 处理器  # go 


相关栏目: 【 Google疑问12 】 【 Facebook疑问10 】 【 优化推广96088 】 【 技术知识133117 】 【 IDC资讯59369 】 【 网络运营7196 】 【 IT资讯61894


相关推荐: Lar*el Eloquent中通过Join查询关联数据表:解决多行子查询问题  Cassandra中复合主键、二级索引与ORDER BY排序的限制与解决方案  夸克浏览器资源嗅探怎么用 夸克浏览器网页资源下载技巧【教程】  解决异步Python机器人中同步操作的阻塞问题  Word如何将文字快速转成表格 Word文本转换成表格功能使用技巧【效率】  优化2xN网格最大路径和的动态规划算法实践  电脑视频号|直播|如何分享屏幕  J*aScript中高效处理用户输入:从Keyup事件到表单提交的优化实践  雨课堂官网在线登录 网页版雨课堂登录链接  极兔快递官网查询入口手机版 手机极兔快递登录查询入口官方  《虎扑》取消评分记录方法  b站网页版入口 哔哩哔哩官方网站直接进入  如何在CSS中使用伪类:valid实现表单验证提示_结合:valid改变边框颜色  中大网校app做题记录清除方法  Flash AS3.0简易相册制作  疯狂小鸟微信小游戏入口 疯狂小鸟网页版秒玩  海棠阅读网页版_进入海棠网页版在线阅读中心  cad加载的线型看不见怎么办_cad线型不可见问题解决方法  word表格如何按某一列内容进行排序_Word表格按列排序方法  红手指专业版app注册教程  高效调试PHP大型嵌套数组:JSON序列化与可视化工具实践  使用Python和NLTK从文本中高效提取名词的实用教程  VS Code源代码管理(SCM)视图的进阶使用技巧  Win10显卡驱动安装失败怎么办 Win10使用DDU彻底卸载驱动【解决】  TikTok视频播放中断怎么办 TikTok播放异常修复方法  c++中的const关键字用法大全_c++ const正确使用指南  食品生产用水只要符合国家规定的生活饮用水卫生标准就可以吗  如何外贸网站设计-能留住客户提升用户体验!  《东方财富》条件单关闭方法  126邮箱网页在线登录2025_126邮箱网页版入口官方地址  TikTok视频播放不流畅怎么办 TikTok视频播放优化方法  cad视图选项卡不见了怎么办_cad视图标签恢复显示方法  Go语言中方法接收器的选择:值类型还是指针类型?  《撕歌》会员开通方法  SQLAlchemy 2.0 与 Pydantic 模型类型安全集成指南  拷贝漫画2025网页版入口 拷贝漫画官网免费看全集  《图怪兽》退出登录方法  性能与资源监视器快捷打开  Lar*el怎么实现全文搜索_Lar*el Scout集成Algolia教程  Excel宏怎么删除_Excel中删除宏的详细操作流程  Firefox OS应用开发:解决XMLHttpRequest跨域请求阻塞问题  win11如何开启单声道音频 Win11为听障用户合并左右声道【辅助】  掌握CSS :has() 选择器:父选择器、嵌套限制与常见陷阱解析  QQ邮箱官方登录页_腾讯出品安全稳定的邮箱服务  优酷官网登录入口电脑版 优酷官网网址入口  向日葵客户端怎么进行语音通话_向日葵客户端语音通话功能使用方法  电脑从睡眠中被自动唤醒怎么办_Windows唤醒源事件查看与禁用【解决】  Dash应用中自定义HTML页面标题与网站图标(F*icon)的实用指南  Sublime怎么快速复制文件路径_Sublime右键菜单增强技巧  iPhone14无法连接蓝牙设备如何解决 

 2025-11-29

了解您产品搜索量及市场趋势,制定营销计划

同行竞争及网站分析保障您的广告效果

点击免费数据支持

提交您的需求,1小时内享受我们的专业解答。

运城市盐湖区信雨科技有限公司


运城市盐湖区信雨科技有限公司

运城市盐湖区信雨科技有限公司是一家深耕海外推广领域十年的专业服务商,作为谷歌推广与Facebook广告全球合作伙伴,聚焦外贸企业出海痛点,以数字化营销为核心,提供一站式海外营销解决方案。公司凭借十年行业沉淀与平台官方资源加持,打破传统外贸获客壁垒,助力企业高效开拓全球市场,成为中小企业出海的可靠合作伙伴。

 8156699

 13765294890

 8156699@qq.com

Notice

We and selected third parties use cookies or similar technologies for technical purposes and, with your consent, for other purposes as specified in the cookie policy.
You can consent to the use of such technologies by closing this notice, by interacting with any link or button outside of this notice or by continuing to browse otherwise.