Refactor app.py to use separate controllers for history and catch functionality

This commit is contained in:
sHa
2024-03-22 00:49:38 +02:00
parent 6b80e1a383
commit e5b50e9368

169
app.py
View File

@@ -1,80 +1,13 @@
import json
import os
from datetime import datetime
from enum import Enum
from fastapi import FastAPI, Request, Response, status from fastapi import FastAPI, Request, Response, status
from pydantic import BaseModel from schema.answer import Answer
from schema.request_data import RequestData
from controller.history_controller import HistoryController
from controller.catch_controller import CatchController
app = FastAPI() app = FastAPI()
HISTORY_LIMIT = 10 history_controller = HistoryController()
HISTORY_STORAGE = "storage" catch_controller = CatchController()
class Status(str, Enum):
ok = "ok"
error = "error"
class Methods(str, Enum):
GET = "GET"
POST = "POST"
PUT = "PUT"
DELETE = "DELETE"
PATCH = "PATCH"
OPTIONS = "OPTIONS"
HEAD = "HEAD"
class Answer(BaseModel):
status: Status
message: str
class RequestData(BaseModel):
data: dict
method: Methods
url: str
headers: dict
time: str
def store_last_request(request_data, namespace="requests"):
# Check if the directory exists, if not, create it
if not os.path.exists(HISTORY_STORAGE):
os.makedirs(HISTORY_STORAGE)
filename = f"{HISTORY_STORAGE}/{namespace}.json"
try:
with open(filename, "r") as f:
data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
data = []
data.append(request_data)
if len(data) > HISTORY_LIMIT:
data.pop(0)
with open(filename, "w") as f:
json.dump(data, f, indent=4)
def check_namespace(namespace):
if (
namespace == "__history"
or namespace == "__last_request"
or namespace == "__last"
or namespace == "__clear"
or namespace == "__help"
or namespace == "docs"
or namespace == "redoc"
or namespace == "api"
):
return False
return True
@app.get("/", status_code=status.HTTP_200_OK) @app.get("/", status_code=status.HTTP_200_OK)
@@ -96,47 +29,9 @@ async def catch(
response: Response, response: Response,
namespace: str = "requests", namespace: str = "requests",
) -> Answer: ) -> Answer:
if not check_namespace(namespace): return await catch_controller.catch(
response.status_code = status.HTTP_400_BAD_REQUEST request=request, response=response, namespace=namespace
return Answer(status="error", message="Invalid namespace name provided.") )
try:
json = await request.json()
except:
json = {"invalid": "json"}
last_request = {
"data": json,
"method": request.method,
"url": str(request.url),
"headers": dict(request.headers),
"time": datetime.now().isoformat(),
}
store_last_request(request_data=last_request, namespace=namespace)
response.status_code = status.HTTP_200_OK
return Answer(status="ok", message="Request catched.")
@app.get("/api/__help", status_code=status.HTTP_200_OK)
def help():
return {
"message": "This is a simple webhook service. It stores the last 10 requests and returns them on demand.",
"endpoints": {
"/": "Accepts a webhook request and stores it.",
"/{namespace}": "Accepts a webhook request and stores it in the given namespace.",
"/docs": "Swagger UI for the API.",
"/redoc": "ReDoc UI for the API.",
"/api/__last_request": "GET: Returns the last request received.",
"/api/__last_request/{namespace}": "GET: Returns the last request received for the given namespace.",
"/api/__history": "GET: Returns the last 10 requests received.",
"/api/__history/{namespace}": "GET: Returns the last 10 requests received for the given namespace.",
"/api/__history/{id}": "GET: Returns the request with the given ID.",
"/api/__history/{namespace}/{id}": "GET: Returns the request with the given ID for the given namespace.",
"/api/__clear": "GET: Clears the request history.",
"/api/__clear/{namespace}": "GET: Clears the request history for the given namespace.",
},
}
@app.get("/api/__last_request", status_code=status.HTTP_200_OK) @app.get("/api/__last_request", status_code=status.HTTP_200_OK)
@@ -147,20 +42,9 @@ async def last_requests(
response: Response, response: Response,
namespace: str = "requests", namespace: str = "requests",
) -> Answer | RequestData: ) -> Answer | RequestData:
if not check_namespace(namespace): return await history_controller.last_requests(
response.status_code = status.HTTP_400_BAD_REQUEST response=response, namespace=namespace
return Answer(status="error", message="Invalid namespace name provided.") )
filename = f"{HISTORY_STORAGE}/{namespace}.json"
try:
with open(filename, "r") as f:
data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
data = []
if len(data) == 0:
return Answer(status="error", message="No requests found.")
return RequestData(**data[-1])
@app.get("/api/__history/{id}", status_code=status.HTTP_200_OK) @app.get("/api/__history/{id}", status_code=status.HTTP_200_OK)
@@ -172,21 +56,9 @@ async def history(
id: int | None = None, id: int | None = None,
namespace: str = "requests", namespace: str = "requests",
) -> Answer | RequestData | list[RequestData]: ) -> Answer | RequestData | list[RequestData]:
if not check_namespace(namespace): return await history_controller.history(
response.status_code = status.HTTP_400_BAD_REQUEST response=response, id=id, namespace=namespace
return Answer(status="error", message="Invalid namespace name provided.") )
filename = f"{HISTORY_STORAGE}/{namespace}.json"
try:
with open(filename, "r") as f:
data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
data = []
if id is not None:
if len(data) == 0:
return Answer(status="error", message="No requests found.")
return RequestData(**data[-id])
return [RequestData(**d) for d in data]
@app.get("/api/__clear", status_code=status.HTTP_200_OK) @app.get("/api/__clear", status_code=status.HTTP_200_OK)
@@ -195,14 +67,9 @@ async def clear_history(
response: Response, response: Response,
namespace: str = "requests", namespace: str = "requests",
) -> Answer: ) -> Answer:
if not check_namespace(namespace): return await history_controller.clear_history(
response.status_code = status.HTTP_400_BAD_REQUEST response=response, namespace=namespace
return Answer(status="error", message="Invalid namespace name provided.") )
filename = f"{HISTORY_STORAGE}/{namespace}.json"
with open(filename, "w") as f:
json.dump([], f)
return Answer(status="ok", message="History cleared.")
if __name__ == "__main__": if __name__ == "__main__":