瀏覽代碼

调整群加好友规则

1257
H Vs 2 週之前
父節點
當前提交
7d4e53b9d3
共有 3 個檔案被更改,包括 80 行新增36 行删除
  1. +9
    -0
      app/endpoints/pipeline_endpoint.py
  2. +22
    -0
      services/gewe_service.py
  3. +49
    -36
      tasks.py

+ 9
- 0
app/endpoints/pipeline_endpoint.py 查看文件

@@ -407,13 +407,22 @@ async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from
logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_text_async.__name__} 不回复消息")
return
# 判断是否转人工处理功能
is_human_handle_msg= await request.app.state.gewe_service.is_human_handle_msg_async(wxid)
if is_human_handle_msg:
logger.warning(f'微信号 {wxid} 暂时工人接管30分钟中')
return

msg_content=msg_data["Content"]["string"]
if wxid == from_wxid: #手动发送消息
logger.info("Active message sending detected")
await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid])
callback_to_user=msg_data["ToUserName"]["string"]
# 转人工处理功能
await request.app.state.gewe_service.set_human_handle_msg_async(wxid,60*30)

# 推送到kafka
input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
input_message=dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
await request.app.state.kafka_service.send_message_async(input_message)


+ 22
- 0
services/gewe_service.py 查看文件

@@ -1469,6 +1469,19 @@ class GeWeService:
cache= await self.redis_service.get_hash_field(hash_key,"data")
return json.loads(cache) if cache else []
async def save_task_run_time_by_wxid_async(self,wxid,task_name,log:list,expire_time=None):
'''
任务运行锁
'''
hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{wxid}:{task_name}"
await self.redis_service.set_hash(hash_key,{"data": json.dumps(log, ensure_ascii=False)}, expire_time)
async def get_task_run_time_by_wxid_async(self,wxid,task_name)->list:
hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{wxid}:{task_name}"
cache= await self.redis_service.get_hash_field(hash_key,"data")
return json.loads(cache) if cache else []
async def save_wx_expection_async(self,wxid,api_name,exception:str,expire_time=None):
hash_key = f"__AI_OPS_WX__:WX_EXCEPTION:{wxid}:{api_name}"
await self.redis_service.set_hash(hash_key,{"data": json.dumps(exception, ensure_ascii=False)}, expire_time)
@@ -1511,6 +1524,15 @@ class GeWeService:
hash_key = f"__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:{chatroom_id}"
await self.redis_service.delete_hash(hash_key)

async def is_human_handle_msg_async(self,wxid)->bool:
hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG:{wxid}"
cache= await self.redis_service.get_hash_field(hash_key,"data")
return True if cache else False
async def set_human_handle_msg_async(self,wxid,expire_time=60*30):
hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG:{wxid}"
await self.redis_service.set_hash(hash_key,{"data": json.dumps(int(time.time()), ensure_ascii=False)}, expire_time)

# 依赖项:获取 GeWeChatCom 单例
async def get_gewe_service(app: FastAPI = Depends()) -> GeWeService:
return app.state.gewe_service

+ 49
- 36
tasks.py 查看文件

@@ -566,47 +566,31 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config,
once_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal',30)
#oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)

cache_task_run_time_logs= await gewe_service.get_task_run_time_async('scheduled_task_add_contacts_from_chatrooms')
# if len(cache_task_run_time_logs) == oneday_times:
# logger.info(f"今日定时群成员定时添好友任务已达上限 {oneday_times} 次!")
# return
if cache_task_run_time_logs:
sorted_tasks = sorted(cache_task_run_time_logs, key=lambda x: x.get("runTime"), reverse=True)
last_run_time=sorted_tasks[0].get("runTime")
# cache_task_run_time_logs= await gewe_service.get_task_run_time_async('scheduled_task_add_contacts_from_chatrooms')
# if cache_task_run_time_logs:
# sorted_tasks = sorted(cache_task_run_time_logs, key=lambda x: x.get("runTime"), reverse=True)
# last_run_time=sorted_tasks[0].get("runTime")

if last_run_time > 1e12: # 毫秒级时间戳
last_run_time = last_run_time / 1000 # 转换为秒
# 将时间戳转换为 datetime 对象
last_run_time = datetime.fromtimestamp(last_run_time)
# if last_run_time > 1e12: # 毫秒级时间戳
# last_run_time = last_run_time / 1000 # 转换为秒
# 获取当前时间
current_time = datetime.now()
# # 将时间戳转换为 datetime 对象
# last_run_time = datetime.fromtimestamp(last_run_time)
# 计算时间差
time_difference = current_time - last_run_time
# # 获取当前时间
# current_time = datetime.now()
# 判断是否相差2小时
if time_difference < timedelta(hours=2):
logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行")
return
# # 计算时间差
# time_difference = current_time - last_run_time
# 获取当前时间
# current_time = datetime.now()

# # 计算当天的结束时间(23:59:59)
# end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59)

# # 计算时间差
# time_difference = end_of_day - current_time

# # 将时间差转换为秒数
# time_difference_seconds = int(time_difference.total_seconds())

time_difference_seconds = today_seconds_remaining()
cache_task_run_time_logs.append({"runTime":int(time.time())})
await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds)
# # 判断是否相差2小时
# if time_difference < timedelta(hours=2):
# logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行")
# return

# time_difference_seconds = today_seconds_remaining()
# cache_task_run_time_logs.append({"runTime":int(time.time())})
# await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds)
login_keys = []
async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
@@ -636,6 +620,35 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config,
if is_wx_expection:
logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
return

cache_task_run_time_wxid_logs= await gewe_service.get_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms')
if cache_task_run_time_wxid_logs:
sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
last_run_time=sorted_tasks[0].get("runTime")

if last_run_time > 1e12: # 毫秒级时间戳
last_run_time = last_run_time / 1000 # 转换为秒
# 将时间戳转换为 datetime 对象
last_run_time = datetime.fromtimestamp(last_run_time)
# 获取当前时间
current_time = datetime.now()
# 计算时间差
time_difference = current_time - last_run_time
# 判断是否相差2小时
if time_difference < timedelta(hours=2):
logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
return

time_difference_seconds = today_seconds_remaining()
cache_task_run_time_wxid_logs.append({"runTime":int(time.time())})
await gewe_service.save_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_wxid_logs,time_difference_seconds)


c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)


Loading…
取消
儲存