零基础入门 Spark
吴磊
前 FreeWheel 机器学习研发经理
19171 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 38 讲
零基础入门 Spark
15
15
1.0x
00:00/00:00
登录|注册

16 | 数据转换:如何在DataFrame之上做数据处理?

你好,我是吴磊。
在上一讲,我们学习了创建 DataFrame 的各种途径与方法,那么,有了 DataFrame 之后,我们该如何在 DataFrame 之上做数据探索、数据分析,以及各式各样的数据转换呢?在数据处理完毕之后,我们又该如何做数据展示与数据持久化呢?今天这一讲,我们就来解答这些疑问。
为了给开发者提供足够的灵活性,对于 DataFrame 之上的数据处理,Spark SQL 支持两类开发入口:一个是大家所熟知的结构化查询语言:SQL,另一类是 DataFrame 开发算子。就开发效率与执行效率来说,二者并无优劣之分,选择哪种开发入口,完全取决于开发者的个人偏好与开发习惯。
与 RDD 类似,DataFrame 支持种类繁多的开发算子,但相比 SQL 语言,DataFrame 算子的学习成本相对要高一些。因此,本着先易后难的思路,咱们先来说说 DataFrame 中 SQL 语句的用法,然后再去理解 DataFrame 开发算子。

SQL 语句

对于任意的 DataFrame,我们都可以使用 createTempView 或是 createGlobalTempView 在 Spark SQL 中创建临时数据表。
两者的区别在于,createTempView 创建的临时表,其生命周期仅限于 SparkSession 内部,而 createGlobalTempView 创建的临时表,可以在同一个应用程序中跨 SparkSession 提供访问。有了临时表之后,我们就可以使用 SQL 语句灵活地倒腾表数据。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Spark SQL中的数据处理技术概览 本文详细介绍了在Spark SQL中使用SQL语句和DataFrame算子进行数据转换的方法。文章首先介绍了通过SQL语句对DataFrame进行灵活的数据操作,包括创建临时表、执行SQL查询等。其次,详细介绍了DataFrame支持的各类算子,包括探索类算子、清洗类算子、转换类算子、分析类算子和持久化算子。特别强调了DataFrame算子的丰富性和全面性,同时提到了学习路径和成本相对较低的SQL语句,为读者提供了选择开发入口的建议。 文章重点讲解了分析类算子的使用方法,通过示例演示了如何使用join算子进行数据关联,以及如何使用groupBy和agg算子进行数据分组和聚合。此外,还介绍了持久化类算子的使用方法,包括文件格式、写入选项和存储路径的指定,以及不同的写入模式。 总结指出,DataFrame算子与SQL查询语句在功能和性能上并无优劣之分,读者可以根据个人偏好自由选择使用。最后,鼓励读者结合实践加深对算子功能的理解,并推荐了官方文档作为学习的补充资源。 整体而言,本文通过介绍SQL语句和DataFrame算子的使用方法,为读者提供了在Spark SQL中进行数据转换和处理的指导,旨在帮助读者快速了解并掌握数据处理的技术特点。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《零基础入门 Spark》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(10)

  • 最新
  • 精选
  • kingcall
    explode 不会引入shuffle,因为shuffle是在众多计算节点进行数据传输,explode虽然会导致数据条数变多但是都是在一台节点上的

    作者回复: 正解,满分💯

    2021-10-28
    3
    10
  • 火炎焱燚
    python 代码为: # 1-SQL语句: from pyspark import SparkContext, SparkConf from pyspark.sql.session import SparkSession sc = SparkContext() spark = SparkSession(sc) seq=[('Alice',18), ('Bob',14)] column = ['name','age'] df=spark.createDataFrame(seq,column) df.createTempView('t1') query="select * from t1" result=spark.sql(query) result.show() # 2-探索类算子: employees=[(1, "John", 26, "Male"), (2, "Lily", 28, "Female"), (3, "Raymond", 30, "Male")] employeesDF=spark.createDataFrame(employees,['id','name','age','gender']) employeesDF.printSchema() employeesDF.show() age_df=employeesDF.describe('age') age_df.show() # 3-清洗类算子: # 删除某一列数据 employeesDF.drop('gender').show() # distinct对所有数据去重,注意无法设置列名 employeesDF.distinct().show() # dropDuplicates可以对某几列去重,灵活性更高 employeesDF.dropDuplicates(['gender']).show() # 4-转换类算子 # 选择某几列组成新的df employeesDF.select(['name','gender']).show() employeesDF.select('name').show() # selectExpr用选择表达式来组成新的df employeesDF.selectExpr("id", "name", "concat(id, '_', name) as id_name").show() # where选择满足条件的内容 employeesDF.where("gender='Male'").show() # 对列名重命名:将gender重命名为sex employeesDF.withColumnRenamed('gender','sex').show() # 在原列进行修改后组成新的一列,将age都+10岁 employeesDF.withColumn("crypto", employeesDF['age']+10).show() # drop删除某一列 employeesDF.withColumn("crypto", employeesDF['age']+10).drop('age').show() # explode拆分list seq2 =[(1, "John", 26, "Male",["Sports", "News"]), (2, "Lily", 28, "Female", ["Shopping", "Reading"]), (3, "Raymond", 30, "Male", ["Sports", "Reading"])] employeesDF2=spark.createDataFrame(seq2,['id','name','age','gender','interests']) from pyspark.sql.functions import explode employeesDF2.withColumn('interest',explode(employeesDF2['interests'])).show()

    作者回复: 辛苦老弟~ 太棒了!

    2021-10-23
    2
  • Geek_935079
    val aggResult = fullInfo.groupBy这里是不是要改为val aggResult = jointDF.groupBy

    作者回复: 是的~ 这里typo了,感谢老弟提醒~

    2021-11-25
    2
    1
  • 小李
    请问一下:df.select().groupBy().count()与df.select().groupBy().agg(count(lit(1)))内部处理逻辑会不一样吗,还是会都会经过spark sql优化引擎优化成map阶段预聚合?比如会不会像rdd的aggregateByKey或者reduceByKey一样在shuffle write阶段做partition内的预聚合。

    作者回复: 这两条语句的效果是一样的,在spark sql里面的优化过程,在效果方面,完全一样。像你说的,也会做map端聚合,这些优化机制,都是有的~

    2022-01-14
  • 火炎焱燚
    # 5-分析类算子 # 创建员工df seq=[(1,'Mike',28,'Male'), (2, "Lily", 30, "Female"), (3, "Raymond", 26, "Male")] employees=spark.createDataFrame(seq,['id','name','age','gender']) employees.show() # 创建薪水df seq2=[(1, 26000), (2, 30000), (4, 25000), (3, 20000)] salaries=spark.createDataFrame(seq2,['id','salary']) salaries.show() # 将员工df和薪水df进行合并,inner方式: joinDF=salaries.join(employees,'id','inner') joinDF.show() # 按照性别统计出薪水之和,平均值 from pyspark.sql import functions as f aggResult=joinDF.groupBy('gender').agg(f.sum('salary').alias('sum_salary'),f.avg('salary').alias('avg_salary')) aggResult.show() # 排序,sort方法和orderBy方法一样 aggResult.sort(f.desc('sum_salary'),f.asc('gender')).show() aggResult.orderBy(f.desc('sum_salary'),f.asc('gender')).show()

    作者回复: 👍👍👍

    2021-10-23
  • 小玲铛🍯
    自己开发的时候createTempView会在内存中创建临时表,重新运行的话会报table is exist 错误,建议使用 createOrReplaceTempView

    作者回复: 没错,createOrReplaceTempView更实用,避免重复建表带来的报错~

    2021-10-18
  • GAC·DU
    Spark中Shuffle算子的分类:重分区算子、ByKey算子、Join算子

    作者回复: 说到Shuffle,我们往往需要分类讨论。 重分区算子repartition、ByKey算子确实会引入Shuffle,这个是确定性的。 不确定的是join,一般来说,join都会引入Shuffle。不过有一种特殊的join,学名叫Collocated Join,这种join是不会引入Shuffle的。 名字听上去挺唬人,但本质上,就是参与join的两张表,提前按照join keys,做好了分区。因而在join的时候,自然就不会有Shuffle了。不过实际应用中,Collocated Join场景并不多,因此暂时可以忽略掉~

    2021-10-16
  • Spoon
    Java代码实现 https://github.com/Spoon94/spark-practice/blob/master/src/main/java/com/spoon/spark/sql/DataFrameOperatorJob.java
    2022-04-10
    2
  • 嬴梦川
    从我的开发经验发现explode不会引入shuffle
    2023-09-24归属地:新加坡
  • Geek_b2839b
    老师请问一下,使用spark同步hive数据到Oracle的时候,由于executor失败重试,导致偶尔出现同步时hive数据和Oracle数据不一致的情况,这个该怎么解决呢
    2022-05-28
收起评论
显示
设置
留言
10
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部