消费者基本操作

1.消费消息
kafka的消费者以pull的方式获取信息,同时kafka采用了消费组的模式,每个消费者都属于某一个消费组.在创建消费者时,若不指定消费者的groupId,则该消费者属于默认消费组.消费组是一个全局概念,因此在设置group.id时,要确保该值在kafka集群中唯一
同一个消费组下的各消费者在消费消息时是互斥的,也就是,对于一条消息而言,就同一个消费组下的消费者来讲,只能被同组下的某一个消费者消费,但不同消费组的消费者能消费同一条消息,正因如此,我们很方便通过消费组来实现消息的单播与广播,这里所说的单播与广播是相对消费者消费消息而言的.
kafka提供了一个kafka-console-consumer.sh脚本以方便用户在终端模拟消费者消费消息,该脚本内容如下:
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer “$@”
该脚本调用的是kafka core工程下kafka.tools包下的consoleConsumer对象,该对象根据运行时参数不同,分别调用kafka老版本的消费者和新版本的消费者(org.apache.kafka.clicents.consumer.kafkaConsumer)消费消息
(1)新版本消费者
新版本的消费者(org.apache.kafka.clients.consumer.kafkaConsumer)去掉了对zookeeper的依赖,当启动一个消费者时不再向zookeeper注册,而是由消费组协调器(GroupCoordinator)统一管理,消费者已消费消息的偏移量提交后会保存到名为”__consumer_offsets”的内部主题中,下面详细介绍如何kafka-console-consumer.sh脚本执行新版本消费者相关操作
首先执行以下命令启动一个新版消费者:
./kafka-console-consumer.sh –bootstrap-server kafka-1:9092 –consumer-property group.id=new-consumer-test –consumer-property client.id=new-consumer-c1 –topic kafka-action
执行该脚本关键参数是bootstrap-server,因为这种方式连接kafka时才会调用新版本的kafkaConsumer,若通过参数zookeeper方式启动则调用的是老版本的消费者,同时可以通过new-consumer参数直接指定调用新版本的消费者,若以参数bootstrap-server方式启动,则默认调用的是新版消费者,此时可以不用设置new-comsumer参数,以上启动消费者的命令通过参数consumer-property设置group.id为new-consumer-test.通过计算消费组名的hashcode值与内部主题分区总数(默认是50个分区)取模来确定消费者偏移量存储的分区,若没有指定group.id,则消费者属于默认消费组,可以通过以下命令查看消费组名信息:
./kafka-consumer-groups.sh –bootstrap-server kafka-1:9092 –list
其中,参数new-consumer指定列出新消费者类型的所有消费组信息,通过消费组名根据以下公式就可以计算出该消费组的偏移量存储在__consumer_offsets主题对应的分区
Math.abs(${group.id}.hashCode())%${offsets.topic.num.partitions}
1))查看注意kafka-action各分区的偏移量信息,命令如下:
./kafka-run-class.sh kafka.tools.GetOffsetShell –broker-list kafka-1:9092 –topic kafka-action -time -1
输出结果如下
kafka-action:0:5
kafka-action:1:0
kafka-action:2:0
(2)消费多主题
kafka自带脚本kafka-console-consumer.sh的topic参数并不支持同时指定多个主题,但该脚本提供了另外一个参数whitelist(白名单),该参数可以同时指定多个主题,且支持正则表达式.注意,主题名表达式需要加引号,例如,执行以下命令,指定消费kafka-aciton和producer-perf-test两个主题的消息
./kafka-console-consumer.sh –bootstrap-server kafka-1:9092 –consumer-property group.id=consumer-multi-topic –whitelist “kafka-action|producer-pert-test”
2.单播与多播
kafka引入了消费组,每个消费者都属于一个特定的消费组,通过消费组就可以实现消息的单播和多播
1))单播
一条消息只能被某一个消费者消费的模式称为单播,要实现消息单播,只要让这些消费者属于同一个消费组即可,下面通过一个简单实例介绍在终端模拟消息单播操作流程
首先启动一个生产者向kafka-action主题发送消息,执行命令如下:
./kafka-console-producer.sh –broker-list kafka-1:9092 –topic kafka-action
在终端分别执行以下命令,启动两个消费者:
./kafka-console-consumer.sh –bootstrap-server kafka-1:9092 –topic kafka-action –consumer-property group.id=single-consumer-group
当生产者发送一条信息时,两个消费者只有一个能收到消息,运行结果下所示
producer:
>1231 csdfdsfd
>2132 dffsd
>fd 23423432
>fdsf 23423
consumer1:
1231 csdfdsfd
fdsf 23423
consumer2:
2132 dffsd
fd 23423432
2))多播
一条消息能够被多个消费者消费的模式称为多播,之所以不称之为广播,是因为一条消息只能被kafka同一个分组下某一个消费者消费,而不是所有消费者,所以从严格意义上来讲并不能算是广播模式,当然如果希望实现广播模式只要保证每个消费者均属于不同的消费组.针对kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可,我们再增加一个消费者,该消费组属于multi-consumer-group消费组,命令如下:
然后生产者发送几条消息,可以看到不同消费组的消费者同时能消费到消息,然而同一个消费组下的消费者却只能有一个消费者能消费到消息,运行结果下所示
producer:
111
222
444
333
555
666
group.id=multi-consumer-group:
111
222
444
333
555
666
group.id=single-consumer-group
client1:
333
666
group.id=single-consumer-group
client2:
111
222
444
555
3.查看消费偏移量
kafka提供了一个查看某个消费组消费者偏移量的kafka-consumer-offset-checker.sh脚本,通过该脚本可以查看某个消费组消费消息的情况,该脚本调用的是kafka.tools.ConsumerOffsetChecker,不过在0.9版本之后已不再建议使用该脚本,而建议使用kafka-consumer-groups.sh,该脚本调用的是kafka.admin.ComsumerGroupCommand
(1)ConsumerGroupCommand用法
启动一个新版本的消费者,该消费者消费主题kafka-action的消息,同时该消费者属于消费组single-consumer-group
./kafka-console-consumer.sh –bootstrap-server kafka-1:9092 –topic kafka-action –consumer-property group.id=single-consumer-group
kafka-consumer-groups.sh脚本调用的是ConsumerGroupCommand类,该脚本支持–zookeeper和–bootstrap-server两种运行方式,支持以下3中类型的操作
list:返回与启动方式对应的所有消费组,即若是以参数zookeeper方式启动,则返回的是老版本的消费者对应的消费组信息,否则返回新版本的消费者隶属的消费组的信息
describe:查看某个消费组当前的消费情况
delete:删除消费组
list类型的操作在上一小节其实已应用过,在此不再介绍,接下来我们首先介绍describe类型的操作,describe用于查看消费组当前的消费情况,若待查看的消费组是以老版本方式创建的,则通过该脚本查看消费情况时应该以–zookeeper方式运行,以–zookeeper方式运行时,其实现原理即通过查看老版本的消费者早zookeeper中记录的相应的元数据信息,反之,若查看的是新消费者的消费情况,则应以–bootstrap-server方式运行该脚本,若消费组是通过新消费者方式创建,新版本的消费者不依赖于zookeeper,而运行该脚本时却是通过zookeeper方式执行,这样由于zookeeper中查询不到相应的元数据信息,而导致不会返回任何消费信息.
本例待查看的消费者为新版本的消费者,因此执行以下命令查看消费组消费情况:
 ./kafka-consumer-groups.sh –bootstrap-server kafka-1:9092 –describe –group single-consumer-group
输出结果如下:
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
kafka-action    0          3               3               0               consumer-1-183bbb47-e086-48d6-a09f-7c445fea90e1 /10.128.20.122  consumer-1
kafka-action    1          3               3               0               consumer-1-183bbb47-e086-48d6-a09f-7c445fea90e1 /10.128.20.122  consumer-1
kafka-action    2          4               4               0               consumer-1-183bbb47-e086-48d6-a09f-7c445fea90e1 /10.128.20.122  consumer-1
同时,若是以bootstrap-server方式运行该脚本时,只能查看运行着的消费组,若消费组状态为DEAD,则由于在Metadata中查询不到相应元数据信息而导致不会返回任何消费信息,此时会在终端输出以下提示信息,kafka认为消费者正在进行平衡操作:
因此,若在查询消费组消费信息时出现以上提示信息,有一种可能是消费者已处于非正常运行状态,有可能消费者正在进行平衡操作
该脚本支持删除不包括任何消费者的消费组,需要注意的是,该脚本只能删除消费组为老版本消费者对应的消费组,我们可以指定删除某个主题的消费组,也可以不指定主题,当然也可以不指定消费组而指定主题,此时删除该主题下的所有不具有消费者的消费组,删除操作的本质是删除zookeeper中相应的消费组的节点及其子节点
4.消费者性能测试工具
kafka也提供了新,老两个版本的消费者性能压力测试的脚本kafka-consumer-perf-test.sh,本小节也仅介绍该脚本的相关用法,而不给出消费者性能压力测试的完整报告
与消费者相关操作的其他脚本一样,该脚本也是通过运行时所指定的连接kafka的方式来确定调用哪个版本的消费者,该脚本支持多线程(–threads参数)设置,例如,以broker-list方式启动该脚本,并指定5个线程,消费100万条消息,每条消息大小为1000字节(默认为100字节),同时指定num-fetch-threads为2,默认是1个线程,消费主题为”producer-perf-test”,该主题在生产者性能测试已写入了超过100万条消息,其他参数不在设置,执行命令如下:
./kafka-consumer-perf-test.sh –broker-list kafka-1:9092 –threads 5 –messages 1000000 –num-fetch-threads 2 –group consumer-perf-test –topic producer-perf-test
测试结果输出如下:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
WARNING: Exiting before consuming the expected number of messages: timeout (10000 ms) exceeded. You can use the –timeout option to increase the timeout.
2019-02-21 15:54:03:905, 2019-02-21 15:54:13:942, 0.0000, 0.0000, 0, 0.0000, 318, 9719, 0.0000, 0.0000
测试共展示6列信息,依次为运行起始时间,结束时间,消费的消息总量(单位为MB),按消息总量统计的吞吐量(单位为MB/S),消费消息总条数,按消息总数统计的吞吐量(单位为条/s)