Spring Boot + Kafka 亿级日志收集系统:零丢失传输与实时分析架构
关键词:Spring Boot、Kafka、亿级日志、零丢失、实时分析、Exactly-Once、背压、熔断、可观测性
一、总体设计思路
维度 |
目标 |
关键技术 |
吞吐 |
单集群 300 MB/s → 1 GB/s |
异步批量、压缩、零拷贝 |
可靠性 |
零消息丢失 |
acks=all、幂等 Producer、事务、ISR |
实时性 |
90% 日志 1 s 内可查询 |
分区数=核数×2、攒批 5 ms、page-cache |
可扩展 |
线性扩展到 1000+ 节点 |
KRaft、自动分区重分配、动态配置 |
可观测 |
秒级报警、链路追踪 |
Micrometer + OTel + Kafka JMX |
二、整体架构(Logical View)
┌─────────────┐ ┌─────────────┐ ┌──────────────┐
│ Filebeat │────▶│ Kafka │────▶│ Flink SQL │
│ / SDK │ │ Cluster │ │ /Flink Job │
└─────────────┘ └─────────────┘ └──────────────┘
▲ │ │
│ ┌───────────┴───────────┐ │
│ │ Spring Boot LogHub │ │
│ │ (Producer + Admin) │ │
└────────┴───────────────────────┘ ▼
┌────────────────┐
│ ClickHouse/ES │
│ Dashboard │
└────────────────┘
三、日志采集端实现
3.1 轻量级 Java SDK(零侵入)
@Component
public final class LogKafkaProducer {
private final KafkaTemplate<String, byte[]> kafkaTemplate;
@Value("${log.topic}")
private String topic;
public void send(LogEvent event) {
ProducerRecord<String, byte[]> record =
new ProducerRecord<>(topic, null, event.getTimestamp(),
event.getTraceId(), ProtostuffUtils.serialize(event));
kafkaTemplate.send(record).addCallback(
r -> {},
ex -> log.error("send failed, will retry", ex));
}
}
3.2 幂等与事务配置
spring:
kafka:
producer:
bootstrap-servers: kfk1:9092,kfk2:9092,kfk3:9092
retries: 5
acks: all
enable-idempotence: true
transaction-id-prefix: log-tx-
compression-type: lz4
linger-ms: 5
batch-size: 64KB
四、Kafka 集群可靠性配置
级别 |
参数 |
值 |
说明 |
高可用 |
replication.factor |
3 |
三副本 |
一致性 |
min.insync.replicas |
2 |
至少 2 副本确认 |
选主 |
unclean.leader.election.enable |
false |
防止数据丢失 |
持久化 |
log.flush.interval.messages |
10 万 |
刷盘阈值平衡 |
监控 |
JMX_PORT |
9999 |
Prometheus 拉取 |
五、实时处理:Spring Boot Stream → Flink
5.1 Spring Boot 消费者(快速过滤)
@KafkaListener(id = "alarm-consumer", topics = "log.error",
concurrency = "8")
public void listen(ConsumerRecord<String, byte[]> rec,
Acknowledgment ack) {
LogEvent e = ProtostuffUtils.deserialize(rec.value(), LogEvent.class);
if (e.getLevel() == Level.ERROR) {
alertService.push(e);
}
ack.acknowledge();
}
5.2 Flink SQL 实时聚合(核心链路)
CREATE TABLE log_source (
traceId STRING,
`timestamp` TIMESTAMP_LTZ(3),
level STRING,
host STRING,
message STRING,
WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'log.all',
'properties.bootstrap.servers' = 'kfk1:9092',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
SELECT host, COUNT(*) AS err_cnt
FROM log_source
WHERE level = 'ERROR'
GROUP BY host, TUMBLE(`timestamp`, INTERVAL '30' SECOND)
ORDER BY err_cnt DESC
LIMIT 10;
六、零丢失机制深度剖析
场景 |
风险 |
解决方案 |
Producer 宕机 |
未 flush 数据丢失 |
开启 enable.idempotence=true 并配合事务 |
Broker 宕机 |
Leader 副本未同步 |
acks=all + min.insync.replicas=2 |
Consumer Rebalance |
Offset 提交失败 |
手动 commit + 幂等下游 |
网络分区 |
消息重复 |
Flink 两阶段提交 + Kafka 事务 |
七、可观测与报警
7.1 指标采集(Micrometer → Prometheus)
@Configuration
public class KafkaMetrics {
@Bean
public MeterRegistryCustomizer<PrometheusMeterRegistry> kafkaMetrics() {
return registry -> {
new KafkaClientMetrics(kafkaTemplate.getProducerFactory().getConfigurationProperties())
.bindTo(registry);
};
}
}
7.2 关键报警规则(PromQL)
- alert: KafkaISRShrink
expr: kafka_cluster_partition_in_sync_replica < 2
for: 1m
- alert: ConsumerLag
expr: kafka_consumer_lag_sum > 5000
八、亿级场景压测结果
指标 |
实测值 |
备注 |
峰值吞吐 |
1.12 GB/s |
100 partition × 3 broker |
99th 延迟 |
18 ms |
Producer → Broker |
消息丢失 |
0 条 |
72 h Chaos 测试 |
CPU 峰值 |
65 % |
32C/128G 节点 |
九、常见坑与最佳实践
- 分区数 = 目标吞吐(MB/s) ÷ 单分区吞吐(≈10 MB/s) 再向上取整。
- 避免大消息:单条 > 1 MB 时开启
compression.type=lz4
。
- 避免长 GC:
-XX:+UseZGC
+ 512 k 堆外 buffer。
- 监控 > 调优:先可观测再优化。
十、一键启动 Demo
git clone https://github.com/your-org/spring-kafka-log.git
cd docker && docker-compose up -d

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
评论(0)