101 lines
3.7KB

  1. # encoding:utf-8
  2. import json
  3. import os
  4. import plugins
  5. from bridge.context import ContextType
  6. from bridge.reply import Reply, ReplyType
  7. from common.log import logger
  8. from plugins import *
  9. from .lib.WordsSearch import WordsSearch
  10. @plugins.register(
  11. name="Banwords",
  12. desire_priority=100,
  13. hidden=True,
  14. desc="判断消息中是否有敏感词、决定是否回复。",
  15. version="1.0",
  16. author="lanvent",
  17. )
  18. class Banwords(Plugin):
  19. def __init__(self):
  20. super().__init__()
  21. try:
  22. # load config
  23. conf = super().load_config()
  24. curdir = os.path.dirname(__file__)
  25. if not conf:
  26. # 配置不存在则写入默认配置
  27. config_path = os.path.join(curdir, "config.json")
  28. if not os.path.exists(config_path):
  29. conf = {"action": "ignore"}
  30. with open(config_path, "w") as f:
  31. json.dump(conf, f, indent=4)
  32. self.searchr = WordsSearch()
  33. self.action = conf["action"]
  34. banwords_path = os.path.join(curdir, "banwords.txt")
  35. with open(banwords_path, "r", encoding="utf-8") as f:
  36. words = []
  37. for line in f:
  38. word = line.strip()
  39. if word:
  40. words.append(word)
  41. self.searchr.SetKeywords(words)
  42. self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context
  43. if conf.get("reply_filter", True):
  44. self.handlers[Event.ON_DECORATE_REPLY] = self.on_decorate_reply
  45. self.reply_action = conf.get("reply_action", "ignore")
  46. logger.info("[Banwords] inited")
  47. except Exception as e:
  48. logger.warn("[Banwords] init failed, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/banwords .")
  49. raise e
  50. def on_handle_context(self, e_context: EventContext):
  51. if e_context["context"].type not in [
  52. ContextType.TEXT,
  53. ContextType.IMAGE_CREATE,
  54. ]:
  55. return
  56. content = e_context["context"].content
  57. logger.debug("[Banwords] on_handle_context. content: %s" % content)
  58. if self.action == "ignore":
  59. f = self.searchr.FindFirst(content)
  60. if f:
  61. logger.info("[Banwords] %s in message" % f["Keyword"])
  62. e_context.action = EventAction.BREAK_PASS
  63. return
  64. elif self.action == "replace":
  65. if self.searchr.ContainsAny(content):
  66. reply = Reply(ReplyType.INFO, "发言中包含敏感词,请重试: \n" + self.searchr.Replace(content))
  67. e_context["reply"] = reply
  68. e_context.action = EventAction.BREAK_PASS
  69. return
  70. def on_decorate_reply(self, e_context: EventContext):
  71. if e_context["reply"].type not in [ReplyType.TEXT]:
  72. return
  73. reply = e_context["reply"]
  74. content = reply.content
  75. if self.reply_action == "ignore":
  76. f = self.searchr.FindFirst(content)
  77. if f:
  78. logger.info("[Banwords] %s in reply" % f["Keyword"])
  79. e_context["reply"] = None
  80. e_context.action = EventAction.BREAK_PASS
  81. return
  82. elif self.reply_action == "replace":
  83. if self.searchr.ContainsAny(content):
  84. reply = Reply(ReplyType.INFO, "已替换回复中的敏感词: \n" + self.searchr.Replace(content))
  85. e_context["reply"] = reply
  86. e_context.action = EventAction.CONTINUE
  87. return
  88. def get_help_text(self, **kwargs):
  89. return "过滤消息中的敏感词。"