作者回复: 好问题。 先来说疑问1:没错,从磁盘文件系统读数据,最好的本地性级别就是NODE_LOCAL。不过,现在有一些内存文件系统,比如Alluxio。我们之前做过Alluxio和Spark集成,这个时候数据读取,最好的时候,就是PROCESS_LOCAL。其实,Alluxio的文件存储,本质上相当于OFF HEAP的cache,因此可以做到PROCESS_LOCAL。 因此,很显然,Cache是可以保证PROCESS_LOCAL的。但是,要做到PROCESS_LOCAL,却不是一定非要做cache。举个例子,Broadcast Joins机制下,小表的广播,读取的时候也是PROCESS_LOCAL。当然了,你也可以说,广播是一种特殊的缓存,这么说也没毛病。再举个栗子,Collocated joins,你不妨搜搜这个数据关联的计算过程,它的计算,也是PROCESS_LOCAL的。总之一句话,Cache可以保证PROCESS_LOCAL,但是要做到PROCESS_LOCAL,不一定非要做Cache。 疑问2:其实上面说过了,没错,Cache可以保证后面的Task就是PROCESS_LOCAL级别。
作者回复: Nice~ aggregateByKey玩儿的很溜啊~ 不过咱们这个例子其实可以用collect_set哈~ 不过你说的对,确实RDD更灵活,DataFrame抽象更高,那也就意味着灵活性变差。驴和熊猫不可兼得么,low level API比如RDD,高阶算子,开发灵活性非常高,但是优化空间有限;high level API比如DataFrame,Spark SQL优化空间大,但是就没有RDD开发起来那么灵活了。 RDD和DataFrame的恩怨情仇,第20讲可以了解一下~
作者回复: 好问题,这个得说回Spark调度系统,可以再回顾下调度系统。不过在那一讲,我们确实没有展开说Locality具体如何确定。是这样的,DAGScheduler在解析完DAG之后,会创建Stages,在每个Stages内部,DAGScheduler会创建TaskSet,这个其实就是Task的集合。Locality实际上是Task的属性,因此,本地性级别是在这个时候确定的。 那么问题来了,具体怎么确定呢?这个可能就比较细致了,对于HadoopRDD来说,它的每个数据分片,其实都可以从HDFS的Name Node元数据中查出数据分片所在节点,因此DAGScheduler很容易根据这些信息,来判断这个数据分片对应的Task,它的本地性倾向,也就是倾向于哪个计算节点。 对于一般的RDD来说,BlockManager会记录数据分片对应的ExecutorId和HostId,因此DAGScheduler结合这些信息,也很容易判断,这些数据分片对应的Task,应该调度到哪个Executor进程、哪个节点。 不知道这么说能不能解答你的问题哈~ 有问题继续留言~
作者回复: 好问题~ 我们分资源调度和任务调度两种情况来说。 Spark在做任务调度之前,SchedulerBackend封装的调度器,比如Yarn、Mesos、Standalone,实际上已经完成了资源调度,换句话说,整个集群有多少个containers/executors,已经是一件确定的事情了。而且,每个Executors的CPU和内存,也都是确定的了(因为你启动Spark集群的时候,使用配置项指定了每个Executors的CPU和内存分别是多少)。资源调度器在做资源调度的时候,确实是同时需要CPU和内存信息的。 资源调度完成后,Spark开始任务调度,你的问题,其实是任务调度范畴的问题。也就是TaskScheduler在准备调度任务的时候,要事先知道都有哪些Executors可用,注意,是可用。也就是TaskScheduler的核心目的,在于获取“可用”的Executors。 现在来回答你的问题,也就是:为什么ExecutorData不存储于内存相关的信息。答案是:不需要。一来,TaskScheduler要达到目的,它只需知道Executors是否有空闲CPU、有几个空闲CPU就可以了,有这些信息就足以让他决定是否把tasks调度到目标Executors上去。二来,每个Executors的内存总大小,在Spark集群启动的时候就确定了,因此,ExecutorData自然是没必要记录像Total Memory这样的冗余信息。 再来说Free Memory,首先,我们说过,Spark对于内存的预估不准,再者,每个Executors的可用内存都会随着GC的执行而动态变化,因此,ExecutorData记录的Free Memory,永远都是过时的信息,TaskScheduler拿到这样的信息,也没啥用。一者是不准,二来确实没用,因为TaskScheduler拿不到数据分片大小这样的信息,TaskScheduler在Driver端,而数据分片是在目标Executors,所以TaskScheduler拿到Free Memory也没啥用,因为它也不能判断说:task要处理的数据分片,是不是超过了目标Executors的可用内存。 综上,ExecutorData的数据结构中,只保存了CPU信息,而没有记录内存消耗等信息。不知道这些能不能解答你的问题?有问题再聊哈~
作者回复: 用collect_set没问题哈,你说的对,应该按照groupId做分组,以分组为粒度去重,但是,你想,如果在同一个Executor中,就存在同样的groupId,他们对应的兴趣列表有重复的兴趣项,那么我们是不是应该在Map端,也就是Shuffle前,就给他们去重呢?所以说,用collect_set替换collect_list可以帮助在Map端做去重,能省则省~
作者回复: 1、2没问题~ collect_set是map端聚合,你说的那些操作,也都是map端聚合操作。 3的话,序列化的缓存,其实对于网络开销没什么贡献,不过倒是能节省内存开销。原因其实很简单,如果cache的副本数不大于1,Cache本身是不会涉及网络传输的。 但如果Cache的副本大于1,你说的就是对的,由于副本大于1,因此副本需要走网络upload到其他Executors或是hosts,序列化之后的数据自然更小,网络开销更小。
作者回复: 一个个说哈 1)除了基本类型,凡是用户(开发者)自定义的class,都需要单独注册 2)如果是spark with Hive(Metastore)的话,那其实仅仅是用Hive Metastore来存元信息,执行引擎是Spark(SQL)。虽然看上去是Hive SQL,但实际是Spark SQL,从2.0开始,Spark跟Hive用一样的语法解析器ANTLR,所以Hive SQL,Spark完全能解析。所以说,看上去是Hive SQL,但实际上是被Spark SQL来解析、执行的。所以这里并不存在说hive函数与spark api的对应关系哈 3)老弟还需要再仔细看看答案,这里的关键,是区分资源调度和任务调度,这两个弄清楚了,其实答案就清楚了。之所以copy之前的答案,因为,在我看来,这类问题的本质,其实都是两种调度系统之间的区别、联系,没有搞清楚~
作者回复: 是的,没错~
作者回复: 推荐用Spark SQL哈~ Spark SQL内置了很多优化机制(Catalyst、Tungsten),这个我们在20-23这4讲会详细展开。
作者回复: 答得挺好~ 不过有个细节有疑问。 问题二:“预聚合不影响正确性的”,我能想到的是“依赖排序的聚合计算”,比如分位数(不同百分位、中位数等等)的计算,你指的是这种操作吗?