说明
除了使用Kafka的bin目录下的脚本工具来管理Kafka,还可以使用管理Kafka的API将某些管理查看 的功能集成到系统中。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端,采用Scala 编写)中的AdminClient和AdminUtils来实现部分的集群管理操作。Kafka0.11.0.0之后,又多了一个 AdminClient,在kafka-client包下,一个抽象类,具体的实现是 org.apache.kafka.clients.admin.KafkaAdminClient。
KafkaAdminClient包含了一下几种功能(以Kafka1.0.2版本为准):
1. 创建主题:
createTopics(final Collection newTopics, final CreateTopicsOptions options)
2. 删除主题:
deleteTopics(final Collection topicNames, DeleteTopicsOptions options)
3. 列出所有主题:
listTopics(final ListTopicsOptions options)
4. 查询主题:
describeTopics(final Collection topicNames, DescribeTopicsOptions options)
5. 查询集群信息:
describeCluster(DescribeClusterOptions options)
6. 查询配置信息:
describeConfigs(Collection configResources, final DescribeConfigsOptions options)
7. 修改配置信息:
alterConfigs(Map configs, final AlterConfigsOptions options)
8. 修改副本的日志目录:
alterReplicaLogDirs(Map replicaAssignment, final AlterReplicaLogDirsOptions options)
9. 查询节点的日志目录信息:
describeLogDirs(Collection brokers, DescribeLogDirsOptions options)
10. 查询副本的日志目录信息:
describeReplicaLogDirs(Collection replicas, DescribeReplicaLogDirsOptions options)
11. 增加分区:
createPartitions(Map newPartitions, final CreatePartitionsOptions options)
用到的参数:
属性 | 说明 | 重要性 |
bootstrap.servers | 向Kafka集群建立初始连接用到的host/port列表。 客户端会使用这里列出的所有服务器进行集群其他服 务器的发现,而不管是否指定了哪个服务器用作引 导。 这个列表仅影响用来发现集群所有服务器的初始主 机。 字符串形式:host1:port1,host2:port2,... 由于这组服务器仅用于建立初始链接,然后发现集群 中的所有服务器,因此没有必要将集群中的所有地址 写在这里。 一般最好两台,以防其中一台宕掉。 | high |
client.id | 生产者发送请求的时候传递给broker的id字符串。 用于在broker的请求日志中追踪什么应用发送了什 么消息。 一般该id是跟业务有关的字符串。 | medium |
connections.max.idle.ms | 当连接空闲时间达到这个值,就关闭连接。long型 数据,默认:300000 | medium |
receive.buffer.bytes | TCP接收缓存(SO_RCVBUF),如果设置为-1,则 使用操作系统默认的值。int类型值,默认65536, 可选值:[-1,...] | medium |
request.timeout.ms | 客户端等待服务端响应的最大时间。如果该时间超 时,则客户端要么重新发起请求,要么如果重试耗 尽,请求失败。int类型值,默认:120000 | medium |
security.protocol | 跟broker通信的协议:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. string类型值,默认:PLAINTEXT | medium |
send.buffer.bytes | 用于TCP发送数据时使用的缓冲大小 (SO_SNDBUF),-1表示使用OS默认的缓冲区大 小。 int类型值,默认值:131072 | medium |
reconnect.backoff.max.ms | 对于每个连续的连接失败,每台主机的退避将成倍增 加,直至达到此最大值。在计算退避增量之后,添加 20%的随机抖动以避免连接风暴。 long型值,默认1000,可选值:[0,...] | low |
reconnect.backoff.ms | 重新连接主机的等待时间。避免了重连的密集循环。 该等待时间应用于该客户端到broker的所有连接。 long型值,默认:50 | low |
retries | The maximum number of times to retry a call before failing it.重试的次数,达到此值,失败。 int类型值,默认5。 | low |
主要操作步骤:
客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送 CreateTopicRequest请求。
客户端发送请求至Kafka Broker。
Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是 CreateTopicResponse。
客户端接收相应的回执并进行解析处理。
和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中, AbstractRequest和AbstractResponse是这些请求和响应类的两个父类。
综上,如果要自定义实现一个功能,只需要三个步骤:
1. 自定义XXXOptions;
2. 自定义XXXResult返回值;
3. 自定义Call,然后挑选合适的XXXRequest和XXXResponse来实现Call类中的3个抽象方法。
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class MyAdminClient {
private KafkaAdminClient client;
@Before
public void before() {
Map<String, Object> conf = new HashMap<>();
conf.put("bootstrap.servers", "192.168.25.129:9092");
conf.put("client.id", "adminclient-1");
client = (KafkaAdminClient) KafkaAdminClient.create(conf);
}
@After
public void after() {
client.close();
}
@Test
public void testListTopics1() throws ExecutionException, InterruptedException {
ListTopicsResult listTopicsResult = client.listTopics();
// KafkaFuture<Collection<TopicListing>> listings = listTopicsResult.listings();
// Collection<TopicListing> topicListings = listings.get();
//
// topicListings.forEach(new Consumer<TopicListing>() {
// @Override
// public void accept(TopicListing topicListing) {
// boolean internal = topicListing.isInternal();
// String name = topicListing.name();
// String s = topicListing.toString();
// System.out.println(s + "\t" + name + "\t" + internal);
// }
// });
// KafkaFuture<Set<String>> names = listTopicsResult.names();
// Set<String> strings = names.get();
//
// strings.forEach(name -> {
// System.out.println(name);
// });
// KafkaFuture<Map<String, TopicListing>> mapKafkaFuture = listTopicsResult.namesToListings();
// Map<String, TopicListing> stringTopicListingMap = mapKafkaFuture.get();
//
// stringTopicListingMap.forEach((k, v) -> {
// System.out.println(k + "\t" + v);
// });
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(false);
options.timeoutMs(500);
ListTopicsResult listTopicsResult1 = client.listTopics(options);
Map<String, TopicListing> stringTopicListingMap = listTopicsResult1.namesToListings().get();
stringTopicListingMap.forEach((k, v) -> {
System.out.println(k + "\t" + v);
});
// 关闭管理客户端
client.close();
}
@Test
public void testCreateTopic() throws ExecutionException, InterruptedException {
Map<String, String> configs = new HashMap<>();
configs.put("max.message.bytes", "1048576");
configs.put("segment.bytes", "1048576000");
NewTopic newTopic = new NewTopic("test_topic", 2, (short) 1);
newTopic.configs(configs);
CreateTopicsResult topics = client.createTopics(Collections.singleton(newTopic));
KafkaFuture<Void> all = topics.all();
Void aVoid = all.get();
System.out.println(aVoid);
}
@Test
public void testDeleteTopic() throws ExecutionException, InterruptedException {
DeleteTopicsOptions options = new DeleteTopicsOptions();
options.timeoutMs(500);
DeleteTopicsResult deleteResult = client.deleteTopics(Collections.singleton("test_topic"), options);
deleteResult.all().get();
}
@Test
public void testAlterTopic() throws ExecutionException, InterruptedException {
NewPartitions newPartitions = NewPartitions.increaseTo(5);
Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
newPartitionsMap.put("test_topic", newPartitions);
CreatePartitionsOptions option = new CreatePartitionsOptions();
// Set to true if the request should be validated without creating
new partitions.
// 如果只是验证,而不创建分区,则设置为true
// option.validateOnly(true);
CreatePartitionsResult partitionsResult = client.createPartitions(newPartitionsMap, option);
Void aVoid = partitionsResult.all().get();
}
@Test
public void testDescribeTopics() throws ExecutionException, InterruptedException {
DescribeTopicsOptions options = new DescribeTopicsOptions();
options.timeoutMs(3000);
DescribeTopicsResult topicsResult = client.describeTopics(Collections.singleton("test_topic"), options);
Map<String, TopicDescription> stringTopicDescriptionMap = topicsResult.all().get();
stringTopicDescriptionMap.forEach((k, v) -> {
System.out.println(k + "\t" + v);
System.out.println("=======================================");
System.out.println(k);
boolean internal = v.isInternal();
String name = v.name();
List<TopicPartitionInfo> partitions = v.partitions();
String partitionStr = Arrays.toString(partitions.toArray());
System.out.println("内部的?" + internal);
System.out.println("topic name = " + name);
System.out.println("分区:" + partitionStr);
partitions.forEach(partition -> {
System.out.println(partition);
});
});
}
@Test
public void testDescribeCluster() throws ExecutionException, InterruptedException {
DescribeClusterResult describeClusterResult = client.describeCluster();
KafkaFuture<String> stringKafkaFuture = describeClusterResult.clusterId();
String s = stringKafkaFuture.get();
System.out.println("cluster name = " + s);
KafkaFuture<Node> controller = describeClusterResult.controller();
Node node = controller.get();
System.out.println("集群控制器:" + node);
Collection<Node> nodes = describeClusterResult.nodes().get();
nodes.forEach(node1 -> {
System.out.println(node1);
});
}
@Test
public void testDescribeConfigs() throws ExecutionException, InterruptedException, TimeoutException {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
DescribeConfigsResult describeConfigsResult = client.describeConfigs(Collections.singleton(configResource));
Map<ConfigResource, Config> configMap = describeConfigsResult.all().get(15, TimeUnit.SECONDS);
configMap.forEach(new BiConsumer<ConfigResource, Config>() {
@Override
public void accept(ConfigResource configResource, Config config) {
ConfigResource.Type type = configResource.type();
String name = configResource.name();
System.out.println("资源名称:" + name);
Collection<ConfigEntry> entries = config.entries();
entries.forEach(new Consumer<ConfigEntry>() {
@Override
public void accept(ConfigEntry configEntry) {
boolean aDefault = configEntry.isDefault();
boolean readOnly = configEntry.isReadOnly();
boolean sensitive = configEntry.isSensitive();
String name1 = configEntry.name();
String value = configEntry.value();
System.out.println("是否默认:" + aDefault + "\t是否只读?" + readOnly + "\t是否敏感?" + sensitive + "\t" + name1 + " --> " + value);
}
});
ConfigEntry retries = config.get("retries");
if (retries != null) {
System.out.println(retries.name() + " -->" + retries.value());
} else {
System.out.println("没有这个属性");
}
}
});
}
@Test
public void testAlterConfig() throws ExecutionException, InterruptedException {
// 这里设置后,原来资源中不冲突的属性也会丢失,直接按照这里的配置设置
Map<ConfigResource, Config> configMap = new HashMap<>();
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
Config config = new Config(Collections.singleton(new ConfigEntry("segment.bytes", "1048576000")));
configMap.put(resource, config);
AlterConfigsResult alterConfigsResult = client.alterConfigs(configMap);
Void aVoid = alterConfigsResult.all().get();
}
@Test
public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
DescribeLogDirsOptions option = new DescribeLogDirsOptions();
option.timeoutMs(1000);
DescribeLogDirsResult describeLogDirsResult = client.describeLogDirs(Collections.singleton(0), option);
Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>integerMapMap = describeLogDirsResult.all().get();
integerMapMap.forEach(new BiConsumer<Integer, Map<String,DescribeLogDirsResponse.LogDirInfo>>() {
@Override
public void accept(Integer integer, Map<String, DescribeLogDirsResponse.LogDirInfo> stringLogDirInfoMap) {
System.out.println("broker.id = " + integer);
stringLogDirInfoMap.forEach(new BiConsumer<String,DescribeLogDirsResponse.LogDirInfo>() {
@Override
public void accept(String s, DescribeLogDirsResponse.LogDirInfo logDirInfo) {
System.out.println("log.dirs:" + s);
// 查看该broker上的主题/分区/偏移量等信息
// logDirInfo.replicaInfos.forEach(new BiConsumer<TopicPartition, DescribeLogDirsResponse.ReplicaInfo>() {
//
// @Override
// public void accept(TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo) {
// int partition = topicPartition.partition();
// String topic = topicPartition.topic();
// boolean isFuture = replicaInfo.isFuture;
// long offsetLag = replicaInfo.offsetLag;
// long size = replicaInfo.size;
// System.out.println("partition:" + partition + "\ttopic:" + topic
// + "\tisFuture:" + isFuture
// + "\toffsetLag:" + offsetLag
// + "\tsize:" + size);
// }
// });
}
});
}
});
}
}