"""Flask entry point for the WeChat agent service.

Importing this module builds the Flask app, registers the REST resources,
loads configuration and starts the background helpers (Kafka, Redis, the
WeChat login scan and the periodic contact sync thread).
"""
import json
import logging
import os
import threading
import time

from flask import Flask, send_from_directory, request, jsonify
from flask_restful import Api, got_request_exception

from resources.user_resource import UserResource
from resources.messages_resource import MessagesResource
from resources.contacts_resources import DeleteFriendResource, GetFriendsInfoResource
from resources.config_reources import GetWxchatConfigResource, SaveWxchatConfigResource
from resources.groups_resources import GetGroupsInfoResource
from resources.login_resources import GetLoginInfoResource
from common.log import logger, log_exception
from common.interceptors import before_request, after_request, handle_exception
from common import kafka_helper, redis_helper, utils
from config import load_config
from wechat.biz import start_kafka_consumer_thread
from wechat import gewe_chat
from voice.ali.ali_voice import AliVoice

# Redis key pattern under which per-account login hashes are stored.
LOGIN_INFO_KEY_PATTERN = '__AI_OPS_WX__:LOGININFO:*'

# Custom error messages handed to flask_restful's Api(errors=...).
errors = {
    'UserAlreadyExistsError': {
        'message': "A user with that username already exists.",
        'status': 409,
    },
    'ResourceDoesNotExist': {
        'message': "A resource with that ID no longer exists.",
        'status': 410,
        'extra': "Any extra information you want.",
    },
}


def worker():
    """Start the Kafka and Redis helpers, then the WeChat and Kafka consumer workers."""
    kafka_helper.start()
    redis_helper.start()
    start_wxchat_thread()
    start_kafka_consumer_thread()


def _sync_account_contacts(wxchat, key):
    """Refresh the cached contacts and groups for the login hash stored at *key*.

    Accounts whose ``status`` field is not ``'1'`` are only logged and skipped.
    """
    login_info = redis_helper.redis_helper.get_hash(key)
    token_id = login_info.get('tokenId')
    app_id = login_info.get('appId')
    wxid = login_info.get('wxid')
    status = login_info.get('status')

    if status != '1':
        logger.info(f'微信ID {wxid} 未登录 {app_id} ,联系人不能定时保存')
        return

    _ret, _msg, contacts_list = wxchat.fetch_contacts_list(token_id, app_id)

    # The first three entries are skipped; the slice range can be adjusted.
    friend_wxids = contacts_list['friends'][3:]
    wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
    logger.info(f'微信ID {wxid} 登录APPID {app_id} 成功,联系人已定时保存')

    chatrooms = contacts_list['chatrooms']
    wxchat.save_groups_info_to_cache(token_id, app_id, wxid, chatrooms)
    logger.info(f'微信ID {wxid} 登录APPID {app_id} 成功,群信息已定时保存')


def fetch_and_save_contacts2():
    """Fetch the contact lists of all logged-in accounts and save them to the cache.

    Runs forever: every cycle scans the login hashes, syncs each account
    (pausing 3 seconds between accounts) and then sleeps for 10 minutes.
    Failures are logged and do not stop the loop, so one bad account or a
    transient Redis/API error cannot silently kill the sync thread.
    """
    wxchat = gewe_chat.wxchat
    while True:
        try:
            login_keys = list(
                redis_helper.redis_helper.client.scan_iter(match=LOGIN_INFO_KEY_PATTERN)
            )
            logger.info(f"Fetching login keys: {login_keys}")
        except Exception:
            # Thread-level boundary: log and retry on the next cycle.
            logger.exception("Failed to list login keys; retrying next cycle")
            login_keys = []

        # Walk every login key that was found.
        for k in login_keys:
            try:
                _sync_account_contacts(wxchat, k)
            except Exception:
                # Keep going: the remaining accounts must still be synced.
                logger.exception(f"Failed to sync contacts for login key {k}")
            time.sleep(3)

        time.sleep(60 * 10)


def start_wxchat_thread():
    """Scan the stored logins once, then start the periodic contact sync thread."""
    # gewe_chat.start() is invoked inside scan_wx_login_info().
    scan_wx_login_info()
    # Start the contact sync thread. It loops forever, so it is a daemon
    # thread to avoid blocking interpreter shutdown.
    threading.Thread(
        target=fetch_and_save_contacts2,
        name='wx-contacts-sync',
        daemon=True,
    ).start()


def scan_wx_login_info():
    """Check every stored WeChat login and try to reconnect the offline ones.

    Starts ``gewe_chat``, iterates the login hashes with Redis SCAN and, for
    each account, checks whether it is online. Offline accounts get one
    reconnection attempt; when that fails the hash's ``status`` field is set
    to 0. There is a 3 second pause after each account.
    """
    gewe_chat.start()
    wxchat = gewe_chat.wxchat

    cursor = 0
    while True:
        cursor, login_keys = redis_helper.redis_helper.client.scan(
            cursor, match=LOGIN_INFO_KEY_PATTERN
        )
        for k in login_keys:
            r = redis_helper.redis_helper.get_hash(k)
            app_id = r.get("appId")
            token_id = r.get("tokenId")
            wxid = r.get("wxid")

            if wxchat.check_online(token_id, app_id):
                logger.info(f'微信ID {wxid} 在APPID {app_id} 已经在线')
            else:
                # Try to reconnect.
                res = wxchat.reconnection(token_id, app_id)
                if res.get('ret') == 200:
                    logger.info(f'微信ID {wxid} 在APPID {app_id} 重连成功')
                else:
                    logger.warning("重连失败,重新登录...")
                    # NOTE(review): the message announces a Kafka offline
                    # notification, but nothing is sent here - confirm intent.
                    logger.warning("发送离线消息到kafka")
                    redis_helper.redis_helper.update_hash_field(k, 'status', 0)
            time.sleep(3)

        # A cursor of 0 means the SCAN iteration is complete.
        if cursor == 0:
            break


app = Flask(__name__)
flask_api = Api(app, errors=errors, catch_all_404s=True)

# Logging setup (the project logger is configured in common/log.py).
app.logger.handlers.clear()  # drop Flask's default handlers
# Reuse the project's file handler.
# NOTE(review): assumes logger has at least two handlers and that index 1 is
# the file handler - confirm against common/log.py.
app.logger.addHandler(logger.handlers[1])
app.logger.setLevel(logging.DEBUG)

# Request interceptors and the global error handler.
app.before_request(before_request)
app.after_request(after_request)
app.register_error_handler(Exception, handle_exception)

# Routes.
flask_api.add_resource(UserResource, '/api/user', '/api/user/')
flask_api.add_resource(MessagesResource, '/messages')
flask_api.add_resource(DeleteFriendResource, '/api/contacts/deletefriend')
flask_api.add_resource(GetFriendsInfoResource, '/api/contacts/getfriends')
flask_api.add_resource(GetWxchatConfigResource, '/api/wxchat/getconfig')
flask_api.add_resource(SaveWxchatConfigResource, '/api/wxchat/saveconfig')
flask_api.add_resource(GetGroupsInfoResource, '/api/groups/getchatroominfo')
flask_api.add_resource(GetLoginInfoResource, '/api/agent/getlogin')

load_config()
worker()

if __name__ == '__main__':
    # Pick the port from the 'environment' environment variable.
    environment = os.environ.get('environment', 'default')
    # NOTE(review): an earlier, since-removed commented-out version of this
    # startup code used 5000 for 'default' and 80 otherwise - confirm that
    # the current mapping is intentional.
    port = 80 if environment == 'default' else 5000
    app.run(debug=False, host='0.0.0.0', port=port)