如何实现kafka与zookeeper的SSL认证

举报
乐维社区 发表于 2024/07/11 16:02:12 2024/07/11
【摘要】 作者 乐维社区(forum.lwops.cn)许远在构建现代的分布式系统时,确保数据传输的安全性至关重要。Apache Kafka 和 Zookeeper 作为流行的分布式消息队列和协调服务,提供了SSL(Secure Sockets Layer)认证机制,以增强数据传输过程中的安全性。本文将详细介绍从生成SSL证书到配置服务端和客户端的全过程,确保数据在传输过程中得到充分的保护。1、 配置...

作者 乐维社区forum.lwops.cn)许远

在构建现代的分布式系统时,确保数据传输的安全性至关重要。Apache Kafka 和 Zookeeper 作为流行的分布式消息队列和协调服务,提供了SSL(Secure Sockets Layer)认证机制,以增强数据传输过程中的安全性。

本文将详细介绍从生成SSL证书到配置服务端和客户端的全过程,确保数据在传输过程中得到充分的保护。


1、 配置Kafka账号密码:

1、首先,需要修改kafka配置文件:vim /asop/kafka/kafka_2.11-2.1.0/config/server.properties


broker.id=0

listeners=SASL_PLAINTEXT://:9092

advertised.listeners=SASL_PLAINTEXT://10.176.31.137:9092

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/asop/kafka/logs

num.partitions=1

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=localhost:2181

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0


#使用的认证协议

security.inter.broker.protocol=SASL_PLAINTEXT

#SASL机制

sasl.enabled.mechanisms=PLAIN

sasl.mechanism.inter.broker.protocol=PLAIN

#完成身份验证的类

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

#如果没有找到ACL(访问控制列表)配置,则允许任何操作。

allow.everyone.if.no.acl.found=false

#需要开启设置超级管理员,设置visitor用户为超级管理员

super.users=User:visitor


2、其次,为server创建登录验证文件,可以根据自己爱好命名文件,如vim /asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf ,文件内容如下


KafkaServer {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="visitor"

password="qaz@123"

user_visitor="qaz@123";

};



3、然后修改kafka安装目录vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh,在文件最上面添加变量


export KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"


4、 接下来为consumer和producer创建登录验证文件,可以根据爱好命名文件,如kafka_client_jaas.conf,文件内容如下(如果是程序访问,如springboot访问,可以不配置

vim /asop/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf


KafkaClient {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="visitor"

password="qaz@123";

};


5、 在consumer.properties和producer.properties里分别加上如下配置

vim /asop/kafka/kafka_2.11-2.1.0/config/consumer.properties

vim /asop/kafka/kafka_2.11-2.1.0/config/producer.properties


security.protocol=SASL_PLAINTEXT

sasl.mechanism=PLAIN


6、 修改kafka安装目录bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh,在文件最上面添加变量

vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh

vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh


export KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf"


7、 分别启动zookeeper和kafka,至此服务端kafka用户登录验证配置完成(先关闭kafka后关闭zookeeper)


关闭服务kafka

# /asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties


启动服务kafka

#/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties



关闭服务zookeeper-3.4.13

/asop/zk/zookeeper-3.4.13/bin/zkServer.sh stop /asop/zk/zookeeper-3.4.13/conf/zoo.cfg

启动服务zookeeper-3.4.13

/asop/zk/zookeeper-3.4.13/bin/zkServer.sh start /asop/zk/zookeeper-3.4.13/conf/zoo.cfg


8、 创建及查看主题


/asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh --broker-list 10.176.31.137:9092 --topic cmdb --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN


接收消息


/asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 10.176.31.137:9092 --topic cmdb --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN



2、 zk和kafka配置ssal账号密码:

1. Zookeeper 配置 SASL

1.1 新建 zoo_jaas.conf 文件

zoo_jaas.conf 文件名、文件所在路径没有特殊要求,一般放置在${ZOOKEEPER_HOME}/conf目录下vim /asop/zk/zookeeper-3.4.13/conf/zoo_jaas.conf


Server {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin@12"

user_kafka="kafka@123";

};


Server.username、Server.password为 Zookeeper 内部通信的用户名和密码,因此保证每个 zk 节点该属性一致即可

Server.user_xxx 中 xxx 为自定义用户名,用于 zkClient 连接所使用的用户名和密码,即为 kafka 创建的用户名


1.2 配置 /asop/zk/zookeeper-3.4.13/conf/zoo.cfg 文件

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

requireClientAuthScheme=sasl

jaasLoginRenew=3600000

zookeeper.sasl.client=true


zookeeper.sasl.client 设置为 true,开启客户端身份验证,否则zoo_jaas.conf中配置的用户名将不起作用,客户端仍然可以无 jaas 文件连接,只是带有 WARNNING 而已


1.3 导入依赖包

因为使用的权限验证类为:org.apache.kafka.common.security.plain.PlainLoginModule,所以需要 kafka 相关 jar 包,新建文件夹 zk_sasl_lib,如下:从kafka/lib目录下复制以下几个jar包到zookeeper的lib和新建的zk_sasl_lib目录下:


kafka-clients-2.4.1.jar

lz4-java-1.6.0.jar

slf4j-api-1.7.28.jar

slf4j-log4j12-1.7.28.jar

snappy-java-1.1.7.3.jar


mkdir /asop/zk/zookeeper-3.4.13/zk_sasl_lib

cp /asop/kafka/kafka_2.11-2.1.0/libs/kafka-clients-2.1.0.jar /asop/zk/zookeeper-3.4.13/lib/

cp /asop/kafka/kafka_2.11-2.1.0/libs/lz4-java-1.5.0.jar /asop/zk/zookeeper-3.4.13/lib/

cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-api-1.7.25.jar /asop/zk/zookeeper-3.4.13/lib/

cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-log4j12-1.7.25.jar /asop/zk/zookeeper-3.4.13/lib/

cp /asop/kafka/kafka_2.11-2.1.0/libs/snappy-java-1.1.7.2.jar /asop/zk/zookeeper-3.4.13/lib/

cp /asop/kafka/kafka_2.11-2.1.0/libs/kafka-clients-2.1.0.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib

cp /asop/kafka/kafka_2.11-2.1.0/libs/lz4-java-1.5.0.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib

cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-api-1.7.25.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib

cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-log4j12-1.7.25.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib

cp /asop/kafka/kafka_2.11-2.1.0/libs/snappy-java-1.1.7.2.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib


chmod 755 -R /asop/zk/zookeeper-3.4.13/zk_sasl_lib/

chmod 755 -R /asop/zk/zookeeper-3.4.13/zk_sasl_lib/


1.4 修改 zkEnv.sh 文件/asop/zk/zookeeper-3.4.13/bin/zkEnv.sh

修改前:如果没有就直接加


export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS"


修改后:


for jar in /asop/zk/zookeeper-3.4.13/zk_sasl_lib/*.jar;

do

CLASSPATH="$jar:$CLASSPATH"

done


export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/asop/zk/zookeeper-3.4.13/conf/zoo_jaas.conf "


重启 Zookeeper 服务即可


关闭服务zookeeper-3.4.13

/asop/zk/zookeeper-3.4.13/bin/zkServer.sh stop /asop/zk/zookeeper-3.4.13/conf/zoo.cfg

启动服务zookeeper-3.4.13

/asop/zk/zookeeper-3.4.13/bin/zkServer.sh start /asop/zk/zookeeper-3.4.13/conf/zoo.cfg



2. Kakfa 配置 SASL

2.1 新建 kafka_server_jaas.conf 文件

kafka_server_jaas.conf 文件名和存放路径没有要求,一般放置在${KAFKA_HOME}/config目录下/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf


KafkaServer {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="visitor"

password="qaz@123"

user_visitor="qaz@123";

};

Client{

org.apache.kafka.common.security.plain.PlainLoginModule required

username="kafka"

password="kafka@123";

};


KafkaServer.username、KafkaServer.password 为 broker 内部通信的用户名密码,同上

KafkaServer.user_xxx 其中 xxx 必须和 KafkaServer.username 配置的用户名一致,密码也一致

KafkaServer.user_producer、KafkaServer.user_consumer 为了之后的 ACL 做准备,达到消费者生产者使用不同账号且消费者账号只能消费数据,生产者账号只能生产数据

Client.username、Client.password 填写 Zookeeper 中注册的账号密码,用于 broker 与 zk 的通信(若 zk 没有配置 SASL 可以忽略、若 zookeeper.sasl.client 为 false 也可以忽略只是带有,日志如下)


[2021-06-29 17:14:30,204] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/Users/wjun/env/kafka/config/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)



2.2 修改 server.properties 文件

broker.id=0

listeners=SASL_PLAINTEXT://:9092

advertised.listeners=SASL_PLAINTEXT://192.168.157.198:9092

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/asop/kafka/logs

num.partitions=1

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=127.0.0.1:2181

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0


#使用的认证协议

security.inter.broker.protocol=SASL_PLAINTEXT

#SASL机制

sasl.enabled.mechanisms=PLAIN

sasl.mechanism.inter.broker.protocol=PLAIN

#完成身份验证的类

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

#如果没有找到ACL(访问控制列表)配置,则允许任何操作。

allow.everyone.if.no.acl.found=false

#需要开启设置超级管理员,设置visitor用户为超级管理员

super.users=User:visitor


其中 localhost 需要修改成 IP地址


super.users 配置超级用户,该用户不受之后的 ACL 配置影响


2.3 修改启动脚本

修改 kafka-server-start.sh 文件,使之加载到 kafka_server_jaas.conf 文件/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh


修改前:


if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

fi


修改后:

(先在首行加这一行,如果有了就不用加了)export KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"

fi



设置zookeeper的ACL规则

/asop/zk/zookeeper-3.4.13/bin/zkCli.sh #进入zk的命令行模式


addauth digest admin:admin@12 #切换登陆用户(超级管理员是在zk的配置文件/asop/zk/zookeeper-3.4.13/conf/zoo_jaas.conf里面)


setAcl / ip:127.0.0.1:cdrwa,auth:kafka:kafka@123:cdrwa #(设置可以登陆的IP和用户账号密码,admin是上面的zk的配置文件里面定义的管理员,Kafka用户是/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf文件里面的定义的kafka连接zk的用户 (Client下面的))


addauth digest kafka:kafka@123 #再切换为kafka用户再设置一次acl

setAcl / ip:127.0.0.1:cdrwa,auth:kafka:kafka@123:cdrwa


注意:如果要加白名单IP或者用户要在原来的基础上加,不然会覆盖

setAcl / ip:127.0.0.1:cdrwa,auth:kafka:kafka@123:cdrwa,auth:admin:admin@12:cdrwa,ip:1.1.1.1


需要恢复权限,不设置acl的话就运行

setAcl / world:anyone:cdrwa


重启 kafka 服务即可


关闭服务kafka

# /asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties


启动服务kafka

#/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties


至此,完成kafka与zookeeper配置ssl认证,更多运维问题也欢迎留言提问。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。