28|调度引擎:负载均衡与调度器实战
郑建勋
你好,我是郑建勋。
在上一节课程中,我们实战了广度优先搜索算法,不过我们对网站的爬取都是在一个协程中进行的。在真实的实践场景中,我们常常需要爬取多个初始网站,我们希望能够同时爬取这些网站。这就需要合理调度和组织爬虫任务了。因此,这节课的重点就是实战任务调度的高并发模型,使资源得到充分的利用。
实战调度引擎
调度引擎主要目标是完成下面几个功能:
创建调度程序,接收任务并将任务存储起来;
执行调度任务,通过一定的调度算法将任务调度到合适的 worker 中执行;
创建指定数量的 worker,完成实际任务的处理;
创建数据处理协程,对爬取到的数据进行进一步处理。
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
- 深入了解
- 翻译
- 解释
- 总结
本文深入探讨了调度引擎的实际应用和底层原理,重点介绍了高并发模型的任务调度和资源利用。首先,文章详细解释了调度引擎的核心功能和运行机制,包括创建调度程序、执行调度任务、创建worker以及数据处理协程。其次,通过函数式选项模式实现了高度可配置化的调度器初始化,使参数配置更加灵活和可扩展。此外,文章还深入探讨了通道的底层原理和select机制的工作原理,为读者提供了更深入的学习方向。整体而言,本文内容涵盖了调度引擎的实际应用、技术特点和底层原理,适合对高并发模型和任务调度感兴趣的读者阅读学习。文章内容丰富,涉及的技术细节和实例丰富多样,对于想深入了解调度引擎的读者具有很高的参考价值。
仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Go 进阶 · 分布式爬虫实战》,新⼈⾸单¥68
《Go 进阶 · 分布式爬虫实战》,新⼈⾸单¥68
立即购买
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
登录 后留言
全部留言(13)
- 最新
- 精选
- shuff1ebug应该是,会丢失发给worker的任务。 case r := <-s.requestCh:的情况下,如果req不是nil,应该把req再添加到reqQueue头部
作者回复: 是的,后面有修复
2022-12-15归属地:北京1 - Alex有个小问题请教一下老师, 这个Seeds 是slice, 我觉得在取出任务的时候会有并发问题 如果没有请老师指教下
作者回复: 目前只有初始化会写,运行时都是读,所以不会有并发问题。 随着课程的深入,运行时可以添加资源,到时会有并发冲突的考虑
2023-02-12归属地:江苏2 - mantra本文档中的代码链接 tag v0.1.4 (之前的文档,也一样)对应的 URL 都是主库的地址(https://github.com/dreamerjackson/crawler),这是故意的吗?正确的应该是 https://github.com/dreamerjackson/crawler/archive/refs/tags/v0.1.4.tar.gz
作者回复: 我所有的项目链接都放置的主库的地址,tag的地址就稍微点击一下吧
2023-01-13归属地:北京 - 7oty如何控制某个爬虫任务的启动,停止和任务的实时运行状态?
作者回复: 后面文章会有介绍
2022-12-20归属地:广东 - 翡翠虎如果任务推送到worker,但又还没实施,这时候那台服务器停机了,或者进程退出了,任务会不会丢?怎么处理任务还没执行就丢了的这种情况呢?
作者回复: 随着课程深入会看到,我们的任务是存储在etcd中的,所以不会丢失
2022-12-20归属地:广西2 - Geek_a9ea01for { var req *collect.Request var ch chan *collect.Request if len(reqQueue) > 0 { req = reqQueue[0] reqQueue = reqQueue[1:] ch = s.workerCh } select { case r := <-s.requestCh: reqQueue = append(reqQueue, r) case ch <- req: } } 有个问题: 如果ch堵塞了 这时候又有requestCh请求上来;会不会导致ch数据丢失?
作者回复: 这个是已知的问题,我在后面的课程中做了修复
2022-12-13归属地:广东 - RealmSeeds | | req ParseFunc(req) HandleResult() requestCh----> reqQueue -----> workerCh ----------> out-----------> result: ^ - item ==> 存储 | - req | |---------------<----------------------<---------------------<------|2022-12-13归属地:浙江1
- Geek_38ea75我有好几个问题 1.为啥req和ch放在for循环内部声明,这样有个问题,就是work来不及执行的话,会丢失。 2.如果requestCh中的任务很多的话,会导致work队列中没有能够运行的任务。2024-03-07归属地:北京
- 周龙亭func (s *Schedule) Schedule() { var reqQueue = s.Seeds go func() { for { var req *collect.Request var ch chan *collect.Request if len(reqQueue) > 0 { req = reqQueue[0] ch = s.workerCh } select { case r := <-s.requestCh: reqQueue = append(reqQueue, r) case ch <- req: reqQueue = reqQueue[1:] } } }() }2023-01-25归属地:上海
- 周龙亭修复下Schedule方法的bug: func (s *Schedule) schedule() { go func() { for r := range s.requestCh { s.reqQueueCond.L.Lock() s.reqQueue = append(s.reqQueue, r) s.reqQueueCond.Signal() s.reqQueueCond.L.Unlock() } }() go func() { for { s.reqQueueCond.L.Lock() for len(s.reqQueue) == 0 { s.reqQueueCond.Wait() } var movedReqQueue = s.reqQueue s.reqQueue = nil s.reqQueueCond.L.Unlock() for _, r := range movedReqQueue { s.workerCh <- r } } }() }2023-01-25归属地:上海
收起评论