Python实现敏感词过滤

举报
Python爱好者 发表于 2020/12/28 23:26:01 2020/12/28
【摘要】 在我们生活中的一些场合经常会有一些不该出现的敏感词,我们通常会使用*去屏蔽它,例如:尼玛 -> **,一些骂人的敏感词和一些政治敏感词都不应该出现在一些公共场合中,这个时候我们就需要一定的手段去屏蔽这些敏感词。下面我来介绍一些简单版本的敏感词屏蔽的方法。 (我已经尽量把脏话做成图片的形式了,要不然文章发不出去) 方法一:replace过滤 replace就是最...

在我们生活中的一些场合经常会有一些不该出现的敏感词,我们通常会使用*去屏蔽它,例如:尼玛 -> **,一些骂人的敏感词和一些政治敏感词都不应该出现在一些公共场合中,这个时候我们就需要一定的手段去屏蔽这些敏感词。下面我来介绍一些简单版本的敏感词屏蔽的方法。

(我已经尽量把脏话做成图片的形式了,要不然文章发不出去)

方法一:replace过滤

replace就是最简单的字符串替换,当一串字符串中有可能会出现的敏感词时,我们直接使用相应的replace方法用*替换出敏感词即可。

缺点:

文本和敏感词少的时候还可以,多的时候效率就比较差了


       import datetime
       now = datetime.datetime.now()
       print(filter_sentence, " | ", now)
   
  

如果是多个敏感词可以用列表进行逐一替换


       for i in dirty:
           speak = speak.replace(i, '*')
       print(speak, " | ", now)
   
  

方法二:正则表达式过滤

正则表达式算是一个不错的匹配方法了,日常的查询中,机会都会用到正则表达式,包括我们的爬虫,也都是经常会使用到正则表达式的,在这里我们主要是使用“|”来进行匹配,“|”的意思是从多个目标字符串中选择一个进行匹配。写个简单的例子:


       import re
       def sentence_filter(keywords, text):
           return re.sub("|".join(keywords), "***", text)
       print(sentence_filter(dirty, speak))
   
  

方法三:DFA过滤算法

DFA的算法,即Deterministic Finite Automaton算法,翻译成中文就是确定有穷自动机算法。它的基本思想是基于状态转移来检索敏感词,只需要扫描一次待检测文本,就能对所有敏感词进行检测。(实现见代码注释)


       #!/usr/bin/env python
       # -*- coding:utf-8 -*-
       # @Time:2020/4/15 11:40
       # @Software:PyCharm
       # article_add: https://www.cnblogs.com/JentZhang/p/12718092.html
       __author__ = "JentZhang"
       import json
       MinMatchType = 1  # 最小匹配规则
       MaxMatchType = 2  # 最大匹配规则
       class DFAUtils(object):
           """
           DFA算法
           """
           def __init__(self, word_warehouse):
               """
               算法初始化
               :param word_warehouse:词库
               """
               # 词库
               self.root = dict()
               # 无意义词库,在检测中需要跳过的(这种无意义的词最后有个专门的地方维护,保存到数据库或者其他存储介质中)
               self.skip_root = [' ', '&', '!', '!', '@', '#', '$', '¥', '*', '^', '%', '?', '?', '<', '>', "《", '》']
               # 初始化词库
               for word in word_warehouse:
                   self.add_word(word)
           def add_word(self, word):
               """
               添加词库
               :param word:
               :return:
               """
               now_node = self.root
               word_count = len(word)
               for i in range(word_count):
                   char_str = word[i]
                   if char_str in now_node.keys():
                       # 如果存在该key,直接赋值,用于下一个循环获取
                       now_node = now_node.get(word[i])
                       now_node['is_end'] = False
                   else:
                       # 不存在则构建一个dict
                       new_node = dict()
                       if i == word_count - 1:  # 最后一个
                           new_node['is_end'] = True
                       else:  # 不是最后一个
                           new_node['is_end'] = False
                       now_node[char_str] = new_node
                       now_node = new_node
           def check_match_word(self, txt, begin_index, match_type=MinMatchType):
               """
               检查文字中是否包含匹配的字符
               :param txt:待检测的文本
               :param begin_index: 调用getSensitiveWord时输入的参数,获取词语的上边界index
               :param match_type:匹配规则 1:最小匹配规则,2:最大匹配规则
               :return:如果存在,则返回匹配字符的长度,不存在返回0
               """
               flag = False
               match_flag_length = 0  # 匹配字符的长度
               now_map = self.root
               tmp_flag = 0  # 包括特殊字符的敏感词的长度
               for i in range(begin_index, len(txt)):
                   word = txt[i]
                   # 检测是否是特殊字符"
                   if word in self.skip_root and len(now_map) < 100:
                       # len(nowMap)<100 保证已经找到这个词的开头之后出现的特殊字符
                       tmp_flag += 1
                       continue
                   # 获取指定key
                   now_map = now_map.get(word)
                   if now_map:  # 存在,则判断是否为最后一个
                       # 找到相应key,匹配标识+1
                       match_flag_length += 1
                       tmp_flag += 1
                       # 如果为最后一个匹配规则,结束循环,返回匹配标识数
                       if now_map.get("is_end"):
                           # 结束标志位为true
                           flag = True
                           # 最小规则,直接返回,最大规则还需继续查找
                           if match_type == MinMatchType:
                               break
                   else:  # 不存在,直接返回
                       break
               if tmp_flag < 2 or not flag:  # 长度必须大于等于1,为词
                   tmp_flag = 0
               return tmp_flag
           def get_match_word(self, txt, match_type=MinMatchType):
               """
               获取匹配到的词语
               :param txt:待检测的文本
               :param match_type:匹配规则 1:最小匹配规则,2:最大匹配规则
               :return:文字中的相匹配词
               """
               matched_word_list = list()
               for i in range(len(txt)):  # 0---11
                   length = self.check_match_word(txt, i, match_type)
                   if length > 0:
                       word = txt[i:i + length]
                       matched_word_list.append(word)
                       # i = i + length - 1
               return matched_word_list
           def is_contain(self, txt, match_type=MinMatchType):
               """
               判断文字是否包含敏感字符
               :param txt:待检测的文本
               :param match_type:匹配规则 1:最小匹配规则,2:最大匹配规则
               :return:若包含返回true,否则返回false
               """
               flag = False
               for i in range(len(txt)):
                   match_flag = self.check_match_word(txt, i, match_type)
                   if match_flag > 0:
                       flag = True
               return flag
           def replace_match_word(self, txt, replace_char='*', match_type=MinMatchType):
               """
               替换匹配字符
               :param txt:待检测的文本
               :param replace_char:用于替换的字符,匹配的敏感词以字符逐个替换,如"你是大王八",敏感词"王八",替换字符*,替换结果"你是大**"
               :param match_type:匹配规则 1:最小匹配规则,2:最大匹配规则
               :return:替换敏感字字符后的文本
               """
               tuple_set = self.get_match_word(txt, match_type)
               word_set = [i for i in tuple_set]
               result_txt = ""
               if len(word_set) > 0:  # 如果检测出了敏感词,则返回替换后的文本
                   for word in word_set:
                       replace_string = len(word) * replace_char
                       txt = txt.replace(word, replace_string)
                       result_txt = txt
               else:  # 没有检测出敏感词,则返回原文本
                   result_txt = txt
               return result_txt
       if __name__ == '__main__':
           dfa = DFAUtils(word_warehouse=word_warehouse)
           print('词库结构:', json.dumps(dfa.root, ensure_ascii=False))
           # 待检测的文本
           msg = msg
           print('是否包含:', dfa.is_contain(msg))
           print('相匹配的词:', dfa.get_match_word(msg))
           print('替换包含的词:', dfa.replace_match_word(msg))
   
  

方法四:AC自动机

AC自动机需要有前置知识:Trie树(简单介绍:又称前缀树,字典树,是用于快速处理字符串的问题,能做到快速查找到一些字符串上的信息。)

详细参考:

https://www.luogu.com.cn/blog/juruohyfhaha/trie-xue-xi-zong-jie

ac自动机,就是在tire树的基础上,增加一个fail指针,如果当前点匹配失败,则将指针转移到fail指针指向的地方,这样就不用回溯,而可以路匹配下去了。

详细匹配机制我在这里不过多赘述,关于AC自动机可以参考一下这篇文章:

https://blog.csdn.net/bestsort/article/details/82947639

python可以利用ahocorasick模块快速实现:


       # python3 -m pip install pyahocorasick
       import ahocorasick
       def build_actree(wordlist):
           actree = ahocorasick.Automaton()
           for index, word in enumerate(wordlist):
               actree.add_word(word, (index, word))
           actree.make_automaton()
           return actree
       if __name__ == '__main__':
           actree = build_actree(wordlist=wordlist)
           sent_cp = sent
           for i in actree.iter(sent):
               sent_cp = sent_cp.replace(i[1][1], "**")
               print("屏蔽词:",i[1][1])
           print("屏蔽结果:",sent_cp)
   
  

当然,我们也可以手写一份AC自动机,具体参考:


       class TrieNode(object):
           __slots__ = ['value', 'next', 'fail', 'emit']
           def __init__(self, value):
               self.value = value
               self.next = dict()
               self.fail = None
               self.emit = None
       class AhoCorasic(object):
           __slots__ = ['_root']
           def __init__(self, words):
               self._root = AhoCorasic._build_trie(words)
           @staticmethod
           def _build_trie(words):
               assert isinstance(words, list) and words
               root = TrieNode('root')
               for word in words:
                   node = root
                   for c in word:
                       if c not in node.next:
                           node.next[c] = TrieNode(c)
                       node = node.next[c]
                   if not node.emit:
                       node.emit = {word}
                   else:
                       node.emit.add(word)
               queue = []
               queue.insert(0, (root, None))
               while len(queue) > 0:
                   node_parent = queue.pop()
                   curr, parent = node_parent[0], node_parent[1]
                   for sub in curr.next.itervalues():
                       queue.insert(0, (sub, curr))
                   if parent is None:
                       continue
                   elif parent is root:
                       curr.fail = root
                   else:
                       fail = parent.fail
                       while fail and curr.value not in fail.next:
                           fail = fail.fail
                       if fail:
                           curr.fail = fail.next[curr.value]
                       else:
                           curr.fail = root
               return root
           def search(self, s):
               seq_list = []
               node = self._root
               for i, c in enumerate(s):
                   matched = True
                   while c not in node.next:
                       if not node.fail:
                           matched = False
                           node = self._root
                           break
                       node = node.fail
                   if not matched:
                       continue
                   node = node.next[c]
                   if node.emit:
                       for _ in node.emit:
                           from_index = i + 1 - len(_)
                           match_info = (from_index, _)
                           seq_list.append(match_info)
                       node = self._root
               return seq_list
       if __name__ == '__main__':
           aho = AhoCorasic(['foo', 'bar'])
           print aho.search('barfoothefoobarman')
   
  

以上便是使用Python实现敏感词过滤的四种方法,前面两种方法比较简单,后面两种偏向算法,需要先了解算法具体实现的原理,之后代码就好懂了。(DFA作为比较常用的过滤手段,建议大家掌握一下~)

最后附上敏感词词库:

https://github.com/qloog/sensitive_words


       以上,便是今天的内容,希望大家喜欢,欢迎「转发」或者点击「在看」支持,谢谢各位。
       “扫一扫,关注我吧”
   
  

文章来源: blog.csdn.net,作者:敲代码的灰太狼,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/tongtongjing1765/article/details/105963611

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。