实时计算

任务:计算每秒的买、卖数,买或卖总量前五的客户端及最近一小时前五的证券。

《Spark in Action》 chapter 6

Spark流处理概念图。

Spark是面向批处理的,以mini-batches来在实时计算中应用Spark的批处理特性。

以下用以Kafka作为数据源为例。

一、数据源

1、启动Kafka

zookeeper-server-start .\zookeeper.properties(Kafka依赖zookeeper)

kafka-server-start .\server.properties

2、Kafka topic

kafka-topics –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic orders

kafka-topics –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic metrics

以上创建orders和metrics两topic。

3、做好发送数据的准备

可以用一个脚本模拟数据发送。

1
2
3
4
cat orders.txt | while read line; do
echo "$line"
sleep 0.1
done | kafka-console-producer.bat --broker-list $BROKER --topic orders

每隔0.1s向orders生产数据,$BROKER='localhost:9092'(9092kafka默认端口)。

1、创建StreamingContext上下文,设为ssc。本地模式。

1
2
3
val conf = new SparkConf().setMaster("local[*]")
.setAppName("Orders")
val ssc = new StreamingContext(conf, Seconds(5))

第二个参数,指定了Streaming分割输入数据和创建mini-batch的时间间隔。时间间隔暂定为5s,再详谈。

2、连接Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group1",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("orders")
val kafkaStream = KafkaUtils
.createDirectStream[String, String](ssc,
PreferBrokers, Subscribe[String, String](topics, kafkaParams))

kafka流中数据结构是key-value型。访问:.key() .value()

3、处理数据orders.txt

每行是一笔交易。Schema如下:

Order timestamp —Format yyyy-mm-dd hh:MM:ss
Order ID —Serially incrementing integer
Client ID —Integer randomly picked from the range 1 to 100
Stock symbol —Randomly picked from a list of 80 stock symbols
Number of stocks to be bought or sold —Random number from 1 to 1,000
Price at which to buy or sell—Random number from 1 to 100
Character B or S —Whether the event is an order to buy or sell

按以上构造一个case class。

case适用于Bean

编译器会添加 1、工厂方法。 2、域访问。 3、 toString, hashCode, and equals。4、copy方法。

1
2
case class Order(time:Timestamp, orderId: Long, cliendId: Long,
symbol: String, amount: Int, price: Double, buy: Boolean)

然后处理Kafka中的value()。(kafka中的key()只是标识,不是我们所需的key(),要自己构造key-value)

1
val orders = kafkaStream.flatMap(....)

4、构造key-value型DStream

由于二元组DStream隐式转换成PairDStreamFunctions的实例。这样,xxxByKey,flatMapValues这些都能派上用场了。

任务一:要计算每秒的买、卖数,做法:以order.buy类型为参数,构造key-value…

1
val numPerType = orders.map(o => (o.buy, 1L)).reduceByKey(_ + _)

任务二:前五…

任务一只需要当前批处理的数据,但任务二需要追踪时间和不同的mini-batches。

使用updateStateByKey方法。

1
2
3
4
5
6
7
8
9
10
val amountPerClient = orders.map(o => (o.clientId, o.amount * o.price))

//累加
val amountState = amountPerClient.updateStateByKey(
(vals, totalOpt: Option[Double]) => {
totalOpt match {
case Some(total) => Some(vals.sum + total)
case None => Some(vals.sum)
}
})

然后提取前五客户端ID。

为了每个Batch处理间隔只写一次结果,将以上结果倍合并。

使用mapWithState

此方法是updateStateByKey的性能改善。此方法只有一个参数,即StateSpec的实例。

StateSpec的函数签名

(Time, KeyType, Option[ValueType], State[StateType]) => Option[MappedType]

State对象的方法:

exsits()——若状态已定义,则返回true

get()——获得状态值

remove()——移除

update()——更新或设置键的状态值

1
2
3
4
5
6
7
8
9
10
11
12
val updateAmountState = (cliendId: Long, amount: Option[Double],
state: State[Double]) => {
var total = amount.getOrElse(0.toDouble)
if (state.exists())
total += state.get()

state.update(total) //total设置为状态
Some((cliendId, total))//Option有两种类型:Some()和None
}

val amountState = amountPerClient.mapWithState(StateSpec
.function(updateAmountState)).stateSnapshots()

Without that last method, stateSnapshots , you’d get a DStream with client ID s and
their total amounts, but only for the clients whose orders arrived during the current
mini-batch. stateSnaphots gives you a DStream with the whole state (all clients), just
like updateStateByKey.

使用window操作来处理限制时间的计算

任务三:每小时前五

这里的窗口时间是一小时,滑动时间间隔可以设为批处理时间间隔(5s),这样可以在每个批处理间隔与其它指标一起产生。

1
2
3
4
5
6
7
8
//滑动时间间隔默认是mini-batch时间间隔
val stocksPerWindow = orders.map(x => (x.symbol, x.amount))
.window(Minutes(60)).reduceByKey(_ + _)

val top5ClList = top5Clients.repartition(1)
.map(_._1.toString)
.glom() //每个RDD中的partition聚合成Array
.map(arr => ("TOPCLIENTS", arr.toList))

以上三类:每个批处理、批处理叠加、时间限制。

5、写回kafka。

方式:Producer.send()

最佳模式:每个JVM只创建一个Producer实例(单例模式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
case class KafkaProducerWrapper(brokerList: String) {
val producerProps: Properties = {
val prop = new Properties()
prop.put("bootstrap.servers", brokerList)
prop.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
prop.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
prop
}

val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)

def send(topic: String, key: String, value: String): Unit = {
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
key.toCharArray.map(_.toByte),
value.toCharArray.map(_.toByte)))
}
}

object KafkaProducerWrapper {
var brokerList = ""
lazy val instance = new KafkaProducerWrapper(brokerList)//首次使用时实例化
}

在metrics主题下消费数据:

kafka-console-consumer –bootstrap-server localhost:9092 –topic metrics –property print.key=true