当前位置: 首页>数据库>正文

kafka消息带时间戳 kafka指定时间戳消费

kafka参数说明(参考):

kafka时间戳字段原因(过期清理,日志切分,流式处理),0.10版本开始才有时间戳概念

kafka消息是存放在磁盘上,发送一次,累积到一定数量或者时间间隔就落盘一次,消费一次就读一次磁盘

topic划分为若干分区,分区对一个目录,分区划分为segment,一个segment对应三个二进制文件(后缀分别是index,log,timeindex),类似mysql存储机制
消息数据存放在log文件里面,对应的位置存放在index里面,时间戳存放在timeindex里面

分区有副本概念,如果一个topic有10个分区,分3个节点,那么可能是3/3/4的存放方式,比如kjLog-1,kjLog-4,kjLog-7这样的存放方式。
但是其他kjLog-*目录也会有,但是目录下的index,log,timeindex文件一定是空的,没有数据。

副本选举机制:
如果有多个副本,会有一个选举机制,假设有1个分区有5个副本,共6份,如果分区1挂了,其他5份有3份及时和分区1同步了,就会进入ISR,这个东西存放在zk里面
kafka发现有个分区挂了后,就从ISR找到每个可用的副本所在节点ID,下发通知,这些副本里面3份及时的分区对应的节点就会同时向zk注册,zk机制是只有一个节点能注册成功,这样先注册的就选举成功,成为新的分区,其他分区都要跟它保持同步。

===================
kafka配置:
cd /usr/local/src
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
tar -zxvf kafka_2.11-0.10.1.1.tgz -C /home/dig/service/
cd /home/dig/service/
ln -s kafka_2.11-0.10.1.0 kafkavim /home/dig/service/kafka/config/consumer.properties
zookeeper.connect=bdp-001:2181,bdp-002:2181,bdp-003:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-groupvim /home/dig/service/kafka/config/producer.properties
metadata.broker.list=bdp-001:9092,bdp-002:9092,bdp-003:9092
compression.codec=noneproducer.type=sync
serializer.class=kafka.serializer.DefaultEncoder
batch.num.messages=200vim /home/dig/service/kafka/config/server.properties
broker.id=0
host.name=bdp-001
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/dig/service/kafka/data
num.partitions=6
num.recovery.threads.per.data.dir=1#消息保留时间
log.retention.hours=168#消息最多保留的字节数
log.segment.bytes=1073741824#每隔多久检查上面两个参数是否达到阀值
log.retention.check.interval.ms=300000log.cleaner.enable=false
zookeeper.connect=bdp-001:2181,bdp-002:2181,bdp-003:2181
zookeeper.connection.timeout.ms=6000
auto.create.topics.enable=false
delete.topic.enable=true另外server.properties中,不要启用自动创建topic:
auto.create.topics.enable=true
否则producer发送消息时,提示分区异常。拷贝到另外两个节点:
scp -r /home/dig/service/kafka_2.11-0.10.1.0/ bdp-002:/home/dig/service/
scp -r /home/dig/service/kafka_2.11-0.10.1.0/ bdp-003:/home/dig/service/注意每个节点id不一样:
cat /home/dig/service/kafka/config/server.properties | grep broker.id临时启动kafka:
rm -rf /tmp/bdp-logs
/home/dig/service/kafka/bin/kafka-server-start.sh /home/dig/service/kafka/config/server.properties 在3台机器上分别执行:
rm -rf /home/dig/service/zookeeper/data/*ps -ef |grep -v grep | egrep -i 'kafka|Kafka' |awk '{print }' |xargs -t -i kill -9 {}
rm -rf /tmp/bdp-logs && ll /tmp/bdp-logs || jps所有节点关闭完后先启动zk:
/home/dig/service/zookeeper/bin/zkServer.sh start再启动kafka:
/home/dig/service/kafka/bin/kafka-server-start.sh -daemon /home/dig/service/kafka/config/server.properties
jps
sleep 1
ll /tmp/bdp-logs查看kafka启动状态:
ls /brokers/ids
输入3个kafka实例的id即集群启动成功:
[1, 2, 3]apache版本的kafka自带关闭命令无效:
/home/dig/service/kafka/bin/kafka-server-stop.sh /home/dig/service/kafka/config/server.properties/home/dig/service/kafka/bin/kafka-server-start.sh /home/dig/service/kafka/config/server.properties
有效关闭脚本:
ps -ef |grep -v grep | egrep -i 'kafka|Kafka' |awk '{print }' |xargs -t -i kill -9 {}===========================================
测试kafak是否部署成功:创建Topic
/home/dig/service/kafka/bin/kafka-topics.sh \
--zookeeper etl1:2181,etl2:2181,etl3:2181 \
--replication-factor 2 \
--partitions 3 \
--create \
--topic kjLog或者这么删除(只删除zk的元数据,分区文件没有删除):
kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic mytest1删除话题队列:
kafka-topics.sh --zookeeper localhost:2181 --delete --topic mytest1查看存在的话题队列:
kafka-topics.sh --zookeeper localhost:2181 --list向话题队列mytest1 发送消息,运行后直接输入:
kafka-console-producer.sh --broker-list kafka1:9092 --topic mytest1消费者从话题队列mytest3 中取消息(运行后,直接可以看到输出结果,老版本是连zk,新版本连bootstrap及broker):
kafka-console-consumer.sh \
--bootstrap-server etl3:9092 \
--topic kjLog \
--consumer-property group.id=kjEtlGroup # 从头开始消费
--from-beginning \消费组是在消费时自动生成的,默认是console-consumer-31553,后面的数组是随机的,
也可以消费时指定具体消费组名字查看有哪些消费组:
kafka-consumer-groups.sh --zookeeper localhost:2181 --list等价于登录zk,查看有哪些消费组:
ls /consumers/===========================================
查看消息数量offset:
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --time -1 --topic kjLog --partitions 0
输出topic的容量大小
kjLog:2:116060276
kjLog:1:70158992
kjLog:0:15411674查看指定topic的详细信息:
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:12181 --topic kjLog --group kjEtlGroup
输出结果:
消费者组 话题id 分区id 当前已消费的条数 总条数 未消费的条数 属主
Group Topic Pid Offset logSize Lag Owner
kjEtlGroup kjLog 0 15484439 15484445 6 none
kjEtlGroup kjLog 1 70655159 70655189 30 none
kjEtlGroup kjLog 2 116860888 116860904 16 none查看分区详细信息:
kafka-topics.sh --zookeeper localhost:2181 --describe
输出结果:
Topic:mytest4 PartitionCount:6 ReplicationFactor:2 Configs:
 Topic: mytest4 Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
 Topic: mytest4 Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1表示Topic名称是mytest4,一共6个分区,并且每个分区都有两份完全一样的。
分区0的leader节点是在id为2的kafka实例上,复制节点是2和3两个实例,正在服务的节点也是2和3
分区1的leader节点是在id为3的kafka实例上,复制节点是3和1两个实例,正在服务的节点也是3和1===========================================
迁移消息:
vim topics-to-move.json
{"topics":
 [{"topic": "mytest1"}],
 "version":1
}生成迁移计划:
bdp-reassign-partitions.sh \
--zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 \
--broker-list "4" \
--topics-to-move-json-file topics-to-move.json \
--generate 输出如下结果:
Current partition replica assignment{"version":1,"partitions":[{"topic":"mytest1","partition":2,"replicas":[1]},{"topic":"mytest1","partition":1,"replicas":[3]},{"topic":"mytest1","partition":0,"replicas":[2]}]}
Proposed partition reassignment configuration{"version":1,"partitions":[{"topic":"mytest1","partition":2,"replicas":[4]},{"topic":"mytest1","partition":1,"replicas":[4]},{"topic":"mytest1","partition":0,"replicas":[4]}]}
将 Proposed partition 对应json内容写入到文件reassignment-node.json中
 
执行topic迁移:
bdp-reassign-partitions.sh \
--zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 \
--reassignment-json-file reassignment-node.json \
--execute执行迁移后,会在zk上注册一个节点:
登录zk查看: get /admin/reassign_partitions
结果如下:
{"version":1,"partitions":[{"topic":"mytest1","partition":2,"replicas":[4]}]}查看迁移结果:
bdp-reassign-partitions.sh \
--zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 \
--reassignment-json-file reassignment-node.json \
--verify上面的方式并没有增加新分区,而是对原有分区做了一个副本,增加一个topic的分区方法是:
kafka-topics.sh --zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 --alter --partitions 5 --topic mytest1
这个命令的效果是topic为mytest1的分区数量增加到5个,而不是在原有分区数之上再增加5个分区。kafka分区只能增加,不能通过自带的命令删除(但是直接删除文件夹可以删除),否则报错:
Error while executing topic command The number of partitions for a topic can only be increased
kafka.admin.AdminOperationException: The number of partitions for a topic can only be increased增加了分区后,迁移分区的方法就简单了,
编写如下json,其中12,13,14是分区id, 101,102,103,104,105,106是kafka实例id
vim 
{
 "partitions": [{
 "topic": "mytest1",
 "partition": 12,
 "replicas": [101,102]
 },
 {
 "topic": "mytest1",
 "partition": 13,
 "replicas": [103,104]
 },
 {
 "topic": "mytest1",
 "partition": 14,
 "replicas": [105,106]
 }],
 "version": 1
}然后执行分区重分布命令:
bdp-reassign-partitions.sh \
--zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 \
--reassignment-json-file partitions-extension-push-token-topic.json \
--execute这样topic为mytest1的分区就改变了,存放到新的物理服务器的节点上了。
=============================================================
官方文档:
http://kafka.apache.org/documentation.htmlkafka开发文档:
http://www.aboutyun.com/thread-9906-1-1.html官方文档翻译:
参考翻译:
=============================================================
问题1:这是Kafka消息分区造成的,你可以去了解一下Kafka是如何分区的,就知道原因了。
问题原因可能是:你的所有消息的Key都是一样的,使用默认的Partitioner: hash(key)%numPartitions,这样每次的partion num都是一样的,所以数据都落到一个分区了。
而同一consumer grop并行消息,也是按照分区来分配的,因为只有一个分区上有数据,所以有一个consumer始终拿不到消息。
解决办法:1.自定义分区函数。2.消息散列为不同的key
 
问题2:
[2016-08-01 14:13:19,609] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2016-08-01 14:13:19,615] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2016-08-01 14:13:27,784] ERROR [test-consumer-group_bdp-003-1470075194621-ae0d9636], error during syncedRebalance (kafka.consumer.ZookeeperConsumerConnector)
kafka.common.ConsumerRebalanceFailedException: test-consumer-group_bdp-003-1470075194621-ae0d9636 can't rebalance after 4 retries
 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon.run(ZookeeperConsumerConnector.scala:355)解决这是一个bug:
There is a bug in kafka.tools.ConsumerOffsetChecker. If the a particular Zookeeper node holding consumed offset information doesn't exit, the tool exits throwing the execption.For example, suppose you have a consumer group "mygroup" and a topic "topictest". Then the offset for partition 2 is maintained in Znode: /consumers/mygroup/offsets/topictest/2.
If there is no entry for partition 2 of topic topictest in Znode, then consumer offsetchecker tool will exit while checking offset for partition 2. Basically, it will fail while checking the first partition "n" for which the Znode /consumers/mygroup/offsets/topictest/n is missing on Zookeeper.
问题3:
zookeeper启用日志文件,修改conf下面的log4j.properties,但无效。 
再修改bin下的zkEnv.sh才能生效:
if [ "x${ZOO_LOG_DIR}" = "x" ]
then
 ZOO_LOG_DIR="/home/dig/service/zookeeper/log"
fiif [ "x${ZOO_LOG4J_PROP}" = "x" ]
then
 ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
fi问题4:
kafka消息存放多久?
这个取决于kafka消息清理策略。
默认kafka消息的清理策略是删除,但是没有明确写入server.propertiesp文件中:
log.cleanup.policy=delete默认消息保存一周时间,默认不限制消息总量的长度,检查间隔是1ms。
log.retention.hours=168
log.retention.bytes=-1
log.retention.check.interval.ms=1 
问题5:
logserver报错:
[ERROR][kafka-producer-network-thread | producer-1][2017-04-05 09:37:21,129][LogServiceImpl.java:onCompletion:220]:Fail to send record to Kafka. Key: appupgrade_alive_user, Value Length: 190
 org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.原因是,kafka集群宕机或者重启后,topic partition重新选举leader,并且不是原来的leader后,而生产者还是使用原来的leader来生产消息,报错。
解决:重启生产者所在服务器。 
问题6:
logserver报错:
[WARN][kafka-producer-network-thread | producer-1][2017-04-05 09:37:21,234][Sender.java:completeBatch:298]:Got error produce response with correlation id 23026690 on topic-partition kjLog-0, retrying (0 attempts left). Error: NOT_LEADER_FOR_PARTITION 原因同上,都是因为leader重新选举导致。
 
问题7:
logserver报错:
536116 [ERROR][kafka-producer-network-thread | producer-1][2017-04-04 09:26:41,952][LogServiceImpl.java:onCompletion:102]:Fail to send record to Kafka. Key: konkasysteminfo_open_p age, Value Length: 462
536117 org.apache.kafka.common.errors.TimeoutException: Batch containing 46 record(s) expired due to timeout while requesting metadata from brokers for kjLog-2原因不确定,初步怀疑也是分区选举leader变更导致。
 
===========================================
1.kafka为什么要在topic里加入分区的概念?
topic是逻辑的概念,partition是物理的概念,对用户来说是透明的。producer只需要关心消息发往哪个topic,而consumer只关心自己订阅哪个topic,并不关心每条消息存于整个集群的哪个broker。
logsize是写入分区的消息条数,offset是已经消费的条数,这两个只都是从0开始,每次累加(i++),单位是条,不是字节
lag表示没有消费的,已经缓存的条数,lag = logsize - offsize为了性能考虑,如果topic内的消息只存于一个broker,那这个broker会成为瓶颈,无法做到水平扩展。所以把topic内的数据分布到整个集群就是一个自然而然的设计方式。Partition的引入就是解决水平扩展问题的一个方案。
每个partition可以被认为是一个无限长度的数组,新数据顺序追加进这个数组。物理上,每个partition对应于一个文件夹。一个broker上可以存放多个partition。这样,producer可以将数据发送给多个broker上的多个partition,consumer也可以并行从多个broker上的不同paritition上读数据,实现了水平扩展
里所讲,每个partition可以被认为是一个无限长度的数组,新数据顺序追加进这个数组。物理上,每个partition对应于一个文件夹。一个broker上可以存放多个partition。这样,producer可以将数据发送给多个broker上的多个partition,consumer也可以并行从多个broker上的不同paritition上读数据,实现了水平扩展。2.如果没有分区,topic中的segment消息写满后,直接给订阅者不是也可以吗?
“segment消息写满后”,consume消费数据并不需要等到segment写满,只要有一条数据被commit,就可以立马被消费segment对应一个文件(实现上对应2个文件,一个数据文件,一个索引文件),一个partition对应一个文件夹,一个partition里理论上可以包含任意多个segment。所以partition可以认为是在segment上做了一层包装。
这个问题换个角度问可能更好,“为什么有了partition还需要segment”。
如果不引入segment,一个partition直接对应一个文件(应该说两个文件,一个数据文件,一个索引文件),那这个文件会一直增大。同时,在做data purge时,需要把文件的前面部分给删除,不符合kafka对文件的顺序写优化设计方案。引入segment后,每次做data purge,只需要把旧的segment整个文件删除即可,保证了每个segment的顺序写。3、kafka的消息生产者使用的包是import kafka.javaapi.producer.Producer,不是kafka.producer.Producer
import kafka.producer.KeyedMessage;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;private final Producer<Integer, String> producer;
props.put("metadata.broker.list", KafkaProperties.kafkaConnect);
producer = new Producer<Integer, String>(new ProducerConfig(props));4、kafka消息发送字节流,扩展序列化类 参考链接:
http://grepcode.com/file/repo1.maven.org/maven2/org.apache.twill/twill-core/0.1.0-incubating/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java5、kafka消息部分代码解析:
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();//topicCountMap是设置每个topic开多少线程,每个线程处理执行多个task,
//每个task会从每个topic的一个分区中消费数据,如果有3个分区,每个task会同时从3个topic的分区中获取数据,即每个task会从3个分区中获取数据。
//这里的参数new Integer(1)表示kafka消费服务器开多少个线程,通常最好是线程数量小于等于对应topic的分区数量
topicCountMap.put(topic, new Integer(2));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
//这里的get(0)表示从第一个分区中提取消息
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();



https://www.xamrdz.com/database/6za1939641.html

相关文章: