CustomIWMServer/customiwmserver/main.py
2022-11-20 16:22:35 +01:00

472 lines
15 KiB
Python

from fastapi import FastAPI, Form, File, UploadFile, Header, HTTPException, Body
from fastapi.responses import PlainTextResponse
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
from typing import Union, Optional, Any
import uvicorn
from . import data_types as types
from . import database as db
from . import hook_system
from . import hooks
import pymongo
app = FastAPI()
hook = hook_system.hook
@app.exception_handler(RequestValidationError)
async def http_exception_handler(request, exc):
return PlainTextResponse(f"{str(exc)}", status_code=422)
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request, exc):
return PlainTextResponse(
f"[{exc.status_code}] {str(exc.detail)}", status_code=exc.status_code
)
## Users
@app.post("/api/v1/login")
async def login(username: str = Form(), password: str = Form(), version: str = Form()):
"""User login"""
# FIXME Add login
hook.execute_hooks("player_login", username=username)
return {"token": username + ":" + password, "userId": 1}
@app.put("/api/v1/user")
async def create_user(
username: str = Form(),
password: str = Form(),
email: str = Form(),
version: str = Form(),
):
"""Create a user with specified credentials"""
token = username + ":" + password
UserID = len(list(db.user_collection.find({})))
if UserID == 0:
UserID = 1
insert_data = {
**types.User(Username=username, Email=email, ID=UserID).dict(),
"Password": password,
}
db.user_collection.insert_one(insert_data)
return {"token": token, "userId": UserID}
@app.get("/api/v1/notifunread")
async def notifunread():
"""Returns the number of unread notifications."""
# FIXME Add notifications
return 0
@app.get("/api/v1/refresh")
async def refresh_login(Authorization: Union[str, None] = Header(default=None)):
"""Intended for refreshing user token."""
return {"token": Authorization}
@app.get("/api/v1/user/{user_id}")
async def get_user(user_id: int):
"""Returns specified user's profile."""
# FIXME use database
query = db.user_collection.find_one({"ID": user_id})
del query["Password"]
return types.User(**query)
## Maps
@app.get("/api/v1/mapcount")
async def mapcount():
return len(list(db.maps_collection.find({})))
@app.get("/api/v1/useruploadcooldown")
async def useruploadcooldown():
"""Limits the amount of levels that user can upload."""
return {"success": True}
@app.get("/api/v1/map")
async def search_for_maps(
start: int = 0,
limit: int = 5,
min_diff: float = 0.0,
max_diff: float = 5.0,
order: str = '[{ "Dir": "desc", "Name": "created_at" }]',
name: str = "",
author: str = "",
author_id: Optional[int] = None,
last_x_hours: Optional[int] = None,
):
"""Search for maps."""
# query = db.maps_collection.find({ "CreatorId": author_id }).limit(limit)
query = db.maps_collection.find({}).limit(limit)
entries = []
for i in query:
del i["_id"]
del i["MapData"]
entries.append(i)
return entries
@app.get("/api/v1/map/{mapID}")
async def getMap(mapID: int):
query = db.maps_collection.find_one({"ID": mapID})
del query["_id"]
return query
@app.post("/api/v1/map/{mapID}/start")
async def getMap(mapID: int):
query = db.maps_collection.find_one({"ID": mapID})
del query["_id"]
returned_resp = {
"BestDeaths": 0,
"BestPlaytime": 0,
"Clear": False,
"CurMap": query,
"Difficulty": 0,
"Followed": False,
"Played": True,
"Rating": 5,
"TagIDs": "1,8,9",
"TagNames": "Boss/Avoidance,Music,Art",
}
return returned_resp
@app.post("/api/v1/map/{mapID}/stop")
async def stopMapPlay(
mapID: int,
clear: int = Form(),
deaths: int = Form(),
playtime: int = Form(),
totalDeaths: int = Form(),
totalTime: int = Form(),
replayData: str = Form(),
Authorization: Union[str, None] = Header(default=None),
):
"""Saves the map replay, and informs the user if their play is a record"""
authcheck = db.auth_check(Authorization)
if not authcheck[0] and authcheck[1] == "nouser":
raise HTTPException(404, detail="User not found")
elif not authcheck[0] and authcheck[1] == "wrongpass":
raise HTTPException(403, detail="Wrong password")
elif authcheck[0]:
NewMapRecord = False
FirstClear = False
userData = types.User(**authcheck[1])
query = db.maps_collection.find_one(
{"ID": mapID, "Leaderboard.UserID": userData.ID}
)
if query is not None:
del query["_id"]
print(__import__("json").dumps(query, indent=4))
BestUserTime = None
BestTime = None
if query is not None and "Leaderboard" in query:
print("-" * 3)
for i in query["Leaderboard"]:
if i["UserID"] == userData.ID:
if BestUserTime is None or BestUserTime > i["BestPlaytime"]:
BestUserTime = i["BestPlaytime"]
if BestTime is None or BestTime > i["BestPlaytime"]:
BestTime = i["BestPlaytime"]
if len(query["Leaderboard"]) <= 1 and i["UserID"] != userData.ID:
FirstClear = True
print(BestUserTime, BestTime)
if BestUserTime is None or playtime < BestUserTime:
print(BestUserTime)
updateQuery = db.maps_collection.update_one(
{"ID": mapID},
{
"$push": {
"Leaderboard": types.MapLeaderboard(
ShoesColor=userData.ShoesColor,
PantsColor=userData.PantsColor,
ShirtColor=userData.ShirtColor,
CapeColor=userData.CapeColor,
SkinColor=userData.SkinColor,
HairColor=userData.HairColor,
HatSpr=userData.HatSpr,
Country=userData.Country,
HairSpr=userData.HairSpr,
HatColor=userData.HatColor,
HatColorInv=userData.HatColorInv,
FacialExpression=userData.FacialExpression,
DeathEffect=userData.DeathEffect,
GunSpr=userData.GunSpr,
BulletSpr=userData.BulletSpr,
SwordSpr=userData.SwordSpr,
Costume=userData.Costume,
FollowerSpr=userData.FollowerSpr,
FollowerColor=userData.FollowerColor,
SaveEffect=userData.SaveEffect,
TextSnd=userData.TextSnd,
BestPlaytime=playtime,
BestPlaytimeTime="2020-02-13T15:19:33Z",
BestReplay=replayData,
CreatorName=userData.Username,
UserID=userData.ID,
).dict()
},
},
)
if BestTime is None or playtime < BestTime:
print(BestTime, playtime)
NewMapRecord = True
hook.execute_hooks(
"map_finished",
user=userData,
mapID=mapID,
clear=clear,
deaths=deaths,
playtime=playtime,
FirstClear=FirstClear,
NewMapRecord=NewMapRecord,
)
return {"FirstClear": FirstClear, "NewMapRecord": NewMapRecord}
@app.put("/api/v1/map")
async def upload_map(
Authorization: Union[str, None] = Header(default=None),
mapName: str = Form(),
mapDescription: str = Form(default=""),
mapVersion: int = Form(),
mapData: str = Form(),
file: Optional[UploadFile] = None,
mapReplay: str = Form(),
deaths: int = Form(),
playtime: int = Form(),
totalDeaths: int = Form(),
totalTime: int = Form(),
listed: int = Form(),
requiresCancels: int = Form(),
hideInChallenges: int = Form(),
tags: str = Form(),
rng: int = Form(),
clientVersion: float = Form(),
):
authcheck = db.auth_check(Authorization)
if not authcheck[0] and authcheck[1] == "nouser":
raise HTTPException(404, detail="User not found")
elif not authcheck[0] and authcheck[1] == "wrongpass":
raise HTTPException(403, detail="Wrong password")
elif authcheck[0]:
print(authcheck)
userData = types.User(**authcheck[1])
MapCode = "SUSYBAKA" # FIXME Add mapcodes correctly
db.maps_collection.insert_one(
{
**types.Map(
CreatorName=userData.Username,
CreatorId=userData.ID,
ID=len(list(db.maps_collection.find({}))) + 1,
Name=mapName,
Description=mapDescription,
Version=mapVersion,
MapCode=MapCode,
MapData=mapData,
Listed=bool(listed),
HiddenInChallenges=hideInChallenges,
TagIDs=tags,
TagNames=",".join(types.convertTagsToNames(tags)),
ShoesColor=userData.ShoesColor,
PantsColor=userData.PantsColor,
ShirtColor=userData.ShirtColor,
CapeColor=userData.CapeColor,
SkinColor=userData.SkinColor,
HairColor=userData.HairColor,
HatSpr=userData.HatSpr,
Country=userData.Country,
HairSpr=userData.HairSpr,
HatColor=userData.HatColor,
HatColorInv=userData.HatColorInv,
FacialExpression=userData.FacialExpression,
DeathEffect=userData.DeathEffect,
GunSpr=userData.GunSpr,
BulletSpr=userData.BulletSpr,
SwordSpr=userData.SwordSpr,
Costume=userData.Costume,
FollowerSpr=userData.FollowerSpr,
FollowerColor=userData.FollowerColor,
SaveEffect=userData.SaveEffect,
TextSnd=userData.TextSnd,
Leaderboard=[
types.MapLeaderboard(
BestPlaytime=playtime,
BestPlaytimeTime="2020-02-13T15:19:33Z",
BestReplay=mapReplay,
CreatorName=userData.Username,
UserID=userData.ID,
).dict()
],
).dict()
}
)
return {"MapCode": MapCode}
# raise HTTPException(501)
@app.get("/api/v1/map/{mapID}/besttimes/{maxEntries}")
async def getMapLeaderboard(mapID, maxEntries):
"""Returns maxEntries records for the specified level"""
query = db.maps_collection.find_one({"ID": int(mapID)})
if not query:
raise HTTPException(404, detail="Map not found")
del query["_id"]
leaderboard = query["Leaderboard"]
return leaderboard
@app.get("/api/v1/map/{mapID}/userbesttime/{userID}")
async def getPlayerRecord(mapID, userID):
"""Returns specific replay"""
query = db.maps_collection.find_one({"ID": int(mapID)})
if not query:
raise HTTPException(404, detail="Map not found")
del query["_id"]
leaderboard = query["Leaderboard"]
# Find user
replayIndex = None
for i in range(len(leaderboard)):
print(f"DBG: {i}, {leaderboard[i]}\t{leaderboard[i]['UserID'] == int(userID):}")
if leaderboard[i]["UserID"] == int(userID):
replayIndex = i
break
print(leaderboard[replayIndex])
if replayIndex is not None:
return {"BestMapTime": leaderboard[replayIndex], "Exists": True}
else:
return {"BestMapTime": None, "Exists": False}
@app.get("/api/v1/map/{mapID}/besttime")
async def getBestRecord(mapID: int):
query = db.maps_collection.find_one({"ID": int(mapID)})
BestTime = None
BestPlay = None
for i in query["Leaderboard"]:
if BestTime is None or BestTime > i["BestPlaytime"]:
BestTime = i["BestPlaytime"]
BestPlay = i
return {"BestMapTime": BestPlay, "Exists": True}
@app.post("/api/v1/map/{mapID}/invalidatealltimes")
async def invalidateAllTimes(
mapID: int,
Reason: int = Body(),
CustomReason: str = Body(default=""),
Authorization: Union[str, None] = Header(default=None),
):
"""Removes ALL records from a specific map"""
authcheck = db.auth_check(Authorization)
if not authcheck[0] and authcheck[1] == "nouser":
raise HTTPException(404, detail="User not found")
elif not authcheck[0] and authcheck[1] == "wrongpass":
raise HTTPException(403, detail="Wrong password")
elif authcheck[0]:
userData = types.User(**authcheck[1])
if userData.Admin:
query = db.maps_collection.update_one(
{"ID": mapID}, {"$push": {"Leaderboard": {"$each": [], "$slice": 0}}}
)
db.LogAdminAction(
action_type="invalidateAllTimes",
action_data={
"Reason": Reason,
"CustomReason": CustomReason,
"mapID": mapID,
},
UserID=userData.ID,
)
else:
db.LogAdminAction(
action_type="invalidateAllTimes",
action_data={
"Reason": Reason,
"CustomReason": CustomReason,
"mapID": mapID,
"unauthorized": True,
},
success=False,
)
raise HTTPException(
403,
detail="Attempted to perform an administrator action without permission. This will be reported.",
)
## General
@app.post("/api/v1/reports")
async def reportContent(
Authorization: Union[str, None] = Header(default=None),
user_id: int = Form(),
map_id: Optional[int] = Form(default=None),
report_type: int = Form(),
content: str = Form(),
):
authcheck = db.auth_check(Authorization)
if not authcheck[0] and authcheck[1] == "nouser":
raise HTTPException(404, detail="User not found")
elif not authcheck[0] and authcheck[1] == "wrongpass":
raise HTTPException(403, detail="Wrong password")
elif authcheck[0]:
db.reports_collection.insert_one(
{
"user_id": user_id,
"map_id": map_id,
"report_type": report_type,
"content": content,
}
)
return True
@app.get("/api/v1/featuredlist")
async def featuredlist():
"""Returns the list id of the weekly levels list."""
# FIXME Add playlists
return
@app.get("/api/v1/followcheck")
async def followcheck():
# FIXME Find the purpouse of this endpoint
return 1
def start():
"""Launched with `poetry run start` at root level"""
uvicorn.run("customiwmserver.main:app", host="0.0.0.0", port=8001, reload=True)