springboot使用kafka收发消息

举报
杨小羊 发表于 2020/09/28 16:02:31 2020/09/28
【摘要】 springboot结合kafka,并实现web,实时更新

参考:

https://www.cnblogs.com/lixianguo/p/13254915.html

实现实体分离的序列化和反序列化操作

springboot里建立模组maven

分别建立consumer和producer以及common公共模块

  • 对consumer和producer都建立springboot启动类

common的application-common.yml全局配置

spring:
 kafka:
   #kafka配置
   bootstrap-servers: localhost:9092
   producer:
     retries: 0
     # 每次批量发送消息的数量
     batch-size: 16384
     buffer-memory: 33554432
     # 指定消息key和消息体的编解码方式
     key-serializer: org.apache.kafka.common.serialization.StringSerializer
     value-serializer: org.apache.kafka.common.serialization.StringSerializer
   consumer:
     # 指定默认消费者group id
     group-id: test-consumer-group
     auto-offset-reset: earliest
     enable-auto-commit: true
     auto-commit-interval: 5000
     # 指定消息key和消息体的编解码方式
     key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
     value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
   #自己定义的主题名称,在微服务中使用Value注解注入调用,如果kafka中没有该主题,则会自动创建
   template:
     default-topic: springkafka
 #    topic:
#      userTopic: springkafka

特别注意配置producer的application.yml

server:
 port: 8088
spring:
 application:
   name: kafka-producer
 profiles:
   active: common
 kafka:
   consumer:
     auto-offset-reset: latest
     enable-auto-commit: true
     group-id: consumer
     bootstrap-servers: ${spring.kafka.producer.bootstrap-servers}
   producer:
     bootstrap-servers: localhost:9092

配置consumer的application.yml

server:
 port: 8089
spring:
 application:
   name: kafka-consumer
 profiles:
   active: common
 kafka:
   consumer:
     auto-offset-reset: latest
     enable-auto-commit: true
     group-id: consumer
     bootstrap-servers: ${spring.kafka.producer.bootstrap-servers}
   producer:
     bootstrap-servers: localhost:9092

消息分发

客户端通过kafka直接进行消息接收,从而实现消息的分发

  参考Spring Cloud Data Flow 的Message类及定义内容。

  所有消息都会带有ClassName,如前面文件中的数据表,这明确表达了这个消息是什么对象,后续的用户操作是什么也很明确。RESTFul规范中,可以返回用户可以选择的所有下一步操作(Actions),增加灵活性。

批量获取数据

分发

配置详解

批量并发:

  • 配置文件:

#kafka配置信息
kafka:
 producer:
   bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667
   batch-size: 16785                                   #一次最多发送数据量
   retries: 1                                          #发送失败后的重复发送次数
   buffer-memory: 33554432                             #32M批处理缓冲区
   linger: 1
 consumer:
   bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667
   auto-offset-reset: latest                           #最早未被消费的offset earliest
   max-poll-records: 3100                              #批量消费一次最大拉取的数据量
   enable-auto-commit: false                           #是否开启自动提交
   auto-commit-interval: 1000                          #自动提交的间隔时间
   session-timeout: 20000                              #连接超时时间
   max-poll-interval: 15000                            #手动提交设置与poll的心跳数,如果消息队列中没有消息,等待毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
   max-partition-fetch-bytes: 15728640                 #设置拉取数据的大小,15M
 listener:
   batch-listener: true                                #是否开启批量消费,true表示批量消费
   concurrencys: 3,6                                   #设置消费的线程数
   poll-timeout: 1500                                  #只限自动提交,

有关配置

earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

Java代码(并发、批量获取)

  1. Kafka消费者配置类 批量获取关键代码: ①factory.setBatchListener(true); ②propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,50); 并发获取关键代码: factory.setConcurrency(concurrency);


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。