|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- # encoding:utf-8
-
- import json
- import os
-
- import plugins
- from bridge.context import ContextType
- from bridge.reply import Reply, ReplyType
- from common.log import logger
- from plugins import *
-
- from .lib.WordsSearch import WordsSearch
-
-
- @plugins.register(
- name="Banwords",
- desire_priority=100,
- hidden=True,
- desc="判断消息中是否有敏感词、决定是否回复。",
- version="1.0",
- author="lanvent",
- )
- class Banwords(Plugin):
- def __init__(self):
- super().__init__()
- try:
- curdir = os.path.dirname(__file__)
- config_path = os.path.join(curdir, "config.json")
- conf = None
- if not os.path.exists(config_path):
- conf = {"action": "ignore"}
- with open(config_path, "w") as f:
- json.dump(conf, f, indent=4)
- else:
- with open(config_path, "r") as f:
- conf = json.load(f)
- self.searchr = WordsSearch()
- self.action = conf["action"]
- banwords_path = os.path.join(curdir, "banwords.txt")
- with open(banwords_path, "r", encoding="utf-8") as f:
- words = []
- for line in f:
- word = line.strip()
- if word:
- words.append(word)
- self.searchr.SetKeywords(words)
- self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context
- if conf.get("reply_filter", True):
- self.handlers[Event.ON_DECORATE_REPLY] = self.on_decorate_reply
- self.reply_action = conf.get("reply_action", "ignore")
- logger.info("[Banwords] inited")
- except Exception as e:
- logger.warn("[Banwords] init failed, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/banwords .")
- raise e
-
- def on_handle_context(self, e_context: EventContext):
- if e_context["context"].type not in [
- ContextType.TEXT,
- ContextType.IMAGE_CREATE,
- ]:
- return
-
- content = e_context["context"].content
- logger.debug("[Banwords] on_handle_context. content: %s" % content)
- if self.action == "ignore":
- f = self.searchr.FindFirst(content)
- if f:
- logger.info("[Banwords] %s in message" % f["Keyword"])
- e_context.action = EventAction.BREAK_PASS
- return
- elif self.action == "replace":
- if self.searchr.ContainsAny(content):
- reply = Reply(ReplyType.INFO, "发言中包含敏感词,请重试: \n" + self.searchr.Replace(content))
- e_context["reply"] = reply
- e_context.action = EventAction.BREAK_PASS
- return
-
- def on_decorate_reply(self, e_context: EventContext):
- if e_context["reply"].type not in [ReplyType.TEXT]:
- return
-
- reply = e_context["reply"]
- content = reply.content
- if self.reply_action == "ignore":
- f = self.searchr.FindFirst(content)
- if f:
- logger.info("[Banwords] %s in reply" % f["Keyword"])
- e_context["reply"] = None
- e_context.action = EventAction.BREAK_PASS
- return
- elif self.reply_action == "replace":
- if self.searchr.ContainsAny(content):
- reply = Reply(ReplyType.INFO, "已替换回复中的敏感词: \n" + self.searchr.Replace(content))
- e_context["reply"] = reply
- e_context.action = EventAction.CONTINUE
- return
-
- def get_help_text(self, **kwargs):
- return "过滤消息中的敏感词。"
|