SpringBoot使用Kafka生产者、消费者

举报
刘大猫 发表于 2025/03/15 22:00:55 2025/03/15
【摘要】 SpringBoot使用Kafka生产者、消费者

image.png

@[TOC]

依赖

<!--kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>xxx</version>
</dependency>

配置文件

image.png

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    bootstrap-servers: 192.168.20.75:9907

kafka:
  spark:
    task:
      topic: platform-model-spark-topic1

生产者

方法一:添加@RunWith(SpringRunner.class)@SpringBootTest(classes = DataComputingModelApplication.class)实现初始化配置注入kafkaTemplate,调用send()
@Autowired
private KafkaTemplate kafkaTemplate;

    @Test
    public void kafkaSend() {
        final ProducerRecord<String, String> record = new ProducerRecord("test20201228", "{\"key\":\"27\"}");
        kafkaTemplate.send(record);
        log.info("------------send success!----------------");
    }
    
方法二:不需要注解@RunWith@SpringBootTest,但是初始化Properties,同样调用send()
@Test
    public void kafkaSend2() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.20.75:9907");
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        KafkaProducer<String,String> producer = new KafkaProducer<>(props);
        ProducerRecord record = new ProducerRecord<String, String>("test20201228", "key", "{\"key\":\"20\"}");
        producer.send(record);
        log.info("------------send success!----------------");
        producer.close();
    }

消费者

==说明:
① Topic主题用来区分不同类型的消息
② GroupId用来解决同一个Topic主题下重复消费问题,比如一条消费需要多个消费者接收到,就可以通过设置不同的GroupId实现,实际消息是存一份的,只是通过逻辑上设置标识来区分,系统会记录Topic主题下–》GroupId分组下–》partition分区下的offsert,来标识是否消费过。==

@KafkaListener(topics = "big_data_task_state", groupId = "bigDataTaskState")
    public void taskStateConsumer(String msg) {
        log.info("----receive:{}----", msg);
    }

image.png

重要信息

image.png
image.png
image.png

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。