CustomIWMServer/customiwmserver/main.py

513 lines
17 KiB
Python
Raw Normal View History

2022-12-04 11:54:50 +00:00
from datetime import datetime
2022-11-18 16:51:24 +00:00
from fastapi import FastAPI, Form, File, UploadFile, Header, HTTPException, Body
from fastapi.responses import PlainTextResponse
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
from typing import Union, Optional, Any
import uvicorn
from . import data_types as types
from . import database as db
2022-11-19 12:51:28 +00:00
from . import hook_system
from . import hooks
2022-11-18 16:51:24 +00:00
import pymongo
# Application object; every @app.<method>(...) decorator below registers a route or handler on it.
app = FastAPI()
2022-11-19 12:51:28 +00:00
# Module-level alias for the hook dispatcher; handlers below call hook.execute_hooks(<event>, **kwargs).
hook = hook_system.hook
2022-11-18 16:51:24 +00:00
@app.exception_handler(RequestValidationError)
async def http_exception_handler(request, exc):
    """Answer request-validation failures with the error text as plain text (HTTP 422)."""
    message = f"{str(exc)}"
    return PlainTextResponse(message, status_code=422)
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request, exc):
    """Answer HTTP errors as plain text in the form "[<status>] <detail>".

    The response keeps the status code carried by the exception.
    """
    status = exc.status_code
    body = f"[{status}] {str(exc.detail)}"
    return PlainTextResponse(body, status_code=status)
2022-11-18 16:51:24 +00:00
## Users
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.post("/api/v1/login")
async def login(username: str = Form(), password: str = Form(), version: str = Form()):
    """User login.

    Fires the "player_login" hook, then checks the "username:password"
    token with db.auth_check. On success returns the token and the user's ID.

    Raises:
        HTTPException: 404 when auth_check reports "nouser",
            403 when it reports "wrongpass".
    """
    hook.execute_hooks("player_login", username=username)

    token = username + ":" + password
    auth = db.auth_check(token)
    if auth[0]:
        return {"token": token, "userId": auth[1]["ID"]}

    failure = auth[1]
    if failure == "nouser":
        raise HTTPException(404, detail="User not found.")
    if failure == "wrongpass":
        raise HTTPException(403, detail="Password is incorrect.")
    # Any other failure reason falls through and yields an empty (null) response.
    return None
2022-11-18 16:51:24 +00:00
@app.put("/api/v1/user")
async def create_user(
    username: str = Form(),
    password: str = Form(),
    email: str = Form(),
    version: str = Form(),
):
    """Create a user with specified credentials.

    The new user's ID is the number of stored users plus one, so IDs start
    at 1. The record is a types.User dict extended with a "Password" field.

    Returns:
        A dict with the "username:password" token and the new "userId".
    """
    token = username + ":" + password

    # Count on the server instead of loading every user document. The
    # previous len(...)-based scheme (with 0 bumped to 1) handed ID 1 to both
    # the first and the second registered user.
    # NOTE(review): count-based IDs can still collide under concurrent
    # sign-ups or after deletions — confirm whether a counter is needed.
    user_id = db.user_collection.count_documents({}) + 1

    # NOTE(review): the password is stored exactly as received — confirm
    # whether hashing is expected to happen elsewhere.
    insert_data = {
        **types.User(Username=username, Email=email, ID=user_id).dict(),
        "Password": password,
    }
    db.user_collection.insert_one(insert_data)
    return {"token": token, "userId": user_id}
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/notifunread")
async def notifunread():
    """Report how many notifications are unread (currently always zero)."""
    # FIXME Add notifications
    unread_count = 0
    return unread_count
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/refresh")
async def refresh_login(Authorization: Union[str, None] = Header(default=None)):
    """Token refresh endpoint: echoes the Authorization header back as "token".

    No validation is performed; a missing header yields {"token": None}.
    """
    refreshed = dict(token=Authorization)
    return refreshed
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/user/{user_id}")
async def get_user(user_id: int):
    """Returns specified user's profile.

    The stored "Password" field is stripped before the record is turned
    into a types.User.

    Raises:
        HTTPException: 404 when no user has the given ID.
    """
    query = db.user_collection.find_one({"ID": user_id})
    if query is None:
        # find_one returns None for an unknown ID; previously this crashed
        # on `del query["Password"]` and surfaced as a 500.
        raise HTTPException(404, detail="User not found")
    # Never expose the stored password; tolerate records that lack the field.
    query.pop("Password", None)
    return types.User(**query)
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
## Maps
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/mapcount")
async def mapcount():
    """Return the total number of stored maps."""
    # Count server-side; the previous version fetched every map document
    # (including its MapData) only to take the length of the list.
    return db.maps_collection.count_documents({})
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/useruploadcooldown")
async def useruploadcooldown():
    """Upload rate-limit check; this implementation always reports success."""
    verdict = {"success": True}
    return verdict
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/map")
async def search_for_maps(
    start: int = 0,
    limit: int = 5,
    min_diff: float = 0.0,
    max_diff: float = 5.0,
    order: str = '[{ "Dir": "desc", "Name": "created_at" }]',
    name: str = "",
    author: str = "",
    author_id: Optional[int] = None,
    last_x_hours: Optional[int] = None,
    required_tags: str = "",
    disallowed_tags: str = "",
    code: Optional[str] = None,
    admin_show_unlisted: Optional[int] = 0,
):
    """Search for maps.

    When ``code`` is given, the first map with that MapCode (after applying
    ``start``/``limit``) is returned as a one-element list, or an empty list
    when nothing matches.

    Otherwise one page of maps (``start``/``limit``) is fetched and filtered:
      * unlisted maps are dropped unless ``admin_show_unlisted`` is 1;
      * maps carrying any tag from ``disallowed_tags`` are dropped;
      * when ``required_tags`` is non-empty, a map must carry at least one
        of those tags;
      * when ``last_x_hours`` is set, a map must have been created less than
        that many hours ago.

    Tags are comma-separated ID strings. "_id" and "MapData" are removed from
    every returned map. Filtering happens after pagination, so a page may
    contain fewer than ``limit`` entries.

    ``min_diff``, ``max_diff``, ``order``, ``name``, ``author`` and
    ``author_id`` are accepted but not applied by this implementation.
    """
    # Local import: the file only imports `datetime` from the datetime module.
    from datetime import timedelta, timezone

    if code:
        cursor = db.maps_collection.find({"MapCode": code}).skip(start).limit(limit)
        match = next(iter(cursor), None)
        if match is None:
            # Previously `list(...)[0]` raised IndexError for an unknown code.
            return []
        match.pop("_id", None)
        match.pop("MapData", None)
        return [match]

    # Convert required_tags and disallowed_tags to sets for membership tests.
    required = set(required_tags.split(",")) if required_tags else set()
    disallowed = set(disallowed_tags.split(",")) if disallowed_tags else set()

    now = datetime.now(timezone.utc)
    entries = []
    for entry in db.maps_collection.find({}).skip(start).limit(limit):
        del entry["_id"]
        del entry["MapData"]

        # NOTE(review): admin_show_unlisted is honoured for any caller here;
        # confirm whether it should be tied to an admin check.
        if not entry["Listed"] and admin_show_unlisted != 1:
            continue

        # TODO: Improve tag filtering
        level_tags = set(entry["TagIDs"].split(","))
        if level_tags & disallowed:
            continue
        if required and not (level_tags & required):
            continue

        if last_x_hours:
            # Compare the map's age, not the hour-of-day of its timestamp
            # (the previous check was `CreatedAt.hour < last_x_hours`).
            # NOTE(review): the trailing "Z" is taken to mean UTC — confirm.
            created_at = datetime.strptime(
                entry["CreatedAt"], "%Y-%m-%dT%H:%M:%SZ"
            ).replace(tzinfo=timezone.utc)
            if now - created_at >= timedelta(hours=last_x_hours):
                continue

        entries.append(entry)
    return entries
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/map/{mapID}")
async def getMap(mapID: int):
    """Return the stored map document with the given ID, minus its "_id".

    Raises:
        HTTPException: 404 when no map has that ID.
    """
    query = db.maps_collection.find_one({"ID": mapID})
    if query is None:
        # Previously an unknown ID crashed on `del query["_id"]` (HTTP 500).
        raise HTTPException(404, detail="Map not found")
    del query["_id"]
    return query
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.post("/api/v1/map/{mapID}/start")
async def startMap(mapID: int):
    """Return the play-session payload for a map.

    The map document (without "_id") is embedded under "CurMap". Every other
    field in the payload is a fixed placeholder value, not per-user data.

    Raises:
        HTTPException: 404 when no map has that ID.
    """
    query = db.maps_collection.find_one({"ID": mapID})
    if query is None:
        # Previously an unknown ID crashed on `del query["_id"]` (HTTP 500).
        raise HTTPException(404, detail="Map not found")
    del query["_id"]

    returned_resp = {
        "BestDeaths": 0,
        "BestPlaytime": 0,
        "Clear": False,
        "CurMap": query,
        "Difficulty": 0,
        "Followed": False,
        "Played": True,
        "Rating": 5,
        "TagIDs": "1,8,9",
        "TagNames": "Boss/Avoidance,Music,Art",
    }
    return returned_resp
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.post("/api/v1/map/{mapID}/stop")
async def stopMapPlay(
    mapID: int,
    clear: int = Form(),
    deaths: int = Form(),
    playtime: int = Form(),
    totalDeaths: int = Form(),
    totalTime: int = Form(),
    replayData: str = Form(),
    Authorization: Union[str, None] = Header(default=None),
):
    """Saves the map replay, and informs the user if their play is a record.

    The caller's leaderboard entry is replaced when they have no entry yet
    or when ``playtime`` beats their stored best. After that the
    "map_finished" hook is fired.

    Returns:
        {"FirstClear": bool, "NewMapRecord": bool}, where
        * NewMapRecord is True when the leaderboard is empty or ``playtime``
          is lower than every stored BestPlaytime;
        * FirstClear is True when the leaderboard holds at most one entry and
          that entry belongs to a different user.

    Raises:
        HTTPException: 404 for an unknown user or map, 403 for a wrong
            password.
    """
    authcheck = db.auth_check(Authorization)
    if not authcheck[0]:
        if authcheck[1] == "nouser":
            raise HTTPException(404, detail="User not found")
        if authcheck[1] == "wrongpass":
            raise HTTPException(403, detail="Wrong password")
        # Any other failure reason yields an empty response, as before.
        return None

    userData = types.User(**authcheck[1])

    # Look the map up by ID only. The previous filter also required the user
    # to already be on the leaderboard, so first-time players saw no existing
    # times: every such run was reported as a map record and the FirstClear
    # branch could never trigger. Only the two fields that are compared are
    # fetched.
    map_doc = db.maps_collection.find_one(
        {"ID": mapID},
        {"Leaderboard.UserID": 1, "Leaderboard.BestPlaytime": 1},
    )
    if map_doc is None:
        raise HTTPException(404, detail="Map not found")
    leaderboard = map_doc.get("Leaderboard") or []

    BestUserTime = min(
        (entry["BestPlaytime"] for entry in leaderboard if entry["UserID"] == userData.ID),
        default=None,
    )
    BestTime = min((entry["BestPlaytime"] for entry in leaderboard), default=None)
    # NOTE(review): presumably the lone entry is the creator's upload run, so
    # this marks the first run by anyone else — confirm the intended meaning.
    FirstClear = len(leaderboard) <= 1 and any(
        entry["UserID"] != userData.ID for entry in leaderboard
    )

    # NOTE(review): the run is recorded regardless of `clear`; confirm whether
    # stops that did not clear the map should be able to set times.
    if BestUserTime is None or playtime < BestUserTime:
        # Drop the user's previous entry, then store the new one. These are
        # two separate updates against the same array field.
        db.maps_collection.update_one(
            {"ID": mapID},
            {"$pull": {"Leaderboard": {"UserID": userData.ID}}},
        )
        db.maps_collection.update_one(
            {"ID": mapID},
            {
                "$push": {
                    "Leaderboard": types.MapLeaderboard(
                        ShoesColor=userData.ShoesColor,
                        PantsColor=userData.PantsColor,
                        ShirtColor=userData.ShirtColor,
                        CapeColor=userData.CapeColor,
                        SkinColor=userData.SkinColor,
                        HairColor=userData.HairColor,
                        HatSpr=userData.HatSpr,
                        Country=userData.Country,
                        HairSpr=userData.HairSpr,
                        HatColor=userData.HatColor,
                        HatColorInv=userData.HatColorInv,
                        FacialExpression=userData.FacialExpression,
                        DeathEffect=userData.DeathEffect,
                        GunSpr=userData.GunSpr,
                        BulletSpr=userData.BulletSpr,
                        SwordSpr=userData.SwordSpr,
                        Costume=userData.Costume,
                        FollowerSpr=userData.FollowerSpr,
                        FollowerColor=userData.FollowerColor,
                        SaveEffect=userData.SaveEffect,
                        TextSnd=userData.TextSnd,
                        BestPlaytime=playtime,
                        BestReplay=replayData,
                        CreatorName=userData.Username,
                        UserID=userData.ID,
                    ).dict()
                },
            },
        )

    NewMapRecord = BestTime is None or playtime < BestTime

    hook.execute_hooks(
        "map_finished",
        user=userData,
        mapID=mapID,
        clear=clear,
        deaths=deaths,
        playtime=playtime,
        FirstClear=FirstClear,
        NewMapRecord=NewMapRecord,
    )
    return {"FirstClear": FirstClear, "NewMapRecord": NewMapRecord}
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.put("/api/v1/map")
async def upload_map(
    Authorization: Union[str, None] = Header(default=None),
    mapName: str = Form(),
    mapDescription: str = Form(default=""),
    mapVersion: int = Form(),
    mapData: str = Form(),
    file: Optional[UploadFile] = None,
    mapReplay: str = Form(),
    deaths: int = Form(),
    playtime: int = Form(),
    totalDeaths: int = Form(),
    totalTime: int = Form(),
    listed: int = Form(),
    requiresCancels: int = Form(),
    hideInChallenges: int = Form(),
    tags: str = Form(),
    rng: int = Form(),
    clientVersion: float = Form(),
):
    """Store a newly uploaded map and return its map code.

    The map gets the next numeric ID (number of stored maps plus one) and a
    code derived from it via db.id_to_mapcode. The uploader's cosmetic
    settings are copied onto the map, and the leaderboard is seeded with the
    uploader's own run (``playtime`` / ``mapReplay``).

    ``file``, ``deaths``, ``totalDeaths``, ``totalTime``, ``requiresCancels``,
    ``rng`` and ``clientVersion`` are accepted but not stored here.

    Returns:
        {"MapCode": <code>} on success.

    Raises:
        HTTPException: 404 for an unknown user, 403 for a wrong password.
    """
    authcheck = db.auth_check(Authorization)
    if not authcheck[0]:
        if authcheck[1] == "nouser":
            raise HTTPException(404, detail="User not found")
        if authcheck[1] == "wrongpass":
            raise HTTPException(403, detail="Wrong password")
        # Any other failure reason yields an empty response, as before.
        return None

    # The former debug print of `authcheck` was removed: it wrote the whole
    # user record to stdout on every upload.
    userData = types.User(**authcheck[1])

    # Count server-side rather than loading every map (with its MapData)
    # just to measure the list.
    # NOTE(review): count-based IDs can collide under concurrent uploads or
    # after deletions — confirm whether a counter is needed.
    ID = db.maps_collection.count_documents({}) + 1
    MapCode = db.id_to_mapcode(ID)

    creator_run = types.MapLeaderboard(
        BestPlaytime=playtime,
        BestReplay=mapReplay,
        CreatorName=userData.Username,
        UserID=userData.ID,
    ).dict()

    db.maps_collection.insert_one(
        {
            **types.Map(
                CreatorName=userData.Username,
                CreatorId=userData.ID,
                ID=ID,
                Name=mapName,
                Description=mapDescription,
                Version=mapVersion,
                MapCode=MapCode,
                MapData=mapData,
                Listed=bool(listed),
                HiddenInChallenges=hideInChallenges,
                TagIDs=tags,
                TagNames=",".join(types.convertTagsToNames(tags)),
                ShoesColor=userData.ShoesColor,
                PantsColor=userData.PantsColor,
                ShirtColor=userData.ShirtColor,
                CapeColor=userData.CapeColor,
                SkinColor=userData.SkinColor,
                HairColor=userData.HairColor,
                HatSpr=userData.HatSpr,
                Country=userData.Country,
                HairSpr=userData.HairSpr,
                HatColor=userData.HatColor,
                HatColorInv=userData.HatColorInv,
                FacialExpression=userData.FacialExpression,
                DeathEffect=userData.DeathEffect,
                GunSpr=userData.GunSpr,
                BulletSpr=userData.BulletSpr,
                SwordSpr=userData.SwordSpr,
                Costume=userData.Costume,
                FollowerSpr=userData.FollowerSpr,
                FollowerColor=userData.FollowerColor,
                SaveEffect=userData.SaveEffect,
                TextSnd=userData.TextSnd,
                Leaderboard=[creator_run],
            ).dict()
        }
    )
    return {"MapCode": MapCode}
2022-11-19 12:51:28 +00:00
# raise HTTPException(501)
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/map/{mapID}/besttimes/{maxEntries}")
async def getMapLeaderboard(mapID: int, maxEntries: int = 5):
    """Return up to maxEntries leaderboard records for a map, fastest first.

    Raises:
        HTTPException: 404 when no map has that ID.
    """
    map_doc = db.maps_collection.find_one({"ID": int(mapID)})
    if not map_doc:
        raise HTTPException(404, detail="Map not found")
    del map_doc["_id"]

    # Sort a copy ascending by playtime, then cut to the requested size.
    ranked = list(map_doc["Leaderboard"])
    ranked.sort(key=lambda record: record["BestPlaytime"])
    return ranked[:maxEntries]
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/map/{mapID}/userbesttime/{userID}")
async def getPlayerRecord(mapID: int, userID: int):
    """Return a user's fastest leaderboard record (including replay) on a map.

    Returns:
        {"BestMapTime": <record>, "Exists": True} when the user has at least
        one record on the map, otherwise {"BestMapTime": None, "Exists": False}.

    Raises:
        HTTPException: 404 when no map has that ID.
    """
    # Both path parameters are typed as int so non-numeric values are
    # rejected by request validation instead of failing inside int().
    query = db.maps_collection.find_one({"ID": mapID})
    if not query:
        raise HTTPException(404, detail="Map not found")

    user_records = [
        record for record in query["Leaderboard"] if record["UserID"] == userID
    ]
    if not user_records:
        return {"BestMapTime": None, "Exists": False}

    # Pick the fastest record. The previous loop never updated its running
    # best, so it returned the last matching record instead.
    best_record = min(user_records, key=lambda record: record["BestPlaytime"])
    return {"BestMapTime": best_record, "Exists": True}
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/map/{mapID}/besttime")
async def getBestRecord(mapID: int):
    """Return the fastest leaderboard record of a map.

    Returns:
        {"BestMapTime": <record>, "Exists": True} when the leaderboard has
        at least one record, otherwise {"BestMapTime": None, "Exists": False}.

    Raises:
        HTTPException: 404 when no map has that ID.
    """
    query = db.maps_collection.find_one({"ID": mapID})
    if query is None:
        # Previously an unknown ID crashed while indexing None (HTTP 500).
        raise HTTPException(404, detail="Map not found")

    BestPlay = min(
        query.get("Leaderboard") or [],
        key=lambda record: record["BestPlaytime"],
        default=None,
    )
    # "Exists" used to be hard-coded to True even for an empty leaderboard.
    return {"BestMapTime": BestPlay, "Exists": BestPlay is not None}
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.post("/api/v1/map/{mapID}/invalidatealltimes")
async def invalidateAllTimes(
    mapID: int,
    Reason: int = Body(),
    CustomReason: str = Body(default=""),
    Authorization: Union[str, None] = Header(default=None),
):
    """Removes ALL records from a specific map (administrators only).

    Every attempt is written to the admin log via db.LogAdminAction,
    including attempts by non-admin users.

    Raises:
        HTTPException: 404 for an unknown user, 403 for a wrong password or
            when the caller is not an administrator.
    """
    authcheck = db.auth_check(Authorization)
    if not authcheck[0]:
        if authcheck[1] == "nouser":
            raise HTTPException(404, detail="User not found")
        if authcheck[1] == "wrongpass":
            raise HTTPException(403, detail="Wrong password")
        # Any other failure reason yields an empty response, as before.
        return None

    userData = types.User(**authcheck[1])
    action_data = {
        "Reason": Reason,
        "CustomReason": CustomReason,
        "mapID": mapID,
    }

    if not userData.Admin:
        # Record who made the attempt; the log entry previously carried no
        # user ID even though the error text says it will be reported.
        db.LogAdminAction(
            action_type="invalidateAllTimes",
            action_data={**action_data, "unauthorized": True},
            UserID=userData.ID,
            success=False,
        )
        raise HTTPException(
            403,
            detail="Attempted to perform an administrator action without permission. This will be reported.",
        )

    # Replace the leaderboard with an empty list (previously done through a
    # $push with an empty $each and $slice: 0).
    db.maps_collection.update_one({"ID": mapID}, {"$set": {"Leaderboard": []}})
    db.LogAdminAction(
        action_type="invalidateAllTimes",
        action_data=action_data,
        UserID=userData.ID,
    )
2022-11-18 16:51:24 +00:00
## General
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.post("/api/v1/reports")
async def reportContent(
    Authorization: Union[str, None] = Header(default=None),
    user_id: int = Form(),
    map_id: Optional[int] = Form(default=None),
    report_type: int = Form(),
    content: str = Form(),
):
    """Store a content report submitted by an authenticated user.

    Returns True once the report is saved.

    Raises:
        HTTPException: 404 for an unknown user, 403 for a wrong password.
    """
    authcheck = db.auth_check(Authorization)
    authenticated, outcome = authcheck[0], authcheck[1]

    if not authenticated:
        if outcome == "nouser":
            raise HTTPException(404, detail="User not found")
        if outcome == "wrongpass":
            raise HTTPException(403, detail="Wrong password")
        # Any other failure reason yields an empty response.
        return None

    report = {
        "user_id": user_id,
        "map_id": map_id,
        "report_type": report_type,
        "content": content,
    }
    db.reports_collection.insert_one(report)
    return True
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/featuredlist")
async def featuredlist():
    """Weekly featured level list ID; not implemented, so nothing is returned."""
    # FIXME Add playlists
    return None
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/followcheck")
async def followcheck():
    """Check whether creators the user follows have uploaded new levels."""
    # FIXME: Stub — always answers with the constant 1.
    stub_result = 1
    return stub_result
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
def start():
    """Entry point for `poetry run start` (run from the repository root).

    Serves the app on all interfaces, port 8001, with auto-reload enabled.
    """
    server_options = {"host": "0.0.0.0", "port": 8001, "reload": True}
    uvicorn.run("customiwmserver.main:app", **server_options)