Spark - Special RDDs

pair RDD

An RDD of key-value pairs, and an important RDD type.

Operations

reduceByKey(func), groupByKey(), mapValues(func), flatMapValues(func), keys(), values(), sortByKey(), join(), cogroup(), leftOuterJoin(), rightOuterJoin() (a few of these are sketched below)
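
Several of these operations are not demonstrated later in this post, so as a quick orientation here is a minimal sketch of mapValues, keys(), and values(). It assumes the same imports and JavaSparkContext sc as the examples that follow, plus Java 8 lambdas; the method name pairOps and the sample data are illustrative, not from the original:

public static void pairOps(JavaSparkContext sc) {
    JavaPairRDD<Integer, String> pairs = sc.parallelizePairs(
            Arrays.asList(new Tuple2<>(100, "spark"), new Tuple2<>(90, "hadoop")));
    // mapValues transforms only the value and leaves the key untouched
    pairs.mapValues(v -> v.toUpperCase()).foreach(t -> System.out.println(t)); // (100,SPARK) (90,HADOOP)
    // keys() and values() project out one side of each pair
    pairs.keys().foreach(k -> System.out.println(k));   // 100, 90
    pairs.values().foreach(v -> System.out.println(v)); // spark, hadoop
}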

groupByKey: grouping data

public static void groupbykey(JavaSparkContext sc) {
    List<Tuple2<Integer, String>> asList = Arrays.asList(
            new Tuple2<>(100, "spark"), new Tuple2<>(100, "scala"),
            new Tuple2<>(90, "hadoop"), new Tuple2<>(90, "java"));
    JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(asList, 1);
    // groupByKey gathers every value that shares a key into one Iterable
    parallelizePairs.groupByKey().foreach(new VoidFunction<Tuple2<Integer, Iterable<String>>>() {
        @Override
        public void call(Tuple2<Integer, Iterable<String>> arg0) throws Exception {
            System.out.println(arg0);
        }
    });
}


(100,[spark, scala])
(90,[hadoop, java])
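
When the grouped values are only going to be aggregated, reduceByKey is generally preferable to groupByKey because it combines values within each partition before the shuffle. A minimal sketch on the same data (Java 8 lambdas assumed; not from the original post):

public static void reducebykey(JavaSparkContext sc) {
    List<Tuple2<Integer, String>> asList = Arrays.asList(
            new Tuple2<>(100, "spark"), new Tuple2<>(100, "scala"),
            new Tuple2<>(90, "hadoop"), new Tuple2<>(90, "java"));
    // reduceByKey merges all values of a key with the given function
    sc.parallelizePairs(asList, 1)
      .reduceByKey((a, b) -> a + "," + b)
      .foreach(t -> System.out.println(t));
    // expected output (value order may vary): (100,spark,scala) and (90,hadoop,java)
}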

join: inner join

public static void Join(JavaSparkContext sc) {
    List<Tuple2<Integer, String>> names = Arrays.asList(
            new Tuple2<>(1, "spark"), new Tuple2<>(2, "scala"), new Tuple2<>(3, "hadoop"));
    List<Tuple2<Integer, Integer>> scores = Arrays.asList(
            new Tuple2<>(1, 100), new Tuple2<>(2, 90), new Tuple2<>(3, 90));
    JavaPairRDD<Integer, String> p_names = sc.parallelizePairs(names);
    JavaPairRDD<Integer, Integer> p_scores = sc.parallelizePairs(scores);
    // join keeps only keys present in both RDDs and pairs up their values
    JavaPairRDD<Integer, Tuple2<String, Integer>> join = p_names.join(p_scores);
    join.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
        @Override
        public void call(Tuple2<Integer, Tuple2<String, Integer>> arg0) throws Exception {
            System.out.println(arg0._1 + " " + arg0._2._1 + " " + arg0._2._2);
        }
    });
}

1 spark 100
3 hadoop 90
2 scala 90
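
leftOuterJoin and rightOuterJoin work the same way but also keep keys that are missing on one side, wrapping the possibly absent value in an Optional. A minimal sketch, assuming Spark 2.x where this is org.apache.spark.api.java.Optional (Spark 1.x used Guava's Optional instead); the data and method name are illustrative:

public static void leftouterjoin(JavaSparkContext sc) {
    JavaPairRDD<Integer, String> p_names = sc.parallelizePairs(Arrays.asList(
            new Tuple2<>(1, "spark"), new Tuple2<>(2, "scala"), new Tuple2<>(4, "flink")));
    JavaPairRDD<Integer, Integer> p_scores = sc.parallelizePairs(Arrays.asList(
            new Tuple2<>(1, 100), new Tuple2<>(2, 90)));
    // key 4 has no score, so its right-hand side prints as an empty Optional
    p_names.leftOuterJoin(p_scores)
           .foreach(t -> System.out.println(t._1 + " " + t._2._1 + " " + t._2._2));
}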

cogroup

public static void cogroup(JavaSparkContext sc) {
    List<Tuple2<Integer, String>> names = Arrays.asList(
            new Tuple2<>(1, "spark"), new Tuple2<>(2, "scala"), new Tuple2<>(3, "hadoop"));
    List<Tuple2<Integer, Integer>> scores = Arrays.asList(
            new Tuple2<>(1, 100), new Tuple2<>(2, 90), new Tuple2<>(3, 90));
    JavaPairRDD<Integer, String> p_names = sc.parallelizePairs(names);
    JavaPairRDD<Integer, Integer> p_scores = sc.parallelizePairs(scores);
    // cogroup pairs each key with the full Iterable of values from each RDD
    p_names.cogroup(p_scores).foreach(
            new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
        @Override
        public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> arg0)
                throws Exception {
            System.out.println(arg0._1 + " " + arg0._2._1 + " " + arg0._2._2);
        }
    });
}

1 [spark] [100]
3 [hadoop] [90]
2 [scala] [90]
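
Note the difference from join: join emits one record per matching pair of values and drops unmatched keys, while cogroup emits exactly one record per key and keeps the complete value lists from both sides, which is why it serves as the building block for the various join flavors.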

Sorting data

The sortByKey() function accepts an ascending parameter, which defaults to true (ascending order).
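
For example, passing false sorts by key in descending order. A minimal sketch (not from the original; collect() is used so the printed order is the sorted order):

System.out.println(
    sc.parallelizePairs(Arrays.asList(
            new Tuple2<>(1, "a"), new Tuple2<>(3, "c"), new Tuple2<>(2, "b")))
      .sortByKey(false)  // false = descending
      .collect());       // [(3,c), (2,b), (1,a)]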

Custom sorting

class SecondarySort implements Serializable, Ordered<SecondarySort>

Override the relevant comparison methods (excerpt):

@Override
public boolean $greater(SecondarySort arg0) {
    // strictly greater: compare the first field, break ties on the second
    if (this.first > arg0.getFirst()) {
        return true;
    } else if (this.first == arg0.getFirst() && this.getSecond() > arg0.getSecond()) {
        return true;
    }
    return false;
}

@Override
public boolean $greater$eq(SecondarySort arg0) {
    // greater or equal: strictly greater, or both fields equal
    if (this.$greater(arg0)) {
        return true;
    } else if (this.first == arg0.getFirst() && this.getSecond() == arg0.getSecond()) {
        return true;
    }
    return false;
}
...
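
For reference, here is a complete sketch of such a key class. It assumes the interface is scala.math.Ordered (whose compare method drives sortByKey), int fields named first and second, and equals/hashCode overrides so the key behaves consistently across partitions; the field names and helper methods are illustrative, not from the original:

import java.io.Serializable;
import scala.math.Ordered;

public class SecondarySort implements Serializable, Ordered<SecondarySort> {
    private int first;
    private int second;

    public SecondarySort(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public int getFirst() { return first; }
    public int getSecond() { return second; }

    // compare drives the ordering: by first, then by second
    @Override
    public int compare(SecondarySort that) {
        if (this.first != that.first) {
            return Integer.compare(this.first, that.first);
        }
        return Integer.compare(this.second, that.second);
    }

    @Override
    public int compareTo(SecondarySort that) { return compare(that); }

    @Override
    public boolean $less(SecondarySort that) { return compare(that) < 0; }
    @Override
    public boolean $less$eq(SecondarySort that) { return compare(that) <= 0; }
    @Override
    public boolean $greater(SecondarySort that) { return compare(that) > 0; }
    @Override
    public boolean $greater$eq(SecondarySort that) { return compare(that) >= 0; }

    // equals/hashCode keep the key stable when it is shuffled between partitions
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SecondarySort)) return false;
        SecondarySort s = (SecondarySort) o;
        return first == s.first && second == s.second;
    }

    @Override
    public int hashCode() { return 31 * first + second; }
}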

Test code

JavaPairRDD<SecondarySort, String> mapToPair = textFile.mapToPair(
        new PairFunction<String, SecondarySort, String>() {
    @Override
    public Tuple2<SecondarySort, String> call(String arg0) throws Exception {
        // each input line holds two integers separated by a space
        String[] split = arg0.split(" ");
        SecondarySort ss = new SecondarySort(Integer.valueOf(split[0]), Integer.valueOf(split[1]));
        return new Tuple2<SecondarySort, String>(ss, arg0);
    }
});
JavaRDD<String> map = mapToPair.sortByKey().map(
        new Function<Tuple2<SecondarySort, String>, String>() {
    @Override
    public String call(Tuple2<SecondarySort, String> arg0) throws Exception {
        // drop the custom sort key, keeping only the original (now sorted) line
        return arg0._2;
    }
});
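
As a hypothetical example (the file contents are illustrative, not from the original), if textFile contains lines of two space-separated integers:

3 8
1 5
3 2
1 9

then map holds the lines sorted as 1 5, 1 9, 3 2, 3 8: ordered by the first number, with ties broken by the second. Calling map.collect() or map.saveAsTextFile(...) materializes the result.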