"""Authentication routes: registration, login, password reset, email verification, token refresh."""

import logging
import re
import secrets
import uuid
from datetime import datetime, timedelta

from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from sqlalchemy.orm import Session

from core.auth import (
    create_access_token,
    create_refresh_token,
    decode_refresh_token,
    hash_password,
    verify_password,
)
from core.database import get_db
from core.dependencies import get_current_user, limiter
from services.email_utils import send_password_reset_email, send_verification_email
from core.models import User as UserModel

logger = logging.getLogger("app")

router = APIRouter()

try:
    from disposable_email_domains import blocklist as _disposable_blocklist
except ImportError:
    # Optional dependency: without it no domain is treated as disposable.
    _disposable_blocklist: set[str] = set()

# Basic "local@domain.tld" shape check; used with fullmatch so a trailing
# newline cannot slip past the end anchor.
_EMAIL_RE = re.compile(r"[^\s@]+@[^\s@]+\.[^\s@]+")

# Escape character handed to ILIKE so user input is matched literally.
_LIKE_ESCAPE = "/"

_MIN_PASSWORD_LENGTH = 8
_MAX_PASSWORD_LENGTH = 256


class RegisterRequest(BaseModel):
    username: str
    email: str
    password: str


class ForgotPasswordRequest(BaseModel):
    email: str


class ResetPasswordWithTokenRequest(BaseModel):
    token: str
    new_password: str


class ResetPasswordRequest(BaseModel):
    current_password: str
    new_password: str


class ResendVerificationRequest(BaseModel):
    email: str


class RefreshRequest(BaseModel):
    refresh_token: str


def _escape_like(value: str) -> str:
    """Escape LIKE wildcards in *value* so it only matches itself."""
    return (
        value.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
        .replace("%", _LIKE_ESCAPE + "%")
        .replace("_", _LIKE_ESCAPE + "_")
    )


def _find_user_by_username(db: Session, username: str):
    """Return the first user whose username equals *username* ignoring case, or None."""
    # Without escaping, "%" and "_" in the submitted name act as wildcards,
    # so e.g. "john_doe" would also match an existing "johnxdoe".
    pattern = _escape_like(username)
    return (
        db.query(UserModel)
        .filter(UserModel.username.ilike(pattern, escape=_LIKE_ESCAPE))
        .first()
    )


def _validate_password(password: str) -> str | None:
    """Return an error message if *password* violates the length rules, else None."""
    if len(password) < _MIN_PASSWORD_LENGTH:
        return "Password must be at least 8 characters"
    if len(password) > _MAX_PASSWORD_LENGTH:
        return "Password must be 256 characters or fewer"
    return None


def validate_register(username: str, email: str, password: str) -> str | None:
    """Validate registration input.

    Returns:
        A user-facing error message for the first failed check, or None when
        the username, email and password are all acceptable.
    """
    if not username.strip():
        return "Username is required"
    if len(username) < 2:
        return "Username must be at least 2 characters"
    if len(username) > 16:
        return "Username must be 16 characters or fewer"
    if not _EMAIL_RE.fullmatch(email):
        return "Please enter a valid email"
    domain = email.split("@")[-1].lower()
    if domain in _disposable_blocklist:
        return "Disposable email addresses are not allowed"
    return _validate_password(password)


@router.post("/register")
@limiter.limit("5/minute")
def register(request: Request, req: RegisterRequest, db: Session = Depends(get_db)):
    """Create an unverified account and send a verification email.

    Raises:
        HTTPException: 400 on invalid input or a username/email that is
            already in use; 500 when the account was stored but the
            verification email could not be sent.
    """
    err = validate_register(req.username, req.email, req.password)
    if err:
        raise HTTPException(status_code=400, detail=err)
    if _find_user_by_username(db, req.username):
        raise HTTPException(status_code=400, detail="Username already taken")
    if db.query(UserModel).filter(UserModel.email == req.email).first():
        raise HTTPException(status_code=400, detail="Email already registered")

    verification_token = secrets.token_urlsafe(32)
    user = UserModel(
        id=uuid.uuid4(),
        username=req.username,
        email=req.email,
        password_hash=hash_password(req.password),
        email_verified=False,
        email_verification_token=verification_token,
        email_verification_token_expires_at=datetime.now() + timedelta(hours=24),
    )
    db.add(user)
    db.commit()

    try:
        send_verification_email(req.email, req.username, verification_token)
    except Exception as exc:
        logger.exception("Failed to send verification email")
        raise HTTPException(
            status_code=500,
            detail="Account created but we couldn't send the verification email. Please use 'Resend verification' to try again.",
        ) from exc
    return {"message": "Account created. Please check your email to verify your account."}


@router.post("/login")
@limiter.limit("10/minute")
def login(
    request: Request,
    form: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """Check username and password and return a new access/refresh token pair.

    Raises:
        HTTPException: 400 when the username is unknown or the password does
            not verify (same message for both cases).
    """
    user = _find_user_by_username(db, form.username)
    if not user or not verify_password(form.password, user.password_hash):
        raise HTTPException(status_code=400, detail="Invalid username or password")
    user.last_active_at = datetime.now()
    db.commit()
    return {
        "access_token": create_access_token(str(user.id)),
        "refresh_token": create_refresh_token(str(user.id)),
        "token_type": "bearer",
    }


@router.post("/auth/reset-password")
@limiter.limit("5/minute")
def reset_password(
    request: Request,
    req: ResetPasswordRequest,
    user: UserModel = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Change the password of the authenticated user.

    Raises:
        HTTPException: 400 when the current password is wrong, the new
            password violates the length rules, or it equals the current one.
    """
    if not verify_password(req.current_password, user.password_hash):
        raise HTTPException(status_code=400, detail="Current password is incorrect")
    err = _validate_password(req.new_password)
    if err:
        raise HTTPException(status_code=400, detail=err)
    if req.current_password == req.new_password:
        raise HTTPException(
            status_code=400,
            detail="New password must be different from current password",
        )
    user.password_hash = hash_password(req.new_password)
    db.commit()
    return {"message": "Password updated"}


@router.post("/auth/forgot-password")
@limiter.limit("5/minute")
def forgot_password(request: Request, req: ForgotPasswordRequest, db: Session = Depends(get_db)):
    """Issue a one-hour reset token for the given email and mail it.

    The response is identical whether or not the email is registered.
    """
    user = db.query(UserModel).filter(UserModel.email == req.email).first()
    # Always return success even if email not found. Prevents user enumeration
    if user:
        token = secrets.token_urlsafe(32)
        user.reset_token = token
        user.reset_token_expires_at = datetime.now() + timedelta(hours=1)
        db.commit()
        try:
            send_password_reset_email(user.email, user.username, token)
        except Exception:
            # A send failure can only happen for an existing account, so
            # surfacing it as an error would reveal that the email is
            # registered. Log it and fall through to the generic response.
            logger.exception("Failed to send reset email")
    return {"message": "If that email is registered you will receive a reset link shortly"}


@router.post("/auth/reset-password-with-token")
@limiter.limit("5/minute")
def reset_password_with_token(
    request: Request,
    req: ResetPasswordWithTokenRequest,
    db: Session = Depends(get_db),
):
    """Set a new password using a reset token, then clear the token.

    Raises:
        HTTPException: 400 when the token is unknown or expired, or the new
            password violates the length rules.
    """
    user = db.query(UserModel).filter(UserModel.reset_token == req.token).first()
    if (
        not user
        or not user.reset_token_expires_at
        or user.reset_token_expires_at < datetime.now()
    ):
        raise HTTPException(status_code=400, detail="Invalid or expired reset link")
    err = _validate_password(req.new_password)
    if err:
        raise HTTPException(status_code=400, detail=err)
    user.password_hash = hash_password(req.new_password)
    user.reset_token = None
    user.reset_token_expires_at = None
    db.commit()
    return {"message": "Password updated"}


@router.get("/auth/verify-email")
@limiter.limit("10/minute")
def verify_email(request: Request, token: str, db: Session = Depends(get_db)):
    """Mark the account owning *token* as verified and clear the token.

    Raises:
        HTTPException: 400 when the token is unknown or expired.
    """
    user = db.query(UserModel).filter(UserModel.email_verification_token == token).first()
    if (
        not user
        or not user.email_verification_token_expires_at
        or user.email_verification_token_expires_at < datetime.now()
    ):
        raise HTTPException(status_code=400, detail="Invalid or expired verification link")
    user.email_verified = True
    user.email_verification_token = None
    user.email_verification_token_expires_at = None
    db.commit()
    return {"message": "Email verified"}


@router.post("/auth/resend-verification")
@limiter.limit("5/minute")
def resend_verification(
    request: Request,
    req: ResendVerificationRequest,
    db: Session = Depends(get_db),
):
    """Issue a fresh 24-hour verification token for an unverified account and mail it.

    The response is identical whether or not the email is registered or
    already verified.
    """
    user = db.query(UserModel).filter(UserModel.email == req.email).first()
    # Always return success to prevent user enumeration
    if user and not user.email_verified:
        token = secrets.token_urlsafe(32)
        user.email_verification_token = token
        user.email_verification_token_expires_at = datetime.now() + timedelta(hours=24)
        db.commit()
        try:
            send_verification_email(user.email, user.username, token)
        except Exception:
            # Reporting the failure would reveal that the email belongs to
            # an unverified account; log it and keep the generic response.
            logger.exception("Failed to resend verification email")
    return {"message": "If that email is registered and unverified, you will receive a new verification link shortly"}


@router.post("/auth/refresh")
@limiter.limit("20/minute")
def refresh(request: Request, req: RefreshRequest, db: Session = Depends(get_db)):
    """Exchange a refresh token for a new access/refresh token pair.

    Raises:
        HTTPException: 401 when the token does not decode to a user id, the
            id is not a valid UUID, or no such user exists.
    """
    user_id = decode_refresh_token(req.refresh_token)
    if not user_id:
        raise HTTPException(status_code=401, detail="Invalid or expired refresh token")
    try:
        user_uuid = uuid.UUID(user_id)
    except ValueError:
        # A subject that is not a UUID is a bad token, not a server error.
        raise HTTPException(status_code=401, detail="Invalid or expired refresh token") from None
    user = db.query(UserModel).filter(UserModel.id == user_uuid).first()
    if not user:
        raise HTTPException(status_code=401, detail="User not found")
    user.last_active_at = datetime.now()
    db.commit()
    return {
        "access_token": create_access_token(str(user.id)),
        "refresh_token": create_refresh_token(str(user.id)),
        "token_type": "bearer",
    }