springboot使用kafka收发消息

举报
杨小羊 发表于 2020/09/28 16:02:31 2020/09/28
【摘要】 springboot结合kafka,并实现web,实时更新

参考:

https://www.cnblogs.com/lixianguo/p/13254915.html

实现实体分离的序列化和反序列化操作

springboot里建立模组maven

分别建立consumer和producer以及common公共模块

  • 对consumer和producer都建立springboot启动类

common的application-common.yml全局配置

spring:
  kafka:
    #kafka配置
    bootstrap-servers: localhost:9092
    producer:
      retries: 0
      # 每次批量发送消息的数量
      batch-size: 16384
      buffer-memory: 33554432
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 指定默认消费者group id
      group-id: test-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 5000
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    #自己定义的主题名称,在微服务中使用Value注解注入调用,如果kafka中没有该主题,则会自动创建
    template:
      default-topic: springkafka
  #    topic:
#      userTopic: springkafka

特别注意配置producer的application.yml

server:
  port: 8088
spring:
  application:
    name: kafka-producer
  profiles:
    active: common
  kafka:
    consumer:
      auto-offset-reset: latest
      enable-auto-commit: true
      group-id: consumer
      bootstrap-servers: ${spring.kafka.producer.bootstrap-servers}
    producer:
      bootstrap-servers: localhost:9092

配置consumer的application.yml

server:
  port: 8089
spring:
  application:
    name: kafka-consumer
  profiles:
    active: common
  kafka:
    consumer:
      auto-offset-reset: latest
      enable-auto-commit: true
      group-id: consumer
      bootstrap-servers: ${spring.kafka.producer.bootstrap-servers}
    producer:
      bootstrap-servers: localhost:9092

消息分发

客户端通过kafka直接进行消息接收,从而实现消息的分发

  参考Spring Cloud Data Flow 的Message类及定义内容。

  所有消息都会带有ClassName,如前面文件中的数据表,这明确表达了这个消息是什么对象,后续的用户操作是什么也很明确。RESTFul规范中,可以返回用户可以选择的所有下一步操作(Actions),增加灵活性。

批量获取数据

分发

配置详解

批量并发:

  • 配置文件:

#kafka配置信息
kafka:
  producer:
    bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667
    batch-size: 16785                                   #一次最多发送数据量
    retries: 1                                          #发送失败后的重复发送次数
    buffer-memory: 33554432                             #32M批处理缓冲区
    linger: 1
  consumer:
    bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667
    auto-offset-reset: latest                           #最早未被消费的offset earliest
    max-poll-records: 3100                              #批量消费一次最大拉取的数据量
    enable-auto-commit: false                           #是否开启自动提交
    auto-commit-interval: 1000                          #自动提交的间隔时间
    session-timeout: 20000                              #连接超时时间
    max-poll-interval: 15000                            #手动提交设置与poll的心跳数,如果消息队列中没有消息,等待毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
    max-partition-fetch-bytes: 15728640                 #设置拉取数据的大小,15M
  listener:
    batch-listener: true                                #是否开启批量消费,true表示批量消费
    concurrencys: 3,6                                   #设置消费的线程数
    poll-timeout: 1500                                  #只限自动提交,

有关配置

earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

Java代码(并发、批量获取)

  1. Kafka消费者配置类 批量获取关键代码: ①factory.setBatchListener(true); ②propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,50); 并发获取关键代码: factory.setConcurrency(concurrency);


【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。