Spark Streaming base on Kafka
实时计算
任务:计算每秒的买、卖数,买或卖总量前五的客户端及最近一小时前五的证券。
《Spark in Action》 chapter 6
Spark流处理概念图。
Spark是面向批处理的,以mini-batches来在实时计算中应用Spark的批处理特性。
以下用以Kafka作为数据源为例。
一、数据源
1、启动Kafka
zookeeper-server-start .\zookeeper.properties(Kafka依赖zookeeper)
kafka-server-start .\server.properties
2、Kafka topic
kafka-topics –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic orders
kafka-topics –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic metrics
以上创建orders和metrics两topic。
3、做好发送数据的准备
可以用一个脚本模拟数据发送。
1 | cat orders.txt | while read line; do |
每隔0.1s向orders生产数据,$BROKER='localhost:9092'
(9092kafka默认端口)。
1、创建StreamingContext上下文,设为ssc。本地模式。
1 | val conf = new SparkConf().setMaster("local[*]") |
第二个参数,指定了Streaming分割输入数据和创建mini-batch的时间间隔。时间间隔暂定为5s,再详谈。
2、连接Kafka
1 | val kafkaParams = Map[String, Object]( |
kafka流中数据结构是key-value型。访问:.key() .value()
3、处理数据orders.txt
每行是一笔交易。Schema如下:
Order timestamp —Format yyyy-mm-dd hh:MM:ss
Order ID —Serially incrementing integer
Client ID —Integer randomly picked from the range 1 to 100
Stock symbol —Randomly picked from a list of 80 stock symbols
Number of stocks to be bought or sold —Random number from 1 to 1,000
Price at which to buy or sell—Random number from 1 to 100
Character B or S —Whether the event is an order to buy or sell
按以上构造一个case class。
case适用于Bean
编译器会添加 1、工厂方法。 2、域访问。 3、 toString, hashCode, and equals。4、copy方法。
1 | case class Order(time:Timestamp, orderId: Long, cliendId: Long, |
然后处理Kafka中的value()。(kafka中的key()只是标识,不是我们所需的key(),要自己构造key-value)
1 | val orders = kafkaStream.flatMap(....) |
4、构造key-value型DStream
由于二元组DStream隐式转换成PairDStreamFunctions的实例。这样,xxxByKey,flatMapValues这些都能派上用场了。
任务一:要计算每秒的买、卖数,做法:以order.buy类型为参数,构造key-value…
1 | val numPerType = orders.map(o => (o.buy, 1L)).reduceByKey(_ + _) |
任务二:前五…
任务一只需要当前批处理的数据,但任务二需要追踪时间和不同的mini-batches。
使用updateStateByKey方法。
1 | val amountPerClient = orders.map(o => (o.clientId, o.amount * o.price)) |
然后提取前五客户端ID。
为了每个Batch处理间隔只写一次结果,将以上结果倍合并。
使用mapWithState
此方法是updateStateByKey的性能改善。此方法只有一个参数,即StateSpec的实例。
StateSpec的函数签名
(Time, KeyType, Option[ValueType], State[StateType]) => Option[MappedType]
State对象的方法:
exsits()——若状态已定义,则返回true
get()——获得状态值
remove()——移除
update()——更新或设置键的状态值
1 | val updateAmountState = (cliendId: Long, amount: Option[Double], |
Without that last method, stateSnapshots , you’d get a DStream with client ID s and
their total amounts, but only for the clients whose orders arrived during the current
mini-batch. stateSnaphots gives you a DStream with the whole state (all clients), just
like updateStateByKey.
使用window操作来处理限制时间的计算
任务三:每小时前五
这里的窗口时间是一小时,滑动时间间隔可以设为批处理时间间隔(5s),这样可以在每个批处理间隔与其它指标一起产生。
1 | //滑动时间间隔默认是mini-batch时间间隔 |
以上三类:每个批处理、批处理叠加、时间限制。
5、写回kafka。
方式:Producer.send()
最佳模式:每个JVM只创建一个Producer实例(单例模式)
1 | case class KafkaProducerWrapper(brokerList: String) { |
在metrics主题下消费数据:
kafka-console-consumer –bootstrap-server localhost:9092 –topic metrics –property print.key=true