@@ -0,0 +1,2 @@ | |||
.idea/ | |||
venv |
@@ -0,0 +1,10 @@ | |||
from channel import channel_factory | |||
if __name__ == '__main__': | |||
# create channel | |||
channel = channel_factory.create_channel("wx") | |||
# startup channel | |||
channel.startup() | |||
print("Hello bot") |
@@ -0,0 +1,26 @@ | |||
# encoding:utf-8 | |||
import json | |||
import requests | |||
from bot.bot import Bot | |||
class BaiduUnitBot(Bot): | |||
def reply(self, query): | |||
token = self.get_token() | |||
url = 'https://aip.baidubce.com/rpc/2.0/unit/service/v3/chat?access_token=' + token | |||
post_data = "{\"version\":\"3.0\",\"service_id\":\"S73177\",\"session_id\":\"\",\"log_id\":\"7758521\",\"skill_ids\":[\"1221886\"],\"request\":{\"terminal_id\":\"88888\",\"query\":\"" + query + "\", \"hyper_params\": {\"chat_custom_bot_profile\": 1}}}" | |||
print(post_data) | |||
headers = {'content-type': 'application/x-www-form-urlencoded'} | |||
response = requests.post(url, data=post_data.encode(), headers=headers) | |||
if response: | |||
return response.json()['result']['context']['SYS_PRESUMED_HIST'][1] | |||
def get_token(self): | |||
access_key = '${YOUR_ACCESS_KEY}' | |||
secret_key = '${YOUR_SECRET_KEY}' | |||
host = 'https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=' + access_key + '&client_secret=' + secret_key | |||
response = requests.get(host) | |||
if response: | |||
print(response.json()) | |||
return response.json()['access_token'] |
@@ -0,0 +1,13 @@ | |||
""" | |||
Auto-replay chat robot abstract class | |||
""" | |||
class Bot(object): | |||
def reply(self, query): | |||
""" | |||
bot auto-reply content | |||
:param req: received message | |||
:return: reply content | |||
""" | |||
raise NotImplementedError |
@@ -0,0 +1,16 @@ | |||
""" | |||
channel factory | |||
""" | |||
from bot.baidu.baidu_unit_bot import BaiduUnitBot | |||
def create_bot(bot_type): | |||
""" | |||
create a channel instance | |||
:param channel_type: channel type code | |||
:return: channel instance | |||
""" | |||
if bot_type == 'baidu': | |||
return BaiduUnitBot() | |||
raise RuntimeError |
@@ -0,0 +1,9 @@ | |||
from bot import bot_factory | |||
class Bridge(object): | |||
def __init__(self): | |||
pass | |||
def fetch_reply_content(self, query): | |||
return bot_factory.BaiduUnitBot().reply(query) |
@@ -0,0 +1,31 @@ | |||
""" | |||
Message sending channel abstract class | |||
""" | |||
from bridge.bridge import Bridge | |||
class Channel(object): | |||
def startup(self): | |||
""" | |||
init channel | |||
""" | |||
raise NotImplementedError | |||
def handle(self, msg): | |||
""" | |||
process received msg | |||
:param msg: message object | |||
""" | |||
raise NotImplementedError | |||
def send(self, msg, receiver): | |||
""" | |||
send message to user | |||
:param msg: message content | |||
:param receiver: receiver channel account | |||
:return: | |||
""" | |||
raise NotImplementedError | |||
def build_reply_content(self, query): | |||
return Bridge().fetch_reply_content(query) |
@@ -0,0 +1,15 @@ | |||
""" | |||
channel factory | |||
""" | |||
from channel.wechat.wechat_channel import WechatChannel | |||
def create_channel(channel_type): | |||
""" | |||
create a channel instance | |||
:param channel_type: channel type code | |||
:return: channel instance | |||
""" | |||
if channel_type == 'wx': | |||
return WechatChannel() | |||
raise RuntimeError |
@@ -0,0 +1,39 @@ | |||
""" | |||
wechat channel | |||
""" | |||
import itchat | |||
import time | |||
import random | |||
import json | |||
from itchat.content import * | |||
from channel.channel import Channel | |||
@itchat.msg_register([TEXT]) | |||
def handler_receive_msg(msg): | |||
WechatChannel().handle(msg) | |||
class WechatChannel(Channel): | |||
def __init__(self): | |||
pass | |||
def startup(self): | |||
# login by scan QRCode | |||
itchat.auto_login() | |||
# start message listener | |||
itchat.run() | |||
def handle(self, msg): | |||
print("handle: ", msg) | |||
print(json.dumps(msg, ensure_ascii=False)) | |||
from_user_id = msg['FromUserName'] | |||
other_user_id = msg['User']['UserName'] | |||
if from_user_id == other_user_id: | |||
self.send(super().build_reply_content(msg['Text']), from_user_id) | |||
def send(self, msg, receiver): | |||
time.sleep(random.randint(1, 3)) | |||
print(msg, receiver) | |||
itchat.send(msg + " [bot]", toUserName=receiver) |