diff --git a/app.py b/app.py index fb34887..1943338 100644 --- a/app.py +++ b/app.py @@ -458,7 +458,16 @@ class WechatThreadManager: if ret==200: friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote','weixin','weixingongzhong']] # 可以调整截取范围 print(f'{wxid}的好友数量 {len(friend_wxids)}') - self.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids) + cache=self.wxchat.get_contacts_brief_from_cache(wxid) + cache_wxids = [c.get('userName') for c in cache] + new_friend_wxids = [wxid for wxid in friend_wxids if wxid not in cache_wxids] + data=self.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids) + # 推送到kafka + + input_message=utils.wx_all_contacts(wxid,data) + kafka_helper.kafka_client.produce_message(input_message) + + logger.info(f'微信ID {wxid} 登录APPID {app_id} 成功,联系人已定时保存') chatrooms=contacts_list['chatrooms'] self.wxchat.save_groups_info_to_cache(token_id, app_id, wxid, chatrooms) diff --git a/common/kafka_helper.py b/common/kafka_helper.py index 5543d0e..94cfd50 100644 --- a/common/kafka_helper.py +++ b/common/kafka_helper.py @@ -102,54 +102,6 @@ class KafkaClient: finally: self.consumer.close() - def consume_messages1(self,agent_tel,process_callback): - """ - 消费消息并调用回调处理业务逻辑,只有当回调返回 True 时才提交偏移量 - :param process_callback: 业务逻辑回调函数,返回布尔值 - :param agent_tel: 代理商手机号 - """ - consumer=Consumer({ - 'bootstrap.servers': self.bootstrap_servers, - 'group.id': f'aiops-wx_{agent_tel}', - 'auto.offset.reset': 'earliest', - # 'enable.auto.commit': False # 禁用自动提交,使用手动提交 - 'enable.auto.commit': True - }) - consumer.subscribe([self.topic]) - - try: - while True: - msg = consumer.poll(0.3) - if msg is None: - continue - if msg.error(): - if msg.error().code() == KafkaError._PARTITION_EOF: - print(f"End of partition {msg.partition}, offset {msg.offset()}") - else: - raise KafkaException(msg.error()) - else: - # 调用业务处理逻辑 - # process_callback(msg.value().decode('utf-8')) - # 调用业务处理逻辑,传递 user_nickname 和消息 - process_callback(agent_tel,msg.value().decode('utf-8')) - # if process_callback(user_nickname, msg.value().decode('utf-8')): - # # 如果返回 True,表示处理成功,可以提交偏移量 - # try: - # # self.consumer.commit(msg) - # self.consumer.commit(message=msg, asynchronous=True) - # print(f"Manually committed offset: {msg.offset()}") - # except KafkaException as e: - # print(f"Error committing offset: {e}") - except KafkaException as e: - print(f"Kafka exception occurred: {e}") - if 'KafkaError._ALL_BROKERS_DOWN' in str(e): - print(f"Kafka brokers for agent {agent_tel} are down, retrying in 5 seconds...") - time.sleep(5) - self._reconnect_consumer_with_agent_tel(consumer, agent_tel) - except Exception as e: - print(f"An unexpected error occurred: {e}") - time.sleep(5) - def _reconnect_consumer_with_agent_tel(self, consumer, agent_tel): """ diff --git a/common/utils.py b/common/utils.py index aed6213..33730c0 100644 --- a/common/utils.py +++ b/common/utils.py @@ -250,7 +250,7 @@ def dialogue_message(wxid_from:str,wxid_to:str,wx_content:list,is_ai:bool=False) return json.dumps(data, separators=(',', ':'), ensure_ascii=False) -def kafka_base_message(msg_type,content: dict)->dict: +def kafka_base_message(msg_type:str,content: dict)->dict: # 获取当前时间戳,精确到毫秒 current_timestamp = int(time.time() * 1000) @@ -270,6 +270,25 @@ def kafka_base_message(msg_type,content: dict)->dict: } return data +def wx_offline_message(appid:str,wxid:str)->str: + content = {"appid": appid,"wxid":wxid} + data=kafka_base_message("wx-offline",content) + return json.dumps(data, separators=(',', ':'), ensure_ascii=False) + +def wx_del_contact_message(wxid:str,contact_wixd:str)->str: + content = {"wxid": wxid,"contact_wixd":contact_wixd} + data=kafka_base_message("del-contact",content) + return json.dumps(data, separators=(',', ':'), ensure_ascii=False) + +def wx_mod_contact_message(wxid:str,contact_data:dict)->str: + content = {"wxid": wxid,"contact_data":contact_data} + data=kafka_base_message("mod-contact",content) + return json.dumps(data, separators=(',', ':'), ensure_ascii=False) + +def wx_all_contacts(wxid:str,data)->str: + content = {"wxid": wxid,"contacts_data":data} + data=kafka_base_message("all-contacts",content) + return json.dumps(data, separators=(',', ':'), ensure_ascii=False) def login_qrcode_message(token_id: str,agent_tel:str,qr_code_img_base64:str,qr_code_url:list)->str: """ diff --git a/gunicorn_config.py b/gunicorn_config.py index 4a36f92..a908bdb 100644 --- a/gunicorn_config.py +++ b/gunicorn_config.py @@ -19,6 +19,9 @@ loglevel = 'info' accesslog = 'logs/gunicorn_access.log' errorlog = 'logs/gunicorn_error.log' + +preload_app = True + # 生产环境安全配置 # reload = False # preload_app = True diff --git a/resources/messages_resource.py b/resources/messages_resource.py index 6fefe5a..02cd59d 100644 --- a/resources/messages_resource.py +++ b/resources/messages_resource.py @@ -17,6 +17,8 @@ from voice import audio_convert timeout_duration = 8.0 + +WX_BACKLIST=['fmessage', 'medianote','weixin','weixingongzhong'] class MessagesResource(Resource): def __init__(self): self.parser = reqparse.RequestParser() @@ -36,7 +38,6 @@ class MessagesResource(Resource): wxid = msg.get("Wxid",'') - # 自发命令 # if type_name=='AddMsg': # from_wxid = msg_data["FromUserName"]["string"] @@ -44,14 +45,18 @@ class MessagesResource(Resource): # pass if type_name == 'AddMsg': handle_self_cmd(wxid,msg) + msg_data = msg.get("Data") + from_wxid = msg_data["FromUserName"]["string"] + config=redis_helper.redis_helper.get_hash(f"__AI_OPS_WX__:WXCHAT_CONFIG") + wxids=config.keys() + WX_BACKLIST.extend(wxids) + if from_wxid in WX_BACKLIST: + logger.warning(f'微信ID {wxid} 在黑名单,不处理') + return jsonify({"message": "收到微信回调消息"}) + wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid) - - # if not bool(wx_config.get("agentEnabled",False)): - # logger.info(f'微信ID {wxid} 未托管,不处理') - # return jsonify({"message": "收到微信回调消息"}) - if type_name=='AddMsg': if not bool(wx_config.get("agentEnabled",False)): @@ -106,6 +111,51 @@ class MessagesResource(Resource): # threading.Thread(target=handle_mod_contacts, args=(token_id,app_id,wxid,msg_data)).start() + contact_wxid = msg_data["UserName"]["string"] + nickname=msg_data["NickName"]["string"] + city=msg_data["City"] + signature=msg_data["Signature"] + province=msg_data["Province"] + bigHeadImgUrl=msg_data["SnsUserInfo"]["SnsBgimgId"] + country=msg_data["Country"] + sex=msg_data["Sex"] + pyInitial=msg_data["PyInitial"]["string"] + quanPin=msg_data["QuanPin"]["string"] + remark=msg_data.get("Remark").get("string","") + remarkPyInitial=msg_data.get("RemarkPyInitial").get("string","") + remarkQuanPin=msg_data.get("RemarkQuanPin").get("string","") + smallHeadImgUrl=msg_data["smallHeadImgUrl"] + + # data=gewe_chat.wxchat.get_brief_info(token_id,app_id,[contact_wxid]) + # contact=data[0] + # alias=contact.get("alias") + #推动到kafka + contact_data = { + "alias": None, + "bigHeadImgUrl": bigHeadImgUrl, + "cardImgUrl": None, + "city": city, + "country": country, + "description": None, + "labelList": None, + "nickName": nickname, + "phoneNumList": None, + "province": province, + "pyInitial": pyInitial, + "quanPin": quanPin, + "remark": remark, + "remarkPyInitial": remarkPyInitial, + "remarkQuanPin": remarkQuanPin, + "sex": sex, + "signature": signature, + "smallHeadImgUrl": smallHeadImgUrl, + "snsBgImg": None, + "userName": contact_wxid + } + input_message=utils.wx_mod_contact_message(wxid,contact_data) + kafka_helper.kafka_client.produce_message(input_message) + + elif type_name=="DelContacts": ''' 删除好友通知/退出群聊 @@ -120,6 +170,11 @@ class MessagesResource(Resource): logger.info(f'清除 chatroom_id{chatroom_id} 数据') else: logger.info('删除好友通知') + + # 推送到kafka + input_message=utils.wx_del_contact_message(wxid,username) + kafka_helper.kafka_client.produce_message(input_message) + elif type_name=="Offline": ''' 已经离线 @@ -130,6 +185,11 @@ class MessagesResource(Resource): print(k) redis_helper.redis_helper.update_hash_field(k,'status',0) redis_helper.redis_helper.update_hash_field(k,'modify_at',int(time.time())) + + # 推送到kafka + input_message=utils.wx_offline_message(app_id,wxid) + kafka_helper.kafka_client.produce_message(input_message) + else: logger.warning(f"未知消息类型") diff --git a/wechat/gewe_chat.py b/wechat/gewe_chat.py index 85c9436..8c60634 100644 --- a/wechat/gewe_chat.py +++ b/wechat/gewe_chat.py @@ -243,7 +243,7 @@ class GeWeChatCom: print(response_data) return response_data.get('data') - def get_brief_info(self,token_id, app_id,wxids): + def get_brief_info(self,token_id, app_id,wxids:list): ''' 获取群/好友简要信息 1<= wxids <=100