主题管理

kafka提供了一个kafka-topics.sh工具脚本用于对主题相关的操作,如创建主题,删除主题,修改主题分区数和副本分配以及修改主题级别的配置信息,查看主题信息等操作,该脚本核心代码仅一行:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand “$@”
运行kafka-run-class.sh脚本调用kafka.admin.topicCommand类,同时接受一个操作类型指令,该指令主要包括–list,–describe,–create,–alter和–delete
1,创建主题
kafka提供一下两种方式来创建一个主题
(1)若代理设置了auto.create.topics.enable=true,该配置默认值为true,这样当生产者向一个还未创建的主题发送消息时,会自动创建一个拥有${num.partitions}个分区和${default.replication.factor}个副本的主题
(2)客户端通过执行kafka-topics.sh脚本创建一个主题
本小节将采用第二种方式来创建主题,第一种方式创建主题将在讲解kafka-console-producer.sh进行介绍,下面创建一个名为kafka-action的主题,该主题拥有1个副本,3个分区,创建该主题命令如下:
[root@inter bin]# ./kafka-topics.sh –create –zookeeper kafka-1:2181 –replication-factor 1 –partitions 3 –topic kafka-action
Created topic “kafka-action”.
此时会在${log.dir}目录下创建相应的分区文件目录,副本分别分布在不同节点上,该主题分区目录分布如图所示(下图是2副本3分区在3个节点的分布):
drwxr-xr-x 2 root root 4096 Feb 14 14:33 kafka-action-0
drwxr-xr-x 2 root root 4096 Feb 14 14:33 kafka-action-1
drwxr-xr-x 2 root root 4096 Feb 14 14:33 kafka-action-2
[2019-02-14 14:33:46,834] INFO [Controller id=1] New topics: [Set(kafka-action)], deleted topics: [Set()], new partition replica assignment [Map(kafka-action-2 -> Vector(1), kafka-action-1 -> Vector(1), kafka-action-0 -> Vector(1))] (kafka.controller.KafkaController)
[2019-02-14 14:33:46,835] INFO [Controller id=1] New partition creation callback for kafka-action-2,kafka-action-1,kafka-action-0 (kafka.controller.KafkaController)
同时登录zookeeper客户端查看所创建的主题元数据信息,”kafka-action”元数据信息如下:
[zk: kafka-1:2181(CONNECTED) 11] get /brokers/topics/kafka-action
{“version”:1,”partitions”:{“2″:[1],”1″:[1],”0”:[1]}}
cZxid = 0x61
ctime = Thu Feb 14 14:33:46 CST 2019
mZxid = 0x61
mtime = Thu Feb 14 14:33:46 CST 2019
pZxid = 0x63
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 52
numChildren = 1
可以看到该主题有3个分区,1个副本,分布于1个节点上,上述创建主题命令各参数说明如下:
    zookeeper参数是必传参数,用于配置kafka集群与zookeeper连接地址,这里并不要求传递${zookeeper.connect}配置的所有连接地址,为了容错,建议多个zookeeper节点的集群至少传递两个zookeeper连接配置,多个配置之间以逗号隔开
    partitions参数用于设置分区数,该配置为 必传参数,kafka通过分区分配策略,将一个主题的消息分散到多个分区保存到不同的代理上,以此来提高消息处理的吞吐量,kafka的生产者和消费者可以采用多线程并行对主题消息进行处理,而每个线程处理的是一个分区的数据,因此分区实际上是kafka并行处理的基本单位,分区数越多一定程度上会提升消息处理的吞吐量,然而kafka消息是以追加的形式存储在文件中的,这就意味着分区越多需要打开更新的文件句柄,这样也会带来一定的开销
    replication-factor参数用来设置主题副本数,该配置也是必传参数,副本会被分布在不同的节点上,副本数不能超过节点数,否则创建主题会失败,例如,3个节点的kafka集群最多只能有3个副本,若创建主题时指定副本数大于3,则会抛出以下错误提示:
Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2019-02-14 14:33:17,601] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
(kafka.admin.TopicCommand$)
在创建主题时,我们还可以通过config参数来设置主题级别的配置以覆盖默认配置,可以设置多组配置,具体格式为:
–config config1-name=config1-value –config config2-name=config2-value
创建一个名为config-test的主题,设置该主题的max.message.byte为404800字节,执行命令如下:
[root@inter bin]# ./kafka-topics.sh –create –zookeeper kafka-1:2181 –replication-factor 1 –partitions 3 –topic config-test –config max.message.bytes=404800
Created topic “config-test”.
2.删除主题
删除kafka主题,一般有以下两种方式
(1)手动删除各节点${log.dir}目录下该主题分区文件夹,同时登录zookeeper客户端删除待删除主题对应的节点,主题元数据保存在,/brokers/tipics和/config/topics目录下
(2)执行kafka-topics.sh脚本进行删除,若希望通过该脚本彻底删除主题,则需要保证在启动kafka时所加载的server.properties文件中配置delete.topic.enable=true,该配置默认为false,否则执行该脚本并未真正删除主题,而是在zookeeper的/admin/delete_topics目录下创建一个与待删除主题同名的节点,将该主题标记为删除状态
本小节只讲解通过kafka-topics.sh删除主题操作,如删除主题kafka-action的操作命令如下:
[root@inter bin]# ./kafka-topics.sh –delete –zookeeper kafka-1:2181 –topic kafka-action
Topic kafka-action is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
从以上执行结果可以分析出:当delete.topic.enable设置为false时,只是标记kafka-action为删除状态,主题在${log.dir}目录下对应的分区文件及在zookeeper中的相应节点并未删除,而是在/admin/delete_topics目录下创建一个以待删除主题命令的节点,做为标记.此时若希望彻底删除主题,这需要通过手动删除相应文件及节点,当该配置为true时,则会将该主题对应的所有文件目录及元数据信息删除
(3)查看主题
kafka提供了list和describe两个命令方便查看主题信息,其中list参数列出kafka所有的主题名,describe参数可以查看所有主题或某个特定主题的信息
查看所有主题
[root@inter bin]# ./kafka-topics.sh –list –zookeeper kafka-1:2181
config-test
查看某个特定的主题信息
当执行describe命令时,若指定topic参数则查看特定主题的信息,若不zhidingtopic参数则查看所有主题信息,该命令会按主题名分组显示个主题的信息,执行一下命令查看config-test主题的信息:
[root@inter bin]# ./kafka-topics.sh –describe –zookeeper kafka-1:2181
Topic:config-test       PartitionCount:3        ReplicationFactor:1     Configs:max.message.bytes=404800
        Topic: config-test      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: config-test      Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: config-test      Partition: 2    Leader: 1       Replicas: 1     Isr: 1
从输出结果可以看到:已按主题分组展示,每组主题信息中第一行分别展示了主题名,该主题分区总数,该主题副本总数,创建主题是通过config参数说设置的配置,从第二行开始按主题分区编号排序,展示每个分区的Leader副本节点,副本列表AR及ISR列表信息
查看正在同步的主题
通过describe与under-replicated-partitiions命令组合使用,可以查看处于under relicaed状态的分区,处于该状态的主题可能正在进行同步操作,也有可能同步发生异常,即此时所查询到的主题分区ISR列表长度小于AR列表,对于通过该命令查询到分区的要重点监控,因为这可能意味着集群某个代理已失效或者同步速度减慢等,当然,也可以指定topic参数已查询特定主题是否处于under replicated状态,执行命令如下:
[root@inter bin]# ./kafka-topics.sh –describe –zookeeper kafka-1:2181 –under-replicated-partitions
查看没有leader的分区
通过describe与unavailable-partitions命令组合使用,可以查看没有Leader副本的主题,同样也可以指定topic参数,查看某个特定主题的哪些分区Leader已不可用,执行一下命令如下:
[root@inter bin]# ./kafka-topics.sh –describe –zookeeper kafka-1:2181 –unavailable-partitions
查看主题覆盖的配置
通过describe与topics-with-overrides命令组合使用,可以查看主题覆盖了哪些配置,组合使用与只有describe命令的区别在于:topic-with-overrides命令只显示descibe命令执行的第一行信息,同样,也可指定topic参数查看某个特定主题所覆盖的配置,执行以下命令:
[root@inter bin]# ./kafka-topics.sh –describe –zookeeper kafka-1:2181 –topics-with-overrides
Topic:config-test       PartitionCount:3        ReplicationFactor:1     Configs:max.message.bytes=404800
(4)修改主题
当创建一个主题之后,可以通过alter命令对 主题进行修改,包括修改主题级别的配置,增加主题分区,修改副本分配方案,修改主题offset等,下面详细讲解如何通过kafka的shell脚本对主题进行修改
修改主题级别配置
在创建主题时,可以通过config参数覆盖主题级别的默认配置,当主题创建后可以通过alter与config参数组合使用,修改或增加新的配置以覆盖相应配置原来的值,或者通过alter与delete-config参数组合使用删除相应配置设置使其恢复默认值.同时kafka还提供了一个kafka-configs.sh的脚本,专门用来对配置(这里的配置不单指主题级别的配置)进行操作,在未来的版本中将不支持alter与config参数组合使用的方式对配置进行操作,在当前版本(0.10.1.1)的kafka通过alter参数对配置进行操作时会有操作命令已过期并推荐使用kafka-configs.sh脚本的提示信息,虽然这种操作方式已过期,但并未被移除,因此在这里依然对其操作方法进行介绍,下面通过修改主题config-test的相关配置来介绍相关命令的具体用法
执行以下命令查看config-test主题当前的配置,命令如下:
[root@inter bin]# ./kafka-topics.sh –describe –zookeeper kafka-1:2181 –topics-with-overrides –topic config-testTopic:config-test       PartitionCount:3        ReplicationFactor:1     Configs:max.message.bytes=404800
修改该max.message.bytes配置使其值为204800,命令如下:
[root@inter bin]# ./kafka-topics.sh –alter –zookeeper kafka-1:2181 –topic config-test –config max.message.bytes=204800
输出信息如下:
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.Going forward, please use kafka-configs.sh for this functionality Updated config for topic “config-test”.
更新配置成功,同时输出信息提示通过该方式修改主题级别配置的相关命令已过期,在未来的版本该命令将被移除,推荐使用使用kafka-configs.sh脚本
覆盖主题config-test的segment.byte大小为200MB(200*1024*1024),执行命令如下:
[root@inter bin]# ./kafka-topics.sh –alter –zookeeper kafka-1:2181 –topic config-test –config segment.bytes=209715200WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.Going forward, please use kafka-configs.sh for this functionality Updated config for topic “config-test”.
该命令执行后,通过参数topics-with-overrides查看主题config-test已覆盖的配置信息,相应信息输出结果如下:
Configs:segment.bytes=209715200,max.message.bytes=204800
删除配置项segment.bytes的设置,使其值恢复为默认值,操作命令如下:
[root@inter bin]# ./kafka-topics.sh –alter –zookeeper kafka-1:2181 –topic config-test –delete-config segment.bytes
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.Going forward, please use kafka-configs.sh for this functionalityUpdated config for topic “config-test”.
命令执行之后,再次查看主题config-test已覆盖的配置信息如下:
Configs:max.message.bytes=204800
增加分区
kafka并不支持减少分区的操作,我们只能为一个主题增加分区,例如,主题config-test目前有3个分区,如果将其分区设置为5个,操作命令如下:
[root@inter bin]# ./kafka-topics.sh –alter –zookeeper kafka-1:2181 –topic config-test –partitions 5
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded!
增加分区命令执行成功后,查看集群各节点${log.dir}目录下所分配的分区目录文件均已成功创建,同时登录zookeeper客户端查看分区元数据信息如下:
[zk: kafka-1:2181(CONNECTED) 13] get /brokers/topics/config-test
{“version”:1,”partitions”:{“4″:[1],”1″:[1],”0″:[1],”2″:[1],”3”:[1]}}
cZxid = 0x6d
ctime = Thu Feb 14 15:10:33 CST 2019
mZxid = 0xaf
mtime = Fri Feb 15 10:54:08 CST 2019
pZxid = 0x6e
cversion = 1
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 68
numChildren = 1
通过以上元数据信息显示,主题config-test分区已增加到5个,同时各分区副本进行了重新分配