博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RDD基本操作(下)
阅读量:4259 次
发布时间:2019-05-26

本文共 9047 字,大约阅读时间需要 30 分钟。

上一篇里我提到可以把RDD当作一个数组,这样我们在学习spark的API时候很多问题就能很好理解了。上篇文章里的API也都是基于RDD是数组的数据模型而进行操作的。

  Spark是一个计算框架,是对mapreduce计算框架的改进,mapreduce计算框架是基于键值对也就是map的形式,之所以使用键值对是人们发现世界上大部分计算都可以使用map这样的简单计算模型进行计算。但是Spark里的计算模型却是数组形式,RDD如何处理Map的数据格式了?本篇文章就主要讲解RDD是如何处理Map的数据格式。

  Pair RDD及键值对RDD,Spark里创建Pair RDD也是可以通过两种途径,一种是从内存里读取,一种是从文件读取。

  首先是从文件读取,上篇里我们看到使用textFile方法读取文件,读取的文件是按行组织成一个数组,要让其变成map格式就的进行转化,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/*
 
* 测试文件数据:
 
* x01,1,4
   
x02,11,1
x01,3,9
x01,2,6
   
x02,18,12
   
x03,7,9
 
*
 
* */
val
rddFile
:
RDD[(String,String)]
=
sc.textFile(
"file:///F:/sparkdata01.txt"
,
1
).map { x
=
> (x.split(
","
)(
0
),x.split(
","
)(
1
) +
","
+ x.split(
","
)(
2
)) }
val
rFile
:
RDD[String]
=
rddFile.keys
println(
"=========createPairMap File========="
)
println(rFile.collect().mkString(
","
))
// x01,x02,x01,x01,x02,x03
println(
"=========createPairMap File========="
)

  我们由此可以看到以读取文件方式构造RDD,我们需要使用map函数进行转化,让其变成map的形式。

  下面是通过内存方式进行创建,代码如下:

1
2
3
4
5
val
rdd
:
RDD[(String,Int)]
=
sc.makeRDD(List((
"k01"
,
3
),(
"k02"
,
6
),(
"k03"
,
2
),(
"k01"
,
26
)))
val
r
:
RDD[(String,Int)]
=
rdd.reduceByKey((x,y)
=
> x + y)
println(
"=========createPairMap========="
)
println(r.collect().mkString(
","
))
// (k01,29),(k03,2),(k02,6)
println(
"=========createPairMap========="
)

  RDD任然是数组形式,只不过数组的元素是("k01",3)格式是scala里面特有的Tuple2及二元组,元组可以当作一个集合,这个集合可以是各种不同数据类型组合而成,二元组就是只包含两个元素的元组。

  由此可见Pair RDD也是数组,只不过是一个元素为二元组的数组而已,上篇里对RDD的操作也是同样适用于Pair RDD的。

  下面是Pair RDD的API讲解,同样我们先说转化操作的API:

1
2
3
4
5
6
7
8
9
10
11
12
13
reduceByKey:合并具有相同键的值;
groupByKey:对具有相同键的值进行分组;
keys:返回一个仅包含键值的RDD;
values:返回一个仅包含值的RDD;
sortByKey:返回一个根据键值排序的RDD;
flatMapValues:针对Pair RDD中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录;
mapValues:对Pair RDD里每一个值应用一个函数,但是不会对键值进行操作;
combineByKey:使用不同的返回类型合并具有相同键的值;
subtractByKey:操作的RDD我们命名为RDD
1
,参数RDD命名为参数RDD,剔除掉RDD
1
里和参数RDD中键相同的元素;
join:对两个RDD进行内连接;
rightOuterJoin:对两个RDD进行连接操作,第一个RDD的键必须存在,第二个RDD的键不再第一个RDD里面有那么就会被剔除掉,相同键的值会被合并;
leftOuterJoin:对两个RDD进行连接操作,第二个RDD的键必须存在,第一个RDD的键不再第二个RDD里面有那么就会被剔除掉,相同键的值会被合并;
cogroup:将两个RDD里相同键的数据分组在一起

  下面就是行动操作的API了,具体如下:

1
2
3
countByKey:对每个键的元素进行分别计数;
collectAsMap:将结果变成一个map;
lookup:在RDD里使用键值查找数据

  接下来我再提提那些不是很常用的RDD操作,具体如下:

  转化操作的:

1
sample
:
对RDD采样;

  行动操作:

1
2
3
4
5
take(num)
:
返回RDD里num个元素,随机的;
top(num)
:
返回RDD里最前面的num个元素,这个方法实用性还比较高;
takeSample:从RDD里返回任意一些元素;
sample:对RDD里的数据采样;
takeOrdered:从RDD里按照提供的顺序返回最前面的num个元素

  接下来就是示例代码了,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package
cn.com.sparktest
 
import
org.apache.spark.SparkConf
import
org.apache.spark.SparkConf
import
org.apache.spark.SparkContext
import
org.apache.spark.SparkContext.
_
import
org.apache.spark.rdd.RDD
import
org.apache.spark.util.collection.CompactBuffer
 
object
SparkPairMap {
   
  
val
conf
:
SparkConf
=
new
SparkConf().setAppName(
"spark pair map"
).setMaster(
"local[2]"
)
  
val
sc
:
SparkContext
=
new
SparkContext(conf)
  
  
/**
   
* 构建Pair RDD
   
*/
  
def
createPairMap()
:
Unit
=
{
    
val
rdd
:
RDD[(String,Int)]
=
sc.makeRDD(List((
"k01"
,
3
),(
"k02"
,
6
),(
"k03"
,
2
),(
"k01"
,
26
)))
    
val
r
:
RDD[(String,Int)]
=
rdd.reduceByKey((x,y)
=
> x + y)
    
println(
"=========createPairMap========="
)
    
println(r.collect().mkString(
","
))
// (k01,29),(k03,2),(k02,6)
    
println(
"=========createPairMap========="
)
     
    
/*
     
* 测试文件数据:
     
* x01,1,4
             
x02,11,1
             
x01,3,9
             
x01,2,6
       
x02,18,12
       
x03,7,9
     
*
     
* */
    
val
rddFile
:
RDD[(String,String)]
=
sc.textFile(
"file:///F:/sparkdata01.txt"
,
1
).map { x
=
> (x.split(
","
)(
0
),x.split(
","
)(
1
) +
","
+ x.split(
","
)(
2
)) }
    
val
rFile
:
RDD[String]
=
rddFile.keys
    
println(
"=========createPairMap File========="
)
    
println(rFile.collect().mkString(
","
))
// x01,x02,x01,x01,x02,x03
    
println(
"=========createPairMap File========="
)
  
}
   
  
/**
   
* 关于Pair RDD的转化操作和行动操作
   
*/
  
def
pairMapRDD(path
:
String)
:
Unit
=
{
    
val
rdd
:
RDD[(String,Int)]
=
sc.makeRDD(List((
"k01"
,
3
),(
"k02"
,
6
),(
"k03"
,
2
),(
"k01"
,
26
)))
    
val
other
:
RDD[(String,Int)]
=
sc.parallelize(List((
"k01"
,
29
)),
1
)
     
    
// 转化操作
    
val
rddReduce
:
RDD[(String,Int)]
=
rdd.reduceByKey((x,y)
=
> x + y)
    
println(
"====reduceByKey===:"
+ rddReduce.collect().mkString(
","
))
// (k01,29),(k03,2),(k02,6)
    
val
rddGroup
:
RDD[(String,Iterable[Int])]
=
rdd.groupByKey()
    
println(
"====groupByKey===:"
+ rddGroup.collect().mkString(
","
))
// (k01,CompactBuffer(3, 26)),(k03,CompactBuffer(2)),(k02,CompactBuffer(6))
    
val
rddKeys
:
RDD[String]
=
rdd.keys
    
println(
"====keys=====:"
+ rddKeys.collect().mkString(
","
))
// k01,k02,k03,k01
    
val
rddVals
:
RDD[Int]
=
rdd.values
    
println(
"======values===:"
+ rddVals.collect().mkString(
","
))
// 3,6,2,26
    
val
rddSortAsc
:
RDD[(String,Int)]
=
rdd.sortByKey(
true
,
1
)
    
val
rddSortDes
:
RDD[(String,Int)]
=
rdd.sortByKey(
false
,
1
)
    
println(
"====rddSortAsc=====:"
+ rddSortAsc.collect().mkString(
","
))
// (k01,3),(k01,26),(k02,6),(k03,2)
    
println(
"======rddSortDes=====:"
+ rddSortDes.collect().mkString(
","
))
// (k03,2),(k02,6),(k01,3),(k01,26)
    
val
rddFmVal
:
RDD[(String,Int)]
=
rdd.flatMapValues { x
=
> List(x +
10
) }
    
println(
"====flatMapValues===:"
+ rddFmVal.collect().mkString(
","
))
// (k01,13),(k02,16),(k03,12),(k01,36)
    
val
rddMapVal
:
RDD[(String,Int)]
=
rdd.mapValues { x
=
> x +
10
}
    
println(
"====mapValues====:"
+ rddMapVal.collect().mkString(
","
))
// (k01,13),(k02,16),(k03,12),(k01,36)
    
val
rddCombine
:
RDD[(String,(Int,Int))]
=
rdd.combineByKey(x
=
> (x,
1
), (param
:
(Int,Int),x)
=
> (param.
_
1
+ x,param.
_
2
+
1
), (p
1
:
(Int,Int),p
2
:
(Int,Int))
=
> (p
1
.
_
1
+ p
2
.
_
1
,p
1
.
_
2
+ p
2
.
_
2
))
    
println(
"====combineByKey====:"
+ rddCombine.collect().mkString(
","
))
//(k01,(29,2)),(k03,(2,1)),(k02,(6,1))
    
val
rddSubtract
:
RDD[(String,Int)]
=
rdd.subtractByKey(other);
    
println(
"====subtractByKey====:"
+ rddSubtract.collect().mkString(
","
))
// (k03,2),(k02,6)
    
val
rddJoin
:
RDD[(String,(Int,Int))]
=
rdd.join(other)
    
println(
"=====rddJoin====:"
+ rddJoin.collect().mkString(
","
))
// (k01,(3,29)),(k01,(26,29))
    
val
rddRight
:
RDD[(String,(Option[Int],Int))]
=
rdd.rightOuterJoin(other)
    
println(
"====rightOuterJoin=====:"
+ rddRight.collect().mkString(
","
))
// (k01,(Some(3),29)),(k01,(Some(26),29))
    
val
rddLeft
:
RDD[(String,(Int,Option[Int]))]
=
rdd.leftOuterJoin(other)
    
println(
"=====rddLeft=====:"
+ rddLeft.collect().mkString(
","
))
// (k01,(3,Some(29))),(k01,(26,Some(29))),(k03,(2,None)),(k02,(6,None))
    
val
rddCogroup
:
RDD[(String, (Iterable[Int], Iterable[Int]))]
=
rdd.cogroup(other)
    
println(
"=====cogroup=====:"
+ rddCogroup.collect().mkString(
","
))
// (k01,(CompactBuffer(3, 26),CompactBuffer(29))),(k03,(CompactBuffer(2),CompactBuffer())),(k02,(CompactBuffer(6),CompactBuffer()))
     
    
// 行动操作
    
val
resCountByKey
=
rdd.countByKey()
    
println(
"=====countByKey=====:"
+ resCountByKey)
// Map(k01 -> 2, k03 -> 1, k02 -> 1)
    
val
resColMap
=
rdd.collectAsMap()
    
println(
"=====resColMap=====:"
+ resColMap)
//Map(k02 -> 6, k01 -> 26, k03 -> 2)
    
val
resLookup
=
rdd.lookup(
"k01"
)
    
println(
"====lookup===:"
+ resLookup)
// WrappedArray(3, 26)
  
}
   
  
/**
   
* 其他一些不常用的RDD操作
   
*/
  
def
otherRDDOperate(){
    
val
rdd
:
RDD[(String,Int)]
=
sc.makeRDD(List((
"k01"
,
3
),(
"k02"
,
6
),(
"k03"
,
2
),(
"k01"
,
26
)))
     
    
println(
"=====first=====:"
+ rdd.first())
//(k01,3)
    
val
resTop
=
rdd.top(
2
).map(x
=
> x.
_
1
+
";"
+ x.
_
2
)
    
println(
"=====top=====:"
+ resTop.mkString(
","
))
// k03;2,k02;6
    
val
resTake
=
rdd.take(
2
).map(x
=
> x.
_
1
+
";"
+ x.
_
2
)
    
println(
"=======take====:"
+ resTake.mkString(
","
))
// k01;3,k02;6
    
val
resTakeSample
=
rdd.takeSample(
false
,
2
).map(x
=
> x.
_
1
+
";"
+ x.
_
2
)
    
println(
"=====takeSample====:"
+ resTakeSample.mkString(
","
))
// k01;26,k03;2
    
val
resSample
1
=
rdd.sample(
false
,
0.25
)
    
val
resSample
2
=
rdd.sample(
false
,
0.75
)
    
val
resSample
3
=
rdd.sample(
false
,
0.5
)
    
println(
"=====sample======:"
+ resSample
1
.collect().mkString(
","
))
// 无
    
println(
"=====sample======:"
+ resSample
2
.collect().mkString(
","
))
// (k01,3),(k02,6),(k01,26)
    
println(
"=====sample======:"
+ resSample
3
.collect().mkString(
","
))
// (k01,3),(k01,26)
  
}
   
  
def
main(args
:
Array[String])
:
Unit
=
{
    
createPairMap()
    
pairMapRDD(
"file:///F:/sparkdata01.txt"
)
    
otherRDDOperate()
  
}
   
}

  本篇到此就将我知道的spark的API全部讲完了,两篇文章里的示例代码都是经过测试的,可以直接运行,大家在阅读代码时候最好注意这个特点:我在写RDD转化代码时候都是很明确的写上了转化后的RDD的数据类型,这样做的目的就是让读者更加清晰的认识不同RDD转化后的数据类型,这点在实际开发里非常重要,在实际的计算里我们经常会不同的计算算法不停的转化RDD的数据类型,而使用scala开发spark程序时候,我发现scala和javascript很类似,我们不去指定返回值数据类型,scala编译器也会自动推算结果的数据类型,因此编码时候我们可以不指定具体数据类型。这个特点就会让我们在实际开发里碰到种种问题,因此我在示例代码里明确了RDD转化后的数据类型。

  在使用Pair RDD时候,我们要引入:

1
import
org.apache.spark.SparkContext.
_

  否则代码就有可能报错,说找不到对应的方法,这个引入就是scala里导入的隐世类型转化的功能,原理和上段文字说到的内容差不多。

      开发spark程序不仅仅只可以使用scala,还可以使用python,java,不过scala使用起来更加方便,spark的API简单清晰,这样的编程大大降低了原先使用mapreduce编程的难度,但是如果我们要深入掌握这些API那么就要更加深入的学习下scala。下一篇我就根据spark里RDD的API讲解一些scala的语法,通过这些语法让我们更好的掌握Spark的API。

转载地址:http://dylei.baihongyu.com/

你可能感兴趣的文章
js 实现banner轮播
查看>>
Java实现地址解析为经纬度
查看>>
Java基础---@XmlRootElement 注解对象
查看>>
MyBatis的传入参数parameterType类型
查看>>
MyBatis的返回参数类型
查看>>
Spring多例模式 注解@Scope("prototype")和xml配置scope="prototype"
查看>>
APP登录----伪sessionId设计登录
查看>>
从hashcode()和equals()来对hashset对象去重做设计
查看>>
记录----一个连接oracle的jdbc测试
查看>>
springboot - 集成redis完整代码实现
查看>>
springboot - 集成kafka完整代码实现
查看>>
springboot - 集成jdk 自带webservice
查看>>
springboot - 集成MongoDB实现
查看>>
Tiny4412标准版,编译u-boot并烧录到SD卡,从SD卡启动后只打印‘OK’两个字符
查看>>
MTK LCM驱动移植
查看>>
MTK TP驱动移植
查看>>
MTK 电池曲线配置
查看>>
MTK Camera驱动移植
查看>>
MTK 前后使用相同类型的Camera
查看>>
Tiny4412 led之NDK JNI实现
查看>>