使用Python使用SSL连接到Kafka集群
【摘要】 连接到Kafka集群并使用SSL进行Python连接本文专门讨论如何使用Python编写连接到使用SSL保护的Kafka集群的生产者和消费者。本文不会详细介绍如何生成客户端证书,这是另一篇文章的主题。 先决条件Kafka集群具有SSL配置以JKS格式的客户端证书(KeyStore)在Linux环境中安装了keytool和openssl使用Python 3.6 步骤1 - 将JKS转换为PE...
连接到Kafka集群并使用SSL进行Python连接
本文专门讨论如何使用Python编写连接到使用SSL保护的Kafka集群的生产者和消费者。本文不会详细介绍如何生成客户端证书,这是另一篇文章的主题。
先决条件
- Kafka集群具有SSL配置
- 以JKS格式的客户端证书(KeyStore)
- 在Linux环境中安装了keytool和openssl
- 使用Python 3.6
步骤1 - 将JKS转换为PEM文件
为什么需要这一步?
与Java不同,Python和C#使用.pem文件连接到Kafka。因此,我们需要使用keytool和openssl命令将JKS文件转换为PEM文件。如果您在Windows 10上工作,可以参考我的文章了解如何在Windows上运行WSL。
为了方便起见,我已经创建了一个Shell脚本,用于快速将JKS文件转换为PEM。
#!/bin/bash
srcFolder=$1
keyStore=$1/$2
password=$3
alias=$4
outputFolder=$5
echo $keyStore
echo "Generating certificate.pem"
keytool -exportcert -alias $alias -keystore $keyStore -rfc -file $outputFolder/certificate.pem -storepass $password
echo "Generating key.pem"
keytool -v -importkeystore -srckeystore $keyStore -srcalias $alias -destkeystore $outputFolder/cert_and_key.p12 -deststoretype PKCS12 -storepass $password -srcstorepass $password
openssl pkcs12 -in $outputFolder/cert_and_key.p12 -nodes -nocerts -out $outputFolder/key.pem -passin pass:$password
echo "Generating CARoot.pem"
keytool -exportcert -alias $alias -keystore $keyStore -rfc -file $outputFolder/CARoot.pem -storepass $password
该脚本从keystore文件生成以下文件:
key.pem
certificate.pem
CARoot.pem
如何运行这个脚本?
将脚本保存在一个文件中,例如jkstopem.sh,并赋予如下执行权限:
chmod +x jkstopem.sh
生成PEM文件。运行shell脚本,如下面的示例所示:
./jkstopem.sh <source_path_to_jks> <keystore_file_name> <keystore_password> <alias> <output_folder>
如何找到别名
如果您不知道您的证书有什么别名。在存放keystore文件的文件夹中运行以下命令。
keytool -list -v -keystore kafka.client.keystore.jks
系统将提示您输入密码。输入密钥库密码,这将列出密钥库文件的内容。您将能够看到*别名。
下面是运行shell脚本的示例:
./jkstopem.sh ~/client-cert kafka.client.keystore.jks welcome123 client-alias ~/client-cert/pem
编写代码
#Producer.py
from kafka import KafkaProducer
kafkaBrokers='kafka1.xyz.com:443,kafka2.xyz.com:443,kafka3.xyz.com:443'
caRootLocation='CARoot.pem'
certLocation='certificate.pem'
keyLocation='key.pem'
topic='test-topic'
password='welcome123'
producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile=caRootLocation,
ssl_certfile=certLocation,
ssl_keyfile=keyLocation,
ssl_password=password)
producer.send(topic, bytes('Hello Kafka!','utf-8'))
# Send to a particular partition
producer.send(topic, bytes('Hello Kafka!','utf-8'),partition=0)
producer.flush()
在上面的示例中,我们使用在最后一步中生成的pem文件,并使用密码读取pem文件。
kafkaBrokers='kafka1.xyz.com:443,kafka2.xyz.com:443,kafka3.xyz.com:443'
caRootLocation='CARoote.pem'
certLocation='certificate.pem'
keyLocation='key.pem'
password='welcome123'
producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile=caRootLocation,
ssl_certfile=certLocation,
ssl_keyfile=keyLocation,
ssl_password=password)
将数据发送到随机主题分区
下面的代码片段将发送数据到Kafka决定的随机分区。
producer.send(topic, bytes('Hello Kafka!','utf-8'))
producer.flush()
因此,我们为Kafka构建了Python生成器。在下一部分中,我们将编写consumer来消费来自主题的消息。
【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)