SparkSQL-DataFrame

DataFrame

DataFrame gives Spark the ability to process large-scale structured data. It is easier to use than the original RDD-based transformations, and the author's claim is that its computations also run roughly twice as fast.

Building a DataFrame

Via reflection

Spark SQL's Scala interface supports converting an RDD to a DataFrame automatically.
Data in people.txt:

michael,23
andy,20
justin,18

First, define a case class.
Load the data and use map to convert each record into the case class, yielding an RDD.
Convert the RDD into a DataFrame.

import sqlContext.implicits._  // needed for rdd.toDF()
case class Person(name: String, age: Int)
val rdd = sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))

val people = rdd.toDF()
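Once the DataFrame exists, it can also be registered as a temporary table and queried with SQL. A minimal sketch, assuming a Spark 1.x spark-shell where `sqlContext` is predefined (the table name `people_table` is an illustrative choice):

```scala
// Register the DataFrame as a temporary table so it can be queried with SQL
people.registerTempTable("people_table")
// Run an SQL query through the SQLContext; the result is again a DataFrame
val adults = sqlContext.sql("SELECT name, age FROM people_table WHERE age >= 20")
adults.show()
```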

Specifying the schema programmatically

1. Load the file and build a string holding the column names of its schema

val other_people=sc.textFile("e:\\mllib\\SogouQ1.txt",1)
val schemaString="date session name s_seq c_seq uri"

2. Build the schema

import org.apache.spark.sql.types._
val schema = StructType(
  schemaString.split(" ").map { fieldName => StructField(fieldName, StringType, true) }
)

3. Convert the RDD's string elements into the DataFrame Row type

import org.apache.spark.sql.Row
val rowRDD = other_people.map { _.split("\t") }.map { p => Row(p(0), p(1), p(2), p(3), p(4), p(5)) }

4. Apply the schema defined above to create the DataFrame

val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
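To check that the schema was applied as intended, print it and preview a few rows:

```scala
peopleDataFrame.printSchema()  // lists the six string columns from schemaString
peopleDataFrame.show(5)        // preview the first 5 rows
```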

JSON and Parquet files

val dff=sqlContext.read.parquet("path")
val dff=sqlContext.read.json("path")
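The read side has a symmetric write side in the DataFrameWriter API (Spark 1.4+). A sketch, with hypothetical output paths:

```scala
// Save a DataFrame back out; the format is chosen by the method name
dff.write.parquet("e:\\mllib\\out_parquet")  // hypothetical output path
dff.write.json("e:\\mllib\\out_json")
```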

Related APIs

people.json

{"name":"michale","job number":"001","age":33,"gender":"male","salary":233,"deptid":1}
{"name":"jack","job number":"002","age":23,"gender":"male","salary":2233,"deptid":2}
{"name":"rose","job number":"003","age":13,"gender":"female","salary":2330,"deptid":3}
{"name":"peter","job number":"004","age":53,"gender":"male","salary":233,"deptid":1}
{"name":"betty","job number":"005","age":33,"gender":"female","salary":2033,"deptid":2}

new_people.json

{"name":"michale","job number":"001","age":33,"gender":"male","salary":233,"deptid":1}
{"name":"jack","job number":"002","age":23,"gender":"male","salary":2233,"deptid":2}
{"name":"rose","job number":"003","age":13,"gender":"female","salary":2330,"deptid":3}
{"name":"peter_new","job number":"006","age":33,"gender":"male","salary":1233,"deptid":3}
{"name":"betty_new","job number":"007","age":23,"gender":"female","salary":2033,"deptid":3}

department.json

{"name":"Development","deptid":1}
{"name":"Person Dept","deptid":2}
{"name":"Testing dept","deptid":3}

val people = sqlContext.read.json("e:\\mllib\\people.json")
Query operations

people.filter($"age">19).show()
people.filter("gender='male'").show()
people.where($"name"==="jack").show()

Sorting

people.sort($"job number".asc,$"salary".desc).show()

Adding columns

people.withColumn("level", people("age")/10).show()
people.withColumn("per",people("salary")/100).show()

Renaming a column

val rnDept=people.withColumnRenamed("salary", "sal")

Unioning two datasets

val new_people = sqlContext.read.json("e:\\mllib\\new_people.json")
people.unionAll(new_people).show()

Counting employees who share the same name

val namecount=people.unionAll(new_people).groupBy($"name").count
namecount.show()
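`groupBy(...).count` adds a column literally named `count`, so the rows of interest, names that actually occur more than once across the two files, can be kept with a filter (assuming `sqlContext.implicits._` is in scope for the `$` syntax, as it is in spark-shell):

```scala
// Keep only names that appear more than once in the unioned data
namecount.filter($"count" > 1).show()
```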

Grouped aggregation statistics

val aggnumber = people.groupBy("job number").agg(Map(
  "age" -> "max",
  "gender" -> "count"
))
aggnumber.show()
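The same aggregation can also be written with the helpers in `org.apache.spark.sql.functions`, which additionally lets you alias the result columns (the aliases below are illustrative):

```scala
import org.apache.spark.sql.functions._
val agg2 = people.groupBy("job number").agg(
  max("age").as("max_age"),
  count("gender").as("gender_count")
)
agg2.show()
```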

Deduplicating names

people.unionAll(new_people).select("name").distinct().show()

Set difference

people.select("name").except(new_people.select("name")).show()

Intersection

people.select("name").intersect(new_people.select("name")).show()

Outer join between two DataFrames

val dept = sqlContext.read.json("e:\\mllib\\department.json")
people.join(dept, people("deptid")===dept("deptid"),"outer").show()
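Because both DataFrames contain a `deptid` and a `name` column, the joined result carries duplicated column names. Selecting through the parent DataFrames and aliasing avoids the ambiguity (the alias names here are illustrative):

```scala
people.join(dept, people("deptid") === dept("deptid"), "outer")
  .select(people("name").as("emp_name"), dept("name").as("dept_name"), people("deptid"))
  .show()
```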