You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

439 lines
20KB

  1. import threading
  2. from common import kafka_helper,redis_helper,utils
  3. from urllib.parse import urlparse, unquote
  4. import json,time,re,random,os
  5. from common.log import logger, log_exception
  6. from datetime import datetime
  7. from wechat import gewe_chat
  8. def wx_messages_process_callback(agent_tel,message):
  9. try:
  10. # print(f'手机号 {agent_tel}')
  11. wxchat = gewe_chat.wxchat
  12. msg_content = message
  13. cleaned_content = clean_json_string(msg_content)
  14. content = json.loads(cleaned_content)
  15. data = content.get("data", {})
  16. msg_type_data = data.get("msg_type", None)
  17. content_data = data.get("content", {})
  18. agent_tel = content_data.get("agent_tel", None)
  19. if msg_type_data == 'group-sending':
  20. process_group_sending(wxchat, content_data, agent_tel)
  21. except json.JSONDecodeError as e:
  22. print(f"JSON解码错误: {e}, 消息内容: {message}")
  23. except Exception as e:
  24. print(f"处理消息时发生错误: {e}, 消息内容: {message}")
  25. def process_group_sending_v0(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel:str):
  26. # 获取登录信息
  27. hash_key = f"__AI_OPS_WX__:LOGININFO:{agent_tel}"
  28. logininfo = redis_helper.redis_helper.get_hash(hash_key)
  29. if not logininfo:
  30. logger.warning(f"未找到 {agent_tel} 的登录信息")
  31. return
  32. token_id = logininfo.get('tokenId')
  33. app_id = logininfo.get('appId')
  34. agent_wxid = logininfo.get('wxid')
  35. # 获取联系人列表并计算交集
  36. hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{agent_wxid}"
  37. cache_friend_wxids_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  38. cache_friend_wxids_list=json.loads(cache_friend_wxids_str) if cache_friend_wxids_str else []
  39. cache_friend_wxids=[f["userName"] for f in cache_friend_wxids_list]
  40. wxid_contact_list_content_data = [c['wxid'] for c in content_data.get("contact_list", [])]
  41. intersection_wxids = list(set(cache_friend_wxids) & set(wxid_contact_list_content_data))
  42. # 发送消息
  43. wx_content_list = content_data.get("wx_content", [])
  44. for wx_content in wx_content_list:
  45. if wx_content["type"] == "text":
  46. send_text_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])
  47. elif wx_content["type"] == "image_url":
  48. send_image_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content.get("image_url", {}).get("url"))
  49. elif wx_content["type"] == "tts":
  50. send_tts_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])
  51. def process_group_sending(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel:str):
  52. # 获取登录信息
  53. hash_key = f"__AI_OPS_WX__:LOGININFO:{agent_tel}"
  54. logininfo = redis_helper.redis_helper.get_hash(hash_key)
  55. if not logininfo:
  56. logger.warning(f"未找到 {agent_tel} 的登录信息")
  57. return
  58. token_id = logininfo.get('tokenId')
  59. app_id = logininfo.get('appId')
  60. agent_wxid = logininfo.get('wxid')
  61. # 获取联系人列表并计算交集
  62. hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{agent_wxid}"
  63. cache_friend_wxids_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  64. cache_friend_wxids_list=json.loads(cache_friend_wxids_str) if cache_friend_wxids_str else []
  65. cache_friend_wxids=[f["userName"] for f in cache_friend_wxids_list]
  66. # 获取群交集
  67. hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{agent_wxid}"
  68. cache_chatrooms = redis_helper.redis_helper.get_hash(hash_key)
  69. cache_chatroom_ids=cache_chatrooms.keys()
  70. wxid_contact_list_content_data = [c['wxid'] for c in content_data.get("contact_list", [])]
  71. intersection_friend_wxids = list(set(cache_friend_wxids) & set(wxid_contact_list_content_data))
  72. intersection_chatroom_ids = list(set(cache_chatroom_ids) & set(wxid_contact_list_content_data))
  73. intersection_wxids=intersection_friend_wxids+intersection_chatroom_ids
  74. # 发送消息
  75. wx_content_list = content_data.get("wx_content", [])
  76. # for wx_content in wx_content_list:
  77. # if wx_content["type"] == "text":
  78. # send_text_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])
  79. # elif wx_content["type"] == "image_url":
  80. # send_image_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content.get("image_url", {}).get("url"))
  81. # elif wx_content["type"] == "tts":
  82. # send_tts_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])
  83. wxchat.forward_video_aeskey = ''
  84. wxchat.forward_video_cdnvideourl = ''
  85. wxchat.forward_video_length = 0
  86. for intersection_wxid in intersection_wxids:
  87. for wx_content in wx_content_list:
  88. if wx_content["type"] == "text":
  89. send_text_message(wxchat, token_id, app_id, agent_wxid, [intersection_wxid], wx_content["text"])
  90. elif wx_content["type"] == "image_url":
  91. send_image_message(wxchat, token_id, app_id, agent_wxid, [intersection_wxid], wx_content.get("image_url", {}).get("url"))
  92. elif wx_content["type"] == "tts":
  93. send_tts_message(wxchat, token_id, app_id, agent_wxid, [intersection_wxid], wx_content["text"])
  94. elif wx_content["type"] == "file":
  95. send_file_message(wxchat, token_id, app_id, agent_wxid, [intersection_wxid], wx_content.get("file_url", {}).get("url"))
  96. def send_text_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, text):
  97. for t in intersection_wxids:
  98. # 发送文本消息
  99. ret,ret_msg,res = wxchat.post_text(token_id, app_id, t, text)
  100. logger.info(f'{agent_wxid} 向 {t} 发送文字【{text}】')
  101. # 构造对话消息并发送到 Kafka
  102. input_wx_content_dialogue_message = [{"type": "text", "text": text}]
  103. input_message = utils.dialogue_message(agent_wxid, t, input_wx_content_dialogue_message)
  104. kafka_helper.kafka_client.produce_message(input_message)
  105. logger.info("发送对话 %s", input_message)
  106. # 等待随机时间
  107. time.sleep(random.uniform(5, 15))
  108. def send_image_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, image_url):
  109. aeskey, cdnthumburl, cdnthumblength, cdnthumbheight, cdnthumbwidth, length, md5 = "", "", 0, 0, 0, 0, ""
  110. for t in intersection_wxids:
  111. if t == intersection_wxids[0]:
  112. # 发送图片
  113. ret,ret_msg,res = wxchat.post_image(token_id, app_id, t, image_url)
  114. if ret==200:
  115. aeskey = res["aesKey"]
  116. cdnthumburl = res["fileId"]
  117. cdnthumblength = res["cdnThumbLength"]
  118. cdnthumbheight = res["height"]
  119. cdnthumbwidth = res["width"]
  120. length = res["length"]
  121. md5 = res["md5"]
  122. logger.info(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}')
  123. else:
  124. logger.warning(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}')
  125. else:
  126. if aeskey !="":
  127. # 转发图片
  128. res,ret,ret_msg= wxchat.forward_image(token_id, app_id, t, aeskey, cdnthumburl, cdnthumblength, cdnthumbheight, cdnthumbwidth, length, md5)
  129. logger.info(f'{agent_wxid} 向 {t} 转发图片【{image_url}】{ret_msg}')
  130. else:
  131. # 发送图片
  132. ret,ret_msg,res = wxchat.post_image(token_id, app_id, t, image_url)
  133. if ret==200:
  134. aeskey = res["aesKey"]
  135. cdnthumburl = res["fileId"]
  136. cdnthumblength = res["cdnThumbLength"]
  137. cdnthumbheight = res["height"]
  138. cdnthumbwidth = res["width"]
  139. length = res["length"]
  140. md5 = res["md5"]
  141. logger.info(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}')
  142. else:
  143. logger.warning(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}')
  144. # 构造对话消息并发送到 Kafka
  145. wx_content_dialogue_message = [{"type": "image_url", "image_url": {"url": image_url}}]
  146. input_message = utils.dialogue_message(agent_wxid, t, wx_content_dialogue_message)
  147. kafka_helper.kafka_client.produce_message(input_message)
  148. logger.info("发送对话 %s", input_message)
  149. # 等待随机时间
  150. time.sleep(random.uniform(5, 15))
  151. def send_tts_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, text):
  152. voice_during,voice_url=utils.wx_voice(text)
  153. for t in intersection_wxids:
  154. # 发送送语音消息
  155. if voice_url:
  156. ret,ret_msg,res = wxchat.post_voice(token_id, app_id, t, voice_url,voice_during)
  157. if ret==200:
  158. logger.info(f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}')
  159. # 构造对话消息并发送到 Kafka
  160. input_wx_content_dialogue_message = [{"type": "text", "text": text}]
  161. input_message = utils.dialogue_message(agent_wxid, t, input_wx_content_dialogue_message)
  162. kafka_helper.kafka_client.produce_message(input_message)
  163. logger.info("发送对话 %s", input_message)
  164. else:
  165. logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}'))
  166. else:
  167. logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】出错'))
  168. # 等待随机时间
  169. time.sleep(random.uniform(5, 15))
  170. def send_file_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, file_url):
  171. parsed_url = urlparse(file_url)
  172. path = parsed_url.path
  173. # 从路径中提取文件名
  174. filename = path.split('/')[-1]
  175. # 获取扩展名
  176. _, ext = os.path.splitext(filename)
  177. if ext == '.mp4':
  178. send_video_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, file_url)
  179. else:
  180. send_other_file_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, file_url)
  181. #time.sleep(random.uniform(5, 15))
  182. def send_video_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, file_url):
  183. for t in intersection_wxids:
  184. # 发送视频消息
  185. parsed_url = urlparse(file_url)
  186. filename = os.path.basename(parsed_url.path)
  187. tmp_file_path = os.path.join(os.getcwd(),'tmp', filename) # 拼接完整路径
  188. thumbnail_path=tmp_file_path.replace('.mp4','.jpg')
  189. video_thumb_url,video_duration =utils.download_video_and_get_thumbnail(file_url,thumbnail_path)
  190. print(f'视频缩略图 {video_thumb_url} 时长 {video_duration}')
  191. if wxchat.forward_video_aeskey == '':
  192. ret,ret_msg,res = wxchat.post_video(token_id, app_id, t, file_url,video_thumb_url,video_duration)
  193. if ret==200:
  194. wxchat.forward_video_aeskey = res["aesKey"]
  195. wxchat.forward_video_cdnvideourl = res["cdnVideoUrl"]
  196. wxchat.forward_video_length = res["length"]
  197. else:
  198. ret,ret_msg,res = wxchat.forward_video(token_id, app_id, t, wxchat.forward_video_aeskey, wxchat.forward_video_cdnvideourl, wxchat.forward_video_length)
  199. print('转发视频')
  200. if ret==200:
  201. logger.info(f'{agent_wxid} 向 {t} 发送视频【{file_url}】{ret_msg}')
  202. # 构造对话消息并发送到 Kafka
  203. input_wx_content_dialogue_message = [{"type": "file", "file_url": {"url": file_url}}]
  204. input_message = utils.dialogue_message(agent_wxid, t, input_wx_content_dialogue_message)
  205. kafka_helper.kafka_client.produce_message(input_message)
  206. logger.info("发送对话 %s", input_message)
  207. else:
  208. logger.warning((f'{agent_wxid} 向 {t} 发送视频【{file_url}】{ret_msg}'))
  209. # 等待随机时间
  210. time.sleep(random.uniform(5, 15))
  211. def send_other_file_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, file_url):
  212. print('send_otherfile_message')
  213. def clean_json_string(json_str):
  214. # 删除所有控制字符(非打印字符),包括换行符、回车符等
  215. return re.sub(r'[\x00-\x1f\x7f]', '', json_str)
  216. # 启动 Kafka 消费者线程
  217. def start_kafka_consumer_thread():
  218. agent_tel=os.environ.get('tel', '18029274615')
  219. # consumer_thread = threading.Thread(target=kafka_helper.kafka_client.consume_messages, args=(agent_tel,wx_messages_process_callback,))
  220. consumer_thread = threading.Thread(target=kafka_helper.kafka_client.consume_messages, args=(ops_messages_process,))
  221. consumer_thread.daemon = True # 设置为守护线程,应用退出时会自动结束
  222. consumer_thread.start()
  223. def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, region_id,hash_key, is_reconnect=False, max_retries=5):
  224. """
  225. 封装微信登录或重连的逻辑
  226. """
  227. agent_tel=hash_key.split(":")[-1]
  228. retry_count = 0
  229. while retry_count < max_retries:
  230. retry_count += 1
  231. if is_reconnect:
  232. logger.info("尝试重连...")
  233. else:
  234. logger.info("获取二维码进行登录...")
  235. qr_code = wxchat.get_login_qr_code(token_id, app_id,region_id)
  236. base64_string = qr_code.get('qrImgBase64')
  237. uuid = qr_code.get('uuid')
  238. if not uuid:
  239. logger.error(f"uuid获取二维码失败: {qr_code}")
  240. break
  241. app_id = app_id or qr_code.get('appId')
  242. start_time = time.time()
  243. qr_code_urls= wxchat.qrCallback(uuid, base64_string)
  244. # 构造 Kafka 消息发送二维码
  245. k_message=utils.login_qrcode_message(token_id,agent_tel,base64_string,qr_code_urls)
  246. kafka_helper.kafka_client.produce_message(k_message)
  247. while True:
  248. now = time.time()
  249. # 如果登录超时,重新获取二维码
  250. if now- start_time > 150: #150 秒 二维码失效
  251. break
  252. logger.info(f"{token_id} 使用 {app_id},等待扫码登录,二维码有效时间 {150 - int(now - start_time)} 秒")
  253. captch_code = wxchat.get_login_wx_captch_code_to_cache(token_id)
  254. captch_code= captch_code if captch_code else ''
  255. ret,msg,res = wxchat.check_login(token_id, app_id, uuid,captch_code)
  256. if ret == 200:
  257. flag = res.get('status')
  258. # 构造 Kafka 消息发送登录状态
  259. # todo
  260. if flag == 2:
  261. logger.info(f"登录成功: {res}")
  262. head_img_url=res.get('headImgUrl','')
  263. login_info = res.get('loginInfo', {})
  264. login_info.update({'appId': app_id, 'uuid': uuid, 'tokenId': token_id,'status': 1,'headImgUrl':head_img_url,'regionId':region_id})
  265. cache_login_info=redis_helper.redis_helper.get_hash(hash_key)
  266. if 'appId' not in cache_login_info:
  267. login_info.update({"create_at":int(time.time()),"modify_at":int(time.time())})
  268. else:
  269. login_info.update({"modify_at":int(time.time())})
  270. # if 'appId' in cache_login_info:
  271. # login_info.update({"reg_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],"login_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]})
  272. # else:
  273. # login_info.update({"login_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]})
  274. cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()}
  275. redis_helper.redis_helper.set_hash(hash_key, cleaned_login_info)
  276. return login_info
  277. else:
  278. logger.info(f"登录检查中: {ret}-{msg}-{res}")
  279. time.sleep(5)
  280. logger.error(f"登录失败,二维码生成 {max_retries} 次")
  281. def fetch_and_save_contacts(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash_key):
  282. """
  283. 获取联系人列表并保存到缓存
  284. """
  285. ret,msg,contacts_list = wxchat.fetch_contacts_list(token_id, app_id)
  286. friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围
  287. wxid = redis_helper.redis_helper.get_hash_field(hash_key, 'wxid')
  288. wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
  289. print(f'微信ID {wxid} 登录APPID {app_id} 成功,联系人已保存')
  290. def wx_login(wxchat:gewe_chat.GeWeChatCom,tel,token_id,region_id):
  291. hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}"
  292. login_info = redis_helper.redis_helper.get_hash(hash_key)
  293. if not login_info:
  294. login_info = login_or_reconnect(wxchat, token_id, '', region_id,hash_key)
  295. else:
  296. app_id = login_info.get('appId')
  297. token_id = login_info.get('tokenId')
  298. wxid= login_info.get('wxid')
  299. # 检查是否已经登录
  300. is_online = wxchat.check_online(token_id, app_id)
  301. if is_online:
  302. logger.info(f'微信ID {wxid} 在APPID {app_id} 已经在线')
  303. else:
  304. # 尝试重连
  305. res = wxchat.reconnection(token_id, app_id)
  306. if res.get('ret') == 200:
  307. logger.info(f'微信ID {wxid} 在APPID {app_id} 重连成功')
  308. else:
  309. print("重连失败,重新登录...")
  310. login_info = login_or_reconnect(wxchat, token_id, app_id, region_id,hash_key, is_reconnect=True)
  311. if login_info:
  312. fetch_and_save_contacts(wxchat, token_id, login_info.get('appId'), hash_key)
  313. def login_wx_captch_code_process(wxchat:gewe_chat.GeWeChatCom,message):
  314. msg_content = message
  315. cleaned_content = clean_json_string(msg_content)
  316. content = json.loads(cleaned_content)
  317. data = content.get("data", {})
  318. msg_type_data = data.get("msg_type", None)
  319. content_data = data.get("content", {})
  320. token_id = content_data.get("token_id", None)
  321. captch_code = content_data.get("captch_code", None)
  322. wxchat.save_login_wx_captch_code_to_cache(token_id,captch_code)
  323. def ops_messages_process(message):
  324. try:
  325. wxchat = gewe_chat.wxchat
  326. #print(message)
  327. # logger.info(f"接收到kafka消息: {json.dumps(message, separators=(',', ':'), ensure_ascii=False)}")
  328. logger.info(f"接收到kafka消息: {json.dumps(json.loads(message), ensure_ascii=False)}")
  329. msg_content = message
  330. cleaned_content = clean_json_string(msg_content)
  331. content = json.loads(cleaned_content)
  332. data = content.get("data", {})
  333. msg_type_data = data.get("msg_type", None)
  334. content_data = data.get("content", {})
  335. if msg_type_data=="login":
  336. tel=content_data.get('tel', '18029274615')
  337. token_id=content_data.get('token_id', 'f828cb3c-1039-489f-b9ae-7494d1778a15')
  338. region_id=content_data.get('region_id', '440000')
  339. #wx_login(wxchat,tel,token_id)
  340. thread = threading.Thread(target=wx_login, args=(wxchat,tel,token_id,region_id))
  341. thread.daemon = True
  342. thread.start()
  343. elif msg_type_data == 'group-sending':
  344. agent_tel=content_data.get('agent_tel', '18029274615')
  345. # 使用线程处理
  346. #wx_messages_process_callback(agent_tel,message)
  347. thread = threading.Thread(target=wx_messages_process_callback, args=(agent_tel,message,))
  348. thread.daemon = True
  349. thread.start()
  350. elif msg_type_data == 'login_wx_captch_code':
  351. thread = threading.Thread(target=login_wx_captch_code_process, args=(wxchat,message,))
  352. thread.daemon = True
  353. thread.start()
  354. else:
  355. print(f'未处理息类型 {msg_type_data}')
  356. except Exception as e:
  357. print(f"处理消息时发生错误: {e}, 消息内容: {message}")