from fastapi import FastAPI, Cookie, Response, Form from fastapi.templating import Jinja2Templates from datetime import timedelta, datetime from contextlib import asynccontextmanager from . import db from . import cfg import uuid import asyncio import uvicorn from . import cfg from typing import Annotated import pygtrie import sys import json import aiohttp def load_cfg(): global ACCESS_TOKEN_EXPIRE_MINUTES global CLEAN_TIMEOUT global KEY global AUTH_SERVER ACCESS_TOKEN_EXPIRE_MINUTES = cfg.config["common"]["access_token_expire_minutes"] CLEAN_TIMEOUT = cfg.config["common"]["clean_timeout"] KEY = cfg.config["common"]["key"] AUTH_SERVER = cfg.config["common"]["auth"] load_cfg() tokens = pygtrie.StringTrie() def prep_uuid(uuid: str): return '/'.join(list(uuid)) def clean_uuid(uuid: str): return uuid.replace("/", "") async def run_clean(): sys.stderr.write("==> clean\n") for k, v in tokens.items(): if (v[1] < datetime.now()): del tokens[k] async def clean_sys(): while 1: await asyncio.sleep(CLEAN_TIMEOUT) await run_clean() @asynccontextmanager async def lifespan(app: FastAPI): await db.connect_db() asyncio.create_task(clean_sys()) yield app = FastAPI(lifespan=lifespan) templates = Jinja2Templates(directory="src") async def create_token(username: str): tkn = prep_uuid(uuid.uuid4().hex) tokens[tkn] = (username, datetime.now() + timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))) return clean_uuid(tkn) async def check_token(tkn: str): res = tokens.get(prep_uuid(tkn), None) if (res is None): return "" if (res[1] < datetime.now()): del tokens[tkn] return "" return res[0] @app.get("/api/comments") async def get_comments(route: str = "/test", page: int = 1): return { "comments": await db.show_comments(route, page), "count": await db.get_count(route) } @app.post("/api/send_comment") async def send_comment(text: str = Form(), route: str = "/test", session: Annotated[str | None, Cookie()] = None): if (len(text) > 10000): return {"code": 1, "msg": "评论过长"} if (len(text) < 10): return {"code": 1, "msg": "评论过短"} if (session is None): return {"code": 1, "msg": "登录过期"} tkn = await check_token(session) if (tkn == ""): return {"code": 1, "msg": "登录过期"} await db.create_comment(tkn, text, route) return {"code": 0, "msg": ""} @app.post("/api/login") async def login(apikey: str, resp: Response): async with aiohttp.ClientSession() as session: async with session.get(AUTH_SERVER+"/api/getinfo?uid="+apikey) as response: obj = json.loads((await response.content.read()).decode("utf-8")) if (obj["code"] == 1): return {"msg": obj["msg"]} username = obj["data"]["username"] tkn = await create_token(username) resp.set_cookie("session", tkn) return {} @app.get("/api/callback") async def login_callback(): return templates.TemplateResponse("callback.html", {"request": {}, "ui": cfg.config["ui"]}) @app.get("/manager/init") async def init(key: str): if (key != KEY): return 1 await db.create_db() return 0 @app.get("/loader.js") async def loader(): return templates.TemplateResponse("loader.js", {"request": {}}) @app.get("/{route:path}") async def show(route: str = "", session: Annotated[str | None, Cookie()] = None): if (session is not None and await check_token(session) != ""): islogin = 1 else: islogin = 0 return templates.TemplateResponse("page.html", {"request": {}, "ui": cfg.config["ui"], "route": "/"+route, "islogin": islogin}) def run(): uvicorn.run(app, host="0.0.0.0", port=8020)