You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

355 lines
16KB

  1. import threading
  2. from common import kafka_helper,redis_helper,utils
  3. import json,time,re,random,os
  4. from common.log import logger, log_exception
  5. from datetime import datetime
  6. from wechat import gewe_chat
  7. def wx_messages_process_callback(agent_tel,message):
  8. try:
  9. # print(f'手机号 {agent_tel}')
  10. wxchat = gewe_chat.wxchat
  11. msg_content = message
  12. cleaned_content = clean_json_string(msg_content)
  13. content = json.loads(cleaned_content)
  14. data = content.get("data", {})
  15. msg_type_data = data.get("msg_type", None)
  16. content_data = data.get("content", {})
  17. agent_tel = content_data.get("agent_tel", None)
  18. if msg_type_data == 'group-sending':
  19. process_group_sending(wxchat, content_data, agent_tel)
  20. except json.JSONDecodeError as e:
  21. print(f"JSON解码错误: {e}, 消息内容: {message}")
  22. except Exception as e:
  23. print(f"处理消息时发生错误: {e}, 消息内容: {message}")
  24. def process_group_sending_v0(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel:str):
  25. # 获取登录信息
  26. hash_key = f"__AI_OPS_WX__:LOGININFO:{agent_tel}"
  27. logininfo = redis_helper.redis_helper.get_hash(hash_key)
  28. if not logininfo:
  29. logger.warning(f"未找到 {agent_tel} 的登录信息")
  30. return
  31. token_id = logininfo.get('tokenId')
  32. app_id = logininfo.get('appId')
  33. agent_wxid = logininfo.get('wxid')
  34. # 获取联系人列表并计算交集
  35. hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{agent_wxid}"
  36. cache_friend_wxids_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  37. cache_friend_wxids_list=json.loads(cache_friend_wxids_str) if cache_friend_wxids_str else []
  38. cache_friend_wxids=[f["userName"] for f in cache_friend_wxids_list]
  39. wxid_contact_list_content_data = [c['wxid'] for c in content_data.get("contact_list", [])]
  40. intersection_wxids = list(set(cache_friend_wxids) & set(wxid_contact_list_content_data))
  41. # 发送消息
  42. wx_content_list = content_data.get("wx_content", [])
  43. for wx_content in wx_content_list:
  44. if wx_content["type"] == "text":
  45. send_text_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])
  46. elif wx_content["type"] == "image_url":
  47. send_image_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content.get("image_url", {}).get("url"))
  48. elif wx_content["type"] == "tts":
  49. send_tts_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])
  50. def process_group_sending(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel:str):
  51. # 获取登录信息
  52. hash_key = f"__AI_OPS_WX__:LOGININFO:{agent_tel}"
  53. logininfo = redis_helper.redis_helper.get_hash(hash_key)
  54. if not logininfo:
  55. logger.warning(f"未找到 {agent_tel} 的登录信息")
  56. return
  57. token_id = logininfo.get('tokenId')
  58. app_id = logininfo.get('appId')
  59. agent_wxid = logininfo.get('wxid')
  60. # 获取联系人列表并计算交集
  61. hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{agent_wxid}"
  62. cache_friend_wxids_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  63. cache_friend_wxids_list=json.loads(cache_friend_wxids_str) if cache_friend_wxids_str else []
  64. cache_friend_wxids=[f["userName"] for f in cache_friend_wxids_list]
  65. # 获取群交集
  66. hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{agent_wxid}"
  67. cache_chatrooms = redis_helper.redis_helper.get_hash(hash_key)
  68. cache_chatroom_ids=cache_chatrooms.keys()
  69. wxid_contact_list_content_data = [c['wxid'] for c in content_data.get("contact_list", [])]
  70. intersection_friend_wxids = list(set(cache_friend_wxids) & set(wxid_contact_list_content_data))
  71. intersection_chatroom_ids = list(set(cache_chatroom_ids) & set(wxid_contact_list_content_data))
  72. intersection_wxids=intersection_friend_wxids+intersection_chatroom_ids
  73. # 发送消息
  74. wx_content_list = content_data.get("wx_content", [])
  75. # for wx_content in wx_content_list:
  76. # if wx_content["type"] == "text":
  77. # send_text_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])
  78. # elif wx_content["type"] == "image_url":
  79. # send_image_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content.get("image_url", {}).get("url"))
  80. # elif wx_content["type"] == "tts":
  81. # send_tts_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])
  82. for intersection_wxid in intersection_wxids:
  83. for wx_content in wx_content_list:
  84. if wx_content["type"] == "text":
  85. send_text_message(wxchat, token_id, app_id, agent_wxid, [intersection_wxid], wx_content["text"])
  86. elif wx_content["type"] == "image_url":
  87. send_image_message(wxchat, token_id, app_id, agent_wxid, [intersection_wxid], wx_content.get("image_url", {}).get("url"))
  88. elif wx_content["type"] == "tts":
  89. send_tts_message(wxchat, token_id, app_id, agent_wxid, [intersection_wxid], wx_content["text"])
  90. def send_text_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, text):
  91. for t in intersection_wxids:
  92. # 发送文本消息
  93. ret,ret_msg,res = wxchat.post_text(token_id, app_id, t, text)
  94. logger.info(f'{agent_wxid} 向 {t} 发送文字【{text}】')
  95. # 构造对话消息并发送到 Kafka
  96. input_wx_content_dialogue_message = [{"type": "text", "text": text}]
  97. input_message = utils.dialogue_message(agent_wxid, t, input_wx_content_dialogue_message)
  98. kafka_helper.kafka_client.produce_message(input_message)
  99. logger.info("发送对话 %s", input_message)
  100. # 等待随机时间
  101. time.sleep(random.uniform(5, 15))
  102. def send_image_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, image_url):
  103. aeskey, cdnthumburl, cdnthumblength, cdnthumbheight, cdnthumbwidth, length, md5 = "", "", 0, 0, 0, 0, ""
  104. for t in intersection_wxids:
  105. if t == intersection_wxids[0]:
  106. # 发送图片
  107. ret,ret_msg,res = wxchat.post_image(token_id, app_id, t, image_url)
  108. if ret==200:
  109. aeskey = res["aesKey"]
  110. cdnthumburl = res["fileId"]
  111. cdnthumblength = res["cdnThumbLength"]
  112. cdnthumbheight = res["height"]
  113. cdnthumbwidth = res["width"]
  114. length = res["length"]
  115. md5 = res["md5"]
  116. logger.info(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}')
  117. else:
  118. logger.warning(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}')
  119. else:
  120. if aeskey !="":
  121. # 转发图片
  122. res,ret,ret_msg= wxchat.forward_image(token_id, app_id, t, aeskey, cdnthumburl, cdnthumblength, cdnthumbheight, cdnthumbwidth, length, md5)
  123. logger.info(f'{agent_wxid} 向 {t} 转发图片【{image_url}】{ret_msg}')
  124. else:
  125. # 发送图片
  126. ret,ret_msg,res = wxchat.post_image(token_id, app_id, t, image_url)
  127. if ret==200:
  128. aeskey = res["aesKey"]
  129. cdnthumburl = res["fileId"]
  130. cdnthumblength = res["cdnThumbLength"]
  131. cdnthumbheight = res["height"]
  132. cdnthumbwidth = res["width"]
  133. length = res["length"]
  134. md5 = res["md5"]
  135. logger.info(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}')
  136. else:
  137. logger.warning(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}')
  138. # 构造对话消息并发送到 Kafka
  139. wx_content_dialogue_message = [{"type": "image_url", "image_url": {"url": image_url}}]
  140. input_message = utils.dialogue_message(agent_wxid, t, wx_content_dialogue_message)
  141. kafka_helper.kafka_client.produce_message(input_message)
  142. logger.info("发送对话 %s", input_message)
  143. # 等待随机时间
  144. time.sleep(random.uniform(5, 15))
  145. def send_tts_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, text):
  146. voice_during,voice_url=utils.wx_voice(text)
  147. for t in intersection_wxids:
  148. # 发送送语音消息
  149. if voice_url:
  150. ret,ret_msg,res = wxchat.post_voice(token_id, app_id, t, voice_url,voice_during)
  151. if ret==200:
  152. logger.info(f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}')
  153. # 构造对话消息并发送到 Kafka
  154. input_wx_content_dialogue_message = [{"type": "text", "text": text}]
  155. input_message = utils.dialogue_message(agent_wxid, t, input_wx_content_dialogue_message)
  156. kafka_helper.kafka_client.produce_message(input_message)
  157. logger.info("发送对话 %s", input_message)
  158. else:
  159. logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}'))
  160. else:
  161. logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】出错'))
  162. # 等待随机时间
  163. time.sleep(random.uniform(5, 15))
  164. def clean_json_string(json_str):
  165. # 删除所有控制字符(非打印字符),包括换行符、回车符等
  166. return re.sub(r'[\x00-\x1f\x7f]', '', json_str)
  167. # 启动 Kafka 消费者线程
  168. def start_kafka_consumer_thread():
  169. agent_tel=os.environ.get('tel', '18029274615')
  170. # consumer_thread = threading.Thread(target=kafka_helper.kafka_client.consume_messages, args=(agent_tel,wx_messages_process_callback,))
  171. consumer_thread = threading.Thread(target=kafka_helper.kafka_client.consume_messages, args=(ops_messages_process,))
  172. consumer_thread.daemon = True # 设置为守护线程,应用退出时会自动结束
  173. consumer_thread.start()
  174. def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash_key, is_reconnect=False, max_retries=5):
  175. """
  176. 封装微信登录或重连的逻辑
  177. """
  178. agent_tel=hash_key.split(":")[-1]
  179. retry_count = 0
  180. while retry_count < max_retries:
  181. retry_count += 1
  182. if is_reconnect:
  183. logger.info("尝试重连...")
  184. else:
  185. logger.info("获取二维码进行登录...")
  186. qr_code = wxchat.get_login_qr_code(token_id, app_id)
  187. base64_string = qr_code.get('qrImgBase64')
  188. uuid = qr_code.get('uuid')
  189. app_id = app_id or qr_code.get('appId')
  190. start_time = time.time()
  191. qr_code_urls= wxchat.qrCallback(uuid, base64_string)
  192. # 构造 Kafka 消息发送二维码
  193. k_message=utils.login_qrcode_message(token_id,agent_tel,base64_string,qr_code_urls)
  194. kafka_helper.kafka_client.produce_message(k_message)
  195. while True:
  196. # 如果登录超时,重新获取二维码
  197. if time.time() - start_time > 150: #150 秒 二维码失效
  198. break
  199. res = wxchat.check_login(token_id, app_id, uuid)
  200. flag = res.get('status')
  201. # 构造 Kafka 消息发送登录状态
  202. # todo
  203. if flag == 2:
  204. logger.info(f"登录成功: {res}")
  205. login_info = res.get('loginInfo', {})
  206. login_info.update({'appId': app_id, 'uuid': uuid, 'tokenId': token_id,'status': 1})
  207. # cache_login_info=redis_helper.redis_helper.get_hash(hash_key)
  208. # if 'appId' in cache_login_info:
  209. # login_info.update({"reg_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],"login_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]})
  210. # else:
  211. # login_info.update({"login_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]})
  212. cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()}
  213. redis_helper.redis_helper.set_hash(hash_key, cleaned_login_info)
  214. return login_info
  215. time.sleep(5)
  216. logger.error(f"登录失败,二维码生成 {max_retries} 次")
  217. def fetch_and_save_contacts(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash_key):
  218. """
  219. 获取联系人列表并保存到缓存
  220. """
  221. ret,msg,contacts_list = wxchat.fetch_contacts_list(token_id, app_id)
  222. friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围
  223. wxid = redis_helper.redis_helper.get_hash_field(hash_key, 'wxid')
  224. wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
  225. print(f'微信ID {wxid} 登录APPID {app_id} 成功,联系人已保存')
  226. def wx_login(wxchat:gewe_chat.GeWeChatCom,tel,token_id):
  227. hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}"
  228. login_info = redis_helper.redis_helper.get_hash(hash_key)
  229. if not login_info:
  230. login_info = login_or_reconnect(wxchat, token_id, '', hash_key)
  231. else:
  232. app_id = login_info.get('appId')
  233. token_id = login_info.get('tokenId')
  234. wxid= login_info.get('wxid')
  235. # 检查是否已经登录
  236. is_online = wxchat.check_online(token_id, app_id)
  237. if is_online:
  238. logger.info(f'微信ID {wxid} 在APPID {app_id} 已经在线')
  239. else:
  240. # 尝试重连
  241. res = wxchat.reconnection(token_id, app_id)
  242. if res.get('ret') == 200:
  243. logger.info(f'微信ID {wxid} 在APPID {app_id} 重连成功')
  244. else:
  245. print("重连失败,重新登录...")
  246. login_info = login_or_reconnect(wxchat, token_id, app_id, hash_key, is_reconnect=True)
  247. fetch_and_save_contacts(wxchat, token_id, login_info.get('appId'), hash_key)
  248. def ops_messages_process(message):
  249. try:
  250. wxchat = gewe_chat.wxchat
  251. #print(message)
  252. # logger.info(f"接收到kafka消息: {json.dumps(message, separators=(',', ':'), ensure_ascii=False)}")
  253. logger.info(f"接收到kafka消息: {json.dumps(json.loads(message), ensure_ascii=False)}")
  254. msg_content = message
  255. cleaned_content = clean_json_string(msg_content)
  256. content = json.loads(cleaned_content)
  257. data = content.get("data", {})
  258. msg_type_data = data.get("msg_type", None)
  259. content_data = data.get("content", {})
  260. if msg_type_data=="login":
  261. tel=content_data.get('tel', '18029274615')
  262. token_id=content_data.get('token_id', 'f828cb3c-1039-489f-b9ae-7494d1778a15')
  263. #wx_login(wxchat,tel,token_id)
  264. thread = threading.Thread(target=wx_login, args=(wxchat,tel,token_id,))
  265. thread.daemon = True
  266. thread.start()
  267. elif msg_type_data == 'group-sending':
  268. agent_tel=content_data.get('agent_tel', '18029274615')
  269. # 使用线程处理
  270. #wx_messages_process_callback(agent_tel,message)
  271. thread = threading.Thread(target=wx_messages_process_callback, args=(agent_tel,message,))
  272. thread.daemon = True
  273. thread.start()
  274. else:
  275. print(f'未处理息类型 {msg_type_data}')
  276. except Exception as e:
  277. print(f"处理消息时发生错误: {e}, 消息内容: {message}")
  278. # 启动
  279. # def consumer_start_up_thread():
  280. # print('启动')
  281. # try:
  282. # print('启动')
  283. # except Exception as e:
  284. # print(f"处理消息时发生错误: {e}, 消息内容: {message}")