使用Python使用SSL连接到Kafka集群

举报
kaliarch 发表于 2023/12/10 13:37:41 2023/12/10
【摘要】 连接到Kafka集群并使用SSL进行Python连接本文专门讨论如何使用Python编写连接到使用SSL保护的Kafka集群的生产者和消费者。本文不会详细介绍如何生成客户端证书,这是另一篇文章的主题。 先决条件Kafka集群具有SSL配置以JKS格式的客户端证书(KeyStore)在Linux环境中安装了keytool和openssl使用Python 3.6 步骤1 - 将JKS转换为PE...

连接到Kafka集群并使用SSL进行Python连接

本文专门讨论如何使用Python编写连接到使用SSL保护的Kafka集群的生产者和消费者。本文不会详细介绍如何生成客户端证书,这是另一篇文章的主题。

先决条件

  • Kafka集群具有SSL配置
  • 以JKS格式的客户端证书(KeyStore)
  • 在Linux环境中安装了keytool和openssl
  • 使用Python 3.6

步骤1 - 将JKS转换为PEM文件

为什么需要这一步?

与Java不同,Python和C#使用.pem文件连接到Kafka。因此,我们需要使用keytool和openssl命令将JKS文件转换为PEM文件。如果您在Windows 10上工作,可以参考我的文章了解如何在Windows上运行WSL。

为了方便起见,我已经创建了一个Shell脚本,用于快速将JKS文件转换为PEM。

#!/bin/bash
srcFolder=$1
keyStore=$1/$2
password=$3
alias=$4
outputFolder=$5

echo $keyStore
echo "Generating certificate.pem"
keytool -exportcert -alias $alias -keystore $keyStore -rfc -file $outputFolder/certificate.pem -storepass $password

echo "Generating key.pem"
keytool -v -importkeystore -srckeystore $keyStore -srcalias $alias -destkeystore $outputFolder/cert_and_key.p12 -deststoretype PKCS12 -storepass $password -srcstorepass $password
openssl pkcs12 -in $outputFolder/cert_and_key.p12 -nodes -nocerts -out $outputFolder/key.pem -passin pass:$password

echo "Generating CARoot.pem"
keytool -exportcert -alias $alias -keystore $keyStore -rfc -file $outputFolder/CARoot.pem -storepass $password

该脚本从keystore文件生成以下文件:
key.pem
certificate.pem
CARoot.pem
如何运行这个脚本?
将脚本保存在一个文件中,例如jkstopem.sh,并赋予如下执行权限:

chmod +x jkstopem.sh

生成PEM文件。运行shell脚本,如下面的示例所示:

./jkstopem.sh <source_path_to_jks> <keystore_file_name> <keystore_password> <alias> <output_folder>

如何找到别名
如果您不知道您的证书有什么别名。在存放keystore文件的文件夹中运行以下命令。

keytool -list -v -keystore kafka.client.keystore.jks

系统将提示您输入密码。输入密钥库密码,这将列出密钥库文件的内容。您将能够看到*别名。
下面是运行shell脚本的示例:

./jkstopem.sh ~/client-cert kafka.client.keystore.jks welcome123 client-alias ~/client-cert/pem

编写代码

#Producer.py
from kafka import KafkaProducer

kafkaBrokers='kafka1.xyz.com:443,kafka2.xyz.com:443,kafka3.xyz.com:443'
caRootLocation='CARoot.pem'
certLocation='certificate.pem'
keyLocation='key.pem'
topic='test-topic'
password='welcome123'

producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
                          security_protocol='SSL',
                          ssl_check_hostname=True,
                          ssl_cafile=caRootLocation,
                          ssl_certfile=certLocation,
                          ssl_keyfile=keyLocation,
                          ssl_password=password)

producer.send(topic, bytes('Hello Kafka!','utf-8'))

# Send to a particular partition
producer.send(topic, bytes('Hello Kafka!','utf-8'),partition=0)
producer.flush()

在上面的示例中,我们使用在最后一步中生成的pem文件,并使用密码读取pem文件。

kafkaBrokers='kafka1.xyz.com:443,kafka2.xyz.com:443,kafka3.xyz.com:443'
caRootLocation='CARoote.pem'
certLocation='certificate.pem'
keyLocation='key.pem'
password='welcome123'
producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
                          security_protocol='SSL',
                          ssl_check_hostname=True,
                          ssl_cafile=caRootLocation,
                          ssl_certfile=certLocation,
                          ssl_keyfile=keyLocation,
                          ssl_password=password)

将数据发送到随机主题分区
下面的代码片段将发送数据到Kafka决定的随机分区。

producer.send(topic, bytes('Hello Kafka!','utf-8'))
producer.flush()

因此,我们为Kafka构建了Python生成器。在下一部分中,我们将编写consumer来消费来自主题的消息。

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。