Installing Kafka with Docker and k3s, and sending/receiving Kafka messages in Go
Docker install command, where 172.16.11.111 is the host IP and 14818 is the host port mapped to container port 9092:
docker run -d \
--name kafka \
-p 14818:9092 \
-p 9093:9093 \
-v /tmp/kraft-combined-logs:/tmp/kraft-combined-logs \
-e TZ=Asia/Shanghai \
-e KAFKA_NODE_ID=1 \
-e KAFKA_PROCESS_ROLES=broker,controller \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.11.111:14818 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
-e KAFKA_NUM_PARTITIONS=3 \
-e KAFKA_LOG_DIRS=/tmp/kraft-combined-logs \
-e CLUSTER_ID=5L6g3nShT-eMCtK--X86sw \
apache/kafka-native:4.1.0
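
Once the container is up, it is worth checking that the advertised listener resolves from outside the host, since clients connect to the address the broker advertises, not the one they dialed. A minimal sketch with github.com/segmentio/kafka-go (the same client used below); with this single combined-mode node, the reported controller address should be the advertised 172.16.11.111:14818:

package main

import (
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Dial the broker through the host port mapping.
	conn, err := kafka.Dial("tcp", "172.16.11.111:14818")
	if err != nil {
		log.Fatalf("could not dial broker: %v", err)
	}
	defer conn.Close()

	// With a single combined-mode node, the reported controller
	// address should be the advertised listener, 172.16.11.111:14818.
	controller, err := conn.Controller()
	if err != nil {
		log.Fatalf("could not read cluster metadata: %v", err)
	}
	fmt.Printf("controller: %s:%d\n", controller.Host, controller.Port)
}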
k3s YAML, where 172.16.11.111 is the host IP and 14818 is the host port mapped to container port 9092 (the Service pins nodePort: 14818 so it matches the advertised listener, which requires the cluster's service-node-port-range to include 14818):
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka
  name: kafka
  namespace: moonfdd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      initContainers:
        - name: kafka-fix-data-volume-permissions
          image: alpine
          imagePullPolicy: IfNotPresent
          command:
            - sh
            - -c
            - "chown -R 1000:1000 /tmp/kraft-combined-logs"
          volumeMounts:
            - mountPath: /tmp/kraft-combined-logs
              name: volv
      containers:
        - env:
            - name: TZ
              value: Asia/Shanghai
            - name: KAFKA_NODE_ID
              value: "1"
            - name: KAFKA_PROCESS_ROLES
              value: broker,controller
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://:9092,CONTROLLER://:9093
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://172.16.11.111:14818
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: CONTROLLER
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: 1@localhost:9093
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "1"
            - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
              value: "0"
            - name: KAFKA_NUM_PARTITIONS
              value: "3"
            - name: KAFKA_LOG_DIRS
              value: /tmp/kraft-combined-logs
            - name: CLUSTER_ID
              value: "5L6g3nShT-eMCtK--X86sw" # fixed cluster ID; used only to format storage on first startup
          image: 'apache/kafka-native:4.1.0'
          imagePullPolicy: IfNotPresent
          name: kafka
          volumeMounts:
            - mountPath: /tmp/kraft-combined-logs
              name: volv
      volumes:
        - hostPath:
            path: /root/k8s/moonfdd/kafka/tmp/kraft-combined-logs
            type: DirectoryOrCreate
          name: volv
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka
  name: kafka
  namespace: moonfdd
spec:
  ports:
    - port: 9092
      protocol: TCP
      targetPort: 9092
      nodePort: 14818 # pin the NodePort to the advertised listener port; service-node-port-range must include 14818
      name: 9092-9092
    - port: 9093
      protocol: TCP
      targetPort: 9093
      name: 9093-9093
  selector:
    app: kafka
  type: NodePort
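
Whichever way the broker is deployed, the test-topic used by all of the examples below can be created explicitly before producing (topic auto-creation may also cover it, depending on the broker's auto.create.topics.enable setting). A sketch following the kafka-go README pattern, with three partitions to match KAFKA_NUM_PARTITIONS=3:

package main

import (
	"fmt"
	"log"
	"net"
	"strconv"

	"github.com/segmentio/kafka-go"
)

func main() {
	conn, err := kafka.Dial("tcp", "172.16.11.111:14818")
	if err != nil {
		log.Fatalf("could not dial broker: %v", err)
	}
	defer conn.Close()

	// CreateTopics must be handled by the controller, so look it up first.
	controller, err := conn.Controller()
	if err != nil {
		log.Fatalf("could not look up controller: %v", err)
	}
	controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		log.Fatalf("could not dial controller: %v", err)
	}
	defer controllerConn.Close()

	// Three partitions to match KAFKA_NUM_PARTITIONS=3; replication
	// factor 1 because this is a single-node cluster.
	err = controllerConn.CreateTopics(kafka.TopicConfig{
		Topic:             "test-topic",
		NumPartitions:     3,
		ReplicationFactor: 1,
	})
	if err != nil {
		log.Fatalf("could not create topic: %v", err)
	}
	fmt.Println("topic created")
}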
Sending Kafka messages in Go: github.com/segmentio/kafka-go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Create a Kafka writer (producer).
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"172.16.11.111:14818"}, // Kafka broker address
		Topic:    "test-topic",                    // topic to produce to
		Balancer: &kafka.LeastBytes{},             // partition balancing strategy
	})
	defer w.Close()

	// Write a message.
	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("Hello Kafka from Go!"),
		},
	)
	if err != nil {
		log.Fatalf("could not write message: %v", err)
	}
	log.Println("Message sent successfully!")
}
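
kafka.NewWriter still works, but recent kafka-go releases mark it deprecated in favor of constructing the Writer directly. A hedged drop-in replacement for the construction above:

// Drop-in replacement for the kafka.NewWriter call above.
w := &kafka.Writer{
	Addr:     kafka.TCP("172.16.11.111:14818"),
	Topic:    "test-topic",
	Balancer: &kafka.LeastBytes{},
}
defer w.Close()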
Receiving Kafka messages in Go: github.com/segmentio/kafka-go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Create a Kafka reader (consumer).
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"172.16.11.111:14818"}, // Kafka broker address
		Topic:    "test-topic",                    // topic to subscribe to
		GroupID:  "my-consumer-group",             // consumer group; reusing the same group resumes from the last committed offset
		MinBytes: 10e3,                            // minimum fetch size in bytes
		MaxBytes: 10e6,                            // maximum fetch size in bytes
	})
	defer r.Close() // close the reader when the loop exits

	for {
		// ReadMessage resumes automatically from the last committed offset.
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatalf("could not read message: %v", err)
		}
		log.Printf("offset:%d | key:%s | value:%s\n", m.Offset, string(m.Key), string(m.Value))
	}
}
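
Because the loop above only exits through log.Fatalf, the deferred r.Close() never gets to commit offsets on shutdown. One hedged alternative (assuming Go 1.16+ for signal.NotifyContext, and adding os and os/signal to the imports) replaces the loop so that Ctrl-C cancels the context and lets the reader close cleanly:

// Cancel the context on SIGINT so ReadMessage returns an error,
// the loop breaks, and the deferred r.Close() commits offsets
// and leaves the consumer group cleanly.
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()

for {
	m, err := r.ReadMessage(ctx)
	if err != nil {
		// context.Canceled here means shutdown was requested.
		log.Printf("stopping reader: %v", err)
		break
	}
	log.Printf("offset:%d | key:%s | value:%s", m.Offset, string(m.Key), string(m.Value))
}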
Sending Kafka messages in Go: github.com/IBM/sarama
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	// Configure the producer.
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true          // required by SyncProducer to report successes
	config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to ack
	config.Producer.Retry.Max = 3                    // retry count
	// Plaintext listener: no SASL or TLS.
	config.Net.SASL.Enable = false
	config.Net.TLS.Enable = false
	config.Version = sarama.MaxVersion

	// Create a synchronous producer.
	producer, err := sarama.NewSyncProducer([]string{"172.16.11.111:14818"}, config)
	if err != nil {
		log.Fatalf("failed to create producer: %v", err)
	}
	defer producer.Close()

	// Build the message.
	message := &sarama.ProducerMessage{
		Topic: "test-topic",
		Key:   sarama.StringEncoder("message-key"),
		Value: sarama.StringEncoder(fmt.Sprintf("Hello Kafka! %v", time.Now())),
	}

	// Send it.
	partition, offset, err := producer.SendMessage(message)
	if err != nil {
		log.Fatalf("failed to send message: %v", err)
	}
	fmt.Printf("message sent! partition: %d, offset: %d\n", partition, offset)
}
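
SyncProducer blocks on every send; sarama's AsyncProducer queues messages and reports outcomes on channels instead, which suits higher-throughput paths. A hedged sketch reusing the config from above (Producer.Return.Successes must remain true to receive events on Successes()):

// Async variant: sends are queued; results come back on channels.
producer, err := sarama.NewAsyncProducer([]string{"172.16.11.111:14818"}, config)
if err != nil {
	log.Fatalf("failed to create async producer: %v", err)
}
defer producer.AsyncClose()

producer.Input() <- &sarama.ProducerMessage{
	Topic: "test-topic",
	Value: sarama.StringEncoder("Hello from AsyncProducer!"),
}

select {
case msg := <-producer.Successes():
	log.Printf("delivered to partition %d at offset %d", msg.Partition, msg.Offset)
case perr := <-producer.Errors():
	log.Printf("delivery failed: %v", perr.Err)
}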
Receiving Kafka messages in Go: github.com/IBM/sarama
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/IBM/sarama"
)

type Consumer struct{}

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Called when the session starts; do any setup work here.
	return nil
}

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	// Called when the session ends; clean up here.
	return nil
}

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// claim.Messages() keeps yielding new messages.
	for msg := range claim.Messages() {
		fmt.Printf("Topic:%s Partition:%d Offset:%d Value:%s\n",
			msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
		// Mark the message as processed; the offset is committed automatically.
		session.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	// Kafka cluster addresses.
	brokers := []string{"172.16.11.111:14818"}
	groupID := "my-group" // keep the group ID stable to resume from the last committed offset
	topics := []string{"test-topic"}

	// Configuration.
	config := sarama.NewConfig()
	config.Version = sarama.MaxVersion // Kafka protocol version
	config.Consumer.Return.Errors = true
	// Where to start when the group has no committed offset:
	// OffsetNewest starts from the newest messages, OffsetOldest from
	// the oldest. A restarted group always resumes from its committed
	// offset regardless of this setting.
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	// Create the consumer group.
	consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		log.Fatalf("Error creating consumer group: %v", err)
	}
	defer consumerGroup.Close()

	consumer := &Consumer{}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		for err := range consumerGroup.Errors() {
			log.Printf("Error: %v", err)
		}
	}()
	log.Println("Kafka consumer started...")

	// Graceful shutdown on Ctrl-C.
	go func() {
		sigchan := make(chan os.Signal, 1)
		signal.Notify(sigchan, os.Interrupt)
		<-sigchan
		cancel()
	}()

	// Consume in a loop; Consume returns after every rebalance.
	for {
		if err := consumerGroup.Consume(ctx, topics, consumer); err != nil {
			log.Printf("Error from consumer: %v", err)
		}
		// Exit once the context is cancelled.
		if ctx.Err() != nil {
			return
		}
	}
}
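
One caveat from sarama's own consumer-group example: when a rebalance starts, the session context is cancelled, and ranging over claim.Messages() alone can hold up the handoff. A variant of the ConsumeClaim method above that also watches session.Context():

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil // channel closed: this claim is finished
			}
			fmt.Printf("Topic:%s Partition:%d Offset:%d Value:%s\n",
				msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
			session.MarkMessage(msg, "")
		case <-session.Context().Done():
			// A rebalance is in progress: return promptly so the
			// partitions can be reassigned without timing out.
			return nil
		}
	}
}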