from datetime import datetime from fastapi import FastAPI, Form, File, UploadFile, Header, HTTPException, Body from fastapi.responses import PlainTextResponse from fastapi.exceptions import RequestValidationError from pydantic import BaseModel from starlette.exceptions import HTTPException as StarletteHTTPException from typing import Union, Optional, Any import uvicorn from . import data_types as types from . import database as db from . import hook_system from . import hooks import pymongo app = FastAPI() hook = hook_system.hook @app.exception_handler(RequestValidationError) async def http_exception_handler(request, exc): return PlainTextResponse(f"{str(exc)}", status_code=422) @app.exception_handler(StarletteHTTPException) async def http_exception_handler(request, exc): return PlainTextResponse( f"[{exc.status_code}] {str(exc.detail)}", status_code=exc.status_code ) ## Users @app.post("/api/v1/login") async def login(username: str = Form(), password: str = Form(), version: str = Form()): """User login""" hook.execute_hooks("player_login", username=username) auth = db.auth_check(username + ":" + password) if not auth[0]: if auth[1] == "nouser": raise HTTPException(404, detail="User not found.") elif auth[1] == "wrongpass": raise HTTPException(403, detail="Password is incorrect.") else: return {"token": username + ":" + password, "userId": auth[1]["ID"]} @app.put("/api/v1/user") async def create_user( username: str = Form(), password: str = Form(), email: str = Form(), version: str = Form(), ): """Create a user with specified credentials""" token = username + ":" + password UserID = len(list(db.user_collection.find({}))) if UserID == 0: UserID = 1 insert_data = { **types.User(Username=username, Email=email, ID=UserID).dict(), "Password": password, } db.user_collection.insert_one(insert_data) return {"token": token, "userId": UserID} @app.get("/api/v1/notifunread") async def notifunread(): """Returns the number of unread notifications.""" # FIXME Add notifications return 0 @app.get("/api/v1/refresh") async def refresh_login(Authorization: Union[str, None] = Header(default=None)): """Intended for refreshing user token.""" return {"token": Authorization} @app.get("/api/v1/user/{user_id}") async def get_user(user_id: int): """Returns specified user's profile.""" query = db.user_collection.find_one({"ID": user_id}) del query["Password"] return types.User(**query) ## Maps @app.get("/api/v1/mapcount") async def mapcount(): return len(list(db.maps_collection.find({}))) @app.get("/api/v1/useruploadcooldown") async def useruploadcooldown(): """Limits the amount of levels that user can upload.""" return {"success": True} @app.get("/api/v1/map") async def search_for_maps( start: int = 0, limit: int = 5, min_diff: float = 0.0, max_diff: float = 5.0, order: str = '[{ "Dir": "desc", "Name": "created_at" }]', name: str = "", author: str = "", author_id: Optional[int] = None, last_x_hours: Optional[int] = None, required_tags: str = "", disallowed_tags: str = "", code: Optional[str] = None, admin_show_unlisted: Optional[int] = 0, ): """Search for maps.""" # query = db.maps_collection.find({ "CreatorId": author_id }).limit(limit) if code: query = list(db.maps_collection.find({"MapCode": code}).skip(start).limit(limit))[0] del query["_id"] del query["MapData"] return [query] query = db.maps_collection.find({}).skip(start).limit(limit) entries = [] # Convert required_tags and disallowed_tags to a list. required_tag_list = required_tags.split(",") if required_tags != "" else [] disallowed_tag_list = disallowed_tags.split(",") if disallowed_tags != "" else [] for i in query: del i["_id"] del i["MapData"] if not i["Listed"] and admin_show_unlisted != 1: continue level_tags = i['TagIDs'].split(",") CreatedAt = datetime.strptime(i["CreatedAt"], "%Y-%m-%dT%H:%M:%SZ") # TODO: Improve tag filtering required_tags_included = False if len(required_tag_list) != 0 else True disallowed_tags_included = False for tag in level_tags: if tag in disallowed_tag_list and len(disallowed_tag_list) != 0: disallowed_tags_included = True elif tag in required_tag_list and len(required_tag_list) != 0: required_tags_included = True else: if required_tags_included and not disallowed_tags_included: if not last_x_hours or CreatedAt.hour < last_x_hours: entries.append(i) return entries @app.get("/api/v1/map/{mapID}") async def getMap(mapID: int): query = db.maps_collection.find_one({"ID": mapID}) del query["_id"] return query @app.post("/api/v1/map/{mapID}/start") async def startMap( mapID: int, Authorization: Union[str, None] = Header(default=None) ): authcheck = db.auth_check(Authorization) if not authcheck[0] and authcheck[1] == "nouser": raise HTTPException(404, detail="User not found") elif not authcheck[0] and authcheck[1] == "wrongpass": raise HTTPException(403, detail="Wrong password") elif authcheck[0]: userData = types.User(**authcheck[1]) query = db.maps_collection.find_one({"ID": mapID}) del query["_id"] returned_resp = { "BestDeaths": 0, "BestPlaytime": 0, "Clear": False, "CurMap": query, "Difficulty": 0, "Followed": False, "Played": True, "Rating": userData.Ratings[str(mapID)] if str(mapID) in userData.Ratings else -1, "TagIDs": query["TagIDs"], "TagNames": query["TagNames"], } return returned_resp @app.post("/api/v1/map/{mapID}/stop") async def stopMapPlay( mapID: int, clear: int = Form(), deaths: int = Form(), playtime: int = Form(), totalDeaths: int = Form(), totalTime: int = Form(), replayData: str = Form(), Authorization: Union[str, None] = Header(default=None), ): """Saves the map replay, and informs the user if their play is a record""" authcheck = db.auth_check(Authorization) if not authcheck[0] and authcheck[1] == "nouser": raise HTTPException(404, detail="User not found") elif not authcheck[0] and authcheck[1] == "wrongpass": raise HTTPException(403, detail="Wrong password") elif authcheck[0]: NewMapRecord = False FirstClear = False userData = types.User(**authcheck[1]) query = db.maps_collection.find_one( {"ID": mapID, "Leaderboard.UserID": userData.ID} ) if query is not None: del query["_id"] BestUserTime = None BestTime = None if query is not None and "Leaderboard" in query: for i in query["Leaderboard"]: if i["UserID"] == userData.ID: if BestUserTime is None or BestUserTime > i["BestPlaytime"]: BestUserTime = i["BestPlaytime"] if BestTime is None or BestTime > i["BestPlaytime"]: BestTime = i["BestPlaytime"] if len(query["Leaderboard"]) <= 1 and i["UserID"] != userData.ID: FirstClear = True if BestUserTime is None or playtime < BestUserTime: updateQuery = db.maps_collection.update_one( {"ID": mapID}, {"$pull": {"Leaderboard": {"UserID": userData.ID}}, } ) updateQuery = db.maps_collection.update_one( {"ID": mapID}, { "$push": { "Leaderboard": types.MapLeaderboard( ShoesColor=userData.ShoesColor, PantsColor=userData.PantsColor, ShirtColor=userData.ShirtColor, CapeColor=userData.CapeColor, SkinColor=userData.SkinColor, HairColor=userData.HairColor, HatSpr=userData.HatSpr, Country=userData.Country, HairSpr=userData.HairSpr, HatColor=userData.HatColor, HatColorInv=userData.HatColorInv, FacialExpression=userData.FacialExpression, DeathEffect=userData.DeathEffect, GunSpr=userData.GunSpr, BulletSpr=userData.BulletSpr, SwordSpr=userData.SwordSpr, Costume=userData.Costume, FollowerSpr=userData.FollowerSpr, FollowerColor=userData.FollowerColor, SaveEffect=userData.SaveEffect, TextSnd=userData.TextSnd, BestPlaytime=playtime, BestReplay=replayData, CreatorName=userData.Username, UserID=userData.ID, ).dict() }, }, ) if BestTime is None or playtime < BestTime: NewMapRecord = True hook.execute_hooks( "map_finished", user=userData, mapID=mapID, clear=clear, deaths=deaths, playtime=playtime, FirstClear=FirstClear, NewMapRecord=NewMapRecord, ) return {"FirstClear": FirstClear, "NewMapRecord": NewMapRecord} @app.put("/api/v1/map") async def upload_map( Authorization: Union[str, None] = Header(default=None), mapName: str = Form(), mapDescription: str = Form(default=""), mapVersion: int = Form(), mapData: str = Form(), file: Optional[UploadFile] = None, mapReplay: str = Form(), deaths: int = Form(), playtime: int = Form(), totalDeaths: int = Form(), totalTime: int = Form(), listed: int = Form(), requiresCancels: int = Form(), hideInChallenges: int = Form(), tags: str = Form(), rng: int = Form(), clientVersion: float = Form(), ): authcheck = db.auth_check(Authorization) if not authcheck[0] and authcheck[1] == "nouser": raise HTTPException(404, detail="User not found") elif not authcheck[0] and authcheck[1] == "wrongpass": raise HTTPException(403, detail="Wrong password") elif authcheck[0]: print(authcheck) userData = types.User(**authcheck[1]) ID = len(list(db.maps_collection.find({}))) + 1 MapCode = db.id_to_mapcode(ID) db.maps_collection.insert_one( { **types.Map( CreatorName=userData.Username, CreatorId=userData.ID, ID=ID, Name=mapName, Description=mapDescription, Version=mapVersion, MapCode=MapCode, MapData=mapData, Listed=bool(listed), HiddenInChallenges=hideInChallenges, TagIDs=tags, TagNames=",".join(types.convertTagsToNames(tags)), ShoesColor=userData.ShoesColor, PantsColor=userData.PantsColor, ShirtColor=userData.ShirtColor, CapeColor=userData.CapeColor, SkinColor=userData.SkinColor, HairColor=userData.HairColor, HatSpr=userData.HatSpr, Country=userData.Country, HairSpr=userData.HairSpr, HatColor=userData.HatColor, HatColorInv=userData.HatColorInv, FacialExpression=userData.FacialExpression, DeathEffect=userData.DeathEffect, GunSpr=userData.GunSpr, BulletSpr=userData.BulletSpr, SwordSpr=userData.SwordSpr, Costume=userData.Costume, FollowerSpr=userData.FollowerSpr, FollowerColor=userData.FollowerColor, SaveEffect=userData.SaveEffect, TextSnd=userData.TextSnd, Leaderboard=[ types.MapLeaderboard( BestPlaytime=playtime, BestReplay=mapReplay, CreatorName=userData.Username, UserID=userData.ID, ).dict() ], ).dict() } ) return {"MapCode": MapCode} # raise HTTPException(501) class Rating(BaseModel): Rating: int @app.post("/api/v1/map/{mapID}/rating") async def rateMap(mapID: int, Rating: Rating, Authorization: Union[str, None] = Header(default=None)): authcheck = db.auth_check(Authorization) if not authcheck[0] and authcheck[1] == "nouser": raise HTTPException(404, detail="User not found") elif not authcheck[0] and authcheck[1] == "wrongpass": raise HTTPException(403, detail="Wrong password") elif authcheck[0]: userData = types.User(**authcheck[1]) userRating = userData.Ratings[str(mapID)] if str(mapID) in userData.Ratings else 0 print(userRating, type(userRating)) if Rating.Rating == 5: if userRating is not None and userRating == 5: raise HTTPException(400, detail="Map already rated! Set rating in request to -1 to unrate.") additional = {} if userRating == 1: additional = {"NumThumbsDown": -1} query = db.maps_collection.update_one({"ID": mapID}, {"$inc": {"NumThumbsUp":1, "NumRatings": 1, **additional}}) userRating = 5 elif Rating.Rating == 1: if userRating is not None and userRating == 1: raise HTTPException(400, detail="Map already rated! Set rating in request to -1 to unrate.") additional = {} if userRating == 5: additional = {"NumThumbsUp": -1} query = db.maps_collection.update_one({"ID": mapID}, {"$inc": {"NumThumbsDown": 1, "NumRatings": 1, **additional}}) userRating = 1 elif Rating.Rating == -1: if userRating is None or userRating == -1: raise HTTPException(400, detail="Map is not rated!") key = "" if userRating == 5: key = "NumThumbsUp" elif userRating == 1: key = "NumThumbsDown" else: raise HTTPException(400, detail=f"Previous rating was not 5 or 1. It was {userRating} [{repr(type(userRating))}]") query = db.maps_collection.update_one({"ID": mapID}, {"$inc": {"NumRatings": -1, key: -1}}) userRating = -1 query = db.maps_collection.find_one({"ID": mapID}) print(userRating, "+") userQuery = db.user_collection.update_one({"ID": userData.ID}, {"$set": {f"Ratings.{mapID}": userRating} }) print(userQuery.raw_result) return { "Exists": True, "TagIDs": query["TagIDs"], "TagNames": query["TagNames"], "UserMap": { "BestDeaths": 0, "BestFullPlaytime": 0, "BestFullPlaytimeTime": None, "BestPlaytime": 0, "BestPlaytimeTime": None, "BestReplay": "", "Clear": False, "Difficulty": 0, "FirstClearInvalid": False, "FirstClearPlaytime": 0, "FirstClearTime": False, "FirstDeathTimeValid": False, "FirstDeaths": -1, "FirstPlayRecorded": True, "FirstPlaytime": -1, "FullClear": False, "MapID": mapID, "Played": True, "Rating": userRating, "UserID": userData.ID } } @app.get("/api/v1/map/{mapID}/besttimes/{maxEntries}") async def getMapLeaderboard(mapID: int, maxEntries: int = 5): """Returns maxEntries records for the specified level""" query = db.maps_collection.find_one({"ID": int(mapID)}) if not query: raise HTTPException(404, detail="Map not found") del query["_id"] leaderboard = query["Leaderboard"] return sorted(leaderboard, key=lambda sort: sort["BestPlaytime"])[0:maxEntries] @app.get("/api/v1/map/{mapID}/userbesttime/{userID}") async def getPlayerRecord(mapID, userID): """Returns specific replay""" query = db.maps_collection.find_one({"ID": int(mapID)}) if not query: raise HTTPException(404, detail="Map not found") del query["_id"] leaderboard = query["Leaderboard"] # Find user replayIndex = None BestTime = None for i in range(len(leaderboard)): print(f"DBG: {i}, {leaderboard[i]}\t{leaderboard[i]['UserID'] == int(userID):}") if leaderboard[i]["UserID"] == int(userID) and (BestTime is None or leaderboard[i]["BestPlaytime"] < BestTime): replayIndex = i print(leaderboard[replayIndex]) if replayIndex is not None: return {"BestMapTime": leaderboard[replayIndex], "Exists": True} else: return {"BestMapTime": None, "Exists": False} @app.get("/api/v1/map/{mapID}/besttime") async def getBestRecord(mapID: int): query = db.maps_collection.find_one({"ID": int(mapID)}) BestTime = None BestPlay = None for i in query["Leaderboard"]: if BestTime is None or BestTime > i["BestPlaytime"]: BestTime = i["BestPlaytime"] BestPlay = i return {"BestMapTime": BestPlay, "Exists": True} @app.post("/api/v1/map/{mapID}/invalidatealltimes") async def invalidateAllTimes( mapID: int, Reason: int = Body(), CustomReason: str = Body(default=""), Authorization: Union[str, None] = Header(default=None), ): """Removes ALL records from a specific map""" authcheck = db.auth_check(Authorization) if not authcheck[0] and authcheck[1] == "nouser": raise HTTPException(404, detail="User not found") elif not authcheck[0] and authcheck[1] == "wrongpass": raise HTTPException(403, detail="Wrong password") elif authcheck[0]: userData = types.User(**authcheck[1]) if userData.Admin: query = db.maps_collection.update_one( {"ID": mapID}, {"$push": {"Leaderboard": {"$each": [], "$slice": 0}}} ) db.LogAdminAction( action_type="invalidateAllTimes", action_data={ "Reason": Reason, "CustomReason": CustomReason, "mapID": mapID, }, UserID=userData.ID, ) else: db.LogAdminAction( action_type="invalidateAllTimes", action_data={ "Reason": Reason, "CustomReason": CustomReason, "mapID": mapID, "unauthorized": True, }, success=False, ) raise HTTPException( 403, detail="Attempted to perform an administrator action without permission. This will be reported.", ) ## General @app.post("/api/v1/reports") async def reportContent( Authorization: Union[str, None] = Header(default=None), user_id: int = Form(), map_id: Optional[int] = Form(default=None), report_type: int = Form(), content: str = Form(), ): authcheck = db.auth_check(Authorization) if not authcheck[0] and authcheck[1] == "nouser": raise HTTPException(404, detail="User not found") elif not authcheck[0] and authcheck[1] == "wrongpass": raise HTTPException(403, detail="Wrong password") elif authcheck[0]: db.reports_collection.insert_one( { "user_id": user_id, "map_id": map_id, "report_type": report_type, "content": content, } ) return True @app.get("/api/v1/featuredlist") async def featuredlist(): """Returns the list id of the weekly levels list.""" # FIXME Add playlists raise HTTPException(404, detail="Not available due to lack of playlist support.") @app.get("/api/v1/followcheck") async def followcheck(): """Check, if creators that the user follows uploaded new levels.""" # FIXME: Stub raise HTTPException(404, detail="Not available due to follows not being implemented.") def start(): """Launched with `poetry run start` at root level""" uvicorn.run("customiwmserver.main:app", host="0.0.0.0", port=8001, reload=True)