CustomIWMServer/customiwmserver/main.py

598 lines
21 KiB
Python
Raw Normal View History

2022-12-04 11:54:50 +00:00
from datetime import datetime
2022-11-18 16:51:24 +00:00
from fastapi import FastAPI, Form, File, UploadFile, Header, HTTPException, Body
from fastapi.responses import PlainTextResponse
from fastapi.exceptions import RequestValidationError
from pydantic import BaseModel
2022-11-18 16:51:24 +00:00
from starlette.exceptions import HTTPException as StarletteHTTPException
from typing import Union, Optional, Any
import uvicorn
from . import data_types as types
from . import database as db
2022-11-19 12:51:28 +00:00
from . import hook_system
from . import hooks
2022-12-17 18:10:58 +00:00
from . import object_storage
2022-11-18 16:51:24 +00:00
import pymongo
app = FastAPI()
2022-11-19 12:51:28 +00:00
hook = hook_system.hook
2022-11-18 16:51:24 +00:00
@app.exception_handler(RequestValidationError)
async def http_exception_handler(request, exc):
return PlainTextResponse(f"{str(exc)}", status_code=422)
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request, exc):
2022-11-19 12:51:28 +00:00
return PlainTextResponse(
f"[{exc.status_code}] {str(exc.detail)}", status_code=exc.status_code
)
2022-12-17 18:10:58 +00:00
2022-11-18 16:51:24 +00:00
## Users
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.post("/api/v1/login")
async def login(username: str = Form(), password: str = Form(), version: str = Form()):
"""User login"""
2022-11-19 12:51:28 +00:00
hook.execute_hooks("player_login", username=username)
2022-11-20 18:02:12 +00:00
auth = db.auth_check(username + ":" + password)
if not auth[0]:
if auth[1] == "nouser":
raise HTTPException(404, detail="User not found.")
elif auth[1] == "wrongpass":
raise HTTPException(403, detail="Password is incorrect.")
else:
return {"token": username + ":" + password, "userId": auth[1]["ID"]}
2022-11-18 16:51:24 +00:00
@app.put("/api/v1/user")
2022-11-19 12:51:28 +00:00
async def create_user(
username: str = Form(),
password: str = Form(),
email: str = Form(),
version: str = Form(),
):
2022-11-18 16:51:24 +00:00
"""Create a user with specified credentials"""
token = username + ":" + password
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
UserID = len(list(db.user_collection.find({})))
2022-11-19 12:51:28 +00:00
if UserID == 0:
UserID = 1
2022-11-18 16:51:24 +00:00
insert_data = {
2022-11-19 12:51:28 +00:00
**types.User(Username=username, Email=email, ID=UserID).dict(),
2022-11-18 16:51:24 +00:00
"Password": password,
}
db.user_collection.insert_one(insert_data)
return {"token": token, "userId": UserID}
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/notifunread")
async def notifunread():
"""Returns the number of unread notifications."""
2022-11-19 12:51:28 +00:00
# FIXME Add notifications
2022-11-18 16:51:24 +00:00
return 0
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/refresh")
async def refresh_login(Authorization: Union[str, None] = Header(default=None)):
"""Intended for refreshing user token."""
return {"token": Authorization}
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/user/{user_id}")
async def get_user(user_id: int):
"""Returns specified user's profile."""
query = db.user_collection.find_one({"ID": user_id})
del query["Password"]
return types.User(**query)
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
## Maps
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/mapcount")
async def mapcount():
return len(list(db.maps_collection.find({})))
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/useruploadcooldown")
async def useruploadcooldown():
"""Limits the amount of levels that user can upload."""
return {"success": True}
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/map")
async def search_for_maps(
2022-11-19 12:51:28 +00:00
start: int = 0,
limit: int = 5,
min_diff: float = 0.0,
max_diff: float = 5.0,
order: str = '[{ "Dir": "desc", "Name": "created_at" }]',
name: str = "",
author: str = "",
author_id: Optional[int] = None,
last_x_hours: Optional[int] = None,
2022-12-04 11:54:50 +00:00
required_tags: str = "",
disallowed_tags: str = "",
code: Optional[str] = None,
admin_show_unlisted: Optional[int] = 0,
2022-11-19 12:51:28 +00:00
):
2022-11-18 16:51:24 +00:00
"""Search for maps."""
# query = db.maps_collection.find({ "CreatorId": author_id }).limit(limit)
2022-12-04 11:54:50 +00:00
if code:
query = list(db.maps_collection.find({"MapCode": code}).skip(start).limit(limit))[0]
del query["_id"]
del query["MapData"]
return [query]
query = db.maps_collection.find({}).skip(start).limit(limit)
2022-11-18 16:51:24 +00:00
entries = []
2022-12-04 11:54:50 +00:00
# Convert required_tags and disallowed_tags to a list.
required_tag_list = required_tags.split(",") if required_tags != "" else []
disallowed_tag_list = disallowed_tags.split(",") if disallowed_tags != "" else []
2022-11-18 16:51:24 +00:00
for i in query:
2022-11-19 12:51:28 +00:00
del i["_id"]
del i["MapData"]
2022-12-04 11:54:50 +00:00
if not i["Listed"] and admin_show_unlisted != 1:
continue
level_tags = i['TagIDs'].split(",")
CreatedAt = datetime.strptime(i["CreatedAt"], "%Y-%m-%dT%H:%M:%SZ")
# TODO: Improve tag filtering
required_tags_included = False if len(required_tag_list) != 0 else True
disallowed_tags_included = False
for tag in level_tags:
if tag in disallowed_tag_list and len(disallowed_tag_list) != 0: disallowed_tags_included = True
elif tag in required_tag_list and len(required_tag_list) != 0: required_tags_included = True
else:
if required_tags_included and not disallowed_tags_included:
if not last_x_hours or CreatedAt.hour < last_x_hours:
entries.append(i)
2022-11-18 16:51:24 +00:00
return entries
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/map/{mapID}")
async def getMap(mapID: int):
2022-11-19 12:51:28 +00:00
query = db.maps_collection.find_one({"ID": mapID})
del query["_id"]
2022-11-18 16:51:24 +00:00
return query
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.post("/api/v1/map/{mapID}/start")
async def startMap(
mapID: int,
Authorization: Union[str, None] = Header(default=None)
):
authcheck = db.auth_check(Authorization)
if not authcheck[0] and authcheck[1] == "nouser":
raise HTTPException(404, detail="User not found")
elif not authcheck[0] and authcheck[1] == "wrongpass":
raise HTTPException(403, detail="Wrong password")
elif authcheck[0]:
userData = types.User(**authcheck[1])
query = db.maps_collection.find_one({"ID": mapID})
del query["_id"]
2022-11-18 16:51:24 +00:00
returned_resp = {
"BestDeaths": 0,
"BestPlaytime": 0,
"Clear": False,
"CurMap": query,
"Difficulty": 0,
"Followed": False,
"Played": True,
"Rating": userData.Ratings[str(mapID)] if str(mapID) in userData.Ratings else -1,
"TagIDs": query["TagIDs"],
"TagNames": query["TagNames"],
}
return returned_resp
2022-11-18 16:51:24 +00:00
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.post("/api/v1/map/{mapID}/stop")
async def stopMapPlay(
2022-11-19 12:51:28 +00:00
mapID: int,
clear: int = Form(),
deaths: int = Form(),
playtime: int = Form(),
totalDeaths: int = Form(),
totalTime: int = Form(),
replayData: str = Form(),
Authorization: Union[str, None] = Header(default=None),
):
2022-11-18 16:51:24 +00:00
"""Saves the map replay, and informs the user if their play is a record"""
authcheck = db.auth_check(Authorization)
if not authcheck[0] and authcheck[1] == "nouser":
raise HTTPException(404, detail="User not found")
elif not authcheck[0] and authcheck[1] == "wrongpass":
raise HTTPException(403, detail="Wrong password")
elif authcheck[0]:
NewMapRecord = False
FirstClear = False
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
userData = types.User(**authcheck[1])
2022-11-19 12:51:28 +00:00
query = db.maps_collection.find_one(
{"ID": mapID, "Leaderboard.UserID": userData.ID}
)
if query is not None:
del query["_id"]
2022-11-18 16:51:24 +00:00
BestUserTime = None
BestTime = None
2022-11-19 12:51:28 +00:00
if query is not None and "Leaderboard" in query:
for i in query["Leaderboard"]:
if i["UserID"] == userData.ID:
if BestUserTime is None or BestUserTime > i["BestPlaytime"]:
BestUserTime = i["BestPlaytime"]
if BestTime is None or BestTime > i["BestPlaytime"]:
2022-11-18 16:51:24 +00:00
BestTime = i["BestPlaytime"]
2022-11-19 12:51:28 +00:00
if len(query["Leaderboard"]) <= 1 and i["UserID"] != userData.ID:
2022-11-18 16:51:24 +00:00
FirstClear = True
if BestUserTime is None or playtime < BestUserTime:
updateQuery = db.maps_collection.update_one(
{"ID": mapID},
2022-11-20 18:02:12 +00:00
{"$pull": {"Leaderboard": {"UserID": userData.ID}},
}
)
2022-11-19 12:51:28 +00:00
updateQuery = db.maps_collection.update_one(
{"ID": mapID},
2022-11-18 16:51:24 +00:00
{
"$push": {
"Leaderboard": types.MapLeaderboard(
ShoesColor=userData.ShoesColor,
PantsColor=userData.PantsColor,
ShirtColor=userData.ShirtColor,
CapeColor=userData.CapeColor,
SkinColor=userData.SkinColor,
HairColor=userData.HairColor,
HatSpr=userData.HatSpr,
Country=userData.Country,
HairSpr=userData.HairSpr,
HatColor=userData.HatColor,
HatColorInv=userData.HatColorInv,
FacialExpression=userData.FacialExpression,
DeathEffect=userData.DeathEffect,
GunSpr=userData.GunSpr,
BulletSpr=userData.BulletSpr,
SwordSpr=userData.SwordSpr,
Costume=userData.Costume,
FollowerSpr=userData.FollowerSpr,
FollowerColor=userData.FollowerColor,
SaveEffect=userData.SaveEffect,
TextSnd=userData.TextSnd,
BestPlaytime=playtime,
BestReplay=replayData,
CreatorName=userData.Username,
2022-11-19 12:51:28 +00:00
UserID=userData.ID,
).dict()
},
},
)
2022-11-18 16:51:24 +00:00
if BestTime is None or playtime < BestTime:
NewMapRecord = True
2022-11-19 12:51:28 +00:00
hook.execute_hooks(
"map_finished",
user=userData,
mapID=mapID,
clear=clear,
deaths=deaths,
playtime=playtime,
FirstClear=FirstClear,
NewMapRecord=NewMapRecord,
)
2022-11-18 16:51:24 +00:00
return {"FirstClear": FirstClear, "NewMapRecord": NewMapRecord}
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.put("/api/v1/map")
async def upload_map(
2022-11-19 12:51:28 +00:00
Authorization: Union[str, None] = Header(default=None),
mapName: str = Form(),
mapDescription: str = Form(default=""),
mapVersion: int = Form(),
mapData: str = Form(),
file: Optional[UploadFile] = None,
mapReplay: str = Form(),
deaths: int = Form(),
playtime: int = Form(),
totalDeaths: int = Form(),
totalTime: int = Form(),
listed: int = Form(),
requiresCancels: int = Form(),
hideInChallenges: int = Form(),
tags: str = Form(),
rng: int = Form(),
clientVersion: float = Form(),
):
2022-11-18 16:51:24 +00:00
authcheck = db.auth_check(Authorization)
if not authcheck[0] and authcheck[1] == "nouser":
raise HTTPException(404, detail="User not found")
elif not authcheck[0] and authcheck[1] == "wrongpass":
raise HTTPException(403, detail="Wrong password")
elif authcheck[0]:
print(authcheck)
userData = types.User(**authcheck[1])
2022-11-20 14:51:20 +00:00
ID = len(list(db.maps_collection.find({}))) + 1
2022-11-20 18:02:12 +00:00
MapCode = db.id_to_mapcode(ID)
2022-12-17 18:10:58 +00:00
await object_storage.upload_thumbnail(file, ID)
2022-11-19 12:51:28 +00:00
db.maps_collection.insert_one(
{
**types.Map(
CreatorName=userData.Username,
CreatorId=userData.ID,
2022-11-20 14:51:20 +00:00
ID=ID,
2022-11-19 12:51:28 +00:00
Name=mapName,
Description=mapDescription,
Version=mapVersion,
MapCode=MapCode,
MapData=mapData,
Listed=bool(listed),
HiddenInChallenges=hideInChallenges,
TagIDs=tags,
TagNames=",".join(types.convertTagsToNames(tags)),
ShoesColor=userData.ShoesColor,
PantsColor=userData.PantsColor,
ShirtColor=userData.ShirtColor,
CapeColor=userData.CapeColor,
SkinColor=userData.SkinColor,
HairColor=userData.HairColor,
HatSpr=userData.HatSpr,
Country=userData.Country,
HairSpr=userData.HairSpr,
HatColor=userData.HatColor,
HatColorInv=userData.HatColorInv,
FacialExpression=userData.FacialExpression,
DeathEffect=userData.DeathEffect,
GunSpr=userData.GunSpr,
BulletSpr=userData.BulletSpr,
SwordSpr=userData.SwordSpr,
Costume=userData.Costume,
FollowerSpr=userData.FollowerSpr,
FollowerColor=userData.FollowerColor,
SaveEffect=userData.SaveEffect,
TextSnd=userData.TextSnd,
Leaderboard=[
types.MapLeaderboard(
BestPlaytime=playtime,
BestReplay=mapReplay,
CreatorName=userData.Username,
UserID=userData.ID,
2022-11-18 16:51:24 +00:00
).dict()
2022-11-19 12:51:28 +00:00
],
).dict()
}
2022-12-17 18:10:58 +00:00
2022-11-19 12:51:28 +00:00
)
2022-11-18 16:51:24 +00:00
return {"MapCode": MapCode}
2022-11-19 12:51:28 +00:00
# raise HTTPException(501)
class Rating(BaseModel):
Rating: int
@app.post("/api/v1/map/{mapID}/rating")
async def rateMap(mapID: int, Rating: Rating, Authorization: Union[str, None] = Header(default=None)):
authcheck = db.auth_check(Authorization)
if not authcheck[0] and authcheck[1] == "nouser":
raise HTTPException(404, detail="User not found")
elif not authcheck[0] and authcheck[1] == "wrongpass":
raise HTTPException(403, detail="Wrong password")
elif authcheck[0]:
userData = types.User(**authcheck[1])
userRating = userData.Ratings[str(mapID)] if str(mapID) in userData.Ratings else 0
print(userRating, type(userRating))
if Rating.Rating == 5:
if userRating is not None and userRating == 5:
raise HTTPException(400, detail="Map already rated! Set rating in request to -1 to unrate.")
additional = {}
if userRating == 1:
additional = {"NumThumbsDown": -1}
query = db.maps_collection.update_one({"ID": mapID}, {"$inc": {"NumThumbsUp":1, "NumRatings": 1, **additional}})
userRating = 5
elif Rating.Rating == 1:
if userRating is not None and userRating == 1:
raise HTTPException(400, detail="Map already rated! Set rating in request to -1 to unrate.")
additional = {}
if userRating == 5:
additional = {"NumThumbsUp": -1}
query = db.maps_collection.update_one({"ID": mapID}, {"$inc": {"NumThumbsDown": 1, "NumRatings": 1, **additional}})
userRating = 1
elif Rating.Rating == -1:
if userRating is None or userRating == -1:
raise HTTPException(400, detail="Map is not rated!")
key = ""
if userRating == 5:
key = "NumThumbsUp"
elif userRating == 1:
key = "NumThumbsDown"
else:
raise HTTPException(400, detail=f"Previous rating was not 5 or 1. It was {userRating} [{repr(type(userRating))}]")
query = db.maps_collection.update_one({"ID": mapID}, {"$inc": {"NumRatings": -1, key: -1}})
userRating = -1
query = db.maps_collection.find_one({"ID": mapID})
print(userRating, "+")
userQuery = db.user_collection.update_one({"ID": userData.ID}, {"$set": {f"Ratings.{mapID}": userRating} })
print(userQuery.raw_result)
return {
"Exists": True,
"TagIDs": query["TagIDs"],
"TagNames": query["TagNames"],
"UserMap": {
"BestDeaths": 0,
"BestFullPlaytime": 0,
"BestFullPlaytimeTime": None,
"BestPlaytime": 0,
"BestPlaytimeTime": None,
"BestReplay": "",
"Clear": False,
"Difficulty": 0,
"FirstClearInvalid": False,
"FirstClearPlaytime": 0,
"FirstClearTime": False,
"FirstDeathTimeValid": False,
"FirstDeaths": -1,
"FirstPlayRecorded": True,
"FirstPlaytime": -1,
"FullClear": False,
"MapID": mapID,
"Played": True,
"Rating": userRating,
"UserID": userData.ID
}
}
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/map/{mapID}/besttimes/{maxEntries}")
2022-12-04 11:54:50 +00:00
async def getMapLeaderboard(mapID: int, maxEntries: int = 5):
2022-11-18 16:51:24 +00:00
"""Returns maxEntries records for the specified level"""
2022-11-19 12:51:28 +00:00
query = db.maps_collection.find_one({"ID": int(mapID)})
2022-11-18 16:51:24 +00:00
if not query:
2022-11-19 12:51:28 +00:00
raise HTTPException(404, detail="Map not found")
del query["_id"]
leaderboard = query["Leaderboard"]
2022-12-04 11:54:50 +00:00
return sorted(leaderboard, key=lambda sort: sort["BestPlaytime"])[0:maxEntries]
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/map/{mapID}/userbesttime/{userID}")
async def getPlayerRecord(mapID, userID):
"""Returns specific replay"""
2022-11-19 12:51:28 +00:00
query = db.maps_collection.find_one({"ID": int(mapID)})
2022-11-18 16:51:24 +00:00
if not query:
2022-11-19 12:51:28 +00:00
raise HTTPException(404, detail="Map not found")
del query["_id"]
leaderboard = query["Leaderboard"]
2022-11-18 16:51:24 +00:00
# Find user
replayIndex = None
2022-11-19 14:54:01 +00:00
BestTime = None
2022-11-18 16:51:24 +00:00
for i in range(len(leaderboard)):
print(f"DBG: {i}, {leaderboard[i]}\t{leaderboard[i]['UserID'] == int(userID):}")
2022-11-19 14:54:01 +00:00
if leaderboard[i]["UserID"] == int(userID) and (BestTime is None or leaderboard[i]["BestPlaytime"] < BestTime):
2022-11-18 16:51:24 +00:00
replayIndex = i
print(leaderboard[replayIndex])
2022-11-19 12:51:28 +00:00
if replayIndex is not None:
return {"BestMapTime": leaderboard[replayIndex], "Exists": True}
else:
return {"BestMapTime": None, "Exists": False}
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/map/{mapID}/besttime")
async def getBestRecord(mapID: int):
2022-11-19 12:51:28 +00:00
query = db.maps_collection.find_one({"ID": int(mapID)})
2022-11-18 16:51:24 +00:00
BestTime = None
BestPlay = None
2022-11-19 12:51:28 +00:00
for i in query["Leaderboard"]:
if BestTime is None or BestTime > i["BestPlaytime"]:
2022-11-18 16:51:24 +00:00
BestTime = i["BestPlaytime"]
BestPlay = i
return {"BestMapTime": BestPlay, "Exists": True}
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.post("/api/v1/map/{mapID}/invalidatealltimes")
async def invalidateAllTimes(
2022-11-19 12:51:28 +00:00
mapID: int,
Reason: int = Body(),
CustomReason: str = Body(default=""),
Authorization: Union[str, None] = Header(default=None),
):
2022-11-18 16:51:24 +00:00
"""Removes ALL records from a specific map"""
authcheck = db.auth_check(Authorization)
if not authcheck[0] and authcheck[1] == "nouser":
raise HTTPException(404, detail="User not found")
elif not authcheck[0] and authcheck[1] == "wrongpass":
raise HTTPException(403, detail="Wrong password")
elif authcheck[0]:
userData = types.User(**authcheck[1])
if userData.Admin:
2022-11-19 12:51:28 +00:00
query = db.maps_collection.update_one(
{"ID": mapID}, {"$push": {"Leaderboard": {"$each": [], "$slice": 0}}}
)
db.LogAdminAction(
action_type="invalidateAllTimes",
action_data={
"Reason": Reason,
"CustomReason": CustomReason,
"mapID": mapID,
},
UserID=userData.ID,
)
2022-11-18 16:51:24 +00:00
else:
2022-11-19 12:51:28 +00:00
db.LogAdminAction(
action_type="invalidateAllTimes",
action_data={
"Reason": Reason,
"CustomReason": CustomReason,
"mapID": mapID,
"unauthorized": True,
},
success=False,
)
raise HTTPException(
403,
detail="Attempted to perform an administrator action without permission. This will be reported.",
)
2022-11-18 16:51:24 +00:00
## General
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.post("/api/v1/reports")
async def reportContent(
2022-11-19 12:51:28 +00:00
Authorization: Union[str, None] = Header(default=None),
user_id: int = Form(),
map_id: Optional[int] = Form(default=None),
report_type: int = Form(),
content: str = Form(),
):
2022-11-18 16:51:24 +00:00
authcheck = db.auth_check(Authorization)
if not authcheck[0] and authcheck[1] == "nouser":
raise HTTPException(404, detail="User not found")
elif not authcheck[0] and authcheck[1] == "wrongpass":
raise HTTPException(403, detail="Wrong password")
elif authcheck[0]:
2022-11-19 12:51:28 +00:00
db.reports_collection.insert_one(
{
"user_id": user_id,
"map_id": map_id,
"report_type": report_type,
"content": content,
}
)
2022-11-18 16:51:24 +00:00
return True
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/featuredlist")
async def featuredlist():
"""Returns the list id of the weekly levels list."""
2022-11-19 12:51:28 +00:00
# FIXME Add playlists
raise HTTPException(404, detail="Not available due to lack of playlist support.")
2022-11-18 16:51:24 +00:00
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
@app.get("/api/v1/followcheck")
async def followcheck():
2022-12-04 11:54:50 +00:00
"""Check, if creators that the user follows uploaded new levels."""
# FIXME: Stub
2022-12-17 18:10:58 +00:00
return 0
2022-11-19 12:51:28 +00:00
2022-11-18 16:51:24 +00:00
def start():
"""Launched with `poetry run start` at root level"""
2022-12-17 18:10:58 +00:00
from . import __main__
__main__.server.run()
# uvicorn.run("customiwmserver.main:app", host="0.0.0.0", port=8001, reload=True)