kafkaServer管理

kafka运行依赖于zookeeper,在启动kafka之前,首先要保证zookeeper已正常启动,在$KAFKA_HOME/bin目录下,kafka自带了zookeeper操作的相应脚本,这里对zookeeper的操作并未使用kafka提供的脚本,而是直接执行@ZOOKEEPER_HOME/bin目录下提供的相应操作脚本及命令,kafka提供了启动kafkaserver的执行脚本,kafka-server-start.sh,而该脚本核心代码是调用kafka-run-class.sh脚本编译kafka.kafka类,代码如下:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka “$@”
kafka-run-class.sh是kafka自带的用来直接编译运行kafka源码中有main方法的scala文件,根据启动kafkaServer时调用的脚本不同,对kafkaServer启动操作分以下两小节分别进行介绍:
1.启动kafka单个节点
kafka提供了直接启动本地kafkaServer的执行脚本kafka-server-start.sh,该脚本在调用执行时需要传入传入server.properties文件路径,当然该文件名可以随意指定,在启动kafkaServer读取并解析该文件中相关配置信息以完成kafkaServer的实例化
(1)启动脚本分析
kafka-server-start.sh脚本执行入参定义如下:
if [ $# -lt 1 ];
then
        echo “USAGE: $0 [-daemon] server.properties [–override property=value]*”
        exit 1
fi
可以看到,在执行脚本时必须指定kafkaServer用于实例化的配置文件,可选参数-daemon表示使程序以守护进程的方式后台运行,在启动时我们还可以覆盖kafkaConfig相应的默认配置,格式为:–override property=value,其中property表示待覆盖的配置项名称,value为该配置项新设置的值
在kafka运行时,会创建相应的日志文件以便对kafka运行状态即异常情况进行跟踪,因此在该脚本中配置; $KAFKA_LOG2J_OPTS参数,代码如下:
if [ “x$KAFKA_LOG4J_OPTS” = “x” ]; then
    export KAFKA_LOG4J_OPTS=”-Dlog4j.configuration=file:$base_dir/../config/log4j.properties”
fi
其中$base_dir为$KAFKA_HOME/bin目录,这里指定在kafka启动时加载的是$KAFKA_HOME/config/log4j.properties文件,如果希望对kafka输出日志进行调整,如开启或关闭某些运行日志输出,修改kafka运行日志切分规则(默认是按小时进行切分)等,只需对该log4j.properties文件进行修改即可
同时,该脚本还对JVM的内存大小进行了设置,代码如下:
if [ “x$KAFKA_HEAP_OPTS” = “x” ]; then
    export KAFKA_HEAP_OPTS=”-Xmx1G -Xms1G”
fi
默认堆初始化(-Xms)空间为1GB,堆最大空间为1GB,因此若运行kafka的服务器内存大小不足时会导致kafka启动失败,例如,尝试修改该脚本内存分配大小大于机器物理内存并以非-daemon方式启动kafka时,在控制台输出以下启动失败日志:
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00007fdebc000000, 107374182400, 0) failed; error=’Cannot allocate memory’ (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 107374182400 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /root/kafka_2.12-2.1.0/bin/hs_err_pid31839.log
(2)启动kafkaServer
执行以下命令启动kafkaServer
./kafka-server-start.sh -daemon ../config/server.properties
首次启动成功后在$KAFKA_HOME/logs目录下会创建相应的日志文件,相关日志文件说明如表所示,同时在$log.dir目录下创建相应文件
日志名称
日志说明
默认日志级别
controller.log
kafkaController运行时日志
TRACE
kafka-authorizer.log
kafka权限认证相应操作日志
WARN
kafka-request.log
kafka相应网络请求日志
WARN
kafkaServer-gc.log
kafka运行过程,进行GC操作时的日志
INFO
log-cleaner.log
kafka日志清理操作相关统计信息
INFO
server.log
kafkaserver运行日志
INFO
state-change.log
kafka分区角色切换等状态转换日志
TRACE
分别在集群其他机器上运行此命令,启动kafkaServer,启动完毕后,登录ZooKeeper客户端查看相应节点信息,例如,查看brokers信息,,执行命令及输出结果信息如下:
./zkCli.sh -server kafka-1:2181
[zk: kafka-1:2181(CONNECTED) 0] ls /brokers/ids
[1]
[zk: kafka-1:2181(CONNECTED) 3] get /controller
{“version”:1,”brokerid”:1,”timestamp”:”1549872194335″}
cZxid = 0x1a
ctime = Mon Feb 11 16:03:14 CST 2019
mZxid = 0x1a
mtime = Mon Feb 11 16:03:14 CST 2019
pZxid = 0x1a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x102f4f897240000
dataLength = 54
numChildren = 0
通过以上信息,可以看到1个节点组成的kafka集群已成功启动,其中[1]代表这个节点的kafka的brokerId,broker.id=1作为Leader控制器
此时假设我们kill掉broker.id=1的进程,可以看到zookeeper会将broker.id=1的节点从broker/ids中移除
[root@inter ~]# jps
22209 QuorumPeerMain
22961 Kafka
883 Jps
792 ZooKeeperMain
[root@inter ~]# kill -9 22961
[zk: kafka-1:2181(CONNECTED) 7] ls /brokers/ids
[]
[zk: kafka-1:2181(CONNECTED) 6] get /controller
Node does not exist: /controller
若希望开启JMX监控,则在kafkaServer启动时需要设置JMX_PORT,可以将JMX_PORT配置添加到kafkaServer启动脚本kafka-server-start,sh文件中,例如,设置JMX_PORT端口为9999,则在启动脚本中增加以下配置:
export JMX_PORT = 9999
再次启动kafkaServer,通过zooKeeper客户端查看节点信息,可以看到该节点JMX_PORT信息为设置的9999,若不设置JMX_PORT端口为一个无效端口-1,信息如下:
[zk: kafka-1:2181(CONNECTED) 10] get /brokers/ids/1
{“listener_security_protocol_map”:{“PLAINTEXT”:”PLAINTEXT”},”endpoints”:[“”],”jmx_port”:9999,”host”:”xxx”,”timestamp”:”1550041825438″,”port”:9092,”version”:4}
cZxid = 0x43
ctime = Wed Feb 13 15:10:25 CST 2019
mZxid = 0x43
mtime = Wed Feb 13 15:10:25 CST 2019
pZxid = 0x43
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x102f4f897240006
dataLength = 208
numChildren = 0
当然也可以在执行启动kafkaServer脚本时指定JMX_PORT配置,启动命令如下:
JMX_PORT=9999 ./kafka-server-start.sh -daemon ../config/server.properties
2.关闭kafka单个节点
PIDS=$(ps ax | grep -i ‘kafka\.Kafka’ | grep java | grep -v grep | awk ‘{print $1}’)
if [ -z “$PIDS” ]; then
  echo “No kafka server to stop”
  exit 1
else
  kill -s $SIGNAL $PIDS
fi
该脚本实现的功能是查找进程名为kafka的进程的PID,然后杀掉该进程,但该脚本在某些版本的操作系统执行时并不能关闭kafka,这里使用的操作系统为:
[root@inter bin]# lsb_release
LSB Version:    :base-4.0-amd64:base-4.0-noarch:core-4.0-amd64:core-4.0-noarch:graphics-4.0-amd64:graphics-4.0-noarch:printing-4.0-amd64:printing-4.0-noarch
因此这里将脚本查找PID的命令修改如下:
PIDS=$(jps | grep -i ‘kafka’ | awk ‘{print $1}’)
[root@inter bin]# echo $(jps | grep -i ‘kafka’ | awk ‘{print $1}’)
8754
通过jps命令查看进程信息,然后从输出的进程信息中查找kafka进程信息所在的行,通过awk提取第二列即为kafka的进程PID,修改后保存再次执行kafka-server-stop.sh脚本,脚本正常执行