Python中的事件驱动架构(EDA)设计模式

举报
柠檬🍋 发表于 2026/02/26 20:24:56 2026/02/26
【摘要】 Python中的事件驱动架构(EDA)设计模式事件驱动架构(Event-Driven Architecture,EDA)是一种以事件为核心的软件架构模式。本文将介绍如何在Python中实现事件驱动架构,包括事件总线、事件处理器和 Saga 模式。 EDA核心概念事件驱动架构包含以下核心组件:事件(Event):表示系统中发生的某件事情事件生产者(Producer):产生和发布事件的组件事件...

Python中的事件驱动架构(EDA)设计模式

事件驱动架构(Event-Driven Architecture,EDA)是一种以事件为核心的软件架构模式。本文将介绍如何在Python中实现事件驱动架构,包括事件总线、事件处理器和 Saga 模式。

EDA核心概念

事件驱动架构包含以下核心组件:

  • 事件(Event):表示系统中发生的某件事情
  • 事件生产者(Producer):产生和发布事件的组件
  • 事件消费者(Consumer):订阅和处理事件的组件
  • 事件总线(Event Bus):负责事件的传递和路由

EDA核心实现

"""
Python事件驱动架构(EDA)实现
包含事件总线、事件处理器、Saga模式等
"""

import asyncio
from typing import Dict, List, Callable, Any, Optional, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import uuid
import json
import logging
from abc import ABC, abstractmethod

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class EventPriority(Enum):
    """事件优先级"""
    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4


@dataclass
class DomainEvent:
    """领域事件基类"""
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    event_type: str = field(default="")
    aggregate_id: str = field(default="")
    aggregate_type: str = field(default="")
    timestamp: datetime = field(default_factory=datetime.now)
    version: int = field(default=1)
    payload: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "event_id": self.event_id,
            "event_type": self.event_type,
            "aggregate_id": self.aggregate_id,
            "aggregate_type": self.aggregate_type,
            "timestamp": self.timestamp.isoformat(),
            "version": self.version,
            "payload": self.payload,
            "metadata": self.metadata
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'DomainEvent':
        return cls(
            event_id=data.get("event_id", str(uuid.uuid4())),
            event_type=data.get("event_type", ""),
            aggregate_id=data.get("aggregate_id", ""),
            aggregate_type=data.get("aggregate_type", ""),
            timestamp=datetime.fromisoformat(data.get("timestamp", datetime.now().isoformat())),
            version=data.get("version", 1),
            payload=data.get("payload", {}),
            metadata=data.get("metadata", {})
        )


# 具体领域事件
@dataclass
class OrderCreatedEvent(DomainEvent):
    """订单创建事件"""
    def __post_init__(self):
        self.event_type = "order.created"
        self.aggregate_type = "order"


@dataclass
class OrderPaidEvent(DomainEvent):
    """订单支付事件"""
    def __post_init__(self):
        self.event_type = "order.paid"
        self.aggregate_type = "order"


@dataclass
class InventoryReservedEvent(DomainEvent):
    """库存预留事件"""
    def __post_init__(self):
        self.event_type = "inventory.reserved"
        self.aggregate_type = "inventory"


@dataclass
class PaymentProcessedEvent(DomainEvent):
    """支付处理事件"""
    def __post_init__(self):
        self.event_type = "payment.processed"
        self.aggregate_type = "payment"


class EventHandler(ABC):
    """事件处理器基类"""
    
    @abstractmethod
    async def handle(self, event: DomainEvent) -> None:
        """处理事件"""
        pass
    
    @abstractmethod
    def can_handle(self, event: DomainEvent) -> bool:
        """判断是否可处理该事件"""
        pass


class EventBus:
    """异步事件总线"""
    
    def __init__(self):
        self._handlers: Dict[str, List[EventHandler]] = {}
        self._async_handlers: Dict[str, List[Callable]] = {}
        self._event_store: List[DomainEvent] = []
        self._max_store_size = 1000
        self._running = False
        self._event_queue: asyncio.Queue = asyncio.Queue()
    
    def register_handler(self, event_type: str, handler: EventHandler):
        """注册事件处理器"""
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)
        logger.info(f"注册处理器: {event_type} -> {handler.__class__.__name__}")
    
    def register_async_handler(self, event_type: str, handler: Callable):
        """注册异步事件处理器"""
        if event_type not in self._async_handlers:
            self._async_handlers[event_type] = []
        self._async_handlers[event_type].append(handler)
    
    async def publish(self, event: DomainEvent) -> None:
        """发布事件"""
        # 存储事件
        self._event_store.append(event)
        if len(self._event_store) > self._max_store_size:
            self._event_store.pop(0)
        
        # 放入处理队列
        await self._event_queue.put(event)
        logger.info(f"发布事件: {event.event_type} (ID: {event.event_id})")
    
    async def start(self):
        """启动事件总线"""
        self._running = True
        logger.info("事件总线已启动")
        
        while self._running:
            try:
                event = await asyncio.wait_for(
                    self._event_queue.get(),
                    timeout=1.0
                )
                await self._process_event(event)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logger.error(f"事件处理错误: {e}")
    
    async def _process_event(self, event: DomainEvent) -> None:
        """处理单个事件"""
        event_type = event.event_type
        
        # 调用处理器类
        if event_type in self._handlers:
            for handler in self._handlers[event_type]:
                try:
                    if handler.can_handle(event):
                        await handler.handle(event)
                except Exception as e:
                    logger.error(f"处理器错误 {handler.__class__.__name__}: {e}")
        
        # 调用异步处理器
        if event_type in self._async_handlers:
            for handler in self._async_handlers[event_type]:
                try:
                    await handler(event)
                except Exception as e:
                    logger.error(f"异步处理器错误: {e}")
    
    def stop(self):
        """停止事件总线"""
        self._running = False
        logger.info("事件总线已停止")
    
    def get_event_history(self, event_type: Optional[str] = None) -> List[DomainEvent]:
        """获取事件历史"""
        if event_type:
            return [e for e in self._event_store if e.event_type == event_type]
        return self._event_store.copy()


class SagaOrchestrator:
    """Saga编排器 - 处理分布式事务"""
    
    def __init__(self, event_bus: EventBus):
        self._event_bus = event_bus
        self._sagas: Dict[str, 'Saga'] = {}
        self._running_sagas: Dict[str, 'SagaInstance'] = {}
    
    def register_saga(self, saga: 'Saga'):
        """注册Saga"""
        self._sagas[saga.name] = saga
    
    async def start_saga(self, saga_name: str, payload: Dict[str, Any]) -> str:
        """启动Saga"""
        if saga_name not in self._sagas:
            raise ValueError(f"Saga {saga_name} 未注册")
        
        saga = self._sagas[saga_name]
        instance_id = str(uuid.uuid4())
        
        instance = SagaInstance(
            instance_id=instance_id,
            saga_name=saga_name,
            current_step=0,
            status=SagaStatus.STARTED,
            payload=payload,
            event_bus=self._event_bus
        )
        
        self._running_sagas[instance_id] = instance
        
        # 执行第一步
        await instance.execute_next_step()
        
        return instance_id
    
    async def handle_event(self, event: DomainEvent):
        """处理Saga相关事件"""
        # 查找相关的Saga实例
        for instance in self._running_sagas.values():
            if instance.status == SagaStatus.RUNNING:
                await instance.handle_event(event)


class SagaStatus(Enum):
    """Saga状态"""
    STARTED = auto()
    RUNNING = auto()
    COMPLETED = auto()
    FAILED = auto()
    COMPENSATING = auto()
    COMPENSATED = auto()


@dataclass
class SagaStep:
    """Saga步骤"""
    name: str
    action: Callable
    compensation: Optional[Callable] = None
    event_trigger: Optional[str] = None


class Saga:
    """Saga定义"""
    
    def __init__(self, name: str):
        self.name = name
        self.steps: List[SagaStep] = []
    
    def add_step(self, step: SagaStep):
        """添加步骤"""
        self.steps.append(step)
    
    def add_compensation_step(self, step_index: int, compensation: Callable):
        """添加补偿步骤"""
        if 0 <= step_index < len(self.steps):
            self.steps[step_index].compensation = compensation


class SagaInstance:
    """Saga实例"""
    
    def __init__(
        self,
        instance_id: str,
        saga_name: str,
        current_step: int,
        status: SagaStatus,
        payload: Dict[str, Any],
        event_bus: EventBus
    ):
        self.instance_id = instance_id
        self.saga_name = saga_name
        self.current_step = current_step
        self.status = status
        self.payload = payload
        self._event_bus = event_bus
        self._completed_steps: List[int] = []
    
    async def execute_next_step(self):
        """执行下一步"""
        saga = self._get_saga()
        if self.current_step >= len(saga.steps):
            self.status = SagaStatus.COMPLETED
            logger.info(f"Saga {self.instance_id} 完成")
            return
        
        step = saga.steps[self.current_step]
        self.status = SagaStatus.RUNNING
        
        try:
            await step.action(self.payload)
            self._completed_steps.append(self.current_step)
            self.current_step += 1
            
            # 自动执行下一步
            await self.execute_next_step()
            
        except Exception as e:
            logger.error(f"Saga步骤执行失败: {e}")
            await self._compensate()
    
    async def handle_event(self, event: DomainEvent):
        """处理事件"""
        saga = self._get_saga()
        if self.current_step < len(saga.steps):
            step = saga.steps[self.current_step]
            if step.event_trigger == event.event_type:
                await self.execute_next_step()
    
    async def _compensate(self):
        """执行补偿"""
        self.status = SagaStatus.COMPENSATING
        saga = self._get_saga()
        
        # 逆向执行补偿
        for step_index in reversed(self._completed_steps):
            step = saga.steps[step_index]
            if step.compensation:
                try:
                    await step.compensation(self.payload)
                except Exception as e:
                    logger.error(f"补偿执行失败: {e}")
        
        self.status = SagaStatus.COMPENSATED
        logger.info(f"Saga {self.instance_id} 已补偿")
    
    def _get_saga(self) -> Saga:
        """获取Saga定义"""
        # 简化实现
        return Saga(self.saga_name)


# ============ 具体事件处理器 ============
class OrderEventHandler(EventHandler):
    """订单事件处理器"""
    
    def can_handle(self, event: DomainEvent) -> bool:
        return event.aggregate_type == "order"
    
    async def handle(self, event: DomainEvent) -> None:
        if event.event_type == "order.created":
            logger.info(f"处理订单创建: {event.aggregate_id}")
            # 发送通知、更新统计等
        elif event.event_type == "order.paid":
            logger.info(f"处理订单支付: {event.aggregate_id}")


class InventoryEventHandler(EventHandler):
    """库存事件处理器"""
    
    def can_handle(self, event: DomainEvent) -> bool:
        return event.aggregate_type == "inventory"
    
    async def handle(self, event: DomainEvent) -> None:
        if event.event_type == "inventory.reserved":
            logger.info(f"处理库存预留: {event.aggregate_id}")


# ============ 示例领域服务 ============
class OrderService:
    """订单服务"""
    
    def __init__(self, event_bus: EventBus):
        self._event_bus = event_bus
        self._orders: Dict[str, Dict] = {}
    
    async def create_order(self, customer_id: str, items: List[Dict]) -> str:
        """创建订单"""
        order_id = str(uuid.uuid4())
        
        order = {
            "id": order_id,
            "customer_id": customer_id,
            "items": items,
            "status": "created",
            "total": sum(item["price"] * item["quantity"] for item in items)
        }
        
        self._orders[order_id] = order
        
        # 发布订单创建事件
        event = OrderCreatedEvent(
            aggregate_id=order_id,
            payload=order
        )
        await self._event_bus.publish(event)
        
        return order_id
    
    async def pay_order(self, order_id: str) -> bool:
        """支付订单"""
        if order_id not in self._orders:
            return False
        
        self._orders[order_id]["status"] = "paid"
        
        # 发布订单支付事件
        event = OrderPaidEvent(
            aggregate_id=order_id,
            payload={"order_id": order_id, "amount": self._orders[order_id]["total"]}
        )
        await self._event_bus.publish(event)
        
        return True


class InventoryService:
    """库存服务"""
    
    def __init__(self, event_bus: EventBus):
        self._event_bus = event_bus
        self._inventory: Dict[str, int] = {
            "product_1": 100,
            "product_2": 50,
            "product_3": 200
        }
    
    async def reserve_inventory(self, product_id: str, quantity: int) -> bool:
        """预留库存"""
        if product_id not in self._inventory:
            return False
        
        if self._inventory[product_id] < quantity:
            return False
        
        self._inventory[product_id] -= quantity
        
        # 发布库存预留事件
        event = InventoryReservedEvent(
            aggregate_id=product_id,
            payload={"product_id": product_id, "quantity": quantity}
        )
        await self._event_bus.publish(event)
        
        return True


async def main():
    """主函数"""
    print("="*60)
    print("Python事件驱动架构(EDA)演示")
    print("="*60)
    
    # 创建事件总线
    event_bus = EventBus()
    
    # 注册事件处理器
    order_handler = OrderEventHandler()
    inventory_handler = InventoryEventHandler()
    
    event_bus.register_handler("order.created", order_handler)
    event_bus.register_handler("order.paid", order_handler)
    event_bus.register_handler("inventory.reserved", inventory_handler)
    
    # 创建服务
    order_service = OrderService(event_bus)
    inventory_service = InventoryService(event_bus)
    
    # 启动事件总线
    event_task = asyncio.create_task(event_bus.start())
    
    # 演示场景
    print("\n【场景1: 创建订单】")
    order_id = await order_service.create_order(
        customer_id="customer_1",
        items=[
            {"product_id": "product_1", "price": 99.99, "quantity": 2},
            {"product_id": "product_2", "price": 49.99, "quantity": 1}
        ]
    )
    print(f"订单创建成功: {order_id}")
    
    # 等待事件处理
    await asyncio.sleep(0.5)
    
    print("\n【场景2: 预留库存】")
    success = await inventory_service.reserve_inventory("product_1", 2)
    print(f"库存预留: {'成功' if success else '失败'}")
    
    await asyncio.sleep(0.5)
    
    print("\n【场景3: 支付订单】")
    success = await order_service.pay_order(order_id)
    print(f"订单支付: {'成功' if success else '失败'}")
    
    await asyncio.sleep(0.5)
    
    # 显示事件历史
    print("\n【事件历史】")
    events = event_bus.get_event_history()
    for event in events:
        print(f"  {event.timestamp.strftime('%H:%M:%S')} - {event.event_type}")
    
    # 停止事件总线
    event_bus.stop()
    await event_task
    
    print("\n" + "="*60)
    print("EDA架构总结")
    print("="*60)
    print("1. 松耦合: 组件间通过事件通信")
    print("2. 可扩展: 易于添加新的事件处理器")
    print("3. 可追踪: 事件历史记录系统状态")
    print("4. Saga模式: 处理分布式事务")
    print("5. 异步处理: 提高系统吞吐量")
    print("="*60)


if __name__ == "__main__":
    asyncio.run(main())

EDA架构图

事件消费者
事件总线
事件生产者
OrderCreated
PaymentProcessed
InventoryReserved
通知服务
分析服务
审计服务
其他服务
事件路由器
事件存储
订单服务
支付服务
库存服务

Saga模式流程

成功
失败
成功
失败
成功
失败
开始Saga
步骤1: 创建订单
步骤2: 预留库存
补偿: 取消订单
步骤3: 处理支付
补偿: 释放库存
Saga完成
补偿: 退款

关键要点

  1. 松耦合:组件间通过事件通信,降低耦合度
  2. 可扩展:易于添加新的事件处理器
  3. 可追踪:事件历史记录系统状态变化
  4. Saga模式:处理跨服务的分布式事务
  5. 异步处理:提高系统吞吐量和响应性

事件驱动架构是现代分布式系统的核心设计模式,特别适合微服务架构。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。