瀏覽代碼

更改群加好友规则

1257
H Vs 2 週之前
父節點
當前提交
018bac4976
共有 4 個檔案被更改,包括 209 行新增3 行删除
  1. +1
    -1
      celery_app.py
  2. +7
    -0
      common/utils.py
  3. +30
    -0
      services/gewe_service.py
  4. +171
    -2
      tasks.py

+ 1
- 1
celery_app.py 查看文件

@@ -30,7 +30,7 @@ environment = os.environ.get('environment', 'default')


if environment == 'production': if environment == 'production':
scheduled_task_sync_wx_info_interval = 60*11 scheduled_task_sync_wx_info_interval = 60*11
scheduled_task_add_contacts_from_chatrooms_interval = 60*11
scheduled_task_add_contacts_from_chatrooms_interval = 3600*2
elif environment == 'test': elif environment == 'test':
scheduled_task_sync_wx_info_interval = 60*11 scheduled_task_sync_wx_info_interval = 60*11
scheduled_task_add_contacts_from_chatrooms_interval = 60*11 scheduled_task_add_contacts_from_chatrooms_interval = 60*11


+ 7
- 0
common/utils.py 查看文件

@@ -132,6 +132,13 @@ def wx_mod_group_info_members_message(wxid:str,data:dict|list)->str:
data=kafka_base_message("mod-group",content) data=kafka_base_message("mod-group",content)
return json.dumps(data, separators=(',', ':'), ensure_ascii=False) return json.dumps(data, separators=(',', ':'), ensure_ascii=False)



def wx_add_contacts_from_chatroom_message(wxid:str,chatroom_id:str,contact_wixd:str,add_time:int)->str:
content = {"wxid": wxid,"chatroomId":chatroom_id,"contactWixd":contact_wixd,"addTime":add_time}
data=kafka_base_message("add-contacts-from-chatroom",content)
return json.dumps(data, separators=(',', ':'), ensure_ascii=False)


def wx_del_group_message(wxid:str,chatroom_id:str)->str: def wx_del_group_message(wxid:str,chatroom_id:str)->str:
content = {"wxid": wxid,"chatroom_id":chatroom_id} content = {"wxid": wxid,"chatroom_id":chatroom_id}
data=kafka_base_message("del-group",content) data=kafka_base_message("del-group",content)


+ 30
- 0
services/gewe_service.py 查看文件

@@ -1290,6 +1290,36 @@ class GeWeService:
return True return True
return False return False
async def is_group_add_contacts_history_one_day_90_async(self, wxid) -> bool:

today_list = []
today = datetime.datetime.now().date()
cursor = 0
hash_key = f"__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:*"
while True:
cursor, history_keys =await self.redis_service.client.scan(cursor, match= hash_key)
#print(f'login_keys:{login_keys}')
# 批量获取所有键的 hash 数据
for k in history_keys:
cache = await self.redis_service.get_hash(k)
for key, value in cache.items():
value_data_list = json.loads(value)
for value_data in value_data_list:
add_time_date = datetime.datetime.fromtimestamp(value_data["addTime"]).date()
if add_time_date == today:
today_list.append(value_data)
if len(today_list) == 90:
return True

# 如果游标为 0,则表示扫描完成
if cursor == 0:
break

return False


async def enqueue_to_add_contacts_async(self,wxid,scene:int,v3,v4): async def enqueue_to_add_contacts_async(self,wxid,scene:int,v3,v4):
""" """
入列待添加好友 入列待添加好友


+ 171
- 2
tasks.py 查看文件

@@ -10,6 +10,7 @@ from services.redis_service import RedisService
from services.kafka_service import KafkaService from services.kafka_service import KafkaService
from services.gewe_service import GeWeService from services.gewe_service import GeWeService
from common.log import logger from common.log import logger
from common.utils import *
import asyncio,random import asyncio,random
from model.models import AddGroupContactsHistory from model.models import AddGroupContactsHistory


@@ -157,6 +158,7 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
redis_service = RedisService() redis_service = RedisService()
await redis_service.init(**redis_config) await redis_service.init(**redis_config)
gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url']) gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])

login_keys = [] login_keys = []
async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
login_keys.append(key) login_keys.append(key)
@@ -198,8 +200,8 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):






@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):
#@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_p', bind=True, acks_late=True)
def scheduled_task_add_contacts_from_chatrooms_p(self, redis_config, kafka_config, gewe_config):
async def task(): async def task():
try: try:
now = datetime.datetime.now() now = datetime.datetime.now()
@@ -254,6 +256,9 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config,
contact_wxids_set = set(contact_wxids) contact_wxids_set = set(contact_wxids)
# for admin_wxid in admin_wxids: # for admin_wxid in admin_wxids:
# contact_wxids_set.add(admin_wxid) # contact_wxids_set.add(admin_wxid)
if admin_wxids:
contact_wxids_set.update(set(admin_wxids))

contact_wxids_set.update(set(admin_wxids)) contact_wxids_set.update(set(admin_wxids))
if chatroom_owner_wxid is not None: if chatroom_owner_wxid is not None:
contact_wxids_set.add(chatroom_owner_wxid) contact_wxids_set.add(chatroom_owner_wxid)
@@ -344,6 +349,170 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config,
loop.run_until_complete(task()) # 在现有事件循环中运行任务 loop.run_until_complete(task()) # 在现有事件循环中运行任务





@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):

'''
关于群加好友的请求规则:一次30人,间隔2小时做1次,一天做3次,即最多90人/天。
'''
async def task():
try:
now = datetime.datetime.now()
if now.hour < 8:
logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
return
logger.info('定时群成员定时添好友任务开始')
redis_service = RedisService()
await redis_service.init(**redis_config)

gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])

KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers']
KAFKA_TOPIC=kafka_config['topic']
KAFKA_GROUP_ID=kafka_config['group_id']
kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID)
await kafka_service.start()

login_keys = []
async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
login_keys.append(key)

wixd_add_contacts_from_chatrooms_times = {}
#print(login_keys)
for k in login_keys:
r = await redis_service.get_hash(k)
app_id = r.get("appId")
token_id = r.get("tokenId")
wxid = r.get("wxid")
status = r.get('status')
if status == '0':
logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
continue

c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
contact_wxids = [c.get('userName') for c in contacts]
chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
logger.info(f'{wxid} 定时群成员定时添好友任务开始')
wixd_add_contacts_from_chatrooms_times[wxid] = 0
for chatroom_id in chatrooms:
chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
chatroom_nickname = chatroom.get('nickName')
chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)

admin_wxids = chatroom_member.get('adminWxid', [])

admin_wxids = chatroom_member.get('adminWxid')
if admin_wxids is None:
admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
contact_wxids_set = set(contact_wxids)
# for admin_wxid in admin_wxids:
# contact_wxids_set.add(admin_wxid)
if admin_wxids:
contact_wxids_set.update(set(admin_wxids))
if chatroom_owner_wxid is not None:
contact_wxids_set.add(chatroom_owner_wxid)
contact_wxids_set.add(wxid)

unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
contact_wxids_set.update(set(unavailable_wixds))

chatroom_member_list = chatroom.get('memberList', [])
remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)

logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
for m in remaining_chatroot_members:
# 判断本次任务是否已经邀请了30个好友
if wixd_add_contacts_from_chatrooms_times[wxid] == 30:
logger.info(f"{wxid} 本次任务已经邀请了30人,不再邀请")
return
# 判断当天群成员是否已经加了90个好友
is_add_group_90_times = await gewe_service.is_group_add_contacts_history_one_day_90_async(wxid)
if is_add_group_90_times:
logger.info(f"当天 {wxid} 所有群的成员已经加了90个好友,不再添加")
return

contact_wxid= m.get('wxid')
member_nickname=m.get("nickName")
group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid)
sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
# 已经邀请过两次,不再邀请
if len(sorted_history)==2:
logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
continue

# 当天邀请过,不再邀请
if len(sorted_history) > 0:
last_add_time = sorted_history[0].addTime
def is_add_time_more_than_one_day(addTime: int) -> bool:
"""
判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
:param addTime: Unix 时间戳
:return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
"""
# 获取当前时间的时间戳
current_time = time.time()
# 计算时间戳差值
time_difference = abs(current_time - addTime)
# 检查是否大于 3600 × 24 秒
return time_difference > 3600 * 24
is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)

if not is_more_than_one_day:
logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请')
continue



ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
if ret!=200:
logger.warning(f'群好友邀请失败原因:{ret} {data}')

history=AddGroupContactsHistory.model_validate({
"chatroomId":chatroom_id,
"wxid":wxid,
"contactWixd":contact_wxid,
"addTime":int(time.time())
})
await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
wixd_add_contacts_from_chatrooms_times[wxid]+=1
logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
# 推送到kafka

k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime)
await kafka_service.send_message_async(k_message)

await asyncio.sleep(random.uniform(1.5, 3))
await asyncio.sleep(random.uniform(1.5, 3))


except Exception as e:
logger.error(f"任务执行过程中发生异常: {e}")
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

loop.run_until_complete(task()) # 在现有事件循环中运行任务



REDIS_KEY_PATTERN = "friend_add_limit:{date}" REDIS_KEY_PATTERN = "friend_add_limit:{date}"
REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task" REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task"




Loading…
取消
儲存