1.官网简介
2.参考资料
1,必读:再讲Spark与kafka 0.8.2.1+整合
2,必读:Spark与kafka010整合
3,spark streaming kafka 整合(010-Consumer)
3.两版本代码整理
(1).spark-streaming-kafka-0-8
1),当前版本依赖的jar包
第一和第二个jar包可以直接在当前版本的lib目录中拷贝。第三个jar包需要从本地下载好,然后上传执行。所以在本地的jar路径中我保存了一份完整的jar包。
[root@localhost jars]# pwd
/usr/local/spark/jars
[root@localhost jars]# ll | grep kafka
-rw-r--r--. 1 root root 3954430 Mar 20 11:34 kafka_2.11-0.8.2.1.jar
-rw-r--r--. 1 root root 324010 Mar 20 11:34 kafka-clients-0.8.2.1.jar
-rw-r--r--. 1 root root 298522 Mar 19 16:07 spark-streaming-kafka-0-8_2.11-2.1.0.jar
2).producer(生产者)
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
// 指定再在哪个kafka服务器运行
val brokers="localhost:9092"
// 指定topic
val topic="demo01"
// 指定每秒钟向topic放多少消息
val messagesPerSec=3
// 每个消息中包含多少个单词
val wordsPerMessage=5
// 以下四行代码一般是固定的写法,之后直接复制粘贴就可以
// 使用键值对的方式将
val props = new HashMap[String, Object]()
// 指定 运行
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
// 对key和value都去序列化,且因为我定义的Key和Value的类型都是String的,所以直接用的 StringSerializer去序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
// 生产者
val producer = new KafkaProducer[String, String](props)
// 在while循环中不断的生成数据
while(true) {
// 控制一秒钟生成多少数据(1 to XX 会生成 Range 类型变量)
(1 to messagesPerSec.toInt).foreach { messageNum =>
// 控制一条消息中有几个单词 1 to XXX,接下来对集合中每一个元素执行map操作
val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) // Random的nextInt(10)的作用是 生成0-9之间的一个随机数
.mkString(" ") // 指定生成随机数根据什么符号进行分割
print(str)
println()
// 生产者 生产出来的消息类型都是 ProducerRecord 类型的,
// 指定 主题、key、value
val message = new ProducerRecord[String, String](topic, null, str)
// 将消息对象发送出去
producer.send(message)
}
Thread.sleep(3000) // 线程休眠(3秒)
}
3).consumer(消费者)
import _root_.kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
// 设置与brokers获取连接
val brokers="localhost:9092"
// 可以指定多个分区,然后使用 "," 分割
val topics="demo01"
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount2").setMaster("local[2]")
// 创建具有2秒批处理间隔的上下文
val ssc = new StreamingContext(sparkConf, Seconds(2))
// 为了 容错 (目录不存在的时候,其可以自动创建)
ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint") // 设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动hadoop
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
//
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
// Get the lines, split them into words, count the words and print
// 由于Producer生产的数据,第一个值是null,我们用不到,所以直接去数据的第二个数据
val lines = messages.map(_._2)
// 拿到数据进行拆分,拿到每一个单词
val words = lines.flatMap(_.split(" "))
// 对每一个单词,再去做map变换,然后进行计数
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
// 将结果打印
wordCounts.print()
// Start the computation
ssc.start()
ssc.awaitTermination()
4).知识补充
-
Direct模式简介
- 在spark 1.3以后引入了一种新的spark Streaming api,新的api回自己在driver内部维护一个偏移,然后自动计算指定的 topic+partition 该批次需要拉去数据的范围,然后从kafka拉去数据来计算。
- 不同于基于Receiver的方式,direct模式不会将偏移记录到Zookeeper,以保证故障恢复从上次偏移处消费消息。Direct模式采用的是,你可以通过Checkpoint或者自己编写工具来实现偏移的维护,保证数据消费不丢失。
-
Direct模式优势
- 简化并行度:不需要创建多个kafka stream,然后union他们。使用directStream,spark streaming 生成的RDD分区和kafka的分区是一一对应的,这种方式理解起来更简单而且便于调优。
-
高效:
- 基于Receiver的方式要保证数据不丢失,必须启用预写日志。这个行为实际上是非常低效的,数据会被复制两次,一次是kafka集群,一次是预写日志。
- Direct方式解决了这个问题,由于没有Receiver,故而也不需要预写日志。只要你kafka里面存有数据,那么消息就可以从kafka里面恢复。
-
仅一次消费语义:
- 基于Receiver的会把偏移提交到Zookeeper。这种方式结合预写日志能保证数据不丢失,也即是最少一次消费语义,但是有几率导致消费者在存在失败的情况下消费消息两次。比如,消息处理并经过存储之后,但是偏移并没有提交到Zookeeper,这个时候发生故障了,那么恢复之后,就会按照Zookeeper上的偏移再一次消费数据并处理,导致消息重复处理。
- 但是direct 方式偏移不会提交到Zookeeper,是spark streaming在driver使用内存变量加Checkpoint进行追踪的,所以尽管会存在任务失败,但是仍然能保证消费的一次处理。
- 注意:由于direct方式不会提交偏移到Zookeeper,所以,基于Zookeeper的kafka监控工具就不能监控到spark streaming的消费情况。然而,你可以自己讲偏移提交道Zookeeper,来满足你的需求。
(2).spark-streaming-kafka-0-10
1).当前依赖的jar包
第一个jar包可以直接在当前版本的lib目录中拷贝。第二个jar包需要从本地下载好,然后上传执行。所以在本地的jar路径中我保存了一份完整的jar包。
[root@master jars]# pwd
/usr/local/spark/jars
[root@master jars]# ll | grep kafka
-rw-r--r--. 1 root root 946811 Mar 20 16:02 kafka-clients-0.10.2.0.jar
-rw-r--r--. 1 root root 190413 Mar 20 14:51 spark-streaming-kafka-0-10_2.11-2.3.0.jar
2).Producer(生产者)
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
// 注意当前版本的stream对象是与旧版本不一样的
import org.apache.spark.streaming.kafka010._
// 指定再在哪个kafka服务器运行
val brokers="localhost:9092"
// 指定topic
val topic="demo01"
// 指定每秒钟向topic放多少消息
val messagesPerSec=3
// 每个消息中包含多少个单词
val wordsPerMessage=5
// 以下四行代码一般是固定的写法,之后直接复制粘贴就可以
// 使用键值对的方式将
val props = new HashMap[String, Object]()
// 指定 运行
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
// 对key和value都去序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
// 生产者
val producer = new KafkaProducer[String, String](props)
// 在while循环中不断的生成数据
while(true) {
// 控制一秒钟生成多少数据(1 to XX 会生成 Range 类型变量)
(1 to messagesPerSec.toInt).foreach { messageNum =>
// 控制一条消息中有几个单词 1 to XXX,接下来对集合中每一个元素执行map操作
val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) // Random的nextInt(10)的作用是 生成0-9之间的一个随机数
.mkString(" ") // 指定生成随机数根据什么符号进行分割
print(str)
println()
// 生产者 生产出来的消息类型都是 ProducerRecord 类型的,
// 指定 主题、key、value
val message = new ProducerRecord[String, String](topic, null, str)
// 将消息对象发送出去
producer.send(message)
}
Thread.sleep(3000) // 线程休眠(3秒)
}
3).Consumer(消费者)
// 代码是在jupyter中运行的,所以没有加函数体等
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
// 创建配置文件对象
var sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkKafKaTest")
// 创建具有5秒批处理间隔的上下文
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 如果指定多个topic,那么在这里直接使用","进行分割
val topics = Array("demo01")
// 配置kafka的相关参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// 创建 DirectStream 对象(DStream对象:实际上就是一堆RDD的集合)
// 此时的数据来源是从kafka来的;每行数据是以(K,V)的形式存放
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
// 将DStream中数据进行map变换,将每一行记录中的key和value值进行整理,然后打印输出
stream.map(record => (record.key, record.value)).print()
/*
这里是将计数操作直接写在了一行代码中:
1.将数据中value取出来;(数据是以key,value的形式存的吗?????)
2.将每一行数据进行单词的切分,
3.对切分之后的单词进行map变换,重新映射为另一种形式
4.然后对转换之后的数据进行单词计数
5.最后将结果打印在屏幕上
*/
stream.map(record => record.value).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
// 启动当前程序
ssc.start()
// 等待一个终止命令
ssc.awaitTermination()
def createDirectStream[
K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
): InputDStream[(K, V)] {...}