【知识图谱学习】知识图谱搭建医疗问答系统
发布时间
阅读量:
阅读量
通过知识图谱,我们可以构建一个简单的医疗问答系统,在问答过程中可以方便地检索与问题相关的知识。
GitHub:https://github.com/cshmzin/zstp-project/tree/main/医疗机器人
实验环境
- neo4j数据库
- py_aho_corasick模块
简介
- 数据提取模块(从互联网获取数据)
- 知识图谱数据库构建模块(将数据清洗构建知识图谱)
- 节点匹配模块(匹配节点获取关系)
- 问题匹配模块(匹配问题构建查询)
- 回答构建模块(输出)

实验代码
数据集的构建
数据集不做展示,可在github提取。
我们构建如下7类实体(另有 disease_infos 用于保存疾病的属性信息)和11类关系:
# Accumulators for the entities and the relation pairs extracted from the
# JSON records; each starts out empty.
drugs = [] # drugs
foods = [] # foods
checks = [] # medical checks
departments = [] # departments
producers = [] # drug producers (described as broad drug categories)
diseases = [] # diseases
symptoms = []# symptoms
disease_infos = []# per-disease information
rels_department = [] # department-department relations
rels_noteat = [] # disease-food relations: foods to avoid
rels_doeat = [] # disease-food relations: suitable foods
rels_recommandeat = [] # disease-food relations: recommended foods
rels_commonddrug = [] # disease-drug relations: common drugs
rels_recommanddrug = [] # disease-drug relations: popular drugs
rels_check = [] # disease-check relations
rels_drug_producer = [] # producer-drug relations
rels_symptom = [] # disease-symptom relations
rels_acompany = [] # disease-complication relations
rels_category = [] # disease-department relations
然后将数据从json文件中提取出来:
- 疾病属性的获取:
def diseases_property(self, disease, data_json):
    """Build the property dict for a single disease.

    Args:
        disease: Disease name; stored under the ``'name'`` key.
        data_json: Mapping parsed from one JSON record. Any of the optional
            property keys may be missing from it.

    Returns:
        A dict holding ``'name'`` plus every optional property key. A
        property that is absent from ``data_json`` is set to ``''``.
    """
    # Optional properties copied from the record when present.
    property_keys = (
        'desc',           # description
        'prevent',        # prevention measures
        'cause',          # causes
        'get_prob',       # incidence rate
        'easy_get',       # groups prone to the disease
        'cure_way',       # treatment methods
        'cure_lasttime',  # treatment duration
        'cured_prob',     # cure rate
    )
    disease_dict = {'name': disease}
    for key in property_keys:
        disease_dict[key] = data_json.get(key, '')
    return disease_dict
- 获取实体及其关系(例子):
# Build the department entities and the relations that involve them.
if 'cure_department' in data_json:
    cure_department = data_json['cure_department']
    if len(cure_department) == 1:
        # A single entry means there is no parent/child department pair.
        rels_category.append([disease, cure_department[0]])
    elif len(cure_department) == 2:
        # Two entries mean a department hierarchy: the second entry is
        # linked to the first, and the disease is filed under the second.
        rels_department.append([cure_department[1], cure_department[0]])
        rels_category.append([disease, cure_department[1]])
    # Every listed department is recorded, whatever the list length.
    departments += cure_department
- 创建node
def create_node(self, label, nodes):
    """Create one graph node per name and report how many were processed.

    Args:
        label: Label given to every created node.
        nodes: Sized iterable of node names; each becomes the ``name``
            property of its node.
    """
    for node_name in nodes:
        node = Node(label, name=node_name)
        self.link.create(node)
    # Reported once after the loop: the message is a total, not per-node.
    print(f'创建节点:{label},共{len(nodes)}个')
- 循环将全部数据创建实体节点
# The first element returned by read_data() holds the entity collections.
Drugs, Foods, Checks, Departments, Producers, Symptoms, Diseases, disease_infos = self.read_data()[0]
# Disease nodes carry properties, so they go through their own helper.
self.create_diseases_nodes(disease_infos)
# The remaining entity kinds are plain named nodes: (label, names) pairs.
labelled_entities = (
    ('Drug', Drugs),
    ('Food', Foods),
    ('Check', Checks),
    ('Department', Departments),
    ('Producer', Producers),
    ('Symptom', Symptoms),
)
for entity_label, entity_names in labelled_entities:
    self.create_node(entity_label, entity_names)
- 创建关系
def create_relationship(self, start_node, end_node, edges, rel_type, rel_name):
    """Create one relationship per distinct edge between existing nodes.

    Args:
        start_node: Label of the nodes the relationship starts from.
        end_node: Label of the nodes the relationship points to.
        edges: Iterable of ``[start_name, end_name]`` pairs; duplicate
            pairs are dropped before any query is issued.
        rel_type: Relationship type used in the Cypher pattern.
        rel_name: Value stored in the relationship's ``name`` property.
    """
    # Labels and relationship types cannot be Cypher parameters, so they are
    # interpolated. Node names and the relationship name are sent as query
    # parameters so that a quote inside a name cannot break (or inject into)
    # the statement.
    query = (
        "match(p:%s),(q:%s) where p.name=$p_name and q.name=$q_name "
        "create (p)-[rel:%s{name:$rel_name}]->(q)"
        % (start_node, end_node, rel_type)
    )
    # De-duplicate the edges; tuples are hashable, lists are not.
    unique_edges = {tuple(edge) for edge in edges}
    for p, q in unique_edges:
        self.link.run(query, {'p_name': p, 'q_name': q, 'rel_name': rel_name})
- 循环将全部数据创建关系
# The second element returned by read_data() holds the relation pair lists.
rels_check, rels_recommandeat, rels_noteat, rels_doeat, rels_department, rels_commonddrug, rels_drug_producer, rels_recommanddrug,rels_symptom, rels_acompany, rels_category = self.read_data()[1]
# One row per relationship kind:
# (start label, end label, edge pairs, relationship type, relationship name).
relationship_specs = (
    ('Disease', 'Food', rels_recommandeat, 'recommand_eat', '推荐食谱'),
    ('Disease', 'Food', rels_noteat, 'no_eat', '忌吃'),
    ('Disease', 'Food', rels_doeat, 'do_eat', '宜吃'),
    ('Department', 'Department', rels_department, 'belongs_to', '属于'),
    ('Disease', 'Drug', rels_commonddrug, 'common_drug', '常用药品'),
    ('Producer', 'Drug', rels_drug_producer, 'drugs_of', '生产药品'),
    ('Disease', 'Drug', rels_recommanddrug, 'recommand_drug', '好评药品'),
    ('Disease', 'Check', rels_check, 'need_check', '诊断检查'),
    ('Disease', 'Symptom', rels_symptom, 'has_symptom', '症状'),
    ('Disease', 'Disease', rels_acompany, 'acompany_with', '并发症'),
    ('Disease', 'Department', rels_category, 'belongs_to', '所属科室'),
)
for spec in relationship_specs:
    self.create_relationship(*spec)
数据集构建完成后可以查看如下图:

构建对话系统
在主类中我们构建了三个模块:
- QuestionClassifier() 【对问题进行分类,确定问题种类】
- QuestionPaser() 【对问题进行解析,获取相对应的查询语句】
- AnswerSearcher() 【通过查询语句搜索数据库,构造回答】
class ChatBotGraph:
    """Chat entry point chaining question classification, parsing and search."""

    def __init__(self):
        self.classifier = QuestionClassifier()
        self.parser = QuestionPaser()
        self.searcher = AnswerSearcher()

    def chat_main(self, sent):
        """Answer one sentence, falling back to a placeholder reply.

        Args:
            sent: The user's question text.

        Returns:
            The answers joined by newlines, or the placeholder string when
            the classifier or the searcher produces nothing.
        """
        answer = '。。。。。。。。。。'
        res_classify = self.classifier.classify(sent)
        if not res_classify:
            return answer
        res_sql = self.parser.parser_main(res_classify)
        final_answers = self.searcher.search_main(res_sql)
        if not final_answers:
            return answer
        return '\n'.join(final_answers)
问题分类
我们将问题分为如下几类,通过字段匹配和关键词匹配完成。
def classify(self, question):
    """Work out which question types a sentence asks about.

    Only the first part of the method is shown here; the remaining question
    types and the final return are elided in the surrounding text.

    Args:
        question: The user's question text.

    Returns:
        ``{}`` when ``check_medical`` finds nothing in the question.
    """
    data = {}
    medical_dict = self.check_medical(question)
    if not medical_dict:
        return {}
    data['args'] = medical_dict
    # Collect the entity types mentioned in the question.
    types = []
    for type_ in medical_dict.values():
        types += type_
    question_types = []
    # Symptoms
    if self.check_words(self.symptom_qwds, question) and ('disease' in types):
        question_types.append('disease_symptom')
    if self.check_words(self.symptom_qwds, question) and ('symptom' in types):
        question_types.append('symptom_disease')
    # Causes
    if self.check_words(self.cause_qwds, question) and ('disease' in types):
        question_types.append('disease_cause')
    # Complications
    if self.check_words(self.acompany_qwds, question) and ('disease' in types):
        question_types.append('disease_acompany')
    # Food: a denial word in the question flips "what to eat" into
    # "what not to eat".
    if self.check_words(self.food_qwds, question) and 'disease' in types:
        deny_status = self.check_words(self.deny_words, question)
        if deny_status:
            question_type = 'disease_not_food'
        else:
            question_type = 'disease_do_food'
        question_types.append(question_type)
。。。。。。。。。。。。。。。。。。。。。
问题解析
我们将分类后的问题进行解析,为每一类问题构建相应的查询代码:
# For each question type, pick the entity kind it queries by and build the
# corresponding queries; further branches are elided after this excerpt.
for question_type in question_types:
    sql_ = {}
    sql_['question_type'] = question_type
    sql = []
    if question_type == 'disease_symptom':
        sql = self.sql_transfer(question_type, entity_dict.get('disease'))
    elif question_type == 'symptom_disease':
        sql = self.sql_transfer(question_type, entity_dict.get('symptom'))
    elif question_type == 'disease_cause':
        sql = self.sql_transfer(question_type, entity_dict.get('disease'))
    elif question_type == 'disease_acompany':
        sql = self.sql_transfer(question_type, entity_dict.get('disease'))
。。。。。。。。。。。。
回答构造
根据对应的 question_type,调用相应的回复模板:
def answer_prettify(self, question_type, answers):
    """Format query results with the reply template of the question type.

    Only the first templates are shown here; the remaining branches and the
    final return are elided in the surrounding text.

    Args:
        question_type: Question type selecting the template.
        answers: Query result rows, indexed by keys such as ``'m.name'``
            and ``'n.name'``.

    Returns:
        ``''`` when ``answers`` is empty.
    """
    final_answer = []
    if not answers:
        return ''
    if question_type == 'disease_symptom':
        desc = [i['n.name'] for i in answers]
        subject = answers[0]['m.name']
        # Duplicates are removed, then at most num_limit items are listed.
        final_answer = '{0}的症状包括:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))
    elif question_type == 'symptom_disease':
        desc = [i['m.name'] for i in answers]
        subject = answers[0]['n.name']
        final_answer = '症状{0}可能染上的疾病有:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))
    elif question_type == 'disease_cause':
        desc = [i['m.cause'] for i in answers]
        subject = answers[0]['m.name']
        final_answer = '{0}可能的成因有:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))
    elif question_type == 'disease_prevent':
        desc = [i['m.prevent'] for i in answers]
        subject = answers[0]['m.name']
        final_answer = '{0}的预防措施包括:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
测试

全部评论 (0)
还没有任何评论哟~
