peinjector/src/loader.py

322 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#####################################
## PEinjector/loader ##
#####################################
import utils
import os
import shutil
import log
import json
import traceback
import config
import action
loaded_package = [] # 加载过包
loaderr_pkgs = [] # 加载错误的包
disk = "" # PEinjector 安装盘
lists = [] # 软件包列表
def __version_parse(version: str) -> list:
return [int(i) for i in version.split(".")]
def __version_compare(ver1: str, ver2: str) -> int:
version1 = __version_parse(ver1)
version2 = __version_parse(ver2)
for (i, j) in zip(version1, version2):
if i != j:
return 1 if i > j else -1
return 0
def version_check(configuration: dict, pkg_name: str) -> int:
# 消息格式
ERROR_MESSAGE = "load moudle [{}] failed: PEinjector version too {}, need [{}]"
try:
if not "compatibility" in configuration or not "injector" in configuration["compatibility"]:
return 0
with open(f"{disk}/PEinjector/VERSION", 'r', encoding="utf-8") as f:
version = f.readlines()[0].rstrip("\n\r")
if "min" in configuration["compatibility"]["injector"]:
plugver = configuration["compatibility"]["injector"]["min"]
if __version_compare(version, plugver) == -1: # 版本过低
log.warn(ERROR_MESSAGE.format(pkg_name, "low", plugver))
return 5
if "max" in configuration["compatibility"]["injector"]:
plugver = configuration["compatibility"]["injector"]["max"]
if __version_compare(version, plugver) == 1: # 版本或高
log.warn(ERROR_MESSAGE.format(pkg_name, "high", plugver))
return 6
except Exception as e: # 未知错误
log.warn(f"load moudle [{pkg_name}] failed: {e}")
return 7
return 0
def file_check(file_json: dict, pkg_name: str) -> int:
try:
for i in file_json.get("compatibility", {}).get("file", {}).get("must", []):
if not os.path.exists(i): # 找不到文件
log.warn(f"load moudle [{pkg_name}] failed: " +
f"Cannot find file: [{i}]")
return 8
for i in file_json.get("compatibility", {}).get("file", {}).get("mustnot", []):
if os.path.exists(i): # 找到不兼容文件
log.warn(f"load moudle [{pkg_name}] failed: " +
f"Find incompatible file: [{i}]")
return 9
for i in file_json.get("compatibility", {}).get("file", {}).get("loaded", []):
if os.path.exists(i): # 已经无需加载
log.info(f"jump moudle [{pkg_name}]: " +
f"Find compatible file: [{i}]")
return -1
except: # 未知错误
log.warn(f"load moudle [{pkg_name}] failed: " +
"Unknown error in file check")
return 10
return 0
actions = {"onboot": [], "onload": []}
def load_package(pkg_name: str) -> int:
pkg_path = f"{disk}/PEinjector/package/{pkg_name}"
loaded_package.append(pkg_name)
if "manifest.json" not in os.listdir(pkg_path): # 连manifest都没有
log.warn(f"load moudle [{pkg_name}] failed: " +
"Cannot find manifest.json")
return 1
try:
with open(pkg_path+"/"+"manifest.json", "r", encoding="utf-8") as file:
file_json = json.load(file)
except json.decoder.JSONDecodeError: # json格式错误
log.warn(f"load moudle [{pkg_name}] failed: " +
"Json syntax error")
return 3
except Exception as exp:
log.warn(f"load moudle [{pkg_name}] failed: " +
"Unknown error in read file ("+repr(exp)+")")
return 2
for i in ("version", "name", "author", "introduce"): # 检查必须字段
if i not in file_json:
log.warn(f"load moudle [{pkg_name}] failed: " +
f"\"{i}\" key not in manifest.json")
return 4
retvar = version_check(file_json, pkg_name) # 检查版本号
if retvar != 0:
return retvar
retvar = file_check(file_json, pkg_name) # 检查所需文件
if retvar == -1:
return 0
if retvar != 0:
return retvar
if "dependence" in file_json: # 依赖加载
try:
for i in file_json["dependence"]:
if i not in lists: # 找不到依赖
log.warn(f"load moudle [{pkg_name}] failed: " +
f"Cannot find dependence [{i}]")
return 11
if i in loaderr_pkgs: # 软件包已经加载过但出错
log.warn(f"load moudle [{pkg_name}] failed: " +
f"dependence [{i}] loaded failed")
return 12
if i not in loaded_package: # 软件包未加载
log.info(f"load moudle [{i}] from [{pkg_name}]")
retvar = load_package(i) # (递归)加载软件包
if retvar != 0: # 加载出错
loaderr_pkgs.append(i)
log.warn(f"load moudle [{pkg_name}] failed: " +
f"dependence [{i}] loaded failed")
return 12
except:
log.warn(f"load moudle [{pkg_name}] failed: " +
f"Cannot find dependence [{i}]")
data_list = []
if "data" in file_json: # 对于数据的预处理实际在load处理链接
j = -1 # 计数
prep_flag = False # 判断 data 是否初始化
if pkg_name not in os.listdir(config.DATAPATH.format(DISK=utils.find_disk())):
log.info(f"prep moudle [{pkg_name}] data")
prep_flag = True
for i in file_json["data"]:
j += 1
# 语法检查缺少成分
flag = False
for event in ("from", "to"):
if event not in i or i[event] == "":
log.warn(f"load moudle [{pkg_name}] warning: " +
f"Load data syntax error on {event}[{j}] (lost \"{event}\"), igrone")
flag = True
break
if flag:
continue
# 添加
data_list.append(i)
# 初始化 data
if prep_flag:
data_dir = config.DATAPATH.format(
DISK=utils.find_disk())+"/"+pkg_name
os.mkdir(data_dir)
if os.path.isfile(pkg_path+"/"+i["from"]):
shutil.copyfile((
pkg_path+"/"+i["from"]).replace("/", "\\"), (data_dir+"/"+i["to"]))
else:
shutil.copytree(
pkg_path+"/"+i["from"], data_dir+"/"+i["to"])
# 添加过头部标记
onload_add_action_head = False
onboot_add_action_head = False
if "load" in file_json:
open_symlink = config.USE_SYMLINK # 启用链接加载
if "symlink" in file_json["load"] and file_json["load"] == False: # 软件包明确禁用
open_symlink = False
if "mode" in file_json["load"]:
for event in ("onboot", "onload"):
if event not in file_json["load"]["mode"]:
continue
# 添加软件头
actions[event].append((0, pkg_name))
if event == "onload":
onload_add_action_head = True
if event == "onboot":
onboot_add_action_head = True
# 语法类型检查
if type(file_json["load"]["mode"][event]) != list:
log.warn(f"load moudle [{pkg_name}] failed: " +
f"Load commands syntax error on {event} (must be a list)")
return 13
j = -1 # 软件包action计数从 0 开始用于log
for i in file_json["load"]["mode"][event]:
j += 1
if "type" not in i: # 未知类型
log.warn(f"load moudle [{pkg_name}] warning: " +
f"Load commands syntax error on {event}[{j+1}] (lost \"type\"), igrone")
continue
# 强制复制(复制)
if i["type"] == "force_copy" or (i["type"] == "copy" and open_symlink == False):
# 语法检查缺少成分
for check in ("from", "to"):
if check not in i:
flag = True
log.warn(f"load moudle [{pkg_name}] warning: " +
f"Load commands syntax error on {event}[{j+1}] (lost \"{check}\"), igrone")
break
if flag:
continue
# 检查路径有效性
if not os.path.exists(pkg_path+"/"+i["from"]):
log.warn(f"load moudle [{pkg_name}] warning: " +
f"Load commands file error on {event}[{j+1}] (cannot find \"{i['from']}\"), igrone")
continue
# 添加
actions[event] += action.force_copy(pkg_path,
pkg_name, data_list, i["from"], i["to"])
# 复制(链接)
elif i["type"] == "copy":
# 语法检查缺少成分
flag = False
for check in ("from", "to"):
if check not in i:
flag = True
log.warn(f"load moudle [{pkg_name}] warning: " +
f"Load commands syntax error on {event}[{j+1}] (lost \"{check}\"), igrone")
break
if flag:
continue
# 检查路径有效性
if not os.path.exists(pkg_path+"/"+i["from"]):
log.warn(f"load moudle [{pkg_name}] warning: " +
f"Load commands file error on {event}[{j+1}] (cannot find \"{i['from']}\"), igrone")
continue
# 添加
actions[event] += action.copy(pkg_path, pkg_name, data_list,
i["from"], i["to"])
elif i["type"] == "start": # 非阻塞启动程序
# 语法检查缺少成分
flag = False
for check in ("command", ):
if check not in i:
flag = True
log.warn(f"load moudle [{pkg_name}] warning: " +
f"Load commands syntax error on {event}[{j+1}] (lost \"{check}\"), igrone")
break
if flag:
continue
# 添加
actions[event] += action.start(
pkg_name, data_list, i["command"])
else:
log.warn(f"load moudle [{pkg_name}] warning: " +
f"Load commands syntax error on {event}[{j+1}] (unknown type), igrone")
if "start" in file_json:
if "icon" in file_json["start"]:
# 补充包开始加载声明
if not onload_add_action_head:
actions["onload"].append((0, pkg_name))
# 语法检查缺少成分
for i in ("command", "name", "icon"):
if i not in file_json["start"]["icon"]:
log.warn(f"load moudle [{pkg_name}] warning: " +
f"Load icon syntax error (lost \"{i}\"), igrone")
# 相对路径转绝对路径
if len(file_json["start"]["icon"]["icon"]) < 2 or file_json["start"]["icon"]["icon"][1] != ':':
file_json["start"]["icon"]["icon"] = pkg_path + \
"/"+file_json["start"]["icon"]["icon"]
if len(file_json["start"]["icon"]["command"]) < 2 or file_json["start"]["icon"]["command"][1] != ':':
file_json["start"]["icon"]["command"] = pkg_path + \
"/"+file_json["start"]["icon"]["command"]
# 添加
actions["onload"].append(
(6, file_json["start"]["icon"]["command"], file_json["start"]["icon"]["icon"], file_json["start"]["icon"]["name"]))
if "path" in file_json["start"]:
# 补充包开始加载声明
if not onload_add_action_head:
actions["onload"].append((0, pkg_name))
for i in file_json["start"]["path"]: # 加载多个path
# 相对路径转绝对路径
if len(i) < 2 or i[1] != ':':
i = pkg_path + \
"/"+i
# 添加
actions["onload"].append((7, i))
if "reg" in file_json: # 添加注册表
for i in file_json["reg"]: # 多个注册表文件
# 相对路径转绝对路径
if len(i) < 2 or i[1] != ':':
i = pkg_path + \
"/"+i
# 添加
actions["onload"].append((8, i))
def load():
try:
global disk, lists
log.info("start load")
disk = utils.find_disk()
lists = os.listdir(f"{disk}/PEinjector/package")
# 读取禁用包列表
with open(config.DISABLEPATH.format(DISK=disk), "r") as file:
disable_packages = [i.rstrip("\n\r") for i in file.readlines()]
for packs in lists: # 加载包
if packs not in loaded_package and packs not in disable_packages:
log.info(f"load moudle [{packs}]")
retvar = load_package(packs)
if retvar == 0:
loaderr_pkgs.append(packs)
# 保存并执行action
action.save_action(actions["onload"])
alog = action.do_action(actions["onboot"])[0]
# 写action日志
with open(config.ACTIONLOGPATH.format(DISK=utils.find_disk()), "w") as file:
file.write(alog)
except Exception as exp: # 未知错误
log.break_err("Exception \n"+str(traceback.format_exc(exp)))
raise exp