当前位置:首页 > 竞技风云 > 正文内容

combinebykey 浅谈Spark RDD API中的Map和Reduce

admin1周前 (06-15)竞技风云9

combinebykey 浅谈Spark RDD API中的Map和Reduce

RDD是什么?

RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD。从编程的角度来看,RDD可以简单看成是一个数组。和普通数组的区别是combinebykey,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理。因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果。本文为第一部分,将介绍Spark RDD中与Map和相关的API中。

如何创建RDD?

RDD可以从普通数组创建出来,也可以从文件系统或者HDFS中的文件创建出来。

举例:从普通数组创建RDD,里面包含了1到9这9个数字,它们分别在3个分区中。

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :12

举例:读取文件.md来创建RDD,文件中的每一行就是RDD中的一个元素

scala> val b = sc.textFile("README.md")
b: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at :12

虽然还有别的方式可以创建RDDcombinebykey,但在本文中我们主要使用上述两种方式来创建RDD以说明RDD的API。

map

map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

举例:

scala> val a = sc.parallelize(1 to 9, 3)
scala> val b = a.map(x => x*2)
scala> a.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> b.collect
res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD。

是map的一个变种。map的输入函数是应用于RDD中每个元素,而的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。

它的函数定义为:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以传递给输入函数f,f的输出结果是。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。

举例:

scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
  var res = List[(T, T)]() 
  var pre = iter.next while (iter.hasNext) {
    val cur = iter.next; 

combinebykey 浅谈Spark RDD API中的Map和Reduce 第1张

res .::= (pre, cur) pre = cur; } res.iterator } scala> a.mapPartitions(myfunc).collect res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

上述例子中的函数是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。

还有些变种,比如text,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有ex,它能把分区的index传递给用户指定的输入函数。

顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。

举例:

scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
scala> val b = a.map(x => (x.length, x))
scala> b.mapValues("x" + _ + "x").collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))

是map的另外一个变种,map只需要一个输入函数,而有两个输入函数。它的定义如下:

def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]

第一个函数是把RDD的 index(index从0开始)作为输入,输出为新类型A;

第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。

举例:把 index 乘以10,然后加上2作为新的RDD的元素。

val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3) 
x.mapWith(a => a * 10)((a, b) => (b + 2)).collect 
res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)

与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经处理后可生成多个元素来构建新RDD。

举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)

scala> val a = sc.parallelize(1 to 4, 2)
scala> val b = a.flatMap(x => 1 to x)
scala> b.collect
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

与很类似,都是接收两个函数,一个函数把作为输入,输出是一个新类型A;另外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素组成了新的RDD。它的定义如下:

def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]

举例:

scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2,
8, 2, 9)

类似于,不同的在于应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值combinebykey 浅谈Spark RDD API中的Map和Reduce,然后这些值再与原RDD中的Key组成一系列新的KV对。

举例

scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> val b = a.flatMapValues(x=>x.to(5))
scala> b.collect
res3: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))

上述例子中原RDD中每个元素的值被转换为一个序列(从其当前值到5),比如第一个KV对(1,2), 其值2被转换为2combinebykey 浅谈Spark RDD API中的Map和Reduce,3,4,5。然后其再与原KV对中Key组成一系列新的KV对(1,2),(1,3),(1,4),(1,5)。

将RDD中元素两两传递给输入函数,同时产生一个新的值combinebykey,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。

举例

scala> val c = sc.parallelize(1 to 10)
scala> c.reduce((x, y) => x + y)
res4: Int = 55

上述例子对RDD中的元素求和。

顾名思义,就是对元素为KV对的RDD中Key相同的元素的Value进行,因此,Key相同的多个元素的值被为一个值,然后与原RDD中的Key组成一个新的KV对。

举例:

scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> a.reduceByKey((x,y) => x + y).collect
res7: Array[(Int, Int)] = Array((1,2), (3,10))

上述例子中,对Key相同的元素的值求和,因此Key为3的两个元素被转为了(3,10)。

本文中的部分例子来自:

总结

以上就是本文关于浅谈Spark RDD API中的Map和的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Spark三种属性配置方式详解、浅谈七种常见的和Spark项目案例等,有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!

加入微信交流群:************ ,请猛戳这里→点击入群

扫描二维码推送至手机访问。

版权声明:本文由前沿科技娱乐汇发布,如需转载请注明出处。

本文链接:https://www.kejiyl.com/post/5241.html

分享给朋友:

“combinebykey 浅谈Spark RDD API中的Map和Reduce” 的相关文章

南极冰川加速融化的危害

南极冰川加速融化的危害

在当今时代,南极冰川的加速融化成为了全球瞩目的焦点,它如同一个巨大的警钟,敲响在我们每一个人的心头,昭示着全球气候危机的日益加剧。南极,这片被冰雪覆盖的神秘大陆,拥有着地球上最庞大的冰川体系。随着人类活动的不断推进,尤其是工业革命以来温室气体的大量排放,南极冰川正以惊人的速度消融。冰层的融化不仅导致...

运动员服用兴奋剂的法律后果是什么

运动员服用兴奋剂的法律后果是什么

在体育的浩瀚星空中,运动员们如同璀璨的星辰,以汗水、拼搏和坚韧诠释着体育精神的真谛。当某运动员因服用兴奋剂而被禁赛的消息传来,那如同一道刺眼的闪电,瞬间划破了体育世界的宁静,让我们不禁要问:体育精神何在?体育精神,是超越自我的追求。每一位优秀的运动员,都在赛场上不断挑战着自己的极限,追求着更高、更快...

中国女排奥运收官战

中国女排奥运收官战

在体育的浩瀚星空中,中国女排犹如一颗璀璨的巨星,始终散发着耀眼的光芒。曾经,她们以顽强的拼搏精神和卓越的竞技水平,多次登上世界之巅,为祖国赢得了无数的荣誉。如今,中国女排再次重回巅峰,剑指奥运冠军,她们的征程令人期待,她们的故事激励着无数人。女排精神,是中国女排的灵魂。这种精神蕴含着坚韧不拔、团结协...

《神武 6》宠物养成竞技攻略,打造赛场神宠

《神武 6》宠物养成竞技攻略,打造赛场神宠

在《神武 6》的世界中,宠物不仅是玩家的伙伴,更是在竞技场上展现实力的重要力量。如何养成一只赛场神宠,成为众多玩家关注的焦点。本文将为大家详细介绍《神武 6》宠物养成竞技攻略,帮助你打造出令人瞩目的赛场神宠。一、宠物选择在开始宠物养成之前,首先要选择一只适合自己的宠物。不同的宠物拥有不同的技能和属性...

《DOTA2》职业选手直播爆料行业内幕,引发热议

《DOTA2》职业选手直播爆料行业内幕,引发热议

《DOTA2》作为一款备受全球玩家喜爱的电竞游戏,其职业选手的一举一动都备受关注。近日,一位知名的 DOTA2 职业选手在直播中爆料了行业内幕,瞬间引发了轩然,成为了电竞圈热议的话题。这位职业选手在直播中透露了许多鲜为人知的事情,其中包括俱乐部之间的转会操作、赛事主办方的一些不规范行为以及选手们在幕...

《英雄联盟》玩家举办克隆大作战竞技比赛,欢乐无限

《英雄联盟》玩家举办克隆大作战竞技比赛,欢乐无限

在《英雄联盟》的世界里,有一种独特的游戏模式——克隆大作战,它以其欢乐无限的特点,吸引了无数玩家的参与和喜爱。近日,一场盛大的克隆大作战竞技比赛在各地玩家的期待中拉开了帷幕,让我们一同走进这场充满与欢笑的赛事。比赛现场气氛热烈,玩家们早早地聚集在一起,期待着即将开始的激烈对决。他们身着各自喜欢的英雄...