零基础入门 Spark
吴磊
前 FreeWheel 机器学习研发经理
19171 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 38 讲
零基础入门 Spark
15
15
1.0x
00:00/00:00
登录|注册

13 | Spark SQL:让我们从“小汽车摇号分析”开始

你好,我是吴磊。
在开篇词我们提出“入门 Spark 需要三步走”,到目前为止,我们携手并肩跨越了前面两步,首先恭喜你学到这里!熟练掌握了 Spark 常用算子与核心原理以后,你已经可以轻松应对大部分数据处理需求了。
不过,数据处理毕竟是比较基础的数据应用场景,就像赛车有着不同的驾驶场景,想成为 Spark 的资深赛车手,我们还要走出第三步——学习 Spark 计算子框架。只有完成这一步,我们才能掌握 Spark SQL,Structured Streaming 和 Spark MLlib 的常规开发方法,游刃有余地应对不同的数据应用场景,如数据分析、流计算和机器学习,等等。
还差第三步
那这么多子框架,从哪里入手比较好呢?在所有的子框架中,Spark SQL 是代码量最多、Spark 社区投入最大、应用范围最广、影响力最深远的那个。就子框架的学习来说,我们自然要从 Spark SQL 开始。
今天我们从一个例子入手,在实战中带你熟悉数据分析开发的思路和实现步骤。有了对 Spark SQL 的直观体验,我们后面几讲还会深入探讨 Spark SQL 的用法、特性与优势,让你逐步掌握 Spark SQL 的全貌。

业务需求

今天我们要讲的小例子,来自于北京市小汽车摇号。我们知道,为了限制机动车保有量,从 2011 年开始,北京市政府推出了小汽车摇号政策。随着摇号进程的推进,在 2016 年,为了照顾那些长时间没有摇中号码牌的“准司机”,摇号政策又推出了“倍率”制度。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

学习Spark SQL的重要性以及从小汽车摇号数据分析入手学习Spark SQL的方法。作者指出,熟练掌握Spark常用算子与核心原理后,学习Spark计算子框架是成为资深赛车手的第三步。在众多子框架中,Spark SQL是最重要的一个,因此建议从Spark SQL开始学习。文章以北京市小汽车摇号数据为例,介绍了业务需求和准备工作,并提供了数据探索的代码示例。作者还简要介绍了SparkSession和DataFrame的概念,强调DataFrame是一种带Schema的分布式数据集,可以简单地看作是数据库中的一张二维表。通过实际业务需求出发,通过数据分析学习Spark SQL的实践方法,为读者快速了解Spark SQL的重要性和学习路径提供了指导。文章重点介绍了数据过滤、数据关联、倍率统计、分组计数等操作的代码示例,并通过实际计算结果展示了中签率与倍率之间的关系。最后,鼓励读者在自己的Spark环境中运行代码,并提出了脑洞时间和每课一练的问题,以促进读者的交流和思考。整体而言,本文为读者提供了一个系统的学习Spark SQL的实践方法,既有理论指导,又有实际操作,适合读者快速了解和掌握Spark SQL的技术特点。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《零基础入门 Spark》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(18)

  • 最新
  • 精选
  • qinsi
    spark无关。讨论下摇号。 评论区有匿名读者质疑文中的结论。这里尝试换个角度代入具体的数字分析下。 简单起见,假设每轮摇号有1000人中签,并且倍率和轮次一致,即第一轮大家都是1倍,第一轮没中的人在第二轮变为2倍,第二轮又没中的人到了第三轮就变成3倍,依次类推。 先看第一轮的1000个中签者,显然他们的倍率都是1,没有其他倍率的中签者,记为: [1000, 0, 0, ...] 再看第二轮的1000个中签者。由于新加入的申请者倍率为1,第一轮未中的人倍率为2。按照倍率摇号的话,期望的结果就是,倍率为2的中签人数是倍率为1的中签人数的2倍,记为: [333, 667, 0, 0, ...] 以此类推,第三轮就是: [167, 333, 500, 0, 0, ...] 尝试摇个10轮,可以得到下表: [1000, 0, 0, ...] [333, 667, 0, 0, ...] [167, 333, 500, 0, 0, ...] [100, 200, 300, 400, 0, 0, ...] [67, 133, 200, 267, 333, 0, 0, ...] [48, 95, 143, 190, 238, 286, 0, 0, ...] [36, 71, 107, 143, 179, 214, 250, 0, 0, ...] [28, 56, 83, 111, 139, 167, 194, 222, 0, 0, ...] [22, 44, 67, 89, 111, 133, 156, 178, 200, 0, 0, ...] [18, 36, 55, 73, 91, 109, 127, 145, 164, 182, 0, 0, ...] 可以看到在每一轮的中签者中,确实是倍率越高中签的人数越多。 而文中的统计方法,相当于把这张表按列求和: [1819, 1635, 1455, 1273, 1091, 909, 727, 545, 364, 182, 0, 0, ...] 可以看到这是一条单调递减的曲线。然而却不能像文中一样得出“中签率没有随着倍率增加”的结论。高倍率的中签人数比低倍率的人数少,是因为能达到高倍率的人本身就少。比如上面例子中,10轮过后10倍率的中签者只有182人,是因为前9轮没有人能达到10倍率。相比之下,在第一轮就有1000个1倍率的人中签。 至于文中配图为什么会是一条类似钟型的曲线,猜测可能第一次引入倍率摇号的时候,就已经给不同的人分配不同的倍率了,而不是大家一开始都是1倍率。在上面的例子中,如果只对后5轮求和,可以得到: [152, 302, 455, 606, 758, 909, 727, 545, 364, 182] 这样就和文中的配图比较接近了。 所以结论就是要验证中签率和倍率的关系,不能按照倍率去累加中签人数,而是要看单次摇号中不同倍率的中签者的分布。

    作者回复: 老弟分析得鞭辟入里~ 官方的倍率设计,确实比较粗糙,统计下来,结论实际上也不是十分牢靠。咱们这里用小汽车摇号的例子,初衷还是找一个大家都熟悉的场景,来更好地学习Spark SQL的开发流程~

    2021-10-12
    3
    14
  • Alvin-L
    ``` import os from pyspark import SparkContext, SparkConf from pyspark.sql.session import SparkSession from pyspark.sql.functions import first, collect_list, mean, count, max import matplotlib.pyplot as plt def plot(res): x = [x["multiplier"] for x in res] y = [y["cnt"] for y in res] plt.figure(figsize=(8, 5), dpi=100) plt.xlabel('倍率') plt.ylabel('人数') plt.rcParams['font.sans-serif']=['SimHei'] plt.rcParams['axes.unicode_minus']=False plt.bar(x, y, width=0.5) plt.xticks(x) plt.show() # py文件就在项目的根目录下 rootPath = os.path.split(os.path.realpath(__file__))[0] conf = SparkConf() conf.set('spark.executor.memory', '4g') conf.set('spark.driver.memory', '8g') conf.set("spark.executor.cores", '4') conf.set('spark.cores.max', 16) conf.set('spark.local.dir', rootPath) spark = SparkSession(SparkContext(conf=conf)) # 申请者数据 # Windows环境 # 注意点1:增加 option("basePath", rootPath) 选项 # 注意点2:路径 hdfs_path_apply 需要追加 /*/*.parquet hdfs_path_apply = rootPath + "/apply" applyNumbersDF = spark.read.option("basePath", rootPath).parquet( hdfs_path_apply + "/*/*.parquet" ) # 中签者数据 hdfs_path_lucky = rootPath + "/lucky" luckyDogsDF = spark.read.option("basePath", rootPath).parquet( hdfs_path_lucky + "/*/*.parquet" ) # 过滤2016年以后的中签数据,且仅抽取中签号码carNum字段 filteredLuckyDogs = ( luckyDogsDF .filter(luckyDogsDF["batchNum"] >= "201601") .select("carNum") ) # 摇号数据与中签数据做内关联,Join Key为中签号码carNum jointDF = applyNumbersDF.join(filteredLuckyDogs, "carNum", "inner") # 以batchNum、carNum做分组,统计倍率系数 multipliers = ( jointDF .groupBy(["batchNum", "carNum"]) .agg(count("batchNum").alias("multiplier")) ) # 以carNum做分组,保留最大的倍率系数 uniqueMultipliers = ( multipliers .groupBy("carNum") .agg(max("multiplier").alias("multiplier")) ) # 以multiplier倍率做分组,统计人数 result = ( uniqueMultipliers .groupBy("multiplier") .agg(count("carNum").alias("cnt")) .orderBy("multiplier") ) result.show(40) res = result.collect() # 画图 plot(res) ```

    作者回复: 赞👍,感谢老弟整理Python代码~

    2021-10-26
    2
    5
  • 东围居士
    补一个完整的 spark 代码(windows环境): package spark.basic import org.apache.spark.sql.functions.{col,count, lit, max} import org.apache.spark.sql.{DataFrame, SparkSession} object Chapter13 { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Chapter13").getOrCreate() import spark.implicits._ val rootPath: String = "E:\\temp\\yaohao_home\\yaohao" // 申请者数据 val hdfs_path_apply: String = s"${rootPath}/apply" // spark是spark-shell中默认的SparkSession实例 // 通过read API读取源文件 val applyNumbersDF: DataFrame = spark.read.option("basePath", rootPath).parquet(hdfs_path_apply + "/*/*.parquet") // 中签者数据 val hdfs_path_lucky: String = s"${rootPath}/lucky" // 通过read API读取源文件 val luckyDogsDF: DataFrame = spark.read.option("basePath", rootPath).parquet(hdfs_path_lucky + "/*/*.parquet") // 过滤2016年以后的中签数据,且仅抽取中签号码carNum字段 val filteredLuckyDogs: DataFrame = luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum") // 摇号数据与中签数据做内关联,Join Key为中签号码carNum val jointDF: DataFrame = applyNumbersDF.join(filteredLuckyDogs, Seq("carNum"), "inner") // 以batchNum、carNum做分组,统计倍率系数 val multipliers: DataFrame = jointDF.groupBy(col("batchNum"),col("carNum")) .agg(count(lit(1)).alias("multiplier")) // 以carNum做分组,保留最大的倍率系数 val uniqueMultipliers: DataFrame = multipliers.groupBy("carNum") .agg(max("multiplier").alias("multiplier")) // 以multiplier倍率做分组,统计人数 val result: DataFrame = uniqueMultipliers.groupBy("multiplier") .agg(count(lit(1)).alias("cnt")) .orderBy("multiplier") result.collect result.show() } }

    作者回复: 棒👍,感谢!~

    2021-11-12
    2
    2
  • 火炎焱燚
    对应的python代码为: # 在notebook上运行时,加上下面的配置 from pyspark import SparkContext, SparkConf from pyspark.sql.session import SparkSession sc_conf = SparkConf() # spark参数配置 # sc_conf.setMaster() # sc_conf.setAppName('my-app') sc_conf.set('spark.executor.memory', '2g') sc_conf.set('spark.driver.memory', '4g') sc_conf.set("spark.executor.cores", '2') sc_conf.set('spark.cores.max', 20) sc = SparkContext(conf=sc_conf) # 加载数据,转换成dataframe rootPath='~~/RawData' hdfs_path_apply=rootPath+'/apply' spark = SparkSession(sc) applyNumbersDF=spark.read.parquet(hdfs_path_apply) # applyNumbersDF.show() # 打印出前几行数据,查看数据结构 hdfs_path_lucky=rootPath+'/lucky' luckyDogsDF=spark.read.parquet(hdfs_path_lucky) # luckyDogsDF.show() filteredLuckyDogs=luckyDogsDF.filter(luckyDogsDF['batchNum']>='201601').select('carNum') jointDF=applyNumbersDF.join(filteredLuckyDogs,'carNum','inner') # join函数消耗内存较大,容易出现OOM错误,如果出错,要将spark.driver.memory调大 # jointDF.show() # 打印出join之后的df部分数据 # 进行多种groupBy操作 from pyspark.sql import functions as f multipliers=jointDF.groupBy(['batchNum','carNum']).agg(f.count('batchNum').alias("multiplier")) # multipliers.show() uniqueMultipliers=multipliers.groupBy('carNum').agg(f.max('multiplier').alias('multiplier')) # uniqueMultipliers.show() result=uniqueMultipliers.groupBy('multiplier').agg(f.count('carNum').alias('cnt')).orderBy('multiplier') result2=result.collect() # 绘图 import matplotlib.pyplot as plt x=[i['multiplier'] for i in result2] y=[i['cnt'] for i in result2] plt.bar(x,y)

    作者回复: 赞👍!!!感谢老弟~ 后续收录到GitHub~

    2021-10-23
    2
    1
  • Geek_d447af
    文章里的代码需要在 Hadoop 环境才能跑起来,spark 本身不支持解析 parquet 文件

    作者回复: 支持的,不需要Hadoop,spark-shell本地就能跑

    2021-10-09
    2
    1
  • lightning_女巫
    我在本地跑这个代码碰到了如下错误,请问如何解决? 22/01/28 15:13:22 ERROR BypassMergeSortShuffleWriter: Error while deleting file /private/var/folders/hk/7j9sqdtn55j3cq_gv5qvp5pm39d49n/T/blockmgr-88ef94e9-943a-4971-a3a8-33d25949886f/1a/temp_shuffle_e0e163fb-852c-4298-b08e-dc4989277ab3 22/01/28 15:13:22 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /private/var/folders/hk/7j9sqdtn55j3cq_gv5qvp5pm39d49n/T/blockmgr-88ef94e9-943a-4971-a3a8-33d25949886f/08/temp_shuffle_6c160c23-3395-445f-be03-b29a375e1139 java.io.FileNotFoundException: /private/var/folders/hk/7j9sqdtn55j3cq_gv5qvp5pm39d49n/T/blockmgr-88ef94e9-943a-4971-a3a8-33d25949886f/08/temp_shuffle_6c160c23-3395-445f-be03-b29a375e1139 (No such file or directory) at java.io.FileOutputStream.open0(Native Method) at java.io.FileOutputStream.open(FileOutputStream.java:270) at java.io.FileOutputStream.<init>(FileOutputStream.java:213) at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$2.apply$mcV$sp(DiskBlockObjectWriter.scala:217) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1369) at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:214) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:237) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:105) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

    作者回复: 从stacktrace来看,报错原因是shuffle write过程中,写shuffle中间文件的时候报错,老弟检查一下spark.local.dir配置的文件系统目录空间是否足够,如果该配置项没有配置的话,Spark默认把中间文件写入到文件系统的/tmp目录,这个目录一般来说空间都不大,很容易写爆的

    2022-01-28
  • 东围居士
    老师,数据文件方便存一份到别的地方吗,比如马云家的网盘,或者做个种子下载什么的,百度网盘那速度真的是,我下到下午下班过周末都下不完

    作者回复: 老弟加我微信吧,搜索“方块K”或是“rJunior”,我QQ邮箱大邮件发你~ 加微信是确保你收到了~

    2021-10-22
    3
  • Geek_995b78
    用scala实现,lit(1)是什么意思呀

    作者回复: 实际上就是常数1,更准确地说,是表示一个常数列,这列的数值都是1。只不过Spark SQL这里的语法比较特殊,其实和Scala没什么关系哈~

    2021-10-11
    2
  • GAC·DU
    result具体数值: scala> result.collect res7: Array[org.apache.spark.sql.Row] = Array([1,8967], [2,19174], [3,26952], [4,29755], [5,32988], [6,34119], [7,29707], [8,26123], [9,19476], [10,9616], [11,3930], [12,1212])

    作者回复: 赞👍~

    2021-10-08
  • Neo-dqy
    【.agg(count(lit(1)).alias("cnt"))】问下老师,这里count中的lit(1)是什么意思啊? 对于汽车摇号的倍率制度,如果为了优先让倍率高的人摇到号,可以把每一期的资格分多次抽取。就是说,先构建一个所有人都在里面的样本,抽部分人;再将倍率高于某个阈值的人都取出来,构建一个新的样本,再抽取部分人。(具体划分成几个样本可以按倍率的人数分布来划分)当然这样又会对新来的人不公平,所以大家还是挤地铁吧~~

    作者回复: lit(1)表示常数列,这列的数值都是1,是Spark SQL的语法,跟Scala无关哈~ 倍率制度确实需要更好的设计~ 不过也别放弃,哈哈,万一哪天摇上了呢~

    2021-10-08
收起评论
显示
设置
留言
18
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部