01|Spark:从“大数据的Hello World”开始
- 深入了解
- 翻译
- 解释
- 总结
本文以"大数据的Hello World"为主题,介绍了学习Spark的基础知识。通过实战演示Spark应用开发的过程,以Word Count作为入门项目。首先介绍了准备工作,包括本地部署Spark运行环境的步骤。然后梳理了Word Count的计算步骤,包括读取内容、分词和分组计数。接着详细介绍了Word Count代码实现的过程,包括使用SparkContext的textFile方法读取文件内容,并对RDD的概念进行了简单解释。文章以实例代码和清晰的解释帮助读者快速了解Spark的基础知识和应用开发过程。文章还提到了RDD算子的使用,如map、filter、flatMap和reduceByKey等,为读者提供了更多学习Spark的思路和方法。整体而言,本文是一篇适合初学者的Spark入门指南,通过实例代码和详细解释,帮助读者快速了解Spark的基础知识和应用开发过程。
《零基础入门 Spark》,新⼈⾸单¥59
全部留言(44)
- 最新
- 精选
- Alvin-L置顶在Python中运行: ``` from pyspark import SparkContext textFile = SparkContext().textFile("./wikiOfSpark.txt") wordCount = ( textFile.flatMap(lambda line: line.split(" ")) .filter(lambda word: word != "") .map(lambda word: (word, 1)) .reduceByKey(lambda x, y: x + y) .sortBy(lambda x: x[1], False) .take(5) ) print(wordCount) #显示: [('the', 67), ('Spark', 63), ('a', 54), ('and', 51), ('of', 50)] ```
作者回复: 赞👍,Perfect! 可以作为Python标杆代码,供后续同学参考~
2021-09-20619 - liugddx我是一个大数据小白,我想咨询下spark和hadoop在大数据体系下的关系?
作者回复: 好问题,Hadoop的范畴可大可小。 往小了说,Hadoop特指HDFS、YARN、MapReduce这三个组件,他们分别是Hadoop分布式文件系统、分布式任务调度框架、分布式计算引擎。 往大了说,Hadoop生态包含所有由这3个组件衍生出的大数据产品,如Hive、Hbase、Pig、Sqoop,等等。 Spark和Hadoop的关系,是共生共赢的关系。Spark的定位是分布式计算引擎,因此,它的直接“竞争对手”,是MapReduce,也就是Hadoop的分布式计算引擎。Spark是内存计算引擎,而MapReduce在计算的过程中,需要频繁落盘,因此,一般来说,相比MapReduce,Spark在执行性能上,更胜一筹。 对于HDFS、YARN,Spark可与之完美结合,实际上,在很多的使用场景中,Spark的数据源往往存储于HDFS,而YARN是Spark重要的资源调度框架之一。 大体上是这些,当然,还可以说的更细,老弟可以继续在后台留言,我们继续讨论~
2021-09-07439 - Neo-dqy老师好!wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)这行代码我还存在疑问,为什么这里的map函数使用了花括号{ }而不是上面一些算子的( ),同时这个case又是什么意思?这一行代码非常像我曾经在Python中使用字典数据结构,然后根据字典值的升序排序。最后,貌似Scala语言本身就可以实现wordcount案例,那么它本身的实现和spark实现相比,spark有什么优势呢?
作者回复: Scala语法确实比较灵活,一般来说,简单表达式用(),复杂表达式用{}。 比如,简单函数体:(x => x + 1)、(_ + 1),等等; 复杂函数体:{case x: String => “一大堆关于x的转换” } 关于最后的问题,也就是Scala也可以实现Word Count,和Spark有什么区别。这个问题比较重要,老弟需要用心听一下~ 其实,任何一种语言,都可以实现任何计算逻辑,毕竟是高级编程语言,基本上都是图灵完备的。所以说像word count这种非常简单的逻辑,不只是scala,其他语言都能搞定。但是,Scala也好、Java也罢,再或者是Python,他们实现的word count,只能在单机跑,而Spark实现的Word Count,是在分布式环境跑。这,是本质的区别。 Spark,最核心的能力,就是分布式计算,利用大规模集群的能力。这是最本质的区别。 比如,现在有2万亿行的文本,需要你计算word count,你用scala也能在单机实现,但是,这个word count只能在单机跑,对于一般的机器,大概率是跑不动的。 可是,在分布式集群中,这样的量级,就轻松得多了。Spark,更多的,是提供一种分布式计算的能力,它提供给开发者简单的开发API,让开发者可以像开发单机应用那样,轻而易举地开发分布式应用,让分布式计算对于开发者来说,变得透明。开发者只需要关注业务逻辑,或者说计算逻辑,而不必关心分布式系统底层的调度、分发、数据交换、等等和分布式计算本身有关的东西。 所以说,Scala和Spark,两者没有可比性。当然,除了Spark,现在还有非常多的分布式计算框架,比如MapReduce、Flink、Presto、TensorFlow,等等,大家都是玩分布式计算的,只不过“术业有专攻”,各司其职~
2021-09-18522 - Vic遇到这个问题 scala> val rootPath: String = _ <console>:24: error: unbound placeholder parameter val rootPath: String = _ 网上搜一下,说这是汇编错误。 要把val 改成var , 但会遇到"_"这default值是null。 org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/vic/src/data/null/wikiOfSpark.txt 这一段就先跳过root_path,直接给file一个路径,是可以成功运行"word count",得到和老师一样的结果: [Stage 0:> (0 + 2) / res0: Array[(Int, String)] = Array((67,the), (63,Spark), (54,a), (51,and), (50,of))
作者回复: 这个是我的锅,哈哈~ 这里是我偷懒了,我应该在这条code上面加个注释,这里“_”(下划线)的意思,是你的文件根目录。Scala里面,用“_”表示一些不重要、不关心的东西,所以我偷用了Scala的“_”。但这里确实会引起困惑,不用管这个,我的锅,用你的文件根目录替换掉这里的“_”就好~
2021-09-0797 - Neo-dqy老师我可以再问一下,如果我是用IDEA创建Spark项目,是不是只要配置好Scala的SDK,然后在pom文件中加入对应版本号的spark依赖,就会自动下载spark包了?这个时候不需要再去官网下载spark了吗,同时也不再需要使用spark-shell了吗?
作者回复: 是的,没错~
2021-09-186 - Abigail前排占座!三年前接触过 Spark 今天从头再学!
作者回复: 欢迎~
2021-09-076 - 浮生若梦Java实现: SparkConf sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]"); JavaSparkContext JSC = new JavaSparkContext(sparkConf); // 读取文件内容 JavaRDD<String> lineRDD = JSC.textFile("wikiOfSpark.txt"); // 以行为单位做分词 JavaRDD<String> wordRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); JavaRDD<String> cleanWordRDD = wordRDD.filter(new Function<String, Boolean>() { @Override public Boolean call(String s) throws Exception { return !s.equals(""); } }); // 把RDD元素转换为(Key,Value)的形式 JavaPairRDD<String, Integer> kvRDD = cleanWordRDD.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s,1); } }); // 按照单词做分组计数 JavaPairRDD<String, Integer> wordCounts = kvRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer+integer2; } }); // 打印词频最高的5个词汇(先将元组的key value交换一下顺序,然后在调用sortByKey()) wordCounts.mapToPair((row)-> new Tuple2<>(row._2,row._1)).sortByKey(false).foreach(new VoidFunction<Tuple2<Integer, String>>() { @Override public void call(Tuple2<Integer, String> stringIntegerTuple2) throws Exception { System.out.println(stringIntegerTuple2._1 + ":" + stringIntegerTuple2._2); } }); //关闭context JSC.close();
作者回复: 满分💯,赞👍~
2021-12-2444 - 火炎焱燚Python版代码为: file='~~~/wikiOfSpark.txt' lineRDD=sc.textFile(file) lineRDD.first() # 会打印出lineRDD的第一行: u'Apache Spark',如果出错则不打印 wordRDD=lineRDD.flatMap(lambda line: line.split(" ")) cleanWordRDD=wordRDD.filter(lambda word: word!='') kvRDD=cleanWordRDD.map(lambda word:(word,1)) wordCounts=kvRDD.reduceByKey(lambda x,y:x+y) wordCounts.map(lambda (k,v):(v,k)).sortByKey(False).take(5)
作者回复: Cool~ 后面等专栏更完了,打算把所有Scala代码整理出一份Python版本的,老弟有没有兴趣一起呀~
2021-09-1723 - Unknown element问下执行 val lineRDD: RDD[String] = spark.sparkContext.textFile(file) 报错error: not found: value spark是怎么回事?
作者回复: spark-shell中,spark是默认的SparkSession实例,你是在spark-shell中执行这段代码吗?如果是在IDE,需要自己明确定义SparkSession实例的,比如: import org.apache.spark.sql.SparkSession val spark = SparkSession.builder .master("local[2]") .appName("SparkSession Example") .getOrCreate()
2021-09-173 - 国度2022年2月5号学习打卡记录 机器环境:ROG14 操作系统:win11 + wsl Ubuntu20.04 环境变量: ---------------------------- export SPARK_HOME=/mnt/c/spark/spark-2.4.8-bin-hadoop2.7 export JAVA_HOME=/mnt/c/linux_environment/jdk/jdk1.8.0_321 export M2_HOME=/mnt/c/linux_environment/apache-maven-3.8.4 export SCALA_HOME=/mnt/c/linux_environment/scala3-3.1.1 export PATH=$SPARK_HOME/bin:$SCALA_HOME/bin:$M2_HOME/bin:$JAVA_HOME/bin:$PATH --------------------------- 希望帮助和我一样从零开始一起学习的同学躲避一些坑: 坑1:jdk版本不兼容: 一开始使用jdk17版本,在启动过程中一直报错,降为1.8后启动成功; 坑2:hadoop版本问题: hadoop3.2.1 逐步使用Dataset,报错类型转换异常; 由于scala经验不足,暂时无法大规模改写老师的代码,降低版本为spark2.4.8 下载地址:https://dlcdn.apache.org/spark/ 可以选择适合的版本下载 原理性的还没有搞懂,目前在第一阶段,读懂,简单改写为主; 感谢吴磊老师的课
作者回复: 赞学习打卡~ 一起加油~
2022-02-052