from bot.session_manager import Session from common.log import logger """ e.g. [ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "Who won the world series in 2020?"}, {"role": "assistant", "content": "The Los Angeles Dodgers won the World Series in 2020."}, {"role": "user", "content": "Where was it played?"} ] """ class ChatGPTSession(Session): def __init__(self, session_id, system_prompt=None, model="gpt-3.5-turbo"): super().__init__(session_id, system_prompt) self.model = model self.reset() def discard_exceeding(self, max_tokens, cur_tokens=None): precise = True try: cur_tokens = self.calc_tokens() except Exception as e: precise = False if cur_tokens is None: raise e logger.debug( "Exception when counting tokens precisely for query: {}".format(e) ) while cur_tokens > max_tokens: if len(self.messages) > 2: self.messages.pop(1) elif len(self.messages) == 2 and self.messages[1]["role"] == "assistant": self.messages.pop(1) if precise: cur_tokens = self.calc_tokens() else: cur_tokens = cur_tokens - max_tokens break elif len(self.messages) == 2 and self.messages[1]["role"] == "user": logger.warn( "user message exceed max_tokens. total_tokens={}".format(cur_tokens) ) break else: logger.debug( "max_tokens={}, total_tokens={}, len(messages)={}".format( max_tokens, cur_tokens, len(self.messages) ) ) break if precise: cur_tokens = self.calc_tokens() else: cur_tokens = cur_tokens - max_tokens return cur_tokens def calc_tokens(self): return num_tokens_from_messages(self.messages, self.model) # refer to https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb def num_tokens_from_messages(messages, model): """Returns the number of tokens used by a list of messages.""" import tiktoken try: encoding = tiktoken.encoding_for_model(model) except KeyError: logger.debug("Warning: model not found. Using cl100k_base encoding.") encoding = tiktoken.get_encoding("cl100k_base") if model == "gpt-3.5-turbo" or model == "gpt-35-turbo": return num_tokens_from_messages(messages, model="gpt-3.5-turbo-0301") elif model == "gpt-4": return num_tokens_from_messages(messages, model="gpt-4-0314") elif model == "gpt-3.5-turbo-0301": tokens_per_message = ( 4 # every message follows <|start|>{role/name}\n{content}<|end|>\n ) tokens_per_name = -1 # if there's a name, the role is omitted elif model == "gpt-4-0314": tokens_per_message = 3 tokens_per_name = 1 else: logger.warn( f"num_tokens_from_messages() is not implemented for model {model}. Returning num tokens assuming gpt-3.5-turbo-0301." ) return num_tokens_from_messages(messages, model="gpt-3.5-turbo-0301") num_tokens = 0 for message in messages: num_tokens += tokens_per_message for key, value in message.items(): num_tokens += len(encoding.encode(value)) if key == "name": num_tokens += tokens_per_name num_tokens += 3 # every reply is primed with <|start|>assistant<|message|> return num_tokens