iwm_browser/iwm_browser/main.py
2023-06-09 14:51:23 +02:00

285 lines
9.9 KiB
Python

from typing import Union
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import HTMLResponse, PlainTextResponse, RedirectResponse
from fastapi import FastAPI, Form, Response
from typing import Optional
import urllib.parse
import starlette
import jinja2
import uvicorn
import httpx
import json
import os
import re
import importlib.resources
from . import utils
from . import __version__
# Directory of this module; the bundled "static" folder lives next to it.
basedir = os.path.dirname(__file__)

# Application object: no OpenAPI URL is configured, gzip middleware is
# registered, and the static folder is mounted under /static.
app = FastAPI(openapi_url=None)
app.add_middleware(GZipMiddleware)
static_files = StaticFiles(directory=f"{basedir}/static")
app.mount("/static", static_files, name="static")

# Jinja2 environment that loads templates from the package's "templates" dir.
template_loader = jinja2.PackageLoader("iwm_browser", "templates")
template_env = jinja2.Environment(loader=template_loader, auto_reload=True)
error_template = template_env.get_template("error.html")

# Names made available to every template.
template_env.globals.update(
    {
        "builtins": __builtins__,
        "convert_times": utils.convert_times,
        "json_dumps": json.dumps,
        "urlencode": urllib.parse.urlencode,
        "combineCSS": utils.combineCSS,
        "isSelected": utils.isSelected,
        "generate_thumbnail_url": utils.generate_thumbnail_url,
        "__version__": __version__,
    }
)

# Settings read through utils.config_value(name, default).
BASE_URL = utils.config_value("IWM_SERVER", "http://make.fangam.es")
THUMB_URL = utils.config_value("IWM_THUMBNAILS", "https://images.make.fangam.es")
utils.global_imgproxy_url = utils.config_value("IMGPROXY_URL", None)
# Set it to None to disable the proxy params
utils.global_imgproxy_params = utils.config_value(
    "IMGPROXY_PARAMS", '{"extension": "webp", "advanced": ["q:50"]}'
)
# NOTE(review): the default here is the string '{}' — presumably
# utils.config_value decodes it before it reaches httpx; confirm.
PROXIES = utils.config_value("HTTP_PROXIES", '{}')
# Matches a level code (uppercase letters and digits only).
#   [A-Z0-9]{4}\-[A-Z0-9]{4}  = two groups of four, separated by a dash
#   [A-Z0-9]{8}               = eight characters, no dash
# The pattern carries no anchors of its own; with ``.match`` it is only
# anchored at the start of the string, so trailing characters are accepted.
# Written as a raw string because "\-" is an invalid escape sequence in a
# normal string literal.
level_code_regex = re.compile(r"[A-Z0-9]{4}\-[A-Z0-9]{4}|[A-Z0-9]{8}")
class CSSResponse(Response):
    """Response subclass that serves its content as ``text/css``."""

    media_type = "text/css"

    def render(self, content: Optional[Union[str, bytes]]) -> bytes:
        """Return the response body as bytes.

        Args:
            content: The stylesheet text. ``None`` yields an empty body and
                ``bytes`` are passed through unchanged, so constructing the
                response without a string body does not raise.

        Returns:
            The encoded body (``str.encode()`` default encoding).
        """
        if content is None:
            return b""
        if isinstance(content, bytes):
            return content
        return content.encode()
@app.get("/assets/combine_css", response_class=CSSResponse)
async def combineCSSResources(resources: str, response: Response):
    """Combine several stylesheets from the static folder into one response.

    Args:
        resources: Comma-separated paths, each relative to the package's
            ``static`` directory.
        response: Response object supplied by FastAPI; used to set the
            status code when a resource is rejected.

    Returns:
        The concatenated CSS. Each file is prefixed with a ``/* SRC: ... */``
        marker, lines containing a ``/* ... */`` comment are dropped, and runs
        of four or two spaces are collapsed to one. If any entry does not
        resolve to an existing file inside the static directory, the status
        is set to 404 and only a short CSS comment is returned.
    """
    static_root = os.path.realpath(os.path.join(basedir, "static"))
    parts = []
    for resource in resources.split(","):
        # `resources` comes straight from the query string, so every entry
        # must be confined to the static directory ("../", absolute paths
        # and symlinks pointing elsewhere are rejected).
        try:
            path = os.path.realpath(os.path.join(static_root, resource))
            inside_static = os.path.commonpath([static_root, path]) == static_root
        except ValueError:
            # e.g. embedded NUL byte, or a different drive on Windows
            inside_static = False
        if not inside_static or not os.path.isfile(path):
            response.status_code = 404
            return "/* Requested stylesheet was not found. */"

        with open(path, encoding="utf-8") as f:
            contents = f.read()
        # Drop every line that contains a /* ... */ comment, then squeeze
        # indentation.
        without_comments = re.sub(r"\n.*/\*.*\*/", "", contents)
        squeezed = re.sub(" {4}| {2}", " ", without_comments)
        parts.append(f"\n/* SRC: {resource} */\n" + squeezed)
    return "".join(parts)
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the landing page rendered from the ``home.html`` template."""
    return template_env.get_template("home.html").render()
@app.get("/about", response_class=HTMLResponse)
async def root():
    """Serve the about page rendered from the ``about.html`` template.

    NOTE(review): this handler reuses the name ``root`` of the "/" handler.
    Both routes are registered by their decorators, but the module-level
    name ends up bound to this function only.
    """
    return template_env.get_template("about.html").render()
def _parse_search_query(q):
    """Split a raw search string into its free-text part and its filters.

    Tokens of the form ``key:value`` are recognised for the keys ``author``,
    ``min_diff`` and ``max_diff``; other keys are ignored. All ``key:value``
    tokens are removed from the free-text part.

    Returns:
        ``(name, filters)`` where ``filters`` always contains ``min_diff``
        (default 0.0) and ``max_diff`` (default 5.0), plus ``author`` when
        one was given.

    Raises:
        ValueError: if a ``min_diff``/``max_diff`` value is not a number.
    """
    filters = {"min_diff": 0.00, "max_diff": 5.00}
    for token in re.findall(r"[a-z\-\_]*:[a-zA-Z0-9._\-]*", q):
        # The pattern allows no ":" in key or value, so there is exactly one.
        key, _, value = token.partition(":")
        if key == "author":
            filters["author"] = value
        elif key in ("min_diff", "max_diff"):
            filters[key] = float(value)
    name = re.sub(r"\S[a-z\-\_]*:[a-zA-Z0-9._\-]*", "", q).strip()
    return name, filters


@app.get("/search", response_class=HTMLResponse)
async def search(
    request: starlette.requests.Request,
    response: Response,
    q: Union[str, None] = None,
    p: int = 0,
    sort: str = "average_rating",
    dir: str = "desc",
    date: int = -1,
):
    """Render the search page, optionally with results from the map API.

    Args:
        request: Incoming request; its query parameters are forwarded to the
            template as part of ``QueryValues``.
        response: Unused; kept so the handler signature stays unchanged.
        q: Search string. ``None`` renders the page without querying the
            API. A string starting with a level code triggers a lookup by
            code; anything else is a name search that may carry
            ``author:``, ``min_diff:`` and ``max_diff:`` filters.
        p: Zero-based page number (10 entries per page).
        sort: Field name sent to the API as the order ``Name``.
        dir: Sort direction sent to the API as the order ``Dir``.
        date: Sent as ``last_x_hours`` unless it is ``-1``.

    Returns:
        The rendered ``search.html`` page, or the rendered error page when
        the query is invalid or the API request fails.
    """
    template = template_env.get_template("search.html")
    limit = 10
    if p is None:
        p = 0
    # Passed to the template
    QueryValues = {"q": q, "p": p, "sort": sort, "dir": dir, "date": date, **request.query_params}

    searchResults = None
    entryNumber = None
    if q is not None:
        headers = {}
        # NOTE(review): .match() only anchors at the start, so any query that
        # begins with eight uppercase letters/digits is treated as a level
        # code — confirm this is intended.
        is_code_lookup = level_code_regex.match(q) is not None
        if is_code_lookup:
            # The code is sent to the API upper-cased and without the dash.
            params = {"code": q.upper().replace("-", "")}
        else:
            try:
                name, filters = _parse_search_query(q)
            except ValueError as exc:
                # e.g. "min_diff:abc" — report it instead of failing with a 500.
                return error_template.render(reason="Invalid search filter.", details=exc)
            params = {
                "start": p * limit,
                "limit": limit,
                "min_diff": filters["min_diff"],
                "max_diff": filters["max_diff"],
                "order": json.dumps([{"Dir": dir, "Name": sort}]),
                "name": name,
            }
            if date != -1:
                params["last_x_hours"] = date
            if "author" in filters:
                params["author"] = filters["author"]
            if sort == "random":
                headers["Cache-Control"] = "no-cache"

        # Both lookups share one request path so they get the same proxy
        # settings and the same error handling. The async client keeps the
        # event loop free while waiting for the upstream server.
        try:
            async with httpx.AsyncClient(proxies=PROXIES, timeout=10) as client:
                rq = await client.get(
                    BASE_URL + "/api/v1/map", params=params, headers=headers
                )
            if rq.status_code == 503:
                return error_template.render(
                    reason="Server is unavailable right now."
                )
            searchResults = rq.json()
        except httpx.ReadTimeout:
            return error_template.render(reason="Server timed out")
        except json.decoder.JSONDecodeError:
            return error_template.render(
                reason="Failed to parse server response.", details=rq.text
            )
        except Exception as exc:
            return error_template.render(reason="Uncaught exception", details=exc)

        # -1 marks a level-code lookup; otherwise the number of entries returned.
        entryNumber = -1 if is_code_lookup else len(searchResults)

    return template.render(
        searchResults=searchResults,
        THUMB_URL=THUMB_URL,
        entryLimit=limit,
        entryNumber=entryNumber,
        QueryValues=QueryValues,
    )
@app.get("/playlist/featured", response_class=HTMLResponse)
async def get_featured_list():
    """Redirect to the playlist the API currently reports as featured.

    Requests ``/api/v1/featuredlist`` from ``BASE_URL`` and redirects to
    ``/playlist/<ListID>``.

    Returns:
        A ``RedirectResponse`` on success, otherwise the rendered error page.
        The route declares ``HTMLResponse`` so that the error page is sent as
        HTML rather than as a JSON-encoded string.
    """
    try:
        # Async client so the event loop is not blocked while waiting.
        async with httpx.AsyncClient(proxies=PROXIES, timeout=10) as client:
            rq = await client.get(BASE_URL + "/api/v1/featuredlist")
        if rq.status_code == 503:
            return error_template.render(reason="Server is unavailable right now")
        list_id = rq.json()["ListID"]
    except httpx.ReadTimeout:
        return error_template.render(reason="Server timed out")
    except json.decoder.JSONDecodeError:
        return error_template.render(
            reason="Failed to parse server response.", details=rq.text
        )
    except (KeyError, TypeError):
        # Valid JSON, but without the expected "ListID" entry.
        return error_template.render(
            reason="Failed to parse server response.", details=rq.text
        )
    except Exception as exc:
        return error_template.render(reason="Uncaught exception", details=exc)
    return RedirectResponse("/playlist/" + str(list_id))
@app.get("/playlist/{playlist_id}", response_class=HTMLResponse)
async def showList(request: starlette.requests.Request, playlist_id: int):
    """Render a playlist page.

    Args:
        request: Incoming request; its query parameters are passed to the
            template as ``QueryValues``.
        playlist_id: Identifier appended to ``/api/v1/list/``.

    Returns:
        The rendered ``playlist.html`` page, or the rendered error page when
        the API request fails.
    """
    template = template_env.get_template("playlist.html")
    try:
        # Async client so the event loop is not blocked while waiting.
        async with httpx.AsyncClient(proxies=PROXIES, timeout=10) as client:
            rq = await client.get(BASE_URL + "/api/v1/list/" + str(playlist_id))
        if rq.status_code == 503:
            return error_template.render(reason="Server is unavailable right now")
        response = rq.json()
        print(rq.url)
    except httpx.ReadTimeout:
        return error_template.render(reason="Server timed out")
    except json.decoder.JSONDecodeError:
        return error_template.render(
            reason="Failed to parse server response.", details=rq.text
        )
    except Exception as exc:
        return error_template.render(reason="Uncaught exception", details=exc)
    return template.render(
        response=response,
        QueryValues=request.query_params,
        THUMB_URL=THUMB_URL,
    )
@app.get("/level/{level_id}", response_class=HTMLResponse)
async def showLevel(level_id: int):
    """Render the detail page of a single level.

    Args:
        level_id: Identifier appended to ``/api/v1/map/``.

    Returns:
        The rendered ``levelInfo.html`` page (the decoded API response is
        passed to the template as ``map``), or the rendered error page when
        the API request fails.
    """
    template = template_env.get_template("levelInfo.html")
    try:
        # Async client so the event loop is not blocked while waiting.
        async with httpx.AsyncClient(proxies=PROXIES, timeout=10) as client:
            rq = await client.get(BASE_URL + "/api/v1/map/" + str(level_id))
        if rq.status_code == 503:
            return error_template.render(reason="Server is unavailable right now")
        searchResults = rq.json()
        print(rq.url)
    except httpx.ReadTimeout:
        return error_template.render(reason="Server timed out")
    except json.decoder.JSONDecodeError:
        return error_template.render(
            reason="Failed to parse server response.", details=rq.text
        )
    except Exception as exc:
        return error_template.render(reason="Uncaught exception", details=exc)
    return template.render(
        map=searchResults,
        THUMB_URL=THUMB_URL,
    )
@app.get("/user/{user_id}", response_class=HTMLResponse)
async def showUser(user_id: int):
    """Render the profile page of a single user.

    Args:
        user_id: Identifier appended to ``/api/v1/user/``.

    Returns:
        The rendered ``userInfo.html`` page (the decoded API response is
        passed to the template as ``user``), or the rendered error page when
        the API request fails.
    """
    template = template_env.get_template("userInfo.html")
    try:
        # Async client so the event loop is not blocked while waiting.
        async with httpx.AsyncClient(proxies=PROXIES, timeout=10) as client:
            rq = await client.get(BASE_URL + "/api/v1/user/" + str(user_id))
        if rq.status_code == 503:
            return error_template.render(reason="Server is unavailable right now")
        searchResults = rq.json()
        print(rq.url)
    except httpx.ReadTimeout:
        return error_template.render(reason="Server timed out")
    except json.decoder.JSONDecodeError:
        return error_template.render(
            reason="Failed to parse server response.", details=rq.text
        )
    except Exception as exc:
        print(exc)
        return error_template.render(reason="Uncaught exception", details=exc)
    return template.render(
        user=searchResults,
        THUMB_URL=THUMB_URL,
    )
def start():
    """Entry point for `poetry run start` (run from the project root).

    Starts uvicorn for ``iwm_browser.main:app`` on all interfaces, port 7775,
    with auto-reload enabled.
    """
    server_options = {"host": "0.0.0.0", "port": 7775, "reload": True}
    uvicorn.run("iwm_browser.main:app", **server_options)