天池下的瑞金医院MMC人工智能辅助构建知识图谱
前言
知识图谱是个很大的概念,可惜我没数据,借用瑞金医院的数据集,来谈下命名识别。
数据说明
数据使用 brat 进行标注,每个 .txt 文件对应一个 .ann 标注文件。
txt文件对应一篇糖尿病下的论文,
ann文件有3列,以 \t 分隔,第一列为实体编号,第二列为实体类别,第三列为实体位置信息。实体位置信息共3列, 以空格分隔,分别代表实体的开始位置,结束位置,实体文本。
问题
这里我引用冠军队伍的代码,他们当时所面临的问题如下:
(1)他们是对一篇文章去做实体标注,文章的字数可能很长(几千到上万字),不可能直接输入到一个 RNN 中;
(2)样本中文章可能由于格式转换的一些原因,没有一个很好的句子边界,甚至一个词汇当中存在换行符 \n 或者句号 的情况,因此用换行 符或者句号去切割句子不一定合适。
(3)如果按照固定窗口大小的滑动窗口去切句子,刚好把一个实体切分成2个部分怎么办?
中文文本,面临是否要分词的选择;
下面是他们的解决方案:
网络模型和效果展示
网络模型为了便于上下文的关联采用了双向的lstm,为了使滑动的时候不丢到相关联的词语采用了一层CRF,作为最后最后一层的预测。
代码
代码主要分为三个部分,实体的定义和处理、句子的切分和处理、模型的搭建,除此之外还有预测评估的部分
实体的定义和处理
class Entity(object): def __init__(self, ent_id, category, start_pos, end_pos, text): self.ent_id = ent_id self.category = category self.start_pos = start_pos self.end_pos = end_pos self.text = text def __gt__(self, other): return self.start_pos > other.start_pos def offset(self, offset_val): return Entity(self.ent_id, self.category, self.start_pos + offset_val, self.end_pos + offset_val, self.text) def __repr__(self): return '({}, {}, ({}, {}), {})'.format(self.ent_id, self.category, self.start_pos, self.end_pos, self.text)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
class Entities(object): def __init__(self, ents): self.ents = sorted(ents) self.ent_dict = dict(zip([ent.ent_id for ent in ents], ents)) def __getitem__(self, key): if isinstance(key, int) or isinstance(key, slice): return self.ents[key] else: return self.ent_dict.get(key, None) def offset(self, offset_val): ents = [ent.offset(offset_val) for ent in self.ents] return Entities(ents) def vectorize(self, vec_len, cate2idx): res_vec = np.zeros(vec_len, dtype=int) for ent in self.ents: res_vec[ent.start_pos: ent.end_pos] = cate2idx[ent.category] return res_vec def find_entities(self, start_pos, end_pos): res = [] for ent in self.ents: if ent.start_pos > end_pos: break sp, ep = (max(start_pos, ent.start_pos), min(end_pos, ent.end_pos)) if ep > sp: new_ent = Entity(ent.ent_id, ent.category, sp, ep, ent.text[:(ep - sp)]) res.append(new_ent) return Entities(res) def merge(self): merged_ents = [] for ent in self.ents: if len(merged_ents) == 0: merged_ents.append(ent) elif (merged_ents[-1].end_pos == ent.start_pos and merged_ents[-1].category == ent.category): merged_ent = Entity(ent_id=merged_ents[-1].ent_id, category=ent.category, start_pos=merged_ents[-1].start_pos, end_pos=ent.end_pos, text=merged_ents[-1].text + ent.text) merged_ents[-1] = merged_ent else: merged_ents.append(ent) return Entities(merged_ents)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
句子的切分和处理
data_dir = 'ruijin_round1_train2_20181022/'
ent2idx = dict(zip(ENTITIES, range(1, len(ENTITIES) + 1)))
idx2ent = dict([(v, k) for k, v in ent2idx.items()])
# print(idx2ent)
docs = Documents(data_dir=data_dir)
# ShuffleSplit() 随机排列交叉验证,生成一个用户给定数量的独立的训练/测试数据划分。样例首先被打散然后划分为一对训练测试集合。
# n_splits:划分训练集、测试集的次数,默认为10
# test_size: 测试集比例或样本数量,
# random_state:随机种子值,默认为None,可以通过设定明确的random_state,使得伪随机生成器的结果可以重复。
rs = ShuffleSplit(n_splits=1, test_size=20, random_state=2018)
train_doc_ids, test_doc_ids = next(rs.split(docs))
train_docs, test_docs = docs[train_doc_ids], docs[test_doc_ids]
num_cates = max(ent2idx.values()) + 1
sent_len = 64
vocab_size = 3000
emb_size = 100
sent_pad = 10
sent_extrator = SentenceExtractor(window_size=sent_len, pad_size=sent_pad)
train_sents = sent_extrator(train_docs)
test_sents = sent_extrator(test_docs)
train_data = Dataset(train_sents, cate2idx=ent2idx)
train_data.build_vocab_dict(vocab_size=vocab_size)
test_data = Dataset(test_sents, word2idx=train_data.word2idx, cate2idx=ent2idx)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
class Sentence(object): """ 定义被切分的句子的类: text:句子的文本 doc_id:句子所述文档id offset:句子相对文档的偏移距离 ents:句子包含的实体列表 """ def __init__(self, doc_id, offset, text, ents): self.text = text self.doc_id = doc_id self.offset = offset self.ents = ents def __repr__(self): """ 内部魔法函数:以text显示类 :return: """ return self.text def __gt__(self, other): #内部魔法函数:按类的offset偏移距离对类进行排序 return self.offset > other.offset def __getitem__(self, key): """ 内部魔法函数:预测结果评估时,去除句子两端延申的部分 :param key: :return: """ if isinstance(key, int): return self.text[key] if isinstance(key, slice): text = self.text[key] start = key.start or 0 stop = key.stop or len(self.text) if start < 0: start += len(self.text) if stop < 0: stop += len(self.text) #改变实体相对于句子的偏移距离 ents = self.ents.find_entities(start, stop).offset(-start) #改变句子相对于文档的偏移距离 offset = self.offset + start return Sentence(self.doc_id, offset, text, ents) def _repr_html_(self): """ 内部函数:网页显示不同的实体以不同的颜色区分 :return: """ ents = [] for ent in self.ents: ents.append({'start': ent.start_pos, 'end': ent.end_pos, 'label': ent.category}) ex = {'text': self.text, 'ents': ents, 'title': None, 'settings': {}} return displacy.render(ex, style='ent', options={'colors': COLOR_MAP}, manual=True, minify=True)
class SentenceExtractor(object): #句子切分器,窗口为windows,两端分别延申pad_size def __init__(self, window_size=50, pad_size=10): self.window_size = window_size self.pad_size = pad_size def extract_doc(self, doc): #句子切分函数,切分的时候注意每个切分的句子相对于文档的偏移距离,预测的时候还需要还原 num_sents = math.ceil(len(doc.text) / self.window_size) doc = doc.pad(pad_left=self.pad_size, pad_right=num_sents * self.window_size - len(doc.text) + self.pad_size) sents = [] for cur_idx in range(self.pad_size, len(doc.text) - self.pad_size, self.window_size): sent_text = doc.text[cur_idx - self.pad_size: cur_idx + self.window_size + self.pad_size] ents = [] for ent in doc.ents.find_entities(start_pos=cur_idx - self.pad_size, end_pos=cur_idx + self.window_size + self.pad_size): ents.append(ent.offset(-cur_idx + self.pad_size)) sent = Sentence(doc.doc_id, offset=cur_idx - 2 * self.pad_size, text=sent_text, ents=Entities(ents)) sents.append(sent) return sents def __call__(self, docs): #内部函数:将类当成函数形式的调用 sents = [] for doc in docs: sents += self.extract_doc(doc) return sents
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
模型的构建
def build_lstm_crf_model(num_cates, seq_len, vocab_size, model_opts=dict()): opts = { 'emb_size': 256, 'emb_trainable': True, 'emb_matrix': None, 'lstm_units': 256, 'optimizer': keras.optimizers.Adam() } opts.update(model_opts) input_seq = Input(shape=(seq_len,), dtype='int32') if opts.get('emb_matrix') is not None: embedding = Embedding(vocab_size, opts['emb_size'], weights=[opts['emb_matrix']], trainable=opts['emb_trainable']) else: embedding = Embedding(vocab_size, opts['emb_size']) x = embedding(input_seq) lstm = LSTM(opts['lstm_units'], return_sequences=True) x = Bidirectional(lstm)(x) crf = CRF(num_cates, sparse_target=True) output = crf(x) model = Model(input_seq, output) model.compile(opts['optimizer'], loss=crf.loss_function, metrics=[crf.accuracy]) return model
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
代码和数据集:
我把代码和数据集打包了
链接:https://pan.baidu.com/s/1mvjPuoGRChTpIqCYrLB6VA
提取码:z9tz
复制这段内容后打开百度网盘手机App,操作更方便哦–来自百度网盘超级会员V3的分享
文章来源: blog.csdn.net,作者:快了的程序猿小可哥,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_35914625/article/details/109307361
- 点赞
- 收藏
- 关注作者
评论(0)