sacn_comments/server/main.py

144 lines
3.6 KiB
Python

from fastapi import FastAPI, Cookie, Response, Form
from fastapi.templating import Jinja2Templates
from datetime import timedelta, datetime
from contextlib import asynccontextmanager
from . import db
from . import cfg
import uuid
import asyncio
import uvicorn
from . import cfg
from typing import Annotated
import pygtrie
import sys
import json
import aiohttp
def load_cfg():
global ACCESS_TOKEN_EXPIRE_MINUTES
global CLEAN_TIMEOUT
global KEY
global AUTH_SERVER
ACCESS_TOKEN_EXPIRE_MINUTES = cfg.config["common"]["access_token_expire_minutes"]
CLEAN_TIMEOUT = cfg.config["common"]["clean_timeout"]
KEY = cfg.config["common"]["key"]
AUTH_SERVER = cfg.config["common"]["auth"]
load_cfg()
tokens = pygtrie.StringTrie()
def prep_uuid(uuid: str):
return '/'.join(list(uuid))
def clean_uuid(uuid: str):
return uuid.replace("/", "")
async def run_clean():
sys.stderr.write("==> clean\n")
for k, v in tokens.items():
if (v[1] < datetime.now()):
del tokens[k]
async def clean_sys():
while 1:
await asyncio.sleep(CLEAN_TIMEOUT)
await run_clean()
@asynccontextmanager
async def lifespan(app: FastAPI):
await db.connect_db()
asyncio.create_task(clean_sys())
yield
app = FastAPI(lifespan=lifespan)
templates = Jinja2Templates(directory="src")
async def create_token(username: str):
tkn = prep_uuid(uuid.uuid4().hex)
tokens[tkn] = (username, datetime.now() +
timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES)))
return clean_uuid(tkn)
async def check_token(tkn: str):
res = tokens.get(prep_uuid(tkn), None)
if (res is None):
return ""
if (res[1] < datetime.now()):
del tokens[tkn]
return ""
return res[0]
@app.get("/api/comments")
async def get_comments(route: str = "/test", page: int = 1):
return {
"comments": await db.show_comments(route, page),
"count": await db.get_count(route)
}
@app.post("/api/send_comment")
async def send_comment(text: str = Form(), route: str = "/test", session: Annotated[str | None, Cookie()] = None):
if (len(text) > 10000):
return {"code": 1, "msg": "评论过长"}
if (len(text) < 10):
return {"code": 1, "msg": "评论过短"}
if (session is None):
return {"code": 1, "msg": "登录过期"}
tkn = await check_token(session)
if (tkn == ""):
return {"code": 1, "msg": "登录过期"}
await db.create_comment(tkn, text, route)
return {"code": 0, "msg": ""}
@app.post("/api/login")
async def login(apikey: str, resp: Response):
async with aiohttp.ClientSession() as session:
async with session.get(AUTH_SERVER+"/api/getinfo?uid="+apikey) as response:
obj = json.loads((await response.content.read()).decode("utf-8"))
if (obj["code"] == 1):
return {"msg": obj["msg"]}
username = obj["data"]["username"]
tkn = await create_token(username)
resp.set_cookie("session", tkn)
return {}
@app.get("/manager/init")
async def init(key: str):
if (key != KEY):
return 1
await db.create_db()
return 0
@app.get("/loader.js")
async def loader():
return templates.TemplateResponse("loader.js", {"request": {}})
@app.get("/{route:path}")
async def show(route: str = "", session: Annotated[str | None, Cookie()] = None):
if (session is not None and await check_token(session) != ""):
islogin = 1
else:
islogin = 0
return templates.TemplateResponse("page.html", {"request": {}, "ui": cfg.config["ui"], "route": "/"+route, "islogin": islogin})
def run():
uvicorn.run(app, host="0.0.0.0", port=8020)