From 6c7e4aaf37f88025d56d5cd59350784969a20dec Mon Sep 17 00:00:00 2001 From: lanvent Date: Tue, 4 Apr 2023 14:29:03 +0800 Subject: [PATCH] feat: prioritize handling commands --- channel/chat_channel.py | 13 +++++++------ common/dequeue.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 6 deletions(-) create mode 100644 common/dequeue.py diff --git a/channel/chat_channel.py b/channel/chat_channel.py index 2582b63..af2c299 100644 --- a/channel/chat_channel.py +++ b/channel/chat_channel.py @@ -1,14 +1,12 @@ from asyncio import CancelledError -import queue from concurrent.futures import Future, ThreadPoolExecutor import os import re import threading import time -from channel.chat_message import ChatMessage -from common.expired_dict import ExpiredDict +from common.dequeue import Dequeue from channel.channel import Channel from bridge.reply import * from bridge.context import * @@ -245,8 +243,11 @@ class ChatChannel(Channel): session_id = context['session_id'] with self.lock: if session_id not in self.sessions: - self.sessions[session_id] = (queue.Queue(), threading.BoundedSemaphore(conf().get("concurrency_in_session", 1))) - self.sessions[session_id][0].put(context) + self.sessions[session_id] = (Dequeue(), threading.BoundedSemaphore(conf().get("concurrency_in_session", 1))) + if context.type == ContextType.TEXT and context.content.startswith("#"): + self.sessions[session_id][0].putleft(context) # 优先处理命令 + else: + self.sessions[session_id][0].put(context) # 消费者函数,单独线程,用于从消息队列中取出消息并处理 def consume(self): @@ -277,7 +278,7 @@ class ChatChannel(Channel): if session_id in self.sessions: for future in self.futures[session_id]: future.cancel() - self.sessions[session_id][0]=queue.Queue() + self.sessions[session_id][0]=Dequeue() def check_prefix(content, prefix_list): diff --git a/common/dequeue.py b/common/dequeue.py new file mode 100644 index 0000000..5881106 --- /dev/null +++ b/common/dequeue.py @@ -0,0 +1,33 @@ + +from queue import Full, Queue +from time import monotonic as time + +# add implementation of putleft to Queue +class Dequeue(Queue): + def putleft(self, item, block=True, timeout=None): + with self.not_full: + if self.maxsize > 0: + if not block: + if self._qsize() >= self.maxsize: + raise Full + elif timeout is None: + while self._qsize() >= self.maxsize: + self.not_full.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + endtime = time() + timeout + while self._qsize() >= self.maxsize: + remaining = endtime - time() + if remaining <= 0.0: + raise Full + self.not_full.wait(remaining) + self._putleft(item) + self.unfinished_tasks += 1 + self.not_empty.notify() + + def put_nowait(self, item): + return self.put(item, block=False) + + def _putleft(self, item): + self.queue.appendleft(item) \ No newline at end of file