712 lines
25 KiB
Python
712 lines
25 KiB
Python
from datetime import datetime
|
|
from fastapi import FastAPI, Form, File, UploadFile, Header, HTTPException, Body
|
|
from fastapi.responses import PlainTextResponse
|
|
from fastapi.exceptions import RequestValidationError
|
|
from pydantic import BaseModel
|
|
from starlette.exceptions import HTTPException as StarletteHTTPException
|
|
|
|
from typing import Union, Optional, Any
|
|
import uvicorn
|
|
from . import data_types as types
|
|
from . import database as db
|
|
from . import hook_system
|
|
from . import hooks
|
|
from . import object_storage
|
|
import pymongo
|
|
|
|
app = FastAPI()
|
|
hook = hook_system.hook
|
|
|
|
|
|
@app.exception_handler(RequestValidationError)
|
|
async def http_exception_handler(request, exc):
|
|
return PlainTextResponse(f"{str(exc)}", status_code=422)
|
|
|
|
|
|
@app.exception_handler(StarletteHTTPException)
|
|
async def http_exception_handler(request, exc):
|
|
return PlainTextResponse(
|
|
f"[{exc.status_code}] {str(exc.detail)}", status_code=exc.status_code
|
|
)
|
|
|
|
## Users
|
|
|
|
|
|
@app.post("/api/v1/login")
|
|
async def login(username: str = Form(), password: str = Form(), version: str = Form()):
|
|
"""User login"""
|
|
hook.execute_hooks("player_login", username=username)
|
|
auth = db.auth_check(username + ":" + password)
|
|
if not auth[0]:
|
|
raise HTTPException(403, detail="Wrong username or password.")
|
|
else:
|
|
return {"token": username + ":" + password, "userId": auth[1]["ID"]}
|
|
|
|
|
|
@app.put("/api/v1/user")
|
|
async def create_user(
|
|
username: str = Form(),
|
|
password: str = Form(),
|
|
email: str = Form(),
|
|
version: str = Form(),
|
|
):
|
|
"""Create a user with specified credentials"""
|
|
token = username + ":" + password
|
|
|
|
UserID = len(list(db.user_collection.find({})))
|
|
if UserID == 0:
|
|
UserID = 1
|
|
|
|
insert_data = {
|
|
**types.User(Username=username, Email=email, ID=UserID).dict(),
|
|
"Password": password,
|
|
}
|
|
|
|
db.user_collection.insert_one(insert_data)
|
|
return {"token": token, "userId": UserID}
|
|
|
|
|
|
@app.get("/api/v1/notifunread")
|
|
async def notifunread():
|
|
"""Returns the number of unread notifications."""
|
|
# FIXME: Add notifications
|
|
return 0
|
|
|
|
@app.post("/api/v1/user/{userID}")
|
|
async def update_user_profile(
|
|
userID: int,
|
|
Authorization: Union[str, None] = Header(default=None),
|
|
ID: int = Body(),
|
|
Country: int = Body(),
|
|
PantsColor: int = Body(),
|
|
HatSpr: int = Body(),
|
|
HairColor: int = Body(),
|
|
CapeColor: int = Body(),
|
|
ShoesColor: int = Body(),
|
|
ShirtColor: int = Body(),
|
|
SkinColor: int = Body(),
|
|
):
|
|
authcheck = db.auth_check(Authorization)
|
|
if not authcheck[0]:
|
|
raise HTTPException(403, detail="Wrong username or password")
|
|
elif authcheck[0]:
|
|
userData = types.User(**authcheck[1])
|
|
userData.Country = Country
|
|
userData.PantsColor = PantsColor
|
|
userData.HatSpr = HatSpr
|
|
userData.HairColor = HairColor
|
|
userData.CapeColor = CapeColor
|
|
userData.ShoesColor = ShoesColor
|
|
userData.ShirtColor = ShirtColor
|
|
userData.SkinColor = SkinColor
|
|
userData.UpdatedAt = datetime.strftime(datetime.now(), "%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
db.user_collection.update_one({"ID":userID}, {"$set": {**userData.dict()} })
|
|
hook.execute_hooks("player_update", userID=userID)
|
|
return userData.dict()
|
|
return HTTPException(500, detail="Server failed to handle login (somehow)")
|
|
|
|
|
|
@app.get("/api/v1/refresh")
|
|
async def refresh_login(Authorization: Union[str, None] = Header(default=None)):
|
|
"""Intended for refreshing user token."""
|
|
return {"token": Authorization}
|
|
|
|
|
|
@app.get("/api/v1/user/{user_id}")
|
|
async def get_user(user_id: int):
|
|
"""Returns specified user's profile."""
|
|
query = db.user_collection.find_one({"ID": user_id})
|
|
del query["Password"]
|
|
return types.User(**query)
|
|
|
|
|
|
## Maps
|
|
|
|
|
|
@app.get("/api/v1/mapcount")
|
|
async def mapcount():
|
|
return len(list(db.maps_collection.find({})))
|
|
|
|
|
|
@app.get("/api/v1/useruploadcooldown")
|
|
async def useruploadcooldown():
|
|
"""Limits the amount of levels that user can upload."""
|
|
return {"success": True}
|
|
|
|
|
|
@app.get("/api/v1/map")
|
|
async def search_for_maps(
|
|
Authorization: Union[str, None] = Header(default=None),
|
|
start: int = 0,
|
|
limit: int = 5,
|
|
min_diff: float = 0.0,
|
|
max_diff: float = 5.0,
|
|
order: str = '[{ "Dir": "desc", "Name": "created_at" }]',
|
|
name: str = "",
|
|
author: str = "",
|
|
author_id: Optional[int] = None,
|
|
last_x_hours: Optional[int] = None,
|
|
required_tags: str = "",
|
|
disallowed_tags: str = "",
|
|
code: Optional[str] = None,
|
|
admin_show_unlisted: Optional[int] = 0,
|
|
):
|
|
"""Search for maps."""
|
|
if Authorization is not None:
|
|
authcheck = db.auth_check(Authorization)
|
|
if not authcheck[0]:
|
|
raise HTTPException(403,
|
|
detail="Wrong username or password."
|
|
+ "\nIf you're using the API directly, "
|
|
+ "then you can just skip the Authorization")
|
|
userData = types.User(**authcheck[1])
|
|
plays = userData.Plays
|
|
else:
|
|
userData = None
|
|
plays = {}
|
|
|
|
# query = db.maps_collection.find({ "CreatorId": author_id }).limit(limit)
|
|
if code:
|
|
query = list(db.maps_collection.find({"MapCode": code}).skip(start).limit(limit))[0]
|
|
del query["_id"]
|
|
del query["MapData"]
|
|
return [query]
|
|
|
|
query = db.maps_collection.find({}).skip(start).limit(limit)
|
|
entries = []
|
|
|
|
# Convert required_tags and disallowed_tags to a list.
|
|
required_tag_list = required_tags.split(",") if required_tags != "" else []
|
|
disallowed_tag_list = disallowed_tags.split(",") if disallowed_tags != "" else []
|
|
for i in query:
|
|
del i["_id"]
|
|
del i["MapData"]
|
|
del i["Played"]
|
|
del i["Clear"]
|
|
del i["FullClear"]
|
|
|
|
if not i["Listed"] and admin_show_unlisted != 1:
|
|
continue
|
|
|
|
level_tags = i['TagIDs'].split(",")
|
|
CreatedAt = datetime.strptime(i["CreatedAt"], "%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
Played = False
|
|
Clear = False
|
|
FullClear = False
|
|
if plays and str(i["ID"]) in plays:
|
|
play = plays[str(i["ID"])]
|
|
Played = play.Played
|
|
Clear = play.Clear
|
|
FullClear = play.FullClear
|
|
# if userData:
|
|
# # Old system that checked if a user has a record, remove
|
|
# leaderboard = i["Leaderboard"]
|
|
# for record in leaderboard:
|
|
# record = types.MapLeaderboard(**record)
|
|
# if record.UserID == userData.ID:
|
|
# Played = True
|
|
# Clear = True
|
|
# if record.FullClear:
|
|
# FullClear = True
|
|
# break
|
|
# print(f"DBG: {i}, {leaderboard[i]}\t{leaderboard[i]['UserID'] == int(userID):}")
|
|
# if leaderboard[i]["UserID"] == int(userID) and (BestTime is None or leaderboard[i]["BestPlaytime"] < BestTime):
|
|
# replayIndex = i
|
|
|
|
# TODO: Improve tag filtering
|
|
required_tags_included = False if len(required_tag_list) != 0 else True
|
|
disallowed_tags_included = False
|
|
for tag in level_tags:
|
|
if tag in disallowed_tag_list and len(disallowed_tag_list) != 0: disallowed_tags_included = True
|
|
elif tag in required_tag_list and len(required_tag_list) != 0: required_tags_included = True
|
|
else:
|
|
if required_tags_included and not disallowed_tags_included:
|
|
if not last_x_hours or CreatedAt.hour < last_x_hours:
|
|
entries.append(types.Map(Played=Played, Clear=Clear, FullClear=FullClear, **i))
|
|
return entries
|
|
|
|
|
|
@app.get("/api/v1/map/{mapID}")
|
|
async def getMap(mapID: int):
|
|
query = db.maps_collection.find_one({"ID": mapID})
|
|
del query["_id"]
|
|
return types.Map(**query)
|
|
|
|
|
|
@app.post("/api/v1/map/{mapID}/start")
|
|
async def startMap(
|
|
mapID: int,
|
|
Authorization: Union[str, None] = Header(default=None)
|
|
):
|
|
authcheck = db.auth_check(Authorization)
|
|
if not authcheck[0] and authcheck[1]:
|
|
raise HTTPException(403, detail="Wrong username or password.")
|
|
elif authcheck[0]:
|
|
userData = types.User(**authcheck[1])
|
|
query = db.maps_collection.find_one({"ID": mapID})
|
|
del query["_id"]
|
|
Clear = False
|
|
Played = False
|
|
if str(mapID) not in userData.Plays :
|
|
updateQuery = db.user_collection.update_one(
|
|
{"ID": userData.ID},
|
|
{
|
|
"$set": {
|
|
f"Plays.{mapID}": dict(types.MapPlay())
|
|
}
|
|
}
|
|
)
|
|
else:
|
|
play = userData.Plays[str(mapID)]
|
|
Clear = play.Clear
|
|
Played = play.Played
|
|
FullClear = play.FullClear
|
|
|
|
|
|
returned_resp = {
|
|
"BestDeaths": 0,
|
|
"BestPlaytime": 0,
|
|
"Clear": False,
|
|
"CurMap": types.Map(**query),
|
|
"Difficulty": 0,
|
|
"Followed": False,
|
|
"Played": True,
|
|
"Rating": userData.Ratings[str(mapID)] if str(mapID) in userData.Ratings else -1,
|
|
"TagIDs": query["TagIDs"],
|
|
"TagNames": query["TagNames"],
|
|
}
|
|
return returned_resp
|
|
|
|
|
|
@app.post("/api/v1/map/{mapID}/stop")
|
|
async def stopMapPlay(
|
|
mapID: int,
|
|
clear: int = Form(),
|
|
deaths: int = Form(),
|
|
playtime: int = Form(),
|
|
totalDeaths: int = Form(),
|
|
totalTime: int = Form(),
|
|
fullClear: int = Form(),
|
|
replayData: str = Form(),
|
|
Authorization: Union[str, None] = Header(default=None),
|
|
):
|
|
"""Saves the map replay, and informs the user if their play is a record"""
|
|
authcheck = db.auth_check(Authorization)
|
|
if not authcheck[0]:
|
|
raise HTTPException(403, detail="Wrong username or password")
|
|
elif authcheck[0]:
|
|
NewMapRecord = False
|
|
FirstClear = False
|
|
|
|
userData = types.User(**authcheck[1])
|
|
|
|
userUpdateQuery = db.user_collection.update_one(
|
|
{"ID": userData.ID},
|
|
{
|
|
"$set": {
|
|
f"Plays.{mapID}": dict(types.MapPlay(Clear=bool(clear), FullClear=bool(fullClear)))
|
|
}
|
|
}
|
|
)
|
|
|
|
query = db.maps_collection.find_one(
|
|
{"ID": mapID, "Leaderboard.UserID": userData.ID}
|
|
)
|
|
if query is not None:
|
|
del query["_id"]
|
|
|
|
BestUserTime = None
|
|
BestTime = None
|
|
BestFullTime = None
|
|
if query is not None and "Leaderboard" in query:
|
|
for i in query["Leaderboard"]:
|
|
if i["UserID"] == userData.ID:
|
|
if BestUserTime is None or BestUserTime > i["BestPlaytime"]:
|
|
BestUserTime = i["BestPlaytime"]
|
|
if BestTime is None or BestTime > i["BestPlaytime"]:
|
|
BestTime = i["BestPlaytime"]
|
|
elif BestFullTime is None or BestFullTime > i["BestFullTimePlaytime"]:
|
|
BestFullTime = i["BestFullTimePlaytime"]
|
|
|
|
if len(query["Leaderboard"]) <= 1 and i["UserID"] != userData.ID:
|
|
FirstClear = True
|
|
if (BestTime is None or playtime < BestTime) or (BestFullTime is None or playtime < BestFullTime):
|
|
updateQuery = db.maps_collection.update_one(
|
|
{"ID": mapID},
|
|
{"$pull": {"Leaderboard": {"UserID": userData.ID}},
|
|
}
|
|
)
|
|
updateQuery = db.maps_collection.update_one(
|
|
{"ID": mapID},
|
|
{
|
|
"$push": {
|
|
"Leaderboard": types.MapLeaderboard(
|
|
ShoesColor=userData.ShoesColor,
|
|
PantsColor=userData.PantsColor,
|
|
ShirtColor=userData.ShirtColor,
|
|
CapeColor=userData.CapeColor,
|
|
SkinColor=userData.SkinColor,
|
|
HairColor=userData.HairColor,
|
|
HatSpr=userData.HatSpr,
|
|
Country=userData.Country,
|
|
HairSpr=userData.HairSpr,
|
|
HatColor=userData.HatColor,
|
|
HatColorInv=userData.HatColorInv,
|
|
FacialExpression=userData.FacialExpression,
|
|
DeathEffect=userData.DeathEffect,
|
|
GunSpr=userData.GunSpr,
|
|
BulletSpr=userData.BulletSpr,
|
|
SwordSpr=userData.SwordSpr,
|
|
Costume=userData.Costume,
|
|
FollowerSpr=userData.FollowerSpr,
|
|
FollowerColor=userData.FollowerColor,
|
|
SaveEffect=userData.SaveEffect,
|
|
TextSnd=userData.TextSnd,
|
|
BestPlaytime=playtime,
|
|
BestReplay=replayData,
|
|
CreatorName=userData.Username,
|
|
UserID=userData.ID,
|
|
FullClear=bool(fullClear)
|
|
).dict()
|
|
},
|
|
},
|
|
)
|
|
if (BestTime is None or playtime < BestTime) or (BestFullTime is None or playtime < BestFullTime):
|
|
NewMapRecord = True
|
|
|
|
hook.execute_hooks(
|
|
"map_finished",
|
|
user=userData,
|
|
mapID=mapID,
|
|
clear=clear,
|
|
deaths=deaths,
|
|
playtime=playtime,
|
|
fullClear=fullClear,
|
|
FirstClear=FirstClear,
|
|
NewMapRecord=NewMapRecord,
|
|
)
|
|
|
|
return {"FirstClear": FirstClear, "NewMapRecord": NewMapRecord}
|
|
|
|
|
|
@app.put("/api/v1/map")
|
|
async def upload_map(
|
|
Authorization: Union[str, None] = Header(default=None),
|
|
mapName: str = Form(),
|
|
mapDescription: str = Form(default=""),
|
|
mapVersion: int = Form(),
|
|
mapData: str = Form(),
|
|
file: Optional[UploadFile] = None,
|
|
mapReplay: str = Form(),
|
|
deaths: int = Form(),
|
|
playtime: int = Form(),
|
|
totalDeaths: int = Form(),
|
|
totalTime: int = Form(),
|
|
listed: int = Form(),
|
|
requiresCancels: int = Form(),
|
|
hideInChallenges: int = Form(),
|
|
tags: str = Form(default=""),
|
|
rng: int = Form(),
|
|
numSubmaps: int = Form(),
|
|
dragonCoins: int = Form(),
|
|
replayVisibility: int = Form(),
|
|
clientVersion: float = Form(),
|
|
):
|
|
authcheck = db.auth_check(Authorization)
|
|
if not authcheck[0]:
|
|
raise HTTPException(403, detail="Wrong username or password")
|
|
elif authcheck[0]:
|
|
print(authcheck)
|
|
userData = types.User(**authcheck[1])
|
|
ID = len(list(db.maps_collection.find({}))) + 1
|
|
MapCode = db.id_to_mapcode(ID)
|
|
await object_storage.upload_thumbnail(file, ID)
|
|
db.maps_collection.insert_one(
|
|
{
|
|
**types.Map(
|
|
CreatorName=userData.Username,
|
|
CreatorId=userData.ID,
|
|
ID=ID,
|
|
Name=mapName,
|
|
Description=mapDescription,
|
|
Version=mapVersion,
|
|
MapCode=MapCode,
|
|
MapData=mapData,
|
|
Listed=bool(listed),
|
|
HiddenInChallenges=hideInChallenges,
|
|
TagIDs=tags,
|
|
TagNames=",".join(types.convertTagsToNames(tags)),
|
|
ShoesColor=userData.ShoesColor,
|
|
PantsColor=userData.PantsColor,
|
|
ShirtColor=userData.ShirtColor,
|
|
CapeColor=userData.CapeColor,
|
|
SkinColor=userData.SkinColor,
|
|
HairColor=userData.HairColor,
|
|
HatSpr=userData.HatSpr,
|
|
Country=userData.Country,
|
|
HairSpr=userData.HairSpr,
|
|
HatColor=userData.HatColor,
|
|
HatColorInv=userData.HatColorInv,
|
|
FacialExpression=userData.FacialExpression,
|
|
DeathEffect=userData.DeathEffect,
|
|
GunSpr=userData.GunSpr,
|
|
BulletSpr=userData.BulletSpr,
|
|
SwordSpr=userData.SwordSpr,
|
|
Costume=userData.Costume,
|
|
FollowerSpr=userData.FollowerSpr,
|
|
FollowerColor=userData.FollowerColor,
|
|
SaveEffect=userData.SaveEffect,
|
|
TextSnd=userData.TextSnd,
|
|
Leaderboard=[
|
|
types.MapLeaderboard(
|
|
BestPlaytime=playtime,
|
|
BestReplay=mapReplay,
|
|
CreatorName=userData.Username,
|
|
UserID=userData.ID,
|
|
).dict()
|
|
],
|
|
).dict()
|
|
}
|
|
|
|
)
|
|
return {"MapCode": MapCode}
|
|
# raise HTTPException(501)
|
|
|
|
class Rating(BaseModel):
|
|
Rating: int
|
|
|
|
@app.post("/api/v1/map/{mapID}/rating")
|
|
async def rateMap(mapID: int, Rating: Rating, Authorization: Union[str, None] = Header(default=None)):
|
|
authcheck = db.auth_check(Authorization)
|
|
if not authcheck[0] and authcheck[1] == "nouser":
|
|
raise HTTPException(404, detail="User not found")
|
|
elif not authcheck[0] and authcheck[1] == "wrongpass":
|
|
raise HTTPException(403, detail="Wrong password")
|
|
elif authcheck[0]:
|
|
userData = types.User(**authcheck[1])
|
|
|
|
userRating = userData.Ratings[str(mapID)] if str(mapID) in userData.Ratings else 0
|
|
print(userRating, type(userRating))
|
|
|
|
|
|
if Rating.Rating == 5:
|
|
if userRating is not None and userRating == 5:
|
|
raise HTTPException(400, detail="Map already rated! Set rating in request to -1 to unrate.")
|
|
additional = {}
|
|
if userRating == 1:
|
|
additional = {"NumThumbsDown": -1}
|
|
query = db.maps_collection.update_one({"ID": mapID}, {"$inc": {"NumThumbsUp":1, "NumRatings": 1, **additional}})
|
|
userRating = 5
|
|
elif Rating.Rating == 1:
|
|
if userRating is not None and userRating == 1:
|
|
raise HTTPException(400, detail="Map already rated! Set rating in request to -1 to unrate.")
|
|
additional = {}
|
|
if userRating == 5:
|
|
additional = {"NumThumbsUp": -1}
|
|
query = db.maps_collection.update_one({"ID": mapID}, {"$inc": {"NumThumbsDown": 1, "NumRatings": 1, **additional}})
|
|
userRating = 1
|
|
elif Rating.Rating == -1:
|
|
if userRating is None or userRating == -1:
|
|
raise HTTPException(400, detail="Map is not rated!")
|
|
key = ""
|
|
if userRating == 5:
|
|
key = "NumThumbsUp"
|
|
elif userRating == 1:
|
|
key = "NumThumbsDown"
|
|
else:
|
|
raise HTTPException(400, detail=f"Previous rating was not 5 or 1. It was {userRating} [{repr(type(userRating))}]")
|
|
query = db.maps_collection.update_one({"ID": mapID}, {"$inc": {"NumRatings": -1, key: -1}})
|
|
userRating = -1
|
|
query = db.maps_collection.find_one({"ID": mapID})
|
|
|
|
print(userRating, "+")
|
|
userQuery = db.user_collection.update_one({"ID": userData.ID}, {"$set": {f"Ratings.{mapID}": userRating} })
|
|
print(userQuery.raw_result)
|
|
|
|
return {
|
|
"Exists": True,
|
|
"TagIDs": query["TagIDs"],
|
|
"TagNames": query["TagNames"],
|
|
"UserMap": {
|
|
"BestDeaths": 0,
|
|
"BestFullPlaytime": 0,
|
|
"BestFullPlaytimeTime": None,
|
|
"BestPlaytime": 0,
|
|
"BestPlaytimeTime": None,
|
|
"BestReplay": "",
|
|
"Clear": False,
|
|
"Difficulty": 0,
|
|
"FirstClearInvalid": False,
|
|
"FirstClearPlaytime": 0,
|
|
"FirstClearTime": False,
|
|
"FirstDeathTimeValid": False,
|
|
"FirstDeaths": -1,
|
|
"FirstPlayRecorded": True,
|
|
"FirstPlaytime": -1,
|
|
"FullClear": False,
|
|
"MapID": mapID,
|
|
"Played": True,
|
|
"Rating": userRating,
|
|
"UserID": userData.ID
|
|
}
|
|
}
|
|
|
|
@app.get("/api/v1/map/{mapID}/besttimes/{maxEntries}")
|
|
async def getMapLeaderboard(mapID: int, maxEntries: int = 5, full_clear: int = 0):
|
|
"""Returns maxEntries records for the specified level"""
|
|
query = db.maps_collection.find_one({"ID": int(mapID)})
|
|
if not query:
|
|
raise HTTPException(404, detail="Map not found")
|
|
del query["_id"]
|
|
leaderboard = []
|
|
for record in query["Leaderboard"]:
|
|
record = types.MapLeaderboard(**record)
|
|
if record.FullClear and full_clear == 1:
|
|
leaderboard.append(dict(record))
|
|
elif not record.FullClear and full_clear == 0:
|
|
leaderboard.append(dict(record))
|
|
else:
|
|
leaderboard.append(dict(record))
|
|
|
|
return sorted(leaderboard, key=lambda sort: sort["BestPlaytime"])[0:maxEntries]
|
|
|
|
|
|
@app.get("/api/v1/map/{mapID}/userbesttime/{userID}")
|
|
async def getPlayerRecord(mapID, userID):
|
|
"""Returns specific replay"""
|
|
query = db.maps_collection.find_one({"ID": int(mapID)})
|
|
if not query:
|
|
raise HTTPException(404, detail="Map not found")
|
|
del query["_id"]
|
|
leaderboard = query["Leaderboard"]
|
|
|
|
# Find user
|
|
replayIndex = None
|
|
BestTime = None
|
|
for i in range(len(leaderboard)):
|
|
print(f"DBG: {i}, {leaderboard[i]}\t{leaderboard[i]['UserID'] == int(userID):}")
|
|
if leaderboard[i]["UserID"] == int(userID) and (BestTime is None or leaderboard[i]["BestPlaytime"] < BestTime):
|
|
replayIndex = i
|
|
print(leaderboard[replayIndex])
|
|
|
|
if replayIndex is not None:
|
|
return {"BestMapTime": leaderboard[replayIndex], "Exists": True}
|
|
else:
|
|
return {"BestMapTime": None, "Exists": False}
|
|
|
|
|
|
@app.get("/api/v1/map/{mapID}/besttime")
|
|
async def getBestRecord(mapID: int):
|
|
query = db.maps_collection.find_one({"ID": int(mapID)})
|
|
BestTime = None
|
|
BestPlay = None
|
|
for i in query["Leaderboard"]:
|
|
if BestTime is None or BestTime > i["BestPlaytime"]:
|
|
BestTime = i["BestPlaytime"]
|
|
BestPlay = i
|
|
|
|
return {"BestMapTime": BestPlay, "Exists": True}
|
|
|
|
|
|
@app.post("/api/v1/map/{mapID}/invalidatealltimes")
|
|
async def invalidateAllTimes(
|
|
mapID: int,
|
|
Reason: int = Body(),
|
|
CustomReason: str = Body(default=""),
|
|
Authorization: Union[str, None] = Header(default=None),
|
|
):
|
|
"""Removes ALL records from a specific map"""
|
|
|
|
authcheck = db.auth_check(Authorization)
|
|
if not authcheck[0] and authcheck[1] == "nouser":
|
|
raise HTTPException(403, detail="Wrong username or password")
|
|
elif authcheck[0]:
|
|
userData = types.User(**authcheck[1])
|
|
if userData.Admin:
|
|
query = db.maps_collection.update_one(
|
|
{"ID": mapID}, {"$push": {"Leaderboard": {"$each": [], "$slice": 0}}}
|
|
)
|
|
db.LogAdminAction(
|
|
action_type="invalidateAllTimes",
|
|
action_data={
|
|
"Reason": Reason,
|
|
"CustomReason": CustomReason,
|
|
"mapID": mapID,
|
|
},
|
|
UserID=userData.ID,
|
|
)
|
|
else:
|
|
db.LogAdminAction(
|
|
action_type="invalidateAllTimes",
|
|
action_data={
|
|
"Reason": Reason,
|
|
"CustomReason": CustomReason,
|
|
"mapID": mapID,
|
|
"unauthorized": True,
|
|
},
|
|
success=False,
|
|
)
|
|
raise HTTPException(
|
|
403,
|
|
detail="Attempted to perform an administrator action without permission. This will be reported.",
|
|
)
|
|
|
|
|
|
## General
|
|
|
|
|
|
@app.post("/api/v1/reports")
|
|
async def reportContent(
|
|
Authorization: Union[str, None] = Header(default=None),
|
|
user_id: int = Form(),
|
|
map_id: Optional[int] = Form(default=None),
|
|
report_type: int = Form(),
|
|
content: str = Form(),
|
|
):
|
|
|
|
authcheck = db.auth_check(Authorization)
|
|
if not authcheck[0] and authcheck[1] == "nouser":
|
|
raise HTTPException(403, detail="Wrong user or password")
|
|
elif authcheck[0]:
|
|
db.reports_collection.insert_one(
|
|
{
|
|
"user_id": user_id,
|
|
"map_id": map_id,
|
|
"report_type": report_type,
|
|
"content": content,
|
|
}
|
|
)
|
|
return True
|
|
|
|
|
|
@app.get("/api/v1/featuredlist")
|
|
async def featuredlist():
|
|
"""Returns the list id of the weekly levels list."""
|
|
# FIXME Add playlists
|
|
raise HTTPException(404, detail="Not available due to lack of playlist support.")
|
|
|
|
|
|
@app.get("/api/v1/followcheck")
|
|
async def followcheck():
|
|
"""Check, if creators that the user follows uploaded new levels."""
|
|
# FIXME: Stub
|
|
return 0
|
|
|
|
@app.post("/api/v1/unlockachievement", response_class=PlainTextResponse)
|
|
async def unlockachievement(
|
|
Authorization: Union[str, None] = Header(default=None),
|
|
achievementId: int = Form()
|
|
):
|
|
"""
|
|
Adds an achivement to the user
|
|
STUB
|
|
"""
|
|
return "Success"
|
|
|
|
def start():
|
|
"""Launched with `poetry run start` at root level"""
|
|
from . import __main__
|
|
__main__.server.run()
|
|
# uvicorn.run("customiwmserver.main:app", host="0.0.0.0", port=8001, reload=True)
|