diff --git a/app/endpoints/config_endpoint.py b/app/endpoints/config_endpoint.py index 617ba79..6d6bd93 100644 --- a/app/endpoints/config_endpoint.py +++ b/app/endpoints/config_endpoint.py @@ -2,6 +2,7 @@ from fastapi import APIRouter,Request from pydantic import BaseModel from fastapi import APIRouter, Depends from pydantic import BaseModel, ValidationError +from typing import Dict, Any from services.gewe_service import GeWeService,get_gewe_service @@ -9,7 +10,7 @@ from services.redis_service import RedisService from model.models import AgentConfig,validate_wxid -config_router = APIRouter(prefix="/api/wxchat") +config_router = APIRouter(prefix="/api") # 定义请求体的 Pydantic 模型 class GetConfigRequest(BaseModel): @@ -21,7 +22,9 @@ class SaveConfigRequest(BaseModel): data: dict -@config_router.post("/getconfig",response_model=None) + + +@config_router.post("/wxchat/getconfig",response_model=None) #@validate_wxid async def get_config(request: Request, body: GetConfigRequest): wxid = body.wxid @@ -44,7 +47,7 @@ async def get_config(request: Request, body: GetConfigRequest): -@config_router.post("/saveconfig",response_model=None) +@config_router.post("/wxchat/saveconfig",response_model=None) #@validate_wxid async def save_config(request: Request, body: SaveConfigRequest): wxid = body.wxid @@ -74,4 +77,18 @@ async def save_config(request: Request, body: SaveConfigRequest): await request.app.state.gewe_service.save_wxchat_config_async(wxid, data) return {'wxid': wxid, 'config': data} - \ No newline at end of file + + +@config_router.post("/global/getconfig",response_model=None) +async def get_global_config(request: Request): + #await request.app.state.gewe_service.get_login_info_by_wxid_async() + config=await request.app.state.gewe_service.get_global_config_from_cache_async() + return config + + +@config_router.post("/global/saveconfig",response_model=None) +async def save_global_config(request: Request, body: Dict[str, Any]): + #await request.app.state.gewe_service.get_login_info_by_wxid_async() + await request.app.state.gewe_service.save_global_config_async(body) + return {'message': '操作成功'} + diff --git a/run.py b/run.py index 3cd9772..5a43615 100644 --- a/run.py +++ b/run.py @@ -37,7 +37,7 @@ if __name__ == "__main__": fastapi_process = start_fastapi() celery_worker_process = start_celery_worker() celery_beat_process = start_celery_beat() - + # 等待子进程完成 #fastapi_process.wait() celery_worker_process.wait() diff --git a/services/gewe_service.py b/services/gewe_service.py index 15d675c..89d228a 100644 --- a/services/gewe_service.py +++ b/services/gewe_service.py @@ -1161,6 +1161,21 @@ class GeWeService: hash_key = f"__AI_OPS_WX__:WXCHAT_CONFIG" await self.redis_service.update_hash_field(hash_key, wxid, json.dumps(config, ensure_ascii=False)) + async def get_global_config_from_cache_async(self): + """ + 获取配置信息 + """ + hash_key = f"__AI_OPS_WX__:GLOBAL_CONFIG" + config = await self.redis_service.get_hash_field(hash_key,"data") + return json.loads(config) if config else {} + + async def save_global_config_async(self, config:dict): + """ + 保存配置信息 + """ + hash_key = f"__AI_OPS_WX__:GLOBAL_CONFIG" + await self.redis_service.update_hash_field(hash_key, "data,", json.dumps(config, ensure_ascii=False)) + async def get_login_info_from_cache_async(self,tel): hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}" cache = await self.redis_service.get_hash(hash_key) @@ -1319,6 +1334,34 @@ class GeWeService: return False + async def is_group_add_contacts_history_one_day_async(self, wxid,today_total=90) -> bool: + + today_list = [] + today = datetime.datetime.now().date() + cursor = 0 + hash_key = f"__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:*" + while True: + cursor, history_keys =await self.redis_service.client.scan(cursor, match= hash_key) + #print(f'login_keys:{login_keys}') + # 批量获取所有键的 hash 数据 + for k in history_keys: + cache = await self.redis_service.get_hash(k) + for key, value in cache.items(): + value_data_list = json.loads(value) + for value_data in value_data_list: + add_time_date = datetime.datetime.fromtimestamp(value_data["addTime"]).date() + if add_time_date == today: + today_list.append(value_data) + if len(today_list) == today_total: + return True + + + # 如果游标为 0,则表示扫描完成 + if cursor == 0: + break + + + return False async def enqueue_to_add_contacts_async(self,wxid,scene:int,v3,v4): """ @@ -1336,7 +1379,21 @@ class GeWeService: data=await self.redis_service.dequeue(hash_key) return json.loads(data) if data else {} + # async def acquire_task_run_time_lock_async(self,task_name,run_time,expire_time=None): + # hash_key = f"__AI_OPS_WX__:{task_name}" + # if await self.redis_service.client.setnx(hash_key, run_time): + # await self.redis_service.client.expire(hash_key, expire_time) + # return True + # return False + async def save_task_run_time_async(self,task_name,log:list,expire_time=None): + hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}" + await self.redis_service.set_hash(hash_key, "data,", json.dumps(log, ensure_ascii=False), expire_time) + + async def get_task_run_time_async(self,task_name)->list: + hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}" + cache= await self.redis_service.get_hash_field(hash_key,"data") + return json.loads(cache) if cache else [] # 依赖项:获取 GeWeChatCom 单例 diff --git a/services/kafka_service.py b/services/kafka_service.py index a3e8322..36035b8 100644 --- a/services/kafka_service.py +++ b/services/kafka_service.py @@ -164,6 +164,11 @@ class KafkaService: await self.connect_consumer() self.consumer_task = asyncio.create_task(self.consume_messages()) + async def start_producer(self): + """Start both producer and consumer""" + await self.connect_producer() + + async def stop(self): """Graceful shutdown""" if self.producer: diff --git a/services/redis_service.py b/services/redis_service.py index 8940a89..1d5c459 100644 --- a/services/redis_service.py +++ b/services/redis_service.py @@ -23,20 +23,59 @@ class RedisService: self.client = await aioredis.Redis(host=host, port=port, password=password, db=db) + async def set_hash(self, hash_key, data, timeout=None): + """添加或更新哈希,并设置有效期""" + await self.client.hmset_dict(hash_key, data) + if timeout: + # 设置有效期(单位:秒) + await self.client.expire(hash_key, timeout) + # async def set_hash(self, hash_key, data, timeout=None): # """添加或更新哈希,并设置有效期""" - # await self.client.hmset_dict(hash_key, data) + # # 使用 hmset 方法设置哈希表数据 + # await self.client.hmset(hash_key, data) # if timeout: # # 设置有效期(单位:秒) # await self.client.expire(hash_key, timeout) - async def set_hash(self, hash_key, data, timeout=None): - """添加或更新哈希,并设置有效期""" - # 使用 hmset 方法设置哈希表数据 - await self.client.hmset(hash_key, data) - if timeout: - # 设置有效期(单位:秒) - await self.client.expire(hash_key, timeout) + + # async def set_hash(self, hash_key, data, timeout=None): + # """添加或更新哈希,并设置有效期""" + # # 使用 hset 方法设置哈希表数据 + # await self.client.hset(hash_key, mapping=data) + + # if timeout: + # # 设置有效期(单位:秒) + # await self.client.expire(hash_key, timeout) + + # def flatten_dict(self,d, parent_key="", sep="."): + # """ + # 将嵌套字典扁平化 + # :param d: 嵌套字典 + # :param parent_key: 父键(用于递归) + # :param sep: 分隔符 + # :return: 扁平化字典 + # """ + # items = [] + # for k, v in d.items(): + # new_key = f"{parent_key}{sep}{k}" if parent_key else k + # if isinstance(v, dict): + # items.extend(self.flatten_dict(v, new_key, sep=sep).items()) + # else: + # items.append((new_key, v)) + # return dict(items) + + # async def set_hash(self, hash_key, data, timeout=None): + # """添加或更新哈希,并设置有效期""" + # # 扁平化嵌套字典 + # flat_data = self.flatten_dict(data) + + # # 使用 hset 方法设置哈希表数据 + # await self.client.hset(hash_key, mapping=flat_data) + + # if timeout: + # # 设置有效期(单位:秒) + # await self.client.expire(hash_key, timeout) async def get_hash(self, hash_key): """获取整个哈希表数据""" diff --git a/tasks.py b/tasks.py index 5282b8b..124bbcd 100644 --- a/tasks.py +++ b/tasks.py @@ -4,7 +4,7 @@ import time,datetime from celery import Celery import celery.schedules from redbeat import RedBeatSchedulerEntry -#from datetime import timedelta +from datetime import timedelta from services.redis_service import RedisService from services.kafka_service import KafkaService @@ -350,8 +350,8 @@ def scheduled_task_add_contacts_from_chatrooms_p(self, redis_config, kafka_confi -@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True) -def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config): +#@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_p2', bind=True, acks_late=True) +def scheduled_task_add_contacts_from_chatrooms_p2(self, redis_config, kafka_config, gewe_config): ''' 关于群加好友的请求规则:一次30人,间隔2小时做1次,一天做3次,即最多90人/天。 @@ -424,9 +424,15 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, contact_wxids_set.add(wxid) unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id) - contact_wxids_set.update(set(unavailable_wixds)) - + if unavailable_wixds: + contact_wxids_set.update(set(unavailable_wixds)) + chatroom_member_list = chatroom.get('memberList', []) + if chatroom_member_list is None: + chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表 + elif not isinstance(chatroom_member_list, list): + chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表 + remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set] nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None) @@ -511,6 +517,237 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, loop.run_until_complete(task()) # 在现有事件循环中运行任务 +@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True) +def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config): + + ''' + 关于群加好友的请求规则:一次30人,间隔2小时做1次,一天做3次,即最多90人/天。 + 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。 + ''' + async def task(): + try: + now = datetime.now() + if now.hour < 8: + logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点") + return + + + + logger.info('定时群成员定时添好友任务开始') + redis_service = RedisService() + await redis_service.init(**redis_config) + + gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url']) + + KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers'] + KAFKA_TOPIC=kafka_config['topic'] + KAFKA_GROUP_ID=kafka_config['group_id'] + + kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID) + await kafka_service.start_producer() + + global_config=await gewe_service.get_global_config_from_cache_async() + scheduled_task_add_contacts_from_chatrooms_config=global_config.get('scheduledTaskAddContactsFromChatrooms',{}) + + oneday_add_contacts_total=90 + once_add_contacts_total=30 + oneday_times=3 + + if scheduled_task_add_contacts_from_chatrooms_config: + oneday_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal',90) + once_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal',30) + oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3) + + cache_task_run_time_logs= await gewe_service.get_task_run_time_async('scheduled_task_add_contacts_from_chatrooms') + if len(cache_task_run_time_logs) == oneday_times: + logger.info(f"今日定时群成员定时添好友任务已达上限 {oneday_times} 次!") + return + + if cache_task_run_time_logs: + sorted_tasks = sorted(cache_task_run_time_logs, key=lambda x: x.get("runTime"), reverse=True) + last_run_time=sorted_tasks[0].get("runTime") + + if last_run_time > 1e12: # 毫秒级时间戳 + last_run_time = last_run_time / 1000 # 转换为秒 + + # 将时间戳转换为 datetime 对象 + last_run_time = datetime.fromtimestamp(last_run_time) + + # 获取当前时间 + current_time = datetime.now() + + # 计算时间差 + time_difference = current_time - last_run_time + + # 判断是否相差2小时 + if time_difference < timedelta(hours=2): + logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行") + return + + # 获取当前时间 + current_time = datetime.now() + + # 计算当天的结束时间(23:59:59) + end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59) + + # 计算时间差 + time_difference = end_of_day - current_time + + # 将时间差转换为秒数 + time_difference_seconds = int(time_difference.total_seconds()) + cache_task_run_time_logs.append({"runTime":int(time.time())}) + await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds) + + login_keys = [] + async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): + login_keys.append(key) + + wixd_add_contacts_from_chatrooms_times = {} + + for k in login_keys: + r = await redis_service.get_hash(k) + app_id = r.get("appId") + token_id = r.get("tokenId") + wxid = r.get("wxid") + status = r.get('status') + if status == '0': + logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加") + continue + + c = await gewe_service.get_wxchat_config_from_cache_async(wxid) + contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid) + + contact_wxids = [c.get('userName') for c in contacts] + chatrooms = c.get('addContactsFromChatroomIdWhiteList', []) + logger.info(f'{wxid} 定时群成员定时添好友任务开始') + wixd_add_contacts_from_chatrooms_times[wxid] = 0 + for chatroom_id in chatrooms: + chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id) + chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id) + + chatroom_nickname = chatroom.get('nickName') + chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None) + + admin_wxids = chatroom_member.get('adminWxid', []) + + admin_wxids = chatroom_member.get('adminWxid') + if admin_wxids is None: + admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表 + + logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}') + contact_wxids_set = set(contact_wxids) + # for admin_wxid in admin_wxids: + # contact_wxids_set.add(admin_wxid) + if admin_wxids: + contact_wxids_set.update(set(admin_wxids)) + if chatroom_owner_wxid is not None: + contact_wxids_set.add(chatroom_owner_wxid) + + contact_wxids_set.add(wxid) + + # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id) + # contact_wxids_set.update(set(unavailable_wixds)) + + # chatroom_member_list = chatroom.get('memberList', []) + + + unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id) + if unavailable_wixds: + contact_wxids_set.update(set(unavailable_wixds)) + + chatroom_member_list = chatroom.get('memberList', []) + if chatroom_member_list is None: + chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表 + elif not isinstance(chatroom_member_list, list): + chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表 + + + + remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set] + + nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None) + + logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}') + for m in remaining_chatroot_members: + # 判断本次任务是否已经邀请了30个好友 + if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total: + logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请") + return + # 判断当天群成员是否已经加了90个好友 + is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid,oneday_add_contacts_total) + if is_add_group_times: + logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加") + return + + contact_wxid= m.get('wxid') + member_nickname=m.get("nickName") + group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid) + sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True) + + # 已经邀请过两次,不再邀请 + if len(sorted_history)==2: + logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请') + continue + + # 当天邀请过,不再邀请 + if len(sorted_history) > 0: + last_add_time = sorted_history[0].addTime + def is_add_time_more_than_one_day(addTime: int) -> bool: + """ + 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒 + :param addTime: Unix 时间戳 + :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False + """ + # 获取当前时间的时间戳 + current_time = time.time() + + # 计算时间戳差值 + time_difference = abs(current_time - addTime) + + # 检查是否大于 3600 × 24 秒 + return time_difference > 3600 * 24 + + is_more_than_one_day= is_add_time_more_than_one_day(last_add_time) + + if not is_more_than_one_day: + logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请') + continue + + + + ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}') + if ret!=200: + logger.warning(f'群好友邀请失败原因:{ret} {data}') + + history=AddGroupContactsHistory.model_validate({ + "chatroomId":chatroom_id, + "wxid":wxid, + "contactWixd":contact_wxid, + "addTime":int(time.time()) + }) + await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history) + wixd_add_contacts_from_chatrooms_times[wxid]+=1 + logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}') + # 推送到kafka + + k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime) + await kafka_service.send_message_async(k_message) + + await asyncio.sleep(random.uniform(1.5, 3)) + await asyncio.sleep(random.uniform(1.5, 3)) + + + except Exception as e: + logger.error(f"任务执行过程中发生异常: {e}") + + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + loop.run_until_complete(task()) # 在现有事件循环中运行任务 + + REDIS_KEY_PATTERN = "friend_add_limit:{date}"