Go 进阶 · 分布式爬虫实战
郑建勋
Go 语言技术专家,《Go 语言底层原理剖析》作者
15839 人已学习
新⼈⾸单¥68
登录后,你可以任选4讲全文学习
课程目录
已完结/共 58 讲
Go 进阶 · 分布式爬虫实战
15
15
1.0x
00:00/00:00
登录|注册

28|调度引擎:负载均衡与调度器实战

你好,我是郑建勋。
在上一节课程中,我们实战了广度优先搜索算法,不过我们对网站的爬取都是在一个协程中进行的。在真实的实践场景中,我们常常需要爬取多个初始网站,我们希望能够同时爬取这些网站。这就需要合理调度和组织爬虫任务了。因此,这节课的重点就是实战任务调度的高并发模型,使资源得到充分的利用。

实战调度引擎

首先,我们新建一个文件夹 engine 用于存储调度引擎的代码,核心的调度逻辑位于 ScheduleEngine.Run 中。这部分的完整代码位于 tag v0.1.4,你可以对照代码进行查看。
调度引擎主要目标是完成下面几个功能:
创建调度程序,接收任务并将任务存储起来;
执行调度任务,通过一定的调度算法将任务调度到合适的 worker 中执行;
创建指定数量的 worker,完成实际任务的处理;
创建数据处理协程,对爬取到的数据进行进一步处理。
func (s *ScheduleEngine) Run() {
requestCh := make(chan *collect.Request)
workerCh := make(chan *collect.Request)
out := make(chan collect.ParseResult)
s.requestCh = requestCh
s.workerCh = workerCh
s.out = out
go s.Schedule()
}
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入探讨了调度引擎的实际应用和底层原理,重点介绍了高并发模型的任务调度和资源利用。首先,文章详细解释了调度引擎的核心功能和运行机制,包括创建调度程序、执行调度任务、创建worker以及数据处理协程。其次,通过函数式选项模式实现了高度可配置化的调度器初始化,使参数配置更加灵活和可扩展。此外,文章还深入探讨了通道的底层原理和select机制的工作原理,为读者提供了更深入的学习方向。整体而言,本文内容涵盖了调度引擎的实际应用、技术特点和底层原理,适合对高并发模型和任务调度感兴趣的读者阅读学习。文章内容丰富,涉及的技术细节和实例丰富多样,对于想深入了解调度引擎的读者具有很高的参考价值。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Go 进阶 · 分布式爬虫实战》
新⼈⾸单¥68
立即购买
登录 后留言

全部留言(13)

  • 最新
  • 精选
  • shuff1e
    bug应该是,会丢失发给worker的任务。 case r := <-s.requestCh:的情况下,如果req不是nil,应该把req再添加到reqQueue头部

    作者回复: 是的,后面有修复

    2022-12-15归属地:北京
    1
  • Alex
    有个小问题请教一下老师, 这个Seeds 是slice, 我觉得在取出任务的时候会有并发问题 如果没有请老师指教下

    作者回复: 目前只有初始化会写,运行时都是读,所以不会有并发问题。 随着课程的深入,运行时可以添加资源,到时会有并发冲突的考虑

    2023-02-12归属地:江苏
    2
  • mantra
    本文档中的代码链接 tag v0.1.4 (之前的文档,也一样)对应的 URL 都是主库的地址(https://github.com/dreamerjackson/crawler),这是故意的吗?正确的应该是 https://github.com/dreamerjackson/crawler/archive/refs/tags/v0.1.4.tar.gz

    作者回复: 我所有的项目链接都放置的主库的地址,tag的地址就稍微点击一下吧

    2023-01-13归属地:北京
  • 7oty
    如何控制某个爬虫任务的启动,停止和任务的实时运行状态?

    作者回复: 后面文章会有介绍

    2022-12-20归属地:广东
  • 翡翠虎
    如果任务推送到worker,但又还没实施,这时候那台服务器停机了,或者进程退出了,任务会不会丢?怎么处理任务还没执行就丢了的这种情况呢?

    作者回复: 随着课程深入会看到,我们的任务是存储在etcd中的,所以不会丢失

    2022-12-20归属地:广西
    2
  • Geek_a9ea01
    for { var req *collect.Request var ch chan *collect.Request if len(reqQueue) > 0 { req = reqQueue[0] reqQueue = reqQueue[1:] ch = s.workerCh } select { case r := <-s.requestCh: reqQueue = append(reqQueue, r) case ch <- req: } } 有个问题: 如果ch堵塞了 这时候又有requestCh请求上来;会不会导致ch数据丢失?

    作者回复: 这个是已知的问题,我在后面的课程中做了修复

    2022-12-13归属地:广东
  • Realm
    Seeds | | req ParseFunc(req) HandleResult() requestCh----> reqQueue -----> workerCh ----------> out-----------> result: ^ - item ==> 存储 | - req | |---------------<----------------------<---------------------<------|
    2022-12-13归属地:浙江
    1
  • Geek_38ea75
    我有好几个问题 1.为啥req和ch放在for循环内部声明,这样有个问题,就是work来不及执行的话,会丢失。 2.如果requestCh中的任务很多的话,会导致work队列中没有能够运行的任务。
    2024-03-07归属地:北京
  • 周龙亭
    func (s *Schedule) Schedule() { var reqQueue = s.Seeds go func() { for { var req *collect.Request var ch chan *collect.Request if len(reqQueue) > 0 { req = reqQueue[0] ch = s.workerCh } select { case r := <-s.requestCh: reqQueue = append(reqQueue, r) case ch <- req: reqQueue = reqQueue[1:] } } }() }
    2023-01-25归属地:上海
  • 周龙亭
    修复下Schedule方法的bug: func (s *Schedule) schedule() { go func() { for r := range s.requestCh { s.reqQueueCond.L.Lock() s.reqQueue = append(s.reqQueue, r) s.reqQueueCond.Signal() s.reqQueueCond.L.Unlock() } }() go func() { for { s.reqQueueCond.L.Lock() for len(s.reqQueue) == 0 { s.reqQueueCond.Wait() } var movedReqQueue = s.reqQueue s.reqQueue = nil s.reqQueueCond.L.Unlock() for _, r := range movedReqQueue { s.workerCh <- r } } }() }
    2023-01-25归属地:上海
收起评论
显示
设置
留言
13
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部