DataFrame
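DataFrame gives Spark the ability to process large-scale structured data. It is easier to use than the original RDD transformations, and its computation is also said to be about twice as fast.
Building a DataFrame
Via reflection
The Scala interface of Spark SQL supports automatically converting an RDD into a DataFrame.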
Contents of people.txt:
michael,23
andy,20
jsutin,18
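First, define a case class.
Load the data and use map to turn each line into an instance of the case class, which yields an RDD.
Convert the RDD to a DataFrame.
import sqlcontext.implicits._ // brings toDF() into scope
case class Person(name:String,age:Int) // field names become the column names
val rdd=sc.textFile("people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt))
val people=rdd.toDF()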
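To check what the reflection-based conversion inferred, the resulting schema can be printed. The following is a minimal sketch, assuming Spark 1.4 or later on the classpath and a local master; the names Person and ReflectionSketch are illustrative, and in-memory lines stand in for people.txt.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical case class; its field names and types define the columns.
case class Person(name: String, age: Int)

object ReflectionSketch {
  // Parses "name,age" lines into Person objects and converts the RDD to a DataFrame.
  def toPeople(sc: SparkContext, sqlContext: SQLContext, lines: Seq[String]): DataFrame = {
    import sqlContext.implicits._ // brings toDF() into scope
    sc.parallelize(lines)
      .map(_.split(","))
      .map(p => Person(p(0), p(1).trim.toInt))
      .toDF()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("reflection-sketch").setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new SQLContext(sc)
    val df = toPeople(sc, sqlContext, Seq("michael,23", "andy,20"))
    df.printSchema() // shows the columns derived from the Person fields
    df.show()
    sc.stop()
  }
}
```

Keeping the case class at the top level, outside any method, is the usual arrangement when this runs as a compiled application rather than in the shell.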
Specifying the schema programmatically
1. Load the file and build a string holding the column names of its schema
val other_people=sc.textFile("e:\\mllib\\SogouQ1.txt",1)
val schemaString="date session name s_seq c_seq uri"
2. Build the schema
import org.apache.spark.sql.types._
// One nullable string column per name in schemaString
val schema=StructType(
schemaString.split(" ").map { fieldName => StructField(fieldName,StringType,true) }
)
3. Convert the string elements of the RDD into the Row type used by DataFrame
import org.apache.spark.sql.Row
val rowRDD=other_people.map {_.split("\t") }.map { p => Row(p(0),p(1),p(2),p(3),p(4),p(5)) }
4. Apply the schema defined earlier to create the DataFrame
val peopleDataFrame=sqlcontext.createDataFrame(rowRDD, schema)
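The four steps above index each line by position, so a line with fewer fields than the schema would fail at runtime. The following is a minimal sketch of the same approach that skips such lines, assuming Spark 1.4 or later and a local master; the object name SchemaSketch, the three-column schema, and the in-memory sample lines are illustrative and do not reflect the real SogouQ1.txt.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object SchemaSketch {
  // Builds an all-string schema from space-separated column names and applies it
  // to tab-separated lines, dropping lines whose field count does not match.
  def build(sc: SparkContext, sqlContext: SQLContext,
            lines: Seq[String], schemaString: String): DataFrame = {
    val names = schemaString.split(" ")
    val schema = StructType(names.map(n => StructField(n, StringType, nullable = true)))
    val width = names.length
    val rowRDD = sc.parallelize(lines)
      .map(_.split("\t", -1))             // -1 keeps trailing empty fields
      .filter(_.length == width)          // skip malformed lines
      .map(fields => Row.fromSeq(fields.toSeq))
    sqlContext.createDataFrame(rowRDD, schema)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("schema-sketch").setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new SQLContext(sc)
    val df = build(sc, sqlContext, Seq("a\tb\tc", "too\tshort"), "x y z")
    df.printSchema()
    df.show() // only the well-formed line is kept
    sc.stop()
  }
}
```

Whether malformed lines should be dropped silently or reported is a design choice; dropping them is only the simplest option for a sketch.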
JSON and Parquet files
val dff=sqlcontext.read.parquet("path")
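A Parquet file written by Spark can be read back through the same reader API. The following is a minimal sketch of such a round trip, assuming Spark 1.4 or later and a local master; the object name ParquetSketch, the sample rows, and the temporary directory are illustrative.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object ParquetSketch {
  // Writes a small DataFrame as Parquet under dir and reads it back.
  def roundTrip(sqlContext: SQLContext, dir: String): DataFrame = {
    import sqlContext.implicits._
    val df = Seq(("jack", 23), ("rose", 13)).toDF("name", "age")
    val path = new File(dir, "people.parquet").getPath // must not exist yet
    df.write.parquet(path)
    sqlContext.read.parquet(path)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("parquet-sketch").setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new SQLContext(sc)
    val dir = Files.createTempDirectory("parquet-sketch").toString
    val back = roundTrip(sqlContext, dir)
    back.printSchema() // column names and types are stored in the Parquet file
    back.show()
    sc.stop()
  }
}
```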
Related APIs
people.json
{"name":"michale","job number":"001","age":33,"gender":"male","salary":233,"deptid":1}
{"name":"jack","job number":"002","age":23,"gender":"male","salary":2233,"deptid":2}
{"name":"rose","job number":"003","age":13,"gender":"female","salary":2330,"deptid":3}
{"name":"peter","job number":"004","age":53,"gender":"male","salary":233,"deptid":1}
{"name":"betty","job number":"005","age":33,"gender":"female","salary":2033,"deptid":2}
new_people.json
{"name":"michale","job number":"001","age":33,"gender":"male","salary":233,"deptid":1}
{"name":"jack","job number":"002","age":23,"gender":"male","salary":2233,"deptid":2}
{"name":"rose","job number":"003","age":13,"gender":"female","salary":2330,"deptid":3}
{"name":"peter_new","job number":"006","age":33,"gender":"male","salary":1233,"deptid":3}
{"name":"betty_new","job number":"007","age":23,"gender":"female","salary":2033,"deptid":3}
department.json
{"name":"Development","deptid":1}
{"name":"Person Dept","deptid":2}
{"name":"Testing dept","deptid":3}
val people=sqlcontext.read.json("e:\\mllib\\people.json")
Queries
people.filter($"age">19).show()
people.filter("gender='male'").show()
people.where($"name"==="jack").show()
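The `$"..."` column syntax used in these queries only compiles once the implicits of the SQL context are imported. The following is a minimal sketch of the three filter styles, assuming Spark 1.4 or later and a local master; the object name FilterSketch is illustrative, and the in-memory rows copy the name, gender and age fields of people.json.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object FilterSketch {
  // In-memory stand-in for people.json (name, gender, age only).
  def sample(sqlContext: SQLContext): DataFrame = {
    import sqlContext.implicits._
    Seq(("michale", "male", 33), ("jack", "male", 23), ("rose", "female", 13),
        ("peter", "male", 53), ("betty", "female", 33)).toDF("name", "gender", "age")
  }

  // Column expression built with $"...", which needs the implicits import.
  def olderThan(sqlContext: SQLContext, people: DataFrame, age: Int): DataFrame = {
    import sqlContext.implicits._
    people.filter($"age" > age)
  }

  // SQL-style condition string; no import needed.
  def males(people: DataFrame): DataFrame = people.filter("gender = 'male'")

  // where is an alias of filter; the column is taken from the DataFrame itself.
  def named(people: DataFrame, name: String): DataFrame =
    people.where(people("name") === name)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("filter-sketch").setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new SQLContext(sc)
    val people = sample(sqlContext)
    olderThan(sqlContext, people, 19).show()
    males(people).show()
    named(people, "jack").show()
    sc.stop()
  }
}
```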
Sorting
people.sort($"job number".asc,$"salary".desc).show()
Adding columns
people.withColumn("level", people("age")/10).show()
people.withColumn("per",people("salary")/100).show()
Renaming a column
val rnDept=people.withColumnRenamed("salary", "sal")
Merging two datasets
val new_people=sqlcontext.read.json("e:\\mllib\\new_people.json")
people.unionAll(new_people).show()
Counting employees that share a name
val namecount=people.unionAll(new_people).groupBy($"name").count
namecount.show()
Grouped statistics
val aggnumber=people.groupBy("job number").agg(Map(
"age" ->"max",
"gender"->"count"
))
aggnumber.show()
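In the sample data every "job number" is unique, so grouping by it produces one-row groups. The following is a minimal sketch of the same aggregation grouped by deptid instead, written with the typed functions `max` and `count` so that the result columns can be named. It assumes Spark 1.4 or later and a local master; the object name AggSketch, the aliases max_age and n, and the choice of deptid as the grouping key are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{count, max}

object AggSketch {
  // In-memory stand-in for people.json (name, gender, age, deptid only).
  def sample(sqlContext: SQLContext): DataFrame = {
    import sqlContext.implicits._
    Seq(("michale", "male", 33, 1), ("jack", "male", 23, 2), ("rose", "female", 13, 3),
        ("peter", "male", 53, 1), ("betty", "female", 33, 2))
      .toDF("name", "gender", "age", "deptid")
  }

  // Per department: the maximum age and the number of non-null gender values.
  def summarize(people: DataFrame): DataFrame =
    people.groupBy("deptid")
      .agg(max(people("age")).as("max_age"), count(people("gender")).as("n"))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("agg-sketch").setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new SQLContext(sc)
    summarize(sample(sqlContext)).show()
    sc.stop()
  }
}
```

With the Map form, the result columns are named after the aggregate expressions; the aliases here only make later references shorter.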
Deduplicating names
people.unionAll(new_people).select("name").distinct().show()
Set difference
people.select("name").except(new_people.select("name")).show()
Intersection
people.select("name").intersect(new_people.select("name")).show()
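For the two sample files, the difference should contain the names that appear only in people.json and the intersection the names common to both. The following is a minimal sketch that checks this on in-memory copies of the name columns, assuming Spark 1.4 or later and a local master; the object name SetOpsSketch and its helper functions are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object SetOpsSketch {
  // Single-column DataFrame of names; Tuple1 makes each value a one-field row.
  def names(sqlContext: SQLContext, values: Seq[String]): DataFrame = {
    import sqlContext.implicits._
    values.map(Tuple1(_)).toDF("name")
  }

  // Collects the first column into a Set, since row order is not guaranteed.
  def asSet(df: DataFrame): Set[String] = df.collect().map(_.getString(0)).toSet

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("setops-sketch").setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new SQLContext(sc)
    val current = names(sqlContext, Seq("michale", "jack", "rose", "peter", "betty"))
    val updated = names(sqlContext, Seq("michale", "jack", "rose", "peter_new", "betty_new"))
    println(asSet(current.except(updated)))    // peter and betty
    println(asSet(current.intersect(updated))) // michale, jack and rose
    sc.stop()
  }
}
```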
Outer join of two DataFrames
val dept=sqlcontext.read.json("e:\\mllib\\department.json")
people.join(dept, people("deptid")===dept("deptid"),"outer").show()
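Both people.json and department.json have columns called name and deptid, so the joined result carries two columns under each name. The following is a minimal sketch that renames the department columns before joining, assuming Spark 1.4 or later and a local master. The object name JoinSketch, the new column names dept_name and dept_id, and the extra department row "Ops" (added so that the outer join has an unmatched row to show) are illustrative and not part of the original data.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object JoinSketch {
  // In-memory stand-in for people.json (name, deptid only).
  def people(sqlContext: SQLContext): DataFrame = {
    import sqlContext.implicits._
    Seq(("michale", 1), ("jack", 2), ("rose", 3), ("peter", 1), ("betty", 2))
      .toDF("name", "deptid")
  }

  // department.json plus one hypothetical department without employees.
  def departments(sqlContext: SQLContext): DataFrame = {
    import sqlContext.implicits._
    Seq(("Development", 1), ("Person Dept", 2), ("Testing dept", 3), ("Ops", 4))
      .toDF("name", "deptid")
  }

  // Renames the department columns first so the joined columns are unambiguous.
  def joinWithDept(people: DataFrame, dept: DataFrame): DataFrame = {
    val d = dept.withColumnRenamed("name", "dept_name")
      .withColumnRenamed("deptid", "dept_id")
    people.join(d, people("deptid") === d("dept_id"), "outer")
      .select("name", "dept_name", "dept_id")
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("join-sketch").setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new SQLContext(sc)
    // The department without employees appears with a null name.
    joinWithDept(people(sqlContext), departments(sqlContext)).show()
    sc.stop()
  }
}
```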