Kafka KRaft模式下SASL鉴权配置
【摘要】 首先配置KRaft模式./bin/kafka-storage.sh random-uuid./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties 服务端properties配置kraft/server.propertieslisteners=SASL_PLAINTEXT://0.0.0.0:90...
首先配置KRaft模式
./bin/kafka-storage.sh random-uuid
./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
服务端properties配置
kraft/server.properties
listeners=SASL_PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://localhost:9092
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
服务端jaasconfig 配置
config目录下新建jaas.conf(注意文件名需与下文 KAFKA_OPTS 中引用的 jaas.conf 保持一致)
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="password";
user_alice="pwd"
};
这里的user_alice="pwd"代表有一个名为alice、密码为pwd的用户。需要注意的是,user_xxx 形式的条目仅对 PLAIN 机制生效;本文服务端启用的是 SCRAM-SHA-512,此时应通过 kafka-configs.sh 为 alice 创建 SCRAM 凭据。
然后
export KAFKA_OPTS=-Djava.security.auth.login.config=$KAFKA_HOME/config/jaas.conf
客户端代码连接
客户端使用alice
和pwd
连接上来
package com.github.shoothzj.demo.kafka;
import com.github.shoothzj.javatool.util.CommonUtil;
import com.github.shoothzj.javatool.util.LogUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * Connects a Kafka consumer to a SASL-secured KRaft broker and polls a few batches.
 *
 * @author hezhangjian
 */
@Slf4j
public class KafkaConsumerSaslTest {

    /**
     * Subscribes to {@code test-sasl} as user {@code alice} and logs a handful of polled records.
     *
     * <p>The client-side SASL mechanism and login module must match the broker's
     * {@code sasl.enabled.mechanisms}. The server config in this article enables only
     * {@code SCRAM-SHA-512}, so the client uses SCRAM as well — a {@code PLAIN} client
     * would be rejected by the broker with an unsupported-mechanism error.
     */
    @Test
    public void kafkaConsumerSasl() {
        LogUtil.configureLog();
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConst.BROKERS);
        // Random group id: every test run starts in a fresh consumer group.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KafkaConst.OFFSET_RESET_LATEST);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
        // Must match the broker's sasl.enabled.mechanisms (SCRAM-SHA-512), not PLAIN.
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        String saslJaasConfig = String.format(
                "org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=\"%s\" \npassword=\"%s\";",
                "alice", "pwd");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
        // try-with-resources: the original only closed the consumer on the success path,
        // leaking the client (sockets, buffers) if poll() threw.
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-sasl"));
            CommonUtil.sleep(TimeUnit.SECONDS, 5);
            for (int i = 0; i < 5; i++) {
                final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    log.info("consumer record {} key {}", consumerRecord.offset(), consumerRecord.key());
                }
                CommonUtil.sleep(TimeUnit.SECONDS, 10);
            }
        }
    }
}
成功完成SASL连接
代码地址
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)