Python核心技术与实战
景霄
Facebook资深工程师
立即订阅
13891 人已学习
课程目录
已完结 46 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 从工程的角度深入理解Python
免费
基础篇 (14讲)
01 | 如何逐步突破,成为Python高手?
02 | Jupyter Notebook为什么是现代Python的必学技术?
03 | 列表和元组,到底用哪一个?
04 | 字典、集合,你真的了解吗?
05 | 深入浅出字符串
06 | Python “黑箱”:输入与输出
07 | 修炼基本功:条件与循环
08 | 异常处理:如何提高程序的稳定性?
09 | 不可或缺的自定义函数
10 | 简约不简单的匿名函数
11 | 面向对象(上):从生活中的类比说起
12 | 面向对象(下):如何实现一个搜索引擎?
13 | 搭建积木:Python 模块化
14 | 答疑(一):列表和元组的内部实现是怎样的?
进阶篇 (11讲)
15 | Python对象的比较、拷贝
16 | 值传递,引用传递or其他,Python里参数是如何传递的?
17 | 强大的装饰器
18 | metaclass,是潘多拉魔盒还是阿拉丁神灯?
19 | 深入理解迭代器和生成器
20 | 揭秘 Python 协程
21 | Python并发编程之Futures
22 | 并发编程之Asyncio
23 | 你真的懂Python GIL(全局解释器锁)吗?
24 | 带你解析 Python 垃圾回收机制
25 | 答疑(二):GIL与多线程是什么关系呢?
规范篇 (7讲)
26 | 活都来不及干了,还有空注意代码风格?!
27 | 学会合理分解代码,提高代码可读性
28 | 如何合理利用assert?
29 | 巧用上下文管理器和With语句精简代码
30 | 真的有必要写单元测试吗?
31 | pdb & cProfile:调试和性能分析的法宝
32 | 答疑(三):如何选择合适的异常处理方式?
量化交易实战篇 (8讲)
33 | 带你初探量化世界
免费
34 | RESTful & Socket: 搭建交易执行层核心
35 | RESTful & Socket: 行情数据对接和抓取
36 | Pandas & Numpy: 策略与回测系统
免费
37 | Kafka & ZMQ:自动化交易流水线
38 | MySQL:日志和数据存储系统
39 | Django:搭建监控平台
40 | 总结:Python中的数据结构与算法全景
技术见闻与分享 (4讲)
41 | 硅谷一线互联网公司的工作体验
42 | 细数技术研发的注意事项
加餐 | 带你上手SWIG:一份清晰好用的SWIG编程实践指南
43 | Q&A:聊一聊职业发展和选择
结束语 (1讲)
结束语 | 技术之外的几点成长建议
Python核心技术与实战
登录|注册

22 | 并发编程之Asyncio

景霄 2019-06-28
你好,我是景霄。
上节课,我们一起学习了 Python 并发编程的一种实现——多线程。今天这节课,我们继续学习 Python 并发编程的另一种实现方式——Asyncio。不同于协程那章,这节课我们更注重原理的理解。
通过上节课的学习,我们知道,在处理 I/O 操作时,使用多线程与普通的单线程相比,效率得到了极大的提高。你可能会想,既然这样,为什么还需要 Asyncio?
诚然,多线程有诸多优点且应用广泛,但也存在一定的局限性:
比如,多线程运行过程容易被打断,因此有可能出现 race condition 的情况;
再如,线程切换本身存在一定的损耗,线程数不能无限增加,因此,如果你的 I/O 操作非常 heavy,多线程很有可能满足不了高效率、高质量的需求。
正是为了解决这些问题,Asyncio 应运而生。

什么是 Asyncio

Sync VS Async

我们首先来区分一下 Sync(同步)和 Async(异步)的概念。
所谓 Sync,是指操作一个接一个地执行,下一个操作必须等上一个操作完成后才能执行。
而 Async 是指不同操作间可以相互交替执行,如果其中的某个操作被 block 了,程序并不会等待,而是会找出可执行的操作继续执行。
举个简单的例子,你的老板让你做一份这个季度的报表,并且邮件发给他。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Python核心技术与实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(41)

  • Jingxiao 置顶
    思考题答案:
    import multiprocessing
    import time


    def cpu_bound(number):
        return sum(i * i for i in range(number))


    def find_sums(numbers):
        with multiprocessing.Pool() as pool:
            pool.map(cpu_bound, numbers)


    if __name__ == "__main__":
        numbers = [10000000 + x for x in range(20)]

        start_time = time.time()
        find_sums(numbers)
        duration = time.time() - start_time
        print(f"Duration {duration} seconds")
    2019-07-02
    4
  • helloworld
    总结多线程和协程之间的共同点和区别:
    共同点:
    都是并发操作,多线程同一时间点只能有一个线程在执行,协程同一时间点只能有一个任务在执行;
    不同点:
    多线程,是在I/O阻塞时通过切换线程来达到并发的效果,在什么情况下做线程切换是由操作系统来决定的,开发者不用操心,但会造成race condition;
    协程,只有一个线程,在I/O阻塞时通过在线程内切换任务来达到并发的效果,在什么情况下做任务切换是开发者决定的,不会有race condition的情况;
    多线程的线程切换比协程的任务切换开销更大;
    对于开发者而言,多线程并发的代码比协程并发的更容易书写。
    一般情况下协程并发的处理效率比多线程并发更高。
    2019-06-28
    16
  • transformation
    import time
    from concurrent import futures


    def cpu_bound(number):
        return sum(i * i for i in range(number))


    def calculate_sums(numbers):
        for number in numbers:
            print(cpu_bound(number))


    def main():
        start_time = time.perf_counter()
        numbers = [10000000 + x for x in range(20)]
        calculate_sums(numbers)
        end_time = time.perf_counter()
        print('Calculation takes {} seconds'.format(end_time - start_time))


    def main_process():
        start_time = time.perf_counter()
        numbers = [10000000 + x for x in range(20)]
        with futures.ProcessPoolExecutor() as pe:
            result = pe.map(cpu_bound, numbers)
            print(f"result: {list(result)}")
        end_time = time.perf_counter()
        print('multiprocessing Calculation takes {} seconds'.format(end_time - start_time))


    if __name__ == '__main__':
        main()
        main_process()
    ————————
    输出:
    333333283333335000000
    333333383333335000000
    333333483333355000001
    333333583333395000005
    333333683333455000014
    333333783333535000030
    333333883333635000055
    333333983333755000091
    333334083333895000140
    333334183334055000204
    333334283334235000285
    333334383334435000385
    333334483334655000506
    333334583334895000650
    333334683335155000819
    333334783335435001015
    333334883335735001240
    333334983336055001496
    333335083336395001785
    333335183336755002109
    Calculation takes 15.771127400000001 seconds
    result: [333333283333335000000, 333333383333335000000, 333333483333355000001, 333333583333395000005, 333333683333455000014, 333333783333535000030, 333333883333635000055, 333333983333755000091, 333334083333895000140, 333334183334055000204, 333334283334235000285, 333334383334435000385, 333334483334655000506, 333334583334895000650, 333334683335155000819, 333334783335435001015, 333334883335735001240, 333334983336055001496, 333335083336395001785, 333335183336755002109]
    multiprocessing Calculation takes 4.7333084 seconds
    2019-06-28
    5
    6
  • 天凉好个秋
    如果完成,则将其放到预备状态的列表;
    如果未完成,则继续放在等待状态的列表。
    这里是不是写的有问题?
    PS:想问一下,完成之后为什么还要放队列里?难道不应该从队列里移除吗?
    2019-06-28
    2
    5
  • hlz-123
    1、单进程,老师的原程序,运行时间
         Calculation takes 15.305913339 seconds
    2、CPU并行方式,运行时间:
         Calculation takes 3.457259904 seconds
          def calculate_sums(numbers):
                 with concurrent.futures.ProcessPoolExecutor() as executor:
                 executor.map(cpu_bound,numbers)
    3、多线程,cocurrent.futures,运行时间
          Calculation takes 15.331446270999999 seconds
          def calculate_sums(numbers):
                    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
                    executor.map(cpu_bound,numbers)
    4、异步方式,asyncio
          Calculation takes 16.019983702999998 seconds
          async def cpu_bound(number):
                print(sum(i * i for i in range(number)))
          async def calculate_sums(numbers):
               tasks=[asyncio.create_task(cpu_bound(number)) for number in numbers]
               await asyncio.gather(*tasks)
    2019-06-28
    1
    4
  • Geek_59f23e
    import time
    from multiprocessing import Pool


    def square(number):
        return sum(i * i for i in range(number))


    def single_process(numbers):
        res = []
        for number in numbers:
            res.append(square(number))
        return res


    def multi_process(numbers):
        with Pool() as pool:
            res = pool.map(square, numbers)
        return res


    if __name__ == '__main__':
        numbers = [10000000 + x for x in range(20)]
        start1 = time.perf_counter()
        single_process(numbers)
        print('单进程用时:%f 秒' % (time.perf_counter() - start1))
        start2 = time.perf_counter()
        multi_process(numbers)
        print('多进程用时:%f 秒' % (time.perf_counter() - start2))

    ————————
    输出:
    单进程用时:29.382878 秒
    多进程用时:10.354565 秒

    [333333283333335000000, 333333383333335000000, 333333483333355000001, 333333583333395000005, 333333683333455000014, 333333783333535000030, 333333883333635000055, 333333983333755000091, 333334083333895000140, 333334183334055000204, 333334283334235000285, 333334383334435000385, 333334483334655000506, 333334583334895000650, 333334683335155000819, 333334783335435001015, 333334883335735001240, 333334983336055001496, 333335083336395001785, 333335183336755002109]
    2019-06-28
    2
  • Destroy、
    race condition 是什么?
    2019-06-28
    1
    2
  • KaitoShy
    运行文章中出现的代码时出现‘aiohttp.client_exceptions.ClientConnectorCertificateError’的这个报错,我讲代码第7行更改成‘async with session.get(url, ssl=False) as resp’后运行成功,是否还有其他的解决方案?
    2019-07-03
    2
    1
  • 唐哥
    老师好,对于 Asyncio 来说,它的任务在运行时不会被外部的一些因素打断。不被打断是如何保证的?还有event loop是每次取出一个任务运行,当这个任务运行期间它就是只等待任务结束吗?不干其他事了吗?
    2019-07-01
    1
  • 方向
    如果完成,则放到预备状态列表,这句话不理解。这样一来,预备状态列表同时拥有两种形式的任务啊
    2019-06-28
    1
    1
  • 轻风悠扬
    你好老师,我在运行文中例子的时候得到一个RuntimeError: Event loop is closed。
    2019-11-23
  • Paul Shan
    sync是线性前后执行。
    async是穿插执行,之所以要穿插,代码需要的资源不同,有的代码需要CPU,有的代码需要IO(例如网络),穿插以后,同时需要CPU和网络的代码可以同时执行,充分利用硬件。

    具体到关键字 async 是表示函数是异步的,也就是来回穿插的起点(进入预备队列),await是表示调用需要IO,也就是进入等待队列的入口(函数开始调用)和出口(函数调用结束,重新进入预备队列)。
    2019-11-21
  • pyhton 3.7.4 在jupyter中 执行 asyncio.run 还是报RuntimeError: asyncio.run() cannot be called from a running event loop, 换成 await 报SyntaxError: 'await' outside async function,这是什么原因
    2019-11-21
    1
  • 段总
    网站下载那个代码出现这个问题:Cannot connect to host en.wikipedia.org:443 ssl:default [远程主机强迫关闭了一个现有的连接。] 烦请大家有空看看该怎么解决?谢过大家!
    2019-11-18
    1
  • Leon📷
    老师,我的pycharm最新版报这个错误,这个是啥意思 File "/Users/Liuchao/PycharmProjects/lsn1/venv/lib/python3.7/site-packages/aiohttp/tcp_helpers.py", line 20, in <module>
        def tcp_keepalive(transport: asyncio.Transport) -> None:
    AttributeError: module 'asyncio' has no attribute 'Transport'
    2019-11-10
  • 孤独剑
    c++调用pyc文件A.pyc,A.pyc又同时调用了自定义块B.pyc,系统提示找不到B.pyc模块,是什么原因呢 两个模块都放在c++软件系统目录
    2019-11-08
  • 阿卡牛
    常听到阻塞,同步是不是就是阻塞地意思

    作者回复: 通俗来讲是

    2019-11-01
  • 建强
    上网查询资料后,初步了解了多进程的一些知识,按照资料中的方法简单改写了一下程序,由于多进程方式时,不知什么原因,cpu_bound函数不能实时输出,所以就把cpu_bound改为返回字符串形式的结果,等所有的数计算完成后,再一并输出结果 ,程序中常规执行和多进程两种方式都有,并作了对比后发现,常规执行用时约23秒,多进程用时约6秒,两者相差4倍,程序如下,不足处请老师指正:
    #多进程演示
    import multiprocessing
    import time

    def cpu_bound(number):
        return 'sum({}^2)={}'.format(number,sum(i * i for i in range(number)))

    def calculate_sums(numbers):
        
        results = []

        print('-'*10+'串行执行开始:'+'-'*10)

        for number in numbers:
            results.append(cpu_bound(number))

        print('-'*10+'串行执行结束,结果如下:'+'-'*10)
        for res in results:
            print(res)

    def multicalculate_sums(numbers):

        #创建有4个进程的进程池
        pool = multiprocessing.Pool(processes=4)

        results = []

        print('-'*10+'多进程执行开始:'+'-'*10)

        #为每一个需要计算的元素创建一个进程
        for number in numbers:
            results.append(pool.apply_async(cpu_bound, (number,)))

        pool.close() #关闭进程池,不能往进程池添加进程
        pool.join() #等待进程池中的所有进程执行完毕

        print('-'*10+'多进程执行结束,结果如下:'+'-'*10)
        for res in results:
            print(res.get())
        
    def main():

        numbers = [10000000 + x for x in range(20)]

        #串行执行方式
        start_time = time.perf_counter()
        calculate_sums(numbers)
        end_time = time.perf_counter()
        print('串行执行用时:Calculation takes {} seconds'.format(end_time - start_time))

        #多进程执行方式
        start_time = time.perf_counter()
        multicalculate_sums(numbers)
        end_time = time.perf_counter()
        print('多进程执行用时:Calculation takes {} seconds'.format(end_time - start_time))
        
    if __name__ == '__main__':
        main()

    作者回复: 很棒的例子,但是对计算密集型程序,你可以打开任务管理器的性能页,CPU 选择显示逻辑处理器,可以注意到串行执行和并行执行的不同。

    2019-10-21
  • 自由民
    总结:Python并发编程开销从大到小为多进程,多线程和协程。分别适合于重CPU操作,重I/O快操作,重I/O慢操作。注意使用Asyncio需要第三方库支持。
    思考题
    # 思考题 并行版本
    def calculate_sums_future(numbers):
        with concurrent.futures.ThreadPoolExecutor() as executor:
            executor.map(cpu_bound, numbers)

    def calcuter_future(numbers):
        start_time = time.perf_counter()
        calculate_sums_future(numbers)
        end_time = time.perf_counter()
        print("多进程版本,耗时{}秒".format(end_time-start_time))
    运行结果却不好,耗时是普通版本的8倍。
    又写了一个动态规划的版本:
    # 思考题 动态规划版本
    squ = {} # 用来储存中间结果
    def cpu_dp(number):
    result = 0
    for i in range(number):
    if i not in squ.keys():
    squ[i] = i*i
    result += squ[i]
    print("number={}, result={}".format(number, result))

    def calculate_sums_dp(numbers):
    for number in numbers:
    cpu_dp(number)

    def calcuter_dp(numbers):
    start_time = time.perf_counter()
    calculate_sums_dp(numbers)
    end_time = time.perf_counter()
    print("动态规划版本,耗时{}秒".format(end_time-start_time))
    耗时还是比常规版本慢了两倍。原因?
    课程的练习代码: https://github.com/zwdnet/PythonPractice
    2019-10-14
  • 安排
    cpu密集型的任务为什么多进程会好些?用多线程不也是可以吗?多进程我理解的就是隔离作用会好一些。用多线程只要能跑满多个核效率是一样的吧。
    2019-10-02
    2
收起评论
41
返回
顶部