# encoding:utf-8 import importlib import json import os from common.singleton import singleton from common.sorted_dict import SortedDict from .event import * from .plugin import * from common.log import logger @singleton class PluginManager: def __init__(self): self.plugins = SortedDict(lambda k,v: v.priority,reverse=True) self.listening_plugins = {} self.instances = {} self.pconf = {} def register(self, name: str, desc: str, version: str, author: str, desire_priority: int = 0): def wrapper(plugincls): plugincls.name = name plugincls.desc = desc plugincls.version = version plugincls.author = author plugincls.priority = desire_priority plugincls.enabled = True self.plugins[name] = plugincls logger.info("Plugin %s_v%s registered" % (name, version)) return plugincls return wrapper def save_config(self): with open("plugins/plugins.json", "w", encoding="utf-8") as f: json.dump(self.pconf, f, indent=4, ensure_ascii=False) def load_config(self): logger.info("Loading plugins config...") modified = False if os.path.exists("plugins/plugins.json"): with open("plugins/plugins.json", "r", encoding="utf-8") as f: pconf = json.load(f) pconf['plugins'] = SortedDict(lambda k,v: v["priority"],pconf['plugins'],reverse=True) else: modified = True pconf = {"plugins": SortedDict(lambda k,v: v["priority"],reverse=True)} self.pconf = pconf if modified: self.save_config() return pconf def scan_plugins(self): logger.info("Scaning plugins ...") plugins_dir = "plugins" for plugin_name in os.listdir(plugins_dir): plugin_path = os.path.join(plugins_dir, plugin_name) if os.path.isdir(plugin_path): # 判断插件是否包含同名.py文件 main_module_path = os.path.join(plugin_path, plugin_name+".py") if os.path.isfile(main_module_path): # 导入插件 import_path = "{}.{}.{}".format(plugins_dir, plugin_name, plugin_name) main_module = importlib.import_module(import_path) pconf = self.pconf new_plugins = [] modified = False for name, plugincls in self.plugins.items(): if name not in pconf["plugins"]: new_plugins.append(plugincls) modified = True logger.info("Plugin %s not found in pconfig, adding to pconfig..." % name) pconf["plugins"][name] = {"enabled": plugincls.enabled, "priority": plugincls.priority} else: self.plugins[name].enabled = pconf["plugins"][name]["enabled"] self.plugins[name].priority = pconf["plugins"][name]["priority"] self.plugins._update_heap(name) # 更新下plugins中的顺序 if modified: self.save_config() return new_plugins def refresh_order(self): for event in self.listening_plugins.keys(): self.listening_plugins[event].sort(key=lambda name: self.plugins[name].priority, reverse=True) def activate_plugins(self): # 生成新开启的插件实例 for name, plugincls in self.plugins.items(): if plugincls.enabled: if name not in self.instances: instance = plugincls() self.instances[name] = instance for event in instance.handlers: if event not in self.listening_plugins: self.listening_plugins[event] = [] self.listening_plugins[event].append(name) self.refresh_order() def load_plugins(self): self.load_config() self.scan_plugins() pconf = self.pconf logger.debug("plugins.json config={}".format(pconf)) for name,plugin in pconf["plugins"].items(): if name not in self.plugins: logger.error("Plugin %s not found, but found in plugins.json" % name) self.activate_plugins() def emit_event(self, e_context: EventContext, *args, **kwargs): if e_context.event in self.listening_plugins: for name in self.listening_plugins[e_context.event]: if self.plugins[name].enabled and e_context.action == EventAction.CONTINUE: logger.debug("Plugin %s triggered by event %s" % (name,e_context.event)) instance = self.instances[name] instance.handlers[e_context.event](e_context, *args, **kwargs) return e_context def set_plugin_priority(self,name,priority): if name not in self.plugins: return False if self.plugins[name].priority == priority: return True self.plugins[name].priority = priority self.plugins._update_heap(name) self.pconf["plugins"][name]["priority"] = priority self.pconf["plugins"]._update_heap(name) self.save_config() self.refresh_order() return True def enable_plugin(self,name): if name not in self.plugins: return False if not self.plugins[name].enabled : self.plugins[name].enabled = True self.pconf["plugins"][name]["enabled"] = True self.save_config() self.activate_plugins() return True return True def disable_plugin(self,name): if name not in self.plugins: return False if self.plugins[name].enabled : self.plugins[name].enabled = False self.pconf["plugins"][name]["enabled"] = False self.save_config() return True return True def list_plugins(self): return self.plugins