问题描述
使用Python SDK(Confluent)相关方法获取offset或lag时, 提示SSL相关错误, 是否有更清晰的实例以便参考呢?
问题解决
执行代码,因为一直连接不成功,所以检查 confluent_kafka 的连接配置,最后定位是 sasl.password 值设置有误。此处,需要使用Event Hub Namespace级别的连接字符串(Connection String).
在Event Hub中,获取方式为: (1: Shared access policies ---> 2: RootManageSharedAccessKey or ..----> 3: Connection String )
完整的示例代码:
import confluent_kafka
topics = ["<Your_topic_name>"]
broker = "<Eventhub-namespace-name>.servicebus.chinacloudapi.cn:9093"
group_name = "<Consumer-group-name>"
sasl_password = "<Connection-string>"
# Create consumer.
# This consumer will not join the group, but the group.id is required by
# committed() to know which group to get offsets for.
consumer = confluent_kafka.Consumer({'bootstrap.servers': broker,
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': '$ConnectionString',
'sasl.password': sasl_password,
'group.id': group_name})
print("%-50s %9s %9s" % ("Topic [Partition]", "Committed", "Lag"))
print("=" * 72)
for topic in topics:
# Get the topic's partitions
metadata = consumer.list_topics(topic, timeout=10)
if metadata.topics[topic].error is not None:
raise confluent_kafka.KafkaException(metadata.topics[topic].error)
# Construct TopicPartition list of partitions to query
partitions = [confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions]
# Query committed offsets for this group and the given partitions
committed = consumer.committed(partitions, timeout=10)
for partition in committed:
# Get the partitions low and high watermark offsets.
(lo, hi) = consumer.get_watermark_offsets(partition, timeout=10, cached=False)
if partition.offset == confluent_kafka.OFFSET_INVALID:
offset = "-"
else:
offset = "%d" % (partition.offset)
if hi < 0:
lag = "no hwmark" # Unlikely
elif partition.offset < 0:
# No committed offset, show total message count as lag.
# The actual message count may be lower due to compaction
# and record deletions.
lag = "%d" % (hi - lo)
else:
lag = "%d" % (hi - partition.offset)
print("%-50s %9s %9s" % (
"{} [{}]".format(partition.topic, partition.partition), offset, lag))
consumer.close()
参考文档
**confluent-kafka-python : **https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/list_offsets.py
当在复杂的环境中面临问题,格物之道需:浊而静之徐清,安以动之徐生。 云中,恰是如此!
分类: 【Azure 事件中心】, 【Azure Developer】
标签: event hub confluent kafka, SSL相关错误