import asyncio import uuid from datetime import datetime import logging import random from dataclasses import dataclass from fastapi import WebSocket from sqlalchemy.orm import Session from game import ( GameState, CardInstance, PlayerState, action_play_card, action_sacrifice, action_end_turn, create_game, CombatEvent, GameResult, BOARD_SIZE ) from models import Card as CardModel, Deck as DeckModel, DeckCard as DeckCardModel, User as UserModel from card import compute_deck_type logger = logging.getLogger("app") AI_USER_ID = "ai" ## Storage active_games: dict[str, GameState] = {} active_deck_ids: dict[str, str|None] = {} # user_id -> deck_id connections: dict[str, dict[str, WebSocket]] = {} # game_id -> {user_id -> websocket} @dataclass class QueueEntry: user_id: str deck_id: str websocket: WebSocket queue: list[QueueEntry] = [] queue_lock = asyncio.Lock() ## Game Result def record_game_result(state: GameState, db: Session): if state.result is None or state.result.winner_id is None: return winner_id_str = state.result.winner_id loser_id_str = state.opponent_id(winner_id_str) # Skip database updates for AI battles if AI_USER_ID not in [winner_id_str, loser_id_str]: winner = db.query(UserModel).filter(UserModel.id == uuid.UUID(winner_id_str)).first() if winner: winner.wins += 1 loser = db.query(UserModel).filter(UserModel.id == uuid.UUID(loser_id_str)).first() if loser: loser.losses += 1 winner_deck_id = active_deck_ids.get(winner_id_str) loser_deck_id = active_deck_ids.get(loser_id_str) if AI_USER_ID not in [winner_id_str, loser_id_str]: deck = db.query(DeckModel).filter(DeckModel.id == uuid.UUID(winner_deck_id)).first() if deck: deck.wins += 1 deck = db.query(DeckModel).filter(DeckModel.id == uuid.UUID(loser_deck_id)).first() if deck: deck.losses += 1 db.commit() ## Serialization def serialize_card(card: CardInstance|None) -> dict | None: if card is None: return None return { "instance_id": card.instance_id, "card_id": card.card_id, "name": card.name, "attack": card.attack, "defense": card.defense, "max_defense": card.max_defense, "cost": card.cost, "card_type": card.card_type, "card_rarity": card.card_rarity, "image_link": card.image_link, "text": card.text } def serialize_player(player: PlayerState, hide_hand=False) -> dict: return { "user_id": player.user_id, "username": player.username, "deck_type": player.deck_type, "life": player.life, "energy": player.energy, "energy_cap": player.energy_cap, "board": [serialize_card(c) for c in player.board], "hand": [serialize_card(c) for c in player.hand] if not hide_hand else [], "hand_size": len(player.hand), "deck_size": len(player.deck), } def serialize_event(event: CombatEvent) -> dict: return { "attacker_slot": event.attacker_slot, "attacker_name": event.attacker_name, "defender_slot": event.defender_slot, "defender_name": event.defender_name, "damage": event.damage, "defender_destroyed": event.defender_destroyed, "life_damage": event.life_damage, } def serialize_state(state: GameState, perspective_user_id: str) -> dict: opponent_id = state.opponent_id(perspective_user_id) return { "game_id": state.game_id, "phase": state.phase, "turn": state.turn, "active_player_id": state.active_player_id, "player_order": state.player_order, "you": serialize_player(state.players[perspective_user_id]), "opponent": serialize_player(state.players[opponent_id], hide_hand=True), "last_combat_events": [serialize_event(e) for e in state.last_combat_events], "result": { "winner_id": state.result.winner_id, "reason": state.result.reason, } if state.result else None, "turn_started_at": state.turn_started_at.isoformat() if state.turn_started_at else None, } ## Broadcasting async def broadcast_state(game_id: str): state = active_games.get(game_id) if not state: return game_connections = connections.get(game_id, {}) for user_id, ws in game_connections.items(): try: await ws.send_json({ "type": "state", "state": serialize_state(state, user_id), }) except Exception: pass if state.active_player_id == AI_USER_ID and not state.result: asyncio.create_task(run_ai_turn(game_id)) async def send_error(ws: WebSocket, message: str): await ws.send_json({"type": "error", "message": message}) ## Matchmaking def load_deck_cards(deck_id: str, user_id: str, db: Session) -> list | None: deck = db.query(DeckModel).filter( DeckModel.id == uuid.UUID(deck_id), DeckModel.user_id == uuid.UUID(user_id) ).first() if not deck: return None deck_card_ids = [ dc.card_id for dc in db.query(DeckCardModel).filter(DeckCardModel.deck_id == deck.id).all() ] cards = db.query(CardModel).filter(CardModel.id.in_(deck_card_ids)).all() return cards async def try_match(db: Session): async with queue_lock: if len(queue) < 2: return p1_entry = queue.pop(0) p2_entry = queue.pop(0) p1_user = db.query(UserModel).filter(UserModel.id == uuid.UUID(p1_entry.user_id)).first() p2_user = db.query(UserModel).filter(UserModel.id == uuid.UUID(p2_entry.user_id)).first() p1_cards = load_deck_cards(p1_entry.deck_id, p1_entry.user_id, db) p2_cards = load_deck_cards(p2_entry.deck_id, p2_entry.user_id, db) p1_deck_type = compute_deck_type(p1_cards if p1_cards else []) p2_deck_type = compute_deck_type(p2_cards if p2_cards else []) active_deck_ids[p1_entry.user_id] = p1_entry.deck_id active_deck_ids[p2_entry.user_id] = p2_entry.deck_id for entry, _ in [(p1_entry, p1_cards), (p2_entry, p2_cards)]: deck = db.query(DeckModel).filter(DeckModel.id == uuid.UUID(entry.deck_id)).first() if deck: deck.times_played += 1 db.commit() if not p1_cards or not p2_cards or not p1_user or not p2_user: await send_error(p1_entry.websocket, "Failed to load deck") await send_error(p2_entry.websocket, "Failed to load deck") return state = create_game( p1_entry.user_id, p1_user.username, p1_deck_type if p1_deck_type else "", p1_cards, p2_entry.user_id, p2_user.username, p2_deck_type if p2_deck_type else "", p2_cards, ) active_games[state.game_id] = state connections[state.game_id] = { p1_entry.user_id: p1_entry.websocket, p2_entry.user_id: p2_entry.websocket, } # Notify both players the game has started for user_id, ws in connections[state.game_id].items(): await ws.send_json({ "type": "game_start", "game_id": state.game_id, }) await broadcast_state(state.game_id) ## Action handler async def handle_action(game_id: str, user_id: str, message: dict, db: Session): state = active_games.get(game_id) if not state: logger.warning(f"handle_action: game {game_id} not found") return if state.result: logger.warning(f"handle_action: game {game_id} already over") return if state.active_player_id != user_id: logger.warning(f"handle_action: not {user_id}'s turn, active is {state.active_player_id}") ws = connections[game_id].get(user_id) if ws: await send_error(ws, "It's not your turn") return action = message.get("type") err = None if action == "play_card": err = action_play_card(state, message["hand_index"], message["slot"]) if not err: # Find the card that was just played slot = message["slot"] card_instance = state.players[user_id].board[slot] if card_instance: try: card = db.query(CardModel).filter( CardModel.id == uuid.UUID(card_instance.card_id) ).first() if card: card.times_played += 1 db.commit() except Exception as e: logger.warning(f"Failed to increment times_played for card {card_instance.card_id}: {e}") elif action == "sacrifice": slot = message.get("slot") if slot is None: err = "No slot provided" else: # Find the card instance_id before it's removed card = state.players[user_id].board[slot] if card: # Notify opponent first opponent_id = state.opponent_id(user_id) opp_ws = connections[game_id].get(opponent_id) if opp_ws: try: await opp_ws.send_json({ "type": "sacrifice_animation", "instance_id": card.instance_id, }) except Exception: pass await asyncio.sleep(0.65) err = action_sacrifice(state, slot) elif action == "end_turn": err = action_end_turn(state) else: ws = connections[game_id].get(user_id) if ws: await send_error(ws, f"Unknown action: {action}") return if err: ws = connections[game_id].get(user_id) if ws: await send_error(ws, err) return await broadcast_state(game_id) if state.result: record_game_result(state, db) for uid in list(connections.get(game_id,{}).keys()): active_deck_ids.pop(uid, None) active_games.pop(game_id, None) connections.pop(game_id, None) DISCONNECT_GRACE_SECONDS = 15 async def handle_disconnect(game_id: str, user_id: str, db: Session): await asyncio.sleep(DISCONNECT_GRACE_SECONDS) # Check if game still exists and player hasn't reconnected if game_id not in active_games: return if user_id in connections.get(game_id, {}): return # player reconnected during grace period state = active_games[game_id] if state.result: return # game already ended normally winner_id = state.opponent_id(user_id) state.result = GameResult( winner_id=winner_id, reason="Opponent disconnected" ) state.phase = "end" record_game_result(state, db) # Notify the remaining player winner_ws = connections[game_id].get(winner_id) if winner_ws: try: await winner_ws.send_json({ "type": "state", "state": serialize_state(state, winner_id), }) except Exception: pass active_deck_ids.pop(user_id, None) active_deck_ids.pop(winner_id, None) active_games.pop(game_id, None) connections.pop(game_id, None) TURN_TIME_LIMIT_SECONDS = 120 async def handle_timeout_claim(game_id: str, claimant_id: str, db: Session) -> str | None: state = active_games.get(game_id) if not state: return "Game not found" if state.result: return "Game already ended" if state.active_player_id == claimant_id: return "It's your turn" if not state.turn_started_at: return "No turn timer running" elapsed = (datetime.now() - state.turn_started_at).total_seconds() if elapsed < TURN_TIME_LIMIT_SECONDS: return f"Timer has not expired yet ({int(TURN_TIME_LIMIT_SECONDS - elapsed)}s remaining)" state.result = GameResult( winner_id=claimant_id, reason="Opponent ran out of time" ) state.phase = "end" record_game_result(state, db) await broadcast_state(game_id) active_deck_ids.pop(state.active_player_id, None) active_deck_ids.pop(claimant_id, None) active_games.pop(game_id, None) connections.pop(game_id, None) return None def create_solo_game( user_id: str, username: str, player_cards: list, ai_cards: list, deck_id: str, ) -> str: player_deck_type = compute_deck_type(player_cards) or "Balanced" ai_deck_type = compute_deck_type(ai_cards) or "Balanced" state = create_game( user_id, username, player_deck_type, player_cards, AI_USER_ID, "Computer", ai_deck_type, ai_cards, ) active_games[state.game_id] = state connections[state.game_id] = {} active_deck_ids[user_id] = deck_id active_deck_ids[AI_USER_ID] = None if state.active_player_id == AI_USER_ID: asyncio.create_task(run_ai_turn(state.game_id)) return state.game_id ANIMATION_DELAYS = { "pre_combat_pause": 0.5, # pause before end_turn "per_attack_pre": 0.1, # delay before each attack animation "lunge_duration": 0.42, # lunge animation duration "shake_duration": 0.4, # shake animation duration "damage_point": 0.22, # when damage is applied mid-shake "post_attack": 0.08, # gap between attacks "destroy_duration": 0.6, # crumble animation duration "post_combat_buffer": 0.3, # buffer after all animations finish } def calculate_combat_animation_time(events: list[CombatEvent]) -> float: total = 0.0 for event in events: total += ANIMATION_DELAYS["per_attack_pre"] # Lunge and shake run simultaneously, so take the longer of the two total += max( ANIMATION_DELAYS["lunge_duration"], ANIMATION_DELAYS["shake_duration"] ) total += ANIMATION_DELAYS["post_attack"] if event.defender_destroyed: total += ANIMATION_DELAYS["destroy_duration"] total += ANIMATION_DELAYS["post_combat_buffer"] return total async def run_ai_turn(game_id: str): state = active_games.get(game_id) if not state or state.result: return if state.active_player_id != AI_USER_ID: return human_id = state.opponent_id(AI_USER_ID) waited = 0 while not connections[game_id].get(human_id) and waited < 10: await asyncio.sleep(0.5) waited += 0.5 await asyncio.sleep(calculate_combat_animation_time(state.last_combat_events)) player = state.players[AI_USER_ID] ws = connections[game_id].get(human_id) async def send_state(state: GameState): if ws: try: await ws.send_json({ "type": "state", "state": serialize_state(state, human_id), }) except Exception: pass most_expensive_in_hand = max((c.cost for c in player.hand), default=0) if player.energy < most_expensive_in_hand: for slot in range(BOARD_SIZE): slot_card = player.board[slot] if slot_card is not None and player.energy + slot_card.cost <= most_expensive_in_hand: action_sacrifice(state, slot) await send_state(state) await asyncio.sleep(1) play_order = list(range(BOARD_SIZE)) random.shuffle(play_order) for slot in play_order: if player.board[slot] is not None: continue affordable = [i for i, c in enumerate(player.hand) if c.cost <= player.energy] if not affordable: break best = max(affordable, key=lambda i: player.hand[i].cost) action_play_card(state, best, slot) await send_state(state) await asyncio.sleep(0.5) action_end_turn(state) await send_state(state) if state.result: from database import SessionLocal db = SessionLocal() try: record_game_result(state, db) if ws: await ws.send_json({ "type": "state", "state": serialize_state(state, human_id), }) finally: db.close() active_deck_ids.pop(human_id, None) active_deck_ids.pop(AI_USER_ID, None) active_games.pop(game_id, None) connections.pop(game_id, None) return if state.active_player_id == AI_USER_ID: asyncio.create_task(run_ai_turn(game_id))