基于Sqlalchemy的ORM event机制做event sourcing
【摘要】 背景 当前我所在的产品的缓存消息采用的是Etag标签机制,以Domain为粒度,由缓存读取服务通过主动拉取Etag的方式,对缓存和DB中的消息进行比较。颗粒太大,效率低下。所以我们需要一种以更细的粒度的,通过事件驱动,主动推送变化的方式。技术选型 方法一:在S层解决。在增删改接口中,拼装业务变化,推送kafka消息。缺点很明显,需要对所有的接口进行遍...
背景
当前我所在的产品的缓存消息采用的是Etag标签机制,以Domain为粒度,由缓存读取服务通过主动拉取Etag的方式,对缓存和DB中的消息进行比较。
颗粒太大,效率低下。所以我们需要一种以更细的粒度的,通过事件驱动,主动推送变化的方式。
技术选型
方法一:在S层解决。在增删改接口中,拼装业务变化,推送kafka消息。缺点很明显,需要对所有的接口进行遍历,并对业务代码进行侵入式的遍地开花修改。
方法二:在I层解决。对DB的binlog变化进行监听订阅。优点就是业务代码无须修改,缺点是,每条binlog变化是独立的事件,对于一次请求中的多个事物,会缺失数据间的关联关系,另外,binlog中,通过where条件进行的批量删除或者更新,只能得到where条件中的属性,其他的属性无法获取到。也没有补偿方式。
前两种方法的优缺点非常明显。
后来我们找到了第三种方法,在P层解决。通过Sqlalchemy的ORM事件机制,对指定的表的增删改动作进行监听,缓存起来,在请求结束时,以完整的消息以kafka方式推送出去。
ORM event代码示例
如下面代码所示,我们对credential表的增删改动作进行了hook监听,先尝试放置在了协程tmp信息中,如果事物回滚,则对tmp信息进行清理,事物成功提交后,再统一组装发出给kafka。在ORM中,增加to_kafka_msg转换方法,对敏感信息或者无用信息进行处理。
@event.listens_for(CredentialModel, 'after_delete')
def add_kafka_msg_for_delete(mapper, connection, target):
try:
event_type = "delete_" + target.__tablename__
event_obj = target.to_kafka_msg()
thread_information.set_tmp_notify_info(event_type, event_obj)
except Exception:
LOG.exception("add_kafka_msg_for_delete occured exception")
@event.listens_for(CredentialModel, 'after_update')
def add_kafka_msg_for_update(mapper, connection, target):
try:
event_type = "update_" + target.__tablename__
event_obj = target.to_kafka_msg()
thread_information.set_tmp_notify_info(event_type, event_obj)
except Exception:
LOG.exception("add_kafka_msg_for_update occured exception")
@event.listens_for(CredentialModel, 'after_insert')
def add_kafka_msg_for_insert(mapper, connection, target):
try:
event_type = "create_" + target.__tablename__
event_obj = target.to_kafka_msg()
thread_information.set_tmp_notify_info(event_type, event_obj)
except Exception:
LOG.exception("add_kafka_msg_for_insert occured exception ")
组装后的消息示例
{
'create_etag_ex': [
[
{
'domain_id': u'03e94705fe0033440f15c0055655b360',
'etag_id': 10099
}
]
],
'delete_assignment_relation_ex': [
{
'description': None,
'extra': {
},
'created_at': datetime.datetime(2020, 1, 17, 1, 24, 45),
'target_id': u'baiweitest',
'domain_id': u'03e94705fe0033440f15c0055655b360',
'actor_id': u'0781e68ff8004b483f01c0056032f07f',
'type': u'GroupResource',
'id': u'46e0ed4924684055b987e7d29f742244',
'policy_id': u'b9996fa53eaf4c17971accc67db3e5df'
}
]
}
更多的ORM事件,可以参考资料https://docs.sqlalchemy.org/en/13/orm/events.html#session-events
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
热门文章
评论(0)