添加熔断和数据库检查缓存

This commit is contained in:
cxykevin 2024-07-18 15:49:12 +08:00
parent 57392cec3d
commit 0874573935
6 changed files with 138 additions and 18 deletions

View File

@ -3,17 +3,25 @@ runmode = "webapi"
[webapi]
host = "127.0.0.1"
port = 3032
[[limit]]
enable = true
speed = 5
wait100ms = 6000
wait1s = 8000
wait10s = 30000
[console]
emacsmode = true
softstop = true
[db]
host = "127.0.0.1"
port = 3306
user = "root"
passwd = "passwd"
dbname = "myjsonDB"
user = "myjsondb"
passwd = "<your passwd>"
dbname = "myjsondb"
minsize = 100
maxsize = 100

View File

@ -22,5 +22,16 @@ DB_MAXSIZE: str = configs.get("db", {}).get("maxsize", 50)
USE_EMACE_MOD: bool = configs.get("console", {}).get("emacsmode", True)
SIGTERM_CMD: bool = configs.get("console", {}).get("softstop", True)
API_HOST: bool = configs.get("webapi", {}).get("host", "127.0.0.1")
API_PORT: bool = configs.get("webapi", {}).get("port", 3032)
API_HOST: str = configs.get("webapi", {}).get("host", "127.0.0.1")
API_PORT: int = configs.get("webapi", {}).get("port", 3032)
LIM_ENABLE: bool = configs.get("webapi", {}).get(
"limit", {}).get("enable", True)
LIM_SPEED: int = configs.get("webapi", {}).get(
"limit", {}).get("speed", 5)
LIM_WAIT100MS: int = configs.get("webapi", {}).get(
"limit", {}).get("wait1ms", 100)
LIM_WAIT1S: int = configs.get("webapi", {}).get(
"limit", {}).get("wait1s", 200)
LIM_WAIT10S: int = configs.get("webapi", {}).get(
"limit", {}).get("wait10s", 300)

View File

@ -6,6 +6,10 @@ import sys
import pickle
from typing import Any, Tuple
db_check_cache = {}
db_check_cache_lock = {}
db_list_cache = None
async def connect_realdb() -> None:
"""
@ -45,12 +49,20 @@ async def check_jsondb(name: str) -> bool | None:
Check a jsondb if it has been created.
@return: True(created) | False(not create) | None(error)
"""
global db_check_cache, db_check_cache_lock
await log.info("Check jsondb: "+str(name))
res = re.search(r'[0-9A-Za-z_]{2,20}', name)
if res is None or res.group() != name:
await log.warn("Check database name error: "+str(name))
return None
# get cache
cache_r = db_check_cache.get(name, None)
if (cache_r is not None):
return cache_r
async def callback(result):
if (len(result) == 0):
return False
@ -60,7 +72,13 @@ async def check_jsondb(name: str) -> bool | None:
async def err_callback(err_info):
await log.warn("Other error on create database: "+str(err_info))
return None
return await sqllink.execute_cmd(connects, command="SELECT * FROM sys_db_list WHERE name='"+name+"';", callback=callback, err_callback=err_callback)
do_res = await sqllink.execute_cmd(connects, command="SELECT * FROM sys_db_list WHERE name='"+name+"';", callback=callback, err_callback=err_callback)
# write cache
if (do_res is not None and name not in db_check_cache_lock):
db_check_cache[name] = do_res
return do_res
async def list_jsondb() -> Tuple[str] | None:
@ -68,15 +86,26 @@ async def list_jsondb() -> Tuple[str] | None:
## List all jsondb(s).
@return: `tuple | None(error)`
"""
global db_list_cache
await log.info("list jsondb")
if (db_list_cache is not None):
return db_list_cache
async def callback(result):
return [i[0] for i in result]
async def err_callback(err_info):
await log.warn("Other error on list databases: "+str(err_info))
return None
return await sqllink.execute_cmd(connects, command="SELECT * FROM sys_db_list;", callback=callback, err_callback=err_callback)
res = await sqllink.execute_cmd(connects, command="SELECT * FROM sys_db_list;", callback=callback, err_callback=err_callback)
if (res is not None):
db_list_cache = res
return res
async def create_jsondb(name: str) -> tuple[int, str | None]:
@ -84,12 +113,19 @@ async def create_jsondb(name: str) -> tuple[int, str | None]:
## Create a jsondb.
@return: `tuple[errorlevel:int, errormsg:str|None]`
"""
global db_check_cache, db_check_cache_lock, db_list_cache
await log.info("Create jsondb: "+str(name))
res = re.search(r'[0-9A-Za-z_]{2,20}', name)
if res is None or res.group() != name:
await log.warn("Create database name error: "+str(name))
return (1, "Name error: "+str(name))
db_check_cache_lock[name] = True
if (name in db_check_cache):
del db_check_cache[name]
async def callback():
return (0, None)
@ -100,7 +136,13 @@ async def create_jsondb(name: str) -> tuple[int, str | None]:
else:
await log.warn("Other error on create database: "+str(err_info))
return (2, "Other error: "+str(err_info))
return await sqllink.execute_cmd(connects, command="INSERT INTO sys_db_list (name) VALUES ('"+name+"');", commit=True, callback=callback, err_callback=err_callback)
res = await sqllink.execute_cmd(connects, command="INSERT INTO sys_db_list (name) VALUES ('"+name+"');", commit=True, callback=callback, err_callback=err_callback)
if (name in db_check_cache_lock):
del db_check_cache_lock[name]
db_list_cache = None
return res
async def create_table(dbname: str, name: str) -> tuple[int, str | None]:
@ -108,6 +150,7 @@ async def create_table(dbname: str, name: str) -> tuple[int, str | None]:
## Create a table in a jsondb.
@return: `tuple[errorlevel:int, errormsg:str|None]`
"""
await log.info("Create table '"+str(name)+"' in '"+str(dbname)+"'")
res = re.search(r'[0-9A-Za-z_]{2,20}', dbname)
@ -177,6 +220,9 @@ async def rm_jsondb(dbname: str) -> tuple[int, str | None]:
## Remove a jsondb and all tables in it.
@return: `tuple[errorlevel:int, errormsg:str|None]`
"""
global db_check_cache, db_check_cache_lock, db_list_cache
await log.info("Remove database in: "+str(dbname))
res = re.search(r'[0-9A-Za-z_]{2,20}', dbname)
@ -184,11 +230,15 @@ async def rm_jsondb(dbname: str) -> tuple[int, str | None]:
await log.warn("Database name error: "+str(dbname))
return (1, "DB name error"+str(dbname))
db_check_cache_lock[dbname] = True
if (dbname in db_check_cache):
del db_check_cache[dbname]
checks = await check_jsondb(dbname)
if checks == None:
return (2, "Other check error")
elif checks == False:
await log.warn("Create table cannot find database: "+str(dbname))
await log.warn("Remove cannot find database: "+str(dbname))
return (2, "Cannot find database: "+str(dbname))
async def callback() -> tuple[int, str | None]:
@ -214,10 +264,16 @@ async def rm_jsondb(dbname: str) -> tuple[int, str | None]:
except Exception as exp:
return await err_callback(repr(exp))
return await sqllink.execute_cmd(connects,
command=f"DELETE FROM sys_db_list WHERE `name`='{
dbname}';",
commit=True, callback=callback, err_callback=err_callback)
res = await sqllink.execute_cmd(connects,
command=f"DELETE FROM sys_db_list WHERE `name`='{
dbname}';",
commit=True, callback=callback, err_callback=err_callback)
if (dbname in db_check_cache_lock):
del db_check_cache_lock[dbname]
db_list_cache = None
return res
async def rm_table(dbname: str, name: str) -> tuple[int, str | None]:
@ -241,7 +297,7 @@ async def rm_table(dbname: str, name: str) -> tuple[int, str | None]:
if checks == None:
return (3, "Other check error")
elif checks == False:
await log.warn("Create table cannot find database: "+str(dbname))
await log.warn("Remove table cannot find database: "+str(dbname))
return (3, "Cannot find database: "+str(dbname))
async def callback():
@ -285,7 +341,7 @@ async def check_table(dbname: str, name: str) -> bool | None:
if checks == None:
return None
elif checks == False:
await log.warn("Create table cannot find database: "+str(dbname))
await log.warn("Check table cannot find database: "+str(dbname))
return None
async def callback(result):

26
src/reqlim.py Normal file
View File

@ -0,0 +1,26 @@
from src import config
import asyncio
req_num = 0
async def get_req():
global req_num
req_num += 1
if (req_num >= config.LIM_WAIT10S):
await asyncio.sleep(10)
elif (req_num >= config.LIM_WAIT1S):
await asyncio.sleep(1)
elif (req_num >= config.LIM_WAIT100MS):
await asyncio.sleep(0.1)
async def less_reqnum():
global req_num
while 1:
await asyncio.sleep(0.01)
req_num = max(req_num-config.LIM_SPEED, 0)
async def run_task():
asyncio.create_task(less_reqnum())

View File

@ -1,5 +1,6 @@
from src.webcore import app
from src import core
from src import reqlim
from pydantic import BaseModel
version = "v1"
@ -11,6 +12,7 @@ class KeyItem(BaseModel):
@app.get("/api/"+version+"/database", status_code=200)
async def list_database():
await reqlim.get_req()
ret = await core.list_jsondb()
if (ret is not None):
return {"status": 200, "return": ret}
@ -19,14 +21,16 @@ async def list_database():
@app.get("/api/"+version+"/database/{database_name}", status_code=200)
async def check_database(database_name: str):
await reqlim.get_req()
ret = await core.check_jsondb(database_name)
if (ret is not None):
return {"status": 200, "return": ret}
return {"status": 400, "errormsg": "Unknown Error"}
@app.post("/api/"+version+"/database/{database_name}", status_code=201)
@app.post("/api/"+version+"/database/{database_name}", status_code=200)
async def create_database(database_name: str):
await reqlim.get_req()
ret = await core.create_jsondb(database_name)
if (ret[0] == 0):
return {"status": 200}
@ -35,6 +39,7 @@ async def create_database(database_name: str):
@app.delete("/api/"+version+"/database/{database_name}", status_code=200)
async def delete_database(database_name: str):
await reqlim.get_req()
ret = await core.rm_jsondb(database_name)
if (ret[0] == 0):
return {"status": 200}
@ -43,6 +48,7 @@ async def delete_database(database_name: str):
@app.get("/api/"+version+"/database/{database_name}/table", status_code=200)
async def list_table(database_name: str):
await reqlim.get_req()
ret = await core.list_table(database_name)
if (ret is not None):
return {"status": 200, "return": ret}
@ -51,14 +57,16 @@ async def list_table(database_name: str):
@app.get("/api/"+version+"/database/{database_name}/table/{table_name}", status_code=200)
async def check_database(database_name: str, table_name: str):
await reqlim.get_req()
ret = await core.check_table(database_name, table_name)
if (ret is not None):
return {"status": 200, "return": ret}
return {"status": 400, "errormsg": "Unknown Error"}
@app.post("/api/"+version+"/database/{database_name}/table/{table_name}", status_code=201)
@app.post("/api/"+version+"/database/{database_name}/table/{table_name}", status_code=200)
async def create_table(database_name: str, table_name: str):
await reqlim.get_req()
ret = await core.create_table(database_name, table_name)
if (ret[0] == 0):
return {"status": 200}
@ -67,6 +75,7 @@ async def create_table(database_name: str, table_name: str):
@app.delete("/api/"+version+"/database/{database_name}/table/{table_name}", status_code=200)
async def delete_table(database_name: str, table_name: str):
await reqlim.get_req()
ret = await core.rm_table(database_name, table_name)
if (ret[0] == 0):
return {"status": 200}
@ -75,6 +84,7 @@ async def delete_table(database_name: str, table_name: str):
@app.get("/api/"+version+"/database/{database_name}/table/{table_name}/key", status_code=200)
async def list_key(database_name: str, table_name: str):
await reqlim.get_req()
ret = await core.list_key(database_name, table_name)
if (ret is not None):
return {"status": 200, "return": ret}
@ -83,14 +93,16 @@ async def list_key(database_name: str, table_name: str):
@app.get("/api/"+version+"/database/{database_name}/table/{table_name}/key/{key_name}/check", status_code=200)
async def create_key(database_name: str, table_name: str, key_name: int | str):
await reqlim.get_req()
ret = await core.check_key(database_name, table_name, key_name)
if (ret is not None):
return {"status": 200, "return": ret}
return {"status": 400, "errormsg": "Unknown Error"}
@app.post("/api/"+version+"/database/{database_name}/table/{table_name}/key/{key_name}", status_code=201)
@app.post("/api/"+version+"/database/{database_name}/table/{table_name}/key/{key_name}", status_code=200)
async def create_key(database_name: str, table_name: str, key_name: int | str, data: KeyItem):
await reqlim.get_req()
if (data.data is None):
return {"status": 400, "errormsg": "No data"}
ret = await core.create_key(database_name, table_name, key_name, data.data)
@ -101,6 +113,7 @@ async def create_key(database_name: str, table_name: str, key_name: int | str, d
@app.delete("/api/"+version+"/database/{database_name}/table/{table_name}/key/{key_name}", status_code=200)
async def delete_key(database_name: str, table_name: str, key_name: int | str):
await reqlim.get_req()
ret = await core.remove_key(database_name, table_name, key_name)
if (ret[0] == 0):
return {"status": 200}
@ -109,6 +122,7 @@ async def delete_key(database_name: str, table_name: str, key_name: int | str):
@app.patch("/api/"+version+"/database/{database_name}/table/{table_name}/key/{key_name}", status_code=200)
async def create_key(database_name: str, table_name: str, key_name: int | str, data: KeyItem):
await reqlim.get_req()
if (data.data is None):
return {"status": 400, "errormsg": "No data"}
ret = await core.change_key(database_name, table_name, key_name, data.data)
@ -119,6 +133,7 @@ async def create_key(database_name: str, table_name: str, key_name: int | str, d
@app.get("/api/"+version+"/database/{database_name}/table/{table_name}/key/{key_name}", status_code=200)
async def create_key(database_name: str, table_name: str, key_name: int | str):
await reqlim.get_req()
ret = await core.get_key(database_name, table_name, key_name)
if (ret is not None):
return {"status": 200, "return": ret}

View File

@ -2,6 +2,7 @@ from fastapi import FastAPI
import uvicorn
from src import config
from src import log
from src import reqlim
import os
app = FastAPI()
@ -13,6 +14,9 @@ async def root_hello():
async def start():
await log.info("Start request counter")
await reqlim.run_task()
await log.info("Search web plugins")
plugs = []