You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

344 lines
30 KiB
Markdown

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

<h1><div style="text-align: center; color: #fa4861">Spark核心手册-自整理</div></h1>
## 0. 目录
- Spark 基础
- Spark SQL
- SparkMLlib
- StructuredStreaming
- Spark性能调优
## 1. 基础认知
- 官方QuickStart: https://spark.apache.org/
- 官方文档: https://spark.apache.org/docs/latest/
- 最新版本 【截止2022-7-20】
- 学习版本 【截止2022-7-20】
## 2. 基础
### 2.1 Spark 的 Hello World
- IDEA 开发配置
- 插件 Scala 下载 + [项目结构 -> 库 -> 添加Jar -> 选spark的jars目录下的文件]
#### A. Word Count 代码实现
- 读取内容、分词、分组计数这三步来看看 Word Count具体怎么实现。
- **第一步,读取内容**
- 首先,我们调用 SparkContext 的 textFile 方法,读取源文件,也就是 wikiOfSpark.txt
```scala
import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
```
- 3 个新概念,分别是 spark、sparkContext 和 RDD
- 其中spark 和 sparkContext 分别是两种不同的开发入口实例:
- spark 是开发入口 SparkSession 实例InstanceSparkSession 在 spark-shell 中会由系统自动创建;
- sparkContext 是开发入口 SparkContext 实例。
- 在 Spark 版本演进的过程中,从 2.0 版本开始SparkSession 取代了 SparkContext成为统一的开发入口。换句话说要开发 Spark 应用,你必须先创建 SparkSession。
- RDD 的全称是 Resilient Distributed Dataset意思是“弹性分布式数据集”。RDD 是 Spark 对于分布式数据的统一抽象,它定义了一系列分布式数据的基本属性与处理方法。
- **第二步,分词**
- “分词”就是把“数组”的行元素打散为单词。要实现这一点,我们可以调用 RDD 的flatMap 方法来完成。flatMap 操作在逻辑上可以分成两个步骤:**映射和展平**。
```scala
// 以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
```
- 要把 lineRDD 的行元素转换为单词我们得先用分隔符对每个行元素进行分割Split这里的分隔符是空格
- 分割之后,每个行元素就都变成了单词数组,元素类型也从 String 变成了 Array[String],像这样以元素为单位进行转换的操作,统一称作“映射”。
- RDD 类型由原来的 RDD[String]变为 RDD[Array[String]]
- RDD[String]看成是“数组”的话,那么 RDD[Array[String]]就是一个“二维数组”,它的每一个元素都是单词。
- ![以行为单位做分词](pic/以行为单位做分词.png)
- 为了后续对单词做分组,我们还需要对这个“二维数组”做展平,也就是去掉内层的嵌套
- 结构,把“二维数组”还原成“一维数组”,如下图所示。
- ![分词后做展平](pic/分词后做展平.png)
- 就这样,在 flatMap 算子的作用下,原来以行为元素的 lineRDD转换成了以单词为元素的 wordRDD。
- 不过,值得注意的是,我们用“空格”去分割句子,有可能会产生空字符串。所以,在完成“映射”和“展平”之后,对于这样的“单词”,我们要把其中的空字符串都过滤掉,这里我们调用 RDD 的 filter 方法来过滤:
```scala
// 过滤掉空字符串
val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))
```
- 这样一来我们在分词阶段就得到了过滤掉空字符串之后的单词“数组”类型是RDD[String]。接下来,我们就可以准备做分组计数了。
- **第三步,分组计数**
- 在 RDD 的开发框架下聚合类操作如计数、求和、求均值需要依赖键值对Key Value Pair类型的数据元素也就是KeyValue形式的“数组”元素。
- 因此,在调用聚合算子做分组计数之前,我们要先把 RDD 元素转换为KeyValue的形式也就是把 RDD[String]映射成 RDD[(String, Int)]。
- 其中,我们统一把所有的 Value 置为 1。这样一来对于同一个的单词在后续的计数运算中我们只要对 Value 做累加即可,就像这样:
- ![把元素转换为Key-Value形式](pic/把元素转换为Key-Value形式.png)
- 下面是对应的代码:
```scala
// 把RDD元素转换为KeyValue的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
```
- 这样一来RDD 就由原来存储 String 元素的 cleanWordRDD转换为了存储StringInt的 kvRDD。
- 完成了形式的转换之后,我们就该正式做分组计数了。分组计数其实是两个步骤,也就是先“分组”,再“计数”。下面,我们使用聚合算子 reduceByKey 来同时完成分组和计数这两个操作。
- 对于 kvRDD 这个键值对“数组”reduceByKey 先是按照 Key也就是单词来做分组分组之后每个单词都有一个与之对应的 Value 列表。然后根据用户提供的聚合函数,对同一个 Key 的所有 Value 做 reduce 运算。
- 这里的 reduce你可以理解成是一种计算步骤或是一种计算方法。当我们给定聚合函数后它会用折叠的方式把包含多个元素的列表转换为单个元素值从而统计出不同元素的数量。
- 在 Word Count 的示例中,我们调用 reduceByKey 实现分组计算的代码如下:
```scala
// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
```
- reduceByKey 分组聚合
- 可以看到,我们传递给 reduceByKey 算子的聚合函数是 (x, y) => x + y也就是累加函数。因此在每个单词分组之后reduce 会使用累加函数,依次折叠计算 Value 列表中的所有元素,
最终把元素列表转换为单词的频次。对于任意一个单词来说reduce 的计算过程都是一样的,如下图所示。
- ![reduce操作示意图](pic/reduce操作示意图.png)
- reduceByKey 完成计算之后,我们得到的依然是类型为 RDD[(String, Int)]的 RDD。不过与 kvRDD 不同wordCounts 元素的 Value 值,记录的是每个单词的统计词频。
- ![reduceByKey转换示意图](pic/reduceByKey转换示意图.png)
- 在程序的最后,我们还要把 wordCounts 按照词频做排序,并把词频最高的 5 个单词打印到屏幕上,代码如下所示。
```scala
// 打印词频最高的5个词汇
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)
```
- 完整代码 : HelloWorldDemo
### 2.2 RDD与编程模型延迟计算是怎么回事
#### A. 什么是 RDD
- RDD 是构建 Spark 分布式内存计算引擎的基石很多Spark 核心概念与核心组件,如 DAG 和调度系统都衍生自 RDD。因此深入理解 RDD有利于你更全面、系统地学习 Spark 的工作原理
- 尽管 RDD API 使用频率越来越低,绝大多数人也都已经习惯于 DataFrame 和Dataset API但是无论采用哪种 API 或是哪种开发语言,你的应用在 Spark 内部最终都会转化为 RDD 之上的分布式计算。
- 换句话说,如果你想要对 Spark 作业有更好的把握,前提是你要对 RDD 足够了解。
- 用一句话来概括,**RDD 是一种抽象是Spark 对于分布式数据集的抽象,它用于囊括所有内存中和磁盘中的分布式数据实体**。
- RDD和数组类似, RDD 和数组对比
- ![RDD与数组的对比](pic/RDD与数组的对比.png)
- 首先,就概念本身来说,数组是实体,它是一种存储同类元素的数据结构,而 RDD 是一种抽象,它所囊括的是分布式计算环境中的分布式数据集。
- 因此,这两者第二方面的不同就是在活动范围,数组的“活动范围”很窄,仅限于单个计算节点的某个进程内,而 RDD 代表的数据集是跨进程、跨节点的,它的“活动范围”是整个集群。
- 至于数组和 RDD 的第三个不同,则是在数据定位方面。在数组中,承载数据的基本单元是元素,而 RDD 中承载数据的基本单元是数据分片。在分布式计算环境中,一份完整的数据集,会按照某种规则切割成多份数据分片。
这些数据分片被均匀地分发给集群内不同的计算节点和执行进程,从而实现分布式并行计算。
- 通过以上对比,不难发现,**数据分片Partitions** 是 RDD 抽象的重要属性之一。
- 接下来咱们换个视角,从 RDD 的重要属性出发去进一步深入理解RDD。要想吃透 RDD我们必须掌握它的 4 大属性:
- partitions数据分片
- partitioner分片切割规则
- dependenciesRDD 依赖
- compute转换函数
#### B. 从薯片的加工流程看 RDD 的 4 大属性
- ![RDD的生活化类比](pic/RDD的生活化类比.png)
- 为了充分利用每一颗土豆、降低生产成本
- 工坊使用 3 条流水线来同时生产 3 种不同尺寸的桶装薯片。3 条流水线可以同时加工 3 颗土豆,每条流水线的作业流程都是一样的
- 分别是清洗、切片、烘焙、分发和装桶
- 其中,分发环节用于区分小、中、大号 3 种薯片3 种不同尺寸的薯片分别被发往第 1、2、3 条流水线。
- 那如果我们把每一条流水线看作是分布式运行环境的计算节点,用薯片生产的流程去类比 Spark 分布式计算,会有哪些有趣的发现呢?
- 这里的每一种食材形态,如“带泥土豆”、“干净土豆”、“土豆片”等,都可以看成是一个个 RDD。
- 而薯片的制作过程,实际上就是不同食材形态的转换过程。
- 从上到下的方向,去观察上图中土豆工坊的制作工艺。
- 我们可以看到对于每一种食材形态来说,流水线上都有多个实物与之对应,比如,“带泥土豆”是一种食材形态,流水线上总共有 3 颗“脏兮兮”的土豆同属于这一形态。
- 如果把“带泥土豆”看成是 RDD 的话,那么 RDD 的 partitions 属性囊括的正是麻袋里那一颗颗脏兮兮的土豆。同理流水线上所有洗净的土豆一同构成了“干净土豆”RDD的 partitions 属性。
- 我们再来看 RDD 的 partitioner 属性这个属性定义了把原始数据集切割成数据分片的切割规则。在土豆工坊的例子中“带泥土豆”RDD 的切割规则是随机拿取,也就是从麻袋中随机拿取一颗脏兮兮的土豆放到流水线上。
后面的食材形态如“干净土豆”、“土豆片”和“即食薯片”则沿用了“带泥土豆”RDD 的切割规则。换句话说后续的这些RDD分别继承了前一个 RDD 的 partitioner 属性。
- 这里面与众不同的是“分发的即食薯片”。显然“分发的即食薯片”是通过对“即食薯片”按照大、中、小号做分发得到的。也就是说对于“分发的即食薯片”来说它的partitioner 属性,重新定义了这个 RDD 数据分片的切割规则,
也就是把先前 RDD 的数据分片打散,按照薯片尺寸重新构建数据分片。
- 由这个例子我们可以看出,数据分片的分布,是由 RDD 的 partitioner 决定的。因此RDD 的 partitions 属性,与它的 partitioner 属性是强相关的。
- 接下来,我们横向地,也就是沿着从左至右的方向,再来观察土豆工坊的制作工艺。
- 流水线上的每一种食材形态,都是上一种食材形态在某种操作下进行转换得到的。比如,“土豆片”依赖的食材形态是“干净土豆”,这中间用于转换的操作是“切片”这个动作。回顾 Word Count 当中 RDD 之间的转换关系,我们也会发现类似的现象。
- ![WordCount中的RDD转换](pic/WordCount中的RDD转换.png)
- 在数据形态的转换过程中,每个 RDD 都会通过 dependencies 属性来记录它所依赖的前一个、或是多个 RDD简称“父 RDD”。与此同时RDD 使用 compute 属性,来记录从父 RDD 到当前 RDD 的转换操作。
- 拿 Word Count 当中的 wordRDD 来举例,它的父 RDD 是 lineRDD因此它的dependencies 属性记录的是 lineRDD。从 lineRDD 到 wordRDD 的转换,其所依赖的操作是 flatMap
因此wordRDD 的 compute 属性,记录的是 flatMap 这个转换函数。
- 总结下来,薯片的加工流程,与 RDD 的概念和 4 大属性是一一对应的:
- 不同的食材形态,如带泥土豆、土豆片、即食薯片等等,对应的就是 RDD 概念;
- 同一种食材形态在不同流水线上的具体实物,就是 RDD 的 partitions 属性;
- 食材按照什么规则被分配到哪条流水线,对应的就是 RDD 的 partitioner 属性;
- 每一种食材形态都会依赖上一种形态,这种依赖关系对应的是 RDD 中的dependencies 属性;
- 不同环节的加工方法对应 RDD 的 compute 属性。
- 编程模型指导我们如何进行代码实现,而延迟计算是 Spark 分布式运行机制的基础。只有搞明白编程模型与延迟计算,你才能流畅地在 Spark 之上做应用开发,在实现业务逻辑的同时,避免埋下性能隐患。
#### C. 编程模型与延迟计算
- map、filter、flatMap 和 reduceByKey 这些算子,有哪些共同点?
- 首先,这 4 个算子都是作用Apply在 RDD 之上、用来做 RDD 之间的转换。比如flatMap 作用在 lineRDD 之上,把 lineRDD 转换为 wordRDD。
- 其次这些算子本身是函数而且它们的参数也是函数。参数是函数、或者返回值是函数的函数我们把这类函数统称为“高阶函数”Higher-order Functions
- 这里,我们先专注在 RDD 算子的第一个共性RDD 转换。
- RDD 是 Spark 对于分布式数据集的抽象,每一个 RDD 都代表着**一种分布式数据形态**。比如 lineRDD它表示数据在集群中以行Line的形式存在而 wordRDD 则意味着数据的形态是单词,分布在计算集群中。
- RDD 代表的是分布式数据形态,因此,**RDD 到 RDD 之间的转换本质上是数据形态上的转换Transformations**。
- 在 RDD 的编程模型中一共有两种算子Transformations 类算子和 Actions 类算子。
- 开发者需要使用 Transformations 类算子定义并描述数据形态的转换过程然后调用Actions 类算子,将计算结果收集起来、或是物化到磁盘。
- 在这样的编程模型下Spark 在运行时的计算被划分为两个环节。
- 基于不用数据形态之间的转换构建计算流图DAGDirected Acyclic Graph
- 通过 Actions 类算子,以回溯的方式去触发执行这个计算流图。
- 换句话说,开发者调用的各类 Transformations 算子,并不立即执行计算,当且仅当开发者调用 Actions 算子时之前调用的转换算子才会付诸执行。在业内这样的计算模式有个专门的术语叫作“延迟计算”Lazy Evaluation
- 为什么 Word Count 在执行的过程中,只有最后一行代码会花费很长时间,而前面的代码都是瞬间执行完毕的呢?
- 答案正是 Spark 的延迟计算。flatMap、filter、map 这些算子,仅用于构建计算流图,因此,当你在 spark-shell 中敲入这些代码时spark-shell 会立即返回。只有在你敲入最后那行包含 take 的代码时,
Spark 才会触发执行从头到尾的计算流程,所以直观地看上去,最后一行代码是最耗时的。
- Spark 程序的整个运行流程如下图所示:
- ![延迟计算](pic/延迟计算.png)
- 常用的 RDD 算子进行了归类,并整理到了下面的表格中,随时查阅
- ![RDD算子归类](pic/RDD算子归类.png)
- 参考: https://spark.apache.org/docs/latest/rdd-programming-guide.html
### 2.3 RDD常用算子RDD内部的数据转换
- 重点讲解的就是 map、mapPartitions、flatMap、filter。这 4 个算子几乎囊括了日常开发中 99% 的数据转换场景
- ![RDD算子分类表](pic/RDD算子分类表.png)
#### A. 创建 RDD
- 在 Spark 中,创建 RDD 的典型方式有两种:
- 通过 SparkContext.parallelize 在内部数据之上创建 RDD
- 通过 SparkContext.textFile 等 API 从外部数据创建 RDD。
- 这里的内部、外部是相对应用程序来说的。开发者在 Spark 应用中自定义的各类数据结构,如数组、列表、映射等,都属于“内部数据”;
- 而“外部数据”指代的,是 Spark 系统之外的所有数据形式如本地文件系统或是分布式文件系统中的数据再比如来自其他大数据组件Hive、Hbase、RDBMS 等)的数据。
- 第一种创建方式的用法非常简单,只需要用 parallelize 函数来封装内部数据即可,比如下面的例子:
```scala
import org.apache.spark.rdd.RDD
val words: Array[String] = Array("Spark", "is", "cool")
val rdd: RDD[String] = sc.parallelize(words)
```
- 通常来说,在 Spark 应用内定义体量超大的数据集,其实都是不太合适的,因为数据集完全由 Driver 端创建,且创建完成后,还要在全网范围内跨节点、跨进程地分发到其他
Executors所以往往会带来性能问题。因此parallelize API 的典型用法,是在“小数据”之上创建 RDD。
- 要想在真正的“大数据”之上创建 RDD我们还得依赖第二种创建方式也就是通过SparkContext.textFile 等 API 从外部数据创建 RDD。由于 textFile API 比较简单,而且
它在日常的开发中出现频率比较高,因此我们使用 textFile API 来创建 RDD。在后续对各类 RDD 算子讲解的过程中,我们都会使用 textFile API 从文件系统创建 RDD。
```scala
import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
```
#### B. RDD 内的数据转换
##### 1. map以元素为粒度的数据转换
- map 算子的用法:给定映射函数 fmap(f) 以元素为粒度对 RDD 做数据转换。其中 f 可以是带有明确签名的带名函数也可以是匿名函数它的形参类型必须与RDD 的元素类型保持一致,而输出类型则任由开发者自行决定
- 把包含单词的 RDD 转换成元素为KeyValue对的 RDD后者统称为 Paired RDD。
```scala
// 把普通RDD转换为Paired RDD
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
```
- 上面的代码实现中,传递给 map 算子的形参word => word1就是我们上面说的映射函数 f。只不过这里 f 是以匿名函数的方式进行定义的,其中左侧的 word表示匿名函数 f 的输入形参而右侧的word1则代表函数 f 的输出结果。
- 如果我们把匿名函数变成带名函数的话,可能你会看的更清楚一些。用一段代码重新定义了带名函数 f。
```scala
// 把RDD元素转换为KeyValue的形式
// 定义映射函数f
def f(word: String): (String, Int) = {
return (word, 1)
}
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f)
````
- 使用 Scala 的 def 语法,明确定义了带名映射函数 f它的计算逻辑与刚刚的匿名函数是一致的。在做 RDD 数据转换的时候,我们只需把函数 f 传递给 map 算子即可。不管 f 是匿名函数还是带名函数map 算子的转换逻辑都是一样的
- 通过定义如下的映射函数 f我们就可以改写 Word Count 的计数逻辑也就是把“Spark”这个单词的统计计数权重提高一倍
```scala
// 把RDD元素转换为KeyValue的形式
// 定义映射函数f
def f(word: String): (String, Int) = {
if (word.equals("Spark")) { return (word, 2) }
return (word, 1)
}
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f)
```
- map(f) 是以元素为粒度对 RDD 做数据转换的,在某些计算场景下,这个特点会严重影响执行效率。为什么这么说呢?
- 比方说,我们把 Word Count 的计数需求,从原来的对单词计数,改为对单词的哈希值计数,在这种情况下,我们的代码实现需要做哪些改动呢?
```scala
// 把普通RDD转换为Paired RDD
import java.security.MessageDigest
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map{ word =>
// 获取MD5对象实例
val md5 = MessageDigest.getInstance("MD5")
// 使用MD5计算哈希值
val hash = md5.digest(word.getBytes).mkString
// 返回哈希值与数字1的Pair
(hash, 1)
}
```
- 由于 map(f) 是以元素为单元做转换的,那么对于 RDD 中的每一条数据记录,我们都需要实例化一个 MessageDigest 对象来计算这个元素的哈希值。
- 在工业级生产系统中,一个 RDD 动辄包含上百万甚至是上亿级别的数据记录,如果处理每条记录都需要事先创建 MessageDigest那么实例化对象的开销就会聚沙成塔不知不觉地成为影响执行效率的罪魁祸首。
- 有没有什么办法,能够让 Spark 在更粗的数据粒度上去处理数据呢?
- 还真有mapPartitions 和 mapPartitionsWithIndex 这对“孪生兄弟”就是用来解决类似的问题。相比 mapPartitionsmapPartitionsWithIndex 仅仅多出了一个数据分区索引
##### 2. mapPartitions以数据分区为粒度的数据转换
- mapPartitions顾名思义就是以数据分区为粒度使用映射函数 f 对 RDD 进行数据转换。
- 对于上述单词哈希值计数的例子,我们结合后面的代码,来看看如何使用 mapPartitions 来改善执行性能:
```scala
// 把普通RDD转换为Paired RDD
import java.security.MessageDigest
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.mapPartitions( partition => {
// 注意这里是以数据分区为粒度获取MD5对象实例
val md5 = MessageDigest.getInstance("MD5")
val newPartition = partition.map( word => {
// 在处理每一条数据记录的时候可以复用同一个Partition内的MD5对象
md5.digest(word.getBytes).mkString
})
newPartition
})
```
- 在上面的改进代码中mapPartitions 以数据分区匿名函数的形参partition为粒度对 RDD 进行数据转换。具体的数据处理逻辑,则由代表数据分区的形参 partition 进一步调用 map(f) 来完成。
- 你可能会说“partition. map(f) 仍然是以元素为粒度做映射呀!这和前一个版本的实现,有什么本质上的区别呢?”
- 仔细观察,你就会发现,相比前一个版本,我们把实例化 MD5 对象的语句挪到了 map 算子之外。如此一来,以数据分区为单位,实例化对象的操作只需要执行一次,而同一个数据分区中所有的数据记录,都可以共享该 MD5 对象,从而完成单词到哈希值的转换。
- 通过下图的直观对比你会发现以数据分区为单位mapPartitions 只需实例化一次MD5 对象,而 map 算子却需要实例化多次,具体的次数则由分区内数据记录的数量来决定。
- ![map与mapPartitions的区别](pic/map与mapPartitions的区别.png)
- 对于一个有着上百万条记录的 RDD 来说,其数据分区的划分往往是在百这个量级,因此,相比 map 算子mapPartitions 可以显著降低对象实例化的计算开销,这对于 Spark 作业端到端的执行性能来说,无疑是非常友好的。
- 实际上。除了计算哈希值以外对于数据记录来说凡是可以共享的操作都可以用mapPartitions 算子进行优化。
- 这样的共享操作还有很多,比如创建用于连接远端数据库的 Connections 对象,或是用于连接 Amazon S3 的文件系统句柄,再比如用于在线推理的机器学习模型,等等,不一而足。
- 相比 mapPartitionsmapPartitionsWithIndex 仅仅多出了一个数据分区索引,这个数据分区索引可以为我们获取分区编号,当你的业务逻辑中需要使用到分区编号的时候,不妨考虑使用这个算子来实现代码。
- 除了这个额外的分区索引以外mapPartitionsWithIndex在其他方面与 mapPartitions 是完全一样的。
##### 3. flatMap从元素到集合、再从集合到元素
- flatMap 其实和 map 与 mapPartitions 算子类似,在功能上,与 map 和 mapPartitions 一样flatMap 也是用来做数据映射的,在实现上,对于给定映射函数 fflatMap(f) 以元
素为粒度,对 RDD 进行数据转换。
- 不过与前两者相比flatMap 的映射函数 f 有着显著的不同。对于 map 和 mapPartitions 来说,其映射函数 f 的类型,都是(元素) => (元素),即元素到元
素。而 flatMap 映射函数 f 的类型,是(元素) => (集合),即元素到集合(如数组、 列表等。因此flatMap 的映射过程在逻辑上分为两步:
- 以元素为单位,创建集合;
- 去掉集合“外包装”,提取集合元素。
- 假设,我们再次改变 Word Count 的计算逻辑由原来统计单词的计数,改为统计相邻单词共现的次数,如下图所示:
- ![变更Word Count计算逻辑](pic/变更Word%20Count计算逻辑.png)
- 对于这样的计算逻辑,我们该如何使用 flatMap 进行实现呢?这里我们先给出代码实现,然后再分阶段地分析 flatMap 的映射过程:
```scala
// 读取文件内容
val lineRDD: RDD[String] = _ // 请参考第一讲获取完整代码
// 以行为单位提取相邻单词
val wordPairRDD: RDD[String] = lineRDD.flatMap( line => {
// 将行转换为单词数组
val words: Array[String] = line.split(" ")
// 将单个单词数组,转换为相邻单词数组
for (i <- 0 until words.length - 1) yield words(i) + "-" + words(i+1)
})
```
- 在上面的代码中,我们采用匿名函数的形式,来提供映射函数 f。这里 f 的形参是 String 类型的 line也就是源文件中的一行文本而 f 的返回类型是 Array[String],也就是
String 类型的数组。在映射函数 f 的函数体中,我们先用 split 语句把 line 转化为单词数组,然后再用 for 循环结合 yield 语句,依次把单个的单词,转化为相邻单词词对。
- 注意for 循环返回的依然是数组,也即类型为 Array[String]的词对数组。由此可见,函 数 f 的类型是String => Array[String]),也就是刚刚说的第一步,从元素到集
合。但如果我们去观察转换前后的两个 RDD也就是 lineRDD 和 wordPairRDD会发现它们的类型都是 RDD[String],换句话说,它们的元素类型都是 String。
- 回顾 map 与 mapPartitions 这两个算子,我们会发现,转换前后 RDD 的元素类型,与映射函数 f 的类型是一致的。但在 flatMap 这里,却出现了 RDD 元素类型与函数类型不一
致的情况。这是怎么回事呢?其实呢,这正是 flatMap 的“奥妙”所在,为了让你直观地理解 flatMap 的映射过程,我画了一张示意图
- ![flatmap算子的作用范围](pic/flatmap算子的作用范围.png)
- 不难发现,映射函数 f 的计算过程,对应着图中的步骤 1 与步骤 2每行文本都被转化为包含相邻词对的数组。紧接着flatMap 去掉每个数组的“外包装”,提取出数组中类型
为 String 的词对元素,然后以词对为单位,构建新的数据分区,如图中步骤 3 所示。这就是 flatMap 映射过程的第二步:去掉集合“外包装”,提取集合元素。
- 得到包含词对元素的 wordPairRDD 之后,我们就可以沿用 Word Count 的后续逻辑,去计算相邻词汇的共现次数。
##### 4. filter过滤 RDD
- 与 map 一样常用的算子filter。filter顾名思义 这个算子的作用,是对 RDD 进行过滤。就像是 map 算子依赖其映射函数一样filter 算子也需要借助一个判定函数 f才能实现对 RDD 的过滤转换
- 所谓判定函数它指的是类型为RDD 元素类型) => Boolean的函数。可以看到 判定函数 f 的形参类型,必须与 RDD 的元素类型保持一致,而 f 的返回结果,只能是
True 或者 False。在任何一个 RDD 之上调用 filter(f),其作用是保留 RDD 中满足 f也就是 f 返回 True的数据元素而过滤掉不满足 f也就是 f 返回 False的数据元素。
- filter 算子与判定函数 f
- 在上面 flatMap 例子的最后,我们得到了元素为相邻词汇对的 wordPairRDD它包含的是像“Spark-is”、“is-cool”这样的字符串。为了仅保留有意义的词对元素我们希望
结合标点符号列表,对 wordPairRDD 进行过滤。例如我们希望过滤掉像“Spark-&”、“|-data”这样的词对。
- 实现这样的过滤逻辑
```scala
// 定义特殊字符列表
val list: List[String] = List("&", "|", "#", "^", "@")
// 定义判定函数f
def f(s: String): Boolean = {
val words: Array[String] = s.split("-")
val b1: Boolean = list.contains(words(0))
val b2: Boolean = list.contains(words(1))
return !b1 && !b2 // 返回不在特殊字符列表中的词汇对
}
// 使用filter(f)对RDD进行过滤
val cleanedPairRDD: RDD[String] = wordPairRDD.filter(f)
```
- 掌握了 filter 算子的用法之后,你就可以定义任意复杂的判定函数 f然后在 RDD 之上通过调用 filter(f) 去变着花样地做数据过滤,从而满足不同的业务需求。
### 2.4 进程模型与分布式部署:分布式计算是怎么回事?