|
- from flask import Flask, send_from_directory, request,jsonify
- from flask_restful import Api,got_request_exception
- from resources.messages_resource import MessagesResource
- from resources.contacts_resources import DeleteFriendResource,GetFriendsInfoResource
- from resources.config_reources import GetWxchatConfigResource ,SaveWxchatConfigResource
- from resources.groups_resources import GetGroupsInfoResource
- from resources.login_resources import GetLoginInfoResource
- from resources.sns_resources import SendSNSTextResource,SendSNSImageResource, SendSNSVideoResource
- from common.log import logger, log_exception
- from common.interceptors import before_request, after_request, handle_exception
- import threading
- from common import kafka_helper, redis_helper,utils
-
- import logging
- from config import load_config
-
- from wechat.biz import start_kafka_consumer_thread
-
-
- from wechat import gewe_chat
-
- import os,time,json
-
- from voice.ali.ali_voice import AliVoice
-
-
-
# Custom error messages, keyed by exception class name; handed to
# flask_restful's Api(errors=...) when the API object is created.
errors = dict(
    UserAlreadyExistsError=dict(
        message="A user with that username already exists.",
        status=409,
    ),
    ResourceDoesNotExist=dict(
        message="A resource with that ID no longer exists.",
        status=410,
        extra="Any extra information you want.",
    ),
)
-
-
-
def worker():
    """Bring up the background services this process depends on.

    Starts, in this order: the Kafka helper, the Redis helper, the WeChat
    background threads, and the Kafka consumer thread.
    """
    startup_steps = (
        kafka_helper.start,
        redis_helper.start,
        start_wxchat_thread,
        start_kafka_consumer_thread,
    )
    for step in startup_steps:
        step()
-
-
-
def _sync_contacts_for_login(wxchat, login_key):
    """Cache friends and chatrooms for the account stored under *login_key*.

    Reads the login hash from Redis; accounts whose ``status`` is not ``'1'``
    are only logged. Otherwise the contact list is fetched through *wxchat*
    and written to the cache via ``save_contacts_brief_to_cache`` and
    ``save_groups_info_to_cache``.
    """
    r = redis_helper.redis_helper.get_hash(login_key)
    token_id = r.get('tokenId')
    app_id = r.get('appId')
    wxid = r.get('wxid')
    status = r.get('status')

    if status != '1':
        logger.info(f'微信ID {wxid} 未登录 {app_id} ,联系人不能定时保存')
        return

    ret, msg, contacts_list = wxchat.fetch_contacts_list(token_id, app_id)
    if not contacts_list:
        # NOTE(review): assumes a failed fetch yields an empty/None payload;
        # confirm against GeWeChatCom.fetch_contacts_list.
        logger.warning(
            f'Fetching contacts failed for wxid {wxid} app_id {app_id}: ret={ret} msg={msg}'
        )
        return

    # The first three entries are skipped, as in the original code
    # (the slice range can be adjusted).
    friend_wxids = contacts_list['friends'][3:]
    wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
    logger.info(f'微信ID {wxid} 登录APPID {app_id} 成功,联系人已定时保存')

    chatrooms = contacts_list['chatrooms']
    wxchat.save_groups_info_to_cache(token_id, app_id, wxid, chatrooms)
    logger.info(f'微信ID {wxid} 登录APPID {app_id} 成功,群信息已定时保存')


def fetch_and_save_contacts2():
    """Fetch the contact lists and save them to the cache, once per hour.

    Runs forever. Each round lists the ``__AI_OPS_WX__:LOGININFO:*`` keys in
    Redis and syncs every account in turn, pausing 3 seconds between
    accounts. A failure while syncing one account is logged and does not
    stop the loop, so one bad record cannot kill this background thread.
    """
    wxchat = gewe_chat.wxchat
    while True:
        login_keys = list(redis_helper.redis_helper.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
        logger.info(f"Fetching login keys: {login_keys}")

        for k in login_keys:
            try:
                _sync_contacts_for_login(wxchat, k)
            except Exception:
                # Top-level boundary of a long-running thread: log with
                # traceback and carry on with the next account.
                logger.exception(f'Syncing contacts failed for login key {k}')
            time.sleep(3)

        time.sleep(3600 * 1)
-
def auto_contacts_from_chatroom():
    """Once a day, start one friend-adding worker thread per login record.

    Runs forever. Each round lists the ``__AI_OPS_WX__:LOGININFO:*`` keys in
    Redis, reads each login hash, and hands its fields to
    ``process_add_contacts_from_groups`` on a new thread, waiting 3 seconds
    between thread launches.
    """
    logger.info('自动从群添加好友')
    wxchat = gewe_chat.wxchat
    key_pattern = '__AI_OPS_WX__:LOGININFO:*'
    seconds_per_day = 3600 * 24

    while True:
        # Collect every login key up front, then walk them one by one.
        for login_key in list(redis_helper.redis_helper.client.scan_iter(match=key_pattern)):
            info = redis_helper.redis_helper.get_hash(login_key)
            worker_args = (
                wxchat,
                info.get('status'),
                info.get('nickName'),
                info.get('wxid'),
                info.get('tokenId'),
                info.get('appId'),
            )
            # One thread per login record handles that account's chatrooms.
            threading.Thread(target=process_add_contacts_from_groups, args=worker_args).start()
            time.sleep(3)

        time.sleep(seconds_per_day)
-
def process_add_contacts_from_groups(wxchat: gewe_chat.GeWeChatCom, status, nickname, wxid, token_id, app_id):
    """Send friend requests to whitelisted-chatroom members not yet in contacts.

    Does nothing but log when ``status`` is not ``'1'``. Otherwise, for each
    chatroom id in the account's ``addContactsFromChatroomIdWhiteList``
    config entry, the cached member list is filtered to members that are
    not the account itself, not already a cached contact, and not the
    chatroom's owner or admin. Each remaining member gets a friend request,
    with a 10 second pause per request and 20 seconds per chatroom.

    Args:
        wxchat: client used for the cache lookups and the friend requests.
        status: login status from the login record; ``'1'`` means proceed.
        nickname: nickname of the logged-in account, used in the greeting.
        wxid: wxid of the logged-in account.
        token_id: token passed through to ``add_group_member_as_friend``.
        app_id: app id passed through to ``add_group_member_as_friend``.
    """
    if status != '1':
        logger.info(f'微信ID {wxid} 未登录 {app_id} ,群成员不能定时定时')
        return

    # NOTE(review): the cache helpers are assumed to possibly return None on a
    # miss; the `or` fallbacks keep this thread from dying in that case.
    config = wxchat.get_wxchat_config_from_cache(wxid) or {}
    contacts = wxchat.get_contacts_brief_from_cache(wxid) or []
    chatrooms = config.get('addContactsFromChatroomIdWhiteList') or []

    # Built once: everyone who is already a contact, plus the account itself.
    known_wxids = {contact.get('userName') for contact in contacts}
    known_wxids.add(wxid)

    for chatroom_id in chatrooms:
        chatroom = wxchat.get_group_info_from_cache(wxid, chatroom_id)
        if not chatroom:
            logger.warning(f'No cached group info for chatroom {chatroom_id} of wxid {wxid}, skipped')
            continue

        chatroom_nickname = chatroom.get('nickName')

        # The chatroom's admin and owner are never sent a request.
        excluded_wxids = set(known_wxids)
        for special_wxid in (chatroom.get('adminWxid', None), chatroom.get('chatroomOwner', None)):
            if special_wxid is not None:
                excluded_wxids.add(special_wxid)

        members = chatroom.get('memberList') or []
        remaining_members = [m for m in members if m.get('wxid') not in excluded_wxids]

        logger.info(f'{nickname} 在 {chatroom_nickname} 群里还没添加的好友有:{[x.get("nickName") for x in remaining_members]}')
        for m in remaining_members:
            ret, msg, data = wxchat.add_group_member_as_friend(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
            logger.info(f'{nickname} 向 {chatroom_nickname} 群的 {m.get("nickName")} 发送好友邀请 {msg}')
            time.sleep(10)
        time.sleep(20)
-
def start_wxchat_thread():
    """Check all cached logins once, then launch the two polling threads."""
    scan_wx_login_info()

    # Background loops: contact/group sync and auto-adding friends from groups.
    background_loops = (fetch_and_save_contacts2, auto_contacts_from_chatroom)
    for loop_fn in background_loops:
        threading.Thread(target=loop_fn).start()
-
-
-
def scan_wx_login_info():
    """Walk every cached login record once and try to restore offline ones.

    Starts ``gewe_chat``, then pages through the
    ``__AI_OPS_WX__:LOGININFO:*`` keys with Redis SCAN. For each record the
    account is checked for being online; if it is not, a reconnection is
    attempted, and when that does not return ``ret == 200`` the record's
    ``status`` field is set to 0. Waits 3 seconds after each record.
    """
    gewe_chat.start()
    wxchat = gewe_chat.wxchat

    cursor = 0
    scan_finished = False
    while not scan_finished:
        cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')

        for login_key in login_keys:
            record = redis_helper.redis_helper.get_hash(login_key)
            app_id = record.get("appId")
            token_id = record.get("tokenId")
            wxid = record.get("wxid")

            if wxchat.check_online(token_id, app_id):
                logger.info(f'微信ID {wxid} 在APPID {app_id} 已经在线')
            elif wxchat.reconnection(token_id, app_id).get('ret') == 200:
                # Offline, but the reconnection attempt succeeded.
                logger.info(f'微信ID {wxid} 在APPID {app_id} 重连成功')
            else:
                print("重连失败,重新登录...")
                print("发送离线消息到kafka")
                redis_helper.redis_helper.update_hash_field(login_key, 'status', 0)

            time.sleep(3)

        # A cursor of 0 means the scan has covered the whole keyspace.
        scan_finished = cursor == 0
-
-
-
-
app = Flask(__name__)

# REST API wrapper with the custom error table; unknown URLs are handled by
# the API as 404s as well.
flask_api = Api(app=app, errors=errors, catch_all_404s=True)

# Logging: drop Flask's default handlers and reuse the second handler of the
# project logger configured in log.py (the file handler, per the original note).
_file_handler = logger.handlers[1]
_flask_logger = app.logger
_flask_logger.handlers.clear()
_flask_logger.addHandler(_file_handler)
_flask_logger.setLevel(logging.DEBUG)

# Interceptors: request hooks plus a catch-all exception handler.
app.before_request(before_request)
app.after_request(after_request)
app.register_error_handler(Exception, handle_exception)
-
# Route table: each resource class is mounted at exactly one URL,
# registered in the order listed.
_ROUTES = (
    (MessagesResource, '/messages'),
    (DeleteFriendResource, '/api/contacts/deletefriend'),
    (GetFriendsInfoResource, '/api/contacts/getfriends'),
    (GetWxchatConfigResource, '/api/wxchat/getconfig'),
    (SaveWxchatConfigResource, '/api/wxchat/saveconfig'),
    (GetGroupsInfoResource, '/api/groups/getchatroominfo'),
    (GetLoginInfoResource, '/api/agent/getlogin'),
    (SendSNSTextResource, '/api/sns/sendtext'),
    (SendSNSImageResource, '/api/sns/sendimages'),
    (SendSNSVideoResource, '/api/sns/sendvideo'),
)

for _resource_cls, _url in _ROUTES:
    flask_api.add_resource(_resource_cls, _url)
-
-
# Runs at import time: load configuration first, then start the background
# services.
load_config()
worker()

if __name__ == '__main__':
    # Read the 'environment' variable: port 80 when it is unset or
    # 'default', otherwise port 5000.
    environment = os.environ.get('environment', 'default')
    if environment == 'default':
        port = 80
    else:
        port = 5000

    app.run(debug=False, host='0.0.0.0', port=port)
|