• Fendora范东_
    2021-04-27
    有点疑问 1.没有RDD缓存的情况下,是不是最好的任务级别是node_level,而且是最底下包含scan操作的tasks是这个级别? 因为:最开始数据全在磁盘,第一个stage生成的task本地性级别最好为node_level,后面的stage生成的task都是需要进行shuffle,最起码也是rack_node。 换句话说,是不是只有前一个action触发了RDD缓存操作,后一个action里面的任务才有可能是process_level呢? 2.当RDD被缓存了,由BlockManager进行管理,满足了「有个人或有个地方记录了什么数据存储在什么地方」,这样后续生成Task时就可以有process_local级别的任务出现了。 不知道理解的对吗?

    作者回复: 好问题。 先来说疑问1:没错,从磁盘文件系统读数据,最好的本地性级别就是NODE_LOCAL。不过,现在有一些内存文件系统,比如Alluxio。我们之前做过Alluxio和Spark集成,这个时候数据读取,最好的时候,就是PROCESS_LOCAL。其实,Alluxio的文件存储,本质上相当于OFF HEAP的cache,因此可以做到PROCESS_LOCAL。 因此,很显然,Cache是可以保证PROCESS_LOCAL的。但是,要做到PROCESS_LOCAL,却不是一定非要做cache。举个例子,Broadcast Joins机制下,小表的广播,读取的时候也是PROCESS_LOCAL。当然了,你也可以说,广播是一种特殊的缓存,这么说也没毛病。再举个栗子,Collocated joins,你不妨搜搜这个数据关联的计算过程,它的计算,也是PROCESS_LOCAL的。总之一句话,Cache可以保证PROCESS_LOCAL,但是要做到PROCESS_LOCAL,不一定非要做Cache。 疑问2:其实上面说过了,没错,Cache可以保证后面的Task就是PROCESS_LOCAL级别。

    
    4
  • kingcall
    2021-04-27
    DataFrame 要想实现map 端预聚合只能靠优化器自己了吧比较抽象层次比较高了,灵活度就降低了,但是RDD 的话还是可以自己实现实现的,虽然reduceBykey可以预聚合,但是在这个例子中不合适,不能替换collect_list,对于RDD 我们呢可以使用aggregateByKey def localDistinct(set: Set[String], b: String): mutable.Set[String] = { set.add(b) set } def combineDistinct(set1: Set[String], set2: Set[String]): mutable.Set[String] = { set1 ++ set2 } rdd.map(o => (o.id, o.name)).aggregateByKey(Set[String]())(localDistinct, combineDistinct).foreach(println(_))

    作者回复: Nice~ aggregateByKey玩儿的很溜啊~ 不过咱们这个例子其实可以用collect_set哈~ 不过你说的对,确实RDD更灵活,DataFrame抽象更高,那也就意味着灵活性变差。驴和熊猫不可兼得么,low level API比如RDD,高阶算子,开发灵活性非常高,但是优化空间有限;high level API比如DataFrame,Spark SQL优化空间大,但是就没有RDD开发起来那么灵活了。 RDD和DataFrame的恩怨情仇,第20讲可以了解一下~

    
    3
  • really_z
    2021-07-06
    老师,你好,想问一下本地性级别是在哪个环节确定的呢?

    作者回复: 好问题,这个得说回Spark调度系统,可以再回顾下调度系统。不过在那一讲,我们确实没有展开说Locality具体如何确定。是这样的,DAGScheduler在解析完DAG之后,会创建Stages,在每个Stages内部,DAGScheduler会创建TaskSet,这个其实就是Task的集合。Locality实际上是Task的属性,因此,本地性级别是在这个时候确定的。 那么问题来了,具体怎么确定呢?这个可能就比较细致了,对于HadoopRDD来说,它的每个数据分片,其实都可以从HDFS的Name Node元数据中查出数据分片所在节点,因此DAGScheduler很容易根据这些信息,来判断这个数据分片对应的Task,它的本地性倾向,也就是倾向于哪个计算节点。 对于一般的RDD来说,BlockManager会记录数据分片对应的ExecutorId和HostId,因此DAGScheduler结合这些信息,也很容易判断,这些数据分片对应的Task,应该调度到哪个Executor进程、哪个节点。 不知道这么说能不能解答你的问题哈~ 有问题继续留言~

    
    2
  • 在路上
    2021-12-05
    老师好。请教下,spark在申请executor时候有没有考虑读取数据时候的数据本地性呢,实际生产中集群特别大,在生成hadooprdd的时候大概率可能不能数据和executor在同一个机器甚至机架上吧?

    作者回复: 好问题~ 我们分资源调度和任务调度两种情况来说。 Spark在做任务调度之前,SchedulerBackend封装的调度器,比如Yarn、Mesos、Standalone,实际上已经完成了资源调度,换句话说,整个集群有多少个containers/executors,已经是一件确定的事情了。而且,每个Executors的CPU和内存,也都是确定的了(因为你启动Spark集群的时候,使用配置项指定了每个Executors的CPU和内存分别是多少)。资源调度器在做资源调度的时候,确实是同时需要CPU和内存信息的。 资源调度完成后,Spark开始任务调度,你的问题,其实是任务调度范畴的问题。也就是TaskScheduler在准备调度任务的时候,要事先知道都有哪些Executors可用,注意,是可用。也就是TaskScheduler的核心目的,在于获取“可用”的Executors。 现在来回答你的问题,也就是:为什么ExecutorData不存储于内存相关的信息。答案是:不需要。一来,TaskScheduler要达到目的,它只需知道Executors是否有空闲CPU、有几个空闲CPU就可以了,有这些信息就足以让他决定是否把tasks调度到目标Executors上去。二来,每个Executors的内存总大小,在Spark集群启动的时候就确定了,因此,ExecutorData自然是没必要记录像Total Memory这样的冗余信息。 再来说Free Memory,首先,我们说过,Spark对于内存的预估不准,再者,每个Executors的可用内存都会随着GC的执行而动态变化,因此,ExecutorData记录的Free Memory,永远都是过时的信息,TaskScheduler拿到这样的信息,也没啥用。一者是不准,二来确实没用,因为TaskScheduler拿不到数据分片大小这样的信息,TaskScheduler在Driver端,而数据分片是在目标Executors,所以TaskScheduler拿到Free Memory也没啥用,因为它也不能判断说:task要处理的数据分片,是不是超过了目标Executors的可用内存。 综上,ExecutorData的数据结构中,只保存了CPU信息,而没有记录内存消耗等信息。不知道这些能不能解答你的问题?有问题再聊哈~

    
    1
  • 静心
    2021-05-07
    老师,文中的案例是要获取各个群组的去重兴趣列表,我觉得实现该需求就必须要基于groupId进行分组后再进行兴趣列表去重。那我们应该怎样在map端使用collect_set实现该需求以减少shuffle数据量呢?

    作者回复: 用collect_set没问题哈,你说的对,应该按照groupId做分组,以分组为粒度去重,但是,你想,如果在同一个Executor中,就存在同样的groupId,他们对应的兴趣列表有重复的兴趣项,那么我们是不是应该在Map端,也就是Shuffle前,就给他们去重呢?所以说,用collect_set替换collect_list可以帮助在Map端做去重,能省则省~

    
    1
  • aof
    2021-05-05
    1. org.apache.spark.sql.functions中用来collect就两个函数,一个collect_list可以有重复元素,一个collect_set元素唯一 2. 好像没怎么理解问题的意思哈哈,聚合的话无非就是计数、就和、平均值、最大值最小值这些吗 3. cache的时候,数据以序列化的形式进行缓存(比如,StorageLevel.MEMORY_ONLY_SER),数据变小了,是不是也可以间接减小网络传输的开销,但是反序列化也会消耗更多的cpu

    作者回复: 1、2没问题~ collect_set是map端聚合,你说的那些操作,也都是map端聚合操作。 3的话,序列化的缓存,其实对于网络开销没什么贡献,不过倒是能节省内存开销。原因其实很简单,如果cache的副本数不大于1,Cache本身是不会涉及网络传输的。 但如果Cache的副本大于1,你说的就是对的,由于副本大于1,因此副本需要走网络upload到其他Executors或是hosts,序列化之后的数据自然更小,网络开销更小。

    
    1
  • Unknown element
    2022-01-05
    老师您好 我有几个问题: 1. 关于 Kryo Serializer 在哪里可以看到它已经支持的数据类型呢(就是不需要注册可以直接序列化)? 2. 我们是spark with hive的部署方式,脚本是用hive sql开发的,而不是 dataframe API,那在哪里可以找到hql中的函数/语句和 dataframe API 中算子的对应关系呢?比如hql中count(a)在底层会被转化为reduceByKey去执行? 3. 对于 @在路上 同学的问题的回答,您好像是复制的上一节的一个回答,但是好像并没有解答他的问题呢。。我也比较好奇这个问题的答案是什么 谢谢老师!

    作者回复: 一个个说哈 1)除了基本类型,凡是用户(开发者)自定义的class,都需要单独注册 2)如果是spark with Hive(Metastore)的话,那其实仅仅是用Hive Metastore来存元信息,执行引擎是Spark(SQL)。虽然看上去是Hive SQL,但实际是Spark SQL,从2.0开始,Spark跟Hive用一样的语法解析器ANTLR,所以Hive SQL,Spark完全能解析。所以说,看上去是Hive SQL,但实际上是被Spark SQL来解析、执行的。所以这里并不存在说hive函数与spark api的对应关系哈 3)老弟还需要再仔细看看答案,这里的关键,是区分资源调度和任务调度,这两个弄清楚了,其实答案就清楚了。之所以copy之前的答案,因为,在我看来,这类问题的本质,其实都是两种调度系统之间的区别、联系,没有搞清楚~

    
    
  • To_Drill
    2021-10-13
    问题三: 对数据源进行压缩在读取数据阶段可以降低网络开销,在shuffle阶段落盘的时候采用压缩可以降低shuffle阶段的网络开销,不过CPU的负担就变重了,还是得结合具体的场景来抉择

    作者回复: 是的,没错~

    
    
  • Jay
    2021-04-27
    文中提到“'RDD API使用频率越来越低”。 公司一直还用的是Rdd, 我也一直没学过spark sql。是否有必要切到spark sql呢?

    作者回复: 推荐用Spark SQL哈~ Spark SQL内置了很多优化机制(Catalyst、Tungsten),这个我们在20-23这4讲会详细展开。

    
    
  • zxk
    2021-04-26
    问题一: reduceBykey 可以在 map 端预聚合 问题二:在 map 端聚合的场景,比如求某个 key 的数量,求和等,业务方面来看凡是预聚合不影响正确性的都可以先在 map 端做聚合 问题三: 在读取数据源阶段,可以尽可能将 executor 落在数据同节点上,实现node local,再次就是同个机架下,实现 rack local。数据Shuffle时,可以考虑使用Broadcast代替,或者先在 map 端预聚合减少数据量,以及只传输时用到的字段。

    作者回复: 答得挺好~ 不过有个细节有疑问。 问题二:“预聚合不影响正确性的”,我能想到的是“依赖排序的聚合计算”,比如分位数(不同百分位、中位数等等)的计算,你指的是这种操作吗?

    
    