from fastapi import FastAPI, Cookie, Response, Form, HTTPException from fastapi.templating import Jinja2Templates import hashlib from . import cfg from . import db from typing import Annotated def bind_admin(app: FastAPI, templates: Jinja2Templates, run_clean, tokens: list, reload_cfg): @app.get("/admin/auth/{key}") async def admin_auth(response: Response, key: str): if (cfg.config["common"]["manage_key"] == key): response.set_cookie("adminsession", key) return {} raise HTTPException(status_code=404) @app.get("/admin") async def admin(adminsession: Annotated[str | None, Cookie()] = None): if (cfg.config["common"]["manage_key"] != adminsession): raise HTTPException(status_code=404) return templates.TemplateResponse("admin.html", {"request": {}, "ui": cfg.config["ui"], "lang": cfg.lang}) @app.post("/admin/clean") async def admin_clean(adminsession: Annotated[str | None, Cookie()] = None): if (cfg.config["common"]["manage_key"] != adminsession): raise HTTPException(status_code=404) await run_clean() return {"msg": "Cleaned!"} @app.post("/admin/init") async def admin_init(adminsession: Annotated[str | None, Cookie()] = None): if (cfg.config["common"]["manage_key"] != adminsession): raise HTTPException(status_code=404) await db.create_db() return {"msg": "Init!"} @app.post("/admin/reload") async def admin_reload(adminsession: Annotated[str | None, Cookie()] = None): if (cfg.config["common"]["manage_key"] != adminsession): raise HTTPException(status_code=404) await reload_cfg() return {"msg": "Reloaded!"} @app.get("/admin/getinfo") async def admin_getinfo(adminsession: Annotated[str | None, Cookie()] = None): if (cfg.config["common"]["manage_key"] != adminsession): raise HTTPException(status_code=404) return {"users": await db.get_userscount(), "tokens": len(tokens), "msg": ""} @app.get("/admin/cfg") async def admin_getcfg(adminsession: Annotated[str | None, Cookie()] = None): if (cfg.config["common"]["manage_key"] != adminsession): raise HTTPException(status_code=404) with open("config/config.toml", 'r', encoding='utf-8') as file: cfgs = file.read() return {"cfg": cfgs, "msg": ""} @app.post("/admin/cfg") async def admin_setcfg(wcfg: str = Form(), adminsession: Annotated[str | None, Cookie()] = None): if (cfg.config["common"]["manage_key"] != adminsession): raise HTTPException(status_code=404) with open("config/config.toml", 'w', encoding='utf-8') as file: file.write(wcfg) return {"msg": "Saved!"} @app.post("/admin/create") async def admin_createuser(username: str = Form(), passwd: str = Form(), email: str = Form(), adminsession: Annotated[str | None, Cookie()] = None): if (cfg.config["common"]["manage_key"] != adminsession): raise HTTPException(status_code=404) if await db.create_user(username, hashlib.sha256(passwd.encode("utf-8")).hexdigest(), email) == 0: return {"msg": "Created!"} return {"msg": "Fail!"} @app.get("/admin/users") async def admin_getusers(page: int = 1, username: str = "", adminsession: Annotated[str | None, Cookie()] = None): if (cfg.config["common"]["manage_key"] != adminsession): raise HTTPException(status_code=404) PAGE_CNT = 20 users = await db.search_user(page, username, PAGE_CNT) if (users is None): users = [] return {"msg": "", "users": users, "pages": (((await db.search_user_len(page, username, PAGE_CNT))-1)//PAGE_CNT)+1} @app.delete("/admin/users") async def admin_deleteusers(username: str, adminsession: Annotated[str | None, Cookie()] = None): if (cfg.config["common"]["manage_key"] != adminsession): raise HTTPException(status_code=404) await db.delete_user(username) return {"msg": ""} @app.post("/admin/users") async def admin_ch_passwd(username: str = Form(), passwd: str = Form(), adminsession: Annotated[str | None, Cookie()] = None): if (cfg.config["common"]["manage_key"] != adminsession): raise HTTPException(status_code=404) await db.update_passwd(username, hashlib.sha256(passwd.encode("utf-8")).hexdigest()) return {"msg": ""}