選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

biz.py 15KB

4ヶ月前
4ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
4ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
4ヶ月前
4ヶ月前
4ヶ月前
4ヶ月前
4ヶ月前
4ヶ月前
4ヶ月前
4ヶ月前
4ヶ月前
4ヶ月前
4ヶ月前
4ヶ月前
4ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. import threading
  2. from common import kafka_helper,redis_helper,utils
  3. import json,time,re,random,os
  4. from common.log import logger, log_exception
  5. from datetime import datetime
  6. from wechat import gewe_chat
  7. def wx_messages_process_callback(agent_tel,message):
  8. try:
  9. # print(f'手机号 {agent_tel}')
  10. wxchat = gewe_chat.wxchat
  11. msg_content = message
  12. cleaned_content = clean_json_string(msg_content)
  13. content = json.loads(cleaned_content)
  14. data = content.get("data", {})
  15. msg_type_data = data.get("msg_type", None)
  16. content_data = data.get("content", {})
  17. agent_tel = content_data.get("agent_tel", None)
  18. if msg_type_data == 'group-sending':
  19. process_group_sending(wxchat, content_data, agent_tel)
  20. except json.JSONDecodeError as e:
  21. print(f"JSON解码错误: {e}, 消息内容: {message}")
  22. except Exception as e:
  23. print(f"处理消息时发生错误: {e}, 消息内容: {message}")
  24. def process_group_sending_v0(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel:str):
  25. # 获取登录信息
  26. hash_key = f"__AI_OPS_WX__:LOGININFO:{agent_tel}"
  27. logininfo = redis_helper.redis_helper.get_hash(hash_key)
  28. if not logininfo:
  29. logger.warning(f"未找到 {agent_tel} 的登录信息")
  30. return
  31. token_id = logininfo.get('tokenId')
  32. app_id = logininfo.get('appId')
  33. agent_wxid = logininfo.get('wxid')
  34. # 获取联系人列表并计算交集
  35. hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{agent_wxid}"
  36. cache_friend_wxids_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  37. cache_friend_wxids_list=json.loads(cache_friend_wxids_str) if cache_friend_wxids_str else []
  38. cache_friend_wxids=[f["userName"] for f in cache_friend_wxids_list]
  39. wxid_contact_list_content_data = [c['wxid'] for c in content_data.get("contact_list", [])]
  40. intersection_wxids = list(set(cache_friend_wxids) & set(wxid_contact_list_content_data))
  41. # 发送消息
  42. wx_content_list = content_data.get("wx_content", [])
  43. for wx_content in wx_content_list:
  44. if wx_content["type"] == "text":
  45. send_text_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])
  46. elif wx_content["type"] == "image_url":
  47. send_image_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content.get("image_url", {}).get("url"))
  48. elif wx_content["type"] == "tts":
  49. send_tts_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])
  50. def process_group_sending(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel:str):
  51. # 获取登录信息
  52. hash_key = f"__AI_OPS_WX__:LOGININFO:{agent_tel}"
  53. logininfo = redis_helper.redis_helper.get_hash(hash_key)
  54. if not logininfo:
  55. logger.warning(f"未找到 {agent_tel} 的登录信息")
  56. return
  57. token_id = logininfo.get('tokenId')
  58. app_id = logininfo.get('appId')
  59. agent_wxid = logininfo.get('wxid')
  60. # 获取联系人列表并计算交集
  61. hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{agent_wxid}"
  62. cache_friend_wxids_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  63. cache_friend_wxids_list=json.loads(cache_friend_wxids_str) if cache_friend_wxids_str else []
  64. cache_friend_wxids=[f["userName"] for f in cache_friend_wxids_list]
  65. # 获取群交集
  66. hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{agent_wxid}"
  67. cache_chatrooms = redis_helper.redis_helper.get_hash(hash_key)
  68. cache_chatroom_ids=cache_chatrooms.keys()
  69. wxid_contact_list_content_data = [c['wxid'] for c in content_data.get("contact_list", [])]
  70. intersection_friend_wxids = list(set(cache_friend_wxids) & set(wxid_contact_list_content_data))
  71. intersection_chatroom_ids = list(set(cache_chatroom_ids) & set(wxid_contact_list_content_data))
  72. intersection_wxids=intersection_friend_wxids+intersection_chatroom_ids
  73. # 发送消息
  74. wx_content_list = content_data.get("wx_content", [])
  75. for wx_content in wx_content_list:
  76. if wx_content["type"] == "text":
  77. send_text_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])
  78. elif wx_content["type"] == "image_url":
  79. send_image_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content.get("image_url", {}).get("url"))
  80. elif wx_content["type"] == "tts":
  81. send_tts_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])
  82. def send_text_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, text):
  83. for t in intersection_wxids:
  84. # 发送文本消息
  85. ret,ret_msg,res = wxchat.post_text(token_id, app_id, t, text)
  86. logger.info(f'{agent_wxid} 向 {t} 发送文字【{text}】')
  87. # 构造对话消息并发送到 Kafka
  88. input_wx_content_dialogue_message = [{"type": "text", "text": text}]
  89. input_message = utils.dialogue_message(agent_wxid, t, input_wx_content_dialogue_message)
  90. kafka_helper.kafka_client.produce_message(input_message)
  91. logger.info("发送对话 %s", input_message)
  92. # 等待随机时间
  93. time.sleep(random.uniform(5, 15))
  94. def send_image_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, image_url):
  95. aeskey, cdnthumburl, cdnthumblength, cdnthumbheight, cdnthumbwidth, length, md5 = "", "", 0, 0, 0, 0, ""
  96. for t in intersection_wxids:
  97. if t == intersection_wxids[0]:
  98. # 发送图片
  99. ret,ret_msg,res = wxchat.post_image(token_id, app_id, t, image_url)
  100. if ret==200:
  101. aeskey = res["aesKey"]
  102. cdnthumburl = res["fileId"]
  103. cdnthumblength = res["cdnThumbLength"]
  104. cdnthumbheight = res["height"]
  105. cdnthumbwidth = res["width"]
  106. length = res["length"]
  107. md5 = res["md5"]
  108. logger.info(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}')
  109. else:
  110. logger.warning(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}')
  111. else:
  112. if aeskey !="":
  113. # 转发图片
  114. res,ret,ret_msg= wxchat.forward_image(token_id, app_id, t, aeskey, cdnthumburl, cdnthumblength, cdnthumbheight, cdnthumbwidth, length, md5)
  115. logger.info(f'{agent_wxid} 向 {t} 转发图片【{image_url}】{ret_msg}')
  116. else:
  117. # 发送图片
  118. ret,ret_msg,res = wxchat.post_image(token_id, app_id, t, image_url)
  119. if ret==200:
  120. aeskey = res["aesKey"]
  121. cdnthumburl = res["fileId"]
  122. cdnthumblength = res["cdnThumbLength"]
  123. cdnthumbheight = res["height"]
  124. cdnthumbwidth = res["width"]
  125. length = res["length"]
  126. md5 = res["md5"]
  127. logger.info(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}')
  128. else:
  129. logger.warning(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}')
  130. # 构造对话消息并发送到 Kafka
  131. wx_content_dialogue_message = [{"type": "image_url", "image_url": {"url": image_url}}]
  132. input_message = utils.dialogue_message(agent_wxid, t, wx_content_dialogue_message)
  133. kafka_helper.kafka_client.produce_message(input_message)
  134. logger.info("发送对话 %s", input_message)
  135. # 等待随机时间
  136. time.sleep(random.uniform(5, 15))
  137. def send_tts_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, text):
  138. voice_during,voice_url=utils.wx_voice(text)
  139. for t in intersection_wxids:
  140. # 发送送语音消息
  141. if voice_url:
  142. ret,ret_msg,res = wxchat.post_voice(token_id, app_id, t, voice_url,voice_during)
  143. if ret==200:
  144. logger.info(f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}')
  145. # 构造对话消息并发送到 Kafka
  146. input_wx_content_dialogue_message = [{"type": "text", "text": text}]
  147. input_message = utils.dialogue_message(agent_wxid, t, input_wx_content_dialogue_message)
  148. kafka_helper.kafka_client.produce_message(input_message)
  149. logger.info("发送对话 %s", input_message)
  150. else:
  151. logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}'))
  152. else:
  153. logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】出错'))
  154. # 等待随机时间
  155. time.sleep(random.uniform(5, 15))
  156. def clean_json_string(json_str):
  157. # 删除所有控制字符(非打印字符),包括换行符、回车符等
  158. return re.sub(r'[\x00-\x1f\x7f]', '', json_str)
  159. # 启动 Kafka 消费者线程
  160. def start_kafka_consumer_thread():
  161. agent_tel=os.environ.get('tel', '18029274615')
  162. # consumer_thread = threading.Thread(target=kafka_helper.kafka_client.consume_messages, args=(agent_tel,wx_messages_process_callback,))
  163. consumer_thread = threading.Thread(target=kafka_helper.kafka_client.consume_messages, args=(ops_messages_process,))
  164. consumer_thread.daemon = True # 设置为守护线程,应用退出时会自动结束
  165. consumer_thread.start()
  166. def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash_key, is_reconnect=False, max_retries=5):
  167. """
  168. 封装微信登录或重连的逻辑
  169. """
  170. agent_tel=hash_key.split(":")[-1]
  171. retry_count = 0
  172. while retry_count < max_retries:
  173. retry_count += 1
  174. if is_reconnect:
  175. logger.info("尝试重连...")
  176. else:
  177. logger.info("获取二维码进行登录...")
  178. qr_code = wxchat.get_login_qr_code(token_id, app_id)
  179. base64_string = qr_code.get('qrImgBase64')
  180. uuid = qr_code.get('uuid')
  181. app_id = app_id or qr_code.get('appId')
  182. start_time = time.time()
  183. qr_code_urls= wxchat.qrCallback(uuid, base64_string)
  184. # 构造 Kafka 消息发送二维码
  185. k_message=utils.login_qrcode_message(token_id,agent_tel,base64_string,qr_code_urls)
  186. kafka_helper.kafka_client.produce_message(k_message)
  187. while True:
  188. # 如果登录超时,重新获取二维码
  189. if time.time() - start_time > 150: #150 秒 二维码失效
  190. break
  191. res = wxchat.check_login(token_id, app_id, uuid)
  192. flag = res.get('status')
  193. # 构造 Kafka 消息发送登录状态
  194. # todo
  195. if flag == 2:
  196. logger.info(f"登录成功: {res}")
  197. login_info = res.get('loginInfo', {})
  198. login_info.update({'appId': app_id, 'uuid': uuid, 'tokenId': token_id,'status': 1})
  199. # cache_login_info=redis_helper.redis_helper.get_hash(hash_key)
  200. # if 'appId' in cache_login_info:
  201. # login_info.update({"reg_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],"login_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]})
  202. # else:
  203. # login_info.update({"login_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]})
  204. cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()}
  205. redis_helper.redis_helper.set_hash(hash_key, cleaned_login_info)
  206. return login_info
  207. time.sleep(5)
  208. logger.error(f"登录失败,二维码生成 {max_retries} 次")
  209. def fetch_and_save_contacts(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash_key):
  210. """
  211. 获取联系人列表并保存到缓存
  212. """
  213. ret,msg,contacts_list = wxchat.fetch_contacts_list(token_id, app_id)
  214. friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围
  215. wxid = redis_helper.redis_helper.get_hash_field(hash_key, 'wxid')
  216. wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
  217. print(f'微信ID {wxid} 登录APPID {app_id} 成功,联系人已保存')
  218. def wx_login(wxchat:gewe_chat.GeWeChatCom,tel,token_id):
  219. hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}"
  220. login_info = redis_helper.redis_helper.get_hash(hash_key)
  221. if not login_info:
  222. login_info = login_or_reconnect(wxchat, token_id, '', hash_key)
  223. else:
  224. app_id = login_info.get('appId')
  225. token_id = login_info.get('tokenId')
  226. wxid= login_info.get('wxid')
  227. # 检查是否已经登录
  228. is_online = wxchat.check_online(token_id, app_id)
  229. if is_online:
  230. logger.info(f'微信ID {wxid} 在APPID {app_id} 已经在线')
  231. else:
  232. # 尝试重连
  233. res = wxchat.reconnection(token_id, app_id)
  234. if res.get('ret') == 200:
  235. logger.info(f'微信ID {wxid} 在APPID {app_id} 重连成功')
  236. else:
  237. print("重连失败,重新登录...")
  238. login_info = login_or_reconnect(wxchat, token_id, app_id, hash_key, is_reconnect=True)
  239. fetch_and_save_contacts(wxchat, token_id, login_info.get('appId'), hash_key)
  240. def ops_messages_process(message):
  241. try:
  242. wxchat = gewe_chat.wxchat
  243. #print(message)
  244. # logger.info(f"接收到kafka消息: {json.dumps(message, separators=(',', ':'), ensure_ascii=False)}")
  245. logger.info(f"接收到kafka消息: {json.dumps(json.loads(message), ensure_ascii=False)}")
  246. msg_content = message
  247. cleaned_content = clean_json_string(msg_content)
  248. content = json.loads(cleaned_content)
  249. data = content.get("data", {})
  250. msg_type_data = data.get("msg_type", None)
  251. content_data = data.get("content", {})
  252. if msg_type_data=="login":
  253. tel=content_data.get('tel', '18029274615')
  254. token_id=content_data.get('token_id', 'f828cb3c-1039-489f-b9ae-7494d1778a15')
  255. #wx_login(wxchat,tel,token_id)
  256. thread = threading.Thread(target=wx_login, args=(wxchat,tel,token_id,))
  257. thread.daemon = True
  258. thread.start()
  259. elif msg_type_data == 'group-sending':
  260. agent_tel=content_data.get('agent_tel', '18029274615')
  261. # 使用线程处理
  262. #wx_messages_process_callback(agent_tel,message)
  263. thread = threading.Thread(target=wx_messages_process_callback, args=(agent_tel,message,))
  264. thread.daemon = True
  265. thread.start()
  266. else:
  267. print(f'未处理息类型 {msg_type_data}')
  268. except Exception as e:
  269. print(f"处理消息时发生错误: {e}, 消息内容: {message}")
  270. # 启动
  271. # def consumer_start_up_thread():
  272. # print('启动')
  273. # try:
  274. # print('启动')
  275. # except Exception as e:
  276. # print(f"处理消息时发生错误: {e}, 消息内容: {message}")