当前位置: 首页>后端>正文

SparkStream与kafka对接

1.官网简介

SparkStream与kafka对接,第1张
SparkStream与kafka对接.png

2.参考资料

1,必读:再讲Spark与kafka 0.8.2.1+整合
2,必读:Spark与kafka010整合
3,spark streaming kafka 整合(010-Consumer)

3.两版本代码整理

(1).spark-streaming-kafka-0-8

1),当前版本依赖的jar包

第一和第二个jar包可以直接在当前版本的lib目录中拷贝。第三个jar包需要从本地下载好,然后上传执行。所以在本地的jar路径中我保存了一份完整的jar包。

[root@localhost jars]# pwd
/usr/local/spark/jars

[root@localhost jars]# ll | grep kafka
-rw-r--r--. 1 root root   3954430 Mar 20 11:34 kafka_2.11-0.8.2.1.jar
-rw-r--r--. 1 root root    324010 Mar 20 11:34 kafka-clients-0.8.2.1.jar
-rw-r--r--. 1 root root    298522 Mar 19 16:07 spark-streaming-kafka-0-8_2.11-2.1.0.jar

2).producer(生产者)

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

 // 指定再在哪个kafka服务器运行
val brokers="localhost:9092"
// 指定topic
val topic="demo01"
// 指定每秒钟向topic放多少消息
val messagesPerSec=3
// 每个消息中包含多少个单词
val wordsPerMessage=5

// 以下四行代码一般是固定的写法,之后直接复制粘贴就可以
// 使用键值对的方式将
val props = new HashMap[String, Object]()
// 指定 运行 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
// 对key和value都去序列化,且因为我定义的Key和Value的类型都是String的,所以直接用的 StringSerializer去序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
// 生产者
val producer = new KafkaProducer[String, String](props)

// 在while循环中不断的生成数据
while(true) {
  // 控制一秒钟生成多少数据(1 to XX 会生成 Range 类型变量)
  (1 to messagesPerSec.toInt).foreach { messageNum =>
      // 控制一条消息中有几个单词 1 to XXX,接下来对集合中每一个元素执行map操作
    val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) // Random的nextInt(10)的作用是 生成0-9之间的一个随机数
      .mkString(" ") // 指定生成随机数根据什么符号进行分割
      print(str)
      println()
    // 生产者 生产出来的消息类型都是 ProducerRecord 类型的,
      // 指定 主题、key、value
    val message = new ProducerRecord[String, String](topic, null, str)
    // 将消息对象发送出去
    producer.send(message)
  }
 Thread.sleep(3000) // 线程休眠(3秒)
}

3).consumer(消费者)

import _root_.kafka.serializer.StringDecoder

import org.apache.spark._
import org.apache.spark.SparkConf

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

// 设置与brokers获取连接
val brokers="localhost:9092"
// 可以指定多个分区,然后使用 "," 分割
val topics="demo01"

val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount2").setMaster("local[2]")
// 创建具有2秒批处理间隔的上下文
val ssc = new StreamingContext(sparkConf, Seconds(2))

// 为了 容错 (目录不存在的时候,其可以自动创建)
ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint") // 设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动hadoop

// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

// 
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

// Get the lines, split them into words, count the words and print
// 由于Producer生产的数据,第一个值是null,我们用不到,所以直接去数据的第二个数据
val lines = messages.map(_._2)
// 拿到数据进行拆分,拿到每一个单词
val words = lines.flatMap(_.split(" "))
// 对每一个单词,再去做map变换,然后进行计数
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
// 将结果打印
wordCounts.print()

// Start the computation
ssc.start()
ssc.awaitTermination()

4).知识补充

  • Direct模式简介

    • 在spark 1.3以后引入了一种新的spark Streaming api,新的api回自己在driver内部维护一个偏移,然后自动计算指定的 topic+partition 该批次需要拉去数据的范围,然后从kafka拉去数据来计算。
    • 不同于基于Receiver的方式,direct模式不会将偏移记录到Zookeeper,以保证故障恢复从上次偏移处消费消息。Direct模式采用的是,你可以通过Checkpoint或者自己编写工具来实现偏移的维护,保证数据消费不丢失。
  • Direct模式优势

    • 简化并行度:不需要创建多个kafka stream,然后union他们。使用directStream,spark streaming 生成的RDD分区和kafka的分区是一一对应的,这种方式理解起来更简单而且便于调优。
    • 高效:
      • 基于Receiver的方式要保证数据不丢失,必须启用预写日志。这个行为实际上是非常低效的,数据会被复制两次,一次是kafka集群,一次是预写日志。
      • Direct方式解决了这个问题,由于没有Receiver,故而也不需要预写日志。只要你kafka里面存有数据,那么消息就可以从kafka里面恢复。
    • 仅一次消费语义:
      • 基于Receiver的会把偏移提交到Zookeeper。这种方式结合预写日志能保证数据不丢失,也即是最少一次消费语义,但是有几率导致消费者在存在失败的情况下消费消息两次。比如,消息处理并经过存储之后,但是偏移并没有提交到Zookeeper,这个时候发生故障了,那么恢复之后,就会按照Zookeeper上的偏移再一次消费数据并处理,导致消息重复处理。
      • 但是direct 方式偏移不会提交到Zookeeper,是spark streaming在driver使用内存变量加Checkpoint进行追踪的,所以尽管会存在任务失败,但是仍然能保证消费的一次处理。
    • 注意:由于direct方式不会提交偏移到Zookeeper,所以,基于Zookeeper的kafka监控工具就不能监控到spark streaming的消费情况。然而,你可以自己讲偏移提交道Zookeeper,来满足你的需求。

(2).spark-streaming-kafka-0-10

1).当前依赖的jar包

第一个jar包可以直接在当前版本的lib目录中拷贝。第二个jar包需要从本地下载好,然后上传执行。所以在本地的jar路径中我保存了一份完整的jar包。

[root@master jars]# pwd
/usr/local/spark/jars

[root@master jars]# ll | grep kafka
-rw-r--r--. 1 root root   946811 Mar 20 16:02 kafka-clients-0.10.2.0.jar
-rw-r--r--. 1 root root   190413 Mar 20 14:51 spark-streaming-kafka-0-10_2.11-2.3.0.jar

2).Producer(生产者)

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
// 注意当前版本的stream对象是与旧版本不一样的
import org.apache.spark.streaming.kafka010._


// 指定再在哪个kafka服务器运行
val brokers="localhost:9092"
// 指定topic
val topic="demo01"
// 指定每秒钟向topic放多少消息
val messagesPerSec=3
// 每个消息中包含多少个单词
val wordsPerMessage=5

// 以下四行代码一般是固定的写法,之后直接复制粘贴就可以
// 使用键值对的方式将
val props = new HashMap[String, Object]()
// 指定 运行 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
// 对key和value都去序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
// 生产者
val producer = new KafkaProducer[String, String](props)

// 在while循环中不断的生成数据
while(true) {
  // 控制一秒钟生成多少数据(1 to XX 会生成 Range 类型变量)
  (1 to messagesPerSec.toInt).foreach { messageNum =>
      // 控制一条消息中有几个单词 1 to XXX,接下来对集合中每一个元素执行map操作
    val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) // Random的nextInt(10)的作用是 生成0-9之间的一个随机数
      .mkString(" ") // 指定生成随机数根据什么符号进行分割
      print(str)
      println()
    // 生产者 生产出来的消息类型都是 ProducerRecord 类型的,
      // 指定 主题、key、value
    val message = new ProducerRecord[String, String](topic, null, str)
    // 将消息对象发送出去
    producer.send(message)
  }
 Thread.sleep(3000) // 线程休眠(3秒)
}

3).Consumer(消费者)

// 代码是在jupyter中运行的,所以没有加函数体等
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// 创建配置文件对象
var sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkKafKaTest")
// 创建具有5秒批处理间隔的上下文
val ssc = new StreamingContext(sparkConf, Seconds(5))

// 如果指定多个topic,那么在这里直接使用","进行分割
val topics = Array("demo01")
// 配置kafka的相关参数
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// 创建 DirectStream 对象(DStream对象:实际上就是一堆RDD的集合)
// 此时的数据来源是从kafka来的;每行数据是以(K,V)的形式存放
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// 将DStream中数据进行map变换,将每一行记录中的key和value值进行整理,然后打印输出
stream.map(record => (record.key, record.value)).print()
/*
这里是将计数操作直接写在了一行代码中:
1.将数据中value取出来;(数据是以key,value的形式存的吗?????)
2.将每一行数据进行单词的切分,
3.对切分之后的单词进行map变换,重新映射为另一种形式
4.然后对转换之后的数据进行单词计数
5.最后将结果打印在屏幕上
*/
stream.map(record =>  record.value).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

// 启动当前程序
ssc.start() 
// 等待一个终止命令
ssc.awaitTermination() 
def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] {...}

https://www.xamrdz.com/backend/3zq1924826.html

相关文章: