Kafka KRaft模式下SASL鉴权配置

举报
张俭 发表于 2023/12/29 17:58:59 2023/12/29
【摘要】 首先配置KRaft模式./bin/kafka-storage.sh random-uuid./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties 服务端properties配置kraft/server.propertieslisteners=SASL_PLAINTEXT://0.0.0.0:90...

首先配置KRaft模式

./bin/kafka-storage.sh random-uuid
./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties

服务端properties配置

kraft/server.properties

listeners=SASL_PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://localhost:9092
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512

服务端jaasconfig 配置

config目录下新建jaas.config

KafkaServer {
   org.apache.kafka.common.security.scram.ScramLoginModule required
   username="admin"
   password="password";
   user_alice="pwd"
};

这里的user_alice="pwd"代表有一个用户,名为alice,密码为pwd

然后

export KAFKA_OPTS=-Djava.security.auth.login.config=$KAFKA_HOME/config/jaas.conf

客户端代码连接

客户端使用alicepwd连接上来

package com.github.shoothzj.demo.kafka;

import com.github.shoothzj.javatool.util.CommonUtil;
import com.github.shoothzj.javatool.util.LogUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class KafkaConsumerSaslTest {

    @Test
    public void kafkaConsumerSasl() {
        LogUtil.configureLog();
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConst.BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KafkaConst.OFFSET_RESET_LATEST);

        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        String saslJaasConfig = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required \nusername=\"%s\" \npassword=\"%s\";", "alice", "pwd");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-sasl"));
        CommonUtil.sleep(TimeUnit.SECONDS, 5);
        for (int i = 0; i < 5; i++) {
            final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                log.info("consumer record {} key {}", consumerRecord.offset(), consumerRecord.key());
            }
            CommonUtil.sleep(TimeUnit.SECONDS, 10);
        }
        consumer.close();
    }

}

成功完成SASL连接

代码地址

https://github.com/Shoothzj/maven-demo/blob/58409a4ad4de76c7458267a1fda1895d60a2ec95/demo-kafka/src/test/java/com/github/shoothzj/demo/kafka/KafkaConsumerSaslTest.java

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。