当前位置: 首页>后端>正文

django实现问答系统 python问答系统

目录

1、效果预览

2、KBQA介绍

3、KBQA实现

3.1、问答系统设计

3.2、使用python链接Fuseki

3.2、分词实现

3.2.1、实体词处理

3.2.2、分词逻辑的实现

3.3、查询实现

3.3.1、单实体查询

3.3.2、多实体查询

4、业务逻辑的整合实现

5、一些补充

6、参考


        本篇紧随之前的七篇文章,讲述了建立了知识图谱后,不希望仅仅在可视化和抽象推理的方向上得到应用,同时扩充考虑将其应用在问答系统方向;本篇属于一时兴起,写了一个简单的问答系统,通过使用自然语言处理技术,解析匹配用户问题意图,完成后生成查询模板,在知识库里进行检索,从而反馈最佳答案。

        在代码逻辑实现上,考虑到工时资源等信息,因此没有完全按照网上KBQA最常见的REfO的精准匹配去写实现,也没有使用DL技术做模型,而是使用了传统的相似度计算做模糊匹配,因此代码量少很多,算是本篇的特色,应该网上找不到和我代码一样的版本。

代码依赖:

python : 3.6+,(库:SPARQL、jieba、macropodus)

fuseki + jena (同本系列的版本)

1、效果预览

人物问答预览:

django实现问答系统 python问答系统,django实现问答系统 python问答系统_django实现问答系统,第1张

 电影问答预览:

django实现问答系统 python问答系统,django实现问答系统 python问答系统_词性_02,第2张

单实体的条件问答:

django实现问答系统 python问答系统,django实现问答系统 python问答系统_django实现问答系统_03,第3张

多实体问答:

django实现问答系统 python问答系统,django实现问答系统 python问答系统_自然语言处理_04,第4张

2、KBQA介绍

        KBQA:Knowledge Base Question Answer,由于我们将知识提取并生成了结构化数据,建立了知识库,因此在应用方向上,通过自然语言问答,通过对问题的解析和推理,利用知识库进行问答查询和分析,从而得到答案,这就是KBQA。

        详细可以阅读,这位博主在知识普及上写的更好:揭开知识库问答KB-QA的面纱1·简介篇。

3、KBQA实现

        整体代码工程后续回传到Github上,地址后续补充,项目结构简单定义为:

django实现问答系统 python问答系统,django实现问答系统 python问答系统_KBQA_05,第5张

3.1、问答系统设计

        

django实现问答系统 python问答系统,django实现问答系统 python问答系统_KBQA_06,第6张

        上图是这个系统的简单架构示意,其中底层是数据层,主要是使用了TDB三元数据库,存储知识数据;倒数第二层是通信层,主要是依赖Jena+Fuseki的框架,包括规则推理机的配置等;倒数第三层是应用实现层,主要包括Fuseki的客户端实现,NLP自然语言处理,查询模板的生成应用实现等;最顶层是业务层,主要是对用户输入做提取,结果做解析展示的业务交互层,通过这四层的定义,实现我们的整体项目。

        仅仅只有架构,是不足以完善我们的想法的,因此,我们基于某种产品化的考虑,考虑设计完善的业务流程,从用户接入使用,到数据解析,分析以及反馈系统,从而形成一个较为完整的闭环应用体系,因此,设计如下业务框架:

django实现问答系统 python问答系统,django实现问答系统 python问答系统_知识图谱_07,第7张

        如上图所展示,考虑到一个完整的问答系统,必定不能百分百考虑到所有情况,因此,我们需要在业务处理上,考虑当遇到一些疑难杂症的时候,数据应该流转到哪些新模块去做处理,这样才有可能在未来不断地提高KBQA系统的正确率,上图便考虑增加新词发现,新问题发现来帮助系统完善应用广度,增加意图理解来完善深度,而考虑到需要增加KG和模板,那么就需要使用人工加机器二合一的答案判别和数据分析系统模块,这样逐渐形成一个完善的生产系统。我这里仅实现黄色部分,右边的没空实现,不过可以给大家提供一个不错的思路。

3.2、使用python链接Fuseki

利用SPARQLWrapper的接口实现对feseki的调用,测试和使用前记得启动fuseki的服务器。

代码文件fuseki_query.py内容如下:

from SPARQLWrapper import SPARQLWrapper,JSON

class Fuseki:
    def __init__(self,url ='http://localhost:3030/kg_movie/query'):
        self.sparql = SPARQLWrapper(url)
        print('sparql init successful ...')

    def query(self, query_str):
        self.sparql.setQuery(query_str)
        self.sparql.setReturnFormat(JSON)
        result = self.sparql.query().convert()
        return result

    def query_str_build(self,str):
        pass


def test():
    fuseki = Fuseki()
    query_str = r'''
     PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> 
     PREFIX : <http://www.kg_movie.com#>   
     SELECT * WHERE {   
     ?x rdf:type :Comedian.   
     ?x :actor_chName ?n.   
     } LIMIT 10
    '''

    query_str = r'''
    PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> 
     PREFIX : <http://www.kg_movie.com#>   
     SELECT * WHERE {
            ?x :actor_chName '周星驰'.
            ?x :actor_bio ?n.
            } LIMIT 10
            '''
    res = fuseki.query(query_str)
    print(res)

if __name__ == '__main__':
    test()

核心在class Fuseki这里,后续会被调用到。

3.2、分词实现

3.2.1、实体词处理

        首先要导出存储在mysql中的实体词,导出其中的movie和actor两张表的chName列,可以导出成txt:

导出后效果如下:

django实现问答系统 python问答系统,django实现问答系统 python问答系统_自然语言处理_08,第8张

django实现问答系统 python问答系统,django实现问答系统 python问答系统_词性_09,第9张

需要导出这两个文本的原因:

        考虑到对用户话语进行关键词提取和分词,对于我们最关心的热点词,担心分词效果不好,因此将这两个实体数据提取出来,进行特殊词性标注,这样再分词环节就很容易提取出常用热点词。

        我们对这两个文件进行词性标注,打上对应的词性,有关词性的解释,可以看jieba的链接:

jieba库的GitHub地址

        这里分别使用到人名词性nr,专用词性nz。

处理actor.txt和movie.txt文件,代码文件process_word_dict.py:

from pprint import  pprint
def append_words_in_lines(file_name,append_str):
    n_dict = []
    with open(file_name+'.txt', mode='r+' ,encoding='utf-8') as f:
        res = f.readlines()
        # pprint(res)
        for word in res:
            n_dict.append(word[:-1]+append_str)
    with open(file_name+"_dict.txt", mode='w+', encoding='utf-8') as f:
        for word in n_dict:
            f.write(word+'\n')

        # print(n_dict)



if __name__ == '__main__':
    append_words_in_lines('actor', ' nr')
    append_words_in_lines('movie', ' nz')

运行一下即可,完成后生成actor_dict.txt和movie_dict.txt文件,内容如下图:

django实现问答系统 python问答系统,django实现问答系统 python问答系统_KBQA_10,第10张

django实现问答系统 python问答系统,django实现问答系统 python问答系统_KBQA_11,第11张

3.2.2、分词逻辑的实现

        分词我们调用了国内非常有名的开源库,jieba库,这里对齐进行了简单封装,根据我们的需求,做以下步骤:

        A.常用词字典的加载,加载actor_dict.txt和movie_dict.txt;

        B.将一些常见连续词按照需求给强制切分开,例如,“喜剧演员”会有很大几率是一个词,我们电影图谱,“喜剧”和“演员”分别都是对应关键实体的属性和实体类型,因此需要强制切分开,当然,有一些情况可以不切分,这个需要用业务逻辑来判断。

        C.分词和词性标注,不但将词且分开,也要把词性记录下来。

整体代码如下文件nlp.py:

# encoding=utf-8
import jieba
import jieba.posseg as pseg
from pprint import pprint

class TextureProcess:
    def __init__(self):
        self.jieba = jieba
        self.pseg = pseg

    # 加载词典
    def load_dicts(self, *path):
        for p in path:
            self.jieba.load_userdict(p)

    # 强制分词
    def force_split(self,*group_list):
        for group in group_list:
            self.jieba.suggest_freq((group[0], group[1]), True)

    def sentence_split_tag(self,sentence):
        split_list = []
        cut_sen = self.pseg.cut(sentence)
        for word, root in cut_sen:
            split_list.append({"word":word, "root":root})
        return split_list


if __name__ == '__main__':
    text_process = TextureProcess()
    text_process.load_dicts("actor_dict.txt",
                            "movie_dict.txt")
    text_process.force_split(['喜剧', '演员'],
                             ['出生', '日期'])
    text_res = text_process.sentence_split_tag("周星驰的个人简介")

这里扩展提及一下,我们为什么要设计新词发现模块,用户一个问题,会根据习惯和地域文化,产生很多相近词,因此,新词发现也是为了让我们的自然处理以及相似度计算更准确,同时互联网造词能力太强大,如果需要我们的KBQA系统赶上热点,那么一个合格的新词发现系统是更新KBQA的必要条件,在?懂?

3.3、查询实现

        查询的时候,参考了网上很多KBQA的实现,基本上最常见的是两种,一种是基于模板规则,使用正则表达式的精准匹配方式,另外一种是基于ML或DL训练处理的有监督模型,结合我们的图谱大小和数据现状,显然使用ML或DL训练不是很好,而且还需要对数据进行标注处理,这并不符合短时间内实现一个KBQA系统的初衷,因此考虑使用模板匹配,但仔细观看,模板匹配使用了正则表达,每种问题不同问法和相近词都必须考虑到,这并不符合我的想法,因此想要使用模糊匹配解决问题。

        因此核心的考虑方案是,定义问题模板,有且仅对一种问题类型做一个示例,无论用户换几种问法,将其问句和示例进行相似度计算,相似度最高的认定为用户想要问的问题方向,如果相似度极低,那么可以简单认为要么是问题刁钻,要么是没考虑到这个问题需要再补充问题示例。

        这里提及一下,无论使用传统方法,还是基于深度学习的方法,对用户问题的判断必定是基于某种规则的,例如,如果你没有定义过天上的白云有什么形状?这种问题,自然就没有对应的答案,无论分词还是意图提取,断句以及词性做的多准确,都没有用,因此只有你定义过天上的白云的答案,当用户问题是:万米深空的棉花糖都是啥样,机器才有可能回答“心型、圆形等”,至于这种刁钻问题,如何匹配,例如,万米深空对应天上,棉花糖对应白云,这种问题传统算法匹配成功率较低,较好的办法是交给基于大数据训练的深度网络,才有可能提高匹配正确率。

3.3.1、单实体查询

首先对单实体做一个类的定义:

class SingleEntityRule:
    def __init__(self, entity_root, segment, table_name):
        self.entity_root = entity_root
        self.segment = segment
        self.table_name = table_name

        这段代码在temp_match.py中,属于部分片段。 

        其中entity_root是实体的词性,segment是用户的问题,table_name是我们查询对应的数据库表名。

        接下来考虑问题模板规则的定义: 

        单实体匹配主要考虑到简单问答的匹配,如:周星驰的个人简介,刘德华的出生日期等,唐伯虎点秋香的演员有哪些?等等。

        考虑到单实体还会有一些扩展补充问法,如:周星驰演过的电影的类型有哪些?周星驰合作过的导演有哪些?因此也做了实现,区别是需要查询的数据表是连续的。

具体定义如下,将每种问题定义一种问法,然后放到一个list里:

SingleEntityRules = [
    SingleEntityRule(entity_root='nr', segment="的个人介绍?", table_name='bio'),
    SingleEntityRule(entity_root='nr', segment="的中文名叫什么?", table_name='chName'),
    SingleEntityRule(entity_root='nr', segment="的英文名叫什么?", table_name='foreName'),
    SingleEntityRule(entity_root='nr', segment="的国籍是什么?", table_name='nationality'),
    SingleEntityRule(entity_root='nr', segment="的星座是什么?", table_name='constellation'),
    SingleEntityRule(entity_root='nr', segment="的出生地点在哪里?", table_name='birthplace'),
    SingleEntityRule(entity_root='nr', segment="的生日是哪天?", table_name='birthday'),
    SingleEntityRule(entity_root='nr', segment="演过哪些电影?", table_name='repWorks'),
    SingleEntityRule(entity_root='nr', segment="获得了哪些成就?", table_name='achiem'),
    SingleEntityRule(entity_root='nr', segment="的经济公司?", table_name='brokerage'),
    SingleEntityRule(entity_root='nz', segment="的电影简介?", table_name='bio'),
    SingleEntityRule(entity_root='nz', segment="的中文名叫什么?", table_name='chName'),
    SingleEntityRule(entity_root='nz', segment="的英文名叫什么?", table_name='foreName'),
    SingleEntityRule(entity_root='nz', segment="的上映时间是什么时候?", table_name='prodTime'),
    SingleEntityRule(entity_root='nz', segment="的制片公司是哪家?", table_name='prodCompany'),
    SingleEntityRule(entity_root='nz', segment="的导演是谁?", table_name='director'),
    SingleEntityRule(entity_root='nz', segment="的编剧是谁?", table_name='screenwriter'),
    SingleEntityRule(entity_root='nz', segment="属于什么类型的电影?", table_name='genre'),
    SingleEntityRule(entity_root='nz', segment="的参演演员有哪些?", table_name='star'),
    SingleEntityRule(entity_root='nz', segment="的电影时长是多少?", table_name='length'),
    SingleEntityRule(entity_root='nz', segment="的上映时间是哪天?", table_name='releaseTime'),
    SingleEntityRule(entity_root='nz', segment="的语言类型是哪种?", table_name='language'),
    SingleEntityRule(entity_root='nz', segment="获得了哪些成就?", table_name='achiem'),
    # 实体 相关 实体(无定向) 的属性
    SingleEntityRule(entity_root='nr', segment="演过的电影的类型是哪些?", table_name='chName&genre'),
    SingleEntityRule(entity_root='nr', segment="合作过的导演有哪些?", table_name='chName&director')
]

        这段代码在temp_match.py中,属于部分片段。  

        规则定义完后,需要对规则进行查询匹配,大体的思路是,我们对用户的问题进行处理,然后遍历计算用户的问题,判断问题与问题模板里哪一个相似度最高,得分最高的则认为是用户想要了解的问题,这里我们使用了macropodus库做相似度计算,默认是使用jaccard相似度系数,非常方便。

        但获取到用户正确的问题后,再对问题进行查询模板对照生成,由于我们生产的数据实体只有两种,一个是电影,一个是明星,因此,单实体只需要考虑这两种情况nr和nz,这样可被用于fuseki查询的query语句就完成了,具体代码如下:

class BaseQueryTemp:
    def __init__(self):
        self.query_head = r'''
     PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> 
     PREFIX : <http://www.kg_movie.com#>   
     SELECT * WHERE {'''
        self.query_end = r'''} LIMIT 10'''


class QuerySETemp(BaseQueryTemp):
    def __init__(self):
        super().__init__()

    @staticmethod
    def _calc(sentence):
        max_score = 0
        table_name = ""
        fin_entity_root = ""
        for SingleEntityRule in SingleEntityRules:
            score = macropodus.sim(sentence, SingleEntityRule.entity_root + SingleEntityRule.segment)
            if score > max_score:
                max_score = score
                fin_entity_root = SingleEntityRule.entity_root
                table_name = SingleEntityRule.table_name
        return fin_entity_root, table_name

    def get_query_temp(self, entity_word_list, sentence):
        entity_root, table_name = self._calc(sentence)
        core_query = ""
        if entity_root == "nz":
            core_query = r'''
            ?x :movie_chName '{0}'.
            ?x :movie_{1} ?res_o.
            '''.format(entity_word_list[0]['nz'], table_name)
        elif entity_root == "nr":
            if "&" in table_name :
                table_list = table_name.split("&")
                from pprint import pprint
                pprint(table_list)
                core_query = r'''
                ?x :actor_chName '{0}'.
                ?x :hasActedIn ?o.
                ?o :movie_{1} ?res_0.
                ?o :movie_{2} ?res_1.
                '''.format(entity_word_list[0]['nr'], table_list[0], table_list[1])
            else:
                core_query = r'''
                ?x :actor_chName '{0}'.
                ?x :actor_{1} ?res_o.
                '''.format(entity_word_list[0]['nr'], table_name)
        return self.query_head + core_query + self.query_end

        这段代码在temp_match.py中,属于部分片段。  

        其中 BaseQueryTemp是我们定义了一个基类,直接被继承即可。

        QuerySETemp是我们的单实体模板类,其中_calc是计算相似度,get_query_temp是模板生产。

3.3.2、多实体查询

        主要考虑的是联合查询,即实体和实体一起才能满足的条件,例如人和人的条件组合,电影和电影的条件组合,人和电影的条件组合,以及自由搭配实体数量的条件组合。

首先对多实体进行定义:

class MultiEntityRule:
    # type same 表示entity_root 相同,如 [nr nr]
    # type diff 表示entity_root 不相同,如 [nr nz]
    def __init__(self, entity_root_list, segment, link_name, table_name, belong, type='same', ):
        self.type = type
        self.entity_root_list = entity_root_list
        self.segment = segment
        self.link_name = link_name
        self.table_name = table_name
        self.belong = belong

         在之前的单实体的基础上增加了link_name和belong和type,link_name考虑的是中间查询的节点,belong判断属于那哪种类型,例如是电影还是人物方向,type主要考虑是实体和实体之间的类别是否相同,需要触发不同的调用模板逻辑。

 由于多实体我只想到了两种,因此,就定义了两个规则,如下:

# diff类型由于暂时没想到
MultiEntityRules = [
    MultiEntityRule(type='same', entity_root_list=["nr"], segment="一起演过什么电影?", link_name=["hasActedIn"], table_name="chName", belong='movie'),
    MultiEntityRule(type='same', entity_root_list=["nz"], segment="同时参演的演员是谁?", link_name=["hasActor"], table_name="chName", belong='actor'),
]

  这里就看出来为什么我们会设计一个新问题发现模块了,因为我们永远想不全用户会有哪些问题和哪些花式提问方法。

然后计算相似度,生成模板 :

class QueryMETemp(BaseQueryTemp):
    def __init__(self):
        super().__init__()

    @staticmethod
    def _calc(type , sentence):
        max_score = 0
        # list_entity_root = []
        list_link_name = []
        table_name = ""
        belong = ""
        for MultiEntityRule in MultiEntityRules:
            if type == 'same':
                score = macropodus.sim(sentence, MultiEntityRule.entity_root_list[0] + MultiEntityRule.segment)
            else: # diff
                score = macropodus.sim(sentence, MultiEntityRule.entity_root_list[0] + MultiEntityRule.entity_root_list[1] + MultiEntityRule.segment)
            if score > max_score:
                max_score = score
                # list_entity_root = MultiEntityRule.entity_root_list
                list_link_name = MultiEntityRule.link_name
                table_name = MultiEntityRule.table_name
                belong = MultiEntityRule.belong

        return list_link_name, table_name, belong

    def get_query_temp(self, entity_word_list, sentence):
        type = ''
        if all('nr' == root for root_word in entity_word_list for root in root_word) or all('nz' == root for root_word in entity_word_list for root in root_word): # same
            type = 'same'
        else:
            type = 'diff'
        entity_num = len(entity_word_list)
        list_link_name, table_name, belong = self._calc(type, sentence)

        # 生成模板
        entity_query_list = []
        mid_query_list = []
        for index in range(entity_num):
            root_word = entity_word_list[index]
            for root in root_word:
                if root == 'nr':
                    entity_query_list.append(r'''
                    ?a{0} :actor_chName '{1}'.'''.format(index, root_word[root]))
                    mid_query_list.append(r'''
                    ?a{0} :hasActedIn ?last.'''.format(index))
                elif root == 'nz':
                    entity_query_list.append(r'''
                    ?b{0} :movie_chName '{1}'.'''.format(index, root_word[root]))
                    mid_query_list.append(r'''
                    ?b{0} :hasActor ?last.'''.format(index))
                else:
                    print("传参类型错误 ... ")

        end_query = r'''
        ?last :{0}_{1} ?res_0.'''.format(belong,table_name)

        core_query = ''
        for entity_query in entity_query_list:
            core_query = core_query + entity_query
        for mid_query in mid_query_list:
            core_query = core_query + mid_query
        core_query = core_query + end_query
        return self.query_head + core_query + self.query_end



if __name__ == '__main__':
    pass

        生成模板考虑到有的时候用户可以问超过2个实体单位,因此使用了循环结构来生成,这样可以对应2个以上的实体查询模板额生成。

 这里附上单实体和多实体查询模板的完整代码temp_match.py:

import macropodus


class BaseQueryTemp:
    def __init__(self):
        self.query_head = r'''
     PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> 
     PREFIX : <http://www.kg_movie.com#>   
     SELECT * WHERE {'''
        self.query_end = r'''} LIMIT 10'''

# Rule
# [实体]有什么[实体]?
# [实体]的[属性]?
# [实体]和[实体]的[关系]?
# [实体]的[条件属性]?
#

class WithoutEntityRule:
    def __init__(self):
        pass


WithoutEntityRules = [
    # WithoutEntityRule(key_word= ,)
]


class MultiEntityRule:
    # type same 表示entity_root 相同,如 [nr nr]
    # type diff 表示entity_root 不相同,如 [nr nz]
    def __init__(self, entity_root_list, segment, link_name, table_name, belong, type='same', ):
        self.type = type
        self.entity_root_list = entity_root_list
        self.segment = segment
        self.link_name = link_name
        self.table_name = table_name
        self.belong = belong


# diff类型由于暂时没想到
MultiEntityRules = [
    MultiEntityRule(type='same', entity_root_list=["nr"], segment="一起演过什么电影?", link_name=["hasActedIn"], table_name="chName", belong='movie'),
    MultiEntityRule(type='same', entity_root_list=["nz"], segment="同时参演的演员是谁?", link_name=["hasActor"], table_name="chName", belong='actor'),
]


class QueryMETemp(BaseQueryTemp):
    def __init__(self):
        super().__init__()

    @staticmethod
    def _calc(type , sentence):
        max_score = 0
        # list_entity_root = []
        list_link_name = []
        table_name = ""
        belong = ""
        for MultiEntityRule in MultiEntityRules:
            if type == 'same':
                score = macropodus.sim(sentence, MultiEntityRule.entity_root_list[0] + MultiEntityRule.segment)
            else: # diff
                score = macropodus.sim(sentence, MultiEntityRule.entity_root_list[0] + MultiEntityRule.entity_root_list[1] + MultiEntityRule.segment)
            if score > max_score:
                max_score = score
                # list_entity_root = MultiEntityRule.entity_root_list
                list_link_name = MultiEntityRule.link_name
                table_name = MultiEntityRule.table_name
                belong = MultiEntityRule.belong

        return list_link_name, table_name, belong

    def get_query_temp(self, entity_word_list, sentence):
        type = ''
        if all('nr' == root for root_word in entity_word_list for root in root_word) or all('nz' == root for root_word in entity_word_list for root in root_word): # same
            type = 'same'
        else:
            type = 'diff'
        entity_num = len(entity_word_list)
        list_link_name, table_name, belong = self._calc(type, sentence)

        # 生成模板
        entity_query_list = []
        mid_query_list = []
        for index in range(entity_num):
            root_word = entity_word_list[index]
            for root in root_word:
                if root == 'nr':
                    entity_query_list.append(r'''
                    ?a{0} :actor_chName '{1}'.'''.format(index, root_word[root]))
                    mid_query_list.append(r'''
                    ?a{0} :hasActedIn ?last.'''.format(index))
                elif root == 'nz':
                    entity_query_list.append(r'''
                    ?b{0} :movie_chName '{1}'.'''.format(index, root_word[root]))
                    mid_query_list.append(r'''
                    ?b{0} :hasActor ?last.'''.format(index))
                else:
                    print("传参类型错误 ... ")

        end_query = r'''
        ?last :{0}_{1} ?res_0.'''.format(belong,table_name)

        core_query = ''
        for entity_query in entity_query_list:
            core_query = core_query + entity_query
        for mid_query in mid_query_list:
            core_query = core_query + mid_query
        core_query = core_query + end_query
        return self.query_head + core_query + self.query_end



class SingleEntityRule:
    def __init__(self, entity_root, segment, table_name):
        self.entity_root = entity_root
        self.segment = segment
        self.table_name = table_name


class QuerySETemp(BaseQueryTemp):
    def __init__(self):
        super().__init__()

    @staticmethod
    def _calc(sentence):
        max_score = 0
        table_name = ""
        fin_entity_root = ""
        for SingleEntityRule in SingleEntityRules:
            score = macropodus.sim(sentence, SingleEntityRule.entity_root + SingleEntityRule.segment)
            if score > max_score:
                max_score = score
                fin_entity_root = SingleEntityRule.entity_root
                table_name = SingleEntityRule.table_name
        return fin_entity_root, table_name

    def get_query_temp(self, entity_word_list, sentence):
        entity_root, table_name = self._calc(sentence)
        core_query = ""
        if entity_root == "nz":
            core_query = r'''
            ?x :movie_chName '{0}'.
            ?x :movie_{1} ?res_o.
            '''.format(entity_word_list[0]['nz'], table_name)
        elif entity_root == "nr":
            if "&" in table_name :
                table_list = table_name.split("&")
                from pprint import pprint
                pprint(table_list)
                core_query = r'''
                ?x :actor_chName '{0}'.
                ?x :hasActedIn ?o.
                ?o :movie_{1} ?res_0.
                ?o :movie_{2} ?res_1.
                '''.format(entity_word_list[0]['nr'], table_list[0], table_list[1])
            else:
                core_query = r'''
                ?x :actor_chName '{0}'.
                ?x :actor_{1} ?res_o.
                '''.format(entity_word_list[0]['nr'], table_name)
        return self.query_head + core_query + self.query_end


SingleEntityRules = [
    SingleEntityRule(entity_root='nr', segment="的个人介绍?", table_name='bio'),
    SingleEntityRule(entity_root='nr', segment="的中文名叫什么?", table_name='chName'),
    SingleEntityRule(entity_root='nr', segment="的英文名叫什么?", table_name='foreName'),
    SingleEntityRule(entity_root='nr', segment="的国籍是什么?", table_name='nationality'),
    SingleEntityRule(entity_root='nr', segment="的星座是什么?", table_name='constellation'),
    SingleEntityRule(entity_root='nr', segment="的出生地点在哪里?", table_name='birthplace'),
    SingleEntityRule(entity_root='nr', segment="的生日是哪天?", table_name='birthday'),
    SingleEntityRule(entity_root='nr', segment="演过哪些电影?", table_name='repWorks'),
    SingleEntityRule(entity_root='nr', segment="获得了哪些成就?", table_name='achiem'),
    SingleEntityRule(entity_root='nr', segment="的经济公司?", table_name='brokerage'),
    SingleEntityRule(entity_root='nz', segment="的电影简介?", table_name='bio'),
    SingleEntityRule(entity_root='nz', segment="的中文名叫什么?", table_name='chName'),
    SingleEntityRule(entity_root='nz', segment="的英文名叫什么?", table_name='foreName'),
    SingleEntityRule(entity_root='nz', segment="的上映时间是什么时候?", table_name='prodTime'),
    SingleEntityRule(entity_root='nz', segment="的制片公司是哪家?", table_name='prodCompany'),
    SingleEntityRule(entity_root='nz', segment="的导演是谁?", table_name='director'),
    SingleEntityRule(entity_root='nz', segment="的编剧是谁?", table_name='screenwriter'),
    SingleEntityRule(entity_root='nz', segment="属于什么类型的电影?", table_name='genre'),
    SingleEntityRule(entity_root='nz', segment="的参演演员有哪些?", table_name='star'),
    SingleEntityRule(entity_root='nz', segment="的电影时长是多少?", table_name='length'),
    SingleEntityRule(entity_root='nz', segment="的上映时间是哪天?", table_name='releaseTime'),
    SingleEntityRule(entity_root='nz', segment="的语言类型是哪种?", table_name='language'),
    SingleEntityRule(entity_root='nz', segment="获得了哪些成就?", table_name='achiem'),
    # 实体 相关 实体(无定向) 的属性
    SingleEntityRule(entity_root='nr', segment="演过的电影的类型是哪些?", table_name='chName&genre'),
    SingleEntityRule(entity_root='nr', segment="合作过的导演有哪些?", table_name='chName&director')
]



if __name__ == '__main__':
    pass

4、业务逻辑的整合实现

        调用封装的三个类,业务逻辑的简单实现,不需要额外的赘述了,直接放完整代码。

main_logic.py:

# encoding=utf-8
import jieba
import jieba.posseg as pseg
from pprint import pprint
from KBQA.fuseki_query import *
from KBQA.temp_match import *
from KBQA.nlp import *


if __name__ == '__main__':
    # cut_sentences("a")
    # cut_tag('a')
    # mat()
    fuseki = Fuseki()
    query_se_temp = QuerySETemp()
    query_me_temp = QueryMETemp()
    text_process = TextureProcess()
    text_process.load_dicts("actor_dict.txt",
                            "movie_dict.txt")
    text_process.force_split(['喜剧', '演员'],
                             ['出生', '日期'])
    # 周星驰的个人简介
    print("输入要查询的问题:")
    while True:
        in_str = input()
        if in_str == "exit":
            break
        else:
            text_res = text_process.sentence_split_tag(in_str)
            segment = ""
            entity_word_list = []
            query_temp = ""
            for Word in text_res:
                if Word["root"] == "nr" or Word["root"] == "nz":
                    segment = segment + Word["root"]
                    entity_word_list.append({Word['root']: Word['word']})
                else:
                    segment = segment + Word["word"]
            # print(entity_word_list)

            if len(entity_word_list) == 0:
                print("无实体或虚拟实体,》》》转人工域分析 ... ...")
                continue
            elif len(entity_word_list) == 1:
                # print("单实体")
                query_temp = query_se_temp.get_query_temp(entity_word_list, segment)
                # print(query_temp)
            elif len(entity_word_list) >= 2:
                # print("多实体")
                query_temp = query_me_temp.get_query_temp(entity_word_list, segment)
                # print(query_temp)

            # pprint(res)
            query_result = fuseki.query(query_temp)
            # pprint(query_result)
            parse_dict = parse(query_result)

            for result_info in parse_dict:
                if len(parse_dict[result_info]) > 0:
                    print(parse_dict[result_info])

            # print(type(query_result))

    print("进程结束 ................. ")

5、一些补充

        本身现在也没有很大的兴趣沉浸在开发里,因此实现思路有了后也只是简单的实现一些基本框架,也没有大的兴趣全部验证一遍,至少个人项目上没那么大精力,但作为产品和架构思想,我想深挖的细节还是有很多,考虑到这段时间对KBQA做了功课,因此整理一下一些信息,对KBQA做了一下几种分类:

方法

子方法

人力投入

资源投入

其他说明

模板匹配

精准匹配

正则表达实现

模板匹配

模糊匹配

jaccard、余弦等相似度计算

本次使用的方法更接近模糊匹配

模板匹配

关键词匹配

正则、也可以计算相似度

空间计算

向量化问题计算相似,适合Web

语义解析

利用词性进行计算

实体搜索

提取实体及关键词,匹配节点

以上方法除了精准匹配外,都可以通过深度学习做大规模训练,仅此。

6、参考

1、相似度计算库macropodus

2、Jaccard系数

3、结巴分词库

4、KBQA实现

5、农业KBQA

6、中文开发知识图谱-REfO实现KBQA


https://www.xamrdz.com/backend/3xs1926455.html

相关文章: