《Spark in Action》 Chapter 4

Personal summary:

  • DStream <- a seq of RDDs <- Partitions
  • key-value RDD = pair RDD, manipulated with the *ByKey() functions

pair RDDs

The key-value model is a simple, general-purpose data model.

Creating

Use map to turn each element of an RDD into a two-element tuple (f(element), element); this converts the RDD into a pair RDD.

Under the hood, an RDD of two-element tuples is implicitly converted to an instance of PairRDDFunctions (a Scala implicit conversion), which provides the pair RDD operations.
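A minimal sketch of creating a pair RDD, assuming a `#`-delimited transactions file whose third field is a customer ID (file name and field positions are illustrative):

```scala
// Assumes an existing SparkContext named sc.
val tranFile = sc.textFile("data/transactions.txt")
val tranData = tranFile.map(_.split("#"))                     // RDD[Array[String]]
var transByCust = tranData.map(tran => (tran(2).toInt, tran)) // pair RDD keyed by customer ID
```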

Basic pair RDD functions

getting keys and values
```scala
pRDD.keys.distinct.count()
```
counting values per key
```scala
pRDD.countByKey()

// find the key with the largest count (here, the customer with the most purchases)
val (cid, purch) = transByCust.countByKey.toSeq.maxBy(_._2)
```
looking up values for a single key
```scala
transByCust.lookup(53)

// print the results
transByCust.lookup(53).foreach(tran => println(tran.mkString(", ")))
```
using the mapValues transformation to change values in a pair RDD

Changes only the values; the keys (and the partitioner) are left untouched.

```scala
transByCust = transByCust.mapValues(tran => {
  // apply a 5% discount when product 25 is bought in quantity greater than one
  if (tran(3).toInt == 25 && tran(4).toDouble > 1)
    tran(5) = (tran(5).toDouble * 0.95).toString
  tran
})
```
using the flatMapValues transformation to add values to keys

This transformation can add multiple values for a single key or remove a key altogether, so the total number of elements can grow or shrink.

For each of the values in the returned collection, a new key-value pair is created for the corresponding key.

```scala
transByCust = transByCust.flatMapValues(tran => {
  // for qualifying purchases of product 81, add a complimentary product 70
  if (tran(3).toInt == 81 && tran(4).toDouble >= 5) {
    val cloned = tran.clone()
    cloned(5) = "0.00"
    cloned(3) = "70"
    cloned(4) = "1"
    List(tran, cloned)
  } else {
    List(tran)
  }
})
```
using reduceByKey transformation to merge all values of a key

foldByKey is similar to reduceByKey, except that it requires an additional zeroValue parameter:

```scala
foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
```

Because RDDs are computed in parallel, zeroValue may be applied more than once (for example, once per partition), so it should be a neutral element for the given operation.

```scala
val amounts = transByCust.mapValues(t => t(5).toDouble)
val totals = amounts.foldByKey(0)(_ + _).collect()
```
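For comparison, the same sum with reduceByKey needs no zero value:

```scala
// merge all values of each key with the given function; value type is unchanged
val totals2 = amounts.reduceByKey(_ + _).collect()
```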
using aggregateByKey to group all values of a key
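aggregateByKey takes a zero value plus two functions: one merges a value into an accumulator within a partition, and one merges two accumulators across partitions, so the result type can differ from the value type. A sketch, building a list of product IDs per customer (the field index is illustrative):

```scala
// zero value: an empty list per key
// seqOp: merge one value into the per-partition accumulator
// combOp: merge accumulators coming from different partitions
val prods = transByCust.aggregateByKey(List[String]())(
  (prods, tran) => prods ::: List(tran(3)),
  (prods1, prods2) => prods1 ::: prods2)
```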

data partition and reducing data shuffling

For example, when a file is loaded from the local filesystem into Spark, its contents are split into partitions, which end up distributed across the nodes of the cluster; a single node may hold more than one partition. Together these partitions form an RDD, and every RDD keeps a list of its partitions.

```scala
println(transByProd.partitions.length)
```

The number of RDD partitions is important because, in addition to influencing data distribution throughout the cluster, it also directly determines the number of tasks that will be running RDD transformations. If this number is too small, the cluster will be underutilized. Furthermore, memory problems could result, because working sets might get too big to fit into the memory of executors. We recommend using three to four times more partitions than there are cores in your cluster. Moderately larger values shouldn't pose a problem, so feel free to experiment. But don't get too crazy, because management of a large number of tasks could create a bottleneck.

using Spark’s data partitioners

Spark ships with two partitioners: HashPartitioner and RangePartitioner. Custom partitioners can also be written.

HashPartitioner is the default. It determines a partition index from an element's Java hash code:

partitionIndex = hashCode % numberOfPartitions
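A pure-Scala sketch of that formula; the real HashPartitioner also maps negative hash codes back into range, roughly like this:

```scala
def partitionIndex(key: Any, numberOfPartitions: Int): Int = {
  val raw = key.hashCode % numberOfPartitions
  if (raw < 0) raw + numberOfPartitions else raw // keep the index non-negative
}
```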

Understanding and avoiding unnecessary shuffling

Physical movement of data between partitions is called shuffling.

It occurs when data from multiple partitions needs to be combined in order to build the partitions of a new RDD. For example, shuffling occurs when grouping elements by key.

Shuffling when explicitly changing partitioners

Because changing the partitioner provokes a shuffle, the safest approach, performance-wise, is to use the default partitioner as much as possible and to avoid inadvertently causing a shuffle.

shuffling caused by partition removal

(Details to revisit when studying Spark performance tuning.)

Repartitioning RDDs

partitionBy

Available only on pair RDDs, it takes a Partitioner as its argument. If that partitioner is different from the RDD's current one, a shuffle occurs and the data is repartitioned.
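A minimal sketch (the partition count is illustrative):

```scala
import org.apache.spark.HashPartitioner

// explicitly repartition a pair RDD by key hash into 100 partitions
val repartitioned = transByCust.partitionBy(new HashPartitioner(100))
```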

collecting partition data with a glom transformation

glom (meaning to grab) gathers the elements of each partition into an array and returns a new RDD whose elements are those arrays. The number of elements in the new RDD equals the number of partitions of the old one. The partitioner is removed in the process.
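A sketch, assuming an existing SparkContext `sc`:

```scala
// parallelize 500 random numbers into 30 partitions, then glom
val list = List.fill(500)(scala.util.Random.nextInt(100))
val rdd = sc.parallelize(list, 30).glom()
println(rdd.count()) // one array per original partition, i.e. 30
```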

Joining, sorting, and grouping data

  • join → RDD[(K, (V, W))]: only keys present in both RDDs

  • leftOuterJoin → RDD[(K, (V, Option[W]))]

  • rightOuterJoin → RDD[(K, (Option[V], W))]

  • fullOuterJoin → RDD[(K, (Option[V], Option[W]))]
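A sketch of the difference, with illustrative data:

```scala
val totalsByCust = sc.parallelize(Seq((1, 100.0), (2, 50.0)))
val names = sc.parallelize(Seq((1, "Alice"), (3, "Bob")))

totalsByCust.join(names)          // key 1 only: present in both RDDs
totalsByCust.leftOuterJoin(names) // keys 1 and 2; key 2 pairs with None
```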

using subtract (set difference)

Using accumulators and broadcast variables to communicate with Spark executors

Purpose: maintaining global state, or sharing data across tasks and partitions.

sending data to executors using broadcast variables

Broadcast variables can be shared across the cluster; unlike accumulators, they cannot be modified by executors. The driver creates a broadcast variable, and executors read it.

Use a broadcast variable when you have a large set of data that the majority of executors need.

Create one with SparkContext.broadcast(value); read it with Broadcast.value.
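A sketch, assuming `sc` and an illustrative lookup map:

```scala
// driver: create the broadcast variable
val productNames = Map(25 -> "playset", 70 -> "toothbrush") // assumed data
val bcNames = sc.broadcast(productNames)

// executors: read (never modify) its value inside a transformation
val withNames = transByCust.mapValues(tran =>
  bcNames.value.getOrElse(tran(3).toInt, "unknown"))
```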

总结

  • pair RDDs contain two-element tuples: keys and values.

  • In Scala, a pair RDD is implicitly converted to an instance of PairRDDFunctions, which provides the pair-RDD-specific operations.

  • countByKey returns a map containing the number of occurrences of each key.

  • mapValues changes only the values of a pair RDD.

  • flatMapValues maps each key to zero or more values, so it can change the total number of elements.

  • reduceByKey and foldByKey merge all the values of a key into a single value of the same type.

  • aggregateByKey also merges a key's values, but it can transform them into values of another type.

  • Data partitioning is Spark's mechanism for dividing data among the nodes of a cluster.

  • The number of RDD partitions is important because, in addition to influencing
    data distribution throughout the cluster, it also directly determines the number
    of tasks that will be running RDD transformations.

  • During shuffling, data is not only written to disk but also sent over the network, so it is important to minimize the number of shuffles in a Spark job.

  • Every Spark job is divided into stages based on the points where shuffles occur.