16 | 数据转换:如何在DataFrame之上做数据处理?
SQL 语句
- 深入了解
- 翻译
- 解释
- 总结
Spark SQL中的数据处理技术概览 本文详细介绍了在Spark SQL中使用SQL语句和DataFrame算子进行数据转换的方法。文章首先介绍了通过SQL语句对DataFrame进行灵活的数据操作,包括创建临时表、执行SQL查询等。其次,详细介绍了DataFrame支持的各类算子,包括探索类算子、清洗类算子、转换类算子、分析类算子和持久化算子。特别强调了DataFrame算子的丰富性和全面性,同时提到了学习路径和成本相对较低的SQL语句,为读者提供了选择开发入口的建议。 文章重点讲解了分析类算子的使用方法,通过示例演示了如何使用join算子进行数据关联,以及如何使用groupBy和agg算子进行数据分组和聚合。此外,还介绍了持久化类算子的使用方法,包括文件格式、写入选项和存储路径的指定,以及不同的写入模式。 总结指出,DataFrame算子与SQL查询语句在功能和性能上并无优劣之分,读者可以根据个人偏好自由选择使用。最后,鼓励读者结合实践加深对算子功能的理解,并推荐了官方文档作为学习的补充资源。 整体而言,本文通过介绍SQL语句和DataFrame算子的使用方法,为读者提供了在Spark SQL中进行数据转换和处理的指导,旨在帮助读者快速了解并掌握数据处理的技术特点。
《零基础入门 Spark》,新⼈⾸单¥59
全部留言(10)
- 最新
- 精选
- kingcallexplode 不会引入shuffle,因为shuffle是在众多计算节点进行数据传输,explode虽然会导致数据条数变多但是都是在一台节点上的
作者回复: 正解,满分💯
2021-10-28310 - 火炎焱燚python 代码为: # 1-SQL语句: from pyspark import SparkContext, SparkConf from pyspark.sql.session import SparkSession sc = SparkContext() spark = SparkSession(sc) seq=[('Alice',18), ('Bob',14)] column = ['name','age'] df=spark.createDataFrame(seq,column) df.createTempView('t1') query="select * from t1" result=spark.sql(query) result.show() # 2-探索类算子: employees=[(1, "John", 26, "Male"), (2, "Lily", 28, "Female"), (3, "Raymond", 30, "Male")] employeesDF=spark.createDataFrame(employees,['id','name','age','gender']) employeesDF.printSchema() employeesDF.show() age_df=employeesDF.describe('age') age_df.show() # 3-清洗类算子: # 删除某一列数据 employeesDF.drop('gender').show() # distinct对所有数据去重,注意无法设置列名 employeesDF.distinct().show() # dropDuplicates可以对某几列去重,灵活性更高 employeesDF.dropDuplicates(['gender']).show() # 4-转换类算子 # 选择某几列组成新的df employeesDF.select(['name','gender']).show() employeesDF.select('name').show() # selectExpr用选择表达式来组成新的df employeesDF.selectExpr("id", "name", "concat(id, '_', name) as id_name").show() # where选择满足条件的内容 employeesDF.where("gender='Male'").show() # 对列名重命名:将gender重命名为sex employeesDF.withColumnRenamed('gender','sex').show() # 在原列进行修改后组成新的一列,将age都+10岁 employeesDF.withColumn("crypto", employeesDF['age']+10).show() # drop删除某一列 employeesDF.withColumn("crypto", employeesDF['age']+10).drop('age').show() # explode拆分list seq2 =[(1, "John", 26, "Male",["Sports", "News"]), (2, "Lily", 28, "Female", ["Shopping", "Reading"]), (3, "Raymond", 30, "Male", ["Sports", "Reading"])] employeesDF2=spark.createDataFrame(seq2,['id','name','age','gender','interests']) from pyspark.sql.functions import explode employeesDF2.withColumn('interest',explode(employeesDF2['interests'])).show()
作者回复: 辛苦老弟~ 太棒了!
2021-10-232 - Geek_935079val aggResult = fullInfo.groupBy这里是不是要改为val aggResult = jointDF.groupBy
作者回复: 是的~ 这里typo了,感谢老弟提醒~
2021-11-2521 - 小李请问一下:df.select().groupBy().count()与df.select().groupBy().agg(count(lit(1)))内部处理逻辑会不一样吗,还是会都会经过spark sql优化引擎优化成map阶段预聚合?比如会不会像rdd的aggregateByKey或者reduceByKey一样在shuffle write阶段做partition内的预聚合。
作者回复: 这两条语句的效果是一样的,在spark sql里面的优化过程,在效果方面,完全一样。像你说的,也会做map端聚合,这些优化机制,都是有的~
2022-01-14 - 火炎焱燚# 5-分析类算子 # 创建员工df seq=[(1,'Mike',28,'Male'), (2, "Lily", 30, "Female"), (3, "Raymond", 26, "Male")] employees=spark.createDataFrame(seq,['id','name','age','gender']) employees.show() # 创建薪水df seq2=[(1, 26000), (2, 30000), (4, 25000), (3, 20000)] salaries=spark.createDataFrame(seq2,['id','salary']) salaries.show() # 将员工df和薪水df进行合并,inner方式: joinDF=salaries.join(employees,'id','inner') joinDF.show() # 按照性别统计出薪水之和,平均值 from pyspark.sql import functions as f aggResult=joinDF.groupBy('gender').agg(f.sum('salary').alias('sum_salary'),f.avg('salary').alias('avg_salary')) aggResult.show() # 排序,sort方法和orderBy方法一样 aggResult.sort(f.desc('sum_salary'),f.asc('gender')).show() aggResult.orderBy(f.desc('sum_salary'),f.asc('gender')).show()
作者回复: 👍👍👍
2021-10-23 - 小玲铛🍯自己开发的时候createTempView会在内存中创建临时表,重新运行的话会报table is exist 错误,建议使用 createOrReplaceTempView
作者回复: 没错,createOrReplaceTempView更实用,避免重复建表带来的报错~
2021-10-18 - GAC·DUSpark中Shuffle算子的分类:重分区算子、ByKey算子、Join算子
作者回复: 说到Shuffle,我们往往需要分类讨论。 重分区算子repartition、ByKey算子确实会引入Shuffle,这个是确定性的。 不确定的是join,一般来说,join都会引入Shuffle。不过有一种特殊的join,学名叫Collocated Join,这种join是不会引入Shuffle的。 名字听上去挺唬人,但本质上,就是参与join的两张表,提前按照join keys,做好了分区。因而在join的时候,自然就不会有Shuffle了。不过实际应用中,Collocated Join场景并不多,因此暂时可以忽略掉~
2021-10-16 - SpoonJava代码实现 https://github.com/Spoon94/spark-practice/blob/master/src/main/java/com/spoon/spark/sql/DataFrameOperatorJob.java2022-04-102
- 嬴梦川从我的开发经验发现explode不会引入shuffle2023-09-24归属地:新加坡
- Geek_b2839b老师请问一下,使用spark同步hive数据到Oracle的时候,由于executor失败重试,导致偶尔出现同步时hive数据和Oracle数据不一致的情况,这个该怎么解决呢2022-05-28