114 lines
3.6 KiB
Python
114 lines
3.6 KiB
Python
import asyncio
|
|
import logging
|
|
import uuid
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime
|
|
|
|
from sqlalchemy.orm import Session
|
|
from fastapi import FastAPI, Depends, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
|
|
from database import get_db
|
|
from database_functions import fill_card_pool, check_boosters, BOOSTER_MAX
|
|
from models import Card as CardModel
|
|
from models import User as UserModel
|
|
from auth import hash_password, verify_password, create_access_token, decode_access_token
|
|
|
|
# Logger for this module, registered under the fixed name "app".
logger = logging.getLogger("app")


# Auth
# Bearer-token extractor used as a dependency by the protected routes.
# "login" is the relative URL of the token-issuing endpoint (POST /login below).
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
|
|
|
|
class RegisterRequest(BaseModel):
    """Request body accepted by the ``POST /register`` endpoint."""

    # Desired login name; the register handler rejects names already in use.
    username: str
    # Declared as a plain string, so no e-mail format validation happens here.
    email: str
    # Password as submitted; the register handler stores only its hash.
    password: str
|
|
|
|
def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
) -> UserModel:
    """Resolve the request's bearer token to the matching ``UserModel`` row.

    Args:
        token: Bearer token extracted from the request by ``oauth2_scheme``.
        db: Database session supplied by ``get_db``.

    Returns:
        The user whose id is carried by the token.

    Raises:
        HTTPException: 401 when the token does not decode to a user id, when
            that id is not a well-formed UUID, or when no such user exists.
    """
    user_id = decode_access_token(token)
    if not user_id:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")

    # A decodable token whose subject is not a valid UUID is still an
    # authentication failure; without this guard uuid.UUID() would raise and
    # the client would receive an HTTP 500 instead of a 401.
    try:
        user_uuid = uuid.UUID(user_id)
    except (ValueError, TypeError, AttributeError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
        ) from None

    user = db.query(UserModel).filter(UserModel.id == user_uuid).first()
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
    return user
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: start a card-pool fill when the app boots.

    ``fill_card_pool()`` is scheduled as a background task so startup does not
    wait for it. The task is cancelled on shutdown if it is still running.

    Args:
        app: The FastAPI application being started (unused here).
    """
    # Hold a strong reference for the whole application lifetime: the event
    # loop only keeps weak references to tasks, so a task whose handle is
    # discarded may be garbage-collected before it completes.
    pool_task = asyncio.create_task(fill_card_pool())
    try:
        yield
    finally:
        if not pool_task.done():
            pool_task.cancel()
|
|
|
|
# The ASGI application; startup/shutdown handling is delegated to `lifespan`.
app = FastAPI(lifespan=lifespan)

# Browser origins permitted to call this API cross-origin.
_cors_origins = ["http://localhost:5173"]  # SvelteKit's default dev port

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
@app.post("/register")
def register(req: RegisterRequest, db: Session = Depends(get_db)):
    """Create a user account from the submitted username, email and password.

    Args:
        req: Registration payload.
        db: Database session supplied by ``get_db``.

    Returns:
        ``{"message": "User created"}`` once the new row is committed.

    Raises:
        HTTPException: 400 when the username, or else the email, is already
            present in the users table.
    """
    # Checked in this order: a clash on username is reported before email.
    uniqueness_checks = (
        (UserModel.username == req.username, "Username already taken"),
        (UserModel.email == req.email, "Email already registered"),
    )
    for criterion, message in uniqueness_checks:
        if db.query(UserModel).filter(criterion).first():
            raise HTTPException(status_code=400, detail=message)

    # Only the hash of the password is stored on the row.
    new_user = UserModel(
        id=uuid.uuid4(),
        username=req.username,
        email=req.email,
        password_hash=hash_password(req.password),
    )
    db.add(new_user)
    db.commit()
    return {"message": "User created"}
|
|
|
|
@app.post("/login")
def login(form: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Exchange a username/password form for a bearer access token.

    Args:
        form: OAuth2 password form carrying ``username`` and ``password``.
        db: Database session supplied by ``get_db``.

    Returns:
        ``{"access_token": <token>, "token_type": "bearer"}``.

    Raises:
        HTTPException: 400 when the username is unknown or the password does
            not verify; both cases share one message.
    """
    account = db.query(UserModel).filter(UserModel.username == form.username).first()

    # The password is only verified when an account was found.
    if account and verify_password(form.password, account.password_hash):
        access_token = create_access_token(str(account.id))
        return {"access_token": access_token, "token_type": "bearer"}

    raise HTTPException(status_code=400, detail="Invalid username or password")
|
|
|
|
@app.get("/boosters")
def get_boosters(
    user: UserModel = Depends(get_current_user),
    db: Session = Depends(get_db),
) -> tuple[int, datetime | None]:
    """Return the authenticated user's booster status.

    Delegates entirely to ``check_boosters(user, db)`` and returns its result
    unchanged.

    Args:
        user: The authenticated user, resolved by ``get_current_user``.
        db: Database session supplied by ``get_db``.

    Returns:
        A ``(int, datetime | None)`` pair -- presumably the current booster
        count and a countdown timestamp; confirm against ``check_boosters``.
    """
    booster_status = check_boosters(user, db)
    return booster_status
|
|
|
|
# Strong references to in-flight pool refills started by open_pack. The event
# loop keeps only weak references to tasks, so a task whose handle is dropped
# can be garbage-collected before it finishes.
_background_tasks: set[asyncio.Task] = set()


def _serialize_card(card: CardModel) -> dict:
    """Flatten a card row into a dict of its table columns plus rarity/type."""
    payload = {column.name: getattr(card, column.name) for column in card.__table__.columns}
    # Added explicitly on top of the column values -- presumably these are not
    # plain table columns (e.g. relationships/properties); confirm in models.
    payload["card_rarity"] = card.card_rarity
    payload["card_type"] = card.card_type
    return payload


@app.post("/open_pack")
async def open_pack(user: UserModel = Depends(get_current_user), db: Session = Depends(get_db)):
    """Open one booster pack: assign five unowned cards to the caller.

    Args:
        user: The authenticated user, resolved by ``get_current_user``.
        db: Database session supplied by ``get_db``.

    Returns:
        A list with one dict per card handed out (column values plus
        ``card_rarity`` and ``card_type``).

    Raises:
        HTTPException: 400 when ``user.boosters`` is 0; 503 when fewer than
            five unowned cards are available.
    """
    pack_size = 5

    # Return value intentionally ignored; called before reading user.boosters,
    # presumably to bring the user's booster count up to date -- confirm.
    check_boosters(user, db)

    if user.boosters == 0:
        raise HTTPException(status_code=400, detail="No booster packs available")

    # Unowned cards are the rows whose user_id is NULL.
    cards = (
        db.query(CardModel)
        .filter(CardModel.user_id.is_(None))
        .limit(pack_size)
        .all()
    )

    # NOTE(review): no refill is scheduled on this path; the pool is only
    # refilled at startup and after a successful open -- confirm intended.
    if len(cards) < pack_size:
        raise HTTPException(status_code=503, detail="Card pool is low, please try again shortly")

    for card in cards:
        card.user_id = user.id

    # NOTE(review): booster consumption is disabled, so opening a pack does not
    # decrement user.boosters here. Left as found -- confirm whether intended.
    # was_full = user.boosters == BOOSTER_MAX
    # user.boosters -= 1
    # if was_full:
    #     user.boosters_countdown = datetime.now()

    db.commit()

    # Refill the pool in the background without delaying the response; keep
    # the task referenced until it completes, then drop it from the set.
    refill_task = asyncio.create_task(fill_card_pool())
    _background_tasks.add(refill_task)
    refill_task.add_done_callback(_background_tasks.discard)

    return [_serialize_card(card) for card in cards]