13 | Spark SQL:让我们从“小汽车摇号分析”开始
业务需求
- 深入了解
- 翻译
- 解释
- 总结
学习Spark SQL的重要性以及从小汽车摇号数据分析入手学习Spark SQL的方法。作者指出,熟练掌握Spark常用算子与核心原理后,学习Spark计算子框架是成为资深赛车手的第三步。在众多子框架中,Spark SQL是最重要的一个,因此建议从Spark SQL开始学习。文章以北京市小汽车摇号数据为例,介绍了业务需求和准备工作,并提供了数据探索的代码示例。作者还简要介绍了SparkSession和DataFrame的概念,强调DataFrame是一种带Schema的分布式数据集,可以简单地看作是数据库中的一张二维表。通过实际业务需求出发,通过数据分析学习Spark SQL的实践方法,为读者快速了解Spark SQL的重要性和学习路径提供了指导。文章重点介绍了数据过滤、数据关联、倍率统计、分组计数等操作的代码示例,并通过实际计算结果展示了中签率与倍率之间的关系。最后,鼓励读者在自己的Spark环境中运行代码,并提出了脑洞时间和每课一练的问题,以促进读者的交流和思考。整体而言,本文为读者提供了一个系统的学习Spark SQL的实践方法,既有理论指导,又有实际操作,适合读者快速了解和掌握Spark SQL的技术特点。
《零基础入门 Spark》,新⼈⾸单¥59
全部留言(18)
- 最新
- 精选
- qinsispark无关。讨论下摇号。 评论区有匿名读者质疑文中的结论。这里尝试换个角度代入具体的数字分析下。 简单起见,假设每轮摇号有1000人中签,并且倍率和轮次一致,即第一轮大家都是1倍,第一轮没中的人在第二轮变为2倍,第二轮又没中的人到了第三轮就变成3倍,依次类推。 先看第一轮的1000个中签者,显然他们的倍率都是1,没有其他倍率的中签者,记为: [1000, 0, 0, ...] 再看第二轮的1000个中签者。由于新加入的申请者倍率为1,第一轮未中的人倍率为2。按照倍率摇号的话,期望的结果就是,倍率为2的中签人数是倍率为1的中签人数的2倍,记为: [333, 667, 0, 0, ...] 以此类推,第三轮就是: [167, 333, 500, 0, 0, ...] 尝试摇个10轮,可以得到下表: [1000, 0, 0, ...] [333, 667, 0, 0, ...] [167, 333, 500, 0, 0, ...] [100, 200, 300, 400, 0, 0, ...] [67, 133, 200, 267, 333, 0, 0, ...] [48, 95, 143, 190, 238, 286, 0, 0, ...] [36, 71, 107, 143, 179, 214, 250, 0, 0, ...] [28, 56, 83, 111, 139, 167, 194, 222, 0, 0, ...] [22, 44, 67, 89, 111, 133, 156, 178, 200, 0, 0, ...] [18, 36, 55, 73, 91, 109, 127, 145, 164, 182, 0, 0, ...] 可以看到在每一轮的中签者中,确实是倍率越高中签的人数越多。 而文中的统计方法,相当于把这张表按列求和: [1819, 1635, 1455, 1273, 1091, 909, 727, 545, 364, 182, 0, 0, ...] 可以看到这是一条单调递减的曲线。然而却不能像文中一样得出“中签率没有随着倍率增加”的结论。高倍率的中签人数比低倍率的人数少,是因为能达到高倍率的人本身就少。比如上面例子中,10轮过后10倍率的中签者只有182人,是因为前9轮没有人能达到10倍率。相比之下,在第一轮就有1000个1倍率的人中签。 至于文中配图为什么会是一条类似钟型的曲线,猜测可能第一次引入倍率摇号的时候,就已经给不同的人分配不同的倍率了,而不是大家一开始都是1倍率。在上面的例子中,如果只对后5轮求和,可以得到: [152, 302, 455, 606, 758, 909, 727, 545, 364, 182] 这样就和文中的配图比较接近了。 所以结论就是要验证中签率和倍率的关系,不能按照倍率去累加中签人数,而是要看单次摇号中不同倍率的中签者的分布。
作者回复: 老弟分析得鞭辟入里~ 官方的倍率设计,确实比较粗糙,统计下来,结论实际上也不是十分牢靠。咱们这里用小汽车摇号的例子,初衷还是找一个大家都熟悉的场景,来更好地学习Spark SQL的开发流程~
2021-10-12314 - Alvin-L``` import os from pyspark import SparkContext, SparkConf from pyspark.sql.session import SparkSession from pyspark.sql.functions import first, collect_list, mean, count, max import matplotlib.pyplot as plt def plot(res): x = [x["multiplier"] for x in res] y = [y["cnt"] for y in res] plt.figure(figsize=(8, 5), dpi=100) plt.xlabel('倍率') plt.ylabel('人数') plt.rcParams['font.sans-serif']=['SimHei'] plt.rcParams['axes.unicode_minus']=False plt.bar(x, y, width=0.5) plt.xticks(x) plt.show() # py文件就在项目的根目录下 rootPath = os.path.split(os.path.realpath(__file__))[0] conf = SparkConf() conf.set('spark.executor.memory', '4g') conf.set('spark.driver.memory', '8g') conf.set("spark.executor.cores", '4') conf.set('spark.cores.max', 16) conf.set('spark.local.dir', rootPath) spark = SparkSession(SparkContext(conf=conf)) # 申请者数据 # Windows环境 # 注意点1:增加 option("basePath", rootPath) 选项 # 注意点2:路径 hdfs_path_apply 需要追加 /*/*.parquet hdfs_path_apply = rootPath + "/apply" applyNumbersDF = spark.read.option("basePath", rootPath).parquet( hdfs_path_apply + "/*/*.parquet" ) # 中签者数据 hdfs_path_lucky = rootPath + "/lucky" luckyDogsDF = spark.read.option("basePath", rootPath).parquet( hdfs_path_lucky + "/*/*.parquet" ) # 过滤2016年以后的中签数据,且仅抽取中签号码carNum字段 filteredLuckyDogs = ( luckyDogsDF .filter(luckyDogsDF["batchNum"] >= "201601") .select("carNum") ) # 摇号数据与中签数据做内关联,Join Key为中签号码carNum jointDF = applyNumbersDF.join(filteredLuckyDogs, "carNum", "inner") # 以batchNum、carNum做分组,统计倍率系数 multipliers = ( jointDF .groupBy(["batchNum", "carNum"]) .agg(count("batchNum").alias("multiplier")) ) # 以carNum做分组,保留最大的倍率系数 uniqueMultipliers = ( multipliers .groupBy("carNum") .agg(max("multiplier").alias("multiplier")) ) # 以multiplier倍率做分组,统计人数 result = ( uniqueMultipliers .groupBy("multiplier") .agg(count("carNum").alias("cnt")) .orderBy("multiplier") ) result.show(40) res = result.collect() # 画图 plot(res) ```
作者回复: 赞👍,感谢老弟整理Python代码~
2021-10-2625 - 东围居士补一个完整的 spark 代码(windows环境): package spark.basic import org.apache.spark.sql.functions.{col,count, lit, max} import org.apache.spark.sql.{DataFrame, SparkSession} object Chapter13 { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Chapter13").getOrCreate() import spark.implicits._ val rootPath: String = "E:\\temp\\yaohao_home\\yaohao" // 申请者数据 val hdfs_path_apply: String = s"${rootPath}/apply" // spark是spark-shell中默认的SparkSession实例 // 通过read API读取源文件 val applyNumbersDF: DataFrame = spark.read.option("basePath", rootPath).parquet(hdfs_path_apply + "/*/*.parquet") // 中签者数据 val hdfs_path_lucky: String = s"${rootPath}/lucky" // 通过read API读取源文件 val luckyDogsDF: DataFrame = spark.read.option("basePath", rootPath).parquet(hdfs_path_lucky + "/*/*.parquet") // 过滤2016年以后的中签数据,且仅抽取中签号码carNum字段 val filteredLuckyDogs: DataFrame = luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum") // 摇号数据与中签数据做内关联,Join Key为中签号码carNum val jointDF: DataFrame = applyNumbersDF.join(filteredLuckyDogs, Seq("carNum"), "inner") // 以batchNum、carNum做分组,统计倍率系数 val multipliers: DataFrame = jointDF.groupBy(col("batchNum"),col("carNum")) .agg(count(lit(1)).alias("multiplier")) // 以carNum做分组,保留最大的倍率系数 val uniqueMultipliers: DataFrame = multipliers.groupBy("carNum") .agg(max("multiplier").alias("multiplier")) // 以multiplier倍率做分组,统计人数 val result: DataFrame = uniqueMultipliers.groupBy("multiplier") .agg(count(lit(1)).alias("cnt")) .orderBy("multiplier") result.collect result.show() } }
作者回复: 棒👍,感谢!~
2021-11-1222 - 火炎焱燚对应的python代码为: # 在notebook上运行时,加上下面的配置 from pyspark import SparkContext, SparkConf from pyspark.sql.session import SparkSession sc_conf = SparkConf() # spark参数配置 # sc_conf.setMaster() # sc_conf.setAppName('my-app') sc_conf.set('spark.executor.memory', '2g') sc_conf.set('spark.driver.memory', '4g') sc_conf.set("spark.executor.cores", '2') sc_conf.set('spark.cores.max', 20) sc = SparkContext(conf=sc_conf) # 加载数据,转换成dataframe rootPath='~~/RawData' hdfs_path_apply=rootPath+'/apply' spark = SparkSession(sc) applyNumbersDF=spark.read.parquet(hdfs_path_apply) # applyNumbersDF.show() # 打印出前几行数据,查看数据结构 hdfs_path_lucky=rootPath+'/lucky' luckyDogsDF=spark.read.parquet(hdfs_path_lucky) # luckyDogsDF.show() filteredLuckyDogs=luckyDogsDF.filter(luckyDogsDF['batchNum']>='201601').select('carNum') jointDF=applyNumbersDF.join(filteredLuckyDogs,'carNum','inner') # join函数消耗内存较大,容易出现OOM错误,如果出错,要将spark.driver.memory调大 # jointDF.show() # 打印出join之后的df部分数据 # 进行多种groupBy操作 from pyspark.sql import functions as f multipliers=jointDF.groupBy(['batchNum','carNum']).agg(f.count('batchNum').alias("multiplier")) # multipliers.show() uniqueMultipliers=multipliers.groupBy('carNum').agg(f.max('multiplier').alias('multiplier')) # uniqueMultipliers.show() result=uniqueMultipliers.groupBy('multiplier').agg(f.count('carNum').alias('cnt')).orderBy('multiplier') result2=result.collect() # 绘图 import matplotlib.pyplot as plt x=[i['multiplier'] for i in result2] y=[i['cnt'] for i in result2] plt.bar(x,y)
作者回复: 赞👍!!!感谢老弟~ 后续收录到GitHub~
2021-10-2321 - Geek_d447af文章里的代码需要在 Hadoop 环境才能跑起来,spark 本身不支持解析 parquet 文件
作者回复: 支持的,不需要Hadoop,spark-shell本地就能跑
2021-10-0921 - lightning_女巫我在本地跑这个代码碰到了如下错误,请问如何解决? 22/01/28 15:13:22 ERROR BypassMergeSortShuffleWriter: Error while deleting file /private/var/folders/hk/7j9sqdtn55j3cq_gv5qvp5pm39d49n/T/blockmgr-88ef94e9-943a-4971-a3a8-33d25949886f/1a/temp_shuffle_e0e163fb-852c-4298-b08e-dc4989277ab3 22/01/28 15:13:22 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /private/var/folders/hk/7j9sqdtn55j3cq_gv5qvp5pm39d49n/T/blockmgr-88ef94e9-943a-4971-a3a8-33d25949886f/08/temp_shuffle_6c160c23-3395-445f-be03-b29a375e1139 java.io.FileNotFoundException: /private/var/folders/hk/7j9sqdtn55j3cq_gv5qvp5pm39d49n/T/blockmgr-88ef94e9-943a-4971-a3a8-33d25949886f/08/temp_shuffle_6c160c23-3395-445f-be03-b29a375e1139 (No such file or directory) at java.io.FileOutputStream.open0(Native Method) at java.io.FileOutputStream.open(FileOutputStream.java:270) at java.io.FileOutputStream.<init>(FileOutputStream.java:213) at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$2.apply$mcV$sp(DiskBlockObjectWriter.scala:217) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1369) at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:214) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:237) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:105) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
作者回复: 从stacktrace来看,报错原因是shuffle write过程中,写shuffle中间文件的时候报错,老弟检查一下spark.local.dir配置的文件系统目录空间是否足够,如果该配置项没有配置的话,Spark默认把中间文件写入到文件系统的/tmp目录,这个目录一般来说空间都不大,很容易写爆的
2022-01-28 - 东围居士老师,数据文件方便存一份到别的地方吗,比如马云家的网盘,或者做个种子下载什么的,百度网盘那速度真的是,我下到下午下班过周末都下不完
作者回复: 老弟加我微信吧,搜索“方块K”或是“rJunior”,我QQ邮箱大邮件发你~ 加微信是确保你收到了~
2021-10-223 - Geek_995b78用scala实现,lit(1)是什么意思呀
作者回复: 实际上就是常数1,更准确地说,是表示一个常数列,这列的数值都是1。只不过Spark SQL这里的语法比较特殊,其实和Scala没什么关系哈~
2021-10-112 - GAC·DUresult具体数值: scala> result.collect res7: Array[org.apache.spark.sql.Row] = Array([1,8967], [2,19174], [3,26952], [4,29755], [5,32988], [6,34119], [7,29707], [8,26123], [9,19476], [10,9616], [11,3930], [12,1212])
作者回复: 赞👍~
2021-10-08 - Neo-dqy【.agg(count(lit(1)).alias("cnt"))】问下老师,这里count中的lit(1)是什么意思啊? 对于汽车摇号的倍率制度,如果为了优先让倍率高的人摇到号,可以把每一期的资格分多次抽取。就是说,先构建一个所有人都在里面的样本,抽部分人;再将倍率高于某个阈值的人都取出来,构建一个新的样本,再抽取部分人。(具体划分成几个样本可以按倍率的人数分布来划分)当然这样又会对新来的人不公平,所以大家还是挤地铁吧~~
作者回复: lit(1)表示常数列,这列的数值都是1,是Spark SQL的语法,跟Scala无关哈~ 倍率制度确实需要更好的设计~ 不过也别放弃,哈哈,万一哪天摇上了呢~
2021-10-08