流式数据处理
# Kafka
是一个消息队列,同时具备拓展功能(海量数据存储,海量数据流式处理)
activemq(6k) rabbitmq(1.2w) rocketMQ(3-5w) 理论上来讲避免了数据丢失的问题.
kafka(25-50w) 有数据丢失的可能,所以只能用于大数据,或者对数据一致性要求不高的业务中. 亿分之几
kafka只有一种模式:发布/订阅(与rabbitmq不一样)
Kafka
消息队列
海量数据存储
流式数据处理
重要概念
broker: kafka中所有的服务器节点都叫做broker. 编号规则为 从0开始的整型数字.
topic: 在kafka中,可能同时为多个项目或者业务提供消息队列的服务,也就是说会同时连接多个生产者和多个消费者.那么为了避免不同业务数据交叉,设置了主题来分别保存不同业务的数据.来进行隔离.
partition: 在主题的内部,又进行了进一步的细分,就是所谓的分区. 在一个独立的主题内部,如果业务上还有细分的话,可以将不同业务的数据存放在不同的分区. 同时,一个主题中包含多个分区,每个分区存放在不同的服务器上,本身也是一种负载均衡的体现.
leader和follower: 在分区内部,每个分区还会在其他的服务器上对数据进行备份.备份出来的数据,被称为follower 原本的分区叫做leader. 所有写入和消费数据的操作全部由leader完成.follower只负责实时同步leader的数据变化.只有leader宕机之后才会在follower中进行选举,选出新的leader保证服务可用.
ConsumerGroup: 在消费者订阅消息时,需要以组的形式来注册offset.如果该消费者需要消费所有数据,那么自己注册一个组.如果多个消费者负载均衡分摊一份数据,那么可以将这几个消费者注册为一组.
log: 在kafka中非常特殊,表示的并不是日志,而是kafka存储的数据,生产者将数据写入kafka后以event的形式封装,在kafka内部叫做log
队列
队列:队列中的数据一旦被消费者消费成功,就会被清除出队列,目的是为了保证数据不会被重复消费。(删除一个去掉一个)
优点:天然支持多个消费者之间的负载均衡
缺点:不能实现多个消费者同时需要消费全量数据(数据库里面包含了1,2,3。如果消费者c1消耗了1,那么消费者c2进来只能消费2)
发布订阅
发布订阅:数据被消费者消费之后并不会直接删除,而是保留一段时间,同一个消费者以偏移量的记录来保证自己不会重复消费数据。
缺点:不适合做多个消费者之间的负载均衡
优点:适合多个消费者需要同时消费 全量数据的场景。由于消费的进度是由offset控制的,所以我们可以通过修改offset的值来控制消费者跳跃消费或者重新消费。
生产者通过生产者客户端去注册,然后生产者客户端将具体的数据保存在服务器上。
如果有两个生产者,JT生产者和DB生产者。利用topic来区分不同的业务。如果在区分JT里面的子业务,需要跨服务器来操作,利用parition分区存放。如图:
这个图的大概意思就是
生产者通过生产者客户端去注册,然后生产者客户端将具体的数据保存在服务器上。如果有两个生产者,JT生产者和DB生产者。利用topic来区分不同的业务。如果在区分JT里面的子业务,需要跨服务器来操作,利用parition分区存放。leader代表的是主机(所有的工作在这个里面进行),follower代表就是从机(主要就是进行备份的)类似于主从复制。三个颜色深的如果出现故障的话,会由从机接替工作。
Kafka和zookeeper的安装和测试
基础环境: JDK(Scala开发java的衍生语言,编译.class,在jvm上运行),zookeeper支持.
zookeeper安装步骤如下:
1.解压文件
2.
3.
4.
Kafka安装步骤
2.
3.
4.启动
cd /home/app/kafka_2.13-2.4.1
bin/kafka-server-start.sh config/server.properties &
创建主题
创建主题flux
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic flux
参数说明:
–create: 要创建主题
–replication-factor:副本个数
–partitions:主题分区存储,分区的个数
–topic:主题名(唯一,不能重复)
可以使用 --list 命令查看所有topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flux
消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flux --from-beginning
参数说明:
–from-beginning 是表示从头开始消费消息
If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
消息已经永久保存在kafka集群了,从头消费或是从当前时间消费都可以。
实现效果
Kafka整合flink
创建一个maven工程(java)
导入pom文件
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.11.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.11.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.11.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-base -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.12</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
</dependencies>
整合kafka+flink
按照上述步骤把kafka在启动即可