生产者基本操作

kafka自带了一个在终端演示生产者发布消息的脚本kafka-console-producer.sh,熟练掌握该脚本用法,更能直观的帮助我们理解生产者发送消息的过程.运行该脚本启动一个生产者进程,在运行该脚本时可以传递相应配置以覆盖默认配置,该脚本提供3个命令参数用于设置配置项的方式.
参数producer.config,用于加载一个生产者级别相关配置的配置文件,如producer.properties
参数producer-property,通过该命令参数可以直接在启动生产者命令行中设置生产者级别的配置,在命令行中设置的参数将会覆盖所加载配置文件中的参数设置
参数property,通过该命令可以设置消息消费者相关的配置
该脚本还支持其他命令参数,包括配置消息序列化类,配置消息确认方式,配置消息失败重试次数等,这里不一一列举
1.启动生产者
kafka自带了一个kafka-console-producer.sh脚本,通过执行该脚本可以在终端调用kafka生产者向kafka发送消息,该脚本运行时需要指定broker-list和topic两个必传参数,分别用来指定kafka的代理地址列表以及消息被发送的目标主题,同时该脚本还支持其他可选参数,例如,通过参数sync指定以同步模式发送消息,property参数后跟配置项键值对,producer.config参数加载一个生产者级别的配置文件,这里不一一列出
执行以下命令.启动一个向主题kafka-action发送消息的生产者,同时指定每条消息包含有key:
./kafka-console-producer.sh –broker-list kafka-1:9092 –topic kafka-action –property parse.key=true
该命令执行后,控制台等待客户端输入消息,由于没有指定消息key与消息净荷(payload)之间的分隔符,默认是以制表符分隔,若希望修改分隔符,则通过配置项key.separator指定,例如,执行以下命令启动一个生产者,同时指定启用消息的key配置,并指定key与消息实际数据之间以空格作为分隔符
./kafka-console-producer.sh –broker-list kafka-1:9092 –topic kafka-action –property parse.key=true –property key.separator=’ ‘
在控制台分别输入一批消息,消息key与消息实际数据之间以空格分隔,然后执行以下命令,验证消息是否发送成功
./kafka-run-class.sh kafka.tools.GetOffsetShell –broker-list kafka-1:9092 –topic kafka-action –time -1
该命令用于查看某个主题各分区对应消息偏移量,可以通过partitions参数指定一个或多个分区,多个分区之间以逗号分隔,若不指定则默认查看该主题所有分区,time参数表示查看在指定时间之前的数据,支持-1(latest),-2(earlist)两个时间选项,默认取-1
执行以上命令输出结果信息如下,共3列,分别表示主题名,分区编号,消息偏移量
kafka-action:0:5
kafka-action:1:0
kafka-action:2:0
通过结果信息可知,总共产生了5条消息,3个分区按编号从大到小依次有5条,0条,0条消息
2.创建主题
若开启了自动创建主题配置项auto.create.topic.enable=true,当生产者向一个还不存在的主题发送消息时,kafka会自动创建该主题.例如,执行以下命令启动一个生产者向主题’producer-create-topic’发送消息
./kafka-console-producer.sh –broker-list kafka-1:9092 –topic producer-create-topic
>111 333
[2019-02-18 15:31:28,939] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {producer-create-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-02-18 15:31:29,043] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {producer-create-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
输出以上警告信息是由于当向该主题发送消息时该主题并不存在,因此获取不到该主题对应的元数据信息,此时就会创建一个新主题,该主题有${num.paritions}个分区和${default.replication.factor}个副本
3.查看消息
kafka生产的消息以二进制的形式存在文件中,为了便于查看消息内容,kafka提供了一个查看日志文件的工具类kafka.tools.DumpLogSegments,通过kafka-run-class.sh脚本,可以直接在终端运行该工具类,例如,查看主题producer-create-topic相应分区下的日志文件,执行命令如下:
[root@inter bin]# ./kafka-run-class.sh kafka.tools.DumpLogSegments –files /tmp/kafka-logs/kafka-action-0/00000000000000000000.log
Dumping /tmp/kafka-logs/kafka-action-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1550473789399 isvalid: true size: 92 magic: 2 compresscodec: NONE crc: 3779389381
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 92 CreateTime: 1550473989264 isvalid: true size: 79 magic: 2 compresscodec: NONE crc: 1203805574
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 171 CreateTime: 1550474098162 isvalid: true size: 97 magic: 2 compresscodec: NONE crc: 3060605914
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 268 CreateTime: 1550474114715 isvalid: true size: 94 magic: 2 compresscodec: NONE crc: 622797319
baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 362 CreateTime: 1550474123730 isvalid: true size: 93 magic: 2 compresscodec: NONE crc: 3095856602
上述命令,files是必传参数,用于指定哟啊转储(dump)文件路径,可同时指定多个文件,多个文件路径之间以逗号分隔
4.生产者性能测试工具
kafka提供了一个用来测试生产者性能的工具脚本kafka-producer-perf-test.sh,通过该工具可以对生产者性能进行调优,通过优化不同的配置来提升生产者的发送效率,从而得到一组最优的参数配置,提高吞吐量
(1)kafka自带测试工具应用
kafka自带的生产者测试脚本核心代码内容如下:
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance “$@”
该脚本调用的是org.apache.kafka.tools.ProducerPerformance类,该类在tools工程下,用Java语言实现,与之前的版本的kafka相比,kafka 0.10.1.1版本中生产者性能测试工具并没有提供线程数设置的threads参数,我认为这应该与当前版本的kafkaProducer实现方式有关,通过java语言重新实现的kafkaProducer是线程安全的,多线程共享一个kafkaProducer实例要比每个线程创建一个实例发送消息时要快得多,而当前ProducerPerformanc类中也没有采用多线程的实现方式.同时,当前版本的脚本将指定连接kafka代理地址的参数不再是直接通过参数broker-list设置,而是通过参数producer-props指定配置项的形式设置,通过配置boostrap.servers来指定代理列表.该工具支持参数详细说明如下表所示:
参数名
参数说明
topic
指定生产者发送消息的目标主题
num-records
测试时发送消息的总条数
record-size
每条消息的字节数
throughput
限流控制
producer-props
以键值对的形式指定配置,可同时指定多组配置,多组配置之间以空格分隔
producer.config
加载生产者级别的配置文件
需要特别说明的是,throughput参数是用来进行限流控制的,当throughput值小于0时则不进行限流,若该参数值大于0时,当已发送消息总字节数与当前已执行的时间取整大于该字段时,生产者线程会被阻塞,生产者线程被阻塞时,在控制台可以看到输出一行吞吐量统计信息,若该参数值等于0时,则生产者在发送一次消息之后检测满足阻塞条件时将会一直被阻塞
例如,向一个名为”producer-perf-test”主题发送100万条消息,每条信息大小为1000字节,同时acks设置为all,对应的acks值为-1,测试kafka生产消息的性能执行命令如下:
./kafka-producer-perf-test.sh –num-records=1000000 –record-size=1000 –topic producer-perf-test –throughput 1000000 –producer-props bootstrap.servers=kafka-1:9092 acks=all
测试结果输出如下:
115486 records sent, 23069.5 records/sec (22.00 MB/sec), 1015.5 ms avg latency, 1487.0 max latency.
211264 records sent, 42252.8 records/sec (40.30 MB/sec), 745.2 ms avg latency, 1159.0 max latency.
144215 records sent, 28843.0 records/sec (27.51 MB/sec), 1139.1 ms avg latency, 1435.0 max latency.
156245 records sent, 31249.0 records/sec (29.80 MB/sec), 1073.3 ms avg latency, 1555.0 max latency.
185236 records sent, 37047.2 records/sec (35.33 MB/sec), 877.0 ms avg latency, 1290.0 max latency.
117566 records sent, 23513.2 records/sec (22.42 MB/sec), 1374.9 ms avg latency, 2684.0 max latency.
1000000 records sent, 30886.122865 records/sec (29.46 MB/sec), 1014.74 ms avg latency, 2684.00 ms max latency, 961 ms 50th, 1500 ms 95th, 2657 ms 99th, 2682 ms 99.9th.
测试结果各字段说明如下表所示:
字段名
描述
recores sent
测试时发送的消息总数
records/sec
以每秒发送的消息数来统计吞吐量
MS/sec
以每秒发送的消息大小(单位为MB)来统计吞吐量
avg latency
消息处理的平均耗时,单位为MS
max latency
消息处理的最大耗时,单位为MS
50th/95th/99.9th
分别表示50%,95%,99.9%的消息处理耗时