Python核心技术与实战
景霄
Facebook资深工程师
立即订阅
13891 人已学习
课程目录
已完结 46 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 从工程的角度深入理解Python
免费
基础篇 (14讲)
01 | 如何逐步突破,成为Python高手?
02 | Jupyter Notebook为什么是现代Python的必学技术?
03 | 列表和元组,到底用哪一个?
04 | 字典、集合,你真的了解吗?
05 | 深入浅出字符串
06 | Python “黑箱”:输入与输出
07 | 修炼基本功:条件与循环
08 | 异常处理:如何提高程序的稳定性?
09 | 不可或缺的自定义函数
10 | 简约不简单的匿名函数
11 | 面向对象(上):从生活中的类比说起
12 | 面向对象(下):如何实现一个搜索引擎?
13 | 搭建积木:Python 模块化
14 | 答疑(一):列表和元组的内部实现是怎样的?
进阶篇 (11讲)
15 | Python对象的比较、拷贝
16 | 值传递,引用传递or其他,Python里参数是如何传递的?
17 | 强大的装饰器
18 | metaclass,是潘多拉魔盒还是阿拉丁神灯?
19 | 深入理解迭代器和生成器
20 | 揭秘 Python 协程
21 | Python并发编程之Futures
22 | 并发编程之Asyncio
23 | 你真的懂Python GIL(全局解释器锁)吗?
24 | 带你解析 Python 垃圾回收机制
25 | 答疑(二):GIL与多线程是什么关系呢?
规范篇 (7讲)
26 | 活都来不及干了,还有空注意代码风格?!
27 | 学会合理分解代码,提高代码可读性
28 | 如何合理利用assert?
29 | 巧用上下文管理器和With语句精简代码
30 | 真的有必要写单元测试吗?
31 | pdb & cProfile:调试和性能分析的法宝
32 | 答疑(三):如何选择合适的异常处理方式?
量化交易实战篇 (8讲)
33 | 带你初探量化世界
免费
34 | RESTful & Socket: 搭建交易执行层核心
35 | RESTful & Socket: 行情数据对接和抓取
36 | Pandas & Numpy: 策略与回测系统
免费
37 | Kafka & ZMQ:自动化交易流水线
38 | MySQL:日志和数据存储系统
39 | Django:搭建监控平台
40 | 总结:Python中的数据结构与算法全景
技术见闻与分享 (4讲)
41 | 硅谷一线互联网公司的工作体验
42 | 细数技术研发的注意事项
加餐 | 带你上手SWIG:一份清晰好用的SWIG编程实践指南
43 | Q&A:聊一聊职业发展和选择
结束语 (1讲)
结束语 | 技术之外的几点成长建议
Python核心技术与实战
登录|注册

21 | Python并发编程之Futures

景霄 2019-06-26
你好,我是景霄。
无论对于哪门语言,并发编程都是一项很常用很重要的技巧。比如我们上节课所讲的很常见的爬虫,就被广泛应用在工业界的各个领域。我们每天在各个网站、各个 App 上获取的新闻信息,很大一部分便是通过并发编程版的爬虫获得。
正确合理地使用并发编程,无疑会给我们的程序带来极大的性能提升。今天这节课,我就带你一起来学习理解、运用 Python 中的并发编程——Futures。

区分并发和并行

在我们学习并发编程时,常常同时听到并发(Concurrency)和并行(Parallelism)这两个术语,这两者经常一起使用,导致很多人以为它们是一个意思,其实不然。
首先你要辨别一个误区,在 Python 中,并发并不是指同一时刻有多个操作(thread、task)同时进行。相反,某个特定的时刻,它只允许有一个操作发生,只不过线程 / 任务之间会互相切换,直到完成。我们来看下面这张图:
图中出现了 thread 和 task 两种切换顺序的不同方式,分别对应 Python 中并发的两种形式——threading 和 asyncio。
对于 threading,操作系统知道每个线程的所有信息,因此它会做主在适当的时候做线程切换。很显然,这样的好处是代码容易书写,因为程序员不需要做任何切换操作的处理;但是切换线程的操作,也有可能出现在一个语句执行的过程中(比如 x += 1),这样就容易出现 race condition 的情况。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Python核心技术与实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(35)

  • KaitoShy
    思考题:
    1. request.get 会触发:ConnectionError, TimeOut, HTTPError等,所有显示抛出的异常都是继承requests.exceptions.RequestException
    2. executor.map(download_one, urls) 会触发concurrent.futures.TimeoutError
    3. result() 会触发Timeout,CancelledError
    4. as_completed() 会触发TimeOutError

    作者回复: 回答的很对

    2019-06-26
    2
    23
  • SCAR
    future之与中文理解起来其实挺微妙,不过这与生活中大家熟知的期物在底层逻辑上是一致的,future英文词义中就有期货的意思,都是封存一个东西,平常你该干嘛就干嘛,可以不用去理会,在未来的某个时候去看结果就行,只是python中那个物是对象而已。而关键词是延迟,异步。
    思考题:添加异常处理
    def download_all(sites):
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            to_do = {}
            for site in sites:
                future = executor.submit(download_one, site)
                to_do[future]=site
                
            for future in concurrent.futures.as_completed(to_do):
                try:
                    res=future.result()
                except request.exceptions.HTTPError as e:
                    e_msg=‘HTTP erro’
                except request.exceptions.ConnectionError as e:
                    e_msg=‘Connection erro’
                else:
                    e_msg=''
                if e_msg:
                    site=to_do[future]
                    Print(‘Error is {} from {}’.format(e_msg,site))
    2019-06-26
    1
    13
  • Luke Zhang
    关于concurrent写过一篇学习笔记:
    https://www.zhangqibot.com/post/python-concurrent-futures/
    Python实现多线程/多进程,大家常常会用到标准库中的threading和multiprocessing模块。
    但从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,使得开发者只需编写少量代码即可让程序实现并行计算。
    2019-06-26
    8
  • Fergus
    需要加异常的应该就只有一个地方:requests.get()发送网页请求的时候。其它地方不涉及IO。也不涉及数据类型变化,不用做数据类型判断。
    由于不能访问wiki,所以网页改了成了国内的。
    -- ps: 和0.2s比起来太慢了。

    # -*- encoding -*-
    '''
    py 3.6
    sulime
    '''
    import concurrent.futures
    import threading
    import requests
    import time

    now = lambda: time.perf_counter()

    def download_one(url):
        try:
            req = requests.get(url)
            req.raise_for_status()
            print('Read {} from {}'.format(len(req.content), url))
        except:
            print('404')

    def download_all(sites):
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            executor.map(download_one, sites)

    def main():
        sites = [
                'https://www.baidu.com/',
                'https://pypi.org/',
                'https://www.sina.com.cn/',
                'https://www.163.com/',
                'https://news.qq.com/',
                'http://www.ifeng.com/',
                'http://www.ce.cn/',
                'https://news.baidu.com/',
                'http://www.people.com.cn/',
                'http://www.ce.cn/',
                'https://news.163.com/',
                'http://news.sohu.com/'
                ]
        start = now()
        download_all(sites)
        print('Download {} sites in {} s'.format(len(sites), now() - start))

    if __name__ == '__main__':
        main()

    # Read 2443 from https://www.baidu.com/
    # Read 6216 from https://news.qq.com/
    # Read 699004 from https://www.163.com/
    # Read 250164 from http://www.ifeng.com/
    # Read 579572 from https://www.sina.com.cn/
    # Read 107530 from http://www.ce.cn/
    # Read 165901 from http://www.people.com.cn/
    # Read 107530 from http://www.ce.cn/
    # Read 210816 from https://news.163.com/
    # Read 74060 from https://news.baidu.com/
    # Read 174553 from http://news.sohu.com/
    # Read 19492 from https://pypi.org/
    # Download 12 sites in 2.8500169346527673 s
    # [Finished in 3.6s]
    2019-06-30
    4
  • Geek_5bb182
    老师你好,concurrent.futures 和 asyncio 中的Future 的区别是什么,在携程编程中

    作者回复: 可以参考https://stackoverflow.com/questions/29902908/what-is-the-difference-between-concurrent-futures-and-asyncio-futures

    2019-06-27
    3
  • 大王叫我来巡山
    老师,我感觉您对并发和并行的理解是有问题的,并发是针对最初的单核CPU的,并行是针对现代的多核CPU,并且所有的调度行为都是基于线程的,一个进程中至少有一个线程,资源的分配是基与进程的,并不是只有多进程模型才可以同时在多个核心上运行的。
    2019-11-19
    2
    2
  • somenzz
    from multiprocessing.dummy import Pool as ThreadPool
    with ThreadPool(processes=100) as executor:
        executor.map(func, iterable)

    请问老师,Futures 和这种方式哪一种好呢? 我在实际的网终请求中发现 Futures 请求成功的次数更少。 都是 100 个线程,处理 3000 个相同的请求。
    2019-07-19
    2
  • Fergus
    老师好,看到文中为了使用.as_complete()作的修改似乎做了重复的工作,我对比了使用.as_complete()和.submit()后直接result(),得到的是相同的结果。
    -- 问1:这里所做的修改只是为了展示.as_complete的功能么?我查看了文档也没想明白。
    -- 问2:.as_complete()可能会在什么场景下使用得比较多?

    eg.2.`.submit()`后直接`.result()`

        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            for site in sites:
                future = executor.submit(download_one, site)
                print(future.result())
    2019-06-30
    1
    2
  • 羁绊
    在使用executor.map()时候假如上面sites里面的url有链接超时报错的话,ThreadPoolExecutor会隐藏该异常,这个线程会在没有任何输出的情况下终止,然后线程继续执行
    2019-06-30
    1
  • 干布球
    请问老师,future任务是调用submit后就开始执行,还是在调用as_completed之后才开始执行呢?

    作者回复: submit之后

    2019-06-26
    2
    1
  • 无才不肖生
    在submit()后只是放入队列而并未真正开始执行,as_completed时才真正去执行,对吗?
    as_completed会不会有个别future并执行完而没有输出结果,还是说就一定都会完成
    2019-06-26
    1
  • Paul Shan
    请问老师,多数编程语言的并行(Parallelism),都是通过多线程实现(例如rxjava),因为线程是CPU调度的最小单元,为什么Python并行是通过多进程实现,多谢!
    2019-11-21
  • Paul Shan
    Future 是一个任务队列模型,所有需要IO的任务,都进入队列,然后根据IO和CPU的使用来回调度任务,合理配置IO和CPU的资源。
    2019-11-20
  • Paul Shan
    Concurrency -- 流水线,充分利用不同的设备,例如CPU,内存和网络,同一时刻,只有一个任务被CPU执行。
    Parallelism -- 并行处理,充分利用多个CPU
    多进程不会比多线程块,我个人以为线程是操作系统调度的最小单元,切换进程比切换线程代价大。
    2019-11-20
  • Leon📷
    老师给一些不存在的网站让我们去测试用例,机智的我换成了BAT的首页
    2019-11-10
  • tsunami
    老师,python不是不适合多线程吗,多线程由于GIL反而变慢了吗,这个只是针对cpu密集型的不适合多线程吗,IO密集型的可以使用python多线程吗?
    2019-10-14
  • 自由民
    并发是交替执行,同时只有一个任务执行。用于高I/O程序。并行是多个进程同时运行,用于CPU-heavy程序。原来Python可以做并行运算的,建立线程池的时候不指定线程数量就行了。以前写并发程序比较少。
    思考题,只写异常处理的地方吧。
    # 处理requests异常
    except ConnectionError as err:
    print(err)
    except HTTPError as err:
    print(err)
    except Timeout as err:
    print(err)
    # 处理futures异常
    except TooManyRedirects as err:
    print(err)
    except CancelledError as err:
    print(err)
    except TimeoutError as err:
    print(err)
    except BrokenExecutor as err:
    print(err)
    except:
    print("发生错误")

    课程的练习代码: https://github.com/zwdnet/PythonPractice
    2019-10-12
  • 建强
    简单的加了个异常处理,请老师指正
    #多线程实现网站内容下载演示
    import concurrent.futures
    import requests
    import threading
    import time

    def download_one(url):
        resp = requests.get(url)
        print('Read {} from {}'.format(len(resp.content), url))
        
        #模拟下载任务出错
        print(1/0)

    def download_all(sites):
        try:
            #以下是用futures对象并发下载
            with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
                to_do = []
                for site in sites:
                    future = executor.submit(download_one, site)
                    to_do.append(future)

                for future in concurrent.futures.as_completed(to_do):
                    future.result()
                        
        except Exception as e:
            print('运行错误:',e)
    2019-10-11
  • 白勇坤
    关于windows运行 多进程 子函数print不打印问题:
    The comments revealed that OP uses Windows as well as Spyder. Since Spyder redirects stdoutand Windows does not support forking, a new child process won't print into the Spyder console. This is simply due to the fact that stdout of the new child process is Python's vanilla stdout, which can also be found in sys.__stdout__.

    解决:1.打印日志到文件。2.用返回值替代打印。(还是用mac或者linux来学吧。。。。。。)
    2019-08-28
  • 白勇坤
    import requests
    import time
    import concurrent.futures
    import threading

    def download_one(url):
        headers = {"User-Agent" : "User-Agent:Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0;"}
        resp = requests.get(url,headers=headers)
        print('Read {} from {}'.format(len(resp.content), url))
        
    def download_all(sites):
    # with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # executor.map(download_one, sites)
        
        with concurrent.futures.ProcessPoolExecutor() as executor:
            executor.map(download_one, sites)

    def main():
        sites = [
            'https://baike.baidu.com/item/ARTS',
            'https://baike.baidu.com/item/History',
            'https://baike.baidu.com/item/Society',
            'https://baike.baidu.com/item/Biography',
            'https://baike.baidu.com/item/Mathematics',
            'https://baike.baidu.com/item/Technology',
            'https://baike.baidu.com/item/Geography',
            'https://baike.baidu.com/item/Science'
        ]
        start_time = time.perf_counter()
        download_all(sites)
        end_time = time.perf_counter()
        print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
        
    if __name__ == '__main__':
        main()
    输出:Download 8 sites in 0.25235257599979377 seconds
    用多进程后为什么 download_one函数的 print内容没有打印出来呢。。。。。
    2019-08-28
收起评论
35
返回
顶部