sacn_accout_system/server/main.py

356 lines
12 KiB
Python

from fastapi.security import OAuth2PasswordBearer
from fastapi import FastAPI, Cookie, Response, Form
from fastapi.templating import Jinja2Templates
from fastapi.responses import RedirectResponse, HTMLResponse
from datetime import timedelta, datetime
from contextlib import asynccontextmanager
from . import db
from . import email as email_sys
from . import cfg
from . import admin
import hashlib
import uuid
from typing import Annotated
import re
import asyncio
import uvicorn
from . import cfg
import pygtrie
import os
import sys
import importlib
def load_cfg():
global ACCESS_TOKEN_EXPIRE_MINUTES
global ACCESS_EMAIL_EXPIRE_MINUTES
global ROOT
global CLEAN_TIMEOUT
global MANAGE_KEY
global REDIRECT_URL_WHITELIST
ACCESS_TOKEN_EXPIRE_MINUTES = cfg.config["common"]["access_token_expire_minutes"]
ACCESS_EMAIL_EXPIRE_MINUTES = cfg.config["common"]["access_email_expire_minutes"]
ROOT = cfg.config["common"]["root"]
CLEAN_TIMEOUT = cfg.config["common"]["clean_timeout"]
MANAGE_KEY = cfg.config["common"]["manage_key"]
REDIRECT_URL_WHITELIST = cfg.config["common"]["redirect_url_whitelist"]
load_cfg()
tokens = pygtrie.StringTrie()
apikeys = pygtrie.StringTrie()
emails = pygtrie.StringTrie()
email_send_lst = []
def prep_uuid(uuid: str):
return '/'.join(list(uuid))
def clean_uuid(uuid: str):
return uuid.replace("/", "")
async def run_clean():
sys.stderr.write("==> clean\n")
for k, v in tokens.items():
if (v[1] < datetime.now()):
del tokens[k]
for k, v in apikeys.items():
if (v[1] < datetime.now()):
del apikeys[k]
for k, v in emails.items():
if (v[2] < datetime.now()):
del emails[k]
async def clean_sys():
while 1:
await asyncio.sleep(CLEAN_TIMEOUT)
await run_clean()
async def send_email():
while 1:
if (len(email_send_lst) > 0):
infos = email_send_lst.pop(0)
asyncio.create_task(email_sys.sendemail(infos[0], infos[1]))
await asyncio.sleep(0.1)
else:
await asyncio.sleep(5)
@asynccontextmanager
async def lifespan(app: FastAPI):
await db.connect_db()
asyncio.create_task(clean_sys())
asyncio.create_task(send_email())
yield
app = FastAPI(lifespan=lifespan)
templates = Jinja2Templates(directory="src")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def check_passwd(passwd: str):
if (len(passwd) < 8):
return 1
pattern = r'^(?![a-zA-Z]+$)(?!\d+$)(?![^\da-zA-Z\s]+$).{8,40}$'
if re.match(pattern, passwd):
return 0
else:
return 1
def check_username(passwd: str):
if (len(passwd) < 3):
return 1
pattern = r'[A-Za-z0-9\_\-]{3,16}'
if re.match(pattern, passwd):
return 0
else:
return 1
async def authenticate_user(username: str, password: str):
hashed_password = await db.get_user(username)
if not hashed_password:
return False
return hashed_password == hashlib.sha256(
password.encode("utf-8")).hexdigest()
async def create_token(username: str):
tkn = prep_uuid(uuid.uuid4().hex)
tokens[tkn] = (username, datetime.now() +
timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES)))
return tkn
async def check_token(tkn: str):
res = tokens.get(tkn, None)
if (res is None):
return ""
if (res[1] < datetime.now()):
del tokens[tkn]
return ""
return res[0]
async def check_apikey(tkn: str):
res = apikeys.get(tkn, None)
if (res is None):
return ""
if (res[1] < datetime.now()):
del apikeys[tkn]
return ""
return res[0]
@app.post("/api/login")
async def login_callback(response: Response, username: str = Form(), password: str = Form()):
if (check_username(username)):
return {"msg": cfg.lang["invalid_username"]}
if (check_passwd(password)):
return {"msg": cfg.lang["weak_passwd"]}
if (await authenticate_user(username, password)):
tokennow = await create_token(username)
tkn = prep_uuid(uuid.uuid4().hex)
apikeys[tkn] = tokens[tokennow]
response.set_cookie("session", clean_uuid(tokennow))
return {"msg": "", "key": clean_uuid(tkn)}
else:
return {"msg": cfg.lang["username_or_password_incorrect"], "key": ""}
regex = re.compile(
r'([A-Za-z0-9]+[.-_])*[A-Za-z0-9]+@[A-Za-z0-9-]+(\.[A-Z|a-z]{2,})+')
@app.post("/api/signup")
async def login_callback(username: str = Form(), password: str = Form(), email: str = Form()):
if (check_passwd(password)):
return {"msg": cfg.lang["weak_passwd"], "code": 1}
if (check_username(username)):
return {"msg": cfg.lang["invalid_username"]}
if (not re.fullmatch(regex, email)):
return {"msg": cfg.lang["invalid_email"], "code": 1}
if not (await db.check_user(username)):
tkn = prep_uuid(uuid.uuid4().hex)
emails[tkn] = (username, hashlib.sha256(
password.encode("utf-8")).hexdigest(), datetime.now() +
timedelta(minutes=float(ACCESS_EMAIL_EXPIRE_MINUTES)), email)
email_send_lst.append(
(email, ROOT+"/api/checkemail?uid="+clean_uuid(tkn)))
return {"msg": cfg.lang["verification_email"].format(ACCESS_EMAIL_EXPIRE_MINUTES), "code": 0}
else:
return {"msg": cfg.lang["username_exists"], "code": 1}
@app.get("/api/checkemail")
async def checkemail(uid: str):
uid = prep_uuid(uid)
if (uid not in emails):
return templates.TemplateResponse("checkemail.html", {"request": {}, "msg": cfg.lang["invalid_regid"], "ui": cfg.config["ui"], "lang": cfg.lang})
if (emails[uid][2] < datetime.now()):
del emails[uid]
return templates.TemplateResponse("checkemail.html", {"request": {}, "msg": cfg.lang["invalid_regid"], "ui": cfg.config["ui"], "lang": cfg.lang})
if (emails[uid][1] == ""):
return templates.TemplateResponse("checkemail.html", {"request": {}, "msg": cfg.lang["invalid_regid"], "ui": cfg.config["ui"], "lang": cfg.lang})
if await db.create_user(emails[uid][0], emails[uid][1], emails[uid][3]) == 0:
del emails[uid]
return templates.TemplateResponse("checkemail.html", {"request": {}, "msg": cfg.lang["created_successfully"], "ui": cfg.config["ui"], "lang": cfg.lang})
else:
del emails[uid]
return templates.TemplateResponse("checkemail.html", {"request": {}, "msg": cfg.lang["username_exists"], "ui": cfg.config["ui"], "lang": cfg.lang})
@app.get("/api/resetpasswd", response_class=HTMLResponse)
async def resetpasswd(uid: str, response: Response):
uid = prep_uuid(uid)
if (uid not in emails):
return templates.TemplateResponse("checkemail.html", {"request": {}, "msg": cfg.lang["invalid_checkid"], "ui": cfg.config["ui"], "lang": cfg.lang})
if (emails[uid][2] < datetime.now()):
del emails[uid]
return templates.TemplateResponse("checkemail.html", {"request": {}, "msg": cfg.lang["invalid_checkid"], "ui": cfg.config["ui"], "lang": cfg.lang})
if (emails[uid][1] != ""):
return templates.TemplateResponse("checkemail.html", {"request": {}, "msg": cfg.lang["invalid_checkid"], "ui": cfg.config["ui"], "lang": cfg.lang})
tokennow = await create_token(emails[uid][0])
tkn = prep_uuid(uuid.uuid4().hex)
apikeys[tkn] = tokens[tokennow]
response.set_cookie("session", clean_uuid(tokennow))
del emails[uid]
return f'<html><head><meta http-equiv="refresh" content="0;url=/user"><title>{cfg.lang["redirect"]}</title></head><body>{cfg.lang["redirect"]}</body></html>'
@app.post("/api/send_resetpasswd")
async def resetpasswd(username: str = Form()):
if (check_username(username)):
return {"msg": cfg.lang["invalid_username"]}
if (await db.check_user(username)):
email = await db.get_email(username)
tkn = prep_uuid(uuid.uuid4().hex)
emails[tkn] = (username, "", datetime.now() +
timedelta(minutes=float(ACCESS_EMAIL_EXPIRE_MINUTES)), email)
email_send_lst.append(
(email, ROOT+"/api/resetpasswd?uid="+clean_uuid(tkn)))
return {"msg": cfg.lang["verification_email"].format(ACCESS_EMAIL_EXPIRE_MINUTES), "code": 0}
else:
return {"msg": cfg.lang["username_not_exists"], "code": 1}
@app.get("/api/getinfo")
async def get_user_info(uid: str):
uid = prep_uuid(uid)
username = await check_apikey(uid)
if (username == ""):
return {"code": 1, "msg": cfg.lang["invalid_token"], "data": {}}
return {"code": 0, "msg": "", "data": {"username": username, "email": await db.get_email(username)}}
@app.post("/api/changepasswd")
async def changepasswd(password: str = Form(), session: Annotated[str | None, Cookie()] = None):
if (check_passwd(password)):
return {"msg": cfg.lang["weak_passwd"]}
if (session is not None):
session = prep_uuid(session)
username = await check_token(session)
if (username != ""):
await db.update_passwd(username, hashlib.sha256(
password.encode("utf-8")).hexdigest())
del tokens[session]
return {"msg": ""}
else:
return {"msg": cfg.lang["invalid_token"]}
else:
return {"msg": cfg.lang["invalid_token"]}
@app.get("/login")
async def login(state: str = "", client_id: str = "", redirect_url: str = "/user", session: Annotated[str | None, Cookie()] = None):
now_redirect_url = redirect_url.replace(
"https://", "").replace("http://", "").split("#")[0].split("/")[0]
if (now_redirect_url not in REDIRECT_URL_WHITELIST):
return templates.TemplateResponse("checkemail.html", {"request": {}, "msg": cfg.lang["invalid_redirect_url"], "ui": cfg.config["ui"], "lang": cfg.lang})
if (session is not None):
session = prep_uuid(session)
username = await check_token(session)
if (username != ""):
tkn = prep_uuid(uuid.uuid4().hex)
apikeys[tkn] = tokens[session]
return RedirectResponse(url=redirect_url+f"#access_token={clean_uuid(tkn)}&token_type=Bearer&state={state}")
return templates.TemplateResponse("login.html", {"request": {}, "redirect_url": redirect_url, "state": state, "ui": cfg.config["ui"], "lang": cfg.lang})
@app.get("/signup")
async def login(redirect_url: str):
return templates.TemplateResponse("signup.html", {"request": {}, "redirect_url": redirect_url, "ui": cfg.config["ui"], "lang": cfg.lang})
@app.get("/user")
async def login(session: Annotated[str | None, Cookie()] = None):
if (session is not None):
session = prep_uuid(session)
username = await check_token(session)
if (username == ""):
return RedirectResponse(url="/login?redirect_url=/user")
return templates.TemplateResponse("manage.html", {"request": {}, "ui": cfg.config["ui"], "lang": cfg.lang})
@app.get("/resetpasswd")
async def resetpasswd(response: Response):
return templates.TemplateResponse("resetpasswd.html", {"request": {}, "ui": cfg.config["ui"], "lang": cfg.lang})
@app.get("/manager/init")
async def init(key: str):
if (key != MANAGE_KEY):
return 1
await db.create_db()
return 0
plugins = []
async def reload_cfg():
global plugins
cfg.reload()
load_cfg()
email_sys.load_cfg()
for i in plugins:
await i.reload()
@app.get("/manager/reload")
async def reload(key: str):
sys.stderr.write("==> reload config\n")
if (key != MANAGE_KEY):
return 1
await reload_cfg()
return 0
admin.bind_admin(app, templates, run_clean, tokens, reload_cfg)
for i in os.listdir("plugin"):
if (len(i.split(".")) != 2 or i.split(".")[1] != 'py'):
continue
if (i.split(".")[0] not in cfg.config):
sys.stderr.write("--> disable "+i.split(".")[0]+"\n")
continue
sys.stderr.write("==> load "+i.split(".")[0]+"\n")
plugins.append(importlib.import_module("plugin."+i.split(".")[0]))
plugins[-1].main(app, ROOT, apikeys)
def run():
uvicorn.run(app, host="0.0.0.0", port=8000)