Python中的事件驱动架构(EDA)设计模式
【摘要】 Python中的事件驱动架构(EDA)设计模式事件驱动架构(Event-Driven Architecture,EDA)是一种以事件为核心的软件架构模式。本文将介绍如何在Python中实现事件驱动架构,包括事件总线、事件处理器和 Saga 模式。 EDA核心概念事件驱动架构包含以下核心组件:事件(Event):表示系统中发生的某件事情事件生产者(Producer):产生和发布事件的组件事件...
Python中的事件驱动架构(EDA)设计模式
事件驱动架构(Event-Driven Architecture,EDA)是一种以事件为核心的软件架构模式。本文将介绍如何在Python中实现事件驱动架构,包括事件总线、事件处理器和 Saga 模式。
EDA核心概念
事件驱动架构包含以下核心组件:
- 事件(Event):表示系统中发生的某件事情
- 事件生产者(Producer):产生和发布事件的组件
- 事件消费者(Consumer):订阅和处理事件的组件
- 事件总线(Event Bus):负责事件的传递和路由
EDA核心实现
"""
Python事件驱动架构(EDA)实现
包含事件总线、事件处理器、Saga模式等
"""
import asyncio
from typing import Dict, List, Callable, Any, Optional, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import uuid
import json
import logging
from abc import ABC, abstractmethod
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EventPriority(Enum):
"""事件优先级"""
LOW = 1
NORMAL = 2
HIGH = 3
CRITICAL = 4
@dataclass
class DomainEvent:
"""领域事件基类"""
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = field(default="")
aggregate_id: str = field(default="")
aggregate_type: str = field(default="")
timestamp: datetime = field(default_factory=datetime.now)
version: int = field(default=1)
payload: Dict[str, Any] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
"event_id": self.event_id,
"event_type": self.event_type,
"aggregate_id": self.aggregate_id,
"aggregate_type": self.aggregate_type,
"timestamp": self.timestamp.isoformat(),
"version": self.version,
"payload": self.payload,
"metadata": self.metadata
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'DomainEvent':
return cls(
event_id=data.get("event_id", str(uuid.uuid4())),
event_type=data.get("event_type", ""),
aggregate_id=data.get("aggregate_id", ""),
aggregate_type=data.get("aggregate_type", ""),
timestamp=datetime.fromisoformat(data.get("timestamp", datetime.now().isoformat())),
version=data.get("version", 1),
payload=data.get("payload", {}),
metadata=data.get("metadata", {})
)
# 具体领域事件
@dataclass
class OrderCreatedEvent(DomainEvent):
"""订单创建事件"""
def __post_init__(self):
self.event_type = "order.created"
self.aggregate_type = "order"
@dataclass
class OrderPaidEvent(DomainEvent):
"""订单支付事件"""
def __post_init__(self):
self.event_type = "order.paid"
self.aggregate_type = "order"
@dataclass
class InventoryReservedEvent(DomainEvent):
"""库存预留事件"""
def __post_init__(self):
self.event_type = "inventory.reserved"
self.aggregate_type = "inventory"
@dataclass
class PaymentProcessedEvent(DomainEvent):
"""支付处理事件"""
def __post_init__(self):
self.event_type = "payment.processed"
self.aggregate_type = "payment"
class EventHandler(ABC):
"""事件处理器基类"""
@abstractmethod
async def handle(self, event: DomainEvent) -> None:
"""处理事件"""
pass
@abstractmethod
def can_handle(self, event: DomainEvent) -> bool:
"""判断是否可处理该事件"""
pass
class EventBus:
"""异步事件总线"""
def __init__(self):
self._handlers: Dict[str, List[EventHandler]] = {}
self._async_handlers: Dict[str, List[Callable]] = {}
self._event_store: List[DomainEvent] = []
self._max_store_size = 1000
self._running = False
self._event_queue: asyncio.Queue = asyncio.Queue()
def register_handler(self, event_type: str, handler: EventHandler):
"""注册事件处理器"""
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
logger.info(f"注册处理器: {event_type} -> {handler.__class__.__name__}")
def register_async_handler(self, event_type: str, handler: Callable):
"""注册异步事件处理器"""
if event_type not in self._async_handlers:
self._async_handlers[event_type] = []
self._async_handlers[event_type].append(handler)
async def publish(self, event: DomainEvent) -> None:
"""发布事件"""
# 存储事件
self._event_store.append(event)
if len(self._event_store) > self._max_store_size:
self._event_store.pop(0)
# 放入处理队列
await self._event_queue.put(event)
logger.info(f"发布事件: {event.event_type} (ID: {event.event_id})")
async def start(self):
"""启动事件总线"""
self._running = True
logger.info("事件总线已启动")
while self._running:
try:
event = await asyncio.wait_for(
self._event_queue.get(),
timeout=1.0
)
await self._process_event(event)
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"事件处理错误: {e}")
async def _process_event(self, event: DomainEvent) -> None:
"""处理单个事件"""
event_type = event.event_type
# 调用处理器类
if event_type in self._handlers:
for handler in self._handlers[event_type]:
try:
if handler.can_handle(event):
await handler.handle(event)
except Exception as e:
logger.error(f"处理器错误 {handler.__class__.__name__}: {e}")
# 调用异步处理器
if event_type in self._async_handlers:
for handler in self._async_handlers[event_type]:
try:
await handler(event)
except Exception as e:
logger.error(f"异步处理器错误: {e}")
def stop(self):
"""停止事件总线"""
self._running = False
logger.info("事件总线已停止")
def get_event_history(self, event_type: Optional[str] = None) -> List[DomainEvent]:
"""获取事件历史"""
if event_type:
return [e for e in self._event_store if e.event_type == event_type]
return self._event_store.copy()
class SagaOrchestrator:
"""Saga编排器 - 处理分布式事务"""
def __init__(self, event_bus: EventBus):
self._event_bus = event_bus
self._sagas: Dict[str, 'Saga'] = {}
self._running_sagas: Dict[str, 'SagaInstance'] = {}
def register_saga(self, saga: 'Saga'):
"""注册Saga"""
self._sagas[saga.name] = saga
async def start_saga(self, saga_name: str, payload: Dict[str, Any]) -> str:
"""启动Saga"""
if saga_name not in self._sagas:
raise ValueError(f"Saga {saga_name} 未注册")
saga = self._sagas[saga_name]
instance_id = str(uuid.uuid4())
instance = SagaInstance(
instance_id=instance_id,
saga_name=saga_name,
current_step=0,
status=SagaStatus.STARTED,
payload=payload,
event_bus=self._event_bus
)
self._running_sagas[instance_id] = instance
# 执行第一步
await instance.execute_next_step()
return instance_id
async def handle_event(self, event: DomainEvent):
"""处理Saga相关事件"""
# 查找相关的Saga实例
for instance in self._running_sagas.values():
if instance.status == SagaStatus.RUNNING:
await instance.handle_event(event)
class SagaStatus(Enum):
"""Saga状态"""
STARTED = auto()
RUNNING = auto()
COMPLETED = auto()
FAILED = auto()
COMPENSATING = auto()
COMPENSATED = auto()
@dataclass
class SagaStep:
"""Saga步骤"""
name: str
action: Callable
compensation: Optional[Callable] = None
event_trigger: Optional[str] = None
class Saga:
"""Saga定义"""
def __init__(self, name: str):
self.name = name
self.steps: List[SagaStep] = []
def add_step(self, step: SagaStep):
"""添加步骤"""
self.steps.append(step)
def add_compensation_step(self, step_index: int, compensation: Callable):
"""添加补偿步骤"""
if 0 <= step_index < len(self.steps):
self.steps[step_index].compensation = compensation
class SagaInstance:
"""Saga实例"""
def __init__(
self,
instance_id: str,
saga_name: str,
current_step: int,
status: SagaStatus,
payload: Dict[str, Any],
event_bus: EventBus
):
self.instance_id = instance_id
self.saga_name = saga_name
self.current_step = current_step
self.status = status
self.payload = payload
self._event_bus = event_bus
self._completed_steps: List[int] = []
async def execute_next_step(self):
"""执行下一步"""
saga = self._get_saga()
if self.current_step >= len(saga.steps):
self.status = SagaStatus.COMPLETED
logger.info(f"Saga {self.instance_id} 完成")
return
step = saga.steps[self.current_step]
self.status = SagaStatus.RUNNING
try:
await step.action(self.payload)
self._completed_steps.append(self.current_step)
self.current_step += 1
# 自动执行下一步
await self.execute_next_step()
except Exception as e:
logger.error(f"Saga步骤执行失败: {e}")
await self._compensate()
async def handle_event(self, event: DomainEvent):
"""处理事件"""
saga = self._get_saga()
if self.current_step < len(saga.steps):
step = saga.steps[self.current_step]
if step.event_trigger == event.event_type:
await self.execute_next_step()
async def _compensate(self):
"""执行补偿"""
self.status = SagaStatus.COMPENSATING
saga = self._get_saga()
# 逆向执行补偿
for step_index in reversed(self._completed_steps):
step = saga.steps[step_index]
if step.compensation:
try:
await step.compensation(self.payload)
except Exception as e:
logger.error(f"补偿执行失败: {e}")
self.status = SagaStatus.COMPENSATED
logger.info(f"Saga {self.instance_id} 已补偿")
def _get_saga(self) -> Saga:
"""获取Saga定义"""
# 简化实现
return Saga(self.saga_name)
# ============ 具体事件处理器 ============
class OrderEventHandler(EventHandler):
"""订单事件处理器"""
def can_handle(self, event: DomainEvent) -> bool:
return event.aggregate_type == "order"
async def handle(self, event: DomainEvent) -> None:
if event.event_type == "order.created":
logger.info(f"处理订单创建: {event.aggregate_id}")
# 发送通知、更新统计等
elif event.event_type == "order.paid":
logger.info(f"处理订单支付: {event.aggregate_id}")
class InventoryEventHandler(EventHandler):
"""库存事件处理器"""
def can_handle(self, event: DomainEvent) -> bool:
return event.aggregate_type == "inventory"
async def handle(self, event: DomainEvent) -> None:
if event.event_type == "inventory.reserved":
logger.info(f"处理库存预留: {event.aggregate_id}")
# ============ 示例领域服务 ============
class OrderService:
"""订单服务"""
def __init__(self, event_bus: EventBus):
self._event_bus = event_bus
self._orders: Dict[str, Dict] = {}
async def create_order(self, customer_id: str, items: List[Dict]) -> str:
"""创建订单"""
order_id = str(uuid.uuid4())
order = {
"id": order_id,
"customer_id": customer_id,
"items": items,
"status": "created",
"total": sum(item["price"] * item["quantity"] for item in items)
}
self._orders[order_id] = order
# 发布订单创建事件
event = OrderCreatedEvent(
aggregate_id=order_id,
payload=order
)
await self._event_bus.publish(event)
return order_id
async def pay_order(self, order_id: str) -> bool:
"""支付订单"""
if order_id not in self._orders:
return False
self._orders[order_id]["status"] = "paid"
# 发布订单支付事件
event = OrderPaidEvent(
aggregate_id=order_id,
payload={"order_id": order_id, "amount": self._orders[order_id]["total"]}
)
await self._event_bus.publish(event)
return True
class InventoryService:
"""库存服务"""
def __init__(self, event_bus: EventBus):
self._event_bus = event_bus
self._inventory: Dict[str, int] = {
"product_1": 100,
"product_2": 50,
"product_3": 200
}
async def reserve_inventory(self, product_id: str, quantity: int) -> bool:
"""预留库存"""
if product_id not in self._inventory:
return False
if self._inventory[product_id] < quantity:
return False
self._inventory[product_id] -= quantity
# 发布库存预留事件
event = InventoryReservedEvent(
aggregate_id=product_id,
payload={"product_id": product_id, "quantity": quantity}
)
await self._event_bus.publish(event)
return True
async def main():
"""主函数"""
print("="*60)
print("Python事件驱动架构(EDA)演示")
print("="*60)
# 创建事件总线
event_bus = EventBus()
# 注册事件处理器
order_handler = OrderEventHandler()
inventory_handler = InventoryEventHandler()
event_bus.register_handler("order.created", order_handler)
event_bus.register_handler("order.paid", order_handler)
event_bus.register_handler("inventory.reserved", inventory_handler)
# 创建服务
order_service = OrderService(event_bus)
inventory_service = InventoryService(event_bus)
# 启动事件总线
event_task = asyncio.create_task(event_bus.start())
# 演示场景
print("\n【场景1: 创建订单】")
order_id = await order_service.create_order(
customer_id="customer_1",
items=[
{"product_id": "product_1", "price": 99.99, "quantity": 2},
{"product_id": "product_2", "price": 49.99, "quantity": 1}
]
)
print(f"订单创建成功: {order_id}")
# 等待事件处理
await asyncio.sleep(0.5)
print("\n【场景2: 预留库存】")
success = await inventory_service.reserve_inventory("product_1", 2)
print(f"库存预留: {'成功' if success else '失败'}")
await asyncio.sleep(0.5)
print("\n【场景3: 支付订单】")
success = await order_service.pay_order(order_id)
print(f"订单支付: {'成功' if success else '失败'}")
await asyncio.sleep(0.5)
# 显示事件历史
print("\n【事件历史】")
events = event_bus.get_event_history()
for event in events:
print(f" {event.timestamp.strftime('%H:%M:%S')} - {event.event_type}")
# 停止事件总线
event_bus.stop()
await event_task
print("\n" + "="*60)
print("EDA架构总结")
print("="*60)
print("1. 松耦合: 组件间通过事件通信")
print("2. 可扩展: 易于添加新的事件处理器")
print("3. 可追踪: 事件历史记录系统状态")
print("4. Saga模式: 处理分布式事务")
print("5. 异步处理: 提高系统吞吐量")
print("="*60)
if __name__ == "__main__":
asyncio.run(main())
EDA架构图
Saga模式流程
关键要点
- 松耦合:组件间通过事件通信,降低耦合度
- 可扩展:易于添加新的事件处理器
- 可追踪:事件历史记录系统状态变化
- Saga模式:处理跨服务的分布式事务
- 异步处理:提高系统吞吐量和响应性
事件驱动架构是现代分布式系统的核心设计模式,特别适合微服务架构。
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)