pair RDD
键值对类型的RDD,一个重要的RDD
操作
reduceByKey(fun) groupByKey() mapValues(fun) flatMapValues(fun) keys() values() sortByKey() join cogroup leftOuterJoin rightOuterJoin
groupByKey数据分组
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| /**
 * Demonstrates groupByKey(): collects every value sharing the same key into a
 * single Iterable, then prints each (key, values) pair on the driver side
 * executors via foreach.
 *
 * @param sc active JavaSparkContext used to parallelize the sample data
 */
public static void groupbykey(JavaSparkContext sc) {
    // Explicit type arguments instead of raw Tuple2 — avoids unchecked warnings
    // and matches the style used elsewhere in these notes.
    List<Tuple2<Integer, String>> asList = Arrays.asList(
            new Tuple2<Integer, String>(100, "spark"),
            new Tuple2<Integer, String>(100, "scala"),
            new Tuple2<Integer, String>(90, "hadoop"),
            new Tuple2<Integer, String>(90, "java"));
    // Single partition (numSlices = 1) so the printed grouping is easy to read.
    JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(asList, 1);
    parallelizePairs.groupByKey().foreach(new VoidFunction<Tuple2<Integer, Iterable<String>>>() {
        @Override
        public void call(Tuple2<Integer, Iterable<String>> arg0) throws Exception {
            System.out.println(arg0);
        }
    });
}
(100,[spark, scala]) (90,[hadoop, java])
|
join 内连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| /**
 * Demonstrates join() (inner join): pairs up entries of two pair-RDDs that
 * share the same key, producing (key, (leftValue, rightValue)) tuples, and
 * prints each result.
 *
 * @param sc active JavaSparkContext used to parallelize the sample data
 */
public static void Join(JavaSparkContext sc) {
    // Explicit type arguments instead of raw Tuple2 — avoids unchecked warnings.
    List<Tuple2<Integer, String>> names = Arrays.asList(
            new Tuple2<Integer, String>(1, "spark"),
            new Tuple2<Integer, String>(2, "scala"),
            new Tuple2<Integer, String>(3, "hadoop"));
    List<Tuple2<Integer, Integer>> scores = Arrays.asList(
            new Tuple2<Integer, Integer>(1, 100),
            new Tuple2<Integer, Integer>(2, 90),
            new Tuple2<Integer, Integer>(3, 90));
    JavaPairRDD<Integer, String> p_names = sc.parallelizePairs(names);
    JavaPairRDD<Integer, Integer> p_scores = sc.parallelizePairs(scores);
    // join keeps only keys present in BOTH RDDs (inner join semantics).
    JavaPairRDD<Integer, Tuple2<String, Integer>> join = p_names.join(p_scores);
    join.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
        @Override
        public void call(Tuple2<Integer, Tuple2<String, Integer>> arg0) throws Exception {
            System.out.println(arg0._1 + " " + arg0._2._1 + " " + arg0._2._2);
        }
    });
}
1 spark 100 3 hadoop 90 2 scala 90
|
cogroup
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| /**
 * Demonstrates cogroup(): for every key present in either RDD, gathers ALL
 * matching values from each side into two Iterables — unlike join, keys are
 * not dropped and values are not paired one-to-one.
 *
 * @param sc active JavaSparkContext used to parallelize the sample data
 */
public static void cogroup(JavaSparkContext sc) {
    // Explicit type arguments instead of raw Tuple2 — avoids unchecked warnings.
    List<Tuple2<Integer, String>> names = Arrays.asList(
            new Tuple2<Integer, String>(1, "spark"),
            new Tuple2<Integer, String>(2, "scala"),
            new Tuple2<Integer, String>(3, "hadoop"));
    List<Tuple2<Integer, Integer>> scores = Arrays.asList(
            new Tuple2<Integer, Integer>(1, 100),
            new Tuple2<Integer, Integer>(2, 90),
            new Tuple2<Integer, Integer>(3, 90));
    JavaPairRDD<Integer, String> p_names = sc.parallelizePairs(names);
    JavaPairRDD<Integer, Integer> p_scores = sc.parallelizePairs(scores);
    p_names.cogroup(p_scores).foreach(
            new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
                @Override
                public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> arg0)
                        throws Exception {
                    System.out.println(arg0._1 + " " + arg0._2._1 + " " + arg0._2._2);
                }
            });
}
1 [spark] [100] 3 [hadoop] [90] 2 [scala] [90]
|
数据排序
sortByKey()函数接收ascending参数,默认为升序(true)
自定义排序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| // Composite key for secondary sort: implements scala.math.Ordered so Spark's
// sortByKey can compare keys, and Serializable so keys can be shuffled
// between executors. Only two of the Ordered methods are shown in these notes.
class SecondarySort implements Serializable,Ordered<SecondarySort> 重写相关代码
// $greater is Scala's ">" operator: primary comparison on `first`, with ties
// broken by `second`.
// NOTE(review): uses == on the fields — this assumes first/second are
// primitive ints; if they are boxed Integers, == compares references. Confirm.
@Override public boolean $greater(SecondarySort arg0) { if(this.first>arg0.getFirst()){ return true; }else if(this.first==arg0.getFirst()&&this.getSecond()>arg0.getSecond()) { return true; } return false; }
// $greater$eq is Scala's ">=" operator: strictly greater, or both fields equal.
@Override public boolean $greater$eq(SecondarySort arg0) { if(this.$greater(arg0)){ return true; }else if(this.first==arg0.getFirst()&&this.getSecond()==arg0.getSecond()) { return true; } return false; } ...
|
测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| // Step 1: key every input line by a SecondarySort composite key built from its
// first two space-separated fields; keep the original line as the value.
JavaPairRDD<SecondarySort, String> mapToPair = textFile.mapToPair(
        new PairFunction<String, SecondarySort, String>() {
            @Override
            public Tuple2<SecondarySort, String> call(String line) throws Exception {
                String[] fields = line.split(" ");
                SecondarySort key = new SecondarySort(
                        Integer.valueOf(fields[0]), Integer.valueOf(fields[1]));
                return new Tuple2<SecondarySort, String>(key, line);
            }
        });
// Step 2: sort by the composite key, then discard the key and keep only the
// (now ordered) original lines.
JavaRDD<String> map = mapToPair.sortByKey().map(
        new Function<Tuple2<SecondarySort, String>, String>() {
            @Override
            public String call(Tuple2<SecondarySort, String> pair) throws Exception {
                return pair._2;
            }
        });
|