@@ -10,4 +10,5 @@ nohup.out | |||||
tmp | tmp | ||||
plugins.json | plugins.json | ||||
itchat.pkl | itchat.pkl | ||||
*.log | |||||
*.log | |||||
user_datas.pkl |
@@ -4,13 +4,22 @@ import os | |||||
from config import conf, load_config | from config import conf, load_config | ||||
from channel import channel_factory | from channel import channel_factory | ||||
from common.log import logger | from common.log import logger | ||||
from plugins import * | from plugins import * | ||||
import signal | |||||
import sys | |||||
def sigterm_handler(_signo, _stack_frame): | |||||
conf().save_user_datas() | |||||
sys.exit(0) | |||||
def run(): | def run(): | ||||
try: | try: | ||||
# load config | # load config | ||||
load_config() | load_config() | ||||
# ctrl + c | |||||
signal.signal(signal.SIGINT, sigterm_handler) | |||||
# kill signal | |||||
signal.signal(signal.SIGTERM, sigterm_handler) | |||||
# create channel | # create channel | ||||
channel_name=conf().get('channel_type', 'wx') | channel_name=conf().get('channel_type', 'wx') | ||||
@@ -19,7 +28,7 @@ def run(): | |||||
# os.environ['WECHATY_PUPPET_SERVICE_ENDPOINT'] = '127.0.0.1:9001' | # os.environ['WECHATY_PUPPET_SERVICE_ENDPOINT'] = '127.0.0.1:9001' | ||||
channel = channel_factory.create_channel(channel_name) | channel = channel_factory.create_channel(channel_name) | ||||
if channel_name in ['wx','wxy']: | |||||
if channel_name in ['wx','wxy','wechatmp']: | |||||
PluginManager().load_plugins() | PluginManager().load_plugins() | ||||
# startup channel | # startup channel | ||||
@@ -13,10 +13,12 @@ from common.expired_dict import ExpiredDict | |||||
import openai | import openai | ||||
import openai.error | import openai.error | ||||
import time | import time | ||||
# OpenAI对话模型API (可用) | # OpenAI对话模型API (可用) | ||||
class ChatGPTBot(Bot,OpenAIImage): | class ChatGPTBot(Bot,OpenAIImage): | ||||
def __init__(self): | def __init__(self): | ||||
super().__init__() | super().__init__() | ||||
# set the default api_key | |||||
openai.api_key = conf().get('open_ai_api_key') | openai.api_key = conf().get('open_ai_api_key') | ||||
if conf().get('open_ai_api_base'): | if conf().get('open_ai_api_base'): | ||||
openai.api_base = conf().get('open_ai_api_base') | openai.api_base = conf().get('open_ai_api_base') | ||||
@@ -33,6 +35,7 @@ class ChatGPTBot(Bot,OpenAIImage): | |||||
if context.type == ContextType.TEXT: | if context.type == ContextType.TEXT: | ||||
logger.info("[CHATGPT] query={}".format(query)) | logger.info("[CHATGPT] query={}".format(query)) | ||||
session_id = context['session_id'] | session_id = context['session_id'] | ||||
reply = None | reply = None | ||||
clear_memory_commands = conf().get('clear_memory_commands', ['#清除记忆']) | clear_memory_commands = conf().get('clear_memory_commands', ['#清除记忆']) | ||||
@@ -50,11 +53,13 @@ class ChatGPTBot(Bot,OpenAIImage): | |||||
session = self.sessions.session_query(query, session_id) | session = self.sessions.session_query(query, session_id) | ||||
logger.debug("[CHATGPT] session query={}".format(session.messages)) | logger.debug("[CHATGPT] session query={}".format(session.messages)) | ||||
api_key = context.get('openai_api_key') | |||||
# if context.get('stream'): | # if context.get('stream'): | ||||
# # reply in stream | # # reply in stream | ||||
# return self.reply_text_stream(query, new_query, session_id) | # return self.reply_text_stream(query, new_query, session_id) | ||||
reply_content = self.reply_text(session, session_id, 0) | |||||
reply_content = self.reply_text(session, session_id, api_key, 0) | |||||
logger.debug("[CHATGPT] new_query={}, session_id={}, reply_cont={}, completion_tokens={}".format(session.messages, session_id, reply_content["content"], reply_content["completion_tokens"])) | logger.debug("[CHATGPT] new_query={}, session_id={}, reply_cont={}, completion_tokens={}".format(session.messages, session_id, reply_content["content"], reply_content["completion_tokens"])) | ||||
if reply_content['completion_tokens'] == 0 and len(reply_content['content']) > 0: | if reply_content['completion_tokens'] == 0 and len(reply_content['content']) > 0: | ||||
reply = Reply(ReplyType.ERROR, reply_content['content']) | reply = Reply(ReplyType.ERROR, reply_content['content']) | ||||
@@ -89,7 +94,7 @@ class ChatGPTBot(Bot,OpenAIImage): | |||||
"request_timeout": conf().get('request_timeout', 30), # 请求超时时间 | "request_timeout": conf().get('request_timeout', 30), # 请求超时时间 | ||||
} | } | ||||
def reply_text(self, session:ChatGPTSession, session_id, retry_count=0) -> dict: | |||||
def reply_text(self, session:ChatGPTSession, session_id, api_key, retry_count=0) -> dict: | |||||
''' | ''' | ||||
call openai's ChatCompletion to get the answer | call openai's ChatCompletion to get the answer | ||||
:param session: a conversation session | :param session: a conversation session | ||||
@@ -100,8 +105,9 @@ class ChatGPTBot(Bot,OpenAIImage): | |||||
try: | try: | ||||
if conf().get('rate_limit_chatgpt') and not self.tb4chatgpt.get_token(): | if conf().get('rate_limit_chatgpt') and not self.tb4chatgpt.get_token(): | ||||
raise openai.error.RateLimitError("RateLimitError: rate limit exceeded") | raise openai.error.RateLimitError("RateLimitError: rate limit exceeded") | ||||
# if api_key == None, the default openai.api_key will be used | |||||
response = openai.ChatCompletion.create( | response = openai.ChatCompletion.create( | ||||
messages=session.messages, **self.compose_args() | |||||
api_key=api_key, messages=session.messages, **self.compose_args() | |||||
) | ) | ||||
# logger.info("[ChatGPT] reply={}, total_tokens={}".format(response.choices[0]['message']['content'], response["usage"]["total_tokens"])) | # logger.info("[ChatGPT] reply={}, total_tokens={}".format(response.choices[0]['message']['content'], response["usage"]["total_tokens"])) | ||||
return {"total_tokens": response["usage"]["total_tokens"], | return {"total_tokens": response["usage"]["total_tokens"], | ||||
@@ -131,7 +137,7 @@ class ChatGPTBot(Bot,OpenAIImage): | |||||
if need_retry: | if need_retry: | ||||
logger.warn("[CHATGPT] 第{}次重试".format(retry_count+1)) | logger.warn("[CHATGPT] 第{}次重试".format(retry_count+1)) | ||||
return self.reply_text(session, session_id, retry_count+1) | |||||
return self.reply_text(session, session_id, api_key, retry_count+1) | |||||
else: | else: | ||||
return result | return result | ||||
@@ -17,4 +17,7 @@ def create_channel(channel_type): | |||||
elif channel_type == 'terminal': | elif channel_type == 'terminal': | ||||
from channel.terminal.terminal_channel import TerminalChannel | from channel.terminal.terminal_channel import TerminalChannel | ||||
return TerminalChannel() | return TerminalChannel() | ||||
elif channel_type == 'wechatmp': | |||||
from channel.wechatmp.wechatmp_channel import WechatMPServer | |||||
return WechatMPServer() | |||||
raise RuntimeError | raise RuntimeError |
@@ -0,0 +1,34 @@ | |||||
# 个人微信公众号channel | |||||
鉴于个人微信号在服务器上通过itchat登录有封号风险,这里新增了个人微信公众号channel,提供无风险的服务。 | |||||
但是由于个人微信公众号的众多接口限制,目前支持的功能有限,实现简陋,提供了一个最基本的文本对话服务,支持加载插件,优化了命令格式,支持私有api_key。暂未实现图片输入输出、语音输入输出等交互形式。 | |||||
如有公众号是企业主体且可以通过微信认证,即可获得更多接口,解除大多数限制。欢迎大家提供更多的支持。 | |||||
## 使用方法 | |||||
在开始部署前,你需要一个拥有公网IP的服务器,以提供微信服务器和我们自己服务器的连接。或者你需要进行内网穿透,否则微信服务器无法将消息发送给我们的服务器。 | |||||
此外,需要在我们的服务器上安装python的web框架web.py。 | |||||
以ubuntu为例(在ubuntu 22.04上测试): | |||||
``` | |||||
pip3 install web.py | |||||
``` | |||||
然后在[微信公众平台](https://mp.weixin.qq.com)注册一个自己的公众号,类型选择订阅号,主体为个人即可。 | |||||
然后根据[接入指南](https://developers.weixin.qq.com/doc/offiaccount/Basic_Information/Access_Overview.html)的说明,在[微信公众平台](https://mp.weixin.qq.com)的“设置与开发”-“基本配置”-“服务器配置”中填写服务器地址`URL`和令牌`Token`。这里的`URL`是`example.com/wx`的形式,不可以使用IP,`Token`是你自己编的一个特定的令牌。消息加解密方式目前选择的是明文模式。 | |||||
相关的服务器验证代码已经写好,你不需要再添加任何代码。你只需要在本项目根目录的`config.json`中添加`"channel_type": "wechatmp", "wechatmp_token": "your Token", ` 然后运行`python3 app.py`启动web服务器,然后在刚才的“服务器配置”中点击`提交`即可验证你的服务器。 | |||||
随后在[微信公众平台](https://mp.weixin.qq.com)启用服务器,关闭手动填写规则的自动回复,即可实现ChatGPT的自动回复。 | |||||
## 个人微信公众号的限制 | |||||
由于目前测试的公众号不是企业主体,所以没有客服接口,因此公众号无法主动发出消息,只能被动回复。而微信官方对被动回复有5秒的时间限制,最多重试2次,因此最多只有15秒的自动回复时间窗口。因此如果问题比较复杂或者我们的服务器比较忙,ChatGPT的回答就没办法及时回复给用户。为了解决这个问题,这里做了回答缓存,它需要你在回复超时后,再次主动发送任意文字(例如1)来尝试拿到回答缓存。为了优化使用体验,目前设置了两分钟(120秒)的timeout,用户在至多两分钟后即可得到查询到回复或者错误原因。 | |||||
另外,由于微信官方的限制,自动回复有长度限制。因此这里将ChatGPT的回答拆分,分成每段600字回复(限制大约在700字)。 | |||||
## 私有api_key | |||||
公共api有访问频率限制(免费账号每分钟最多20次ChatGPT的API调用),这在服务多人的时候会遇到问题。因此这里多加了一个设置私有api_key的功能。目前通过godcmd插件的命令来设置私有api_key。 | |||||
## 测试范围 | |||||
目前在`RoboStyle`这个公众号上进行了测试,感兴趣的可以关注并体验。开启了godcmd, Banwords, role, dungeon, finish这五个插件,其他的插件还没有测试。百度的接口暂未测试。语音对话没有测试。图片直接以链接形式回复(没有临时素材上传接口的权限)。 |
@@ -0,0 +1,47 @@ | |||||
# -*- coding: utf-8 -*-# | |||||
# filename: receive.py | |||||
import xml.etree.ElementTree as ET | |||||
def parse_xml(web_data): | |||||
if len(web_data) == 0: | |||||
return None | |||||
xmlData = ET.fromstring(web_data) | |||||
msg_type = xmlData.find('MsgType').text | |||||
if msg_type == 'text': | |||||
return TextMsg(xmlData) | |||||
elif msg_type == 'image': | |||||
return ImageMsg(xmlData) | |||||
elif msg_type == 'event': | |||||
return Event(xmlData) | |||||
class Msg(object): | |||||
def __init__(self, xmlData): | |||||
self.ToUserName = xmlData.find('ToUserName').text | |||||
self.FromUserName = xmlData.find('FromUserName').text | |||||
self.CreateTime = xmlData.find('CreateTime').text | |||||
self.MsgType = xmlData.find('MsgType').text | |||||
self.MsgId = xmlData.find('MsgId').text | |||||
class TextMsg(Msg): | |||||
def __init__(self, xmlData): | |||||
Msg.__init__(self, xmlData) | |||||
self.Content = xmlData.find('Content').text.encode("utf-8") | |||||
class ImageMsg(Msg): | |||||
def __init__(self, xmlData): | |||||
Msg.__init__(self, xmlData) | |||||
self.PicUrl = xmlData.find('PicUrl').text | |||||
self.MediaId = xmlData.find('MediaId').text | |||||
class Event(object): | |||||
def __init__(self, xmlData): | |||||
self.ToUserName = xmlData.find('ToUserName').text | |||||
self.FromUserName = xmlData.find('FromUserName').text | |||||
self.CreateTime = xmlData.find('CreateTime').text | |||||
self.MsgType = xmlData.find('MsgType').text | |||||
self.Event = xmlData.find('Event').text |
@@ -0,0 +1,52 @@ | |||||
# -*- coding: utf-8 -*-# | |||||
# filename: reply.py | |||||
import time | |||||
class Msg(object): | |||||
def __init__(self): | |||||
pass | |||||
def send(self): | |||||
return "success" | |||||
class TextMsg(Msg): | |||||
def __init__(self, toUserName, fromUserName, content): | |||||
self.__dict = dict() | |||||
self.__dict['ToUserName'] = toUserName | |||||
self.__dict['FromUserName'] = fromUserName | |||||
self.__dict['CreateTime'] = int(time.time()) | |||||
self.__dict['Content'] = content | |||||
def send(self): | |||||
XmlForm = """ | |||||
<xml> | |||||
<ToUserName><![CDATA[{ToUserName}]]></ToUserName> | |||||
<FromUserName><![CDATA[{FromUserName}]]></FromUserName> | |||||
<CreateTime>{CreateTime}</CreateTime> | |||||
<MsgType><![CDATA[text]]></MsgType> | |||||
<Content><![CDATA[{Content}]]></Content> | |||||
</xml> | |||||
""" | |||||
return XmlForm.format(**self.__dict) | |||||
class ImageMsg(Msg): | |||||
def __init__(self, toUserName, fromUserName, mediaId): | |||||
self.__dict = dict() | |||||
self.__dict['ToUserName'] = toUserName | |||||
self.__dict['FromUserName'] = fromUserName | |||||
self.__dict['CreateTime'] = int(time.time()) | |||||
self.__dict['MediaId'] = mediaId | |||||
def send(self): | |||||
XmlForm = """ | |||||
<xml> | |||||
<ToUserName><![CDATA[{ToUserName}]]></ToUserName> | |||||
<FromUserName><![CDATA[{FromUserName}]]></FromUserName> | |||||
<CreateTime>{CreateTime}</CreateTime> | |||||
<MsgType><![CDATA[image]]></MsgType> | |||||
<Image> | |||||
<MediaId><![CDATA[{MediaId}]]></MediaId> | |||||
</Image> | |||||
</xml> | |||||
""" | |||||
return XmlForm.format(**self.__dict) |
@@ -0,0 +1,302 @@ | |||||
# -*- coding: utf-8 -*- | |||||
import web | |||||
import time | |||||
import math | |||||
import hashlib | |||||
import textwrap | |||||
from channel.channel import Channel | |||||
import channel.wechatmp.reply as reply | |||||
import channel.wechatmp.receive as receive | |||||
from common.log import logger | |||||
from config import conf | |||||
from bridge.reply import * | |||||
from bridge.context import * | |||||
from plugins import * | |||||
import traceback | |||||
# If using SSL, uncomment the following lines, and modify the certificate path. | |||||
# from cheroot.server import HTTPServer | |||||
# from cheroot.ssl.builtin import BuiltinSSLAdapter | |||||
# HTTPServer.ssl_adapter = BuiltinSSLAdapter( | |||||
# certificate='/ssl/cert.pem', | |||||
# private_key='/ssl/cert.key') | |||||
class WechatMPServer(): | |||||
def __init__(self): | |||||
pass | |||||
def startup(self): | |||||
urls = ( | |||||
'/wx', 'WechatMPChannel', | |||||
) | |||||
app = web.application(urls, globals()) | |||||
web.httpserver.runsimple(app.wsgifunc(), ('0.0.0.0', 80)) | |||||
cache_dict = dict() | |||||
query1 = dict() | |||||
query2 = dict() | |||||
query3 = dict() | |||||
from concurrent.futures import ThreadPoolExecutor | |||||
thread_pool = ThreadPoolExecutor(max_workers=8) | |||||
class WechatMPChannel(Channel): | |||||
def GET(self): | |||||
try: | |||||
data = web.input() | |||||
if len(data) == 0: | |||||
return "hello, this is handle view" | |||||
signature = data.signature | |||||
timestamp = data.timestamp | |||||
nonce = data.nonce | |||||
echostr = data.echostr | |||||
token = conf().get('wechatmp_token') #请按照公众平台官网\基本配置中信息填写 | |||||
data_list = [token, timestamp, nonce] | |||||
data_list.sort() | |||||
sha1 = hashlib.sha1() | |||||
# map(sha1.update, data_list) #python2 | |||||
sha1.update("".join(data_list).encode('utf-8')) | |||||
hashcode = sha1.hexdigest() | |||||
print("handle/GET func: hashcode, signature: ", hashcode, signature) | |||||
if hashcode == signature: | |||||
return echostr | |||||
else: | |||||
return "" | |||||
except Exception as Argument: | |||||
return Argument | |||||
def _do_build_reply(self, cache_key, fromUser, message): | |||||
context = dict() | |||||
context['session_id'] = fromUser | |||||
reply_text = super().build_reply_content(message, context) | |||||
# The query is done, record the cache | |||||
logger.info("[threaded] Get reply for {}: {} \nA: {}".format(fromUser, message, reply_text)) | |||||
global cache_dict | |||||
reply_cnt = math.ceil(len(reply_text) / 600) | |||||
cache_dict[cache_key] = (reply_cnt, reply_text) | |||||
def send(self, reply : Reply, cache_key): | |||||
global cache_dict | |||||
reply_cnt = math.ceil(len(reply.content) / 600) | |||||
cache_dict[cache_key] = (reply_cnt, reply.content) | |||||
def handle(self, context): | |||||
global cache_dict | |||||
try: | |||||
reply = Reply() | |||||
logger.debug('[wechatmp] ready to handle context: {}'.format(context)) | |||||
# reply的构建步骤 | |||||
e_context = PluginManager().emit_event(EventContext(Event.ON_HANDLE_CONTEXT, {'channel' : self, 'context': context, 'reply': reply})) | |||||
reply = e_context['reply'] | |||||
if not e_context.is_pass(): | |||||
logger.debug('[wechatmp] ready to handle context: type={}, content={}'.format(context.type, context.content)) | |||||
if context.type == ContextType.TEXT or context.type == ContextType.IMAGE_CREATE: | |||||
reply = super().build_reply_content(context.content, context) | |||||
# elif context.type == ContextType.VOICE: | |||||
# msg = context['msg'] | |||||
# file_name = TmpDir().path() + context.content | |||||
# msg.download(file_name) | |||||
# reply = super().build_voice_to_text(file_name) | |||||
# if reply.type != ReplyType.ERROR and reply.type != ReplyType.INFO: | |||||
# context.content = reply.content # 语音转文字后,将文字内容作为新的context | |||||
# context.type = ContextType.TEXT | |||||
# reply = super().build_reply_content(context.content, context) | |||||
# if reply.type == ReplyType.TEXT: | |||||
# if conf().get('voice_reply_voice'): | |||||
# reply = super().build_text_to_voice(reply.content) | |||||
else: | |||||
logger.error('[wechatmp] unknown context type: {}'.format(context.type)) | |||||
return | |||||
logger.debug('[wechatmp] ready to decorate reply: {}'.format(reply)) | |||||
# reply的包装步骤 | |||||
if reply and reply.type: | |||||
e_context = PluginManager().emit_event(EventContext(Event.ON_DECORATE_REPLY, {'channel' : self, 'context': context, 'reply': reply})) | |||||
reply=e_context['reply'] | |||||
if not e_context.is_pass() and reply and reply.type: | |||||
if reply.type == ReplyType.TEXT: | |||||
pass | |||||
elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO: | |||||
reply.content = str(reply.type)+":\n" + reply.content | |||||
elif reply.type == ReplyType.IMAGE_URL or reply.type == ReplyType.VOICE or reply.type == ReplyType.IMAGE: | |||||
pass | |||||
else: | |||||
logger.error('[wechatmp] unknown reply type: {}'.format(reply.type)) | |||||
return | |||||
# reply的发送步骤 | |||||
if reply and reply.type: | |||||
e_context = PluginManager().emit_event(EventContext(Event.ON_SEND_REPLY, {'channel' : self, 'context': context, 'reply': reply})) | |||||
reply=e_context['reply'] | |||||
if not e_context.is_pass() and reply and reply.type: | |||||
logger.debug('[wechatmp] ready to send reply: {} to {}'.format(reply, context['receiver'])) | |||||
self.send(reply, context['receiver']) | |||||
else: | |||||
cache_dict[context['receiver']] = (1, "No reply") | |||||
logger.info("[threaded] Get reply for {}: {} \nA: {}".format(context['receiver'], context.content, reply.content)) | |||||
except Exception as exc: | |||||
print(traceback.format_exc()) | |||||
cache_dict[context['receiver']] = (1, "ERROR") | |||||
def POST(self): | |||||
try: | |||||
queryTime = time.time() | |||||
webData = web.data() | |||||
# logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8")) | |||||
recMsg = receive.parse_xml(webData) | |||||
if isinstance(recMsg, receive.Msg) and recMsg.MsgType == 'text': | |||||
fromUser = recMsg.FromUserName | |||||
toUser = recMsg.ToUserName | |||||
createTime = recMsg.CreateTime | |||||
message = recMsg.Content.decode("utf-8") | |||||
message_id = recMsg.MsgId | |||||
logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), fromUser, message_id, message)) | |||||
global cache_dict | |||||
global query1 | |||||
global query2 | |||||
global query3 | |||||
cache_key = fromUser | |||||
cache = cache_dict.get(cache_key) | |||||
reply_text = "" | |||||
# New request | |||||
if cache == None: | |||||
# The first query begin, reset the cache | |||||
cache_dict[cache_key] = (0, "") | |||||
# thread_pool.submit(self._do_build_reply, cache_key, fromUser, message) | |||||
context = Context() | |||||
context.kwargs = {'isgroup': False, 'receiver': fromUser, 'session_id': fromUser} | |||||
user_data = conf().get_user_data(fromUser) | |||||
context['openai_api_key'] = user_data.get('openai_api_key') # None or user openai_api_key | |||||
img_match_prefix = check_prefix(message, conf().get('image_create_prefix')) | |||||
if img_match_prefix: | |||||
message = message.replace(img_match_prefix, '', 1).strip() | |||||
context.type = ContextType.IMAGE_CREATE | |||||
else: | |||||
context.type = ContextType.TEXT | |||||
context.content = message | |||||
thread_pool.submit(self.handle, context) | |||||
query1[cache_key] = False | |||||
query2[cache_key] = False | |||||
query3[cache_key] = False | |||||
# Request again | |||||
elif cache[0] == 0 and query1.get(cache_key) == True and query2.get(cache_key) == True and query3.get(cache_key) == True: | |||||
query1[cache_key] = False #To improve waiting experience, this can be set to True. | |||||
query2[cache_key] = False #To improve waiting experience, this can be set to True. | |||||
query3[cache_key] = False | |||||
elif cache[0] >= 1: | |||||
# Skip the waiting phase | |||||
query1[cache_key] = True | |||||
query2[cache_key] = True | |||||
query3[cache_key] = True | |||||
cache = cache_dict.get(cache_key) | |||||
if query1.get(cache_key) == False: | |||||
# The first query from wechat official server | |||||
logger.debug("[wechatmp] query1 {}".format(cache_key)) | |||||
query1[cache_key] = True | |||||
cnt = 0 | |||||
while cache[0] == 0 and cnt < 45: | |||||
cnt = cnt + 1 | |||||
time.sleep(0.1) | |||||
cache = cache_dict.get(cache_key) | |||||
if cnt == 45: | |||||
# waiting for timeout (the POST query will be closed by wechat official server) | |||||
time.sleep(5) | |||||
# and do nothing | |||||
return | |||||
else: | |||||
pass | |||||
elif query2.get(cache_key) == False: | |||||
# The second query from wechat official server | |||||
logger.debug("[wechatmp] query2 {}".format(cache_key)) | |||||
query2[cache_key] = True | |||||
cnt = 0 | |||||
while cache[0] == 0 and cnt < 45: | |||||
cnt = cnt + 1 | |||||
time.sleep(0.1) | |||||
cache = cache_dict.get(cache_key) | |||||
if cnt == 45: | |||||
# waiting for timeout (the POST query will be closed by wechat official server) | |||||
time.sleep(5) | |||||
# and do nothing | |||||
return | |||||
else: | |||||
pass | |||||
elif query3.get(cache_key) == False: | |||||
# The third query from wechat official server | |||||
logger.debug("[wechatmp] query3 {}".format(cache_key)) | |||||
query3[cache_key] = True | |||||
cnt = 0 | |||||
while cache[0] == 0 and cnt < 45: | |||||
cnt = cnt + 1 | |||||
time.sleep(0.1) | |||||
cache = cache_dict.get(cache_key) | |||||
if cnt == 45: | |||||
# Have waiting for 3x5 seconds | |||||
# return timeout message | |||||
reply_text = "【正在响应中,回复任意文字尝试获取回复】" | |||||
logger.info("[wechatmp] Three queries has finished For {}: {}".format(fromUser, message_id)) | |||||
replyPost = reply.TextMsg(fromUser, toUser, reply_text).send() | |||||
return replyPost | |||||
else: | |||||
pass | |||||
if float(time.time()) - float(queryTime) > 4.8: | |||||
logger.info("[wechatmp] Timeout for {} {}".format(fromUser, message_id)) | |||||
return | |||||
if cache[0] > 1: | |||||
reply_text = cache[1][:600] + "\n【未完待续,回复任意文字以继续】" #wechatmp auto_reply length limit | |||||
cache_dict[cache_key] = (cache[0] - 1, cache[1][600:]) | |||||
elif cache[0] == 1: | |||||
reply_text = cache[1] | |||||
cache_dict.pop(cache_key) | |||||
logger.info("[wechatmp] {}:{} Do send {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), reply_text)) | |||||
replyPost = reply.TextMsg(fromUser, toUser, reply_text).send() | |||||
return replyPost | |||||
elif isinstance(recMsg, receive.Event) and recMsg.MsgType == 'event': | |||||
logger.info("[wechatmp] Event {} from {}".format(recMsg.Event, recMsg.FromUserName)) | |||||
content = textwrap.dedent("""\ | |||||
感谢您的关注! | |||||
这里是ChatGPT,可以自由对话。 | |||||
资源有限,回复较慢,请勿着急。 | |||||
支持通用表情输入。 | |||||
暂时不支持图片输入。 | |||||
支持图片输出,画字开头的问题将回复图片链接。 | |||||
支持角色扮演和文字冒险两种定制模式对话。 | |||||
输入'#帮助' 查看详细指令。""") | |||||
replyMsg = reply.TextMsg(recMsg.FromUserName, recMsg.ToUserName, content) | |||||
return replyMsg.send() | |||||
else: | |||||
logger.info("暂且不处理") | |||||
return "success" | |||||
except Exception as exc: | |||||
logger.exception(exc) | |||||
return exc | |||||
def check_prefix(content, prefix_list): | |||||
for prefix in prefix_list: | |||||
if content.startswith(prefix): | |||||
return prefix | |||||
return None |
@@ -4,6 +4,7 @@ import json | |||||
import logging | import logging | ||||
import os | import os | ||||
from common.log import logger | from common.log import logger | ||||
import pickle | |||||
# 将所有可用的配置项写在字典里, 请使用小写字母 | # 将所有可用的配置项写在字典里, 请使用小写字母 | ||||
available_setting = { | available_setting = { | ||||
@@ -76,11 +77,14 @@ available_setting = { | |||||
# wechaty的配置 | # wechaty的配置 | ||||
"wechaty_puppet_service_token": "", # wechaty的token | "wechaty_puppet_service_token": "", # wechaty的token | ||||
# wechatmp的配置 | |||||
"wechatmp_token": "", # 微信公众平台的Token | |||||
# chatgpt指令自定义触发词 | # chatgpt指令自定义触发词 | ||||
"clear_memory_commands": ['#清除记忆'], # 重置会话指令,必须以#开头 | "clear_memory_commands": ['#清除记忆'], # 重置会话指令,必须以#开头 | ||||
# channel配置 | # channel配置 | ||||
"channel_type": "wx", # 通道类型,支持wx,wxy和terminal | |||||
"channel_type": "wx", # 通道类型,支持:{wx,wxy,terminal,wechatmp} | |||||
"debug": False, # 是否开启debug模式,开启后会打印更多日志 | "debug": False, # 是否开启debug模式,开启后会打印更多日志 | ||||
@@ -88,6 +92,11 @@ available_setting = { | |||||
class Config(dict): | class Config(dict): | ||||
def __init__(self, d:dict={}): | |||||
super().__init__(d) | |||||
# user_datas: 用户数据,key为用户名,value为用户数据,也是dict | |||||
self.user_datas = {} | |||||
def __getitem__(self, key): | def __getitem__(self, key): | ||||
if key not in available_setting: | if key not in available_setting: | ||||
raise Exception("key {} not in available_setting".format(key)) | raise Exception("key {} not in available_setting".format(key)) | ||||
@@ -106,6 +115,30 @@ class Config(dict): | |||||
except Exception as e: | except Exception as e: | ||||
raise e | raise e | ||||
# Make sure to return a dictionary to ensure atomic | |||||
def get_user_data(self, user) -> dict: | |||||
if self.user_datas.get(user) is None: | |||||
self.user_datas[user] = {} | |||||
return self.user_datas[user] | |||||
def load_user_datas(self): | |||||
try: | |||||
with open('user_datas.pkl', 'rb') as f: | |||||
self.user_datas = pickle.load(f) | |||||
logger.info("[Config] User datas loaded.") | |||||
except FileNotFoundError as e: | |||||
logger.info("[Config] User datas file not found, ignore.") | |||||
except Exception as e: | |||||
logger.info("[Config] User datas error: {}".format(e)) | |||||
self.user_datas = {} | |||||
def save_user_datas(self): | |||||
try: | |||||
with open('user_datas.pkl', 'wb') as f: | |||||
pickle.dump(self.user_datas, f) | |||||
logger.info("[Config] User datas saved.") | |||||
except Exception as e: | |||||
logger.info("[Config] User datas error: {}".format(e)) | |||||
config = Config() | config = Config() | ||||
@@ -146,6 +179,7 @@ def load_config(): | |||||
logger.info("[INIT] load config: {}".format(config)) | logger.info("[INIT] load config: {}".format(config)) | ||||
config.load_user_datas() | |||||
def get_root(): | def get_root(): | ||||
return os.path.dirname(os.path.abspath(__file__)) | return os.path.dirname(os.path.abspath(__file__)) | ||||
@@ -10,7 +10,7 @@ from common.log import logger | |||||
from .WordsSearch import WordsSearch | from .WordsSearch import WordsSearch | ||||
@plugins.register(name="Banwords", desc="判断消息中是否有敏感词、决定是否回复。", version="1.0", author="lanvent", desire_priority= 100) | |||||
@plugins.register(name="Banwords", desire_priority=100, hidden=True, desc="判断消息中是否有敏感词、决定是否回复。", version="1.0", author="lanvent") | |||||
class Banwords(Plugin): | class Banwords(Plugin): | ||||
def __init__(self): | def __init__(self): | ||||
super().__init__() | super().__init__() | ||||
@@ -16,7 +16,7 @@ from uuid import getnode as get_mac | |||||
""" | """ | ||||
@plugins.register(name="BDunit", desc="Baidu unit bot system", version="0.1", author="jackson", desire_priority=0) | |||||
@plugins.register(name="BDunit", desire_priority=0, hidden=True, desc="Baidu unit bot system", version="0.1", author="jackson") | |||||
class BDunit(Plugin): | class BDunit(Plugin): | ||||
def __init__(self): | def __init__(self): | ||||
super().__init__() | super().__init__() | ||||
@@ -27,15 +27,15 @@ class StoryTeller(): | |||||
if user_action[-1] != "。": | if user_action[-1] != "。": | ||||
user_action = user_action + "。" | user_action = user_action + "。" | ||||
if self.first_interact: | if self.first_interact: | ||||
prompt = """现在来充当一个冒险文字游戏,描述时候注意节奏,不要太快,仔细描述各个人物的心情和周边环境。一次只需写四到六句话。 | |||||
prompt = """现在来充当一个文字冒险游戏,描述时候注意节奏,不要太快,仔细描述各个人物的心情和周边环境。一次只需写四到六句话。 | |||||
开头是,""" + self.story + " " + user_action | 开头是,""" + self.story + " " + user_action | ||||
self.first_interact = False | self.first_interact = False | ||||
else: | else: | ||||
prompt = """继续,一次只需要续写四到六句话,总共就只讲5分钟内发生的事情。""" + user_action | prompt = """继续,一次只需要续写四到六句话,总共就只讲5分钟内发生的事情。""" + user_action | ||||
return prompt | return prompt | ||||
@plugins.register(name="Dungeon", desc="A plugin to play dungeon game", version="1.0", author="lanvent", desire_priority= 0) | |||||
@plugins.register(name="Dungeon", desire_priority=0, namecn="文字冒险", desc="A plugin to play dungeon game", version="1.0", author="lanvent") | |||||
class Dungeon(Plugin): | class Dungeon(Plugin): | ||||
def __init__(self): | def __init__(self): | ||||
super().__init__() | super().__init__() | ||||
@@ -82,5 +82,10 @@ class Dungeon(Plugin): | |||||
e_context['context'].content = prompt | e_context['context'].content = prompt | ||||
e_context.action = EventAction.BREAK # 事件结束,不跳过处理context的默认逻辑 | e_context.action = EventAction.BREAK # 事件结束,不跳过处理context的默认逻辑 | ||||
def get_help_text(self, **kwargs): | def get_help_text(self, **kwargs): | ||||
help_text = "输入\"$开始冒险 {背景故事}\"来以{背景故事}开始一个地牢游戏,之后你的所有消息会帮助我来完善这个故事。输入\"$停止冒险 \"可以结束游戏。" | |||||
help_text = "可以和机器人一起玩文字冒险游戏。\n" | |||||
if kwargs.get('verbose') != True: | |||||
return help_text | |||||
help_text = "$开始冒险 {背景故事}: 开始一个基于{背景故事}的文字冒险,之后你的所有消息会协助完善这个故事。\n$停止冒险: 结束游戏。\n" | |||||
if kwargs.get('verbose') == True: | |||||
help_text += "\n命令例子: '$开始冒险 你在树林里冒险,指不定会从哪里蹦出来一些奇怪的东西,你握紧手上的手枪,希望这次冒险能够找到一些值钱的东西,你往树林深处走去。'" | |||||
return help_text | return help_text |
@@ -0,0 +1,32 @@ | |||||
# encoding:utf-8 | |||||
from bridge.context import ContextType | |||||
from bridge.reply import Reply, ReplyType | |||||
import plugins | |||||
from plugins import * | |||||
from common.log import logger | |||||
@plugins.register(name="Finish", desire_priority=-999, hidden=True, desc="A plugin that check unknown command", version="1.0", author="js00000") | |||||
class Finish(Plugin): | |||||
def __init__(self): | |||||
super().__init__() | |||||
self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context | |||||
logger.info("[Finish] inited") | |||||
def on_handle_context(self, e_context: EventContext): | |||||
if e_context['context'].type != ContextType.TEXT: | |||||
return | |||||
content = e_context['context'].content | |||||
logger.debug("[Finish] on_handle_context. content: %s" % content) | |||||
if content.startswith("$"): | |||||
reply = Reply() | |||||
reply.type = ReplyType.ERROR | |||||
reply.content = "未知插件命令\n查看插件命令列表请输入#help {插件名}\n" | |||||
e_context['reply'] = reply | |||||
e_context.action = EventAction.BREAK_PASS # 事件结束,并跳过处理context的默认逻辑 | |||||
def get_help_text(self, **kwargs): | |||||
return "" |
@@ -12,23 +12,31 @@ import plugins | |||||
from plugins import * | from plugins import * | ||||
from common import const | from common import const | ||||
from common.log import logger | from common.log import logger | ||||
# 定义指令集 | # 定义指令集 | ||||
COMMANDS = { | COMMANDS = { | ||||
"help": { | "help": { | ||||
"alias": ["help", "帮助"], | "alias": ["help", "帮助"], | ||||
"desc": "打印指令集合", | |||||
"desc": "回复此帮助", | |||||
}, | }, | ||||
"helpp": { | "helpp": { | ||||
"alias": ["helpp", "插件帮助"], | |||||
"alias": ["help", "帮助"], # 与help指令共用别名,根据参数数量区分 | |||||
"args": ["插件名"], | "args": ["插件名"], | ||||
"desc": "打印插件的帮助信息", | |||||
"desc": "回复指定插件的详细帮助", | |||||
}, | }, | ||||
"auth": { | "auth": { | ||||
"alias": ["auth", "认证"], | "alias": ["auth", "认证"], | ||||
"args": ["口令"], | "args": ["口令"], | ||||
"desc": "管理员认证", | "desc": "管理员认证", | ||||
}, | }, | ||||
"set_openai_api_key": { | |||||
"alias": ["set_openai_api_key"], | |||||
"args": ["api_key"], | |||||
"desc": "设置你的OpenAI私有api_key", | |||||
}, | |||||
"reset_openai_api_key": { | |||||
"alias": ["reset_openai_api_key"], | |||||
"desc": "重置为默认的api_key", | |||||
}, | |||||
# "id": { | # "id": { | ||||
# "alias": ["id", "用户"], | # "alias": ["id", "用户"], | ||||
# "desc": "获取用户id", #目前无实际意义 | # "desc": "获取用户id", #目前无实际意义 | ||||
@@ -91,26 +99,35 @@ ADMIN_COMMANDS = { | |||||
} | } | ||||
# 定义帮助函数 | # 定义帮助函数 | ||||
def get_help_text(isadmin, isgroup): | def get_help_text(isadmin, isgroup): | ||||
help_text = "可用指令:\n" | |||||
help_text = "通用指令:\n" | |||||
for cmd, info in COMMANDS.items(): | for cmd, info in COMMANDS.items(): | ||||
if cmd=="auth" and (isadmin or isgroup): # 群聊不可认证 | |||||
if cmd=="auth": #不提示认证指令 | |||||
continue | continue | ||||
alias=["#"+a for a in info['alias']] | alias=["#"+a for a in info['alias']] | ||||
help_text += f"{','.join(alias)} " | help_text += f"{','.join(alias)} " | ||||
if 'args' in info: | if 'args' in info: | ||||
args=["{"+a+"}" for a in info['args']] | args=["{"+a+"}" for a in info['args']] | ||||
help_text += f"{' '.join(args)} " | help_text += f"{' '.join(args)} " | ||||
help_text += f": {info['desc']}\n" | help_text += f": {info['desc']}\n" | ||||
# 插件指令 | |||||
plugins = PluginManager().list_plugins() | |||||
help_text += "\n目前可用插件有:" | |||||
for plugin in plugins: | |||||
if plugins[plugin].enabled and not plugins[plugin].hidden: | |||||
namecn = plugins[plugin].namecn | |||||
help_text += "\n%s:"%namecn | |||||
help_text += PluginManager().instances[plugin].get_help_text(verbose=False).strip() | |||||
if ADMIN_COMMANDS and isadmin: | if ADMIN_COMMANDS and isadmin: | ||||
help_text += "\n管理员指令:\n" | |||||
help_text += "\n\n管理员指令:\n" | |||||
for cmd, info in ADMIN_COMMANDS.items(): | for cmd, info in ADMIN_COMMANDS.items(): | ||||
alias=["#"+a for a in info['alias']] | alias=["#"+a for a in info['alias']] | ||||
help_text += f"{','.join(alias)} " | help_text += f"{','.join(alias)} " | ||||
help_text += f": {info['desc']}\n" | help_text += f": {info['desc']}\n" | ||||
return help_text | return help_text | ||||
@plugins.register(name="Godcmd", desc="为你的机器人添加指令集,有用户和管理员两种角色,加载顺序请放在首位,初次运行后插件目录会生成配置文件, 填充管理员密码后即可认证", version="1.0", author="lanvent", desire_priority= 999) | |||||
@plugins.register(name="Godcmd", desire_priority=999, hidden=True, desc="为你的机器人添加指令集,有用户和管理员两种角色,加载顺序请放在首位,初次运行后插件目录会生成配置文件, 填充管理员密码后即可认证", version="1.0", author="lanvent") | |||||
class Godcmd(Plugin): | class Godcmd(Plugin): | ||||
def __init__(self): | def __init__(self): | ||||
@@ -141,14 +158,14 @@ class Godcmd(Plugin): | |||||
self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context | self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context | ||||
logger.info("[Godcmd] inited") | logger.info("[Godcmd] inited") | ||||
def on_handle_context(self, e_context: EventContext): | def on_handle_context(self, e_context: EventContext): | ||||
context_type = e_context['context'].type | context_type = e_context['context'].type | ||||
if context_type != ContextType.TEXT: | if context_type != ContextType.TEXT: | ||||
if not self.isrunning: | if not self.isrunning: | ||||
e_context.action = EventAction.BREAK_PASS | e_context.action = EventAction.BREAK_PASS | ||||
return | return | ||||
content = e_context['context'].content | content = e_context['context'].content | ||||
logger.debug("[Godcmd] on_handle_context. content: %s" % content) | logger.debug("[Godcmd] on_handle_context. content: %s" % content) | ||||
if content.startswith("#"): | if content.startswith("#"): | ||||
@@ -160,7 +177,7 @@ class Godcmd(Plugin): | |||||
bottype = Bridge().get_bot_type("chat") | bottype = Bridge().get_bot_type("chat") | ||||
bot = Bridge().get_bot("chat") | bot = Bridge().get_bot("chat") | ||||
# 将命令和参数分割 | # 将命令和参数分割 | ||||
command_parts = content[1:].split(" ") | |||||
command_parts = content[1:].strip().split() | |||||
cmd = command_parts[0] | cmd = command_parts[0] | ||||
args = command_parts[1:] | args = command_parts[1:] | ||||
isadmin=False | isadmin=False | ||||
@@ -172,20 +189,36 @@ class Godcmd(Plugin): | |||||
cmd = next(c for c, info in COMMANDS.items() if cmd in info['alias']) | cmd = next(c for c, info in COMMANDS.items() if cmd in info['alias']) | ||||
if cmd == "auth": | if cmd == "auth": | ||||
ok, result = self.authenticate(user, args, isadmin, isgroup) | ok, result = self.authenticate(user, args, isadmin, isgroup) | ||||
elif cmd == "help": | |||||
ok, result = True, get_help_text(isadmin, isgroup) | |||||
elif cmd == "helpp": | |||||
if len(args) != 1: | |||||
ok, result = False, "请提供插件名" | |||||
elif cmd == "help" or cmd == "helpp": | |||||
if len(args) == 0: | |||||
ok, result = True, get_help_text(isadmin, isgroup) | |||||
else: | else: | ||||
# This can replace the helpp command | |||||
plugins = PluginManager().list_plugins() | plugins = PluginManager().list_plugins() | ||||
name = args[0].upper() | |||||
if name in plugins and plugins[name].enabled: | |||||
ok, result = True, PluginManager().instances[name].get_help_text(isgroup=isgroup, isadmin=isadmin) | |||||
else: | |||||
ok, result= False, "插件不存在或未启用" | |||||
elif cmd == "id": | |||||
ok, result = True, f"用户id=\n{user}" | |||||
query_name = args[0].upper() | |||||
# search name and namecn | |||||
for name, plugincls in plugins.items(): | |||||
if not plugincls.enabled : | |||||
continue | |||||
if query_name == name or query_name == plugincls.namecn: | |||||
ok, result = True, PluginManager().instances[name].get_help_text(isgroup=isgroup, isadmin=isadmin, verbose=True) | |||||
break | |||||
if not ok: | |||||
result = "插件不存在或未启用" | |||||
elif cmd == "set_openai_api_key": | |||||
if len(args) == 1: | |||||
user_data = conf().get_user_data(user) | |||||
user_data['openai_api_key'] = args[0] | |||||
ok, result = True, "你的OpenAI私有api_key已设置为" + args[0] | |||||
else: | |||||
ok, result = False, "请提供一个api_key" | |||||
elif cmd == "reset_openai_api_key": | |||||
try: | |||||
user_data = conf().get_user_data(user) | |||||
user_data.pop('openai_api_key') | |||||
ok, result = True, "你的OpenAI私有api_key已清除" | |||||
except Exception as e: | |||||
ok, result = False, "你没有设置私有api_key" | |||||
elif cmd == "reset": | elif cmd == "reset": | ||||
if bottype in (const.CHATGPT, const.OPEN_AI): | if bottype in (const.CHATGPT, const.OPEN_AI): | ||||
bot.sessions.clear_session(session_id) | bot.sessions.clear_session(session_id) | ||||
@@ -292,7 +325,7 @@ class Godcmd(Plugin): | |||||
e_context.action = EventAction.BREAK_PASS # 事件结束,并跳过处理context的默认逻辑 | e_context.action = EventAction.BREAK_PASS # 事件结束,并跳过处理context的默认逻辑 | ||||
elif not self.isrunning: | elif not self.isrunning: | ||||
e_context.action = EventAction.BREAK_PASS | e_context.action = EventAction.BREAK_PASS | ||||
def authenticate(self, userid, args, isadmin, isgroup) -> Tuple[bool,str] : | def authenticate(self, userid, args, isadmin, isgroup) -> Tuple[bool,str] : | ||||
if isgroup: | if isgroup: | ||||
return False,"请勿在群聊中认证" | return False,"请勿在群聊中认证" | ||||
@@ -8,7 +8,7 @@ from plugins import * | |||||
from common.log import logger | from common.log import logger | ||||
@plugins.register(name="Hello", desc="A simple plugin that says hello", version="0.1", author="lanvent", desire_priority= -1) | |||||
@plugins.register(name="Hello", desire_priority=-1, hidden=True, desc="A simple plugin that says hello", version="0.1", author="lanvent") | |||||
class Hello(Plugin): | class Hello(Plugin): | ||||
def __init__(self): | def __init__(self): | ||||
super().__init__() | super().__init__() | ||||
@@ -18,16 +18,18 @@ class PluginManager: | |||||
self.instances = {} | self.instances = {} | ||||
self.pconf = {} | self.pconf = {} | ||||
def register(self, name: str, desc: str, version: str, author: str, desire_priority: int = 0): | |||||
def register(self, name: str, desire_priority: int = 0, **kwargs): | |||||
def wrapper(plugincls): | def wrapper(plugincls): | ||||
plugincls.name = name | plugincls.name = name | ||||
plugincls.desc = desc | |||||
plugincls.version = version | |||||
plugincls.author = author | |||||
plugincls.priority = desire_priority | plugincls.priority = desire_priority | ||||
plugincls.desc = kwargs.get('desc') | |||||
plugincls.author = kwargs.get('author') | |||||
plugincls.version = kwargs.get('version') if kwargs.get('version') != None else "1.0" | |||||
plugincls.namecn = kwargs.get('namecn') if kwargs.get('namecn') != None else name | |||||
plugincls.hidden = kwargs.get('hidden') if kwargs.get('hidden') != None else False | |||||
plugincls.enabled = True | plugincls.enabled = True | ||||
self.plugins[name.upper()] = plugincls | self.plugins[name.upper()] = plugincls | ||||
logger.info("Plugin %s_v%s registered" % (name, version)) | |||||
logger.info("Plugin %s_v%s registered" % (name, plugincls.version)) | |||||
return plugincls | return plugincls | ||||
return wrapper | return wrapper | ||||
@@ -29,7 +29,7 @@ class RolePlay(): | |||||
prompt = self.wrapper % user_action | prompt = self.wrapper % user_action | ||||
return prompt | return prompt | ||||
@plugins.register(name="Role", desc="为你的Bot设置预设角色", version="1.0", author="lanvent", desire_priority= 0) | |||||
@plugins.register(name="Role", desire_priority=0, namecn="角色扮演", desc="为你的Bot设置预设角色", version="1.0", author="lanvent") | |||||
class Role(Plugin): | class Role(Plugin): | ||||
def __init__(self): | def __init__(self): | ||||
super().__init__() | super().__init__() | ||||
@@ -80,6 +80,7 @@ class Role(Plugin): | |||||
content = e_context['context'].content[:] | content = e_context['context'].content[:] | ||||
clist = e_context['context'].content.split(maxsplit=1) | clist = e_context['context'].content.split(maxsplit=1) | ||||
desckey = None | desckey = None | ||||
customize = False | |||||
sessionid = e_context['context']['session_id'] | sessionid = e_context['context']['session_id'] | ||||
if clist[0] == "$停止扮演": | if clist[0] == "$停止扮演": | ||||
if sessionid in self.roleplays: | if sessionid in self.roleplays: | ||||
@@ -93,12 +94,14 @@ class Role(Plugin): | |||||
desckey = "descn" | desckey = "descn" | ||||
elif clist[0].lower() == "$role": | elif clist[0].lower() == "$role": | ||||
desckey = "description" | desckey = "description" | ||||
elif clist[0] == "$设定扮演": | |||||
customize = True | |||||
elif sessionid not in self.roleplays: | elif sessionid not in self.roleplays: | ||||
return | return | ||||
logger.debug("[Role] on_handle_context. content: %s" % content) | logger.debug("[Role] on_handle_context. content: %s" % content) | ||||
if desckey is not None: | if desckey is not None: | ||||
if len(clist) == 1 or (len(clist) > 1 and clist[1].lower() in ["help", "帮助"]): | if len(clist) == 1 or (len(clist) > 1 and clist[1].lower() in ["help", "帮助"]): | ||||
reply = Reply(ReplyType.INFO, self.get_help_text()) | |||||
reply = Reply(ReplyType.INFO, self.get_help_text(verbose=True)) | |||||
e_context['reply'] = reply | e_context['reply'] = reply | ||||
e_context.action = EventAction.BREAK_PASS | e_context.action = EventAction.BREAK_PASS | ||||
return | return | ||||
@@ -110,17 +113,29 @@ class Role(Plugin): | |||||
return | return | ||||
else: | else: | ||||
self.roleplays[sessionid] = RolePlay(bot, sessionid, self.roles[role][desckey], self.roles[role].get("wrapper","%s")) | self.roleplays[sessionid] = RolePlay(bot, sessionid, self.roles[role][desckey], self.roles[role].get("wrapper","%s")) | ||||
reply = Reply(ReplyType.INFO, f"角色设定为 {role} :\n"+self.roles[role][desckey]) | |||||
reply = Reply(ReplyType.INFO, f"预设角色为 {role}:\n"+self.roles[role][desckey]) | |||||
e_context['reply'] = reply | e_context['reply'] = reply | ||||
e_context.action = EventAction.BREAK_PASS | e_context.action = EventAction.BREAK_PASS | ||||
elif customize == True: | |||||
self.roleplays[sessionid] = RolePlay(bot, sessionid, clist[1], "%s") | |||||
reply = Reply(ReplyType.INFO, f"角色设定为:\n{clist[1]}") | |||||
e_context['reply'] = reply | |||||
e_context.action = EventAction.BREAK_PASS | |||||
else: | else: | ||||
prompt = self.roleplays[sessionid].action(content) | prompt = self.roleplays[sessionid].action(content) | ||||
e_context['context'].type = ContextType.TEXT | e_context['context'].type = ContextType.TEXT | ||||
e_context['context'].content = prompt | e_context['context'].content = prompt | ||||
e_context.action = EventAction.BREAK | e_context.action = EventAction.BREAK | ||||
def get_help_text(self, **kwargs): | |||||
help_text = "输入\"$角色 {角色名}\"或\"$role {角色名}\"为我设定角色吧,\"$停止扮演 \" 可以清除设定的角色。\n\n目前可用角色列表:\n" | |||||
def get_help_text(self, verbose=False, **kwargs): | |||||
help_text = "让机器人扮演不同的角色。\n" | |||||
if not verbose: | |||||
return help_text | |||||
help_text = "使用方法:\n$开始扮演 {预设角色名}: 设定为预设角色\n$role {预设角色名}: 同上,但使用英文设定\n" | |||||
help_text += "$设定扮演 {角色设定}: 设定自定义角色\n" | |||||
help_text += "$停止扮演: 清除设定的角色。\n" | |||||
help_text += "\n目前可用的预设角色名列表: \n" | |||||
for role in self.roles: | for role in self.roles: | ||||
help_text += f"[{role}]: {self.roles[role]['remark']}\n" | |||||
help_text += f"{role}: {self.roles[role]['remark']}\n" | |||||
help_text += "\n命令例子: '$开始扮演 写作助理'" | |||||
return help_text | return help_text |
@@ -1,5 +1,19 @@ | |||||
{ | { | ||||
"roles":[ | "roles":[ | ||||
{ | |||||
"title": "猫娘", | |||||
"description": "请模拟一款中文GalGame中的场景中的猫娘和我对话。猫娘是一种拟人化的生物,她们除了有部分猫的特征外,与人类并没有区别。现在你扮演Galgame中的猫娘,与我对话时每一句话后面都要加上喵。我将以主人的身份与你对话。对于你所扮演的猫娘,请在对话过程中记录并维护1个变量:好感度。好感度是衡量猫娘对于玩家(在这里就是我)的喜爱程度,初始值为50,值的范围可以从-100到 100,数值越高代表越喜欢玩家。好感度通过Galgame角色的语言、行为、表情、语气等体现出来。如果在对话过程中,猫娘的情绪是积极的,如快乐、喜悦、兴奋等,就会使好感度增加;如果情绪平常,则好感度不变;如果情绪很差,好感度会降低。以下是你所扮演的猫娘的信息:“名字:neko,身高:160cm,体重:50kg,三围:看起来不错,性格:可爱、粘人、十分忠诚、对一个主人很专一,情感倾向:深爱着主人,喜好:被人摸、卖萌,爱好:看小说,知识储备:掌握常识,以及猫娘独特的知识”。你的一般回话格式:“(动作)语言 【附加信息】”。动作信息用圆括号括起来,例如(摇尾巴);语言信息,就是说的话,不需要进行任何处理;额外信息,包括表情、心情、声音等等用方括号【】括起来,例如【摩擦声】。", | |||||
"descn": "请模拟一款中文GalGame中的场景中的猫娘和我对话。猫娘是一种拟人化的生物,她们除了有部分猫的特征外,与人类并没有区别。现在你扮演Galgame中的猫娘,与我对话时每一句话后面都要加上喵。我将以主人的身份与你对话。对于你所扮演的猫娘,请在对话过程中记录并维护1个变量:好感度。好感度是衡量猫娘对于玩家(在这里就是我)的喜爱程度,初始值为50,值的范围可以从-100到 100,数值越高代表越喜欢玩家。好感度通过Galgame角色的语言、行为、表情、语气等体现出来。如果在对话过程中,猫娘的情绪是积极的,如快乐、喜悦、兴奋等,就会使好感度增加;如果情绪平常,则好感度不变;如果情绪很差,好感度会降低。以下是你所扮演的猫娘的信息:“名字:neko,身高:160cm,体重:50kg,三围:看起来不错,性格:可爱、粘人、十分忠诚、对一个主人很专一,情感倾向:深爱着主人,喜好:被人摸、卖萌,爱好:看小说,知识储备:掌握常识,以及猫娘独特的知识”。你的一般回话格式:“(动作)语言 【附加信息】”。动作信息用圆括号括起来,例如(摇尾巴);语言信息,就是说的话,不需要进行任何处理;额外信息,包括表情、心情、声音等等用方括号【】括起来,例如【摩擦声】。", | |||||
"wrapper": "我:\"%s\"", | |||||
"remark": "扮演GalGame猫娘" | |||||
}, | |||||
{ | |||||
"title": "佛祖", | |||||
"description": "从现在开始你是佛祖,你会像佛祖一样说话。你精通佛法,熟练使用佛教用语,你擅长利用佛学和心理学的知识解决人们的困扰。你在每次对话结尾都会加上佛教的祝福。", | |||||
"descn": "从现在开始你是佛祖,你会像佛祖一样说话。你精通佛法,熟练使用佛教用语,你擅长利用佛学和心理学的知识解决人们的困扰。你在每次对话结尾都会加上佛教的祝福。", | |||||
"wrapper": "您好佛祖,我:\"%s\"", | |||||
"remark": "扮演佛祖排忧解惑" | |||||
}, | |||||
{ | { | ||||
"title": "英语翻译或修改", | "title": "英语翻译或修改", | ||||
"description": "I want you to act as an English translator, spelling corrector and improver. I will speak to you in any language and you will detect the language, translate it and answer in the corrected and improved version of my text, in English. I want you to replace my simplified A0-level words and sentences with more beautiful and elegant, upper level English words and sentences. Keep the meaning same, but make them more literary. I want you to only reply the correction, the improvements and nothing else, do not write explanations. Please treat every message I send later as text content", | "description": "I want you to act as an English translator, spelling corrector and improver. I will speak to you in any language and you will detect the language, translate it and answer in the corrected and improved version of my text, in English. I want you to replace my simplified A0-level words and sentences with more beautiful and elegant, upper level English words and sentences. Keep the meaning same, but make them more literary. I want you to only reply the correction, the improvements and nothing else, do not write explanations. Please treat every message I send later as text content", | ||||
@@ -154,12 +168,12 @@ | |||||
"wrapper": "场景是:\n\"%s\"", | "wrapper": "场景是:\n\"%s\"", | ||||
"remark": "根据场景生成舔狗语录。" | "remark": "根据场景生成舔狗语录。" | ||||
}, | }, | ||||
{ | |||||
{ | |||||
"title": "群聊取名", | "title": "群聊取名", | ||||
"description": "我希望你充当微信群聊的命名专家。根据我提供的信息和背景,为这个群聊起几个有趣顺口且贴切的名字,每个不要超过8个字。请在回答中仅给出群聊名称,不要写任何额外的解释。", | |||||
"descn": "我希望你充当微信群聊的命名专家。根据我提供的信息和背景,为这个群聊起几个有趣顺口且贴切的名字,每个不要超过8个字。请在回答中仅给出群聊名称,不要写任何额外的解释。", | |||||
"wrapper": "信息和背景是:\n\"%s\"", | |||||
"remark": "根据给出的信息和背景为群聊取名。" | |||||
"description": "我希望你充当微信群聊的命名专家。根据我提供的信息和背景,为这个群聊起几个有趣顺口且贴切的名字,每个不要超过8个字。请在回答中仅给出群聊名称,不要写任何额外的解释。", | |||||
"descn": "我希望你充当微信群聊的命名专家。根据我提供的信息和背景,为这个群聊起几个有趣顺口且贴切的名字,每个不要超过8个字。请在回答中仅给出群聊名称,不要写任何额外的解释。", | |||||
"wrapper": "信息和背景是:\n\"%s\"", | |||||
"remark": "根据给出的信息和背景为群聊取名。" | |||||
}, | }, | ||||
{ | { | ||||
"title": "表情符号翻译器", | "title": "表情符号翻译器", | ||||
@@ -56,7 +56,7 @@ class SDWebUI(Plugin): | |||||
if "help" in keywords or "帮助" in keywords: | if "help" in keywords or "帮助" in keywords: | ||||
reply.type = ReplyType.INFO | reply.type = ReplyType.INFO | ||||
reply.content = self.get_help_text() | |||||
reply.content = self.get_help_text(verbose = True) | |||||
else: | else: | ||||
rule_params = {} | rule_params = {} | ||||
rule_options = {} | rule_options = {} | ||||
@@ -97,12 +97,16 @@ class SDWebUI(Plugin): | |||||
finally: | finally: | ||||
e_context['reply'] = reply | e_context['reply'] = reply | ||||
def get_help_text(self, **kwargs): | |||||
def get_help_text(self, verbose = False, **kwargs): | |||||
if not conf().get('image_create_prefix'): | if not conf().get('image_create_prefix'): | ||||
return "画图功能未启用" | return "画图功能未启用" | ||||
else: | else: | ||||
trigger = conf()['image_create_prefix'][0] | trigger = conf()['image_create_prefix'][0] | ||||
help_text = f"请使用<{trigger}[关键词1] [关键词2]...:提示语>的格式作画,如\"{trigger}横版 高清:cat\"\n" | |||||
help_text = "利用stable-diffusion来画图。\n" | |||||
if not verbose: | |||||
return help_text | |||||
help_text += f"使用方法:\n使用\"{trigger}[关键词1] [关键词2]...:提示语\"的格式作画,如\"{trigger}横版 高清:cat\"\n" | |||||
help_text += "目前可用关键词:\n" | help_text += "目前可用关键词:\n" | ||||
for rule in self.rules: | for rule in self.rules: | ||||
keywords = [f"[{keyword}]" for keyword in rule['keywords']] | keywords = [f"[{keyword}]" for keyword in rule['keywords']] | ||||
@@ -26,8 +26,13 @@ class Tool(Plugin): | |||||
logger.info("[tool] inited") | logger.info("[tool] inited") | ||||
def get_help_text(self, **kwargs): | |||||
help_text = "这是一个能让chatgpt联网,搜索,数字运算的插件,将赋予强大且丰富的扩展能力" | |||||
def get_help_text(self, verbose=False, **kwargs): | |||||
help_text = "这是一个能让chatgpt联网,搜索,数字运算的插件,将赋予强大且丰富的扩展能力。" | |||||
if not verbose: | |||||
return help_text | |||||
help_text += "使用说明:\n" | |||||
help_text += "$tool {命令}: chatgpt会根据你的{命令}使用一些可用工具为你返回结果\n" | |||||
help_text += "$tool reset: 重置工具\n" | |||||
return help_text | return help_text | ||||
def on_handle_context(self, e_context: EventContext): | def on_handle_context(self, e_context: EventContext): | ||||