package com.company.sparkcore
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * A simple join implemented with Spark Core operators.
 */
object JoinBySpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(JoinBySpark.getClass.getSimpleName)
      .setMaster("local")
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    val sc = new SparkContext(conf)
    // Create RDDs from the input text files
    val page_viewRDD = sc.textFile("file:///e:/page_view.txt")
    val pv_usersRDD = sc.textFile("file:///e:/pv_users.txt")
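    // Assumed input layout (not spelled out in the original post): page_view.txt holds
    // comma-separated records of the form "pageid,userid,...", and pv_users.txt holds
    // "userid,age" records, which is what the field indices used below rely on.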
    // Extract the needed fields into an RDD of (userid, pageid) pairs
    val userid_pageidRDD = page_viewRDD.map(_.split(",")).map(viewData => (viewData(1), viewData(0)))
    // Extract the needed fields into an RDD of (userid, age) pairs
    val userid_ageRDD = pv_usersRDD.map(_.split(",")).map(userData => (userData(0), userData(1)))
    // Join the two RDDs on userid, producing an RDD of (userid, (pageid, age))
    val userid_pageid_ageRDD = userid_pageidRDD.join(userid_ageRDD)
    userid_pageid_ageRDD.collect().foreach(println)
    // Keep only the pageid and age fields from the join result
    val joinRes = userid_pageid_ageRDD.map { case (_, (pageid, age)) => (pageid, age) }
    // Expected output:
    //   (1,32)
    //   (1,25)
    //   (2,25)
    joinRes.collect().foreach(println)
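    // Stop the SparkContext to release resources once the job is done
    sc.stop()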
  }
}