import asyncio
import logging
import random
from dataclasses import dataclass
from enum import Enum
from itertools import combinations, permutations

import numpy as np

from card import Card
from game import (
    BOARD_SIZE,
    STARTING_LIFE,
    PlayerState,
    action_end_turn,
    action_play_card,
    action_sacrifice,
)

logger = logging.getLogger("app")

# User id under which the AI player is registered in the game state.
AI_USER_ID = "ai"


class AIPersonality(Enum):
    """Play styles that weight both deck-building and per-turn plan scoring."""

    AGGRESSIVE = "aggressive"
    DEFENSIVE = "defensive"
    BALANCED = "balanced"
    GREEDY = "greedy"  # prioritizes high cost cards, willing to sacrifice
    SWARM = "swarm"
    CONTROL = "control"
    ARBITRARY = "arbitrary"
    JEBRASKA = "jebraska"  # trained neural network plan scorer


def get_random_personality() -> AIPersonality:
    """Pick a personality uniformly at random."""
    return random.choice(list(AIPersonality))


def calculate_exact_cost(attack: int, defense: int) -> float:
    """Calculate the exact cost before rounding (matches card.py formula)."""
    return min(10.0, max(1.0, ((attack**2 + defense**2) ** 0.18) / 1.5))


def get_power_curve_value(card) -> float:
    """
    Returns how much above the power curve a card is.

    Positive values mean the card is a better-than-expected deal for its cost.
    """
    exact_cost = calculate_exact_cost(card.attack, card.defense)
    return exact_cost - card.cost


def choose_cards(cards: list[Card], difficulty: int, personality: AIPersonality) -> list[Card]:
    """Build a deck from `cards` under a fixed total-cost budget.

    Cards are scored according to the personality, then picked in two passes:
    a reserved budget of cheap (cost <= 3) cards first, then a greedy fill of
    the remaining budget by score.
    """
    BUDGET = 50
    # Higher difficulties unlock more expensive cards.
    if difficulty >= 6:
        max_card_cost = difficulty + 1
    else:
        max_card_cost = 6
    # Fall back to the full pool if the cost filter would leave nothing.
    allowed = [c for c in cards if c.cost <= max_card_cost] or list(cards)

    # Vectorized scoring over all allowed cards at once
    atk = np.array([c.attack for c in allowed], dtype=np.float32)
    defn = np.array([c.defense for c in allowed], dtype=np.float32)
    cost = np.array([c.cost for c in allowed], dtype=np.float32)
    # Same formula as calculate_exact_cost(), vectorized.
    exact_cost = np.minimum(10.0, np.maximum(1.0, ((atk**2 + defn**2) ** 0.18) / 1.5))
    pcv_norm = np.clip(exact_cost - cost, 0.0, 1.0)
    cost_norm = cost / max_card_cost
    totals = atk + defn
    atk_ratio = np.where(totals > 0, atk / totals, 0.5)
    def_not_one = np.where(defn != 1, 1.0, 0.0)

    if personality == AIPersonality.AGGRESSIVE:
        # (1-cost_norm) penalizes expensive cards. High-attack cards are inherently
        # expensive, so without this the second pass drifts toward costly cards at
        # higher difficulty, shrinking the deck. The bonus grows with max_card_cost
        # and exactly offsets that drift.
        scores = (0.50 * atk_ratio + 0.35 * pcv_norm
                  + 0.15 * (1.0 - cost_norm) + 0.10 * def_not_one)
    elif personality == AIPersonality.DEFENSIVE:
        # Small (1-cost_norm) for the same anti-shrinkage reason; lighter because
        # high-defense cards don't correlate as strongly with cost as high-attack
        # cards do.
        # BUG FIX: this branch previously used `0.10 * cost_norm`, which rewarded
        # expensive cards — the opposite of the penalty the comment describes.
        scores = 0.10 * (1.0 - atk_ratio) + 0.80 * pcv_norm + 0.10 * (1.0 - cost_norm)
    elif personality == AIPersonality.GREEDY:
        # Small cost_norm keeps flavour without causing severe deck shrinkage at D10
        scores = 0.20 * cost_norm + 0.80 * pcv_norm
    elif personality == AIPersonality.SWARM:
        scores = (0.40 * (1.0 - cost_norm) + 0.35 * atk_ratio
                  + 0.20 * pcv_norm + 0.05 * def_not_one)
    elif personality == AIPersonality.CONTROL:
        # Small cost_norm keeps flavour without causing severe deck shrinkage at D10
        scores = 0.85 * pcv_norm + 0.15 * cost_norm
    elif personality in (AIPersonality.BALANCED, AIPersonality.JEBRASKA):
        scores = 0.60 * pcv_norm + 0.25 * atk_ratio + 0.15 * (1.0 - atk_ratio)
    else:  # ARBITRARY
        w = 0.09 * difficulty
        scores = w * pcv_norm + (1.0 - w) * np.random.random(len(allowed)).astype(np.float32)

    # Small noise floor at D10 prevents fully deterministic deck building.
    noise = (max(0, 12 - difficulty) ** 2) * 0.008
    scores = scores + np.random.normal(0, noise, len(allowed)).astype(np.float32)

    order = np.argsort(-scores)
    sorted_cards = [allowed[i] for i in order]

    # Budget reserved in the first pass for cheap (cost <= 3) cards.
    early_budget = {
        AIPersonality.GREEDY: 20,  # cheap cards are sacrifice fodder for big plays
        AIPersonality.SWARM: 12,
        AIPersonality.AGGRESSIVE: 18,  # raised: ensures cheap high-attack fodder regardless of difficulty
        AIPersonality.DEFENSIVE: 15,  # raised: stable cheap-card base across difficulty levels
        AIPersonality.CONTROL: 8,
        AIPersonality.BALANCED: 25,  # spread the deck across all cost levels
        AIPersonality.JEBRASKA: 25,  # same as balanced
        AIPersonality.ARBITRARY: 8,
    }[personality]

    selected: list[Card] = []
    total_cost = 0

    # First pass: secure early-game cards
    cheap_spent = 0
    for card in sorted_cards:
        if cheap_spent >= early_budget:
            break
        if card.cost > 3 or total_cost + card.cost > BUDGET:
            continue
        selected.append(card)
        total_cost += card.cost
        cheap_spent += card.cost

    # Second pass: fill remaining budget greedily by score
    taken = {id(c) for c in selected}
    for card in sorted_cards:
        if total_cost >= BUDGET:
            break
        if id(card) in taken or total_cost + card.cost > BUDGET:
            continue
        selected.append(card)
        total_cost += card.cost
    return selected


@dataclass
class MovePlan:
    """A candidate turn: board slots to sacrifice, then cards to play."""

    sacrifice_slots: list[int]
    plays: list[tuple]  # (CardInstance, board_slot: int)
    label: str = ""


def _affordable_subsets(hand, energy, start=0):
    """Yield every subset of cards from hand whose total cost fits within energy."""
    yield []
    for i in range(start, len(hand)):
        card = hand[i]
        if card.cost <= energy:
            for rest in _affordable_subsets(hand, energy - card.cost, i + 1):
                yield [card] + rest


def _plans_for_sacrifice(player, opponent, sacrifice_slots):
    """Generate one plan per affordable card subset for a given sacrifice set."""
    board = list(player.board)
    energy = player.energy
    # Simulate the sacrifices: each occupied slot emptied grants 1 energy.
    for slot in sacrifice_slots:
        if board[slot] is not None:
            board[slot] = None
            energy += 1
    hand = list(player.hand)
    empty_slots = [i for i, c in enumerate(board) if c is None]
    # One plan per (card subset, slot assignment) pair.
    return [
        MovePlan(
            sacrifice_slots=list(sacrifice_slots),
            plays=list(zip(cards, scoring_slots)),
            label=f"sac{len(sacrifice_slots)}_play{len(cards)}",
        )
        for cards in _affordable_subsets(hand, energy)
        for scoring_slots in permutations(empty_slots, len(cards))
    ]


def generate_plans(player, opponent) -> list[MovePlan]:
    """Enumerate every legal plan: all sacrifice sets crossed with all plays."""
    plans = []
    # Sacrifice n board cards
    occupied = [s for s in range(BOARD_SIZE) if player.board[s] is not None]
    for n in range(len(occupied) + 1):
        for slots in combinations(occupied, n):
            plans += _plans_for_sacrifice(player, opponent, list(slots))
    # Idle: do nothing
    plans.append(MovePlan(sacrifice_slots=[], plays=[], label="idle"))
    return plans


def score_plans_batch(
    plans: list[MovePlan],
    player: PlayerState,
    opponent: PlayerState,
    personality: AIPersonality,
) -> np.ndarray:
    """Score every plan in one vectorized pass.

    Returns a shape-(n,) float array; higher is better. Scores combine board
    metrics with personality-specific weights, then subtract a sacrifice
    penalty for wasted energy / bad swaps.
    """
    n = len(plans)

    # Pre-compute PCV for every hand card once
    pcv_cache = {
        id(c): max(0.0, min(1.0, get_power_curve_value(c)))
        for c in player.hand
    }

    # Build board-state arrays: one row per plan, with that plan applied.
    board_atk = np.zeros((n, BOARD_SIZE), dtype=np.float32)
    board_occ = np.zeros((n, BOARD_SIZE), dtype=np.bool_)
    n_sac = np.zeros(n, dtype=np.float32)
    sac_val = np.zeros(n, dtype=np.float32)
    play_val = np.zeros(n, dtype=np.float32)
    pcv_score = np.full(n, 0.5, dtype=np.float32)

    for idx, plan in enumerate(plans):
        board = list(player.board)
        for slot in plan.sacrifice_slots:
            board_slot = board[slot]
            if board_slot is not None:
                sac_val[idx] += board_slot.cost
                board[slot] = None
        n_sac[idx] = len(plan.sacrifice_slots)
        for card, slot in plan.plays:
            board[slot] = card
            play_val[idx] += card.cost
        for slot in range(BOARD_SIZE):
            board_slot = board[slot]
            if board_slot is not None:
                board_atk[idx, slot] = board_slot.attack
                board_occ[idx, slot] = True
        if plan.plays:
            pcv_vals = [pcv_cache.get(id(c), 0.5) for c, _ in plan.plays]
            pcv_score[idx] = sum(pcv_vals) / len(pcv_vals)

    # Enemy board — same for every plan
    en_atk = np.array([c.attack if c else 0 for c in opponent.board], dtype=np.float32)
    en_def = np.array([c.defense if c else 0 for c in opponent.board], dtype=np.float32)
    en_occ = np.array([c is not None for c in opponent.board], dtype=np.bool_)
    enemy_occupied = int(en_occ.sum())

    # --- Metrics (all shape (n,)) ---
    direct_damage = (board_atk * ~en_occ).sum(axis=1)
    blocking = board_occ & en_occ  # (n, 5)
    blocking_slots = blocking.sum(axis=1).astype(np.float32)
    cards_on_board = board_occ.sum(axis=1).astype(np.float32)
    cards_destroyed = ((board_atk >= en_def) & blocking).sum(axis=1).astype(np.float32)
    unblocked_in = (en_atk * ~board_occ).sum(axis=1)

    atk_score = np.minimum(1.0, direct_damage / max(opponent.life, 1))
    block_score = (blocking_slots / enemy_occupied if enemy_occupied > 0
                   else np.ones(n, dtype=np.float32))
    open_slots = BOARD_SIZE - enemy_occupied
    cover_score = (
        (cards_on_board - blocking_slots) / open_slots
        if open_slots > 0
        else np.ones(n, dtype=np.float32)
    )
    destroy_score = (cards_destroyed / enemy_occupied if enemy_occupied > 0
                     else np.zeros(n, dtype=np.float32))
    threat_score = 1.0 - np.minimum(1.0, unblocked_in / max(player.life, 1))

    opp_cards_left = len(opponent.deck) + len(opponent.hand) + enemy_occupied
    # BUG FIX: our side previously added only blocking_slots, while the enemy
    # side counts every occupied slot — an asymmetry that undercounted our
    # board presence. Use the full per-plan board count for a symmetric
    # attrition estimate.
    my_cards_left = len(player.deck) + len(player.hand) + cards_on_board
    attrition_score = my_cards_left / (my_cards_left + max(opp_cards_left, 1))

    net_value = play_val - sac_val
    net_value_norm = np.clip((net_value + 10) / 20, 0.0, 1.0)

    # --- Sacrifice penalty ---
    # Penalize sacrificing for energy that then goes unspent, and penalize
    # sacrificing more value than the plays put back on the board.
    energy_leftover = player.energy + n_sac - play_val
    wasted_energy = np.maximum(0, np.minimum(n_sac, energy_leftover))
    wasted_penalty = np.where(n_sac > 0, wasted_energy / np.maximum(n_sac, 1), 0.0)
    swap_penalty = np.clip((n_sac - net_value) / np.maximum(n_sac, 1), 0.0, 1.0)
    sac_penalty = np.where(n_sac > 0, 0.65 * wasted_penalty + 0.35 * swap_penalty, 0.0)

    # --- Personality weights ---
    if personality == AIPersonality.AGGRESSIVE:
        score = (0.30 * atk_score + 0.07 * block_score + 0.15 * cover_score +
                 0.08 * net_value_norm + 0.25 * destroy_score +
                 0.08 * attrition_score + 0.04 * pcv_score + 0.03 * threat_score)
    elif personality == AIPersonality.DEFENSIVE:
        score = (0.12 * atk_score + 0.20 * block_score + 0.18 * cover_score +
                 0.04 * net_value_norm + 0.18 * destroy_score +
                 0.15 * attrition_score + 0.05 * pcv_score + 0.08 * threat_score)
    elif personality == AIPersonality.SWARM:
        score = (0.25 * atk_score + 0.10 * block_score + 0.35 * cover_score +
                 0.05 * net_value_norm + 0.05 * destroy_score +
                 0.10 * attrition_score + 0.05 * pcv_score + 0.05 * threat_score)
    elif personality == AIPersonality.GREEDY:
        score = (0.15 * atk_score + 0.05 * block_score + 0.18 * cover_score +
                 0.38 * net_value_norm + 0.05 * destroy_score +
                 0.09 * attrition_score + 0.05 * pcv_score + 0.05 * threat_score)
    elif personality == AIPersonality.CONTROL:
        score = (0.10 * atk_score + 0.05 * block_score + 0.05 * cover_score +
                 0.20 * net_value_norm + 0.05 * destroy_score +
                 0.10 * attrition_score + 0.40 * pcv_score + 0.05 * threat_score)
    elif personality == AIPersonality.BALANCED:
        score = (0.12 * atk_score + 0.13 * block_score + 0.15 * cover_score +
                 0.10 * net_value_norm + 0.12 * destroy_score +
                 0.15 * attrition_score + 0.12 * pcv_score + 0.11 * threat_score)
    else:  # ARBITRARY
        score = (0.50 * np.random.random(n).astype(np.float32) +
                 0.06 * atk_score + 0.06 * block_score + 0.08 * cover_score +
                 0.05 * net_value_norm + 0.06 * destroy_score +
                 0.08 * attrition_score + 0.06 * pcv_score + 0.05 * threat_score)

    # --- Context adjustments ---
    # Lethal this turn: force the plan near the top. Losing to the counter-swing:
    # force it near the bottom.
    score = np.where(direct_damage >= opponent.life, np.maximum(score, 0.95), score)
    score = np.where(unblocked_in >= player.life, np.minimum(score, 0.05), score)
    if opponent.deck_type in ("God Card", "Pantheon"):
        score = np.minimum(1.0, score + 0.08 * cover_score)
    if opponent.deck_type in ("Aggro", "Rush"):
        score = np.minimum(1.0, score + 0.06 * block_score + 0.04 * threat_score)
    if opponent.deck_type == "Wall":
        score = np.minimum(1.0, score + 0.06 * atk_score)
    if opponent.life < STARTING_LIFE * 0.3:
        score = np.minimum(1.0, score + 0.06 * atk_score)
    if player.life < STARTING_LIFE * 0.3:
        score = np.minimum(1.0, score + 0.06 * threat_score + 0.04 * block_score)
    if opp_cards_left <= 5:
        score = np.where(cards_on_board > 0, np.minimum(1.0, score + 0.05), score)

    return np.maximum(0.0, score - sac_penalty)


def choose_plan(player: PlayerState, opponent: PlayerState,
                personality: AIPersonality, difficulty: int) -> MovePlan:
    """Score every candidate plan and return the best, with difficulty noise."""
    plans = generate_plans(player, opponent)
    if personality == AIPersonality.JEBRASKA:
        import os

        from nn import NeuralNet
        _weights = os.path.join(os.path.dirname(__file__), "nn_weights.json")
        # Cache the network on the function object so weights load only once.
        if not hasattr(choose_plan, "_neural_net"):
            choose_plan._neural_net = (NeuralNet.load(_weights)
                                       if os.path.exists(_weights) else None)
        net = choose_plan._neural_net
        if net is not None:
            from nn import extract_plan_features
            scores = net.forward(extract_plan_features(plans, player, opponent))
        else:
            # fallback to BALANCED if weights not found
            scores = score_plans_batch(plans, player, opponent, AIPersonality.BALANCED)
    else:
        scores = score_plans_batch(plans, player, opponent, personality)
    # Lower difficulties get larger noise, making plan choice less optimal.
    # BUG FIX: clamp at 0 — the raw expression is negative for difficulty >= 11
    # ((max(0, 12-11)**2 - 4) < 0), and np.random.normal raises ValueError for
    # a negative scale, crashing the AI turn at high difficulty.
    noise_scale = max(0.0, ((max(0, 12 - difficulty) ** 2) - 4) * 0.008)
    noise = np.random.normal(0, noise_scale, len(scores)).astype(np.float32)
    return plans[int(np.argmax(scores + noise))]


async def run_ai_turn(game_id: str):
    """Execute one full AI turn: sacrifices, plays, end turn, and state pushes.

    Re-schedules itself if the AI is still the active player afterwards
    (e.g. the human has no moves). Best-effort: websocket sends are allowed
    to fail silently so the game state still advances.
    """
    from game_manager import (
        active_games, connections, active_deck_ids, serialize_state,
        record_game_result, calculate_combat_animation_time
    )
    state = active_games.get(game_id)
    if not state or state.result:
        return
    if state.active_player_id != AI_USER_ID:
        return
    human_id = state.opponent_id(AI_USER_ID)
    # Give the human client up to ~10s to (re)connect before acting.
    waited = 0
    while not connections[game_id].get(human_id) and waited < 10:
        await asyncio.sleep(0.5)
        waited += 0.5
    # Let the previous combat animation finish client-side before moving.
    await asyncio.sleep(calculate_combat_animation_time(state.last_combat_events))
    player = state.players[AI_USER_ID]
    opponent = state.players[human_id]
    difficulty = state.ai_difficulty
    personality = (
        AIPersonality(state.ai_personality)
        if state.ai_personality
        else AIPersonality.BALANCED
    )
    ws = connections[game_id].get(human_id)

    async def send_state(s):
        if ws:
            try:
                await ws.send_json({"type": "state", "state": serialize_state(s, human_id)})
            except Exception:
                pass

    async def send_sacrifice_anim(instance_id):
        if ws:
            try:
                await ws.send_json({"type": "sacrifice_animation", "instance_id": instance_id})
            except Exception:
                pass

    best_plan = choose_plan(player, opponent, personality, difficulty)
    logger.info(
        f"AI turn: d={difficulty} p={personality.value} plan={best_plan.label} " +
        f"sac={best_plan.sacrifice_slots} plays={[c.name for c, _ in best_plan.plays]}"
    )
    for slot in best_plan.sacrifice_slots:
        card_slot = player.board[slot]
        if card_slot is None:
            continue
        await send_sacrifice_anim(card_slot.instance_id)
        await asyncio.sleep(0.65)
        action_sacrifice(state, slot)
        await send_state(state)
        await asyncio.sleep(0.35)
    # Shuffle play order so the AI doesn't always fill slots left-to-right
    plays = list(best_plan.plays)
    random.shuffle(plays)
    for card, slot in plays:
        # Re-look up hand index each time (hand shrinks as cards are played)
        hand_idx = next((i for i, c in enumerate(player.hand) if c is card), None)
        if hand_idx is None:
            continue
        if player.board[slot] is not None:
            continue
        if card.cost > player.energy:
            continue
        action_play_card(state, hand_idx, slot)
        await send_state(state)
        await asyncio.sleep(0.5)
    action_end_turn(state)
    await send_state(state)
    if state.result:
        from database import SessionLocal
        db = SessionLocal()
        try:
            record_game_result(state, db)
            if ws:
                await ws.send_json({
                    "type": "state",
                    "state": serialize_state(state, human_id),
                })
        finally:
            db.close()
        # Tear down all per-game bookkeeping once the result is recorded.
        active_deck_ids.pop(human_id, None)
        active_deck_ids.pop(AI_USER_ID, None)
        active_games.pop(game_id, None)
        connections.pop(game_id, None)
        return
    if state.active_player_id == AI_USER_ID:
        asyncio.create_task(run_ai_turn(game_id))