You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

role.py 8.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. # encoding:utf-8
  2. import json
  3. import os
  4. import plugins
  5. from bridge.bridge import Bridge
  6. from bridge.context import ContextType
  7. from bridge.reply import Reply, ReplyType
  8. from common import const
  9. from common.log import logger
  10. from config import conf
  11. from plugins import *
  12. class RolePlay:
  13. def __init__(self, bot, sessionid, desc, wrapper=None):
  14. self.bot = bot
  15. self.sessionid = sessionid
  16. self.wrapper = wrapper or "%s" # 用于包装用户输入
  17. self.desc = desc
  18. self.bot.sessions.build_session(self.sessionid, system_prompt=self.desc)
  19. def reset(self):
  20. self.bot.sessions.clear_session(self.sessionid)
  21. def action(self, user_action):
  22. session = self.bot.sessions.build_session(self.sessionid)
  23. if session.system_prompt != self.desc: # 目前没有触发session过期事件,这里先简单判断,然后重置
  24. session.set_system_prompt(self.desc)
  25. prompt = self.wrapper % user_action
  26. return prompt
  27. @plugins.register(
  28. name="Role",
  29. desire_priority=0,
  30. namecn="角色扮演",
  31. desc="为你的Bot设置预设角色",
  32. version="1.0",
  33. author="lanvent",
  34. )
  35. class Role(Plugin):
  36. def __init__(self):
  37. super().__init__()
  38. curdir = os.path.dirname(__file__)
  39. config_path = os.path.join(curdir, "roles.json")
  40. try:
  41. with open(config_path, "r", encoding="utf-8") as f:
  42. config = json.load(f)
  43. self.tags = {tag: (desc, []) for tag, desc in config["tags"].items()}
  44. self.roles = {}
  45. for role in config["roles"]:
  46. self.roles[role["title"].lower()] = role
  47. for tag in role["tags"]:
  48. if tag not in self.tags:
  49. logger.warning(f"[Role] unknown tag {tag} ")
  50. self.tags[tag] = (tag, [])
  51. self.tags[tag][1].append(role)
  52. for tag in list(self.tags.keys()):
  53. if len(self.tags[tag][1]) == 0:
  54. logger.debug(f"[Role] no role found for tag {tag} ")
  55. del self.tags[tag]
  56. if len(self.roles) == 0:
  57. raise Exception("no role found")
  58. self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context
  59. self.roleplays = {}
  60. logger.info("[Role] inited")
  61. except Exception as e:
  62. if isinstance(e, FileNotFoundError):
  63. logger.warn(f"[Role] init failed, {config_path} not found, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/role .")
  64. else:
  65. logger.warn("[Role] init failed, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/role .")
  66. raise e
  67. def get_role(self, name, find_closest=True, min_sim=0.35):
  68. name = name.lower()
  69. found_role = None
  70. if name in self.roles:
  71. found_role = name
  72. elif find_closest:
  73. import difflib
  74. def str_simularity(a, b):
  75. return difflib.SequenceMatcher(None, a, b).ratio()
  76. max_sim = min_sim
  77. max_role = None
  78. for role in self.roles:
  79. sim = str_simularity(name, role)
  80. if sim >= max_sim:
  81. max_sim = sim
  82. max_role = role
  83. found_role = max_role
  84. return found_role
  85. def on_handle_context(self, e_context: EventContext):
  86. if e_context["context"].type != ContextType.TEXT:
  87. return
  88. btype = Bridge().get_bot_type("chat")
  89. if btype not in [const.OPEN_AI, const.CHATGPT, const.CHATGPTONAZURE, const.LINKAI]:
  90. return
  91. bot = Bridge().get_bot("chat")
  92. content = e_context["context"].content[:]
  93. clist = e_context["context"].content.split(maxsplit=1)
  94. desckey = None
  95. customize = False
  96. sessionid = e_context["context"]["session_id"]
  97. trigger_prefix = conf().get("plugin_trigger_prefix", "$")
  98. if clist[0] == f"{trigger_prefix}停止扮演":
  99. if sessionid in self.roleplays:
  100. self.roleplays[sessionid].reset()
  101. del self.roleplays[sessionid]
  102. reply = Reply(ReplyType.INFO, "角色扮演结束!")
  103. e_context["reply"] = reply
  104. e_context.action = EventAction.BREAK_PASS
  105. return
  106. elif clist[0] == f"{trigger_prefix}角色":
  107. desckey = "descn"
  108. elif clist[0].lower() == f"{trigger_prefix}role":
  109. desckey = "description"
  110. elif clist[0] == f"{trigger_prefix}设定扮演":
  111. customize = True
  112. elif clist[0] == f"{trigger_prefix}角色类型":
  113. if len(clist) > 1:
  114. tag = clist[1].strip()
  115. help_text = "角色列表:\n"
  116. for key, value in self.tags.items():
  117. if value[0] == tag:
  118. tag = key
  119. break
  120. if tag == "所有":
  121. for role in self.roles.values():
  122. help_text += f"{role['title']}: {role['remark']}\n"
  123. elif tag in self.tags:
  124. for role in self.tags[tag][1]:
  125. help_text += f"{role['title']}: {role['remark']}\n"
  126. else:
  127. help_text = f"未知角色类型。\n"
  128. help_text += "目前的角色类型有: \n"
  129. help_text += ",".join([self.tags[tag][0] for tag in self.tags]) + "\n"
  130. else:
  131. help_text = f"请输入角色类型。\n"
  132. help_text += "目前的角色类型有: \n"
  133. help_text += ",".join([self.tags[tag][0] for tag in self.tags]) + "\n"
  134. reply = Reply(ReplyType.INFO, help_text)
  135. e_context["reply"] = reply
  136. e_context.action = EventAction.BREAK_PASS
  137. return
  138. elif sessionid not in self.roleplays:
  139. return
  140. logger.debug("[Role] on_handle_context. content: %s" % content)
  141. if desckey is not None:
  142. if len(clist) == 1 or (len(clist) > 1 and clist[1].lower() in ["help", "帮助"]):
  143. reply = Reply(ReplyType.INFO, self.get_help_text(verbose=True))
  144. e_context["reply"] = reply
  145. e_context.action = EventAction.BREAK_PASS
  146. return
  147. role = self.get_role(clist[1])
  148. if role is None:
  149. reply = Reply(ReplyType.ERROR, "角色不存在")
  150. e_context["reply"] = reply
  151. e_context.action = EventAction.BREAK_PASS
  152. return
  153. else:
  154. self.roleplays[sessionid] = RolePlay(
  155. bot,
  156. sessionid,
  157. self.roles[role][desckey],
  158. self.roles[role].get("wrapper", "%s"),
  159. )
  160. reply = Reply(ReplyType.INFO, f"预设角色为 {role}:\n" + self.roles[role][desckey])
  161. e_context["reply"] = reply
  162. e_context.action = EventAction.BREAK_PASS
  163. elif customize == True:
  164. self.roleplays[sessionid] = RolePlay(bot, sessionid, clist[1], "%s")
  165. reply = Reply(ReplyType.INFO, f"角色设定为:\n{clist[1]}")
  166. e_context["reply"] = reply
  167. e_context.action = EventAction.BREAK_PASS
  168. else:
  169. prompt = self.roleplays[sessionid].action(content)
  170. e_context["context"].type = ContextType.TEXT
  171. e_context["context"].content = prompt
  172. e_context.action = EventAction.BREAK
  173. def get_help_text(self, verbose=False, **kwargs):
  174. help_text = "让机器人扮演不同的角色。\n"
  175. if not verbose:
  176. return help_text
  177. trigger_prefix = conf().get("plugin_trigger_prefix", "$")
  178. help_text = f"使用方法:\n{trigger_prefix}角色" + " 预设角色名: 设定角色为{预设角色名}。\n" + f"{trigger_prefix}role" + " 预设角色名: 同上,但使用英文设定。\n"
  179. help_text += f"{trigger_prefix}设定扮演" + " 角色设定: 设定自定义角色人设为{角色设定}。\n"
  180. help_text += f"{trigger_prefix}停止扮演: 清除设定的角色。\n"
  181. help_text += f"{trigger_prefix}角色类型" + " 角色类型: 查看某类{角色类型}的所有预设角色,为所有时输出所有预设角色。\n"
  182. help_text += "\n目前的角色类型有: \n"
  183. help_text += ",".join([self.tags[tag][0] for tag in self.tags]) + "。\n"
  184. help_text += f"\n命令例子: \n{trigger_prefix}角色 写作助理\n"
  185. help_text += f"{trigger_prefix}角色类型 所有\n"
  186. help_text += f"{trigger_prefix}停止扮演\n"
  187. return help_text