KAFKA安全机制

在0.9版本之后,kafka增加了身份认证与权限控制两种安全机制
身份认证是指客户端与服务器连接进行身份认证,包括客户端与kafka代理之间的连接认证、代理之间的连接认证、代理与zookeeper之间的连接认证。目前支持SSL、SASL/Kerberos、SASL/PLAIN这3种认证机制。
权限控制是指对客户端的读写操作进行权限控制,包括对于消息或kafka集群操作权限控制。权限控制是可插拔的,并且支持与外部的授权服务进行集成,kafka自带了简单的授权实现类SimpleAclAuthorizer,可以在server.properties文件种通过配置项authorizer.class.name指定,如设置authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer,kafka权限类型如下表所示:
权限类型
权限说明
READ
读操作权限,如消费者消费主体的权限、消费组管理时相关读取相关元信息权限等
WRITE
写操作权限,如生产者向主题写消息的权限
DELETE
删除主题操作的权限
CREATE
创建主题操作的权限
ALTER
修改主题及配置操作的权限
DESCRIBE
获取主题元数据信息的权限
CLUSTERACTION
集群元数据操作权限,如更新集群元数据操作、关闭控制器、停止副本等
ALL
所有权限
kafka将权限控制列表ACL存储在zookeeper种,当添加权限控制之后,会在zookeeper中创建两个节点:即存储ACL信息的kafka-acl节点和存储ACL变更信息的kafka-acl-changes节点
关于身份认证和权限控制的实现原理以及响应的认证机制原理,不作介绍。本节通过kafka自带脚本kafka-acls.sh介绍基于SASL/PLAIN认证机制配置身份认证操作的步骤以及相应的权限控制操作。SASL-PLAIN是一套简单的通过用户名和密码进行身份认证的机制,主要与TLS加密一起使用来实现安全身份认证,kafka对SASL/PLAIN支持一种默认实现。
下面详细介绍基于SASL/PLAIN机制进行身份认证及权限控制操作的基本步骤。
(1)利用SASL/PLAIN进行身份认证
修改server.properties文件,开启SASL认证配置。在kafka集群每个节点的server.properties文件中增加开启SASL认证机制相关配置,如下所示:
配置SASL端口
设置代理之间通信协议
security.inter.broker.protocol=SASL_PLAINTEXT
启用SASL机制
sasl.enabled.mechanisms=PLAIN
配置SASL机制
sasl.mechanism.inter.broker.protocol=PLAIN
完成身份验证的类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
如果没有找到ACL(访问控制列表)配置,则允许任何操作
allow.everyone.if.no.acl.found=false
super.users=User:kafka
(2)创建服务端JAAS文件,配置PLAIN。创建一个服务端java验证与授权JAAS文件,该文件名为“kafka_server_jaas.conf”,在该文件中指定认证机制,并配置连接代理用户名和密码。该文件内容如下所示。
[root@kafka1 bin]# vi ../config/kafka_server_jaas.conf
KafkaServer{
        username=”kafka”
        password=”kafkapswd”
        user_kafka=”kafkapswd”
        user_morton=”mortonpswd”;
};
kafka定义了关键字“kafkaServer”字段用于指定服务端登录配置,该配置通过org.apache.kafka.common.security.plain.PlainLoginModule指定操作PLAIN机制,定义了两个用户,用户通过username和password指定该代理与集群其他代理初始化连接的用户名和密码,通过user_为前缀后接用户名的方式创建连接代理的用户名和密码,例如,user_morton=”mortonpswd”是指用户名为morton,密码为mortonpswd。
(3)创建和配置客户端JAAS文件,创建一个客户端JAAS文件,在该文件指定客户端消费者和生产者连接kafkaServer的认证机制。该文件名为“kafka_client_jaas.conf”,具体配置如下所示:
[root@kafka1 bin]# vi ../config/kafka_server_jaas.conf
KafkaServer{
        username=”kafka”
        password=”kafkapswd”
        user_kafka=”kafkapswd”
        user_morton=”mortonpswd”;
};
kafka通过关键字“KafkaClient”字段用于客户端连接kafka服务端登录配置,通过username和password字段配置客户端连接服务端的用户信息,这里指定用户为“morton”。
(4)将JAAS配置文件加入相应启动脚本中
修改kafka-server-start.sh脚本,在该脚本中引入服务端JAAS文件路径,添加以下内容:
if [“x$KAFKA_OPTS”]; then
        export KAFKA_OPTS=”-Djava.security.auth.login.config=/kafka/kafka_2.12-2.1.1/config/kafka_server_jaas.conf”
fi
修改kafka-cconsole-producer.sh和kafka-console-consumer脚本,在该脚本中引入服务端JAAS文件路径,添加以下内容:
if [ “x$KAFKA_OPTS” ]; then
        export KAFKA_OPTS=”-Djava.security.auth.login.config=/kafka/kafka_2.12-2.1.1/config/kafka_client_jaas.conf”
fi
至此,认证相关配置基本完成。当然,还可以配置代理连接zookeeper的认证,这里不再介绍。
下面分别启动客户端进行验证。为了对比验证,首先注释掉kafka-console-producer.sh脚本文件中添加的JAAS路径配置,然后重启kafka代理,创建一个主题并启动生产者和消费者客户端,相关命令如下:
[root@kafka1 bin]# ./kafka-topics.sh –zookeeper node1:2181,node2:2181,node3:2181 –create –topic acls-foo –partitions 3 –replication-factor 2
Created topic “acls-foo”.
./kafka-console-producer.sh –broker-list kafka1:9092,kafka2:9092,kafka3:9092 –topic acls-foo
./kafka-console-consumer.sh –bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 –topic acls-foo
生产者启动后,在控制台输入一条消息,此时发现消费者并没有首先消息,同时在启动生产者的控制台打印如下所示的错误信息:
[root@kafka1 bin]# ./kafka-console-producer.sh –broker-list kafka1:9092,kafka2:9092,kafka3:9092 –topic acls-foo
>111
[2019-03-30 05:35:31,281] WARN [Producer clientId=console-producer] Connection to node -1 (kafka1/192.168.6.231:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[root@kafka1 bin]# ./kafka-console-consumer.sh –bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 –topic acls-foo
[2019-03-30 05:35:13,456] WARN [Consumer clientId=consumer-1, groupId=console-consumer-66148] Connection to node -1 (kafka1/192.168.6.231:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
可见,生产者并没有连接上代理,现在,在生产者脚本中引入对客户端JAAS路径的配置,重新启动生产者,然后再尝试发送一条消息,执行结果如下所示:
[root@kafka1 bin]# ./kafka-console-producer.sh –broker-list kafka1:9092,kafka2:9092,kafka3:9092 –topic acls-foo –producer-property security.protocol=SASL_PLAINTEXT –producer-property sasl.mechanism=PLAIN
./kafka-console-consumer.sh –bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 –topic acls-foo –consumer-property security.protocol=SASL_PLAINTEXT –consumer-property sasl.mechanism=PLAIN
因为用户为norton,但是我们没有为该用户设置ACL所以会报错
[2019-03-30 07:00:19,052] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 2 : {acls-foo=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2019-03-30 07:00:19,053] ERROR Error when sending message to topic acls-foo with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [acls-foo]
我们将用户修改为kafka,客户端可以正常连接上kafka代理,并能够正常通信。
(2)权限控制
kafka-acls.sh脚本支持查询(list)、添加(add)、移除(remove)这3类权限控制操作。要启用Kafka ACL权限控制,首先需要再server.properties文件中增加权限控制实现类的设置,如指定kafka实现的SimpleAclAuthorizer类:
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
(1)查询权限列表:通过kafka-acls.sh脚本可以查询某个主题(–topic)、某个消费组(–group)、集群(–cluster)当前的权限列表。
[root@kafka1 bin]# ./kafka-acls.sh –authorizer-properties zookeeper.connect=node1:2181,node2:2181,node3:2181 –list  -cluster
[root@kafka1 bin]#
(2)为生产者授权
首先在未授权的情况下,执行以下启动生产者命令:
[root@kafka1 bin]# ./kafka-console-producer.sh –broker-list kafka1:9092,kafka2:9092,kafka3:9092 –topic acls-foo –producer-property security.protocol=SASL_PLAINTEXT –producer-property sasl.mechanism=PLAIN
在未授权的情况下启动生产者,在控制台输出以下失败信息:
>333
[2019-04-01 07:56:45,063] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 2 : {acls-foo=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2019-04-01 07:56:45,065] ERROR Error when sending message to topic acls-foo with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [acls-foo]
这是由于当前客户端并没有查询元数据信息的权限,下面未当前客户端授予生产者相应的权限。
通过add指令增加一条权限设置,增加权限时需要通过参数allow-principal指定给某个用户授权或者通过参数deny-principal指定某用户不具有的权限,该用户即为身份认证时创建的客户端用户。因为权限控制就是控制客户端相应的操作权限,因此必须对应到具体的用户。当然,也可以通过User.*指定权限控制的对象为所有用户。执行以下命令,为用户morton授予作为生产者所具有的权限:
[root@kafka1 bin]# ./kafka-acls.sh –add –authorizer-properties zookeeper.connect=node1:2181,node2:2181,node3:2181 –allow-principal User:morton –producer –topic=*
Adding ACLs for resource `Topic:LITERAL:*`:
        User:morton has Allow permission for operations: Describe from hosts: *
        User:morton has Allow permission for operations: Create from hosts: *
        User:morton has Allow permission for operations: Write from hosts: *
Current ACLs for resource `Topic:LITERAL:*`:
        User:morton has Allow permission for operations: Describe from hosts: *
        User:morton has Allow permission for operations: Create from hosts: *
        User:morton has Allow permission for operations: Write from hosts: *
该命令通过produver参数指定为该用户生产者角色授权,相当于通过–operation参数赋予write和describe权限,该命令指定该用户对所有主题具有写和查询权限,包括查询元数据信息的权限。此时,执行以下命令查询当前权限信息:
[root@kafka1 bin]# ./kafka-acls.sh –authorizer-properties zookeeper.connect=node1:2181,node2:2181,node3:2181 -list
Current ACLs for resource `Topic:LITERAL:*`:
        User:morton has Allow permission for operations: Describe from hosts: *
        User:morton has Allow permission for operations: Create from hosts: *
        User:morton has Allow permission for operations: Write from hosts: *
由权限信息可知,该用户对所有主题具有查询及写的权限,对集群具有写的权限。权限信息中hosts:*标识允许所有的机器可以访问,即该用户作为生产者时对其机器IP不进行权限控制,可以通过参数allow-host或者deny-host来设置允许或禁止生产者访问IP。
同时,登录zookeeper客户端,在/kafka-acls路径下会创建两个节点名为topic和cluster的节点,在topic节点再创建一个以“”为名的节点,用于记录当前设置的权限信息,之所以节点名为“”是因为在授权时通过*指定当前客户端作为对生产者时对所有主题具有操作权限,在zookeeper的kafka-acls路径下的topic权限列表信息如下所示:
[zk: node1:2181.node2:2181,node3:2181(CONNECTED) 1] get /kafka-acl/Topic/*
{“version”:1,”acls”:[{“principal”:”User:morton”,”permissionType”:”Allow”,”operation”:”Describe”,”host”:”*”},{“principal”:”User:morton”,”permissionType”:”Allow”,”operation”:”Create”,”host”:”*”},{“principal”:”User:morton”,”permissionType”:”Allow”,”operation”:”Write”,”host”:”*”}]}
cZxid = 0xd00001423
ctime = Mon Apr 01 08:10:20 EDT 2019
mZxid = 0xd00001423
mtime = Mon Apr 01 08:10:20 EDT 2019
pZxid = 0xd00001423
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 278
numChildren = 0
授权后,再次启动生产者,可以正常发送消息。
(3)为消费者授权。在未对消费者授权时,执行以下命令启动消费者:
[root@kafka1 bin]# ./kafka-console-consumer.sh –bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 –topic acls-foo –consumer-property security.protocol=SASL_PLAINTEXT –consumer-property sasl.mechanism=PLAIN
消费者启动失败,同时在控制台输出如下错误信息:
[2019-04-01 09:30:22,531] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: console-consumer-54952
Processed a total of 0 messages
由日志信息可知,当前客户端不具有访问消费组的权限,现在,执行以下命令为客户端授予消费者的权限,这里指定该客户端消费者只能消费acls-foo主题的消息。
[root@kafka1 bin]# ./kafka-acls.sh –add –authorizer-properties zookeeper.connect=node1:2181,node2:2181,node3:2181  –allow-principal User:morton –consumer –topic acls-foo –group acls-group
Adding ACLs for resource `Topic:LITERAL:acls-foo`:
        User:morton has Allow permission for operations: Read from hosts: *
        User:morton has Allow permission for operations: Describe from hosts: *
Adding ACLs for resource `Group:LITERAL:acls-group`:
        User:morton has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Topic:LITERAL:acls-foo`:
        User:morton has Allow permission for operations: Read from hosts: *
        User:morton has Allow permission for operations: Describe from hosts: *
Current ACLs for resource `Group:LITERAL:acls-group`:
        User:morton has Allow permission for operations: Read from hosts: *
由于消费者属于某个消费组,因此授权时需要通过–group参数指定该消费者所属的消费组,这里指定属于acls-group消费组的消费者才具有消费该主题的权限,执行以下启动消费者的命令,通过group.id指定消费组,当然也可将消费者相关的配置添加到一个属性文件,在启动消费者时通过consumer.conf参数加载该配置文件。
[root@kafka1 bin]# ./kafka-console-consumer.sh –bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 –topic acls-foo –consumer-property security.protocol=SASL_PLAINTEXT –consumer-property sasl.mechanism=PLAIN –consumer-property group.id=acls-group
该消费者启动后,正常接收到生产者发送到acls-foo主题的消息。与生产者权限设置一样,也可以通过allow-host或者deny-host参数设置允许或禁止消费者的访问IP。在zookeeper的/kafka-acls路径下会创建一个Group节点,通过zookeeper客户端查看该节点及其子节点信息如图所示:
[zk: node1:2181.node2:2181,node3:2181(CONNECTED) 2] get /kafka-acl/Group/acls-group
{“version”:1,”acls”:[{“principal”:”User:morton”,”permissionType”:”Allow”,”operation”:”Read”,”host”:”*”}]}
cZxid = 0xd00001459
ctime = Mon Apr 01 09:37:54 EDT 2019
mZxid = 0xd00001459
mtime = Mon Apr 01 09:37:54 EDT 2019
pZxid = 0xd00001459
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 105
numChildren = 0
(4)删除权限。通过remove参数删除相应的权限信息,可以删除某用户对主题(–topic)操作的权限、对集群(–cluster)操作的权限和对消费组(–group)操作的权限。
例如,删除所有用户对acls-foo主题的所有权限,当然在多个用户的情况下,也可以指定某个用户,命令如下:可以通过–topic=*指定主题,也可以通过–operation指定删除具体的权限。在删除权限时会让用户确认是否进行删除操作,可以通过force参数标识强制删除,这样在控制台执行删除权限命令时就不会出现让客户端确认删除操作的提示。
[root@kafka1 bin]# ./kafka-acls.sh –remove –authorizer-properties zookeeper.connect=node1:2181,node2:2181,node3:2181 –topic acls-foo -force
删除所有用户对集群的操作权限,命令如下:
[root@kafka1 bin]# ./kafka-acls.sh –remove –authorizer-properties zookeeper.connect=node1:2181,node2:2181,node3:2181 -cluster
Are you sure you want to delete all ACLs for resource filter `ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=LITERAL)`? (y/n)
y
删除“acls-group”消费组的消费者相应权限,命令如下:
[root@kafka1 bin]# ./kafka-acls.sh –remove –authorizer-properties zookeeper.connect=node1:2181,node2:2181,node3:2181 –group acls-group
Are you sure you want to delete all ACLs for resource filter `ResourcePattern(resourceType=GROUP, name=acls-group, patternType=LITERAL)`? (y/n)
y
至此,kafka身份认证及权限控制基本操作介绍完毕,对于其他认证机制的配置及组合权限的设置,这里不再介绍。