#####################################
##        PEinjector/loader        ##
#####################################
"""Package loader for PEinjector.

Reads every package directory under ``{disk}/PEinjector/package``, validates
its ``manifest.json`` and translates it into action tuples collected in
``actions`` (split into the ``onboot`` and ``onload`` phases).
"""
import utils
import os
import shutil
import log
import json
import traceback
import config
import action

loaded_package = []  # packages whose loading has already been attempted
loaderr_pkgs = []  # packages that failed to load
disk = ""  # drive PEinjector is installed on
lists = []  # names of all entries in the package directory


def __version_parse(version: str) -> list:
    """Split a dotted version string such as ``"1.2.3"`` into a list of ints."""
    return [int(part) for part in version.split(".")]


def __version_compare(ver1: str, ver2: str) -> int:
    """Compare two dotted version strings.

    The shorter version is padded with zeros, so ``"1.0"`` equals ``"1.0.0"``
    and is lower than ``"1.0.1"``.

    Returns:
        1 if ``ver1`` is higher, -1 if it is lower, 0 if both are equal.
    """
    version1 = __version_parse(ver1)
    version2 = __version_parse(ver2)
    # zip() alone would silently drop the extra components of the longer one.
    width = max(len(version1), len(version2))
    version1 += [0] * (width - len(version1))
    version2 += [0] * (width - len(version2))
    for left, right in zip(version1, version2):
        if left != right:
            return 1 if left > right else -1
    return 0


def version_check(configuration: dict, pkg_name: str) -> int:
    """Check the manifest's ``compatibility.injector`` range against VERSION.

    Args:
        configuration: parsed manifest of the package.
        pkg_name: package name, used only in log messages.

    Returns:
        0 when compatible (or when no constraint is declared), 5 when the
        installed PEinjector is older than ``min``, 6 when it is newer than
        ``max``, 7 on any unexpected error.
    """
    # Message template
    ERROR_MESSAGE = "load moudle [{}] failed: PEinjector version too {}, need [{}]"
    try:
        if ("compatibility" not in configuration
                or "injector" not in configuration["compatibility"]):
            return 0
        injector = configuration["compatibility"]["injector"]
        with open(f"{disk}/PEinjector/VERSION", 'r', encoding="utf-8") as f:
            version = f.readlines()[0].rstrip("\n\r")
        if "min" in injector:
            plugver = injector["min"]
            if __version_compare(version, plugver) == -1:  # version too low
                log.warn(ERROR_MESSAGE.format(pkg_name, "low", plugver))
                return 5
        if "max" in injector:
            plugver = injector["max"]
            if __version_compare(version, plugver) == 1:  # version too high
                log.warn(ERROR_MESSAGE.format(pkg_name, "high", plugver))
                return 6
    except Exception as e:  # unknown error
        log.warn(f"load moudle [{pkg_name}] failed: {e}")
        return 7
    return 0


def file_check(file_json: dict, pkg_name: str) -> int:
    """Check the manifest's ``compatibility.file`` rules against the filesystem.

    Args:
        file_json: parsed manifest of the package.
        pkg_name: package name, used only in log messages.

    Returns:
        0 when all rules pass, 8 when a ``must`` file is missing, 9 when a
        ``mustnot`` file exists, -1 when a ``loaded`` file exists (the package
        does not need loading), 10 on any unexpected error.
    """
    try:
        rules = file_json.get("compatibility", {}).get("file", {})
        for path in rules.get("must", []):
            if not os.path.exists(path):  # required file not found
                log.warn(f"load moudle [{pkg_name}] failed: " +
                         f"Cannot find file: [{path}]")
                return 8
        for path in rules.get("mustnot", []):
            if os.path.exists(path):  # incompatible file found
                log.warn(f"load moudle [{pkg_name}] failed: " +
                         f"Find incompatible file: [{path}]")
                return 9
        for path in rules.get("loaded", []):
            if os.path.exists(path):  # nothing left to load
                log.info(f"jump moudle [{pkg_name}]: " +
                         f"Find compatible file: [{path}]")
                return -1
    except Exception:  # unknown error
        log.warn(f"load moudle [{pkg_name}] failed: " +
                 "Unknown error in file check")
        return 10
    return 0


# Action tuples collected from all packages, keyed by phase.
actions = {"onboot": [], "onload": []}


def _to_absolute(pkg_path: str, path: str) -> str:
    """Prefix ``path`` with ``pkg_path`` unless it starts with a drive letter."""
    if len(path) < 2 or path[1] != ':':
        return pkg_path + "/" + path
    return path


def _load_dependencies(file_json: dict, pkg_name: str) -> int:
    """Load every package named in ``dependence``; return 0, 11 or 12."""
    dep = None
    try:
        for dep in file_json["dependence"]:
            if dep not in lists:  # dependency does not exist
                log.warn(f"load moudle [{pkg_name}] failed: " +
                         f"Cannot find dependence [{dep}]")
                return 11
            if dep in loaderr_pkgs:  # already attempted and failed
                log.warn(f"load moudle [{pkg_name}] failed: " +
                         f"dependence [{dep}] loaded failed")
                return 12
            if dep not in loaded_package:  # not loaded yet
                log.info(f"load moudle [{dep}] from [{pkg_name}]")
                retvar = load_package(dep)  # (recursively) load the package
                if retvar != 0:  # loading failed
                    loaderr_pkgs.append(dep)
                    log.warn(f"load moudle [{pkg_name}] failed: " +
                             f"dependence [{dep}] loaded failed")
                    return 12
    except Exception:
        # The message reports a failure, so stop loading this package instead
        # of silently continuing with unresolved dependencies.
        log.warn(f"load moudle [{pkg_name}] failed: " +
                 f"Cannot find dependence [{dep}]")
        return 11
    return 0


def _prepare_data(file_json: dict, pkg_name: str, pkg_path: str) -> list:
    """Validate the ``data`` entries and copy them on first use.

    Returns the list of valid entries. The copy only happens when the
    package has no directory yet under ``config.DATAPATH``.
    """
    data_list = []
    data_root = config.DATAPATH.replace("{DISK}", utils.find_disk())
    data_dir = data_root + "/" + pkg_name
    # The data directory is initialised only when it does not exist yet.
    prep_flag = pkg_name not in os.listdir(data_root)
    if prep_flag:
        log.info(f"prep moudle [{pkg_name}] data")
    for index, entry in enumerate(file_json["data"]):
        # Syntax check: both keys must be present and non-empty.
        missing = None
        for key in ("from", "to"):
            if key not in entry or entry[key] == "":
                missing = key
                break
        if missing is not None:
            log.warn(f"load moudle [{pkg_name}] warning: " +
                     f"Load data syntax error on {missing}[{index}] (lost \"{missing}\"), igrone")
            continue
        data_list.append(entry)
        if prep_flag:
            # exist_ok: the directory is shared by every entry of the package,
            # a plain mkdir would fail from the second entry on.
            os.makedirs(data_dir, exist_ok=True)
            source = pkg_path + "/" + entry["from"]
            target = data_dir + "/" + entry["to"]
            if os.path.isfile(source):
                shutil.copyfile(source.replace("/", "\\"), target)
            else:
                shutil.copytree(source, target)
    return data_list


def _build_command_actions(command: dict, position: int, event: str,
                           pkg_name: str, pkg_path: str, data_list: list,
                           open_symlink) -> list:
    """Translate one ``load.mode`` command into actions; ``[]`` when ignored.

    ``position`` is the 1-based index of the command, used only for logging.
    """
    prefix = f"load moudle [{pkg_name}] warning: "
    where = f"{event}[{position}]"
    if "type" not in command:  # unknown type
        log.warn(prefix +
                 f"Load commands syntax error on {where} (lost \"type\"), igrone")
        return []
    kind = command["type"]
    if kind in ("force_copy", "copy"):
        required = ("from", "to")
    elif kind == "start":
        required = ("command", )
    else:
        log.warn(prefix +
                 f"Load commands syntax error on {where} (unknown type), igrone")
        return []
    # Syntax check: every required key must be present.
    for key in required:
        if key not in command:
            log.warn(prefix +
                     f"Load commands syntax error on {where} (lost \"{key}\"), igrone")
            return []
    if kind == "start":  # start a program without blocking
        return action.start(pkg_name, data_list, command["command"])
    # Check that the source path exists.
    if not os.path.exists(pkg_path + "/" + command["from"]):
        log.warn(prefix +
                 f"Load commands file error on {where} (cannot find \"{command['from']}\"), igrone")
        return []
    # A plain "copy" degrades to a forced copy when symlinks are disabled;
    # the explicit comparison is kept so only a false value disables them.
    if kind == "force_copy" or open_symlink == False:
        return action.force_copy(pkg_path, pkg_name, data_list,
                                 command["from"], command["to"])
    return action.copy(pkg_path, pkg_name, data_list,
                       command["from"], command["to"])


def load_package(pkg_name: str) -> int:
    """Validate one package and queue its actions in ``actions``.

    Args:
        pkg_name: directory name of the package under the package folder.

    Returns:
        0 on success (including "already loaded" per ``file_check``), 1 when
        manifest.json is missing, 2 when it cannot be read, 3 on JSON syntax
        errors, 4 when a required key is missing, 5-7 from ``version_check``,
        8-10 from ``file_check``, 11/12 for dependency errors and 13 when a
        ``load.mode`` entry is not a list.
    """
    pkg_path = f"{disk}/PEinjector/package/{pkg_name}"
    # Recorded before anything else so circular dependencies do not recurse.
    loaded_package.append(pkg_name)
    manifest_path = pkg_path + "/" + "manifest.json"
    if not os.path.isfile(manifest_path):  # not even a manifest
        log.warn(f"load moudle [{pkg_name}] failed: " +
                 "Cannot find manifest.json")
        return 1
    try:
        with open(manifest_path, "r", encoding="utf-8") as file:
            file_json = json.load(file)
    except json.decoder.JSONDecodeError:  # malformed JSON
        log.warn(f"load moudle [{pkg_name}] failed: " + "Json syntax error")
        return 3
    except Exception as exp:
        log.warn(f"load moudle [{pkg_name}] failed: " +
                 "Unknown error in read file (" + repr(exp) + ")")
        return 2
    for key in ("version", "name", "author", "introduce"):  # required keys
        if key not in file_json:
            log.warn(f"load moudle [{pkg_name}] failed: " +
                     f"\"{key}\" key not in manifest.json")
            return 4

    retvar = version_check(file_json, pkg_name)  # check version range
    if retvar != 0:
        return retvar
    retvar = file_check(file_json, pkg_name)  # check required files
    if retvar == -1:
        return 0
    if retvar != 0:
        return retvar

    if "dependence" in file_json:  # load dependencies first
        retvar = _load_dependencies(file_json, pkg_name)
        if retvar != 0:
            return retvar

    # Pre-process data entries; the links themselves are handled at load time.
    data_list = []
    if "data" in file_json:
        data_list = _prepare_data(file_json, pkg_name, pkg_path)

    # Whether the package header has already been queued for "onload".
    onload_add_action_head = False
    if "load" in file_json:
        load_cfg = file_json["load"]
        open_symlink = config.USE_SYMLINK  # load through links when enabled
        if "symlink" in load_cfg and load_cfg["symlink"] == False:
            # The package explicitly disables links.
            open_symlink = False
        if "mode" in load_cfg:
            for event in ("onboot", "onload"):
                if event not in load_cfg["mode"]:
                    continue
                # Queue the package header.
                actions[event].append((0, pkg_name))
                if event == "onload":
                    onload_add_action_head = True
                commands = load_cfg["mode"][event]
                # Type check
                if not isinstance(commands, list):
                    log.warn(f"load moudle [{pkg_name}] failed: " +
                             f"Load commands syntax error on {event} (must be a list)")
                    return 13
                for position, command in enumerate(commands, start=1):
                    actions[event] += _build_command_actions(
                        command, position, event, pkg_name, pkg_path,
                        data_list, open_symlink)

    if "start" in file_json:
        start_cfg = file_json["start"]
        if "icon" in start_cfg:
            icon_cfg = start_cfg["icon"]
            # Queue the package header if it has not been queued yet.
            if not onload_add_action_head:
                actions["onload"].append((0, pkg_name))
                onload_add_action_head = True
            # Syntax check: report every missing key.
            complete = True
            for key in ("command", "name", "icon"):
                if key not in icon_cfg:
                    complete = False
                    log.warn(f"load moudle [{pkg_name}] warning: " +
                             f"Load icon syntax error (lost \"{key}\"), igrone")
            # An incomplete icon is ignored, as the warning says.
            if complete:
                actions["onload"].append(
                    (6,
                     _to_absolute(pkg_path, icon_cfg["command"]),
                     _to_absolute(pkg_path, icon_cfg["icon"]),
                     icon_cfg["name"]))
        if "path" in start_cfg:
            # Queue the package header if it has not been queued yet.
            if not onload_add_action_head:
                actions["onload"].append((0, pkg_name))
                onload_add_action_head = True
            for path in start_cfg["path"]:  # several paths may be given
                actions["onload"].append((7, _to_absolute(pkg_path, path)))

    if "reg" in file_json:  # registry files
        # NOTE(review): no package header is queued here when the package has
        # neither "load" nor "start" entries - confirm whether that is wanted.
        for reg_file in file_json["reg"]:  # several registry files
            actions["onload"].append((8, _to_absolute(pkg_path, reg_file)))

    return 0


def load():
    """Load every enabled package, then save and run the collected actions.

    Packages listed in the file at ``config.DISABLEPATH`` are skipped. The
    "onload" actions are passed to ``action.save_action``, the "onboot"
    actions to ``action.do_action``, whose first result element is written
    to ``config.ACTIONLOGPATH``.

    Raises:
        Exception: any error is reported through ``log.break_err`` and then
            re-raised.
    """
    global disk, lists
    try:
        log.info("start load")
        disk = utils.find_disk()
        lists = os.listdir(f"{disk}/PEinjector/package")
        # Read the list of disabled packages.
        with open(config.DISABLEPATH.replace("{DISK}", disk), "r") as file:
            disable_packages = [line.rstrip("\n\r")
                                for line in file.readlines()]
        for packs in lists:  # load packages
            if packs not in loaded_package and packs not in disable_packages:
                log.info(f"load moudle [{packs}]")
                retvar = load_package(packs)
                if retvar != 0:  # remember failures for dependency checks
                    loaderr_pkgs.append(packs)
        # Save and execute the actions.
        action.save_action(actions["onload"])
        alog = action.do_action(actions["onboot"])[0]
        # Write the action log.
        with open(config.ACTIONLOGPATH.replace("{DISK}", utils.find_disk()), "w") as file:
            file.write(alog)
    except Exception:  # unknown error
        log.break_err("Exception \n" + traceback.format_exc())
        raise