Spark核心手册-自整理

## 0. 目录 - Spark 基础 - Spark SQL - SparkMLlib - StructuredStreaming - Spark性能调优 ## 1. 基础认知 - 官方QuickStart: https://spark.apache.org/ - 官方文档: https://spark.apache.org/docs/latest/ - 最新版本 【截止2022-7-20】 - 学习版本 【截止2022-7-20】 ## 2. 基础 ### 2.1 Spark 的 Hello World - IDEA 开发配置 - 插件 Scala 下载 + [项目结构 -> 库 -> 添加Jar -> 选spark的jars目录下的文件] #### A. Word Count 代码实现 - 读取内容、分词、分组计数这三步来看看 Word Count具体怎么实现。 - **第一步,读取内容** - 首先,我们调用 SparkContext 的 textFile 方法,读取源文件,也就是 wikiOfSpark.txt, ```scala import org.apache.spark.rdd.RDD val rootPath: String = _ val file: String = s"${rootPath}/wikiOfSpark.txt" // 读取文件内容 val lineRDD: RDD[String] = spark.sparkContext.textFile(file) ``` - 3 个新概念,分别是 spark、sparkContext 和 RDD - 其中,spark 和 sparkContext 分别是两种不同的开发入口实例: - spark 是开发入口 SparkSession 实例(Instance),SparkSession 在 spark-shell 中会由系统自动创建; - sparkContext 是开发入口 SparkContext 实例。 - 在 Spark 版本演进的过程中,从 2.0 版本开始,SparkSession 取代了 SparkContext,成为统一的开发入口。换句话说,要开发 Spark 应用,你必须先创建 SparkSession。 - RDD 的全称是 Resilient Distributed Dataset,意思是“弹性分布式数据集”。RDD 是 Spark 对于分布式数据的统一抽象,它定义了一系列分布式数据的基本属性与处理方法。 - **第二步,分词** - “分词”就是把“数组”的行元素打散为单词。要实现这一点,我们可以调用 RDD 的flatMap 方法来完成。flatMap 操作在逻辑上可以分成两个步骤:**映射和展平**。 ```scala // 以行为单位做分词 val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" ")) ``` - 要把 lineRDD 的行元素转换为单词,我们得先用分隔符对每个行元素进行分割(Split),这里的分隔符是空格 - 分割之后,每个行元素就都变成了单词数组,元素类型也从 String 变成了 Array[String],像这样以元素为单位进行转换的操作,统一称作“映射”。 - RDD 类型由原来的 RDD[String]变为 RDD[Array[String]] - RDD[String]看成是“数组”的话,那么 RDD[Array[String]]就是一个“二维数组”,它的每一个元素都是单词。 - ![以行为单位做分词](pic/以行为单位做分词.png) - 为了后续对单词做分组,我们还需要对这个“二维数组”做展平,也就是去掉内层的嵌套 - 结构,把“二维数组”还原成“一维数组”,如下图所示。 - ![分词后做展平](pic/分词后做展平.png) - 就这样,在 flatMap 算子的作用下,原来以行为元素的 lineRDD,转换成了以单词为元素的 wordRDD。 - 不过,值得注意的是,我们用“空格”去分割句子,有可能会产生空字符串。所以,在完成“映射”和“展平”之后,对于这样的“单词”,我们要把其中的空字符串都过滤掉,这里我们调用 RDD 的 filter 方法来过滤: ```scala // 过滤掉空字符串 val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals("")) ``` - 这样一来,我们在分词阶段就得到了过滤掉空字符串之后的单词“数组”,类型是RDD[String]。接下来,我们就可以准备做分组计数了。 - **第三步,分组计数** - 在 RDD 的开发框架下,聚合类操作,如计数、求和、求均值,需要依赖键值对(Key Value Pair)类型的数据元素,也就是(Key,Value)形式的“数组”元素。 - 因此,在调用聚合算子做分组计数之前,我们要先把 RDD 元素转换为(Key,Value)的形式,也就是把 RDD[String]映射成 RDD[(String, Int)]。 - 其中,我们统一把所有的 Value 置为 1。这样一来,对于同一个的单词,在后续的计数运算中,我们只要对 Value 做累加即可,就像这样: - ![把元素转换为Key-Value形式](pic/把元素转换为Key-Value形式.png) - 下面是对应的代码: ```scala // 把RDD元素转换为(Key,Value)的形式 val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1)) ``` - 这样一来,RDD 就由原来存储 String 元素的 cleanWordRDD,转换为了存储(String,Int)的 kvRDD。 - 完成了形式的转换之后,我们就该正式做分组计数了。分组计数其实是两个步骤,也就是先“分组”,再“计数”。下面,我们使用聚合算子 reduceByKey 来同时完成分组和计数这两个操作。 - 对于 kvRDD 这个键值对“数组”,reduceByKey 先是按照 Key(也就是单词)来做分组,分组之后,每个单词都有一个与之对应的 Value 列表。然后根据用户提供的聚合函数,对同一个 Key 的所有 Value 做 reduce 运算。 - 这里的 reduce,你可以理解成是一种计算步骤或是一种计算方法。当我们给定聚合函数后,它会用折叠的方式,把包含多个元素的列表转换为单个元素值,从而统计出不同元素的数量。 - 在 Word Count 的示例中,我们调用 reduceByKey 实现分组计算的代码如下: ```scala // 按照单词做分组计数 val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y) ``` - reduceByKey 分组聚合 - 可以看到,我们传递给 reduceByKey 算子的聚合函数是 (x, y) => x + y,也就是累加函数。因此,在每个单词分组之后,reduce 会使用累加函数,依次折叠计算 Value 列表中的所有元素, 最终把元素列表转换为单词的频次。对于任意一个单词来说,reduce 的计算过程都是一样的,如下图所示。 - ![reduce操作示意图](pic/reduce操作示意图.png) - reduceByKey 完成计算之后,我们得到的依然是类型为 RDD[(String, Int)]的 RDD。不过,与 kvRDD 不同,wordCounts 元素的 Value 值,记录的是每个单词的统计词频。 - ![reduceByKey转换示意图](pic/reduceByKey转换示意图.png) - 在程序的最后,我们还要把 wordCounts 按照词频做排序,并把词频最高的 5 个单词打印到屏幕上,代码如下所示。 ```scala // 打印词频最高的5个词汇 wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5) ``` - 完整代码 : HelloWorldDemo ### 2.2 RDD与编程模型:延迟计算是怎么回事? #### A. 什么是 RDD - RDD 是构建 Spark 分布式内存计算引擎的基石,很多Spark 核心概念与核心组件,如 DAG 和调度系统都衍生自 RDD。因此,深入理解 RDD有利于你更全面、系统地学习 Spark 的工作原理 - 尽管 RDD API 使用频率越来越低,绝大多数人也都已经习惯于 DataFrame 和Dataset API,但是,无论采用哪种 API 或是哪种开发语言,你的应用在 Spark 内部最终都会转化为 RDD 之上的分布式计算。 - 换句话说,如果你想要对 Spark 作业有更好的把握,前提是你要对 RDD 足够了解。 - 用一句话来概括,**RDD 是一种抽象,是Spark 对于分布式数据集的抽象,它用于囊括所有内存中和磁盘中的分布式数据实体**。 - RDD和数组类似, RDD 和数组对比 - ![RDD与数组的对比](pic/RDD与数组的对比.png) - 首先,就概念本身来说,数组是实体,它是一种存储同类元素的数据结构,而 RDD 是一种抽象,它所囊括的是分布式计算环境中的分布式数据集。 - 因此,这两者第二方面的不同就是在活动范围,数组的“活动范围”很窄,仅限于单个计算节点的某个进程内,而 RDD 代表的数据集是跨进程、跨节点的,它的“活动范围”是整个集群。 - 至于数组和 RDD 的第三个不同,则是在数据定位方面。在数组中,承载数据的基本单元是元素,而 RDD 中承载数据的基本单元是数据分片。在分布式计算环境中,一份完整的数据集,会按照某种规则切割成多份数据分片。 这些数据分片被均匀地分发给集群内不同的计算节点和执行进程,从而实现分布式并行计算。 - 通过以上对比,不难发现,**数据分片(Partitions)** 是 RDD 抽象的重要属性之一。 - 接下来咱们换个视角,从 RDD 的重要属性出发,去进一步深入理解RDD。要想吃透 RDD,我们必须掌握它的 4 大属性: - partitions:数据分片 - partitioner:分片切割规则 - dependencies:RDD 依赖 - compute:转换函数 #### B. 从薯片的加工流程看 RDD 的 4 大属性 - ![RDD的生活化类比](pic/RDD的生活化类比.png) - 为了充分利用每一颗土豆、降低生产成本 - 工坊使用 3 条流水线来同时生产 3 种不同尺寸的桶装薯片。3 条流水线可以同时加工 3 颗土豆,每条流水线的作业流程都是一样的 - 分别是清洗、切片、烘焙、分发和装桶 - 其中,分发环节用于区分小、中、大号 3 种薯片,3 种不同尺寸的薯片分别被发往第 1、2、3 条流水线。 - 那如果我们把每一条流水线看作是分布式运行环境的计算节点,用薯片生产的流程去类比 Spark 分布式计算,会有哪些有趣的发现呢? - 这里的每一种食材形态,如“带泥土豆”、“干净土豆”、“土豆片”等,都可以看成是一个个 RDD。 - 而薯片的制作过程,实际上就是不同食材形态的转换过程。 - 从上到下的方向,去观察上图中土豆工坊的制作工艺。 - 我们可以看到对于每一种食材形态来说,流水线上都有多个实物与之对应,比如,“带泥土豆”是一种食材形态,流水线上总共有 3 颗“脏兮兮”的土豆同属于这一形态。 - 如果把“带泥土豆”看成是 RDD 的话,那么 RDD 的 partitions 属性,囊括的正是麻袋里那一颗颗脏兮兮的土豆。同理,流水线上所有洗净的土豆,一同构成了“干净土豆”RDD的 partitions 属性。 - 我们再来看 RDD 的 partitioner 属性,这个属性定义了把原始数据集切割成数据分片的切割规则。在土豆工坊的例子中,“带泥土豆”RDD 的切割规则是随机拿取,也就是从麻袋中随机拿取一颗脏兮兮的土豆放到流水线上。 后面的食材形态,如“干净土豆”、“土豆片”和“即食薯片”,则沿用了“带泥土豆”RDD 的切割规则。换句话说,后续的这些RDD,分别继承了前一个 RDD 的 partitioner 属性。 - 这里面与众不同的是“分发的即食薯片”。显然,“分发的即食薯片”是通过对“即食薯片”按照大、中、小号做分发得到的。也就是说,对于“分发的即食薯片”来说,它的partitioner 属性,重新定义了这个 RDD 数据分片的切割规则, 也就是把先前 RDD 的数据分片打散,按照薯片尺寸重新构建数据分片。 - 由这个例子我们可以看出,数据分片的分布,是由 RDD 的 partitioner 决定的。因此,RDD 的 partitions 属性,与它的 partitioner 属性是强相关的。 - 接下来,我们横向地,也就是沿着从左至右的方向,再来观察土豆工坊的制作工艺。 - 流水线上的每一种食材形态,都是上一种食材形态在某种操作下进行转换得到的。比如,“土豆片”依赖的食材形态是“干净土豆”,这中间用于转换的操作是“切片”这个动作。回顾 Word Count 当中 RDD 之间的转换关系,我们也会发现类似的现象。 - ![WordCount中的RDD转换](pic/WordCount中的RDD转换.png) - 在数据形态的转换过程中,每个 RDD 都会通过 dependencies 属性来记录它所依赖的前一个、或是多个 RDD,简称“父 RDD”。与此同时,RDD 使用 compute 属性,来记录从父 RDD 到当前 RDD 的转换操作。 - 拿 Word Count 当中的 wordRDD 来举例,它的父 RDD 是 lineRDD,因此,它的dependencies 属性记录的是 lineRDD。从 lineRDD 到 wordRDD 的转换,其所依赖的操作是 flatMap, 因此,wordRDD 的 compute 属性,记录的是 flatMap 这个转换函数。 - 总结下来,薯片的加工流程,与 RDD 的概念和 4 大属性是一一对应的: - 不同的食材形态,如带泥土豆、土豆片、即食薯片等等,对应的就是 RDD 概念; - 同一种食材形态在不同流水线上的具体实物,就是 RDD 的 partitions 属性; - 食材按照什么规则被分配到哪条流水线,对应的就是 RDD 的 partitioner 属性; - 每一种食材形态都会依赖上一种形态,这种依赖关系对应的是 RDD 中的dependencies 属性; - 不同环节的加工方法对应 RDD 的 compute 属性。 - 编程模型指导我们如何进行代码实现,而延迟计算是 Spark 分布式运行机制的基础。只有搞明白编程模型与延迟计算,你才能流畅地在 Spark 之上做应用开发,在实现业务逻辑的同时,避免埋下性能隐患。 #### C. 编程模型与延迟计算 - map、filter、flatMap 和 reduceByKey 这些算子,有哪些共同点? - 首先,这 4 个算子都是作用(Apply)在 RDD 之上、用来做 RDD 之间的转换。比如,flatMap 作用在 lineRDD 之上,把 lineRDD 转换为 wordRDD。 - 其次,这些算子本身是函数,而且它们的参数也是函数。参数是函数、或者返回值是函数的函数,我们把这类函数统称为“高阶函数”(Higher-order Functions)。 - 这里,我们先专注在 RDD 算子的第一个共性:RDD 转换。 - RDD 是 Spark 对于分布式数据集的抽象,每一个 RDD 都代表着**一种分布式数据形态**。比如 lineRDD,它表示数据在集群中以行(Line)的形式存在;而 wordRDD 则意味着数据的形态是单词,分布在计算集群中。 - RDD 代表的是分布式数据形态,因此,**RDD 到 RDD 之间的转换,本质上是数据形态上的转换(Transformations)**。 - 在 RDD 的编程模型中,一共有两种算子,Transformations 类算子和 Actions 类算子。 - 开发者需要使用 Transformations 类算子,定义并描述数据形态的转换过程,然后调用Actions 类算子,将计算结果收集起来、或是物化到磁盘。 - 在这样的编程模型下,Spark 在运行时的计算被划分为两个环节。 - 基于不用数据形态之间的转换,构建计算流图(DAG,Directed Acyclic Graph); - 通过 Actions 类算子,以回溯的方式去触发执行这个计算流图。 - 换句话说,开发者调用的各类 Transformations 算子,并不立即执行计算,当且仅当开发者调用 Actions 算子时,之前调用的转换算子才会付诸执行。在业内,这样的计算模式有个专门的术语,叫作“延迟计算”(Lazy Evaluation)。 - 为什么 Word Count 在执行的过程中,只有最后一行代码会花费很长时间,而前面的代码都是瞬间执行完毕的呢? - 答案正是 Spark 的延迟计算。flatMap、filter、map 这些算子,仅用于构建计算流图,因此,当你在 spark-shell 中敲入这些代码时,spark-shell 会立即返回。只有在你敲入最后那行包含 take 的代码时, Spark 才会触发执行从头到尾的计算流程,所以直观地看上去,最后一行代码是最耗时的。 - Spark 程序的整个运行流程如下图所示: - ![延迟计算](pic/延迟计算.png) - 常用的 RDD 算子进行了归类,并整理到了下面的表格中,随时查阅 - ![RDD算子归类](pic/RDD算子归类.png) - 参考: https://spark.apache.org/docs/latest/rdd-programming-guide.html ### 2.3 RDD常用算子(一):RDD内部的数据转换 - 重点讲解的就是 map、mapPartitions、flatMap、filter。这 4 个算子几乎囊括了日常开发中 99% 的数据转换场景 - ![RDD算子分类表](pic/RDD算子分类表.png) #### A. 创建 RDD - 在 Spark 中,创建 RDD 的典型方式有两种: - 通过 SparkContext.parallelize 在内部数据之上创建 RDD; - 通过 SparkContext.textFile 等 API 从外部数据创建 RDD。 - 这里的内部、外部是相对应用程序来说的。开发者在 Spark 应用中自定义的各类数据结构,如数组、列表、映射等,都属于“内部数据”; - 而“外部数据”指代的,是 Spark 系统之外的所有数据形式,如本地文件系统或是分布式文件系统中的数据,再比如来自其他大数据组件(Hive、Hbase、RDBMS 等)的数据。 - 第一种创建方式的用法非常简单,只需要用 parallelize 函数来封装内部数据即可,比如下面的例子: ```scala import org.apache.spark.rdd.RDD val words: Array[String] = Array("Spark", "is", "cool") val rdd: RDD[String] = sc.parallelize(words) ``` - 通常来说,在 Spark 应用内定义体量超大的数据集,其实都是不太合适的,因为数据集完全由 Driver 端创建,且创建完成后,还要在全网范围内跨节点、跨进程地分发到其他 Executors,所以往往会带来性能问题。因此,parallelize API 的典型用法,是在“小数据”之上创建 RDD。 - 要想在真正的“大数据”之上创建 RDD,我们还得依赖第二种创建方式,也就是通过SparkContext.textFile 等 API 从外部数据创建 RDD。由于 textFile API 比较简单,而且 它在日常的开发中出现频率比较高,因此我们使用 textFile API 来创建 RDD。在后续对各类 RDD 算子讲解的过程中,我们都会使用 textFile API 从文件系统创建 RDD。 ```scala import org.apache.spark.rdd.RDD val rootPath: String = _ val file: String = s"${rootPath}/wikiOfSpark.txt" // 读取文件内容 val lineRDD: RDD[String] = spark.sparkContext.textFile(file) ``` #### B. RDD 内的数据转换 ##### 1. map:以元素为粒度的数据转换 - map 算子的用法:给定映射函数 f,map(f) 以元素为粒度对 RDD 做数据转换。其中 f 可以是带有明确签名的带名函数,也可以是匿名函数,它的形参类型必须与RDD 的元素类型保持一致,而输出类型则任由开发者自行决定 - 把包含单词的 RDD 转换成元素为(Key,Value)对的 RDD,后者统称为 Paired RDD。 ```scala // 把普通RDD转换为Paired RDD val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码 val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1)) ``` - 上面的代码实现中,传递给 map 算子的形参,即:word => (word,1),就是我们上面说的映射函数 f。只不过,这里 f 是以匿名函数的方式进行定义的,其中左侧的 word表示匿名函数 f 的输入形参,而右侧的(word,1)则代表函数 f 的输出结果。 - 如果我们把匿名函数变成带名函数的话,可能你会看的更清楚一些。用一段代码重新定义了带名函数 f。 ```scala // 把RDD元素转换为(Key,Value)的形式 // 定义映射函数f def f(word: String): (String, Int) = { return (word, 1) } val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码 val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f) ```` - 使用 Scala 的 def 语法,明确定义了带名映射函数 f,它的计算逻辑与刚刚的匿名函数是一致的。在做 RDD 数据转换的时候,我们只需把函数 f 传递给 map 算子即可。不管 f 是匿名函数,还是带名函数,map 算子的转换逻辑都是一样的 - 通过定义如下的映射函数 f,我们就可以改写 Word Count 的计数逻辑,也就是把“Spark”这个单词的统计计数权重提高一倍: ```scala // 把RDD元素转换为(Key,Value)的形式 // 定义映射函数f def f(word: String): (String, Int) = { if (word.equals("Spark")) { return (word, 2) } return (word, 1) } val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码 val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f) ``` - map(f) 是以元素为粒度对 RDD 做数据转换的,在某些计算场景下,这个特点会严重影响执行效率。为什么这么说呢? - 比方说,我们把 Word Count 的计数需求,从原来的对单词计数,改为对单词的哈希值计数,在这种情况下,我们的代码实现需要做哪些改动呢? ```scala // 把普通RDD转换为Paired RDD import java.security.MessageDigest val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码 val kvRDD: RDD[(String, Int)] = cleanWordRDD.map{ word => // 获取MD5对象实例 val md5 = MessageDigest.getInstance("MD5") // 使用MD5计算哈希值 val hash = md5.digest(word.getBytes).mkString // 返回哈希值与数字1的Pair (hash, 1) } ``` - 由于 map(f) 是以元素为单元做转换的,那么对于 RDD 中的每一条数据记录,我们都需要实例化一个 MessageDigest 对象来计算这个元素的哈希值。 - 在工业级生产系统中,一个 RDD 动辄包含上百万甚至是上亿级别的数据记录,如果处理每条记录都需要事先创建 MessageDigest,那么实例化对象的开销就会聚沙成塔,不知不觉地成为影响执行效率的罪魁祸首。 - 有没有什么办法,能够让 Spark 在更粗的数据粒度上去处理数据呢? - 还真有,mapPartitions 和 mapPartitionsWithIndex 这对“孪生兄弟”就是用来解决类似的问题。相比 mapPartitions,mapPartitionsWithIndex 仅仅多出了一个数据分区索引 ##### 2. mapPartitions:以数据分区为粒度的数据转换 - mapPartitions,顾名思义,就是以数据分区为粒度,使用映射函数 f 对 RDD 进行数据转换。 - 对于上述单词哈希值计数的例子,我们结合后面的代码,来看看如何使用 mapPartitions 来改善执行性能: ```scala // 把普通RDD转换为Paired RDD import java.security.MessageDigest val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码 val kvRDD: RDD[(String, Int)] = cleanWordRDD.mapPartitions( partition => { // 注意!这里是以数据分区为粒度,获取MD5对象实例 val md5 = MessageDigest.getInstance("MD5") val newPartition = partition.map( word => { // 在处理每一条数据记录的时候,可以复用同一个Partition内的MD5对象 md5.digest(word.getBytes).mkString }) newPartition }) ``` - 在上面的改进代码中,mapPartitions 以数据分区(匿名函数的形参partition)为粒度,对 RDD 进行数据转换。具体的数据处理逻辑,则由代表数据分区的形参 partition 进一步调用 map(f) 来完成。 - 你可能会说:“partition. map(f) 仍然是以元素为粒度做映射呀!这和前一个版本的实现,有什么本质上的区别呢?” - 仔细观察,你就会发现,相比前一个版本,我们把实例化 MD5 对象的语句挪到了 map 算子之外。如此一来,以数据分区为单位,实例化对象的操作只需要执行一次,而同一个数据分区中所有的数据记录,都可以共享该 MD5 对象,从而完成单词到哈希值的转换。 - 通过下图的直观对比,你会发现,以数据分区为单位,mapPartitions 只需实例化一次MD5 对象,而 map 算子却需要实例化多次,具体的次数则由分区内数据记录的数量来决定。 - ![map与mapPartitions的区别](pic/map与mapPartitions的区别.png) - 对于一个有着上百万条记录的 RDD 来说,其数据分区的划分往往是在百这个量级,因此,相比 map 算子,mapPartitions 可以显著降低对象实例化的计算开销,这对于 Spark 作业端到端的执行性能来说,无疑是非常友好的。 - 实际上。除了计算哈希值以外,对于数据记录来说,凡是可以共享的操作,都可以用mapPartitions 算子进行优化。 - 这样的共享操作还有很多,比如创建用于连接远端数据库的 Connections 对象,或是用于连接 Amazon S3 的文件系统句柄,再比如用于在线推理的机器学习模型,等等,不一而足。 - 相比 mapPartitions,mapPartitionsWithIndex 仅仅多出了一个数据分区索引,这个数据分区索引可以为我们获取分区编号,当你的业务逻辑中需要使用到分区编号的时候,不妨考虑使用这个算子来实现代码。 - 除了这个额外的分区索引以外,mapPartitionsWithIndex在其他方面与 mapPartitions 是完全一样的。 ##### 3. flatMap:从元素到集合、再从集合到元素 - flatMap 其实和 map 与 mapPartitions 算子类似,在功能上,与 map 和 mapPartitions 一样,flatMap 也是用来做数据映射的,在实现上,对于给定映射函数 f,flatMap(f) 以元 素为粒度,对 RDD 进行数据转换。 - 不过,与前两者相比,flatMap 的映射函数 f 有着显著的不同。对于 map 和 mapPartitions 来说,其映射函数 f 的类型,都是(元素) => (元素),即元素到元 素。而 flatMap 映射函数 f 的类型,是(元素) => (集合),即元素到集合(如数组、 列表等)。因此,flatMap 的映射过程在逻辑上分为两步: - 以元素为单位,创建集合; - 去掉集合“外包装”,提取集合元素。 - 假设,我们再次改变 Word Count 的计算逻辑由原来统计单词的计数,改为统计相邻单词共现的次数,如下图所示: - ![变更Word Count计算逻辑](pic/变更Word%20Count计算逻辑.png) - 对于这样的计算逻辑,我们该如何使用 flatMap 进行实现呢?这里我们先给出代码实现,然后再分阶段地分析 flatMap 的映射过程: ```scala // 读取文件内容 val lineRDD: RDD[String] = _ // 请参考第一讲获取完整代码 // 以行为单位提取相邻单词 val wordPairRDD: RDD[String] = lineRDD.flatMap( line => { // 将行转换为单词数组 val words: Array[String] = line.split(" ") // 将单个单词数组,转换为相邻单词数组 for (i <- 0 until words.length - 1) yield words(i) + "-" + words(i+1) }) ``` - 在上面的代码中,我们采用匿名函数的形式,来提供映射函数 f。这里 f 的形参是 String 类型的 line,也就是源文件中的一行文本,而 f 的返回类型是 Array[String],也就是 String 类型的数组。在映射函数 f 的函数体中,我们先用 split 语句把 line 转化为单词数组,然后再用 for 循环结合 yield 语句,依次把单个的单词,转化为相邻单词词对。 - 注意,for 循环返回的依然是数组,也即类型为 Array[String]的词对数组。由此可见,函 数 f 的类型是(String) => (Array[String]),也就是刚刚说的第一步,从元素到集 合。但如果我们去观察转换前后的两个 RDD,也就是 lineRDD 和 wordPairRDD,会发现它们的类型都是 RDD[String],换句话说,它们的元素类型都是 String。 - 回顾 map 与 mapPartitions 这两个算子,我们会发现,转换前后 RDD 的元素类型,与映射函数 f 的类型是一致的。但在 flatMap 这里,却出现了 RDD 元素类型与函数类型不一 致的情况。这是怎么回事呢?其实呢,这正是 flatMap 的“奥妙”所在,为了让你直观地理解 flatMap 的映射过程,我画了一张示意图 - ![flatmap算子的作用范围](pic/flatmap算子的作用范围.png) - 不难发现,映射函数 f 的计算过程,对应着图中的步骤 1 与步骤 2,每行文本都被转化为包含相邻词对的数组。紧接着,flatMap 去掉每个数组的“外包装”,提取出数组中类型 为 String 的词对元素,然后以词对为单位,构建新的数据分区,如图中步骤 3 所示。这就是 flatMap 映射过程的第二步:去掉集合“外包装”,提取集合元素。 - 得到包含词对元素的 wordPairRDD 之后,我们就可以沿用 Word Count 的后续逻辑,去计算相邻词汇的共现次数。 ##### 4. filter:过滤 RDD - 与 map 一样常用的算子:filter。filter,顾名思义, 这个算子的作用,是对 RDD 进行过滤。就像是 map 算子依赖其映射函数一样,filter 算子也需要借助一个判定函数 f,才能实现对 RDD 的过滤转换 - 所谓判定函数,它指的是类型为(RDD 元素类型) => (Boolean)的函数。可以看到, 判定函数 f 的形参类型,必须与 RDD 的元素类型保持一致,而 f 的返回结果,只能是 True 或者 False。在任何一个 RDD 之上调用 filter(f),其作用是保留 RDD 中满足 f(也就是 f 返回 True)的数据元素,而过滤掉不满足 f(也就是 f 返回 False)的数据元素。 - filter 算子与判定函数 f - 在上面 flatMap 例子的最后,我们得到了元素为相邻词汇对的 wordPairRDD,它包含的是像“Spark-is”、“is-cool”这样的字符串。为了仅保留有意义的词对元素,我们希望 结合标点符号列表,对 wordPairRDD 进行过滤。例如,我们希望过滤掉像“Spark-&”、“|-data”这样的词对。 - 实现这样的过滤逻辑 ```scala // 定义特殊字符列表 val list: List[String] = List("&", "|", "#", "^", "@") // 定义判定函数f def f(s: String): Boolean = { val words: Array[String] = s.split("-") val b1: Boolean = list.contains(words(0)) val b2: Boolean = list.contains(words(1)) return !b1 && !b2 // 返回不在特殊字符列表中的词汇对 } // 使用filter(f)对RDD进行过滤 val cleanedPairRDD: RDD[String] = wordPairRDD.filter(f) ``` - 掌握了 filter 算子的用法之后,你就可以定义任意复杂的判定函数 f,然后在 RDD 之上通过调用 filter(f) 去变着花样地做数据过滤,从而满足不同的业务需求。 ### 2.4 进程模型与分布式部署:分布式计算是怎么回事?