""" A simple wrapper for the official ChatGPT API """ import argparse import json import os import sys from datetime import date import openai import tiktoken from bot.bot import Bot from config import conf ENGINE = os.environ.get("GPT_ENGINE") or "text-chat-davinci-002-20221122" ENCODER = tiktoken.get_encoding("gpt2") def get_max_tokens(prompt: str) -> int: """ Get the max tokens for a prompt """ return 4000 - len(ENCODER.encode(prompt)) # ['text-chat-davinci-002-20221122'] class Chatbot: """ Official ChatGPT API """ def __init__(self, api_key: str, buffer: int = None) -> None: """ Initialize Chatbot with API key (from https://platform.openai.com/account/api-keys) """ openai.api_key = api_key or os.environ.get("OPENAI_API_KEY") self.conversations = Conversation() self.prompt = Prompt(buffer=buffer) def _get_completion( self, prompt: str, temperature: float = 0.5, stream: bool = False, ): """ Get the completion function """ return openai.Completion.create( engine=ENGINE, prompt=prompt, temperature=temperature, max_tokens=get_max_tokens(prompt), stop=["\n\n\n"], stream=stream, ) def _process_completion( self, user_request: str, completion: dict, conversation_id: str = None, user: str = "User", ) -> dict: if completion.get("choices") is None: raise Exception("ChatGPT API returned no choices") if len(completion["choices"]) == 0: raise Exception("ChatGPT API returned no choices") if completion["choices"][0].get("text") is None: raise Exception("ChatGPT API returned no text") completion["choices"][0]["text"] = completion["choices"][0]["text"].rstrip( "<|im_end|>", ) # Add to chat history self.prompt.add_to_history( user_request, completion["choices"][0]["text"], user=user, ) if conversation_id is not None: self.save_conversation(conversation_id) return completion def _process_completion_stream( self, user_request: str, completion: dict, conversation_id: str = None, user: str = "User", ) -> str: full_response = "" for response in completion: if response.get("choices") is None: raise Exception("ChatGPT API returned no choices") if len(response["choices"]) == 0: raise Exception("ChatGPT API returned no choices") if response["choices"][0].get("finish_details") is not None: break if response["choices"][0].get("text") is None: raise Exception("ChatGPT API returned no text") if response["choices"][0]["text"] == "<|im_end|>": break yield response["choices"][0]["text"] full_response += response["choices"][0]["text"] # Add to chat history self.prompt.add_to_history(user_request, full_response, user) if conversation_id is not None: self.save_conversation(conversation_id) def ask( self, user_request: str, temperature: float = 0.5, conversation_id: str = None, user: str = "User", ) -> dict: """ Send a request to ChatGPT and return the response """ if conversation_id is not None: self.load_conversation(conversation_id) completion = self._get_completion( self.prompt.construct_prompt(user_request, user=user), temperature, ) return self._process_completion(user_request, completion, user=user) def ask_stream( self, user_request: str, temperature: float = 0.5, conversation_id: str = None, user: str = "User", ) -> str: """ Send a request to ChatGPT and yield the response """ if conversation_id is not None: self.load_conversation(conversation_id) prompt = self.prompt.construct_prompt(user_request, user=user) return self._process_completion_stream( user_request=user_request, completion=self._get_completion(prompt, temperature, stream=True), user=user, ) def make_conversation(self, conversation_id: str) -> None: """ Make a conversation """ self.conversations.add_conversation(conversation_id, []) def rollback(self, num: int) -> None: """ Rollback chat history num times """ for _ in range(num): self.prompt.chat_history.pop() def reset(self) -> None: """ Reset chat history """ self.prompt.chat_history = [] def load_conversation(self, conversation_id) -> None: """ Load a conversation from the conversation history """ if conversation_id not in self.conversations.conversations: # Create a new conversation self.make_conversation(conversation_id) self.prompt.chat_history = self.conversations.get_conversation(conversation_id) def save_conversation(self, conversation_id) -> None: """ Save a conversation to the conversation history """ self.conversations.add_conversation(conversation_id, self.prompt.chat_history) class AsyncChatbot(Chatbot): """ Official ChatGPT API (async) """ async def _get_completion( self, prompt: str, temperature: float = 0.5, stream: bool = False, ): """ Get the completion function """ return openai.Completion.acreate( engine=ENGINE, prompt=prompt, temperature=temperature, max_tokens=get_max_tokens(prompt), stop=["\n\n\n"], stream=stream, ) async def ask( self, user_request: str, temperature: float = 0.5, user: str = "User", ) -> dict: """ Same as Chatbot.ask but async } """ completion = await self._get_completion( self.prompt.construct_prompt(user_request, user=user), temperature, ) return self._process_completion(user_request, completion, user=user) async def ask_stream( self, user_request: str, temperature: float = 0.5, user: str = "User", ) -> str: """ Same as Chatbot.ask_stream but async """ prompt = self.prompt.construct_prompt(user_request, user=user) return self._process_completion_stream( user_request=user_request, completion=await self._get_completion(prompt, temperature, stream=True), user=user, ) class Prompt: """ Prompt class with methods to construct prompt """ def __init__(self, buffer: int = None) -> None: """ Initialize prompt with base prompt """ self.base_prompt = ( os.environ.get("CUSTOM_BASE_PROMPT") or "You are ChatGPT, a large language model trained by OpenAI. Respond conversationally. Do not answer as the user. Current date: " + str(date.today()) + "\n\n" + "User: Hello\n" + "ChatGPT: Hello! How can I help you today? <|im_end|>\n\n\n" ) # Track chat history self.chat_history: list = [] self.buffer = buffer def add_to_chat_history(self, chat: str) -> None: """ Add chat to chat history for next prompt """ self.chat_history.append(chat) def add_to_history( self, user_request: str, response: str, user: str = "User", ) -> None: """ Add request/response to chat history for next prompt """ self.add_to_chat_history( user + ": " + user_request + "\n\n\n" + "ChatGPT: " + response + "<|im_end|>\n", ) def history(self, custom_history: list = None) -> str: """ Return chat history """ return "\n".join(custom_history or self.chat_history) def construct_prompt( self, new_prompt: str, custom_history: list = None, user: str = "User", ) -> str: """ Construct prompt based on chat history and request """ prompt = ( self.base_prompt + self.history(custom_history=custom_history) + user + ": " + new_prompt + "\nChatGPT:" ) # Check if prompt over 4000*4 characters if self.buffer is not None: max_tokens = 4000 - self.buffer else: max_tokens = 3200 if len(ENCODER.encode(prompt)) > max_tokens: # Remove oldest chat if len(self.chat_history) == 0: return prompt self.chat_history.pop(0) # Construct prompt again prompt = self.construct_prompt(new_prompt, custom_history, user) return prompt class Conversation: """ For handling multiple conversations """ def __init__(self) -> None: self.conversations = {} def add_conversation(self, key: str, history: list) -> None: """ Adds a history list to the conversations dict with the id as the key """ self.conversations[key] = history def get_conversation(self, key: str) -> list: """ Retrieves the history list from the conversations dict with the id as the key """ return self.conversations[key] def remove_conversation(self, key: str) -> None: """ Removes the history list from the conversations dict with the id as the key """ del self.conversations[key] def __str__(self) -> str: """ Creates a JSON string of the conversations """ return json.dumps(self.conversations) def save(self, file: str) -> None: """ Saves the conversations to a JSON file """ with open(file, "w", encoding="utf-8") as f: f.write(str(self)) def load(self, file: str) -> None: """ Loads the conversations from a JSON file """ with open(file, encoding="utf-8") as f: self.conversations = json.loads(f.read()) def main(): print( """ ChatGPT - A command-line interface to OpenAI's ChatGPT (https://chat.openai.com/chat) Repo: github.com/acheong08/ChatGPT """, ) print("Type '!help' to show a full list of commands") print("Press enter twice to submit your question.\n") def get_input(prompt): """ Multi-line input function """ # Display the prompt print(prompt, end="") # Initialize an empty list to store the input lines lines = [] # Read lines of input until the user enters an empty line while True: line = input() if line == "": break lines.append(line) # Join the lines, separated by newlines, and store the result user_input = "\n".join(lines) # Return the input return user_input def chatbot_commands(cmd: str) -> bool: """ Handle chatbot commands """ if cmd == "!help": print( """ !help - Display this message !rollback - Rollback chat history !reset - Reset chat history !prompt - Show current prompt !save_c - Save history to a conversation !load_c - Load history from a conversation !save_f - Save all conversations to a file !load_f - Load all conversations from a file !exit - Quit chat """, ) elif cmd == "!exit": exit() elif cmd == "!rollback": chatbot.rollback(1) elif cmd == "!reset": chatbot.reset() elif cmd == "!prompt": print(chatbot.prompt.construct_prompt("")) elif cmd.startswith("!save_c"): chatbot.save_conversation(cmd.split(" ")[1]) elif cmd.startswith("!load_c"): chatbot.load_conversation(cmd.split(" ")[1]) elif cmd.startswith("!save_f"): chatbot.conversations.save(cmd.split(" ")[1]) elif cmd.startswith("!load_f"): chatbot.conversations.load(cmd.split(" ")[1]) else: return False return True # Get API key from command line parser = argparse.ArgumentParser() parser.add_argument( "--api_key", type=str, required=True, help="OpenAI API key", ) parser.add_argument( "--stream", action="store_true", help="Stream response", ) parser.add_argument( "--temperature", type=float, default=0.5, help="Temperature for response", ) args = parser.parse_args() # Initialize chatbot chatbot = Chatbot(api_key=args.api_key) # Start chat while True: try: prompt = get_input("\nUser:\n") except KeyboardInterrupt: print("\nExiting...") sys.exit() if prompt.startswith("!"): if chatbot_commands(prompt): continue if not args.stream: response = chatbot.ask(prompt, temperature=args.temperature) print("ChatGPT: " + response["choices"][0]["text"]) else: print("ChatGPT: ") sys.stdout.flush() for response in chatbot.ask_stream(prompt, temperature=args.temperature): print(response, end="") sys.stdout.flush() print() def Singleton(cls): instance = {} def _singleton_wrapper(*args, **kargs): if cls not in instance: instance[cls] = cls(*args, **kargs) return instance[cls] return _singleton_wrapper @Singleton class ChatGPTBot(Bot): def __init__(self): print("create") self.bot = Chatbot(conf().get('open_ai_api_key')) def reply(self, query, context=None): if not context or not context.get('type') or context.get('type') == 'TEXT': if len(query) < 10 and "reset" in query: self.bot.reset() return "reset OK" return self.bot.ask(query)["choices"][0]["text"]