133 lines
5.1KB

  1. # -*- coding: utf-8 -*-
  2. import sys
  3. import web
  4. import math
  5. import time
  6. import json
  7. import requests
  8. import threading
  9. from common.singleton import singleton
  10. from common.log import logger
  11. from common.expired_dict import ExpiredDict
  12. from config import conf
  13. from bridge.reply import *
  14. from bridge.context import *
  15. from channel.chat_channel import ChatChannel
  16. from channel.wechatmp.common import *
  17. # If using SSL, uncomment the following lines, and modify the certificate path.
  18. # from cheroot.server import HTTPServer
  19. # from cheroot.ssl.builtin import BuiltinSSLAdapter
  20. # HTTPServer.ssl_adapter = BuiltinSSLAdapter(
  21. # certificate='/ssl/cert.pem',
  22. # private_key='/ssl/cert.key')
  @singleton
  class WechatMPChannel(ChatChannel):
      """Chat channel for a WeChat Official Account ("wechatmp").

      The channel runs in one of two modes, selected by ``passive_reply``:

      * passive mode (``passive_reply=True``): ``/wx`` is routed to
        ``channel.wechatmp.SubscribeAccount.Query`` and replies are stored in
        ``cache_dict`` instead of being pushed to WeChat.
      * active mode (``passive_reply=False``): ``/wx`` is routed to
        ``channel.wechatmp.ServiceAccount.Query`` and replies are pushed through
        the WeChat "custom message" API, which needs an access token.
      """

      # Default timeout (seconds) applied to WeChat API calls that do not pass
      # their own ``timeout``; without one, ``requests`` may block indefinitely.
      REQUEST_TIMEOUT = 10

      def __init__(self, passive_reply=True):
          """Initialise the per-mode state.

          Args:
              passive_reply: True for passive mode (replies are cached), False
                  for active mode (replies are pushed; an access token is
                  fetched immediately, so this may raise on network/API errors).
          """
          super().__init__()
          self.passive_reply = passive_reply
          # Identifiers of sessions whose reply is still being produced.
          self.running = set()
          # Constructed with 60*60*24 (one day in seconds) -- presumably the
          # entry lifetime; confirm against ExpiredDict.
          self.received_msgs = ExpiredDict(60 * 60 * 24)
          if self.passive_reply:
              self.NOT_SUPPORT_REPLYTYPE = [ReplyType.IMAGE, ReplyType.VOICE]
              # receiver -> (number of 600-character segments, full reply text)
              self.cache_dict = dict()
              # Not used inside this class; presumably bookkeeping for the
              # request handler module -- verify against SubscribeAccount.
              self.query1 = dict()
              self.query2 = dict()
              self.query3 = dict()
          else:
              # TODO support image
              self.NOT_SUPPORT_REPLYTYPE = [ReplyType.IMAGE, ReplyType.VOICE]
              self.app_id = conf().get('wechatmp_app_id')
              self.app_secret = conf().get('wechatmp_app_secret')
              self.access_token = None
              self.access_token_expires_time = 0
              self.access_token_lock = threading.Lock()
              self.get_access_token()

      def startup(self):
          """Start the web.py HTTP server that serves the ``/wx`` endpoint.

          The handler class depends on the mode; the port is read from the
          ``wechatmp_port`` config entry (default 8080). This call runs the
          server via ``web.httpserver.runsimple``.
          """
          if self.passive_reply:
              urls = ('/wx', 'channel.wechatmp.SubscribeAccount.Query')
          else:
              urls = ('/wx', 'channel.wechatmp.ServiceAccount.Query')
          app = web.application(urls, globals())
          port = conf().get('wechatmp_port', 8080)
          web.httpserver.runsimple(app.wsgifunc(), ('0.0.0.0', port))

      def wechatmp_request(self, method, url, **kwargs):
          """Send an HTTP request to the WeChat API and return the decoded JSON.

          Args:
              method: HTTP method name passed to ``requests.request``.
              url: Target URL.
              **kwargs: Forwarded to ``requests.request``. If no ``timeout`` is
                  given, ``REQUEST_TIMEOUT`` is used.

          Returns:
              The parsed JSON body.

          Raises:
              requests.HTTPError: On a non-success HTTP status.
              WeChatAPIException: If the body carries a non-zero ``errcode``.
          """
          # A hung connection must not block the caller forever (the token
          # refresh calls this while holding access_token_lock).
          kwargs.setdefault("timeout", self.REQUEST_TIMEOUT)
          r = requests.request(method=method, url=url, **kwargs)
          r.raise_for_status()
          r.encoding = "utf-8"
          ret = r.json()
          if "errcode" in ret and ret["errcode"] != 0:
              raise WeChatAPIException("{}".format(ret))
          return ret

      def get_access_token(self):
          """Return the access token, refreshing it when close to expiry.

          A cached token is reused while it has more than 60 seconds left.
          Otherwise exactly one thread performs the refresh; other threads wait
          until that refresh finishes and then return whatever token is stored.

          Returns:
              The current access token. May be ``None`` for a waiting thread if
              the refreshing thread failed.

          Raises:
              requests.HTTPError, WeChatAPIException: In the refreshing thread,
                  if the token request fails.
          """
          if self.access_token:
              if self.access_token_expires_time - time.time() > 60:
                  return self.access_token

          # Get new access_token
          # Do not request access_token in parallel! Only the last obtained is valid.
          if self.access_token_lock.acquire(blocking=False):
              try:
                  # Wait for other threads that have previously obtained access_token to complete the request
                  # This happens every 2 hours, so it doesn't affect the experience very much
                  time.sleep(1)
                  self.access_token = None
                  url = "https://api.weixin.qq.com/cgi-bin/token"
                  params = {
                      "grant_type": "client_credential",
                      "appid": self.app_id,
                      "secret": self.app_secret,
                  }
                  data = self.wechatmp_request(method='get', url=url, params=params)
                  self.access_token = data['access_token']
                  self.access_token_expires_time = int(time.time()) + data['expires_in']
                  # Do not write the token itself to the log: it is a credential.
                  logger.info("[wechatmp] access_token refreshed, expires in {}s".format(data['expires_in']))
              finally:
                  # Always release, otherwise a failed refresh would leave every
                  # later caller spinning in the wait loop below.
                  self.access_token_lock.release()
          else:
              # Wait for token update
              while self.access_token_lock.locked():
                  time.sleep(0.1)
          return self.access_token

      def send(self, reply: Reply, context: Context):
          """Deliver ``reply`` to ``context["receiver"]``.

          In passive mode the reply text is stored in ``cache_dict`` together
          with its number of 600-character segments, and the receiver is removed
          from ``running``. In active mode the text is posted to the WeChat
          custom-message endpoint.

          Raises:
              requests.HTTPError, WeChatAPIException: In active mode, if the
                  API call fails.
          """
          receiver = context["receiver"]
          reply_text = reply.content
          if self.passive_reply:
              reply_cnt = math.ceil(len(reply_text) / 600)
              self.cache_dict[receiver] = (reply_cnt, reply_text)
              # discard(): a receiver that is no longer registered must not turn
              # a successfully cached reply into a KeyError.
              self.running.discard(receiver)
              logger.debug("[send] reply to {} saved to cache: {}".format(receiver, reply_text))
          else:
              url = "https://api.weixin.qq.com/cgi-bin/message/custom/send"
              params = {
                  "access_token": self.get_access_token()
              }
              json_data = {
                  "touser": receiver,
                  "msgtype": "text",
                  "text": {"content": reply_text}
              }
              # ensure_ascii=False + explicit UTF-8 so non-ASCII text is sent as
              # real characters rather than \uXXXX escapes.
              self.wechatmp_request(method='post', url=url, params=params, data=json.dumps(json_data, ensure_ascii=False).encode('utf8'))
              logger.info("[send] Do send to {}: {}".format(receiver, reply_text))

      def _fail_callback(self, session_id, exception, context, **kwargs):
          """Log a failed reply generation and clear the session's running mark.

          Args:
              session_id: Identifier that was registered in ``running``.
              exception: The error that caused the failure.
              context: Message context; ``context['msg'].msg_id`` is logged.
          """
          logger.exception("[wechatmp] Fail to generation message to user, msgId={}, exception={}".format(context['msg'].msg_id, exception))
          # cache_dict only exists in passive mode. A cached reply for a failed
          # session is unexpected, but raising here would skip the cleanup below.
          if self.passive_reply and session_id in self.cache_dict:
              logger.warning("[wechatmp] session {} failed but already has a cached reply".format(session_id))
          self.running.discard(session_id)
  117. # Last import to avoid circular import
  118. import channel.wechatmp.SubscribeAccount
  119. import channel.wechatmp.ServiceAccount