基于Sqlalchemy的ORM event机制做event sourcing
背景
当前我所在的产品的缓存消息采用的是Etag标签机制,以Domain为粒度,由缓存读取服务通过主动拉取Etag的方式,对缓存和DB中的消息进行比较。
颗粒太大,效率低下。所以我们需要一种以更细的粒度的,通过事件驱动,主动推送变化的方式。
技术选型
方法一:在S层解决。在增删改接口中,拼装业务变化,推送kafka消息。缺点很明显,需要对所有的接口进行遍历,并对业务代码进行侵入式的遍地开花修改。
方法二:在I层解决。对DB的binlog变化进行监听订阅。优点就是业务代码无须修改,缺点是,每条binlog变化是独立的事件,对于一次请求中的多个事物,会缺失数据间的关联关系,另外,binlog中,通过where条件进行的批量删除或者更新,只能得到where条件中的属性,其他的属性无法获取到。也没有补偿方式。
前两种方法的优缺点非常明显。
后来我们找到了第三种方法,在P层解决。通过Sqlalchemy的ORM事件机制,对指定的表的增删改动作进行监听,缓存起来,在请求结束时,以完整的消息以kafka方式推送出去。
ORM event代码示例
如下面代码所示,我们对credential表的增删改动作进行了hook监听,先尝试放置在了协程tmp信息中,如果事物回滚,则对tmp信息进行清理,事物成功提交后,再统一组装发出给kafka。在ORM中,增加to_kafka_msg转换方法,对敏感信息或者无用信息进行处理。
@event.listens_for(CredentialModel, 'after_delete') def add_kafka_msg_for_delete(mapper, connection, target): try: event_type = "delete_" + target.__tablename__ event_obj = target.to_kafka_msg() thread_information.set_tmp_notify_info(event_type, event_obj) except Exception: LOG.exception("add_kafka_msg_for_delete occured exception") @event.listens_for(CredentialModel, 'after_update') def add_kafka_msg_for_update(mapper, connection, target): try: event_type = "update_" + target.__tablename__ event_obj = target.to_kafka_msg() thread_information.set_tmp_notify_info(event_type, event_obj) except Exception: LOG.exception("add_kafka_msg_for_update occured exception") @event.listens_for(CredentialModel, 'after_insert') def add_kafka_msg_for_insert(mapper, connection, target): try: event_type = "create_" + target.__tablename__ event_obj = target.to_kafka_msg() thread_information.set_tmp_notify_info(event_type, event_obj) except Exception: LOG.exception("add_kafka_msg_for_insert occured exception ")
组装后的消息示例
{ 'create_etag_ex': [ [ { 'domain_id': u'03e94705fe0033440f15c0055655b360', 'etag_id': 10099 } ] ], 'delete_assignment_relation_ex': [ { 'description': None, 'extra': { }, 'created_at': datetime.datetime(2020, 1, 17, 1, 24, 45), 'target_id': u'baiweitest', 'domain_id': u'03e94705fe0033440f15c0055655b360', 'actor_id': u'0781e68ff8004b483f01c0056032f07f', 'type': u'GroupResource', 'id': u'46e0ed4924684055b987e7d29f742244', 'policy_id': u'b9996fa53eaf4c17971accc67db3e5df' } ] }
更多的ORM事件,可以参考资料https://docs.sqlalchemy.org/en/13/orm/events.html#session-events
- 点赞
- 收藏
- 关注作者
评论(0)