图数据库实践--COVID-19患者轨迹追溯
图数据库实践–COVID-19患者轨迹追溯
背景
COVID-19 大流行的形势依然很严峻,新冠疫情确诊患者的轨迹信息成为疫情发展过程中大众关注的焦点,政府部门也陆续公开了部分确诊患者的非隐私信息,这部分数据为相关研究人员研究疫情的传播与防控提供了重要的数据支撑。
然而,公布的数据多为文本等非结构化数据,而且极其分散,难以直接为后续研究提供深度的支撑。患者的轨迹信息蕴含居家、出行、餐饮等丰富的接触关系,在新冠病毒人传人的特性下,如果能直观地展示这些接触信息,相信能对疫情的研究提供很大的帮助。
基于此,我们利用相关技术手段从公开的病患轨迹数据抽取了病患相关的基本信息(年龄、性别等)、轨迹、病患关系等数据,并利用图数据库技术对其进行研究,尝试为政府相关部门对疫情的传播与防控工作提供有效的决策支撑。
数据建模
首先我们需要构建新冠疫情轨迹数据的图数据模型。图数据模型中的实体包括患者、轨迹点、交通工具等,实体关系包括居住于、逗留过等,具体的图模型见下图:
图模型中包含了四类点和六类边,元信息说明如下:
数据集
我们采集了近期“旅行团疫情”公开的部分轨迹文本数据,根据设计的图模型对数据格式进行转换,转换后的数据可从此处下载。这份数据包含800+个点,1300+条边。
创图
我们下面将使用华为云图数据库 GES 对以上数据集进行探索和演示,我们需要先在 GES 中创图并将以上数据集导入,详细指导流程可参见华为图引擎文档-快速入门和华为云图引擎服务 GES 实战——创图。
使用 GES 查询的预置条件
首先通过moxing包从对象存储服务obs中下载ges4jupyter。ges4jupyter是jupyter连接GES服务的工具文件。文件中封装了使用 GES 查询的预置条件,包括配置相关参数和对所调用 API 接口的封装,如果你对这些不感兴趣,可直接运行而不需要了解细节,这对理解后续具体查询没有影响。
import moxing as mox
mox.file.copy('obs://obs-aigallery-zc/GES/ges4jupyter/beta/ges4jupyter.py', 'ges4jupyter.py')
mox.file.copy('obs://obs-aigallery-zc/GES/ges4jupyter/beta/ges4jupyter.html', 'ges4jupyter.html')
GESConfig的参数都是与调用 GES 服务有关的参数,依次为“公网访问地址”、“项目ID”、“图名”、“终端节点”、“IAM 用户名”、“IAM 用户密码”、“IAM 用户所属账户名”、“所属项目”,其获取方式可参考调用 GES 服务业务面 API 相关参数的获取。这里通过read_csv_config方法从配置文件中读取这些信息。如果没有配置文件,可以根据自己的需要补充下列字段。对于开启了https安全模式的图实例,参数port的值为443。
from ges4jupyter import GESConfig, GES4Jupyter, read_csv_config
eip = ''
project_id = ''
graph_name = ''
iam_url = ''
user_name = ''
password = ''
domain_name = ''
project_name = ''
port = 80
eip, project_id, graph_name, iam_url, user_name, password, domain_name, project_name, port = read_csv_config('cn_north_4_graph.csv')
config = GESConfig(eip, project_id, graph_name,
iam_url = iam_url,
user_name = user_name,
password = password,
domain_name = domain_name,
project_name = project_name,
port = port)
ges_util = GES4Jupyter(config, True);
创建好图之后,就可以对其进行查询和分析了。
GES 支持 cypher和gremlin 两种查询语言,这里的查询示例以cypher举例。
在使用 cypher 查询之前,我们先创建点索引和边索引(可直接点击GES画布右下角“创建索引”按钮)。
print('开始创建点索引:')
job_id = ges_util.build_vertex_index()
job_result = ges_util.get_job(job_id)
if 'errorCode' not in job_result:
for i in range(100):
if job_result['status'] == 'success':
break
else:
time.sleep(1)
job_result = ges_util.get_job(job_id)
print('点索引创建完成')
print('开始创建边索引:')
job_id = ges_util.build_edge_index()
job_result = ges_util.get_job(job_id)
if 'errorCode' not in job_result:
for i in range(100):
if job_result['status'] == 'success':
break
else:
time.sleep(1)
job_result = ges_util.get_job(job_id)
print('边索引创建完成')
查询演示
首先查询这份图数据的统计信息,对全图的点边数目有所了解:
import json
print('统计信息:')
result = ges_util.summary()
format_result = json.dumps(result, indent=4)
print(format_result)
使用查询语言获取前10条边以及其顶点,对数据有个粗略的了解(建议使用nodebook打开并连接GES服务,可以看到运行后数据可视化的效果):
cypher_result = ges_util.cypher_query("match (n)-[r]->(m) return n,r,m limit 10",formats=['row','graph']);
ges_util.format_cypher_result(cypher_result)
对于数据中的城市,采集了部分城市的防疫政策。了解其他城市的防疫政策,有助于规划出行。
print('查看各个城市的防疫政策:')
statement = "match (m:city) return id(m), m.防疫政策"
result = ges_util.cypher_query(statement)
format_result = ges_util.format_cypher_result(result)
format_result
在现实中,随着采集到更多的数据,不免对原有的数据进行更新,比如新发现关联患者,就涉及到节点和边的增加操作。
假设北京新增病例40,其与原有北京病例39是亲属关系。
print('数据更新:')
print('先查询点是否存在:')
vertex_id = '北京病例40'
statement = "match (p) where id(p) ="' + vertex_id + '" return p"
result = ges_util.cypher_query(statement)
if len(result) == 0:
print('该节点不存在,增加该节点:')
label = 'patient'
property_name = '性别'
value = '男'
ges_util.add_vertex(vertex_id, label, property_name, value)
print('再次查询该节点:')
statement = "match (p) where id(p) ='" + vertex_id + "' return p"
result = ges_util.cypher_query(statement)
format_vertex_detail = json.dumps(result[0]['data'], indent=4, ensure_ascii=False)
print(format_vertex_detail)
print('增加关联边:')
source = '北京病例39'
target = '北京病例40'
label = 'relation'
property_name = '类型'
value = '家人'
ges_util.add_edge(source, target, label, property_name, value)
statement = "match (p)-[r]-(m) where id(p)='" + source + "' and id(m)='" + target + "' return r"
result = ges_util.cypher_query(statement)
format_edge_detail = json.dumps(result[0]['data'], indent=4, ensure_ascii=False)
print(format_edge_detail)
else:
print('该节点已存在')
对数据做宏观把控,往往有助于判断疫情趋势。可以对数据从不同维度做统计。
下面会从空间(城市地区风险)、时间(增长趋势)、年龄(患者年龄分布)等维度做统计。
首先通过查询语句查看本轮疫情已经波及到的城市及每个城市疫情的严重程度。
print('查看有确诊病例的城市并按确诊人数排序:')
statement = "match (m:city)<-[r:comfirm]-(p:patient) with m, count(p) as patientNum return id(m), patientNum order by patientNum desc"
result = ges_util.cypher_query(statement)
format_result = ges_util.format_cypher_result(result)
format_result
目前中高风险区域的判断,是以区域确诊病例数目划分的。统计不同地域的确诊病例数目,可以判断各个区域的风险。
print('将图数据库中的轨迹点,按涉及的确诊病例数目多少进行统计,并从按病例数目从多到少排序:')
statement = "match (m:place)<-[s:stay]-(p:patient) with m, count(p) as patientNum return id(m), patientNum order by patientNum desc limit 10"
result = ges_util.cypher_query(statement)
format_result = ges_util.format_cypher_result(result)
format_result
此外对此轮疫情患者的年龄做一个调查,以了解此轮疫情中病毒传播的特点。
print('查看全国患者年龄构成并按人数排序:')
statement = "match (p:patient) where p.年龄 is not null return p.年龄, count(*) as m order by m desc limit 10"
result = ges_util.cypher_query(statement)
format_result = ges_util.format_cypher_result(result)
format_result
从时间维度统计确诊病例数目,往往会反映疫情的趋势,疫情是否得到了有效的控制。
city_id = '石家庄市'
print('查看{}每天确诊人数:'.format(city_id))
statement = "match (p:patient)-[r:comfirm]->(m:city) where id(m)='" + city_id + "' return r.时间, count(*) order by r.时间 asc"
result = ges_util.cypher_query(statement)
format_result = ges_util.format_cypher_result(result)
format_result
除了宏观上的统计,我们会关注重点区域(如景区、机场)等,是否和确诊病例发生交集。
statement = "match (n)-[r]-(m) where id(n)='胡杨林景区' return n,r,m"
result = ges_util.cypher_query(statement,formats=['row','graph'])
ges_util.format_cypher_result(result)
此外,我们可以对重点人员进行探索,分析其接触史,查询与该患者有直接关联关系的其他患者或地点等,探查可能的传播路径和圈定可疑人群。
statement = "match (n)-[r]-(m) where id(n)='额济纳旗病例1' return n,r,m"
result = ges_util.cypher_query(statement,formats=['row','graph'])
ges_util.format_cypher_result(result)
patient_id = '额济纳旗病例1'
print('查看{}的活动轨迹:'.format(patient_id))
statement = "match (p:patient)-[r]->(m:place) where id(p)='" + patient_id + "' return id(p), r.时间, type(r), id(m) order by r.时间 asc"
result = ges_util.cypher_query(statement)
format_result = ges_util.format_cypher_result(result)
format_result
patient_id = '额济纳旗病例1'
print('查看与{}有直接关联的患者:'.format(patient_id))
print('要么同时到过某地点:')
statement = "match path = (p:patient)-[r1]-(m:place)-[r2]-(n:patient) where id(p)='" + patient_id + "' and id(p) <> id(n) and r1.时间 = r2.时间 return id(p), id(m), id(n), r1.时间, path"
result = ges_util.cypher_query(statement,formats=['row','graph'])
format_result = ges_util.format_cypher_result(result)
format_result
print('要么是某种亲密关系:')
patient_id = '额济纳旗病例1'
statement = "match path = (p:patient)-[r]-(n:patient) where id(p)='" + patient_id + "' return id(p), id(n), r.类型, path"
result = ges_util.cypher_query(statement,formats=['row','graph'])
format_result = ges_util.format_cypher_result(result)
format_result
此轮疫情源头是在额济纳旗,那么疫情又是怎么传到北京的呢?
statement = "match path=(p:patient)-[]-(m:city) where id(m)='额济纳旗' and not (tostring(id(p)) contains '额济纳旗') return path"
statement += " union match path=(p:patient)-[]-(m:city) where id(m)='北京' and not (tostring(id(p)) contains '北京') return path"
result = ges_util.cypher_query(statement,formats=['row','graph'])
ges_util.format_cypher_result(result)
可以枚举所有在北京确诊的患者,通过最短路算法,检查其与额济纳旗之间的关系。
import copy
statement = "match (p:patient)-[r:comfirm]-(m:city) where id(m)='北京' return collect(distinct id(p))"
result = ges_util.cypher_query(statement)
print('北京所有的患者:')
vertex_list = result["results"][0]['data'][0]['row'][0]
print(vertex_list)
print('查看此轮疫情入京可能的传播链:')
source = '额济纳旗'
chain_vid = []
for vid in vertex_list:
target = vid
avoid_ids = copy.deepcopy(vertex_list)
avoid_ids.remove(vid)
result = ges_util.filtered_shortest_path(source, target, avoid_ids)
if len(result) != 0:
chain_vid.append(target)
path = ''
for vtx_id in result:
path = path + vtx_id + '-'
print(path[:-1])
可以对可能传播链上的节点进行一跳查询,分析传播链之间的关系。
result = ges_util.cypher_query('match p=(n)--(m) where id(n) in $idlist and not (m:city) and not(m:patient and not(id(m) in $idlist)) return id(n),p',formats=['row','graph'], param={"idlist":chain_vid})
ges_util.format_cypher_result(result)
我们分析了此轮疫情入京可能的传播链,会发现病例24和25是一同去额济纳旗旅游,而病例33、34、35、36和37也是一起去额济纳旗旅游的,另外病例39是因为和外省确诊患者同乘车被感染的,我们等于得到了三条独立的传播链。
我们已经知道了疫情的源头(额济纳旗),通过路径搜索功能,可以探查某个患者可能的感染路径。
vertex_id = '北京病例2'
print('查看{}可能的感染路径:'.format(vertex_id))
result = ges_util.path_query({
"repeat": [
{
"operator": "bothV",
"vertex_filter": {
"property_filter": {
"leftvalue": {
"id": ""
},
"predicate": "NOTIN",
"rightvalue": {
"value": ["北京"]
}
}
}
}
],
"until": [
{
"vertex_filter": {
"property_filter": {
"leftvalue": {
"id": ""
},
"predicate": "=",
"rightvalue": {
"value": ["额济纳旗"]
}
}
}
}
],
"times": 5,
"queryType": "Tree",
"vertices": ["北京病例2"]
})
ges_util.format_path_query(result)
import time
print('连通性分析:')
job_id = ges_util.connected_component()
result = ges_util.get_job(job_id)
if 'errorCode' not in result:
for i in range(1000):
if result['status'] == 'success':
break
else:
time.sleep(1)
result = ges_util.get_job(job_id)
com_dict = {}
for v_dict in result['data']['outputs']['community']:
for key, value in v_dict.items():
statement = "match (p) where id(p)='" + key + "' return labels(p)"
v_label = ges_util.cypher_query(statement)['results'][0]['data'][0]['row'][0]
if v_label in ['city', 'patient']:
com_dict.setdefault(value, []).append(key)
print('连通分支个数 : {}'.format(len(com_dict)))
for key, value in com_dict.items():
print('连通分支 ' + key + ' 中的点(仅关注城市和患者):')
print(value)
通过连通性分析,我们暂未发现此轮大连的疫情与额济纳旗有什么关联,可能来自不同的源头。
- 点赞
- 收藏
- 关注作者
评论(0)