From b7da4c3855259b4af3662905b045b82701c564c2 Mon Sep 17 00:00:00 2001 From: H Vs Date: Sat, 8 Mar 2025 16:46:16 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 69 ++++++++++++++++++++++-------------------- common/redis_helper.py | 47 ++++++++++------------------ docker/entrypoint.sh | 4 +-- gunicorn_config.py | 58 +++++++++++++---------------------- requirements.txt | 3 +- wechat/biz.py | 48 +---------------------------- 6 files changed, 79 insertions(+), 150 deletions(-) diff --git a/app.py b/app.py index 9bf9a43..775ea1e 100644 --- a/app.py +++ b/app.py @@ -42,27 +42,6 @@ errors = { } - -def worker(): - # start_wxchat_thread() - # start_kafka_consumer_thread() - start_kafka_consumer_thread() - # 尝试获取分布式锁 - lock_name = "wx_worker_lock" - #identifier = str(time.time()) - identifier = lock_name - while True: - if redis_helper.redis_helper.acquire_lock(lock_name, timeout=60): - try: - logger.info(f"获取{lock_name}工作锁, 开始任务") - start_wxchat_thread() - except Exception as e: - logger.error(f"处理任务时发生错误: {e}") - # 释放分布式锁 - redis_helper.redis_helper.release_lock(lock_name, identifier) - logger.info(f"释放{lock_name}锁") - time.sleep(1) - def fetch_and_save_contacts(): """ 获取联系人列表并保存到缓存 @@ -182,16 +161,41 @@ def process_add_contacts_from_chatrooms(wxchat:gewe_chat.GeWeChatCom,status, wxi else: logger.info(f'微信ID {wxid} 未登录 {app_id} ,群成员不能定时定时') -def start_wxchat_thread(): - scan_wx_login_info() - # 启动同步联系人线程 - threading.Thread(target=fetch_and_save_contacts).start() - threading.Thread(target=auto_add_contacts_from_chatrooms).start() +# def background_wxchat_thread(): +# scan_wx_login_info() +# # 启动同步联系人线程 +# threading.Thread(target=fetch_and_save_contacts).start() +# threading.Thread(target=auto_add_contacts_from_chatrooms).start() +def background_wxchat_thread(): + lock_name = "background_wxchat_thread_lock" + lock_identifier = str(time.time()) # 使用时间戳作为唯一标识 + + # 尝试获取分布式锁 + if redis_helper.redis_helper.acquire_lock(lock_name, timeout=60): + try: + logger.info("分布式锁已成功获取") + # 启动任务 + scan_wx_login_info() + threading.Thread(target=fetch_and_save_contacts).start() + threading.Thread(target=auto_add_contacts_from_chatrooms).start() + + # 保持锁的续期 + while True: + time.sleep(30) # 每30秒检查一次锁的状态 + if not redis_helper.redis_helper.renew_lock(lock_name, lock_identifier, timeout=60): + break # 如果无法续期锁,退出循环 + + finally: + # 释放锁 + redis_helper.redis_helper.release_lock(lock_name, lock_identifier) + else: + # 如果获取锁失败,等待一段时间后重试 + time.sleep(10) + background_wxchat_thread() def scan_wx_login_info(): - wxchat = gewe_chat.wxchat cursor = 0 while True: @@ -267,17 +271,18 @@ flask_api.add_resource(SendSNSVideoResource, '/api/sns/sendvideo') load_config() - kafka_helper.start() redis_helper.start() gewe_chat.start() - -#worker() -threading.Thread(target=worker).start() + +start_kafka_consumer_thread() +# background_wxchat_thread() +threading.Thread(target=background_wxchat_thread).start() if __name__ == '__main__': # 获取环境变量 environment = os.environ.get('environment', 'default') port = 80 if environment == 'default' else 5000 - app.run(debug=False, host='0.0.0.0', port=port) \ No newline at end of file + app.run(debug=False, host='0.0.0.0', port=port) + diff --git a/common/redis_helper.py b/common/redis_helper.py index 4d3124c..3231ea2 100644 --- a/common/redis_helper.py +++ b/common/redis_helper.py @@ -11,6 +11,7 @@ class RedisHelper: def __init__(self, host='localhost', port=6379, password=None ,db=0): # 初始化 Redis 连接 self.client = redis.Redis(host=host, port=port,db=db,password=password) + self.lock_renewal_thread = None def set_hash(self, hash_key, data, timeout=None): """添加或更新哈希,并设置有效期""" @@ -42,37 +43,22 @@ class RedisHelper: """更新哈希表中的某个字段""" self.client.hset(hash_key, field, value) - # def acquire_lock(self, lock_name, timeout=60): - # """ - # 尝试获取分布式锁,成功返回 True,失败返回 False - # :param lock_name: 锁的名称 - # :param timeout: 锁的超时时间(秒) - # :return: bool - # """ - # identifier = str(time.time()) # 使用时间戳作为唯一标识 - # end_time = time.time() + timeout - # while time.time() < end_time: - # if self.client.set(lock_name, identifier, nx=True, ex=timeout): - # self.lock_renewal_thread = threading.Thread(target=self.renew_lock, args=(lock_name, identifier, timeout)) - # self.lock_renewal_thread.start() - # return True - # time.sleep(0.1) - # return False - def acquire_lock(self, lock_name, timeout=60): - """ - 尝试获取分布式锁,成功返回 True,失败返回 False - :param lock_name: 锁的名称 - :param timeout: 锁的超时时间(秒) - :return: bool - """ - identifier = str(time.time()) # 使用时间戳作为唯一标识 - if self.client.set(lock_name, identifier, nx=True, ex=timeout): - self.lock_renewal_thread = threading.Thread(target=self.renew_lock, args=(lock_name, identifier, timeout)) - self.lock_renewal_thread.start() - return True - return False - + """ + 尝试获取分布式锁,成功返回 True,失败返回 False + :param lock_name: 锁的名称 + :param timeout: 锁的超时时间(秒) + :return: bool + """ + #print('获取锁') + identifier = str(time.time()) # 使用时间戳作为唯一标识 + #if self.client.set(lock_name, identifier, nx=True, ex=timeout): + if self.client.setnx(lock_name, identifier): + self.client.expire(lock_name, timeout) + self.lock_renewal_thread = threading.Thread(target=self.renew_lock, args=(lock_name, identifier, timeout)) + self.lock_renewal_thread.start() + return True + return False def renew_lock(self, lock_name, identifier, timeout): """ @@ -99,7 +85,6 @@ class RedisHelper: if self.lock_renewal_thread: self.lock_renewal_thread.join() - def start(): global redis_helper diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index a0cfd3d..1cf8a85 100644 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -43,6 +43,6 @@ if [ "$environment" == "default" ]; then else # 非默认环境,使用 Gunicorn 启动 #gunicorn -w 1 -b 0.0.0.0:5000 --timeout 1200 app:app - gunicorn -w 4 -b 0.0.0.0:5000 --timeout 1200 -k gevent app:app - #gunicorn -c gunicorn_config.py app:app + #gunicorn -w 4 -b 0.0.0.0:5000 --timeout 1200 -k gevent app:app + gunicorn -c gunicorn_config.py app:app fi diff --git a/gunicorn_config.py b/gunicorn_config.py index f147bc1..c2f3664 100644 --- a/gunicorn_config.py +++ b/gunicorn_config.py @@ -1,41 +1,25 @@ -"""gunicorn + gevent 的配置文件""" - -# 多进程 import multiprocessing -# 绑定ip + 端口 +# 绑定地址与端口 bind = '0.0.0.0:5000' -# 进程数 = cup数量 * 2 + 1 -workers = multiprocessing.cpu_count() * 2 + 1 - -# 等待队列最大长度,超过这个长度的链接将被拒绝连接 -backlog = 2048 - -# 工作模式--协程 -worker_class = 'gevent' - -# 最大客户客户端并发数量,对使用协程的 worker 的工作有影响 -# 服务器配置设置的值 1000:中小型项目 上万并发: 中大型 -worker_connections = 1000 - -# 进程名称 -proc_name = 'gunicorn.pid' - -# 进程 pid 记录文件 -pidfile = 'gunicorn.pid' - -# 日志等级 -loglevel = 'warning' - -# 日志文件名 -logfile = 'tmp/gunicorn_log.log' - -# 设置访问日志 -accesslog = 'tmp/gunicorn_acess.log' - -# 设置错误信息日志 -errorlog = 'tmp/gunicorn_error.log' - -# 代码发生变化是否自动重启 -reload = True +# 工作进程与协程配置 +#workers = multiprocessing.cpu_count() # 异步模式 Worker 数等于 CPU 核数 +workers=4 +# worker_class = 'gevent' # 启用协程 +worker_connections = 2000 # 适当增加单 Worker 并发量 + +# 超时与自动重启 +timeout = 120 +max_requests = 1000 +max_requests_jitter = 50 + +# 日志配置(确保 logs 目录存在) +loglevel = 'info' +accesslog = 'logs/gunicorn_access.log' +errorlog = 'logs/gunicorn_error.log' + +# 生产环境安全配置 +reload = False +preload_app = True +forwarded_allow_ips = '*' # 根据实际代理 IP 调整 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 3d8d0d0..d6eae4b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -48,4 +48,5 @@ gunicorn opencv-python moviepy -celery \ No newline at end of file +celery +gevent \ No newline at end of file diff --git a/wechat/biz.py b/wechat/biz.py index df5462b..1aad8d1 100644 --- a/wechat/biz.py +++ b/wechat/biz.py @@ -34,44 +34,6 @@ def wx_messages_process_callback(agent_tel,message): except Exception as e: print(f"处理消息时发生错误: {e}, 消息内容: {message}") - - -def process_group_sending_v0(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel:str): - # 获取登录信息 - hash_key = f"__AI_OPS_WX__:LOGININFO:{agent_tel}" - logininfo = redis_helper.redis_helper.get_hash(hash_key) - - if not logininfo: - logger.warning(f"未找到 {agent_tel} 的登录信息") - return - - token_id = logininfo.get('tokenId') - app_id = logininfo.get('appId') - agent_wxid = logininfo.get('wxid') - - - # 获取联系人列表并计算交集 - hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{agent_wxid}" - cache_friend_wxids_str=redis_helper.redis_helper.get_hash_field(hash_key,"data") - cache_friend_wxids_list=json.loads(cache_friend_wxids_str) if cache_friend_wxids_str else [] - cache_friend_wxids=[f["userName"] for f in cache_friend_wxids_list] - - - wxid_contact_list_content_data = [c['wxid'] for c in content_data.get("contact_list", [])] - intersection_wxids = list(set(cache_friend_wxids) & set(wxid_contact_list_content_data)) - - # 发送消息 - wx_content_list = content_data.get("wx_content", []) - for wx_content in wx_content_list: - if wx_content["type"] == "text": - send_text_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"]) - elif wx_content["type"] == "image_url": - send_image_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content.get("image_url", {}).get("url")) - elif wx_content["type"] == "tts": - send_tts_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"]) - - - def process_group_sending(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel:str): # 获取登录信息 hash_key = f"__AI_OPS_WX__:LOGININFO:{agent_tel}" @@ -108,13 +70,7 @@ def process_group_sending(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel: # 发送消息 wx_content_list = content_data.get("wx_content", []) - # for wx_content in wx_content_list: - # if wx_content["type"] == "text": - # send_text_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"]) - # elif wx_content["type"] == "image_url": - # send_image_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content.get("image_url", {}).get("url")) - # elif wx_content["type"] == "tts": - # send_tts_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"]) + wxchat.forward_video_aeskey = '' wxchat.forward_video_cdnvideourl = '' wxchat.forward_video_length = 0 @@ -130,8 +86,6 @@ def process_group_sending(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel: elif wx_content["type"] == "file": send_file_message(wxchat, token_id, app_id, agent_wxid, [intersection_wxid], wx_content.get("file_url", {}).get("url")) - - def send_text_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, text): for t in intersection_wxids: # 发送文本消息