diff --git a/common/redis_helper.py b/common/redis_helper.py index b174487..27d2512 100644 --- a/common/redis_helper.py +++ b/common/redis_helper.py @@ -1,6 +1,8 @@ import redis import os from config import conf +import uuid +import time # 定义全局 redis_helper redis_helper = None @@ -39,16 +41,73 @@ class RedisHelper: """更新哈希表中的某个字段""" self.client.hset(hash_key, field, value) - def get_creation_time(self, hash_key): - """获取哈希表的创建时间""" - creation_time = self.client.hget(hash_key, '__created_at') - return int(creation_time) if creation_time else None - - def get_modification_time(self, hash_key): - """获取哈希表的最后修改时间""" - modified_time = self.client.hget(hash_key, '__modified_at') - return int(modified_time) if modified_time else None + def acquire_lock(self, lock_key, expire_time=60, timeout=None): + """ + 获取分布式锁。 + + Args: + lock_key: 锁的键名。 + expire_time: 锁的有效时间(秒)。 + timeout: 最大等待时间(秒),默认为None表示无限等待。 + + Returns: + 成功获取锁返回True,否则返回False。 + """ + identifier = str(uuid.uuid4()) + + while True: + # 尝试获取锁 + if self.client.setnx(lock_key, identifier): + self.client.expire(lock_key, expire_time) + return True + + # 检查锁是否存在且未过期 + current_value = self.client.get(lock_key) + if not current_value: + continue # 锁不存在,继续尝试获取 + + # 检查锁是否已过期 + ttl = self.client.ttl(lock_key) + if ttl == -2: # 锁已过期 + continue + + # 如果超时时间设置且已经超出,则返回False + if timeout is not None: + start_time = time.time() + while (time.time() - start_time) < timeout: + time.sleep(0.1) + return self.acquire_lock(lock_key, expire_time, timeout) + else: + # 超时,放弃获取锁 + return False + + # 等待一段时间后重试 + time.sleep(0.1) + def release_lock(self, lock_key): + """ + 释放分布式锁。 + + Args: + lock_key: 锁的键名。 + + Returns: + 成功释放返回True,否则返回False。 + """ + script = """ + if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("del", KEYS[1]) + else + return 0 + end + """ + current_value = self.client.get(lock_key) + if not current_value: + return False + + identifier = str(uuid.uuid4()) # 这里应替换为获取锁时使用的标识符 + result = self.client.eval(script, [lock_key], [identifier]) + return result == 1 def start(): diff --git a/wechat/biz.py b/wechat/biz.py index c8f9b69..f2b23c3 100644 --- a/wechat/biz.py +++ b/wechat/biz.py @@ -300,6 +300,7 @@ def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, region_id uuid = qr_code.get('uuid') if not uuid: logger.error(f"uuid获取二维码失败: {qr_code}") + redis_helper.redis_helper.release_lock(f"__AI_OPS_WX__:LOGINLOCK:{token_id}") break @@ -340,6 +341,7 @@ def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, region_id # login_info.update({"login_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}) cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()} redis_helper.redis_helper.set_hash(hash_key, cleaned_login_info) + redis_helper.redis_helper.release_lock(f"__AI_OPS_WX__:LOGINLOCK:{token_id}") return login_info else: logger.info(f"登录检查中: {ret}-{msg}-{res}") @@ -347,6 +349,7 @@ def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, region_id time.sleep(5) logger.error(f"登录失败,二维码生成 {max_retries} 次") + redis_helper.redis_helper.release_lock(f"__AI_OPS_WX__:LOGINLOCK:{token_id}") def fetch_and_save_contacts(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash_key): """ @@ -374,11 +377,13 @@ def wx_login(wxchat:gewe_chat.GeWeChatCom,tel,token_id,region_id): is_online = wxchat.check_online(token_id, app_id) if is_online: logger.info(f'微信ID {wxid} 在APPID {app_id} 已经在线') + else: # 尝试重连 res = wxchat.reconnection(token_id, app_id) if res.get('ret') == 200: logger.info(f'微信ID {wxid} 在APPID {app_id} 重连成功') + else: print("重连失败,重新登录...") login_info = login_or_reconnect(wxchat, token_id, app_id, region_id,hash_key, is_reconnect=True) @@ -416,9 +421,13 @@ def ops_messages_process(message): token_id=content_data.get('token_id', 'f828cb3c-1039-489f-b9ae-7494d1778a15') region_id=content_data.get('region_id', '440000') #wx_login(wxchat,tel,token_id) - thread = threading.Thread(target=wx_login, args=(wxchat,tel,token_id,region_id)) - thread.daemon = True - thread.start() + flag=redis_helper.redis_helper.acquire_lock(f"__AI_OPS_WX__:LOGINLOCK:{token_id}", expire_time=800, timeout=830) + if flag: + thread = threading.Thread(target=wx_login, args=(wxchat,tel,token_id,region_id)) + thread.daemon = True + thread.start() + else: + logger.info(f'登录进行中,获取锁失败 {token_id}') elif msg_type_data == 'group-sending': agent_tel=content_data.get('agent_tel', '18029274615') # 使用线程处理 diff --git a/wechat/gewe_chat.py b/wechat/gewe_chat.py index 833a416..a87b512 100644 --- a/wechat/gewe_chat.py +++ b/wechat/gewe_chat.py @@ -805,50 +805,6 @@ class GeWeChatCom: # cache[:] = filtered_cache redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(filtered_cache, ensure_ascii=False)) - - def save_contacts_brief_to_cache_prev(self, token_id, app_id, wxid, contacts_wxids: list): - """ - 将联系人信息保存到 Redis 缓存。 - """ - # Redis 缓存的 key - hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" - - # 获取缓存中的数据 - cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data") - cache = json.loads(cache_str) if cache_str else [] - - if not cache: - # 缓存为空,分批处理 contacts_wxids - batch_size = 100 - for i in range(0, len(contacts_wxids), batch_size): - batch = contacts_wxids[i:i + batch_size] - friends_brief = self.get_brief_info(token_id, app_id, batch) - - friends_no_brief_wxid = [f['userName'] for f in friends_brief if not f["nickName"]] - cache.extend(f for f in friends_brief if f["nickName"]) - - if friends_no_brief_wxid: - detailed_info = self.get_detail_info(token_id, app_id, friends_no_brief_wxid) - cache.extend(detailed_info) - - # 更新缓存 - redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(cache, ensure_ascii=False)) - return - - # 缓存已存在,检查新联系人 - existing_usernames = {contact['userName'] for contact in cache} - new_contacts_wxids = [wxid for wxid in contacts_wxids if wxid not in existing_usernames] - - # 如果有新联系人,分批获取详细信息并更新缓存 - if new_contacts_wxids: - batch_size = 20 - for i in range(0, len(new_contacts_wxids), batch_size): - batch = new_contacts_wxids[i:i + batch_size] - detailed_info = self.get_detail_info(token_id, app_id, batch) - cache.extend(detailed_info) - - redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(cache, ensure_ascii=False)) - def save_groups_info_to_cache(self, token_id, app_id, wxid, chatroom_ids: list): """ 将群信息保存到 Redis 缓存。