基于kafka消息队列的事件驱动

举报
码乐 发表于 2025/03/19 09:57:41 2025/03/19
【摘要】 1 简介事件架构的几个例子,事件驱动型架构(Event-Driven Architecture, EDA)是一种以事件为中心的架构模式,通常用于解耦系统组件,提高可扩展性和响应性。主流的事件驱动架构包括: 基于消息队列(Message Queue-Based EDA)典型技术栈:Kafka、RabbitMQ、NSQ事件存储在消息队列中,多个消费者可以异步消费事件。适用于高吞吐量和可靠性要求...

1 简介

事件架构的几个例子,事件驱动型架构(Event-Driven Architecture, EDA)是一种以事件为中心的架构模式,通常用于解耦系统组件,提高可扩展性和响应性。

主流的事件驱动架构包括:

	基于消息队列(Message Queue-Based EDA

典型技术栈:Kafka、RabbitMQ、NSQ
事件存储在消息队列中,多个消费者可以异步消费事件。
适用于高吞吐量和可靠性要求高的场景。

	基于事件溯源(Event Sourcing-Based EDA

典型技术栈:EventStore、CQRS
所有状态变更都以事件的形式存储,可以随时回放恢复状态。
适用于金融、审计、交易等需要历史数据回溯的场景。

	基于发布-订阅(Pub/Sub-Based EDA

典型技术栈:Redis Pub/Sub、NATS、Google Pub/Sub
事件生产者与消费者通过中间代理解耦,订阅者可以实时响应事件。
适用于分布式微服务通信和实时数据推送。

2 实现示例服务

以下是三个不同架构的 Gin 示例服务,分别基于 Kafka(消息队列)、事件溯源(本地存储)、Redis(Pub/Sub) 进行事件驱动通信。

  1. 基于 Kafka 的事件驱动

    github.com/segmentio/kafka-go

    var kafkaWriter *kafka.Writer

    func main() {
    kafkaWriter = &kafka.Writer{
    Addr: kafka.TCP(“localhost:9092”),
    Topic: “events”,
    Balancer: &kafka.LeastBytes{},
    }

       r := gin.Default()
       r.POST("/publish", publishToKafka)
       go consumeFromKafka()
    
       r.Run(":8080")
    

    }

    func publishToKafka(c *gin.Context) {
    event := c.PostForm(“event”)
    err := kafkaWriter.WriteMessages(context.Background(),
    kafka.Message{Value: []byte(event)},
    )
    if err != nil {
    c.JSON(http.StatusInternalServerError, gin.H{“error”: “failed to send event”})
    return
    }
    c.JSON(http.StatusOK, gin.H{“message”: “event published”})
    }

    func consumeFromKafka() {
    reader := kafka.NewReader(kafka.ReaderConfig{
    Brokers: []string{“localhost:9092”},
    Topic: “events”,
    GroupID: “group1”,
    })
    defer reader.Close()

       for {
           msg, err := reader.ReadMessage(context.Background())
           if err != nil {
               log.Println("Error reading message:", err)
               continue
           }
           log.Println("Received Kafka event:", string(msg.Value))
       }
    

    }

3 总结

Kafka 适合高吞吐量的日志、任务队列等场景,但部署较复杂,适合企业级系统。
事件溯源适用于金融、审计等需要完整历史回放的业务场景,但存储开销较大。
Redis Pub/Sub 适用于实时推送和轻量级微服务通信,但数据不会持久化,不适合关键业务。

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。