基于Sqlalchemy的ORM event机制做event sourcing

举报
老白 发表于 2020/01/17 11:28:31 2020/01/17
【摘要】 背景 当前我所在的产品的缓存消息采用的是Etag标签机制,以Domain为粒度,由缓存读取服务通过主动拉取Etag的方式,对缓存和DB中的消息进行比较。颗粒太大,效率低下。所以我们需要一种以更细的粒度的,通过事件驱动,主动推送变化的方式。技术选型 方法一:在S层解决。在增删改接口中,拼装业务变化,推送kafka消息。缺点很明显,需要对所有的接口进行遍...

背景

             当前我所在的产品的缓存消息采用的是Etag标签机制,以Domain为粒度,由缓存读取服务通过主动拉取Etag的方式,对缓存和DB中的消息进行比较。

颗粒太大,效率低下。所以我们需要一种以更细的粒度的,通过事件驱动,主动推送变化的方式。


技术选型


          方法一:在S层解决。在增删改接口中,拼装业务变化,推送kafka消息。缺点很明显,需要对所有的接口进行遍历,并对业务代码进行侵入式的遍地开花修改。


          方法二:在I层解决。对DB的binlog变化进行监听订阅。优点就是业务代码无须修改,缺点是,每条binlog变化是独立的事件,对于一次请求中的多个事物,会缺失数据间的关联关系,另外,binlog中,通过where条件进行的批量删除或者更新,只能得到where条件中的属性,其他的属性无法获取到。也没有补偿方式。

          前两种方法的优缺点非常明显。


         后来我们找到了第三种方法,在P层解决。通过Sqlalchemy的ORM事件机制,对指定的表的增删改动作进行监听,缓存起来,在请求结束时,以完整的消息以kafka方式推送出去。


image.png

         

 

ORM event代码示例

       如下面代码所示,我们对credential表的增删改动作进行了hook监听,先尝试放置在了协程tmp信息中,如果事物回滚,则对tmp信息进行清理,事物成功提交后,再统一组装发出给kafka。在ORM中,增加to_kafka_msg转换方法,对敏感信息或者无用信息进行处理。

@event.listens_for(CredentialModel, 'after_delete')
def add_kafka_msg_for_delete(mapper, connection, target):
    try:
        event_type = "delete_" + target.__tablename__
        event_obj = target.to_kafka_msg()
        thread_information.set_tmp_notify_info(event_type, event_obj)
    except Exception:
        LOG.exception("add_kafka_msg_for_delete occured exception")


@event.listens_for(CredentialModel, 'after_update')
def add_kafka_msg_for_update(mapper, connection, target):
    try:
        event_type = "update_" + target.__tablename__
        event_obj = target.to_kafka_msg()
        thread_information.set_tmp_notify_info(event_type, event_obj)
    except Exception:
        LOG.exception("add_kafka_msg_for_update occured exception")


@event.listens_for(CredentialModel, 'after_insert')
def add_kafka_msg_for_insert(mapper, connection, target):
    try:
        event_type = "create_" + target.__tablename__
        event_obj = target.to_kafka_msg()
        thread_information.set_tmp_notify_info(event_type, event_obj)
    except Exception:
        LOG.exception("add_kafka_msg_for_insert occured exception ")

组装后的消息示例

{
	'create_etag_ex': [
		[
			{
				'domain_id': u'03e94705fe0033440f15c0055655b360',
				'etag_id': 10099
			}
		]
	],
	'delete_assignment_relation_ex': [
		{
			'description': None,
			'extra': {
			},
			'created_at': datetime.datetime(2020, 1, 17, 1, 24, 45),
			'target_id': u'baiweitest',
			'domain_id': u'03e94705fe0033440f15c0055655b360',
			'actor_id': u'0781e68ff8004b483f01c0056032f07f',
			'type': u'GroupResource',
			'id': u'46e0ed4924684055b987e7d29f742244',
			'policy_id': u'b9996fa53eaf4c17971accc67db3e5df'
		}
	]
}

      更多的ORM事件,可以参考资料https://docs.sqlalchemy.org/en/13/orm/events.html#session-events



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。