当前位置: 首页>后端>正文

kafka 一个消息堵住整个队列 kafka队列能存放多少消息

流式数据处理

# Kafka

是一个消息队列,同时具备拓展功能(海量数据存储,海量数据流式处理)
activemq(6k) rabbitmq(1.2w) rocketMQ(3-5w) 理论上来讲避免了数据丢失的问题.
kafka(25-50w) 有数据丢失的可能,所以只能用于大数据,或者对数据一致性要求不高的业务中. 亿分之几
kafka只有一种模式:发布/订阅(与rabbitmq不一样)

Kafka
消息队列
海量数据存储
流式数据处理

重要概念

broker: kafka中所有的服务器节点都叫做broker. 编号规则为 从0开始的整型数字.

topic: 在kafka中,可能同时为多个项目或者业务提供消息队列的服务,也就是说会同时连接多个生产者和多个消费者.那么为了避免不同业务数据交叉,设置了主题来分别保存不同业务的数据.来进行隔离.

partition: 在主题的内部,又进行了进一步的细分,就是所谓的分区. 在一个独立的主题内部,如果业务上还有细分的话,可以将不同业务的数据存放在不同的分区. 同时,一个主题中包含多个分区,每个分区存放在不同的服务器上,本身也是一种负载均衡的体现.

leader和follower: 在分区内部,每个分区还会在其他的服务器上对数据进行备份.备份出来的数据,被称为follower 原本的分区叫做leader. 所有写入和消费数据的操作全部由leader完成.follower只负责实时同步leader的数据变化.只有leader宕机之后才会在follower中进行选举,选出新的leader保证服务可用.

ConsumerGroup: 在消费者订阅消息时,需要以组的形式来注册offset.如果该消费者需要消费所有数据,那么自己注册一个组.如果多个消费者负载均衡分摊一份数据,那么可以将这几个消费者注册为一组.

log: 在kafka中非常特殊,表示的并不是日志,而是kafka存储的数据,生产者将数据写入kafka后以event的形式封装,在kafka内部叫做log

队列

队列:队列中的数据一旦被消费者消费成功,就会被清除出队列,目的是为了保证数据不会被重复消费。(删除一个去掉一个)
优点:天然支持多个消费者之间的负载均衡
缺点:不能实现多个消费者同时需要消费全量数据(数据库里面包含了1,2,3。如果消费者c1消耗了1,那么消费者c2进来只能消费2)

发布订阅

发布订阅:数据被消费者消费之后并不会直接删除,而是保留一段时间,同一个消费者以偏移量的记录来保证自己不会重复消费数据。

缺点:不适合做多个消费者之间的负载均衡

优点:适合多个消费者需要同时消费 全量数据的场景。由于消费的进度是由offset控制的,所以我们可以通过修改offset的值来控制消费者跳跃消费或者重新消费。

kafka 一个消息堵住整个队列 kafka队列能存放多少消息,kafka 一个消息堵住整个队列 kafka队列能存放多少消息_kafka 一个消息堵住整个队列,第1张

kafka 一个消息堵住整个队列 kafka队列能存放多少消息,kafka 一个消息堵住整个队列 kafka队列能存放多少消息_flink_02,第2张

生产者通过生产者客户端去注册,然后生产者客户端将具体的数据保存在服务器上。

如果有两个生产者,JT生产者和DB生产者。利用topic来区分不同的业务。如果在区分JT里面的子业务,需要跨服务器来操作,利用parition分区存放。如图:

kafka 一个消息堵住整个队列 kafka队列能存放多少消息,kafka 一个消息堵住整个队列 kafka队列能存放多少消息_kafka_03,第3张

这个图的大概意思就是

生产者通过生产者客户端去注册,然后生产者客户端将具体的数据保存在服务器上。如果有两个生产者,JT生产者和DB生产者。利用topic来区分不同的业务。如果在区分JT里面的子业务,需要跨服务器来操作,利用parition分区存放。leader代表的是主机(所有的工作在这个里面进行),follower代表就是从机(主要就是进行备份的)类似于主从复制。三个颜色深的如果出现故障的话,会由从机接替工作。

Kafka和zookeeper的安装和测试

基础环境: JDK(Scala开发java的衍生语言,编译.class,在jvm上运行),zookeeper支持.

zookeeper安装步骤如下:

1.解压文件

kafka 一个消息堵住整个队列 kafka队列能存放多少消息,kafka 一个消息堵住整个队列 kafka队列能存放多少消息_apache_04,第4张

2.

kafka 一个消息堵住整个队列 kafka队列能存放多少消息,kafka 一个消息堵住整个队列 kafka队列能存放多少消息_flink_05,第5张

3.

kafka 一个消息堵住整个队列 kafka队列能存放多少消息,kafka 一个消息堵住整个队列 kafka队列能存放多少消息_kafka_06,第6张

4.

kafka 一个消息堵住整个队列 kafka队列能存放多少消息,kafka 一个消息堵住整个队列 kafka队列能存放多少消息_flink_07,第7张

Kafka安装步骤


kafka 一个消息堵住整个队列 kafka队列能存放多少消息,kafka 一个消息堵住整个队列 kafka队列能存放多少消息_apache_08,第8张

2.

kafka 一个消息堵住整个队列 kafka队列能存放多少消息,kafka 一个消息堵住整个队列 kafka队列能存放多少消息_apache_09,第9张

3.

kafka 一个消息堵住整个队列 kafka队列能存放多少消息,kafka 一个消息堵住整个队列 kafka队列能存放多少消息_kafka_10,第10张

4.启动

cd /home/app/kafka_2.13-2.4.1
bin/kafka-server-start.sh config/server.properties &

kafka 一个消息堵住整个队列 kafka队列能存放多少消息,kafka 一个消息堵住整个队列 kafka队列能存放多少消息_kafka 一个消息堵住整个队列_11,第11张

kafka 一个消息堵住整个队列 kafka队列能存放多少消息,kafka 一个消息堵住整个队列 kafka队列能存放多少消息_flink_12,第12张

创建主题

创建主题flux

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic flux

参数说明:
–create: 要创建主题
–replication-factor:副本个数
–partitions:主题分区存储,分区的个数
–topic:主题名(唯一,不能重复)
可以使用 --list 命令查看所有topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flux

消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flux --from-beginning

参数说明:
–from-beginning 是表示从头开始消费消息
If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
消息已经永久保存在kafka集群了,从头消费或是从当前时间消费都可以。

实现效果

kafka 一个消息堵住整个队列 kafka队列能存放多少消息,kafka 一个消息堵住整个队列 kafka队列能存放多少消息_kafka 一个消息堵住整个队列_13,第13张

Kafka整合flink

创建一个maven工程(java)

kafka 一个消息堵住整个队列 kafka队列能存放多少消息,kafka 一个消息堵住整个队列 kafka队列能存放多少消息_kafka_14,第14张

导入pom文件

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.11.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.11.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.11.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-base -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.12</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
</dependencies>

整合kafka+flink

kafka 一个消息堵住整个队列 kafka队列能存放多少消息,kafka 一个消息堵住整个队列 kafka队列能存放多少消息_kafka_15,第15张

按照上述步骤把kafka在启动即可



https://www.xamrdz.com/backend/3qg1926518.html

相关文章: