Spark API in Depth
Table of Contents
- pair RDDs
- Creating
- Basic pair RDD functions
- getting keys and values
- counting values per key
- looking up values for a single key
- using the mapValues transformation to change values in a pair RDD
- using the flatMapValues transformation to add values to keys
- using the reduceByKey transformation to merge all values of a key
- using aggregateByKey to group all values of a key
- data partitioning and reducing data shuffling
- Repartitioning RDDs
- Joining, sorting, and grouping data
- Using accumulators and broadcast variables to communicate with Spark executors
- Summary
Spark in Action, Chapter 4
Personal summary:
- DStream <- a seq of RDDs <- Partitions
- key-value RDD = pair RDD, manipulated with the *ByKey() functions
pair RDDs
The key-value model is a simple, general-purpose data model.
Creating
Use map to turn each element of an RDD into a two-element tuple (f(element), element); this converts an ordinary RDD into a pair RDD.
Under the hood, an RDD of two-element tuples is implicitly converted to an instance of PairRDDFunctions (a Scala language feature).
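A minimal sketch of the pattern, assuming an existing SparkContext `sc` (the data and the key function are made up):

```scala
// Build a pair RDD by mapping each element to (f(element), element).
val rdd = sc.parallelize(Seq("a1", "b2", "a3"))
val pairRdd = rdd.map(s => (s.head.toString, s))
// pairRdd is an RDD[(String, String)], so the PairRDDFunctions
// methods (keys, countByKey, lookup, ...) become available.
```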
Basic pair RDD functions
getting keys and values
```scala
pRDD.keys.distinct.count()
```
counting values per key
```scala
pRDD.countByKey()
```
looking up values for a single key
```scala
transByCust.lookup(53)
```
using the mapValues transformation to change values in a pair RDD
Changes only the values.
```scala
transByCust = transByCust.mapValues(tran => {
```
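The snippet above is cut off in my notes. A hedged reconstruction of the idea, assuming each value is an Array[String] of parsed transaction fields with the amount at index 5 (the 5% discount rule is also an assumption):

```scala
transByCust = transByCust.mapValues(tran => {
  tran(5) = (tran(5).toDouble * 0.95).toString  // apply a 5% discount
  tran
})
// Only the values change; the keys stay untouched.
```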
using the flatMapValues transformation to add values to keys
This function can add values to a key or remove a key altogether, so the total number of key-value pairs can grow or shrink.
From each of the values in the returned collection, a new key-value pair is created for the corresponding key.
```scala
transByCust = transByCust.flatMapValues(tran => {
```
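This snippet is cut off as well. A hedged sketch of how flatMapValues can add values for some keys while leaving others unchanged (the field indices and the bonus rule are illustrative, not the book's exact example):

```scala
transByCust = transByCust.flatMapValues(tran => {
  if (tran(5).toDouble > 100) {
    val bonus = tran.clone()
    bonus(3) = "70"     // assumed product-ID field
    bonus(5) = "0.00"   // assumed amount field
    List(tran, bonus)   // this key now maps to two values
  } else
    List(tran)          // unchanged: exactly one value
})
```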
using reduceByKey transformation to merge all values of a key
foldByKey does the same thing as reduceByKey, except that it requires an additional zeroValue parameter:

```scala
foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
```

Because RDD operations run in parallel, the zeroValue may be applied more than once, so it should be neutral for the operation.
```scala
val amounts = transByCust.mapValues(t => t(5).toDouble)
```
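A natural continuation is to sum the amounts per key; a sketch of both variants:

```scala
// foldByKey needs a zero value; because partitions are folded in
// parallel, it may be applied once per partition, so it must be
// neutral for the operation (0 for addition).
val totals = amounts.foldByKey(0.0)((a1, a2) => a1 + a2)
// reduceByKey does the same without a zero value:
val totals2 = amounts.reduceByKey(_ + _)
```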
using aggregateByKey to group all values of a key
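My notes for this section are empty; a minimal sketch of the idea, reusing `amounts` from the previous snippet:

```scala
// aggregateByKey takes a zero value plus two functions: one merges a
// value into a partition-local accumulator, the other merges
// accumulators across partitions. Unlike reduceByKey/foldByKey, the
// result type may differ from the value type: Double -> List[Double].
val grouped = amounts.aggregateByKey(List[Double]())(
  (acc, v) => v :: acc,   // fold one value into the local list
  (l1, l2) => l1 ::: l2   // merge lists from two partitions
)
```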
data partitioning and reducing data shuffling
For example, when a file is loaded into Spark from the local filesystem, its contents are split into partitions, which end up distributed across the nodes of the cluster. Those partitions form an RDD, and a single node may hold more than one of them. Each RDD maintains a list of its partitions.
```scala
println(transByProd.partitions.length)
```
The number of RDD partitions is important because, in addition to influencing data distribution throughout the cluster, it also directly determines the number of tasks that will be running RDD transformations. If this number is too small, the cluster will be underutilized. Furthermore, memory problems could result, because working sets might get too big to fit into the memory of executors. We recommend using three to four times more partitions than there are cores in your cluster. Moderately larger values shouldn’t pose a problem, so feel free to experiment. But don’t get too crazy, because management of a large number of tasks could create a bottleneck.
using Spark’s data partitioners
Spark ships with two: HashPartitioner and RangePartitioner. Custom partitioners are also possible.
HashPartitioner is the default.
```
partitionIndex = hashCode % numberOfPartitions
```
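A quick way to see the formula in action (getPartition additionally guards against negative hash codes, which this simple form omits):

```scala
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(4)  // numberOfPartitions = 4
println(partitioner.getPartition(53))     // 53.hashCode % 4 = 1
```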
Understanding and avoiding unnecessary shuffling
Physical movement of data between partitions is called shuffling.
It occurs when data from multiple partitions needs to be combined in order to build partitions for a new RDD. When grouping elements by key, shuffling occurs.
Shuffling when explicitly changing partitioners
Because changing the partitioner provokes shuffles, the safest approach, performance-wise, is to use the default partitioner as much as possible and avoid inadvertently causing a shuffle.
shuffling caused by partition removal
I'll come back to the details when I study Spark performance tuning.
Repartitioning RDDs
partitionBy
Available only on pair RDDs, it takes a Partitioner object as its argument. If that partitioner differs from the RDD's current one, a shuffle occurs and the data is repartitioned.
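A minimal sketch, reusing `pairRdd` from the creation example above:

```scala
import org.apache.spark.HashPartitioner

// If the new partitioner differs from the RDD's current one,
// partitionBy triggers a shuffle.
val repartitioned = pairRdd.partitionBy(new HashPartitioner(100))
```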
collecting partition data with a glom transformation
glom (meaning to grab) gathers the elements of each partition into a single array and returns a new RDD whose elements are those arrays. The number of elements in the new RDD equals the number of previous partitions. In the process, any partitioner is removed.
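A minimal sketch with made-up data:

```scala
val nums = sc.parallelize(1 to 10, 3)  // 3 partitions
val glommed = nums.glom()              // RDD[Array[Int]]
println(glommed.count())               // 3: one array per partition
```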
Joining, sorting, and grouping data
The four join variants produce the following value types (see the sketch after this list):
- join -> (K, (V, W)); only keys present in both RDDs
- leftOuterJoin -> (K, (V, Option(W)))
- rightOuterJoin -> (K, (Option(V), W))
- fullOuterJoin -> (K, (Option(V), Option(W)))
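A small illustration with made-up data (user IDs as keys):

```scala
val views     = sc.parallelize(Seq((1, "shoes"), (2, "hat")))
val purchases = sc.parallelize(Seq((1, 59.99)))

views.join(purchases).collect()           // Array((1,(shoes,59.99)))
views.leftOuterJoin(purchases).collect()  // keeps (2,(hat,None))
```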
using subtract (set difference)
…
Using accumulators and broadcast variables to communicate with Spark executors
Purpose: maintaining global state or sharing data among tasks and partitions.
sending data to executors using broadcast variables (bv)
Broadcast variables can be shared across the cluster; unlike accumulators, they cannot be modified by executors. The driver creates a broadcast variable, and the executors read it.
Use a broadcast variable when a large set of data is needed by most of the executors.
Create one with SparkContext.broadcast(value) and read it with Broadcast.value.
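A minimal sketch; the lookup map and the transaction field index are illustrative assumptions:

```scala
// The driver creates the broadcast variable once; tasks on the
// executors read it through .value instead of having the map
// shipped with every task.
val productNames = Map(25 -> "Barbie Shopping Mall Playset", 70 -> "Toothbrush")
val bcNames = sc.broadcast(productNames)

val withNames = transByCust.mapValues(tran =>
  bcNames.value.getOrElse(tran(3).toInt, "unknown"))
```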
Summary
- Pair RDDs contain two-element tuples: keys and values.
- In Scala, pair RDDs are implicitly converted to instances of PairRDDFunctions, which provides the pair-RDD-specific operations.
- countByKey returns a map with the number of occurrences of each key.
- mapValues changes only the values of a pair RDD.
- flatMapValues maps each key to zero or more values, so the total number of key-value pairs can grow or shrink.
- reduceByKey and foldByKey merge all of a key's values into a single value of the same type.
- aggregateByKey also merges values, but it can transform them into values of a different type.
- Data partitioning is Spark's mechanism for dividing data among the nodes of a cluster.
- The number of RDD partitions is important because, in addition to influencing data distribution throughout the cluster, it directly determines the number of tasks that run RDD transformations. During shuffling, data is not only written to disk but also sent over the network, so minimizing the number of shuffles in a Spark job is important.
- Every Spark job is divided into stages based on the points where shuffles occur.