当前位置: 首页>后端>正文

【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

问题描述

使用Python SDK(Confluent)相关方法获取offset或lag时, 提示SSL相关错误, 是否有更清晰的实例以便参考呢?

问题解决

执行代码,因为一直连接不成功,所以检查 confluent_kafka 的连接配置,最后定位是 sasl.password 值设置有误。此处,需要使用Event Hub Namespace级别的连接字符串(Connection String).


【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误,第1张

在Event Hub中,获取方式为: (1: Shared access policies ---> 2: RootManageSharedAccessKey or ..----> 3: Connection String )

【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误,第2张

完整的示例代码:

import confluent_kafka

topics = ["<Your_topic_name>"]
broker = "<Eventhub-namespace-name>.servicebus.chinacloudapi.cn:9093"
group_name = "<Consumer-group-name>"
sasl_password = "<Connection-string>"

# Create consumer.
# This consumer will not join the group, but the group.id is required by
# committed() to know which group to get offsets for.
consumer = confluent_kafka.Consumer({'bootstrap.servers': broker,
                                     'security.protocol': 'SASL_SSL',
                                     'sasl.mechanism': 'PLAIN',
                                     'sasl.username': '$ConnectionString',
                                     'sasl.password': sasl_password,
                                     'group.id': group_name})

print("%-50s  %9s  %9s" % ("Topic [Partition]", "Committed", "Lag"))
print("=" * 72)

for topic in topics:
    # Get the topic's partitions
    metadata = consumer.list_topics(topic, timeout=10)
    if metadata.topics[topic].error is not None:
        raise confluent_kafka.KafkaException(metadata.topics[topic].error)

    # Construct TopicPartition list of partitions to query
    partitions = [confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions]

    # Query committed offsets for this group and the given partitions
    committed = consumer.committed(partitions, timeout=10)

    for partition in committed:
        # Get the partitions low and high watermark offsets.
        (lo, hi) = consumer.get_watermark_offsets(partition, timeout=10, cached=False)

        if partition.offset == confluent_kafka.OFFSET_INVALID:
            offset = "-"
        else:
            offset = "%d" % (partition.offset)

        if hi < 0:
            lag = "no hwmark"  # Unlikely
        elif partition.offset < 0:
            # No committed offset, show total message count as lag.
            # The actual message count may be lower due to compaction
            # and record deletions.
            lag = "%d" % (hi - lo)
        else:
            lag = "%d" % (hi - partition.offset)

        print("%-50s  %9s  %9s" % (
            "{} [{}]".format(partition.topic, partition.partition), offset, lag))


consumer.close()

参考文档

**confluent-kafka-python : **https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/list_offsets.py

当在复杂的环境中面临问题,格物之道需:浊而静之徐清,安以动之徐生。 云中,恰是如此!

分类: 【Azure 事件中心】, 【Azure Developer】

标签: event hub confluent kafka, SSL相关错误


https://www.xamrdz.com/backend/3qc1920694.html

相关文章: