CustomIWMServer/customiwmserver/main.py

715 lines
25 KiB
Python

from datetime import datetime
from fastapi import FastAPI, Form, File, UploadFile, Header, HTTPException, Body
from fastapi.responses import PlainTextResponse
from fastapi.exceptions import RequestValidationError
from pydantic import BaseModel
from starlette.exceptions import HTTPException as StarletteHTTPException
from typing import Union, Optional, Any
import uvicorn
from . import data_types as types
from . import database as db
from . import hook_system
from . import hooks
from . import object_storage
import pymongo
from .webui import webui
# The FastAPI application object; every endpoint in this module registers on it.
app = FastAPI()
# Attach the routes exposed by the bundled webui module to the same application.
app.include_router(webui.app)
# Short alias for the hook object; handlers below call hook.execute_hooks(...).
hook = hook_system.hook
@app.exception_handler(RequestValidationError)
async def http_exception_handler(request, exc):
    """Answer request-validation failures with a plain-text 422 response."""
    message = str(exc)
    return PlainTextResponse(message, status_code=422)
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request, exc):
    """Render HTTP errors as plain text: the status code in brackets, then the detail."""
    status = exc.status_code
    body = f"[{status}] {str(exc.detail)}"
    return PlainTextResponse(body, status_code=status)
## Users
@app.post("/api/v1/login")
async def login(username: str = Form(), password: str = Form(), version: str = Form()):
    """Log a user in.

    The "player_login" hook is executed before the credentials are checked.
    On success the response carries the "username:password" token and the
    user's ID; on failure a 403 is raised. `version` is accepted but unused.
    """
    hook.execute_hooks("player_login", username=username)
    token = username + ":" + password
    auth = db.auth_check(token)
    if auth[0]:
        return {"token": token, "userId": auth[1]["ID"]}
    raise HTTPException(403, detail="Wrong username or password.")
@app.put("/api/v1/user")
async def create_user(
    username: str = Form(),
    password: str = Form(),
    email: str = Form(),
    version: str = Form(),
):
    """Create a user with the specified credentials.

    Stores a new user document (the `types.User` fields plus "Password") and
    returns the "username:password" token together with the new user ID.
    `version` is accepted but unused.
    """
    token = username + ":" + password

    # Derive the next ID from the highest stored one. The previous logic used
    # the document count and only bumped it when it was 0, so the first and
    # the second registered users both received ID 1.
    newest = db.user_collection.find_one({}, sort=[("ID", pymongo.DESCENDING)])
    UserID = newest.get("ID", 0) + 1 if newest else 1

    insert_data = {
        **types.User(Username=username, Email=email, ID=UserID).dict(),
        "Password": password,
    }
    db.user_collection.insert_one(insert_data)
    return {"token": token, "userId": UserID}
@app.get("/api/v1/notifunread")
async def notifunread():
    """Report how many notifications are still unread."""
    # FIXME: Add notifications (until then nothing is ever unread).
    unread = 0
    return unread
@app.post("/api/v1/user/{userID}")
async def update_user_profile(
    userID: int,
    Authorization: Union[str, None] = Header(default=None),
    ID: int = Body(),
    Country: int = Body(),
    PantsColor: int = Body(),
    HatSpr: int = Body(),
    HairColor: int = Body(),
    CapeColor: int = Body(),
    ShoesColor: int = Body(),
    ShirtColor: int = Body(),
    SkinColor: int = Body(),
):
    """Update the appearance and country of the authenticated user.

    Returns the updated user as a dict. Raises 403 when the credentials are
    rejected or when `userID` is not the caller's own ID. The `ID` body field
    is accepted for client compatibility but is not used.
    """
    authcheck = db.auth_check(Authorization)
    if not authcheck[0]:
        raise HTTPException(403, detail="Wrong username or password")

    userData = types.User(**authcheck[1])
    # The document written below is the *caller's* record, so it must only
    # ever land on the caller's own ID. Without this check any logged-in user
    # could overwrite another user's profile through the path parameter.
    if userData.ID != userID:
        raise HTTPException(403, detail="Cannot edit another user's profile")

    userData.Country = Country
    userData.PantsColor = PantsColor
    userData.HatSpr = HatSpr
    userData.HairColor = HairColor
    userData.CapeColor = CapeColor
    userData.ShoesColor = ShoesColor
    userData.ShirtColor = ShirtColor
    userData.SkinColor = SkinColor
    # NOTE(review): datetime.now() is naive local time although the format
    # appends "Z" — confirm whether UTC is expected by clients.
    userData.UpdatedAt = datetime.strftime(datetime.now(), "%Y-%m-%dT%H:%M:%SZ")

    db.user_collection.update_one({"ID": userID}, {"$set": userData.dict()})
    hook.execute_hooks("player_update", userID=userID)
    return userData.dict()
@app.get("/api/v1/refresh")
async def refresh_login(Authorization: Union[str, None] = Header(default=None)):
    """Token refresh endpoint; currently echoes the Authorization header back as the token."""
    refreshed = {"token": Authorization}
    return refreshed
@app.get("/api/v1/user/{user_id}")
async def get_user(user_id: int):
    """Return the specified user's profile.

    Raises 404 when no user with that ID exists.
    """
    query = db.user_collection.find_one({"ID": user_id})
    if query is None:
        # find_one returns None for an unknown ID; answer 404 instead of
        # crashing on the lookup below.
        raise HTTPException(404, detail="User not found")
    # Never hand the stored password to the model that is serialised back.
    query.pop("Password", None)
    return types.User(**query)
## Maps
@app.get("/api/v1/mapcount")
async def mapcount():
    """Return the total number of stored maps."""
    # Count on the database side instead of loading every map document
    # (including its MapData) into memory just to take len().
    return db.maps_collection.count_documents({})
@app.get("/api/v1/useruploadcooldown")
async def useruploadcooldown():
    """Upload rate-limit check; currently always reports success."""
    verdict = {"success": True}
    return verdict
@app.get("/api/v1/map")
async def search_for_maps(
    Authorization: Union[str, None] = Header(default=None),
    start: int = 0,
    limit: int = 5,
    min_diff: float = 0.0,
    max_diff: float = 5.0,
    order: str = '[{ "Dir": "desc", "Name": "created_at" }]',
    name: str = "",
    author: str = "",
    author_id: Optional[int] = None,
    last_x_hours: Optional[int] = None,
    required_tags: str = "",
    disallowed_tags: str = "",
    code: Optional[str] = None,
    admin_show_unlisted: Optional[int] = 0,
):
    """Search for maps.

    With `code`, returns a one-element list holding the raw map document for
    that map code (or an empty list when nothing matches). Otherwise returns
    `types.Map` entries for the `start`/`limit` page, filtered by listing
    status, tags and age. When an Authorization header is supplied, each entry
    carries the caller's Played/Clear/FullClear status.

    `min_diff`, `max_diff`, `order`, `name`, `author` and `author_id` are
    accepted but not applied yet. Raises 403 when an Authorization header is
    supplied and rejected.
    """
    from datetime import timedelta

    if Authorization is not None:
        authcheck = db.auth_check(Authorization)
        if not authcheck[0]:
            raise HTTPException(
                403,
                detail="Wrong username or password."
                + "\nIf you're using the API directly, "
                + "then you can just skip the Authorization",
            )
        plays = types.User(**authcheck[1]).Plays
    else:
        plays = {}

    if code:
        matches = list(
            db.maps_collection.find({"MapCode": code}).skip(start).limit(limit)
        )
        if not matches:
            # Unknown code (or a page past the single hit): an empty result,
            # not an IndexError turned into a 500.
            return []
        found = matches[0]
        found.pop("_id", None)
        found.pop("MapData", None)
        return [found]

    # Convert required_tags and disallowed_tags to sets for membership tests.
    required_tag_set = set(required_tags.split(",")) if required_tags != "" else set()
    disallowed_tag_set = (
        set(disallowed_tags.split(",")) if disallowed_tags != "" else set()
    )

    # Oldest creation time still accepted, or None when no age filter is set.
    # The previous check compared the hour-of-day of CreatedAt with
    # last_x_hours, which says nothing about how old the map is.
    # NOTE(review): assumes CreatedAt holds naive local time, like the other
    # timestamps this module formats with datetime.now() — confirm.
    cutoff = None
    if last_x_hours:
        cutoff = datetime.now() - timedelta(hours=last_x_hours)

    entries = []
    # NOTE(review): filters run after skip/limit, so a page can come back
    # shorter than `limit`; admin_show_unlisted is not checked against the
    # caller's privileges here.
    for i in db.maps_collection.find({}).skip(start).limit(limit):
        # Drop storage-only keys and the per-user flags, which are re-supplied
        # explicitly when the Map model is built below.
        for stale_key in ("_id", "MapData", "Played", "Clear", "FullClear"):
            i.pop(stale_key, None)

        if not i["Listed"] and admin_show_unlisted != 1:
            continue

        # TODO: Improve tag filtering (a single required tag is enough today).
        level_tags = i["TagIDs"].split(",")
        if disallowed_tag_set.intersection(level_tags):
            continue
        if required_tag_set and not required_tag_set.intersection(level_tags):
            continue

        if cutoff is not None:
            CreatedAt = datetime.strptime(i["CreatedAt"], "%Y-%m-%dT%H:%M:%SZ")
            if CreatedAt < cutoff:
                continue

        Played = False
        Clear = False
        FullClear = False
        if plays and str(i["ID"]) in plays:
            play = plays[str(i["ID"])]
            Played = play.Played
            Clear = play.Clear
            FullClear = play.FullClear

        entries.append(types.Map(Played=Played, Clear=Clear, FullClear=FullClear, **i))
    return entries
@app.get("/api/v1/map/{mapID}")
async def getMap(mapID: int):
    """Return the full map with the given ID.

    Raises 404 when no such map exists.
    """
    query = db.maps_collection.find_one({"ID": mapID})
    if query is None:
        # find_one returns None for an unknown ID; report it as 404 rather
        # than failing on the key removal below.
        raise HTTPException(404, detail="Map not found")
    query.pop("_id", None)
    return types.Map(**query)
@app.post("/api/v1/map/{mapID}/start")
async def startMap(
    mapID: int,
    Authorization: Union[str, None] = Header(default=None)
):
    """Register that the authenticated user starts playing a map.

    Creates a default play entry for the user when none exists yet and returns
    the map together with the user's rating for it (-1 when unrated).
    Raises 403 on rejected credentials and 404 for an unknown map.
    """
    authcheck = db.auth_check(Authorization)
    # Reject every failed check, as the other endpoints do; previously a
    # failure with a falsy second element slipped through and left userData
    # undefined.
    if not authcheck[0]:
        raise HTTPException(403, detail="Wrong username or password.")
    userData = types.User(**authcheck[1])

    query = db.maps_collection.find_one({"ID": mapID})
    if query is None:
        raise HTTPException(404, detail="Map not found")
    query.pop("_id", None)

    map_key = str(mapID)
    if map_key not in userData.Plays:
        # First contact with this map: store a default play record.
        db.user_collection.update_one(
            {"ID": userData.ID},
            {"$set": {f"Plays.{mapID}": dict(types.MapPlay())}},
        )

    # NOTE(review): apart from CurMap, Rating and the tags, these values are
    # fixed placeholders ("Clear" is always False even when the user has a
    # stored play) — confirm whether real values should be returned.
    return {
        "BestDeaths": 0,
        "BestPlaytime": 0,
        "Clear": False,
        "CurMap": types.Map(**query),
        "Difficulty": 0,
        "Followed": False,
        "Played": True,
        "Rating": userData.Ratings[map_key] if map_key in userData.Ratings else -1,
        "TagIDs": query["TagIDs"],
        "TagNames": query["TagNames"],
    }
@app.post("/api/v1/map/{mapID}/stop")
async def stopMapPlay(
    mapID: int,
    clear: int = Form(),
    deaths: int = Form(),
    playtime: int = Form(),
    totalDeaths: int = Form(),
    totalTime: int = Form(),
    fullClear: int = Form(default=0), # old version compatibility
    replayData: str = Form(),
    Authorization: Union[str, None] = Header(default=None),
):
    """Saves the map replay, and informs the user if their play is a record

    Stores the user's play state for the map, compares `playtime` with the
    leaderboard times found for the map, replaces the user's leaderboard
    entry when the time qualifies, fires the "map_finished" hook and returns
    the FirstClear / NewMapRecord flags. Raises 403 on rejected credentials.
    `totalDeaths` and `totalTime` are accepted but not used.
    """
    authcheck = db.auth_check(Authorization)
    if not authcheck[0]:
        raise HTTPException(403, detail="Wrong username or password")
    elif authcheck[0]:
        NewMapRecord = False
        FirstClear = False
        userData = types.User(**authcheck[1])
        # Overwrite the user's play entry for this map with this attempt.
        # NOTE(review): a later attempt with clear=0 replaces an earlier
        # Clear=True entry — confirm that is intended.
        userUpdateQuery = db.user_collection.update_one(
            {"ID": userData.ID},
            {
                "$set": {
                    f"Plays.{mapID}": dict(types.MapPlay(Clear=bool(clear), FullClear=bool(fullClear)))
                }
            }
        )
        # The filter only matches when this user already has a leaderboard
        # entry on the map; otherwise query is None and the scan is skipped.
        query = db.maps_collection.find_one(
            {"ID": mapID, "Leaderboard.UserID": userData.ID}
        )
        if query is not None:
            del query["_id"]
        BestUserTime = None
        BestTime = None
        BestFullTime = None
        if query is not None and "Leaderboard" in query:
            for i in query["Leaderboard"]:
                # Track the user's own best time (not read again below).
                if i["UserID"] == userData.ID:
                    if BestUserTime is None or BestUserTime > i["BestPlaytime"]:
                        BestUserTime = i["BestPlaytime"]
                # Track the overall best time across all entries.
                if BestTime is None or BestTime > i["BestPlaytime"]:
                    BestTime = i["BestPlaytime"]
                # NOTE(review): confirm leaderboard entries carry a
                # "BestFullTimePlaytime" key; a missing key raises KeyError.
                elif BestFullTime is None or BestFullTime > i["BestFullTimePlaytime"]:
                    BestFullTime = i["BestFullTimePlaytime"]
                if len(query["Leaderboard"]) <= 1 and i["UserID"] != userData.ID:
                    FirstClear = True
        # NOTE(review): the condition is also true whenever BestFullTime is
        # still None, and `clear` is not consulted — confirm intent.
        if (BestTime is None or playtime < BestTime) or (BestFullTime is None or playtime < BestFullTime):
            # Replace the user's entry: drop the old one, then push the new one.
            updateQuery = db.maps_collection.update_one(
                {"ID": mapID},
                {"$pull": {"Leaderboard": {"UserID": userData.ID}},
                }
            )
            updateQuery = db.maps_collection.update_one(
                {"ID": mapID},
                {
                    "$push": {
                        # The entry snapshots the user's current cosmetics
                        # next to the time and the replay.
                        "Leaderboard": types.MapLeaderboard(
                            ShoesColor=userData.ShoesColor,
                            PantsColor=userData.PantsColor,
                            ShirtColor=userData.ShirtColor,
                            CapeColor=userData.CapeColor,
                            SkinColor=userData.SkinColor,
                            HairColor=userData.HairColor,
                            HatSpr=userData.HatSpr,
                            Country=userData.Country,
                            HairSpr=userData.HairSpr,
                            HatColor=userData.HatColor,
                            HatColorInv=userData.HatColorInv,
                            FacialExpression=userData.FacialExpression,
                            DeathEffect=userData.DeathEffect,
                            GunSpr=userData.GunSpr,
                            BulletSpr=userData.BulletSpr,
                            SwordSpr=userData.SwordSpr,
                            Costume=userData.Costume,
                            FollowerSpr=userData.FollowerSpr,
                            FollowerColor=userData.FollowerColor,
                            SaveEffect=userData.SaveEffect,
                            TextSnd=userData.TextSnd,
                            BestPlaytime=playtime,
                            BestReplay=replayData,
                            CreatorName=userData.Username,
                            UserID=userData.ID,
                            FullClear=bool(fullClear)
                        ).dict()
                    },
                },
            )
        # Same condition as above decides whether the play counts as a record.
        if (BestTime is None or playtime < BestTime) or (BestFullTime is None or playtime < BestFullTime):
            NewMapRecord = True
        hook.execute_hooks(
            "map_finished",
            user=userData,
            mapID=mapID,
            clear=clear,
            deaths=deaths,
            playtime=playtime,
            fullClear=fullClear,
            FirstClear=FirstClear,
            NewMapRecord=NewMapRecord,
        )
        return {"FirstClear": FirstClear, "NewMapRecord": NewMapRecord}
@app.put("/api/v1/map")
async def upload_map(
    Authorization: Union[str, None] = Header(default=None),
    mapName: str = Form(),
    mapDescription: str = Form(default=""),
    mapVersion: int = Form(),
    mapData: str = Form(),
    file: Optional[UploadFile] = None,
    mapReplay: str = Form(),
    deaths: int = Form(),
    playtime: int = Form(),
    totalDeaths: int = Form(),
    totalTime: int = Form(),
    listed: int = Form(),
    requiresCancels: int = Form(),
    hideInChallenges: int = Form(),
    tags: str = Form(default=""),
    rng: int = Form(),
    numSubmaps: int = Form(default=0),  # Defaults here are set for
    dragonCoins: int = Form(default=0),  # compatibility with
    replayVisibility: int = Form(default=0),  # older game versions
    clientVersion: float = Form(),
):
    """Upload a new map on behalf of the authenticated user.

    Stores the thumbnail through `object_storage`, inserts the map with the
    uploader's current cosmetics and their verification run as the first
    leaderboard entry, and returns the generated map code.
    Raises 403 on rejected credentials.
    """
    authcheck = db.auth_check(Authorization)
    if not authcheck[0]:
        raise HTTPException(403, detail="Wrong username or password")
    # The auth-check result is deliberately not printed any more: it holds
    # the user record and ended up in the server output.
    userData = types.User(**authcheck[1])

    # Count on the database side rather than loading every map (with its
    # MapData) into memory. IDs stay "current count + 1".
    ID = db.maps_collection.count_documents({}) + 1
    MapCode = db.id_to_mapcode(ID)
    await object_storage.upload_thumbnail(file, ID)

    # Cosmetic attributes copied verbatim from the uploader onto the map.
    cosmetic_fields = (
        "ShoesColor",
        "PantsColor",
        "ShirtColor",
        "CapeColor",
        "SkinColor",
        "HairColor",
        "HatSpr",
        "Country",
        "HairSpr",
        "HatColor",
        "HatColorInv",
        "FacialExpression",
        "DeathEffect",
        "GunSpr",
        "BulletSpr",
        "SwordSpr",
        "Costume",
        "FollowerSpr",
        "FollowerColor",
        "SaveEffect",
        "TextSnd",
    )
    cosmetics = {field: getattr(userData, field) for field in cosmetic_fields}

    new_map = types.Map(
        CreatorName=userData.Username,
        CreatorId=userData.ID,
        ID=ID,
        Name=mapName,
        Description=mapDescription,
        Version=mapVersion,
        MapCode=MapCode,
        MapData=mapData,
        Listed=bool(listed),
        HiddenInChallenges=hideInChallenges,
        TagIDs=tags,
        TagNames=",".join(types.convertTagsToNames(tags)),
        Leaderboard=[
            # The uploader's own run seeds the leaderboard.
            types.MapLeaderboard(
                BestPlaytime=playtime,
                BestReplay=mapReplay,
                CreatorName=userData.Username,
                UserID=userData.ID,
            ).dict()
        ],
        **cosmetics,
    )
    db.maps_collection.insert_one(new_map.dict())
    return {"MapCode": MapCode}
# Request body of the map rating endpoint.
class Rating(BaseModel):
    # Requested rating; the rating endpoint acts on the values 5, 1 and -1.
    Rating: int
@app.post("/api/v1/map/{mapID}/rating")
async def rateMap(mapID: int, Rating: Rating, Authorization: Union[str, None] = Header(default=None)):
    """Rate a map as the authenticated user.

    A rating of 5 counts as thumbs up, 1 as thumbs down and -1 removes the
    user's existing rating. The map counters and the user's stored rating are
    updated and the user's state for the map is returned.

    Raises 404 for an unknown user or map, 403 for other rejected
    credentials and 400 when the request repeats the current rating or tries
    to remove a rating that does not exist.
    """
    authcheck = db.auth_check(Authorization)
    if not authcheck[0]:
        if authcheck[1] == "nouser":
            raise HTTPException(404, detail="User not found")
        # Any other failure is rejected as well, so userData is always
        # defined below.
        raise HTTPException(403, detail="Wrong password")
    userData = types.User(**authcheck[1])

    # Look the map up before touching any counter so an unknown ID yields a
    # clean 404 instead of a crash after the user's rating was half applied.
    query = db.maps_collection.find_one({"ID": mapID})
    if query is None:
        raise HTTPException(404, detail="Map not found")

    userRating = userData.Ratings[str(mapID)] if str(mapID) in userData.Ratings else 0
    requested = Rating.Rating

    if requested in (5, 1):
        if userRating == requested:
            raise HTTPException(400, detail="Map already rated! Set rating in request to -1 to unrate.")
        gained, lost = (
            ("NumThumbsUp", "NumThumbsDown")
            if requested == 5
            else ("NumThumbsDown", "NumThumbsUp")
        )
        increments = {gained: 1}
        if userRating in (5, 1):
            # Switching sides: move the vote across. The total number of
            # ratings must not grow, the user had already been counted.
            increments[lost] = -1
        else:
            increments["NumRatings"] = 1
        db.maps_collection.update_one({"ID": mapID}, {"$inc": increments})
        userRating = requested
    elif requested == -1:
        if userRating is None or userRating == -1:
            raise HTTPException(400, detail="Map is not rated!")
        if userRating == 5:
            key = "NumThumbsUp"
        elif userRating == 1:
            key = "NumThumbsDown"
        else:
            raise HTTPException(400, detail=f"Previous rating was not 5 or 1. It was {userRating} [{repr(type(userRating))}]")
        db.maps_collection.update_one({"ID": mapID}, {"$inc": {"NumRatings": -1, key: -1}})
        userRating = -1

    db.user_collection.update_one({"ID": userData.ID}, {"$set": {f"Ratings.{mapID}": userRating}})

    # NOTE(review): apart from MapID, Rating and UserID the UserMap values
    # are fixed placeholders.
    return {
        "Exists": True,
        "TagIDs": query["TagIDs"],
        "TagNames": query["TagNames"],
        "UserMap": {
            "BestDeaths": 0,
            "BestFullPlaytime": 0,
            "BestFullPlaytimeTime": None,
            "BestPlaytime": 0,
            "BestPlaytimeTime": None,
            "BestReplay": "",
            "Clear": False,
            "Difficulty": 0,
            "FirstClearInvalid": False,
            "FirstClearPlaytime": 0,
            "FirstClearTime": False,
            "FirstDeathTimeValid": False,
            "FirstDeaths": -1,
            "FirstPlayRecorded": True,
            "FirstPlaytime": -1,
            "FullClear": False,
            "MapID": mapID,
            "Played": True,
            "Rating": userRating,
            "UserID": userData.ID
        }
    }
@app.get("/api/v1/map/{mapID}/besttimes/{maxEntries}")
async def getMapLeaderboard(mapID: int, maxEntries: int = 5, full_clear: int = 0):
    """Returns maxEntries records for the specified level

    The records are sorted by ascending "BestPlaytime" and cut to the first
    `maxEntries`. Raises 404 when the map does not exist.
    """
    query = db.maps_collection.find_one({"ID": int(mapID)})
    if not query:
        raise HTTPException(404, detail="Map not found")
    del query["_id"]
    leaderboard = []
    for record in query["Leaderboard"]:
        # Validate/normalise the stored entry through the model.
        record = types.MapLeaderboard(**record)
        # NOTE(review): all three branches append the record, so `full_clear`
        # currently filters nothing — confirm whether the final branch should
        # skip non-matching records instead.
        if record.FullClear and full_clear == 1:
            leaderboard.append(dict(record))
        elif not record.FullClear and full_clear == 0:
            leaderboard.append(dict(record))
        else:
            leaderboard.append(dict(record))
    return sorted(leaderboard, key=lambda sort: sort["BestPlaytime"])[0:maxEntries]
@app.get("/api/v1/map/{mapID}/userbesttime/{userID}")
async def getPlayerRecord(mapID, userID):
    """Return the given user's best leaderboard record on a map.

    The response holds the record under "BestMapTime" and "Exists": True, or
    None and False when the user has no record on the map.
    Raises 404 when the map does not exist.
    """
    query = db.maps_collection.find_one({"ID": int(mapID)})
    if not query:
        raise HTTPException(404, detail="Map not found")

    wanted_user = int(userID)
    own_records = [
        record for record in query["Leaderboard"] if record["UserID"] == wanted_user
    ]
    if not own_records:
        return {"BestMapTime": None, "Exists": False}

    # Pick the fastest of the user's records. The previous loop never updated
    # its running best, so it returned whichever matching record came last.
    best = min(own_records, key=lambda record: record["BestPlaytime"])
    return {"BestMapTime": best, "Exists": True}
@app.get("/api/v1/map/{mapID}/besttime")
async def getBestRecord(mapID: int):
    """Return the fastest leaderboard record of a map.

    The response holds the record under "BestMapTime"; "Exists" is False when
    the leaderboard is empty. Raises 404 when the map does not exist.
    """
    query = db.maps_collection.find_one({"ID": int(mapID)})
    if query is None:
        raise HTTPException(404, detail="Map not found")

    # min() keeps the first of several equally fast entries, like the manual
    # scan it replaces; default=None covers an empty leaderboard.
    BestPlay = min(
        query["Leaderboard"],
        key=lambda record: record["BestPlaytime"],
        default=None,
    )
    # Report Exists truthfully instead of claiming a record that is None.
    return {"BestMapTime": BestPlay, "Exists": BestPlay is not None}
@app.post("/api/v1/map/{mapID}/invalidatealltimes")
async def invalidateAllTimes(
    mapID: int,
    Reason: int = Body(),
    CustomReason: str = Body(default=""),
    Authorization: Union[str, None] = Header(default=None),
):
    """Removes ALL records from a specific map (administrators only).

    Every attempt is written to the admin action log. Raises 403 on rejected
    credentials and when the caller is not an administrator.
    """
    authcheck = db.auth_check(Authorization)
    # Reject every failed check. Previously only the "nouser" result raised,
    # so other failures fell through and were answered with a 200.
    if not authcheck[0]:
        raise HTTPException(403, detail="Wrong username or password")
    userData = types.User(**authcheck[1])

    action_data = {
        "Reason": Reason,
        "CustomReason": CustomReason,
        "mapID": mapID,
    }

    if not userData.Admin:
        # Record who tried, so the promised report is actually attributable.
        db.LogAdminAction(
            action_type="invalidateAllTimes",
            action_data={**action_data, "unauthorized": True},
            UserID=userData.ID,
            success=False,
        )
        raise HTTPException(
            403,
            detail="Attempted to perform an administrator action without permission. This will be reported.",
        )

    # Pushing nothing with $slice 0 truncates the Leaderboard array to empty.
    db.maps_collection.update_one(
        {"ID": mapID}, {"$push": {"Leaderboard": {"$each": [], "$slice": 0}}}
    )
    db.LogAdminAction(
        action_type="invalidateAllTimes",
        action_data=action_data,
        UserID=userData.ID,
    )
## General
@app.post("/api/v1/reports")
async def reportContent(
    Authorization: Union[str, None] = Header(default=None),
    user_id: int = Form(),
    map_id: Optional[int] = Form(default=None),
    report_type: int = Form(),
    content: str = Form(),
):
    """Store a report about a user or a map.

    Returns True once the report has been saved.
    Raises 403 on rejected credentials.
    """
    authcheck = db.auth_check(Authorization)
    # Reject every failed check. Previously only the "nouser" result raised,
    # so other failures were not answered with an error.
    if not authcheck[0]:
        raise HTTPException(403, detail="Wrong user or password")

    db.reports_collection.insert_one(
        {
            "user_id": user_id,
            "map_id": map_id,
            "report_type": report_type,
            "content": content,
        }
    )
    return True
@app.get("/api/v1/featuredlist")
async def featuredlist():
    """Weekly featured-levels list ID; always answers 404 for now."""
    # FIXME Add playlists
    reason = "Not available due to lack of playlist support."
    raise HTTPException(404, detail=reason)
@app.get("/api/v1/followcheck")
async def followcheck():
    """Tell whether followed creators have uploaded new levels."""
    # FIXME: Stub, always answers 0.
    new_uploads = 0
    return new_uploads
@app.post("/api/v1/unlockachievement", response_class=PlainTextResponse)
async def unlockachievement(
    Authorization: Union[str, None] = Header(default=None),
    achievementId: int = Form()
):
    """
    Adds an achievement to the user.
    STUB: nothing is stored, the call always answers "Success".
    """
    outcome = "Success"
    return outcome
def start():
    """Run the server; this is the target of `poetry run start` at root level."""
    from .__main__ import server

    server.run()
    # uvicorn.run("customiwmserver.main:app", host="0.0.0.0", port=8001, reload=True)