Spark中的函数 - cogroup和join

Posted by Julius on October 5, 2016

一、cogroup

1.处理两个RDD中的Key-Value元素,每个RDD中相同Key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并。

2.cogroup的参数可以是1个或者多个RDD。

例如:

var rdd3 = rdd1.cogroup(rdd2)
var rdd4 = rdd1.cogroup(rdd2,rdd3)

3.cogroup相当于SQL中的全外关联full join(full outer join),返回左右RDD中的记录,关联不上的为空。

测试例子

val DBName=Array(  
  Tuple2(1,"Spark"),  
  Tuple2(2,"Hadoop"),  
  Tuple2(3,"Kylin"),  
  Tuple2(4,"Flink")
)  

val numType=Array(  
  Tuple2(1,"String"),  
  Tuple2(2,"int"),  
  Tuple2(3,"byte"),  
  Tuple2(4,"boolean"),  
  Tuple2(5,"float"),  
  Tuple2(1,"34"),  
  Tuple2(1,"45"),  
  Tuple2(2,"47"),  
  Tuple2(3,"75"),  
  Tuple2(4,"95"),  
  Tuple2(5,"16"),  
  Tuple2(1,"85")  
)  

val names=sc.parallelize(DBName)  
val types=sc.parallelize(numType)  
val nameAndType=names.cogroup(types)  //基于Key进行join, 结果并没有顺序  
nameAndType.collect.foreach(println)

输出的结果:

(4,(CompactBuffer(Flink),CompactBuffer(boolean, 95)))
(1,(CompactBuffer(Spark),CompactBuffer(String, 34, 45, 85)))
(3,(CompactBuffer(Kylin),CompactBuffer(byte, 75)))
(5,(CompactBuffer(),CompactBuffer(float, 16))) // DBName中没有Key为5的Tuple,numType中Key为5的Tuple (2,(CompactBuffer(Hadoop),CompactBuffer(int, 47)))

二、join

1.只返回两个RDD根据Key可以关联上的结果。 2.join只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。 3.join相当于SQL中的内关联inner join

测试例子

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
 
scala> rdd1.join(rdd2).collect

输出的结果:

res10: Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))


10/5/2016 10:34:30 AM

This work is licensed under a CC A-S 4.0 International License.