Bladeren bron

分布式锁

develop
H Vs 1 maand geleden
bovenliggende
commit
e12302480b
5 gewijzigde bestanden met toevoegingen van 124 en 73 verwijderingen
  1. +25
    -9
      app.py
  2. +52
    -62
      common/redis_helper.py
  3. +3
    -1
      docker/entrypoint.sh
  4. +41
    -0
      gunicorn_config.py
  5. +3
    -1
      requirements.txt

+ 25
- 9
app.py Bestand weergeven

@@ -44,13 +44,24 @@ errors = {


def worker():
kafka_helper.start()
redis_helper.start()

start_wxchat_thread()
# start_wxchat_thread()
# start_kafka_consumer_thread()
start_kafka_consumer_thread()


# 尝试获取分布式锁
lock_name = "wx_worker_lock"
#identifier = str(time.time())
identifier = lock_name
while True:
if redis_helper.redis_helper.acquire_lock(lock_name, timeout=60):
try:
logger.info(f"获取{lock_name}工作锁, 开始任务")
start_wxchat_thread()
except Exception as e:
logger.error(f"处理任务时发生错误: {e}")
# 释放分布式锁
redis_helper.redis_helper.release_lock(lock_name, identifier)
logger.info(f"释放{lock_name}锁")
time.sleep(1)

def fetch_and_save_contacts():
"""
@@ -172,7 +183,6 @@ def process_add_contacts_from_chatrooms(wxchat:gewe_chat.GeWeChatCom,status, wxi
logger.info(f'微信ID {wxid} 未登录 {app_id} ,群成员不能定时定时')

def start_wxchat_thread():
# gewe_chat.start()
scan_wx_login_info()
# 启动同步联系人线程
threading.Thread(target=fetch_and_save_contacts).start()
@@ -181,7 +191,7 @@ def start_wxchat_thread():


def scan_wx_login_info():
gewe_chat.start()
wxchat = gewe_chat.wxchat
cursor = 0
while True:
@@ -257,7 +267,13 @@ flask_api.add_resource(SendSNSVideoResource, '/api/sns/sendvideo')


load_config()
worker()

kafka_helper.start()
redis_helper.start()
gewe_chat.start()

#worker()
threading.Thread(target=worker).start()

if __name__ == '__main__':
# 获取环境变量


+ 52
- 62
common/redis_helper.py Bestand weergeven

@@ -3,6 +3,7 @@ import os
from config import conf
import uuid
import time
import threading
# 定义全局 redis_helper
redis_helper = None

@@ -41,74 +42,63 @@ class RedisHelper:
"""更新哈希表中的某个字段"""
self.client.hset(hash_key, field, value)

def acquire_lock(self, lock_key, expire_time=60, timeout=None):
# def acquire_lock(self, lock_name, timeout=60):
# """
# 尝试获取分布式锁,成功返回 True,失败返回 False
# :param lock_name: 锁的名称
# :param timeout: 锁的超时时间(秒)
# :return: bool
# """
# identifier = str(time.time()) # 使用时间戳作为唯一标识
# end_time = time.time() + timeout
# while time.time() < end_time:
# if self.client.set(lock_name, identifier, nx=True, ex=timeout):
# self.lock_renewal_thread = threading.Thread(target=self.renew_lock, args=(lock_name, identifier, timeout))
# self.lock_renewal_thread.start()
# return True
# time.sleep(0.1)
# return False

def acquire_lock(self, lock_name, timeout=60):
"""
获取分布式锁。
Args:
lock_key: 锁的键名。
expire_time: 锁的有效时间(秒)。
timeout: 最大等待时间(秒),默认为None表示无限等待。
Returns:
成功获取锁返回True,否则返回False。
尝试获取分布式锁,成功返回 True,失败返回 False
:param lock_name: 锁的名称
:param timeout: 锁的超时时间(秒)
:return: bool
"""
identifier = str(uuid.uuid4())
while True:
# 尝试获取锁
if self.client.setnx(lock_key, identifier):
self.client.expire(lock_key, expire_time)
return True
# 检查锁是否存在且未过期
current_value = self.client.get(lock_key)
if not current_value:
continue # 锁不存在,继续尝试获取
# 检查锁是否已过期
ttl = self.client.ttl(lock_key)
if ttl == -2: # 锁已过期
continue
# 如果超时时间设置且已经超出,则返回False
if timeout is not None:
start_time = time.time()
while (time.time() - start_time) < timeout:
time.sleep(0.1)
return self.acquire_lock(lock_key, expire_time, timeout)
else:
# 超时,放弃获取锁
return False
# 等待一段时间后重试
time.sleep(0.1)
identifier = str(time.time()) # 使用时间戳作为唯一标识
if self.client.set(lock_name, identifier, nx=True, ex=timeout):
self.lock_renewal_thread = threading.Thread(target=self.renew_lock, args=(lock_name, identifier, timeout))
self.lock_renewal_thread.start()
return True
return False

def release_lock(self, lock_key):

def renew_lock(self, lock_name, identifier, timeout):
"""
释放分布式锁。
Args:
lock_key: 锁的键名。
Returns:
成功释放返回True,否则返回False。
锁的自动续期
:param lock_name: 锁的名称
:param identifier: 锁的唯一标识
:param timeout: 锁的超时时间(秒)
"""
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
while True:
time.sleep(timeout / 2)
if self.client.get(lock_name) == identifier.encode():
self.client.expire(lock_name, timeout)
else:
break

def release_lock(self, lock_name, identifier):
"""
current_value = self.client.get(lock_key)
if not current_value:
return False
identifier = str(uuid.uuid4()) # 这里应替换为获取锁时使用的标识符
result = self.client.eval(script, [lock_key], [identifier])
return result == 1
释放分布式锁
:param lock_name: 锁的名称
:param identifier: 锁的唯一标识
"""
if self.client.get(lock_name) == identifier.encode():
self.client.delete(lock_name)
if self.lock_renewal_thread:
self.lock_renewal_thread.join()


def start():
global redis_helper


+ 3
- 1
docker/entrypoint.sh Bestand weergeven

@@ -42,5 +42,7 @@ if [ "$environment" == "default" ]; then
$AI_OPS_WECHAT_EXEC
else
# 非默认环境,使用 Gunicorn 启动
gunicorn -w 1 -b 0.0.0.0:5000 --timeout 1200 app:app
#gunicorn -w 1 -b 0.0.0.0:5000 --timeout 1200 app:app
gunicorn -w 4 -b 0.0.0.0:5000 --timeout 1200 -k gevent app:app
#gunicorn -c gunicorn_config.py app:app
fi

+ 41
- 0
gunicorn_config.py Bestand weergeven

@@ -0,0 +1,41 @@
"""gunicorn + gevent 的配置文件"""

# 多进程
import multiprocessing

# 绑定ip + 端口
bind = '0.0.0.0:5000'

# 进程数 = cup数量 * 2 + 1
workers = multiprocessing.cpu_count() * 2 + 1

# 等待队列最大长度,超过这个长度的链接将被拒绝连接
backlog = 2048

# 工作模式--协程
worker_class = 'gevent'

# 最大客户客户端并发数量,对使用协程的 worker 的工作有影响
# 服务器配置设置的值 1000:中小型项目 上万并发: 中大型
worker_connections = 1000

# 进程名称
proc_name = 'gunicorn.pid'

# 进程 pid 记录文件
pidfile = 'gunicorn.pid'

# 日志等级
loglevel = 'warning'

# 日志文件名
logfile = 'tmp/gunicorn_log.log'

# 设置访问日志
accesslog = 'tmp/gunicorn_acess.log'

# 设置错误信息日志
errorlog = 'tmp/gunicorn_error.log'

# 代码发生变化是否自动重启
reload = True

+ 3
- 1
requirements.txt Bestand weergeven

@@ -46,4 +46,6 @@ gunicorn


opencv-python
moviepy
moviepy

celery

Laden…
Annuleren
Opslaan