ソースを参照

调整

develop
H Vs 1ヶ月前
コミット
b7da4c3855
6個のファイルの変更79行の追加150行の削除
  1. +37
    -32
      app.py
  2. +16
    -31
      common/redis_helper.py
  3. +2
    -2
      docker/entrypoint.sh
  4. +21
    -37
      gunicorn_config.py
  5. +2
    -1
      requirements.txt
  6. +1
    -47
      wechat/biz.py

+ 37
- 32
app.py ファイルの表示

@@ -42,27 +42,6 @@ errors = {
}



def worker():
# start_wxchat_thread()
# start_kafka_consumer_thread()
start_kafka_consumer_thread()
# 尝试获取分布式锁
lock_name = "wx_worker_lock"
#identifier = str(time.time())
identifier = lock_name
while True:
if redis_helper.redis_helper.acquire_lock(lock_name, timeout=60):
try:
logger.info(f"获取{lock_name}工作锁, 开始任务")
start_wxchat_thread()
except Exception as e:
logger.error(f"处理任务时发生错误: {e}")
# 释放分布式锁
redis_helper.redis_helper.release_lock(lock_name, identifier)
logger.info(f"释放{lock_name}锁")
time.sleep(1)

def fetch_and_save_contacts():
"""
获取联系人列表并保存到缓存
@@ -182,16 +161,41 @@ def process_add_contacts_from_chatrooms(wxchat:gewe_chat.GeWeChatCom,status, wxi
else:
logger.info(f'微信ID {wxid} 未登录 {app_id} ,群成员不能定时定时')

def start_wxchat_thread():
scan_wx_login_info()
# 启动同步联系人线程
threading.Thread(target=fetch_and_save_contacts).start()
threading.Thread(target=auto_add_contacts_from_chatrooms).start()
# def background_wxchat_thread():
# scan_wx_login_info()
# # 启动同步联系人线程
# threading.Thread(target=fetch_and_save_contacts).start()
# threading.Thread(target=auto_add_contacts_from_chatrooms).start()


def background_wxchat_thread():
lock_name = "background_wxchat_thread_lock"
lock_identifier = str(time.time()) # 使用时间戳作为唯一标识

# 尝试获取分布式锁
if redis_helper.redis_helper.acquire_lock(lock_name, timeout=60):
try:
logger.info("分布式锁已成功获取")
# 启动任务
scan_wx_login_info()
threading.Thread(target=fetch_and_save_contacts).start()
threading.Thread(target=auto_add_contacts_from_chatrooms).start()

# 保持锁的续期
while True:
time.sleep(30) # 每30秒检查一次锁的状态
if not redis_helper.redis_helper.renew_lock(lock_name, lock_identifier, timeout=60):
break # 如果无法续期锁,退出循环

finally:
# 释放锁
redis_helper.redis_helper.release_lock(lock_name, lock_identifier)
else:
# 如果获取锁失败,等待一段时间后重试
time.sleep(10)
background_wxchat_thread()

def scan_wx_login_info():
wxchat = gewe_chat.wxchat
cursor = 0
while True:
@@ -267,17 +271,18 @@ flask_api.add_resource(SendSNSVideoResource, '/api/sns/sendvideo')


load_config()

kafka_helper.start()
redis_helper.start()
gewe_chat.start()

#worker()
threading.Thread(target=worker).start()
start_kafka_consumer_thread()
# background_wxchat_thread()
threading.Thread(target=background_wxchat_thread).start()

if __name__ == '__main__':
# 获取环境变量
environment = os.environ.get('environment', 'default')
port = 80 if environment == 'default' else 5000

app.run(debug=False, host='0.0.0.0', port=port)
app.run(debug=False, host='0.0.0.0', port=port)


+ 16
- 31
common/redis_helper.py ファイルの表示

@@ -11,6 +11,7 @@ class RedisHelper:
def __init__(self, host='localhost', port=6379, password=None ,db=0):
# 初始化 Redis 连接
self.client = redis.Redis(host=host, port=port,db=db,password=password)
self.lock_renewal_thread = None

def set_hash(self, hash_key, data, timeout=None):
"""添加或更新哈希,并设置有效期"""
@@ -42,37 +43,22 @@ class RedisHelper:
"""更新哈希表中的某个字段"""
self.client.hset(hash_key, field, value)

# def acquire_lock(self, lock_name, timeout=60):
# """
# 尝试获取分布式锁,成功返回 True,失败返回 False
# :param lock_name: 锁的名称
# :param timeout: 锁的超时时间(秒)
# :return: bool
# """
# identifier = str(time.time()) # 使用时间戳作为唯一标识
# end_time = time.time() + timeout
# while time.time() < end_time:
# if self.client.set(lock_name, identifier, nx=True, ex=timeout):
# self.lock_renewal_thread = threading.Thread(target=self.renew_lock, args=(lock_name, identifier, timeout))
# self.lock_renewal_thread.start()
# return True
# time.sleep(0.1)
# return False

def acquire_lock(self, lock_name, timeout=60):
"""
尝试获取分布式锁,成功返回 True,失败返回 False
:param lock_name: 锁的名称
:param timeout: 锁的超时时间(秒)
:return: bool
"""
identifier = str(time.time()) # 使用时间戳作为唯一标识
if self.client.set(lock_name, identifier, nx=True, ex=timeout):
self.lock_renewal_thread = threading.Thread(target=self.renew_lock, args=(lock_name, identifier, timeout))
self.lock_renewal_thread.start()
return True
return False

"""
尝试获取分布式锁,成功返回 True,失败返回 False
:param lock_name: 锁的名称
:param timeout: 锁的超时时间(秒)
:return: bool
"""
#print('获取锁')
identifier = str(time.time()) # 使用时间戳作为唯一标识
#if self.client.set(lock_name, identifier, nx=True, ex=timeout):
if self.client.setnx(lock_name, identifier):
self.client.expire(lock_name, timeout)
self.lock_renewal_thread = threading.Thread(target=self.renew_lock, args=(lock_name, identifier, timeout))
self.lock_renewal_thread.start()
return True
return False

def renew_lock(self, lock_name, identifier, timeout):
"""
@@ -99,7 +85,6 @@ class RedisHelper:
if self.lock_renewal_thread:
self.lock_renewal_thread.join()


def start():
global redis_helper


+ 2
- 2
docker/entrypoint.sh ファイルの表示

@@ -43,6 +43,6 @@ if [ "$environment" == "default" ]; then
else
# 非默认环境,使用 Gunicorn 启动
#gunicorn -w 1 -b 0.0.0.0:5000 --timeout 1200 app:app
gunicorn -w 4 -b 0.0.0.0:5000 --timeout 1200 -k gevent app:app
#gunicorn -c gunicorn_config.py app:app
#gunicorn -w 4 -b 0.0.0.0:5000 --timeout 1200 -k gevent app:app
gunicorn -c gunicorn_config.py app:app
fi

+ 21
- 37
gunicorn_config.py ファイルの表示

@@ -1,41 +1,25 @@
"""gunicorn + gevent 的配置文件"""

# 多进程
import multiprocessing

# 绑定ip + 端口
# 绑定地址与端口
bind = '0.0.0.0:5000'

# 进程数 = cup数量 * 2 + 1
workers = multiprocessing.cpu_count() * 2 + 1

# 等待队列最大长度,超过这个长度的链接将被拒绝连接
backlog = 2048

# 工作模式--协程
worker_class = 'gevent'

# 最大客户客户端并发数量,对使用协程的 worker 的工作有影响
# 服务器配置设置的值 1000:中小型项目 上万并发: 中大型
worker_connections = 1000

# 进程名称
proc_name = 'gunicorn.pid'

# 进程 pid 记录文件
pidfile = 'gunicorn.pid'

# 日志等级
loglevel = 'warning'

# 日志文件名
logfile = 'tmp/gunicorn_log.log'

# 设置访问日志
accesslog = 'tmp/gunicorn_acess.log'

# 设置错误信息日志
errorlog = 'tmp/gunicorn_error.log'

# 代码发生变化是否自动重启
reload = True
# 工作进程与协程配置
#workers = multiprocessing.cpu_count() # 异步模式 Worker 数等于 CPU 核数
workers=4
# worker_class = 'gevent' # 启用协程
worker_connections = 2000 # 适当增加单 Worker 并发量

# 超时与自动重启
timeout = 120
max_requests = 1000
max_requests_jitter = 50

# 日志配置(确保 logs 目录存在)
loglevel = 'info'
accesslog = 'logs/gunicorn_access.log'
errorlog = 'logs/gunicorn_error.log'

# 生产环境安全配置
reload = False
preload_app = True
forwarded_allow_ips = '*' # 根据实际代理 IP 调整

+ 2
- 1
requirements.txt ファイルの表示

@@ -48,4 +48,5 @@ gunicorn
opencv-python
moviepy

celery
celery
gevent

+ 1
- 47
wechat/biz.py ファイルの表示

@@ -34,44 +34,6 @@ def wx_messages_process_callback(agent_tel,message):
except Exception as e:
print(f"处理消息时发生错误: {e}, 消息内容: {message}")


def process_group_sending_v0(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel:str):
# 获取登录信息
hash_key = f"__AI_OPS_WX__:LOGININFO:{agent_tel}"
logininfo = redis_helper.redis_helper.get_hash(hash_key)
if not logininfo:
logger.warning(f"未找到 {agent_tel} 的登录信息")
return
token_id = logininfo.get('tokenId')
app_id = logininfo.get('appId')
agent_wxid = logininfo.get('wxid')
# 获取联系人列表并计算交集
hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{agent_wxid}"
cache_friend_wxids_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
cache_friend_wxids_list=json.loads(cache_friend_wxids_str) if cache_friend_wxids_str else []
cache_friend_wxids=[f["userName"] for f in cache_friend_wxids_list]


wxid_contact_list_content_data = [c['wxid'] for c in content_data.get("contact_list", [])]
intersection_wxids = list(set(cache_friend_wxids) & set(wxid_contact_list_content_data))
# 发送消息
wx_content_list = content_data.get("wx_content", [])
for wx_content in wx_content_list:
if wx_content["type"] == "text":
send_text_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])
elif wx_content["type"] == "image_url":
send_image_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content.get("image_url", {}).get("url"))
elif wx_content["type"] == "tts":
send_tts_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])



def process_group_sending(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel:str):
# 获取登录信息
hash_key = f"__AI_OPS_WX__:LOGININFO:{agent_tel}"
@@ -108,13 +70,7 @@ def process_group_sending(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel:
# 发送消息
wx_content_list = content_data.get("wx_content", [])
# for wx_content in wx_content_list:
# if wx_content["type"] == "text":
# send_text_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])
# elif wx_content["type"] == "image_url":
# send_image_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content.get("image_url", {}).get("url"))
# elif wx_content["type"] == "tts":
# send_tts_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])

wxchat.forward_video_aeskey = ''
wxchat.forward_video_cdnvideourl = ''
wxchat.forward_video_length = 0
@@ -130,8 +86,6 @@ def process_group_sending(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel:
elif wx_content["type"] == "file":
send_file_message(wxchat, token_id, app_id, agent_wxid, [intersection_wxid], wx_content.get("file_url", {}).get("url"))



def send_text_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, text):
for t in intersection_wxids:
# 发送文本消息


読み込み中…
キャンセル
保存