0%

Spark RDD

背景介绍

Spark中的最基本的数据抽象就是RDD(Resilient Distributed Dataset, 弹性分布式数据集)。传统的MapReduce框架下,中间计算结果需要写入硬盘来防止运行结果丢失,而调用中间结果对硬盘进行读写是比较耗时的。RDD则是一个基于分布式内存的数据抽象,并且提供了很多计算函数。这样的特点使得Spark比MapReduce快很多。

RDD的定义

RDD表示已被分区、不可变的,并能够被并行操作的数据集合

分区

分区代表同一个RDD包含的数据被存储在系统的不同节点中,这是它可以被并行处理的前提。

逻辑上,我们可以认为RDD是一个大的数组,数组的每一个元素代表一个分区。在物理存储中,每个分区指向一个存放在内存或者硬盘中的数据块,这些数据块可以被存放在系统中的不同节点。实际上,RDD只是抽象意义的数据集合,分区内部不会存储具体的数据。

下图给出RDD的分区逻辑结构:

RDD的分区逻辑结构

RDD的分区存有数据块的index,通过RDD的ID和分区的index可以唯一确定数据块的编号,来取得相应数据。

在集群中,各个节点上的数据块会尽可能地放在内存中,内存没有空间才会存入硬盘,这样可以最大化地减少I/O操作。

不可变性
不可变性代表每个一RDD都是可读的,它所包含的分区信息不可以被改变。

并行操作
由于单个RDD的分区特性,使得它天然支持并行操作。

RDD的结构

以下是一个RDD的简易结构示意图:

RDD的结构简单示意图

SparkContext是所有Spark功能的入口,它代表了与Spark节点的连接,可以用来创建RDD对象以及在节点中的广播变量等。一个线程只有一个SparkContext。

SparkConf则是一些参数配置信息。

Partitioner代表了RDD中数据的逻辑结构,每个Partition会映射到某个节点或硬盘的一个数据块。Partitioner决定了RDD的分区方式。有两种主流分区方式:Hash partitioner、Range partitioner。Hash是对数据的Key进行散列分区,Range则是按照Key的排序进行均匀分区。也可以创建自定义的Partitioner。

Dependencies叫做依赖关系。Spark不需要将每个中间计算结果进行数据复制以防丢失。因为每一步产生的RDD里都会存储它的依赖关系,即它是通过哪个RDD经过哪个转换得到的。Spark支持两种依赖关系:窄依赖(Narrow Dependency)和宽依赖(Wide Dependency)。

窄依赖就是父RDD的分区可以一一对应到子RDD的分区,宽依赖就是父RDD的每个分区可以被多个子RDD的分区使用。

窄依赖

窄依赖

宽依赖

宽依赖

窄依赖允许子RDD的每个分区可以被并行处理,而宽依赖则必须等父RDD的所有分区都被计算好才能开始处理。

窄依赖可以支持在同一个节点上链式执行多条命令,如在map下可接着执行filter。并且窄依赖的失败恢复更有效。但是如果一个RDD的依赖链比较长,且中间有多个RDD出现故障的话,进行恢复会很耗时。检查点(Checkpoint)可以优化这种情况下的数据恢复。检查点机制指在连续的transaction列表中记录某几个transaction后数据的内容,从而加快数据恢复。

在计算过程中,对于一些计算过程比较耗时的RDD,可以将它缓存到硬盘或HDFS中,标记这个RDD有被检查点处理过,并且清空它的所有依赖关系。同时,给它新建一个依赖于CheckpointRDD的依赖关系,CheckpointRDD可以用来从硬盘中读取RDD和生成新的分区信息。当某个子RDD需要错误恢复时,回溯至该RDD,发现它被检查点记录过,就可以直接到硬盘中读取这个RDD,不用再向前回溯计算。

存储级别(Storage Level)是一个用来记录RDD持久化时的存储级别。在后面4.4节中会有更进一步的介绍。

RDD编程(示例均用Scala编写)

创建RDD

有两种创建RDD的方式:

  1. 读取外部数据集
  2. 对一个集合进行并行化,即把已有的一个集合传给SparkContext的parallelize()方法。

一、 读取外部数据集

Scala中的textFile()方法:

1
val lines = sc.textFile("/path/README.md")

二、对一个集合并行化:

1
val lines = sc.parallelize(List("pandas","i like pandas"))

RDD操作

RDD有两种数据操作:转换操作(Transformation)和动作操作(Action)。

转换操作的结果是返回一个新的RDD,而动作操作是返回一个结果,在返回结果的时候会触发实际的计算。简单来说,可以通过返回值类型来判断是转换操作还是动作操作。转换操作返回的是RDD,动作操作返回的是其他的数据类型。

转化操作

转换操作是得到新RDD的操作,后面4.3节会提到,转化出的RDD是惰性求值的,只有在动作操作中用到这些RDD时会对其进行计算。

map转换操作

map()接收一个函数,然后把函数应用于RDD中的每个元素,将函数的返回结果作为新生成的RDD中对应元素的值。

在spark-shell运行的例子:

map函数

flatMap转换操作

flatMap()将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD。

在spark-shell运行的例子:

flatMap函数

map和flatMap的区别

map

map和flatMap的区别

flatMap

flatMap

filter转换操作

filter()返回一个由通过传给filter()的函数的元素组成的RDD。

在spark-shell运行的例子:

filter函数

注:collect()函数是一个动作操作,会以数组形式返回RDD中的所有元素。

distinct转换操作

distinct()函数的作用是去重。

在spark-shell运行的例子:

distinct函数

动作操作

前面的例子中用到的first()、collect()都是动作操作。之前提过,collect会以数组形式返回RDD的所有元素。值得注意的是,collect操作只有在数组所含的数据量较小的时候使用,因为所有的数据都会载到程序的内存中,如果数据量较大,会占用大量JVM内存。

其他常用动作操作还有count()、reduce()、countByKey等。

reduce动作操作

reduce函数是操作两个相同元素类型的RDD数据并返回一个同样类型的新元素。

以函数+为例,可以用它来对我们的RDD进行累加。使用reduce,可以很方便地算出RDD中所有元素的总和。

在spark-shell运行的例子:

reduce函数

count动作操作

count操作的作用是返回RDD中的所有元素。

在spark-shell运行的例子:

count函数

countByKey动作操作

仅适用于Key-Value pair类型的RDD,返回具有每个Key的计数的<Key, Count>的Map。

在spark-shell中运行的例子:

countByKey函数

惰性求值

RDD的转换操作都是惰性求值的,这意味着调动动作操作之前Spark不会开始计算。前面提到过,转换操作得到的是一个新的RDD,而动作操作得到的是其他的数据类型。惰性求值的优势在于可以让Spark的运算更加高效和快速。 比如,当我们读取一个文件生成了rdd后,又用filter筛选出符合某些要求的元素,最后用first返回第一个元素。由于Spark只是在使用first的时候才开始真正的运算,此时的Spark就可以只扫描第一个匹配的行,不需要读取整个文件。

总结Spark的计算逻辑:在每次转换操作的时候,使用了新产生的RDD来记录逻辑,这样就把作用在RDD上的计算逻辑串起来,形成一个链条。当对RDD进行动作操作时,Spark会从计算链的最后一个RDD开始,依此从上一个RDD获取数据并执行计算逻辑,最后输出结果。

持久化(缓存)

上面提到RDD是惰性求值的,如果对RDD采用动作操作,Spark每次都会重算RDD以及它所有的依赖。如果某个RDD被重复使用的话,每次都从头计算非常低效。因此,可以对多次使用的RDD进行持久化操作。

persist()和cache()方法支持将RDD的数据缓存至内存或硬盘中。下次使用该RDD时,可以直接读取RDD的结果。前面提到RDD的结构中还包含了Storage Level,指持久化级别。常用的持久化级别有以下几个:

  • MEMORY_ONLY:只缓存在内存中。
  • MEMORY_AND_DISK:缓存在内存中,空间不够则缓存在硬盘中。
  • DISK_ONLY:只缓存在硬盘中。

注:有时在持久化级别加上“_2”表示把持久化的数据存在两个节点上建立副本。如:MEMORY_ONLY_2。

persist操作

1
2
val rdd = sc.parallelize(List(1,2,3,3))
rdd.persist()

加上存储级别示例:

1
2
val rdd = sc.parallelize(List(1,2,3,3))
rdd.persist(rdd.persist(StorageLevel.DISK_ONLY))

注:unpersist()可以删除持久化。

参考资料:

  1. 《Spark快速大数据分析》
  2. 极客时间课程——《大规模数据处理实战》
  3. https://blog.csdn.net/mingWar/article/details/79643126