Browse Source

全局群加好友

1257
H Vs 1 week ago
parent
commit
10efd64d9a
6 changed files with 373 additions and 18 deletions
  1. +21
    -4
      app/endpoints/config_endpoint.py
  2. +1
    -1
      run.py
  3. +57
    -0
      services/gewe_service.py
  4. +5
    -0
      services/kafka_service.py
  5. +47
    -8
      services/redis_service.py
  6. +242
    -5
      tasks.py

+ 21
- 4
app/endpoints/config_endpoint.py View File

@@ -2,6 +2,7 @@ from fastapi import APIRouter,Request
from pydantic import BaseModel from pydantic import BaseModel
from fastapi import APIRouter, Depends from fastapi import APIRouter, Depends
from pydantic import BaseModel, ValidationError from pydantic import BaseModel, ValidationError
from typing import Dict, Any




from services.gewe_service import GeWeService,get_gewe_service from services.gewe_service import GeWeService,get_gewe_service
@@ -9,7 +10,7 @@ from services.redis_service import RedisService
from model.models import AgentConfig,validate_wxid from model.models import AgentConfig,validate_wxid




config_router = APIRouter(prefix="/api/wxchat")
config_router = APIRouter(prefix="/api")


# 定义请求体的 Pydantic 模型 # 定义请求体的 Pydantic 模型
class GetConfigRequest(BaseModel): class GetConfigRequest(BaseModel):
@@ -21,7 +22,9 @@ class SaveConfigRequest(BaseModel):
data: dict data: dict




@config_router.post("/getconfig",response_model=None)


@config_router.post("/wxchat/getconfig",response_model=None)
#@validate_wxid #@validate_wxid
async def get_config(request: Request, body: GetConfigRequest): async def get_config(request: Request, body: GetConfigRequest):
wxid = body.wxid wxid = body.wxid
@@ -44,7 +47,7 @@ async def get_config(request: Request, body: GetConfigRequest):






@config_router.post("/saveconfig",response_model=None)
@config_router.post("/wxchat/saveconfig",response_model=None)
#@validate_wxid #@validate_wxid
async def save_config(request: Request, body: SaveConfigRequest): async def save_config(request: Request, body: SaveConfigRequest):
wxid = body.wxid wxid = body.wxid
@@ -74,4 +77,18 @@ async def save_config(request: Request, body: SaveConfigRequest):
await request.app.state.gewe_service.save_wxchat_config_async(wxid, data) await request.app.state.gewe_service.save_wxchat_config_async(wxid, data)
return {'wxid': wxid, 'config': data} return {'wxid': wxid, 'config': data}


@config_router.post("/global/getconfig",response_model=None)
async def get_global_config(request: Request):
#await request.app.state.gewe_service.get_login_info_by_wxid_async()
config=await request.app.state.gewe_service.get_global_config_from_cache_async()
return config


@config_router.post("/global/saveconfig",response_model=None)
async def save_global_config(request: Request, body: Dict[str, Any]):
#await request.app.state.gewe_service.get_login_info_by_wxid_async()
await request.app.state.gewe_service.save_global_config_async(body)
return {'message': '操作成功'}


+ 1
- 1
run.py View File

@@ -37,7 +37,7 @@ if __name__ == "__main__":
fastapi_process = start_fastapi() fastapi_process = start_fastapi()
celery_worker_process = start_celery_worker() celery_worker_process = start_celery_worker()
celery_beat_process = start_celery_beat() celery_beat_process = start_celery_beat()
# 等待子进程完成 # 等待子进程完成
#fastapi_process.wait() #fastapi_process.wait()
celery_worker_process.wait() celery_worker_process.wait()


+ 57
- 0
services/gewe_service.py View File

@@ -1161,6 +1161,21 @@ class GeWeService:
hash_key = f"__AI_OPS_WX__:WXCHAT_CONFIG" hash_key = f"__AI_OPS_WX__:WXCHAT_CONFIG"
await self.redis_service.update_hash_field(hash_key, wxid, json.dumps(config, ensure_ascii=False)) await self.redis_service.update_hash_field(hash_key, wxid, json.dumps(config, ensure_ascii=False))


async def get_global_config_from_cache_async(self):
"""
获取配置信息
"""
hash_key = f"__AI_OPS_WX__:GLOBAL_CONFIG"
config = await self.redis_service.get_hash_field(hash_key,"data")
return json.loads(config) if config else {}
async def save_global_config_async(self, config:dict):
"""
保存配置信息
"""
hash_key = f"__AI_OPS_WX__:GLOBAL_CONFIG"
await self.redis_service.update_hash_field(hash_key, "data,", json.dumps(config, ensure_ascii=False))

async def get_login_info_from_cache_async(self,tel): async def get_login_info_from_cache_async(self,tel):
hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}" hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}"
cache = await self.redis_service.get_hash(hash_key) cache = await self.redis_service.get_hash(hash_key)
@@ -1319,6 +1334,34 @@ class GeWeService:


return False return False


async def is_group_add_contacts_history_one_day_async(self, wxid,today_total=90) -> bool:

today_list = []
today = datetime.datetime.now().date()
cursor = 0
hash_key = f"__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:*"
while True:
cursor, history_keys =await self.redis_service.client.scan(cursor, match= hash_key)
#print(f'login_keys:{login_keys}')
# 批量获取所有键的 hash 数据
for k in history_keys:
cache = await self.redis_service.get_hash(k)
for key, value in cache.items():
value_data_list = json.loads(value)
for value_data in value_data_list:
add_time_date = datetime.datetime.fromtimestamp(value_data["addTime"]).date()
if add_time_date == today:
today_list.append(value_data)
if len(today_list) == today_total:
return True

# 如果游标为 0,则表示扫描完成
if cursor == 0:
break

return False


async def enqueue_to_add_contacts_async(self,wxid,scene:int,v3,v4): async def enqueue_to_add_contacts_async(self,wxid,scene:int,v3,v4):
""" """
@@ -1336,7 +1379,21 @@ class GeWeService:
data=await self.redis_service.dequeue(hash_key) data=await self.redis_service.dequeue(hash_key)
return json.loads(data) if data else {} return json.loads(data) if data else {}


# async def acquire_task_run_time_lock_async(self,task_name,run_time,expire_time=None):
# hash_key = f"__AI_OPS_WX__:{task_name}"
# if await self.redis_service.client.setnx(hash_key, run_time):
# await self.redis_service.client.expire(hash_key, expire_time)
# return True
# return False


async def save_task_run_time_async(self,task_name,log:list,expire_time=None):
hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}"
await self.redis_service.set_hash(hash_key, "data,", json.dumps(log, ensure_ascii=False), expire_time)
async def get_task_run_time_async(self,task_name)->list:
hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}"
cache= await self.redis_service.get_hash_field(hash_key,"data")
return json.loads(cache) if cache else []




# 依赖项:获取 GeWeChatCom 单例 # 依赖项:获取 GeWeChatCom 单例


+ 5
- 0
services/kafka_service.py View File

@@ -164,6 +164,11 @@ class KafkaService:
await self.connect_consumer() await self.connect_consumer()
self.consumer_task = asyncio.create_task(self.consume_messages()) self.consumer_task = asyncio.create_task(self.consume_messages())


async def start_producer(self):
"""Start both producer and consumer"""
await self.connect_producer()


async def stop(self): async def stop(self):
"""Graceful shutdown""" """Graceful shutdown"""
if self.producer: if self.producer:


+ 47
- 8
services/redis_service.py View File

@@ -23,20 +23,59 @@ class RedisService:
self.client = await aioredis.Redis(host=host, port=port, password=password, db=db) self.client = await aioredis.Redis(host=host, port=port, password=password, db=db)


async def set_hash(self, hash_key, data, timeout=None):
"""添加或更新哈希,并设置有效期"""
await self.client.hmset_dict(hash_key, data)
if timeout:
# 设置有效期(单位:秒)
await self.client.expire(hash_key, timeout)

# async def set_hash(self, hash_key, data, timeout=None): # async def set_hash(self, hash_key, data, timeout=None):
# """添加或更新哈希,并设置有效期""" # """添加或更新哈希,并设置有效期"""
# await self.client.hmset_dict(hash_key, data)
# # 使用 hmset 方法设置哈希表数据
# await self.client.hmset(hash_key, data)
# if timeout: # if timeout:
# # 设置有效期(单位:秒) # # 设置有效期(单位:秒)
# await self.client.expire(hash_key, timeout) # await self.client.expire(hash_key, timeout)


async def set_hash(self, hash_key, data, timeout=None):
"""添加或更新哈希,并设置有效期"""
# 使用 hmset 方法设置哈希表数据
await self.client.hmset(hash_key, data)
if timeout:
# 设置有效期(单位:秒)
await self.client.expire(hash_key, timeout)

# async def set_hash(self, hash_key, data, timeout=None):
# """添加或更新哈希,并设置有效期"""
# # 使用 hset 方法设置哈希表数据
# await self.client.hset(hash_key, mapping=data)
# if timeout:
# # 设置有效期(单位:秒)
# await self.client.expire(hash_key, timeout)

# def flatten_dict(self,d, parent_key="", sep="."):
# """
# 将嵌套字典扁平化
# :param d: 嵌套字典
# :param parent_key: 父键(用于递归)
# :param sep: 分隔符
# :return: 扁平化字典
# """
# items = []
# for k, v in d.items():
# new_key = f"{parent_key}{sep}{k}" if parent_key else k
# if isinstance(v, dict):
# items.extend(self.flatten_dict(v, new_key, sep=sep).items())
# else:
# items.append((new_key, v))
# return dict(items)

# async def set_hash(self, hash_key, data, timeout=None):
# """添加或更新哈希,并设置有效期"""
# # 扁平化嵌套字典
# flat_data = self.flatten_dict(data)
# # 使用 hset 方法设置哈希表数据
# await self.client.hset(hash_key, mapping=flat_data)
# if timeout:
# # 设置有效期(单位:秒)
# await self.client.expire(hash_key, timeout)


async def get_hash(self, hash_key): async def get_hash(self, hash_key):
"""获取整个哈希表数据""" """获取整个哈希表数据"""


+ 242
- 5
tasks.py View File

@@ -4,7 +4,7 @@ import time,datetime
from celery import Celery from celery import Celery
import celery.schedules import celery.schedules
from redbeat import RedBeatSchedulerEntry from redbeat import RedBeatSchedulerEntry
#from datetime import timedelta
from datetime import timedelta


from services.redis_service import RedisService from services.redis_service import RedisService
from services.kafka_service import KafkaService from services.kafka_service import KafkaService
@@ -350,8 +350,8 @@ def scheduled_task_add_contacts_from_chatrooms_p(self, redis_config, kafka_confi






@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):
#@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_p2', bind=True, acks_late=True)
def scheduled_task_add_contacts_from_chatrooms_p2(self, redis_config, kafka_config, gewe_config):


''' '''
关于群加好友的请求规则:一次30人,间隔2小时做1次,一天做3次,即最多90人/天。 关于群加好友的请求规则:一次30人,间隔2小时做1次,一天做3次,即最多90人/天。
@@ -424,9 +424,15 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config,
contact_wxids_set.add(wxid) contact_wxids_set.add(wxid)


unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id) unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
contact_wxids_set.update(set(unavailable_wixds))

if unavailable_wixds:
contact_wxids_set.update(set(unavailable_wixds))
chatroom_member_list = chatroom.get('memberList', []) chatroom_member_list = chatroom.get('memberList', [])
if chatroom_member_list is None:
chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
elif not isinstance(chatroom_member_list, list):
chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表

remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set] remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None) nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
@@ -511,6 +517,237 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config,


loop.run_until_complete(task()) # 在现有事件循环中运行任务 loop.run_until_complete(task()) # 在现有事件循环中运行任务


@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):

'''
关于群加好友的请求规则:一次30人,间隔2小时做1次,一天做3次,即最多90人/天。
加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
'''
async def task():
try:
now = datetime.now()
if now.hour < 8:
logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
return


logger.info('定时群成员定时添好友任务开始')
redis_service = RedisService()
await redis_service.init(**redis_config)

gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])

KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers']
KAFKA_TOPIC=kafka_config['topic']
KAFKA_GROUP_ID=kafka_config['group_id']
kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID)
await kafka_service.start_producer()

global_config=await gewe_service.get_global_config_from_cache_async()
scheduled_task_add_contacts_from_chatrooms_config=global_config.get('scheduledTaskAddContactsFromChatrooms',{})
oneday_add_contacts_total=90
once_add_contacts_total=30
oneday_times=3

if scheduled_task_add_contacts_from_chatrooms_config:
oneday_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal',90)
once_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal',30)
oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)

cache_task_run_time_logs= await gewe_service.get_task_run_time_async('scheduled_task_add_contacts_from_chatrooms')
if len(cache_task_run_time_logs) == oneday_times:
logger.info(f"今日定时群成员定时添好友任务已达上限 {oneday_times} 次!")
return
if cache_task_run_time_logs:
sorted_tasks = sorted(cache_task_run_time_logs, key=lambda x: x.get("runTime"), reverse=True)
last_run_time=sorted_tasks[0].get("runTime")

if last_run_time > 1e12: # 毫秒级时间戳
last_run_time = last_run_time / 1000 # 转换为秒
# 将时间戳转换为 datetime 对象
last_run_time = datetime.fromtimestamp(last_run_time)
# 获取当前时间
current_time = datetime.now()
# 计算时间差
time_difference = current_time - last_run_time
# 判断是否相差2小时
if time_difference < timedelta(hours=2):
logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行")
return
# 获取当前时间
current_time = datetime.now()

# 计算当天的结束时间(23:59:59)
end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59)

# 计算时间差
time_difference = end_of_day - current_time

# 将时间差转换为秒数
time_difference_seconds = int(time_difference.total_seconds())
cache_task_run_time_logs.append({"runTime":int(time.time())})
await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds)
login_keys = []
async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
login_keys.append(key)

wixd_add_contacts_from_chatrooms_times = {}

for k in login_keys:
r = await redis_service.get_hash(k)
app_id = r.get("appId")
token_id = r.get("tokenId")
wxid = r.get("wxid")
status = r.get('status')
if status == '0':
logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
continue

c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
contact_wxids = [c.get('userName') for c in contacts]
chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
logger.info(f'{wxid} 定时群成员定时添好友任务开始')
wixd_add_contacts_from_chatrooms_times[wxid] = 0
for chatroom_id in chatrooms:
chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
chatroom_nickname = chatroom.get('nickName')
chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)

admin_wxids = chatroom_member.get('adminWxid', [])

admin_wxids = chatroom_member.get('adminWxid')
if admin_wxids is None:
admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
contact_wxids_set = set(contact_wxids)
# for admin_wxid in admin_wxids:
# contact_wxids_set.add(admin_wxid)
if admin_wxids:
contact_wxids_set.update(set(admin_wxids))
if chatroom_owner_wxid is not None:
contact_wxids_set.add(chatroom_owner_wxid)
contact_wxids_set.add(wxid)

# unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
# contact_wxids_set.update(set(unavailable_wixds))

# chatroom_member_list = chatroom.get('memberList', [])


unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
if unavailable_wixds:
contact_wxids_set.update(set(unavailable_wixds))
chatroom_member_list = chatroom.get('memberList', [])
if chatroom_member_list is None:
chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
elif not isinstance(chatroom_member_list, list):
chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表



remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)

logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
for m in remaining_chatroot_members:
# 判断本次任务是否已经邀请了30个好友
if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
return
# 判断当天群成员是否已经加了90个好友
is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid,oneday_add_contacts_total)
if is_add_group_times:
logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
return

contact_wxid= m.get('wxid')
member_nickname=m.get("nickName")
group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid)
sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
# 已经邀请过两次,不再邀请
if len(sorted_history)==2:
logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
continue

# 当天邀请过,不再邀请
if len(sorted_history) > 0:
last_add_time = sorted_history[0].addTime
def is_add_time_more_than_one_day(addTime: int) -> bool:
"""
判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
:param addTime: Unix 时间戳
:return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
"""
# 获取当前时间的时间戳
current_time = time.time()
# 计算时间戳差值
time_difference = abs(current_time - addTime)
# 检查是否大于 3600 × 24 秒
return time_difference > 3600 * 24
is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)

if not is_more_than_one_day:
logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请')
continue



ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
if ret!=200:
logger.warning(f'群好友邀请失败原因:{ret} {data}')

history=AddGroupContactsHistory.model_validate({
"chatroomId":chatroom_id,
"wxid":wxid,
"contactWixd":contact_wxid,
"addTime":int(time.time())
})
await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
wixd_add_contacts_from_chatrooms_times[wxid]+=1
logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
# 推送到kafka

k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime)
await kafka_service.send_message_async(k_message)

await asyncio.sleep(random.uniform(1.5, 3))
await asyncio.sleep(random.uniform(1.5, 3))


except Exception as e:
logger.error(f"任务执行过程中发生异常: {e}")
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

loop.run_until_complete(task()) # 在现有事件循环中运行任务






REDIS_KEY_PATTERN = "friend_add_limit:{date}" REDIS_KEY_PATTERN = "friend_add_limit:{date}"


Loading…
Cancel
Save