基于kafka消息队列的事件驱动
1 简介
事件架构的几个例子,事件驱动型架构(Event-Driven Architecture, EDA)是一种以事件为中心的架构模式,通常用于解耦系统组件,提高可扩展性和响应性。
主流的事件驱动架构包括:
基于消息队列(Message Queue-Based EDA)
典型技术栈:Kafka、RabbitMQ、NSQ
事件存储在消息队列中,多个消费者可以异步消费事件。
适用于高吞吐量和可靠性要求高的场景。
基于事件溯源(Event Sourcing-Based EDA)
典型技术栈:EventStore、CQRS
所有状态变更都以事件的形式存储,可以随时回放恢复状态。
适用于金融、审计、交易等需要历史数据回溯的场景。
基于发布-订阅(Pub/Sub-Based EDA)
典型技术栈:Redis Pub/Sub、NATS、Google Pub/Sub
事件生产者与消费者通过中间代理解耦,订阅者可以实时响应事件。
适用于分布式微服务通信和实时数据推送。
2 实现示例服务
以下是三个不同架构的 Gin 示例服务,分别基于 Kafka(消息队列)、事件溯源(本地存储)、Redis(Pub/Sub) 进行事件驱动通信。
-
基于 Kafka 的事件驱动
“github.com/segmentio/kafka-go”
var kafkaWriter *kafka.Writer
func main() {
kafkaWriter = &kafka.Writer{
Addr: kafka.TCP(“localhost:9092”),
Topic: “events”,
Balancer: &kafka.LeastBytes{},
}r := gin.Default() r.POST("/publish", publishToKafka) go consumeFromKafka() r.Run(":8080")
}
func publishToKafka(c *gin.Context) {
event := c.PostForm(“event”)
err := kafkaWriter.WriteMessages(context.Background(),
kafka.Message{Value: []byte(event)},
)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{“error”: “failed to send event”})
return
}
c.JSON(http.StatusOK, gin.H{“message”: “event published”})
}func consumeFromKafka() {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{“localhost:9092”},
Topic: “events”,
GroupID: “group1”,
})
defer reader.Close()for { msg, err := reader.ReadMessage(context.Background()) if err != nil { log.Println("Error reading message:", err) continue } log.Println("Received Kafka event:", string(msg.Value)) }
}
3 总结
Kafka 适合高吞吐量的日志、任务队列等场景,但部署较复杂,适合企业级系统。
事件溯源适用于金融、审计等需要完整历史回放的业务场景,但存储开销较大。
Redis Pub/Sub 适用于实时推送和轻量级微服务通信,但数据不会持久化,不适合关键业务。
- 点赞
- 收藏
- 关注作者
评论(0)