[代码重构](master): 新坑 Spark基础

Spark 基础总结1
master
土豆兄弟 2 years ago
parent f84029c438
commit 9afa3ba4e4

@ -167,20 +167,172 @@ wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)
- ![RDD算子归类](pic/RDD算子归类.png) - ![RDD算子归类](pic/RDD算子归类.png)
- 参考: https://spark.apache.org/docs/latest/rdd-programming-guide.html - 参考: https://spark.apache.org/docs/latest/rdd-programming-guide.html
### 2.3 RDD常用算子RDD内部的数据转换
- 重点讲解的就是 map、mapPartitions、flatMap、filter。这 4 个算子几乎囊括了日常开发中 99% 的数据转换场景
- ![RDD算子分类表](pic/RDD算子分类表.png)
#### A. 创建 RDD
- 在 Spark 中,创建 RDD 的典型方式有两种:
- 通过 SparkContext.parallelize 在内部数据之上创建 RDD
- 通过 SparkContext.textFile 等 API 从外部数据创建 RDD。
- 这里的内部、外部是相对应用程序来说的。开发者在 Spark 应用中自定义的各类数据结构,如数组、列表、映射等,都属于“内部数据”;
- 而“外部数据”指代的,是 Spark 系统之外的所有数据形式如本地文件系统或是分布式文件系统中的数据再比如来自其他大数据组件Hive、Hbase、RDBMS 等)的数据。
- 第一种创建方式的用法非常简单,只需要用 parallelize 函数来封装内部数据即可,比如下面的例子:
```scala
import org.apache.spark.rdd.RDD
val words: Array[String] = Array("Spark", "is", "cool")
val rdd: RDD[String] = sc.parallelize(words)
```
- 通常来说,在 Spark 应用内定义体量超大的数据集,其实都是不太合适的,因为数据集完全由 Driver 端创建,且创建完成后,还要在全网范围内跨节点、跨进程地分发到其他
Executors所以往往会带来性能问题。因此parallelize API 的典型用法,是在“小数据”之上创建 RDD。
- 要想在真正的“大数据”之上创建 RDD我们还得依赖第二种创建方式也就是通过SparkContext.textFile 等 API 从外部数据创建 RDD。由于 textFile API 比较简单,而且
它在日常的开发中出现频率比较高,因此我们使用 textFile API 来创建 RDD。在后续对各类 RDD 算子讲解的过程中,我们都会使用 textFile API 从文件系统创建 RDD。
```scala
import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
```
#### B. RDD 内的数据转换
##### 1. map以元素为粒度的数据转换
- map 算子的用法:给定映射函数 fmap(f) 以元素为粒度对 RDD 做数据转换。其中 f 可以是带有明确签名的带名函数也可以是匿名函数它的形参类型必须与RDD 的元素类型保持一致,而输出类型则任由开发者自行决定
- 把包含单词的 RDD 转换成元素为KeyValue对的 RDD后者统称为 Paired RDD。
```scala
// 把普通RDD转换为Paired RDD
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
```
- 上面的代码实现中,传递给 map 算子的形参word => word1就是我们上面说的映射函数 f。只不过这里 f 是以匿名函数的方式进行定义的,其中左侧的 word表示匿名函数 f 的输入形参而右侧的word1则代表函数 f 的输出结果。
- 如果我们把匿名函数变成带名函数的话,可能你会看的更清楚一些。用一段代码重新定义了带名函数 f。
```scala
// 把RDD元素转换为KeyValue的形式
// 定义映射函数f
def f(word: String): (String, Int) = {
return (word, 1)
}
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f)
````
- 使用 Scala 的 def 语法,明确定义了带名映射函数 f它的计算逻辑与刚刚的匿名函数是一致的。在做 RDD 数据转换的时候,我们只需把函数 f 传递给 map 算子即可。不管 f 是匿名函数还是带名函数map 算子的转换逻辑都是一样的
- 通过定义如下的映射函数 f我们就可以改写 Word Count 的计数逻辑也就是把“Spark”这个单词的统计计数权重提高一倍
```scala
// 把RDD元素转换为KeyValue的形式
// 定义映射函数f
def f(word: String): (String, Int) = {
if (word.equals("Spark")) { return (word, 2) }
return (word, 1)
}
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f)
```
- map(f) 是以元素为粒度对 RDD 做数据转换的,在某些计算场景下,这个特点会严重影响执行效率。为什么这么说呢?
- 比方说,我们把 Word Count 的计数需求,从原来的对单词计数,改为对单词的哈希值计数,在这种情况下,我们的代码实现需要做哪些改动呢?
```scala
// 把普通RDD转换为Paired RDD
import java.security.MessageDigest
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map{ word =>
// 获取MD5对象实例
val md5 = MessageDigest.getInstance("MD5")
// 使用MD5计算哈希值
val hash = md5.digest(word.getBytes).mkString
// 返回哈希值与数字1的Pair
(hash, 1)
}
```
- 由于 map(f) 是以元素为单元做转换的,那么对于 RDD 中的每一条数据记录,我们都需要实例化一个 MessageDigest 对象来计算这个元素的哈希值。
- 在工业级生产系统中,一个 RDD 动辄包含上百万甚至是上亿级别的数据记录,如果处理每条记录都需要事先创建 MessageDigest那么实例化对象的开销就会聚沙成塔不知不觉地成为影响执行效率的罪魁祸首。
- 有没有什么办法,能够让 Spark 在更粗的数据粒度上去处理数据呢?
- 还真有mapPartitions 和 mapPartitionsWithIndex 这对“孪生兄弟”就是用来解决类似的问题。相比 mapPartitionsmapPartitionsWithIndex 仅仅多出了一个数据分区索引
##### 2. mapPartitions以数据分区为粒度的数据转换
- mapPartitions顾名思义就是以数据分区为粒度使用映射函数 f 对 RDD 进行数据转换。
- 对于上述单词哈希值计数的例子,我们结合后面的代码,来看看如何使用 mapPartitions 来改善执行性能:
```scala
// 把普通RDD转换为Paired RDD
import java.security.MessageDigest
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.mapPartitions( partition => {
// 注意这里是以数据分区为粒度获取MD5对象实例
val md5 = MessageDigest.getInstance("MD5")
val newPartition = partition.map( word => {
// 在处理每一条数据记录的时候可以复用同一个Partition内的MD5对象
md5.digest(word.getBytes).mkString
})
newPartition
})
```
- 在上面的改进代码中mapPartitions 以数据分区匿名函数的形参partition为粒度对 RDD 进行数据转换。具体的数据处理逻辑,则由代表数据分区的形参 partition 进一步调用 map(f) 来完成。
- 你可能会说“partition. map(f) 仍然是以元素为粒度做映射呀!这和前一个版本的实现,有什么本质上的区别呢?”
- 仔细观察,你就会发现,相比前一个版本,我们把实例化 MD5 对象的语句挪到了 map 算子之外。如此一来,以数据分区为单位,实例化对象的操作只需要执行一次,而同一个数据分区中所有的数据记录,都可以共享该 MD5 对象,从而完成单词到哈希值的转换。
- 通过下图的直观对比你会发现以数据分区为单位mapPartitions 只需实例化一次MD5 对象,而 map 算子却需要实例化多次,具体的次数则由分区内数据记录的数量来决定。
- ![map与mapPartitions的区别](pic/map与mapPartitions的区别.png)
- 对于一个有着上百万条记录的 RDD 来说,其数据分区的划分往往是在百这个量级,因此,相比 map 算子mapPartitions 可以显著降低对象实例化的计算开销,这对于 Spark 作业端到端的执行性能来说,无疑是非常友好的。
- 实际上。除了计算哈希值以外对于数据记录来说凡是可以共享的操作都可以用mapPartitions 算子进行优化。
- 这样的共享操作还有很多,比如创建用于连接远端数据库的 Connections 对象,或是用于连接 Amazon S3 的文件系统句柄,再比如用于在线推理的机器学习模型,等等,不一而足。
- 相比 mapPartitionsmapPartitionsWithIndex 仅仅多出了一个数据分区索引,这个数据分区索引可以为我们获取分区编号,当你的业务逻辑中需要使用到分区编号的时候,不妨考虑使用这个算子来实现代码。
- 除了这个额外的分区索引以外mapPartitionsWithIndex在其他方面与 mapPartitions 是完全一样的。
##### 3. flatMap从元素到集合、再从集合到元素
- flatMap 其实和 map 与 mapPartitions 算子类似,在功能上,与 map 和 mapPartitions 一样flatMap 也是用来做数据映射的,在实现上,对于给定映射函数 fflatMap(f) 以元
素为粒度,对 RDD 进行数据转换。
- 不过与前两者相比flatMap 的映射函数 f 有着显著的不同。对于 map 和 mapPartitions 来说,其映射函数 f 的类型,都是(元素) => (元素),即元素到元
素。而 flatMap 映射函数 f 的类型,是(元素) => (集合),即元素到集合(如数组、 列表等。因此flatMap 的映射过程在逻辑上分为两步:
- 以元素为单位,创建集合;
- 去掉集合“外包装”,提取集合元素。
- 假设,我们再次改变 Word Count 的计算逻辑由原来统计单词的计数,改为统计相邻单词共现的次数,如下图所示:
- ![变更Word Count计算逻辑](pic/变更Word%20Count计算逻辑.png)
- 对于这样的计算逻辑,我们该如何使用 flatMap 进行实现呢?这里我们先给出代码实现,然后再分阶段地分析 flatMap 的映射过程:
```scala
// 读取文件内容
val lineRDD: RDD[String] = _ // 请参考第一讲获取完整代码
// 以行为单位提取相邻单词
val wordPairRDD: RDD[String] = lineRDD.flatMap( line => {
// 将行转换为单词数组
val words: Array[String] = line.split(" ")
// 将单个单词数组,转换为相邻单词数组
for (i <- 0 until words.length - 1) yield words(i) + "-" + words(i+1)
})
```
- 在上面的代码中,我们采用匿名函数的形式,来提供映射函数 f。这里 f 的形参是 String 类型的 line也就是源文件中的一行文本而 f 的返回类型是 Array[String],也就是
String 类型的数组。在映射函数 f 的函数体中,我们先用 split 语句把 line 转化为单词数组,然后再用 for 循环结合 yield 语句,依次把单个的单词,转化为相邻单词词对。
- 注意for 循环返回的依然是数组,也即类型为 Array[String]的词对数组。由此可见,函 数 f 的类型是String => Array[String]),也就是刚刚说的第一步,从元素到集
合。但如果我们去观察转换前后的两个 RDD也就是 lineRDD 和 wordPairRDD会发现它们的类型都是 RDD[String],换句话说,它们的元素类型都是 String。
- 回顾 map 与 mapPartitions 这两个算子,我们会发现,转换前后 RDD 的元素类型,与映射函数 f 的类型是一致的。但在 flatMap 这里,却出现了 RDD 元素类型与函数类型不一
致的情况。这是怎么回事呢?其实呢,这正是 flatMap 的“奥妙”所在,为了让你直观地理解 flatMap 的映射过程,我画了一张示意图
- ![flatmap算子的作用范围](pic/flatmap算子的作用范围.png)
- 不难发现,映射函数 f 的计算过程,对应着图中的步骤 1 与步骤 2每行文本都被转化为包含相邻词对的数组。紧接着flatMap 去掉每个数组的“外包装”,提取出数组中类型
为 String 的词对元素,然后以词对为单位,构建新的数据分区,如图中步骤 3 所示。这就是 flatMap 映射过程的第二步:去掉集合“外包装”,提取集合元素。
- 得到包含词对元素的 wordPairRDD 之后,我们就可以沿用 Word Count 的后续逻辑,去计算相邻词汇的共现次数。
##### 4. filter过滤 RDD
- 与 map 一样常用的算子filter。filter顾名思义 这个算子的作用,是对 RDD 进行过滤。就像是 map 算子依赖其映射函数一样filter 算子也需要借助一个判定函数 f才能实现对 RDD 的过滤转换
- 所谓判定函数它指的是类型为RDD 元素类型) => Boolean的函数。可以看到 判定函数 f 的形参类型,必须与 RDD 的元素类型保持一致,而 f 的返回结果,只能是
True 或者 False。在任何一个 RDD 之上调用 filter(f),其作用是保留 RDD 中满足 f也就是 f 返回 True的数据元素而过滤掉不满足 f也就是 f 返回 False的数据元素。
- filter 算子与判定函数 f
- 在上面 flatMap 例子的最后,我们得到了元素为相邻词汇对的 wordPairRDD它包含的是像“Spark-is”、“is-cool”这样的字符串。为了仅保留有意义的词对元素我们希望
结合标点符号列表,对 wordPairRDD 进行过滤。例如我们希望过滤掉像“Spark-&”、“|-data”这样的词对。
- 实现这样的过滤逻辑
```scala
// 定义特殊字符列表
val list: List[String] = List("&", "|", "#", "^", "@")
// 定义判定函数f
def f(s: String): Boolean = {
val words: Array[String] = s.split("-")
val b1: Boolean = list.contains(words(0))
val b2: Boolean = list.contains(words(1))
return !b1 && !b2 // 返回不在特殊字符列表中的词汇对
} /
/ 使用filter(f)对RDD进行过滤
val cleanedPairRDD: RDD[String] = wordPairRDD.filter(f)
```
- 掌握了 filter 算子的用法之后,你就可以定义任意复杂的判定函数 f然后在 RDD 之上通过调用 filter(f) 去变着花样地做数据过滤,从而满足不同的业务需求。
### 2.4 进程模型与分布式部署:分布式计算是怎么回事?

Binary file not shown.

After

Width:  |  Height:  |  Size: 115 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 171 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 285 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 118 KiB

Loading…
Cancel
Save