Преглед изворни кода

plugin: add banwords plugin

master
lanvent пре 1 година
родитељ
комит
8915149d36
9 измењених фајлова са 338 додато и 1 уклоњено
  1. +1
    -0
      plugins/banwords/.gitignore
  2. +9
    -0
      plugins/banwords/README.md
  3. +250
    -0
      plugins/banwords/WordsSearch.py
  4. +0
    -0
      plugins/banwords/__init__.py
  5. +63
    -0
      plugins/banwords/banwords.py
  6. +3
    -0
      plugins/banwords/banwords.txt.template
  7. +3
    -0
      plugins/banwords/config.json.template
  8. +4
    -0
      plugins/godcmd/config.json.template
  9. +5
    -1
      plugins/godcmd/godcmd.py

+ 1
- 0
plugins/banwords/.gitignore Прегледај датотеку

@@ -0,0 +1 @@
banwords.txt

+ 9
- 0
plugins/banwords/README.md Прегледај датотеку

@@ -0,0 +1,9 @@
### 说明
简易的敏感词插件,暂不支持分词,请自行导入词库到插件文件夹中的`banwords.txt`,每行一个词,一个参考词库是[1](https://github.com/cjh0613/tencent-sensitive-words/blob/main/sensitive_words_lines.txt)。

`config.json`中能够填写默认的处理行为,目前行为有:
- `ignore` : 无视这条消息。
- `replace` : 将消息中的敏感词替换成"*",并回复违规。

### 致谢
搜索功能实现来自https://github.com/toolgood/ToolGood.Words

+ 250
- 0
plugins/banwords/WordsSearch.py Прегледај датотеку

@@ -0,0 +1,250 @@
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# ToolGood.Words.WordsSearch.py
# 2020, Lin Zhijun, https://github.com/toolgood/ToolGood.Words
# Licensed under the Apache License 2.0
# 更新日志
# 2020.04.06 第一次提交
# 2020.05.16 修改,支持大于0xffff的字符

__all__ = ['WordsSearch']
__author__ = 'Lin Zhijun'
__date__ = '2020.05.16'

class TrieNode():
def __init__(self):
self.Index = 0
self.Index = 0
self.Layer = 0
self.End = False
self.Char = ''
self.Results = []
self.m_values = {}
self.Failure = None
self.Parent = None

def Add(self,c):
if c in self.m_values :
return self.m_values[c]
node = TrieNode()
node.Parent = self
node.Char = c
self.m_values[c] = node
return node

def SetResults(self,index):
if (self.End == False):
self.End = True
self.Results.append(index)

class TrieNode2():
def __init__(self):
self.End = False
self.Results = []
self.m_values = {}
self.minflag = 0xffff
self.maxflag = 0

def Add(self,c,node3):
if (self.minflag > c):
self.minflag = c
if (self.maxflag < c):
self.maxflag = c
self.m_values[c] = node3

def SetResults(self,index):
if (self.End == False) :
self.End = True
if (index in self.Results )==False :
self.Results.append(index)

def HasKey(self,c):
return c in self.m_values
def TryGetValue(self,c):
if (self.minflag <= c and self.maxflag >= c):
if c in self.m_values:
return self.m_values[c]
return None


class WordsSearch():
def __init__(self):
self._first = {}
self._keywords = []
self._indexs=[]
def SetKeywords(self,keywords):
self._keywords = keywords
self._indexs=[]
for i in range(len(keywords)):
self._indexs.append(i)

root = TrieNode()
allNodeLayer={}

for i in range(len(self._keywords)): # for (i = 0; i < _keywords.length; i++)
p = self._keywords[i]
nd = root
for j in range(len(p)): # for (j = 0; j < p.length; j++)
nd = nd.Add(ord(p[j]))
if (nd.Layer == 0):
nd.Layer = j + 1
if nd.Layer in allNodeLayer:
allNodeLayer[nd.Layer].append(nd)
else:
allNodeLayer[nd.Layer]=[]
allNodeLayer[nd.Layer].append(nd)
nd.SetResults(i)


allNode = []
allNode.append(root)
for key in allNodeLayer.keys():
for nd in allNodeLayer[key]:
allNode.append(nd)
allNodeLayer=None

for i in range(len(allNode)): # for (i = 0; i < allNode.length; i++)
if i==0 :
continue
nd=allNode[i]
nd.Index = i
r = nd.Parent.Failure
c = nd.Char
while (r != None and (c in r.m_values)==False):
r = r.Failure
if (r == None):
nd.Failure = root
else:
nd.Failure = r.m_values[c]
for key2 in nd.Failure.Results :
nd.SetResults(key2)
root.Failure = root

allNode2 = []
for i in range(len(allNode)): # for (i = 0; i < allNode.length; i++)
allNode2.append( TrieNode2())
for i in range(len(allNode2)): # for (i = 0; i < allNode2.length; i++)
oldNode = allNode[i]
newNode = allNode2[i]

for key in oldNode.m_values :
index = oldNode.m_values[key].Index
newNode.Add(key, allNode2[index])
for index in range(len(oldNode.Results)): # for (index = 0; index < oldNode.Results.length; index++)
item = oldNode.Results[index]
newNode.SetResults(item)
oldNode=oldNode.Failure
while oldNode != root:
for key in oldNode.m_values :
if (newNode.HasKey(key) == False):
index = oldNode.m_values[key].Index
newNode.Add(key, allNode2[index])
for index in range(len(oldNode.Results)):
item = oldNode.Results[index]
newNode.SetResults(item)
oldNode=oldNode.Failure
allNode = None
root = None

# first = []
# for index in range(65535):# for (index = 0; index < 0xffff; index++)
# first.append(None)
# for key in allNode2[0].m_values :
# first[key] = allNode2[0].m_values[key]
self._first = allNode2[0]

def FindFirst(self,text):
ptr = None
for index in range(len(text)): # for (index = 0; index < text.length; index++)
t =ord(text[index]) # text.charCodeAt(index)
tn = None
if (ptr == None):
tn = self._first.TryGetValue(t)
else:
tn = ptr.TryGetValue(t)
if (tn==None):
tn = self._first.TryGetValue(t)
if (tn != None):
if (tn.End):
item = tn.Results[0]
keyword = self._keywords[item]
return { "Keyword": keyword, "Success": True, "End": index, "Start": index + 1 - len(keyword), "Index": self._indexs[item] }
ptr = tn
return None

def FindAll(self,text):
ptr = None
list = []

for index in range(len(text)): # for (index = 0; index < text.length; index++)
t =ord(text[index]) # text.charCodeAt(index)
tn = None
if (ptr == None):
tn = self._first.TryGetValue(t)
else:
tn = ptr.TryGetValue(t)
if (tn==None):
tn = self._first.TryGetValue(t)
if (tn != None):
if (tn.End):
for j in range(len(tn.Results)): # for (j = 0; j < tn.Results.length; j++)
item = tn.Results[j]
keyword = self._keywords[item]
list.append({ "Keyword": keyword, "Success": True, "End": index, "Start": index + 1 - len(keyword), "Index": self._indexs[item] })
ptr = tn
return list


def ContainsAny(self,text):
ptr = None
for index in range(len(text)): # for (index = 0; index < text.length; index++)
t =ord(text[index]) # text.charCodeAt(index)
tn = None
if (ptr == None):
tn = self._first.TryGetValue(t)
else:
tn = ptr.TryGetValue(t)
if (tn==None):
tn = self._first.TryGetValue(t)
if (tn != None):
if (tn.End):
return True
ptr = tn
return False
def Replace(self,text, replaceChar = '*'):
result = list(text)

ptr = None
for i in range(len(text)): # for (i = 0; i < text.length; i++)
t =ord(text[i]) # text.charCodeAt(index)
tn = None
if (ptr == None):
tn = self._first.TryGetValue(t)
else:
tn = ptr.TryGetValue(t)
if (tn==None):
tn = self._first.TryGetValue(t)
if (tn != None):
if (tn.End):
maxLength = len( self._keywords[tn.Results[0]])
start = i + 1 - maxLength
for j in range(start,i+1): # for (j = start; j <= i; j++)
result[j] = replaceChar
ptr = tn
return ''.join(result)

+ 0
- 0
plugins/banwords/__init__.py Прегледај датотеку


+ 63
- 0
plugins/banwords/banwords.py Прегледај датотеку

@@ -0,0 +1,63 @@
# encoding:utf-8

import json
import os
from bridge.context import ContextType
from bridge.reply import Reply, ReplyType
import plugins
from plugins import *
from common.log import logger
from .WordsSearch import WordsSearch


@plugins.register(name="Banwords", desc="判断消息中是否有敏感词、决定是否回复。", version="1.0", author="lanvent", desire_priority= 100)
class Banwords(Plugin):
def __init__(self):
super().__init__()
try:
curdir=os.path.dirname(__file__)
config_path=os.path.join(curdir,"config.json")
conf=None
if not os.path.exists(config_path):
conf={"action":"ignore"}
with open(config_path,"w") as f:
json.dump(conf,f,indent=4)
else:
with open(config_path,"r") as f:
conf=json.load(f)
self.searchr = WordsSearch()
self.action = conf["action"]
banwords_path = os.path.join(curdir,"banwords.txt")
with open(banwords_path, 'r', encoding='utf-8') as f:
words=[]
for line in f:
word = line.strip()
if word:
words.append(word)
self.searchr.SetKeywords(words)
self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context
logger.info("[Banwords] inited")
except Exception as e:
logger.error("Banwords init failed: %s" % e)


def on_handle_context(self, e_context: EventContext):

if e_context['context'].type not in [ContextType.TEXT,ContextType.IMAGE_CREATE]:
return
content = e_context['context'].content
logger.debug("[Banwords] on_handle_context. content: %s" % content)
if self.action == "ignore":
f = self.searchr.FindFirst(content)
if f:
logger.info("Banwords: %s" % f["Keyword"])
e_context.action = EventAction.BREAK_PASS
return
elif self.action == "replace":
if self.searchr.ContainsAny(content):
reply = Reply(ReplyType.INFO, "发言中包含敏感词,请重试: \n"+self.searchr.Replace(content))
e_context['reply'] = reply
e_context.action = EventAction.BREAK_PASS
return

+ 3
- 0
plugins/banwords/banwords.txt.template Прегледај датотеку

@@ -0,0 +1,3 @@
nipples
pennis
法轮功

+ 3
- 0
plugins/banwords/config.json.template Прегледај датотеку

@@ -0,0 +1,3 @@
{
"action": "ignore"
}

+ 4
- 0
plugins/godcmd/config.json.template Прегледај датотеку

@@ -0,0 +1,4 @@
{
"password": "",
"admin_users": []
}

+ 5
- 1
plugins/godcmd/godcmd.py Прегледај датотеку

@@ -273,9 +273,13 @@ class Godcmd(Plugin):
if isadmin:
return False,"管理员账号无需认证"

if len(self.password) == 0:
return False,"未设置口令,无法认证"
if len(args) != 1:
return False,"请提供口令"
password = args[0]
if password == self.password:
self.admin_users.append(userid)


Loading…
Откажи
Сачувај