From e12302480bfc132be10574d9852402643d20b0e5 Mon Sep 17 00:00:00 2001 From: H Vs Date: Fri, 7 Mar 2025 17:42:35 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=86=E5=B8=83=E5=BC=8F=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 34 ++++++++---- common/redis_helper.py | 114 +++++++++++++++++++---------------------- docker/entrypoint.sh | 4 +- gunicorn_config.py | 41 +++++++++++++++ requirements.txt | 4 +- 5 files changed, 124 insertions(+), 73 deletions(-) create mode 100644 gunicorn_config.py diff --git a/app.py b/app.py index 29836a8..9bf9a43 100644 --- a/app.py +++ b/app.py @@ -44,13 +44,24 @@ errors = { def worker(): - kafka_helper.start() - redis_helper.start() - - start_wxchat_thread() + # start_wxchat_thread() + # start_kafka_consumer_thread() start_kafka_consumer_thread() - - + # 尝试获取分布式锁 + lock_name = "wx_worker_lock" + #identifier = str(time.time()) + identifier = lock_name + while True: + if redis_helper.redis_helper.acquire_lock(lock_name, timeout=60): + try: + logger.info(f"获取{lock_name}工作锁, 开始任务") + start_wxchat_thread() + except Exception as e: + logger.error(f"处理任务时发生错误: {e}") + # 释放分布式锁 + redis_helper.redis_helper.release_lock(lock_name, identifier) + logger.info(f"释放{lock_name}锁") + time.sleep(1) def fetch_and_save_contacts(): """ @@ -172,7 +183,6 @@ def process_add_contacts_from_chatrooms(wxchat:gewe_chat.GeWeChatCom,status, wxi logger.info(f'微信ID {wxid} 未登录 {app_id} ,群成员不能定时定时') def start_wxchat_thread(): - # gewe_chat.start() scan_wx_login_info() # 启动同步联系人线程 threading.Thread(target=fetch_and_save_contacts).start() @@ -181,7 +191,7 @@ def start_wxchat_thread(): def scan_wx_login_info(): - gewe_chat.start() + wxchat = gewe_chat.wxchat cursor = 0 while True: @@ -257,7 +267,13 @@ flask_api.add_resource(SendSNSVideoResource, '/api/sns/sendvideo') load_config() -worker() + +kafka_helper.start() +redis_helper.start() +gewe_chat.start() + +#worker() +threading.Thread(target=worker).start() if __name__ == '__main__': # 获取环境变量 diff --git a/common/redis_helper.py b/common/redis_helper.py index 27d2512..4d3124c 100644 --- a/common/redis_helper.py +++ b/common/redis_helper.py @@ -3,6 +3,7 @@ import os from config import conf import uuid import time +import threading # 定义全局 redis_helper redis_helper = None @@ -41,74 +42,63 @@ class RedisHelper: """更新哈希表中的某个字段""" self.client.hset(hash_key, field, value) - def acquire_lock(self, lock_key, expire_time=60, timeout=None): + # def acquire_lock(self, lock_name, timeout=60): + # """ + # 尝试获取分布式锁,成功返回 True,失败返回 False + # :param lock_name: 锁的名称 + # :param timeout: 锁的超时时间(秒) + # :return: bool + # """ + # identifier = str(time.time()) # 使用时间戳作为唯一标识 + # end_time = time.time() + timeout + # while time.time() < end_time: + # if self.client.set(lock_name, identifier, nx=True, ex=timeout): + # self.lock_renewal_thread = threading.Thread(target=self.renew_lock, args=(lock_name, identifier, timeout)) + # self.lock_renewal_thread.start() + # return True + # time.sleep(0.1) + # return False + + def acquire_lock(self, lock_name, timeout=60): """ - 获取分布式锁。 - - Args: - lock_key: 锁的键名。 - expire_time: 锁的有效时间(秒)。 - timeout: 最大等待时间(秒),默认为None表示无限等待。 - - Returns: - 成功获取锁返回True,否则返回False。 + 尝试获取分布式锁,成功返回 True,失败返回 False + :param lock_name: 锁的名称 + :param timeout: 锁的超时时间(秒) + :return: bool """ - identifier = str(uuid.uuid4()) - - while True: - # 尝试获取锁 - if self.client.setnx(lock_key, identifier): - self.client.expire(lock_key, expire_time) - return True - - # 检查锁是否存在且未过期 - current_value = self.client.get(lock_key) - if not current_value: - continue # 锁不存在,继续尝试获取 - - # 检查锁是否已过期 - ttl = self.client.ttl(lock_key) - if ttl == -2: # 锁已过期 - continue - - # 如果超时时间设置且已经超出,则返回False - if timeout is not None: - start_time = time.time() - while (time.time() - start_time) < timeout: - time.sleep(0.1) - return self.acquire_lock(lock_key, expire_time, timeout) - else: - # 超时,放弃获取锁 - return False - - # 等待一段时间后重试 - time.sleep(0.1) + identifier = str(time.time()) # 使用时间戳作为唯一标识 + if self.client.set(lock_name, identifier, nx=True, ex=timeout): + self.lock_renewal_thread = threading.Thread(target=self.renew_lock, args=(lock_name, identifier, timeout)) + self.lock_renewal_thread.start() + return True + return False - def release_lock(self, lock_key): + + def renew_lock(self, lock_name, identifier, timeout): """ - 释放分布式锁。 - - Args: - lock_key: 锁的键名。 - - Returns: - 成功释放返回True,否则返回False。 + 锁的自动续期 + :param lock_name: 锁的名称 + :param identifier: 锁的唯一标识 + :param timeout: 锁的超时时间(秒) """ - script = """ - if redis.call("get", KEYS[1]) == ARGV[1] then - return redis.call("del", KEYS[1]) - else - return 0 - end + while True: + time.sleep(timeout / 2) + if self.client.get(lock_name) == identifier.encode(): + self.client.expire(lock_name, timeout) + else: + break + + def release_lock(self, lock_name, identifier): """ - current_value = self.client.get(lock_key) - if not current_value: - return False - - identifier = str(uuid.uuid4()) # 这里应替换为获取锁时使用的标识符 - result = self.client.eval(script, [lock_key], [identifier]) - return result == 1 - + 释放分布式锁 + :param lock_name: 锁的名称 + :param identifier: 锁的唯一标识 + """ + if self.client.get(lock_name) == identifier.encode(): + self.client.delete(lock_name) + if self.lock_renewal_thread: + self.lock_renewal_thread.join() + def start(): global redis_helper diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index d607c00..a0cfd3d 100644 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -42,5 +42,7 @@ if [ "$environment" == "default" ]; then $AI_OPS_WECHAT_EXEC else # 非默认环境,使用 Gunicorn 启动 - gunicorn -w 1 -b 0.0.0.0:5000 --timeout 1200 app:app + #gunicorn -w 1 -b 0.0.0.0:5000 --timeout 1200 app:app + gunicorn -w 4 -b 0.0.0.0:5000 --timeout 1200 -k gevent app:app + #gunicorn -c gunicorn_config.py app:app fi diff --git a/gunicorn_config.py b/gunicorn_config.py new file mode 100644 index 0000000..f147bc1 --- /dev/null +++ b/gunicorn_config.py @@ -0,0 +1,41 @@ +"""gunicorn + gevent 的配置文件""" + +# 多进程 +import multiprocessing + +# 绑定ip + 端口 +bind = '0.0.0.0:5000' + +# 进程数 = cup数量 * 2 + 1 +workers = multiprocessing.cpu_count() * 2 + 1 + +# 等待队列最大长度,超过这个长度的链接将被拒绝连接 +backlog = 2048 + +# 工作模式--协程 +worker_class = 'gevent' + +# 最大客户客户端并发数量,对使用协程的 worker 的工作有影响 +# 服务器配置设置的值 1000:中小型项目 上万并发: 中大型 +worker_connections = 1000 + +# 进程名称 +proc_name = 'gunicorn.pid' + +# 进程 pid 记录文件 +pidfile = 'gunicorn.pid' + +# 日志等级 +loglevel = 'warning' + +# 日志文件名 +logfile = 'tmp/gunicorn_log.log' + +# 设置访问日志 +accesslog = 'tmp/gunicorn_acess.log' + +# 设置错误信息日志 +errorlog = 'tmp/gunicorn_error.log' + +# 代码发生变化是否自动重启 +reload = True diff --git a/requirements.txt b/requirements.txt index 88524c1..3d8d0d0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -46,4 +46,6 @@ gunicorn opencv-python -moviepy \ No newline at end of file +moviepy + +celery \ No newline at end of file