大规模数据处理实战
蔡元楠
硅谷资深工程师
41608 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 46 讲
大规模数据处理实战
15
15
1.0x
00:00/00:00
登录|注册

26 | Pipeline:Beam如何抽象多步骤的数据流水线?

元素发生错误时相关联的Bundle重新处理
可能转移到其他Worker处理
处理失败时整个Bundle重新处理
Bundle的关联性
分配Worker处理Bundle
分割PCollection成Bundle
分割PCollection成Bundle并分发给Workers处理
启动多个Workers并行处理PCollection
基于MapReduce原理
创建多个不同的PCollection
PCollection的不可变性
数据流水线示例
在main()函数中创建Beam数据流水线
使用PipelineOptions定义选项
包括读取数据集、转换数据集、输出结果数据集
封装数据处理逻辑
多步骤Transform上的错误处理
单个Transform上的错误处理
处理过程示例
底层思想
应用PCollection和Transform
创建流水线
抽象概念
错误处理
处理模型
数据流水线
Pipeline:Beam如何抽象多步骤的数据流水线
文章

该思维导图由 AI 生成,仅供参考

你好,我是蔡元楠。
今天我要与你分享的主题是“Pipeline:Beam 如何抽象多步骤的数据流水线”。
在上两讲中,我们一起学习了 Beam 是如何抽象封装数据,以及如何抽象对于数据集的转换操作的。在掌握了这两个基本概念后,我们就可以很好地回答 Beam 编程模型里的 4 个维度 What、Where、When、How 中的第一个问题——What 了。也就是,我们要做什么计算?想得到什么样的结果?
这个时候你可能已经跃跃欲试,开始想用 PCollection 和 Transform 解决我们平常经常会使用到的批处理任务了。没有问题,那我们就先抛开 Where、When 和 How 这三个问题,由简至繁地讲起。
现在假设我们的数据处理逻辑只需要处理有边界数据集,在这个情况下,让我们一起来看看 Beam 是如何运行一套批处理任务的。

数据流水线

在 Beam 的世界里,所有的数据处理逻辑都会被抽象成数据流水线(Pipeline)来运行。那么什么是数据流水线呢?
Beam 的数据流水线是对于数据处理逻辑的一个封装,它包括了从读取数据集将数据集转换成想要的结果输出结果数据集这样的一整套流程。
所以,如果我们想要跑自己的数据处理逻辑,就必须在程序中创建一个 Beam 数据流水线出来,比较常见的做法是在 main() 函数中直接创建。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Beam数据流水线是一种高度抽象的数据处理逻辑,它将数据处理过程封装成一个流水线,包括从读取数据集到转换数据集并输出结果的整个过程。在Beam中,数据流水线的创建需要定义选项,告诉Beam如何运行。数据流水线中的PCollection和Transform可以被应用在多个步骤中,创建新的PCollection以供下一个Transform使用。在底层处理模型中,数据流水线采用MapReduce原理,在分布式环境下启动多个Workers来处理PCollection,将输入数据集分割成不同的Bundle并分发给不同的Worker来处理,以实现完美并行。在错误处理方面,单个Transform上的错误会导致整个Bundle重新处理,而多步骤Transform上的错误会影响相关联的所有Bundle重新处理。Beam数据流水线的抽象性和并行处理能力使其成为处理大规模数据的强大工具。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《大规模数据处理实战》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(19)

  • 最新
  • 精选
  • espzest
    bundle怎么聚合成pcollection? 一个bundle处理失败,为什么需要重做前面的bundle?

    作者回复: 谢谢你的提问!其实我们在第24讲中知道,PCollection是具有无序性的,所以最简单的做法bundle在处理完成之后可以直接append到结果PCollection中。 至于为什么需要重做前面的bundle,这其实也是错误处理机制的一个trade-off了。Beam希望尽可能减少persistence cost,也就是不希望将中间结果保持在某一个worker上。可以这么想,如果我们想要不重新处理前面的bundle,我们必须要将很多中间结果转换成硬盘数据,这样一方面增加很大的时间开销,另一方面因为数据持久化了在具体一台机器上,我们也没有办法再重新动态分配bundle到不同的机器上去了。

    2019-06-21
    17
  • cricket1981
    bundle随机分配会不会产生数据倾斜?完美并行背后的机制是?beam应该也有类似spark的persist方法缓存转换中间结果,防止出错恢复链太长吧?

    作者回复: 谢谢你的提问!其实文章中所讲到的随机分配并不是说像分配随机数那样将bundle随机分配出去给workers,只是说根据runner的不同,bundle的分配方式也会不一样了,但最终还是还是希望能最大化并行度。 至于完美并行的背后机制,Beam会在真正处理数据前先计算优化出执行的一个有向无环图,希望保持并行处理数据的同时,能够减少每个worker之间的联系。 Beam据我所知是有的,BEAM-7131 issue就有反应这个问题。

    2019-06-21
    2
    9
  • YZJ
    老师请教个问题:PCollectionA transform PCollectionB, 假如PCollectionB 要比PCollectionA大很多倍,比如transform 是把PCollectionA 中每个字符串重复1000次,那PCollectionB 就要大1000倍,worker会不会有内存溢出问题? spark中可以配置executor 的core和memery来控制每个task内存用量,beam有类似机制吗?不然怎样让资源利用最优化呢?

    作者回复: 谢谢你的提问!这个问题很好啊,运行大型的Beam pipeline遇到OOM也是有可能的。要配置底层资源的话要看Runner支不支持,像如果将Google Cloud Dataflow作为Runner的话,我们可以通过配置PipelineOption来达到目的。底层使用Spark的话我个人还没有使用过,不过应该是可以用SparkContextOptions来配置的。

    2019-06-21
    8
  • 西南偏北
    Beam的错误处理和RDD的真的很像,因为transformation都是lazy的,只有action才会触发计算,中间的转换过程都是被记录在DAG中的,这就导致中间某个transformation失败之后,需要往上追溯之前的转换,可以理解为是寻找父transformation,然后父transformation还要往上寻找父父transformation,直到没有父transformation为止,就像是类加载机制一样。但是如果能把中间结果保存在内存中,在失败重新计算时,就能提高计算的效率。

    作者回复: 的确如此

    2019-06-26
    6
  • 沈洪彬
    在 Beam 的数据流水线中,当处理的元素发生错误时流水线的的错误处理机制分两种情况 1.单个Transform上的错误处理 如果某个Bundle里元素处理失败,则整个Bundle里元素都必须重新处理 2.多步骤Transform上的错误处理 如果某个Bundle里元素处理失败,则整个Bundle里元素及与之关联的所有Bundle都必须重新处理

    作者回复: 谢谢留言!总结得不错!

    2019-06-23
    4
  • onepieceJT2018
    老师 想到一个问题啊 如果有个计算是 需要worker1 和 worker2 都算完的结果再计算 发生worker1 一直错误没通过 这时候worker2会一直傻傻等待嘛

    作者回复: 谢谢你的提问!这个依赖worker1和worker2计算结果的Transform会一直等待。但是与此同时,worker2可以做其它的计算,甚至有可能worker1的计算如果一直出错,Beam会将这个bundle重新分配给worker2来计算。

    2019-06-21
    4
  • 常超
    <在多步骤的 Transform 上,如果处理的一个 Bundle 元素发生错误了,则这个元素所在的整个 Bundle 以及与这个 Bundle 有关联的所有 Bundle 都必须重新处理。 如果upstream transform里状态有更新操作,重新处理已经成功的bundle会出现数据重复,可能导致状态更新不正确吧?

    作者回复: 谢谢你的提问!这个问题非常好啊,如果你所说的是stateful processing的话,那它的错误处理机制和stateful-less会不太一样。Stateful processing里的ParDo在处理每一个元素的时候会将它的state持久化,也就是保存到外部的storage中,下次需要用到这个元素的时候会再读取这个元素的state。如果是发生了错误的话,会有机制reclaim这些states或者是invalidate它们。

    2019-06-21
    3
  • Alpha
    上一期讲到,PCollection 是有向图中的边,而 Transform 是有向图里的节点。这一期的图咋又变了呢

    作者回复: 谢谢留言!哈哈,需要表达的内容不一样了。

    2019-06-21
    1
  • dancer
    想问老师,一个bundle的数据必须要全部处理完之后才能进行第二个transform吗?如果部分数据经过transform1后就可以继续执行transform2,这样数据并行度会更高吧,为什么没有采用这种机制呢?

    作者回复: 谢谢你的提问!这个问题还是要看情况吧,如果多步骤的Transform都是ParDo的话,那确实可以按照你说的做法去做。不过当Transform涉及到Combine或者Flatten这种Transform的话, 那就必须等到这一阶段所有的Transform完成了之后才能够进行下一步的Transform了。

    2019-06-21
    1
  • chief
    老师您好,bundle经过Transform会产生新的bundle,那么是同时保留前后bundle数据还是在新生成的bundle中保留血缘关系?

    作者回复: 谢谢你的留言!前后bundle保不保留这个还要看你的执行DAG需不需要还用到这个bundle。至于保留前后关系的话主要是用于在发生错误的情况下重新trace back到最开始的那个transform,这个信息从DAG中就可以找到了。

    2019-07-04
收起评论
显示
设置
留言
19
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部