springboot使用kafka收发消息
https://www.cnblogs.com/lixianguo/p/13254915.html
实现实体分离的序列化和反序列化操作
springboot里建立模组maven
分别建立consumer和producer以及common公共模块
对consumer和producer都建立springboot启动类
common的application-common.yml全局配置
spring:
kafka:
#kafka配置
bootstrap-servers: localhost:9092
producer:
retries: 0
# 每次批量发送消息的数量
batch-size: 16384
buffer-memory: 33554432
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# 指定默认消费者group id
group-id: test-consumer-group
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 5000
# 指定消息key和消息体的编解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#自己定义的主题名称,在微服务中使用Value注解注入调用,如果kafka中没有该主题,则会自动创建
template:
default-topic: springkafka
# topic:
# userTopic: springkafka
特别注意配置producer的application.yml
server:
port: 8088
spring:
application:
name: kafka-producer
profiles:
active: common
kafka:
consumer:
auto-offset-reset: latest
enable-auto-commit: true
group-id: consumer
bootstrap-servers: ${spring.kafka.producer.bootstrap-servers}
producer:
bootstrap-servers: localhost:9092
配置consumer的application.yml
server:
port: 8089
spring:
application:
name: kafka-consumer
profiles:
active: common
kafka:
consumer:
auto-offset-reset: latest
enable-auto-commit: true
group-id: consumer
bootstrap-servers: ${spring.kafka.producer.bootstrap-servers}
producer:
bootstrap-servers: localhost:9092
消息分发
客户端通过kafka直接进行消息接收,从而实现消息的分发
参考Spring Cloud Data Flow 的Message类及定义内容。
所有消息都会带有ClassName,如前面文件中的数据表,这明确表达了这个消息是什么对象,后续的用户操作是什么也很明确。RESTFul规范中,可以返回用户可以选择的所有下一步操作(Actions),增加灵活性。
批量并发:
配置文件:
#kafka配置信息
kafka:
producer:
bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667
batch-size: 16785 #一次最多发送数据量
retries: 1 #发送失败后的重复发送次数
buffer-memory: 33554432 #32M批处理缓冲区
linger: 1
consumer:
bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667
auto-offset-reset: latest #最早未被消费的offset earliest
max-poll-records: 3100 #批量消费一次最大拉取的数据量
enable-auto-commit: false #是否开启自动提交
auto-commit-interval: 1000 #自动提交的间隔时间
session-timeout: 20000 #连接超时时间
max-poll-interval: 15000 #手动提交设置与poll的心跳数,如果消息队列中没有消息,等待毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
max-partition-fetch-bytes: 15728640 #设置拉取数据的大小,15M
listener:
batch-listener: true #是否开启批量消费,true表示批量消费
concurrencys: 3,6 #设置消费的线程数
poll-timeout: 1500 #只限自动提交,
有关配置
earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
Java代码(并发、批量获取)
Kafka消费者配置类 批量获取关键代码: ①factory.setBatchListener(true); ②propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,50); 并发获取关键代码: factory.setConcurrency(concurrency);
- 点赞
- 收藏
- 关注作者
评论(0)