玩转Kafka—Spring&Go整合Kafka
【摘要】 玩转Kafka—Spring整合Kafka 1 新建Spring Boot项目,增加依赖<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependenc...
玩转Kafka—Spring整合Kafka
1 新建Spring Boot项目,增加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2 项目结构
3 代码
3.1 配置文件和Kafka服务器所需配置
application.properties
server.port=8080
#制定kafka代理地址
spring.kafka.bootstrap-servers=8.131.57.161:9092
#消息发送失败重试次数
spring.kafka.producer.retries=0
#每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
#每次批量发送消息的缓冲区大小
spring.kafka.producer.buffer-memory=335554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 指定默认消费者group id
spring.kafka.consumer.group-id=user-log-group
spring.kafka.consumer.bootstrap-servers=8.131.57.161:9092
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Kafka服务器所需配置,server.properties
文件
# 33行左右 0.0.0.0代表允许外部端口连接
listeners=PLAINTEXT://0.0.0.0:9092
# 36行左右 ip代表外部代理地址
advertised.listeners=PLAINTEXT://8.131.57.161:9092
3.2 生产者和实体类代码
Student.java
/**
* @desc: 实体类
* @author: YanMingXin
* @create: 2021/11/20-12:43
**/
@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
public class Student implements Serializable {
private String id;
private String name;
private String context;
}
StudentService.java
/**
* @desc: 接口
* @author: YanMingXin
* @create: 2021/11/20-12:43
**/
public interface StudentService {
void stuSayHello(Student student);
}
StudentServiceImpl.java
/**
* @desc: 接口实现类
* @author: YanMingXin
* @create: 2021/11/20-12:43
**/
@Service
public class StudentServiceImpl implements StudentService {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* topic
*/
private static final String STU_TOPIC = "stu.sayHello";
@Override
public void stuSayHello(Student student) {
Student stu = new Student("1", "zs", "Hello Ls.");
kafkaTemplate.send(STU_TOPIC, JSON.toJSONString(stu));
}
}
3.3 消费者代码
MyKafkaListener.java
/**
* @desc: 消费者监听
* @author: YanMingXin
* @create: 2021/11/20-12:44
**/
@Component
public class MyKafkaListener {
/**
* topic
*/
private static final String STU_TOPIC = "stu.sayHello";
@KafkaListener(topics = {STU_TOPIC})
public void stuTopicConsumer(ConsumerRecord consumerRecord) {
Optional kafkaMsg = Optional.ofNullable(consumerRecord.value());
if (kafkaMsg.isPresent()) {
Object msg = kafkaMsg.get();
System.err.println(msg);
}
}
}
3.4 测试
@SpringBootTest
class SpKafkaApplicationTests {
@Autowired
private StudentService studentService;
@Test
void contextLoads() throws Exception{
for (int i = 0; i < 900000; i++) {
studentService.stuSayHello(new Student());
}
}
}
玩转Kafka—Golang整合Kafka
几个常见的Go整合Kafka客户端工具:我们本次使用的是Shopify
-
Big Data Open Source Security:https://github.com/stealthly/go_kafka_client
-
OptioPay:https://github.com/optiopay/kafka
-
Confluent:https://github.com/confluentinc/confluent-kafka-go
-
Travis Bischel: https://pkg.go.dev/github.com/twmb/kafka-go/pkg/kgo
ps:配置go get代理(类似于Maven配置阿里云镜像)教程:
1 新建go modules
2 项目结构
3 生产者代码
KakaProducer.go
package main
import (
"fmt"
"github.com/Shopify/sarama"
"time"
)
//消息生产者
func main() {
//获取配置类
config := sarama.NewConfig() //配置类实例(指针类型)
config.Producer.RequiredAcks = sarama.WaitForAll //代理需要的确认可靠性级别(默认为WaitForLocal)
config.Producer.Partitioner = sarama.NewRandomPartitioner //生成用于选择要发送消息的分区的分区(默认为散列消息键)。
config.Producer.Return.Successes = true //如果启用,成功传递的消息将在成功通道(默认禁用)。
//获取客户端对象
client, err := sarama.NewSyncProducer([]string{"8.131.57.161:9092"}, config)
if err != nil {
//获取客户端失败
fmt.Println("producer close, err:", err)
return
}
//延迟执行,类似于栈,等到其他代码都执行完毕后再执行
defer client.Close()
//一直循环
for {
//获取Message对象
msg := &sarama.ProducerMessage{}
//设置topic
msg.Topic = "go_kafka"
//设置Message值
msg.Value = sarama.StringEncoder("this is a good test, my message is good")
//发送消息,返回pid、片偏移
pid, offset, err := client.SendMessage(msg)
//发送失败
if err != nil {
fmt.Println("send message failed,", err)
return
}
//打印返回结果
fmt.Printf("pid:%v offset:%v\n", pid, offset)
//线程休眠下
time.Sleep(10 * time.Second)
}
}
4 消费者代码
KafkaConsumer.go
package main
import (
"fmt"
"github.com/Shopify/sarama"
"strings"
"sync"
"time"
)
var (
wg sync.WaitGroup //同步等待组
//在类型上,它是一个结构体。一个WaitGroup的用途是等待一个goroutine的集合执行完成。
//主goroutine调用了Add()方法来设置要等待的goroutine的数量。
//然后,每个goroutine都会执行并且执行完成后调用Done()这个方法。
//与此同时,可以使用Wait()方法来阻塞,直到所有的goroutine都执行完成。
)
func main() {
//获取消费者对象 可以设置多个IP地址和端口号,使用逗号进行分割
consumer, err := sarama.NewConsumer(strings.Split("8.131.57.161:9092", ","), nil)
//获取失败
if err != nil {
fmt.Println("Failed to start consumer: %s", err)
return
}
//对该topic进行监听
partitionList, err := consumer.Partitions("go_kafka")
if err != nil {
fmt.Println("Failed to get the list of partitions: ", err)
return
}
//打印分区
fmt.Println(partitionList)
//获取分区和片偏移
for partition := range partitionList {
pc, err := consumer.ConsumePartition("go_kafka", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
return
}
//延迟执行
defer pc.AsyncClose()
//启动多线程
go func(pc sarama.PartitionConsumer) {
wg.Add(1)
//获得message的信息
for msg := range pc.Messages() {
fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
fmt.Println()
}
wg.Done()
}(pc)
}
//线程休眠
time.Sleep(10 * time.Second)
wg.Wait()
consumer.Close()
}
5 测试
参考文章:https://www.cnblogs.com/angelyan/p/10800739.html
欢迎关注公众号
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)