"""Policy-gradient training loop for the plan network and the card-pick network.

Episodes are played either against scripted opponents (one of
``FIXED_PERSONALITIES``) or in self-play; gradients returned by the players
are accumulated into batches and applied with ``adam_update``.
"""

import os
import random
import uuid
from collections import deque

import numpy as np
from dotenv import load_dotenv

# Kept ahead of the project imports, as in the original file.
# NOTE(review): presumably the project modules read environment at import time — confirm.
load_dotenv()

from game.card import compute_deck_type
from ai.engine import AIPersonality, choose_cards, choose_plan
from game.rules import PlayerState, GameState, action_play_card, action_sacrifice, action_end_turn
from ai.simulate import get_simulation_cards, _make_instances, MAX_TURNS
from ai.nn import NeuralNet, NeuralPlayer
from ai.card_pick_nn import CardPickPlayer, N_CARD_FEATURES, CARD_PICK_WEIGHTS_PATH

NN_WEIGHTS_PATH = os.path.join(os.path.dirname(__file__), "nn_weights.json")

P1 = "p1"
P2 = "p2"

# Difficulty value handed to the card-pick network when it builds its own deck.
NN_DIFFICULTY = 10

# Personalities used as scripted training opponents (ARBITRARY and JEBRASKA are excluded).
FIXED_PERSONALITIES = [
    p for p in AIPersonality
    if p not in (
        AIPersonality.ARBITRARY,
        AIPersonality.JEBRASKA,
    )
]


def _build_player(pid: str, name: str, cards: list, difficulty: int,
                  personality: AIPersonality,
                  deck_pool: dict | None = None) -> PlayerState:
    """Build a PlayerState for a scripted opponent.

    If ``deck_pool`` has an entry for ``personality`` a deck is drawn at
    random from that entry; otherwise a deck is built with ``choose_cards``.
    The deck's card instances are shuffled before being handed to the state.
    """
    if deck_pool and personality in deck_pool:
        deck = random.choice(deck_pool[personality])
    else:
        deck = choose_cards(cards, difficulty, personality)
    instances = _make_instances(deck)
    random.shuffle(instances)
    return PlayerState(
        user_id=pid,
        username=name,
        deck_type=compute_deck_type(deck) or "Balanced",
        deck=instances,
    )


def _build_nn_player(pid: str, name: str, cards: list, difficulty: int,
                     card_pick_player: CardPickPlayer) -> PlayerState:
    """Build a PlayerState using the card-pick NN for deck selection.

    Cards are first filtered by cost: the cap is ``difficulty + 1`` when
    ``difficulty >= 6`` and 6 otherwise. If the filter leaves nothing, the
    full card list is offered instead.
    """
    max_card_cost = difficulty + 1 if difficulty >= 6 else 6
    allowed = [c for c in cards if c.cost <= max_card_cost] or list(cards)
    deck = card_pick_player.choose_cards(allowed, difficulty)
    instances = _make_instances(deck)
    random.shuffle(instances)
    return PlayerState(
        user_id=pid,
        username=name,
        deck_type=compute_deck_type(deck) or "Balanced",
        deck=instances,
    )


def run_episode(
    p1_state: PlayerState,
    p2_state: PlayerState,
    p1_ctrl,  # (player, opponent) -> MovePlan
    p2_ctrl,  # (player, opponent) -> MovePlan
) -> str | None:
    """Play one game and return the winner id (P1 or P2), or None on timeout.

    ``p1_state`` is seated as P1 and moves first. Each turn the active
    player's controller is asked for a plan; its sacrifices are applied, then
    its plays in random order, then the turn is ended. The game stops when
    ``state.result`` is set or after ``MAX_TURNS`` turns.
    """
    # Opening setup: both caps are incremented, but only P1 is refilled and
    # drawn up here.
    # NOTE(review): presumably action_end_turn does this for the next active
    # player — confirm in game.rules.
    p1_state.increment_energy_cap()
    p2_state.increment_energy_cap()
    p1_state.refill_energy()
    p1_state.draw_to_full()

    state = GameState(
        game_id=str(uuid.uuid4()),
        players={P1: p1_state, P2: p2_state},
        player_order=[P1, P2],
        active_player_id=P1,
        phase="main",
        turn=1,
    )

    ctrls = {P1: p1_ctrl, P2: p2_ctrl}

    for _ in range(MAX_TURNS):
        if state.result:
            break
        active_id = state.active_player_id
        player = state.players[active_id]
        opponent = state.players[state.opponent_id(active_id)]
        plan = ctrls[active_id](player, opponent)

        # Only sacrifice slots that still hold a card.
        for slot in plan.sacrifice_slots:
            if player.board[slot] is not None:
                action_sacrifice(state, slot)

        plays = list(plan.plays)
        random.shuffle(plays)
        for card, slot in plays:
            # The card is re-located by identity for every play rather than
            # trusting a stored hand index.
            hand_idx = next((i for i, c in enumerate(player.hand) if c is card), None)
            # Skip plays that are no longer legal: card gone, slot taken, or
            # not enough energy left.
            if hand_idx is None or player.board[slot] is not None or card.cost > player.energy:
                continue
            action_play_card(state, hand_idx, slot)

        action_end_turn(state)

    return state.result.winner_id if state.result else None


def _zero_grads(net: NeuralNet) -> tuple[list, list]:
    """Return fresh zeroed (weight, bias) gradient accumulators shaped like ``net``."""
    return (
        [np.zeros_like(w) for w in net.weights],
        [np.zeros_like(b) for b in net.biases],
    )


def _accumulate_grads(acc_gw: list, acc_gb: list, grads) -> int:
    """Add ``grads`` into the accumulators; return 1 if anything was added, else 0.

    ``grads`` is either None (nothing to add) or a ``(gw, gb)`` pair.
    """
    if grads is None:
        return 0
    gw, gb = grads
    for i, (dw, db) in enumerate(zip(gw, gb)):
        acc_gw[i] += dw
        acc_gb[i] += db
    return 1


def _apply_batch(net: NeuralNet, acc_gw: list, acc_gb: list, count: int,
                 lr: float) -> tuple[list, list]:
    """Average the accumulated gradients, take one Adam step, return fresh accumulators."""
    for i in range(len(acc_gw)):
        acc_gw[i] /= count
        acc_gb[i] /= count
    net.adam_update(acc_gw, acc_gb, lr=lr)
    return _zero_grads(net)


def train(
    n_episodes: int = 50_000,
    self_play_start: int = 0,
    self_play_max_frac: float = 0.9,
    lr: float = 1e-3,
    opp_difficulty: int = 10,
    temperature: float = 1.0,
    batch_size: int = 500,
    save_every: int = 5_000,
    save_path: str = NN_WEIGHTS_PATH,
) -> NeuralNet:
    """Train the plan network and the card-pick network; return the plan network.

    Args:
        n_episodes: Number of games to play.
        self_play_start: Episode from which the self-play probability starts
            ramping up linearly; before it the probability is 0.
        self_play_max_frac: Self-play probability reached at the last episode.
        lr: Learning rate passed to ``adam_update`` for both networks.
        opp_difficulty: Difficulty used for scripted opponents' decks and plans.
        temperature: Passed to ``NeuralPlayer`` and ``CardPickPlayer``.
        batch_size: Number of accumulated gradient contributions that triggers
            an Adam step (a self-play game can contribute two per network).
        save_every: Checkpoint interval in episodes.
        save_path: Where the plan network is loaded from / saved to. The
            card-pick network always uses ``CARD_PICK_WEIGHTS_PATH``.

    Returns:
        The trained plan network.

    Note:
        Gradients still sitting in a partial batch when training ends are not
        applied.
    """
    cards = get_simulation_cards()

    # Pre-build a pool of opponent decks per personality to avoid rebuilding from scratch each episode.
    DECK_POOL_SIZE = 100
    opp_deck_pool: dict[AIPersonality, list] = {
        p: [choose_cards(cards, opp_difficulty, p) for _ in range(DECK_POOL_SIZE)]
        for p in FIXED_PERSONALITIES
    }

    if os.path.exists(save_path):
        print(f"Resuming plan net from {save_path}")
        net = NeuralNet.load(save_path)
    else:
        print("Initializing new plan network")
        net = NeuralNet(seed=42)

    cp_path = CARD_PICK_WEIGHTS_PATH
    if os.path.exists(cp_path):
        print(f"Resuming card-pick net from {cp_path}")
        card_pick_net = NeuralNet.load(cp_path)
    else:
        print("Initializing new card-pick network")
        card_pick_net = NeuralNet(n_features=N_CARD_FEATURES, hidden=(32, 16), seed=43)

    recent_outcomes: deque[int] = deque(maxlen=1000)  # rolling window for win rate display
    baseline = 0.0  # EMA of past outcomes; subtracted from the outcome to form the advantage
    baseline_alpha = 0.99  # decay — roughly a 100-episode window

    batch_gw, batch_gb = _zero_grads(net)
    batch_count = 0

    cp_batch_gw, cp_batch_gb = _zero_grads(card_pick_net)
    cp_batch_count = 0

    for episode in range(1, n_episodes + 1):
        # Ramp self-play fraction linearly from 0 to self_play_max_frac
        if episode >= self_play_start:
            progress = (episode - self_play_start) / max(1, n_episodes - self_play_start)
            self_play_prob = self_play_max_frac * progress
        else:
            self_play_prob = 0.0

        # Randomly decide whether the NN takes the first seat (P1) or the
        # second (P2) against a scripted opponent. Drawn every episode so the
        # RNG stream does not depend on which branch is taken.
        nn_goes_first = random.random() < 0.5

        if random.random() < self_play_prob:
            nn1 = NeuralPlayer(net, training=True, temperature=temperature)
            nn2 = NeuralPlayer(net, training=True, temperature=temperature)
            cp1 = CardPickPlayer(card_pick_net, training=True, temperature=temperature)
            cp2 = CardPickPlayer(card_pick_net, training=True, temperature=temperature)
            # Both seats are driven by the same networks, so there is nothing
            # to randomise by swapping seats. Swapping the states would seat
            # cp2's deck (built with user_id=P2) under P1 while cp1 is still
            # credited with P1's outcome, so the states stay where they were built.
            p1_state = _build_nn_player(P1, "NN1", cards, NN_DIFFICULTY, cp1)
            p2_state = _build_nn_player(P2, "NN2", cards, NN_DIFFICULTY, cp2)
            winner = run_episode(p1_state, p2_state, nn1.choose_plan, nn2.choose_plan)

            # A timeout (winner is None) is a non-win for BOTH seats, matching
            # how the fixed-opponent branch scores it.
            p1_outcome = 1.0 if winner == P1 else -1.0
            p2_outcome = 1.0 if winner == P2 else -1.0

            # Advantages use the baseline as it stood before this episode.
            p1_adv = p1_outcome - baseline
            p2_adv = p2_outcome - baseline
            baseline = baseline_alpha * baseline + (1 - baseline_alpha) * p1_outcome

            for player, adv in ((nn1, p1_adv), (nn2, p2_adv)):
                batch_count += _accumulate_grads(batch_gw, batch_gb, player.compute_grads(adv))
            for picker, adv in ((cp1, p1_adv), (cp2, p2_adv)):
                cp_batch_count += _accumulate_grads(cp_batch_gw, cp_batch_gb, picker.compute_grads(adv))
        else:
            opp_personality = random.choice(FIXED_PERSONALITIES)
            nn_player = NeuralPlayer(net, training=True, temperature=temperature)
            cp_player = CardPickPlayer(card_pick_net, training=True, temperature=temperature)

            def opp_ctrl(p, o, pers=opp_personality, diff=opp_difficulty):
                return choose_plan(p, o, pers, diff)

            if nn_goes_first:
                nn_id = P1
                p1_state = _build_nn_player(P1, "NN", cards, NN_DIFFICULTY, cp_player)
                p2_state = _build_player(P2, "OPP", cards, opp_difficulty, opp_personality, opp_deck_pool)
                winner = run_episode(p1_state, p2_state, nn_player.choose_plan, opp_ctrl)
            else:
                nn_id = P2
                p1_state = _build_player(P1, "OPP", cards, opp_difficulty, opp_personality, opp_deck_pool)
                p2_state = _build_nn_player(P2, "NN", cards, NN_DIFFICULTY, cp_player)
                winner = run_episode(p1_state, p2_state, opp_ctrl, nn_player.choose_plan)

            nn_outcome = 1.0 if winner == nn_id else -1.0
            # One advantage, computed before the baseline absorbs this outcome,
            # shared by the plan net and the card-pick net.
            advantage = nn_outcome - baseline
            baseline = baseline_alpha * baseline + (1 - baseline_alpha) * nn_outcome

            batch_count += _accumulate_grads(batch_gw, batch_gb, nn_player.compute_grads(advantage))
            cp_batch_count += _accumulate_grads(cp_batch_gw, cp_batch_gb, cp_player.compute_grads(advantage))

            # The displayed win rate only counts games against scripted opponents.
            recent_outcomes.append(1 if winner == nn_id else 0)

        if batch_count >= batch_size:
            batch_gw, batch_gb = _apply_batch(net, batch_gw, batch_gb, batch_count, lr)
            batch_count = 0

        if cp_batch_count >= batch_size:
            cp_batch_gw, cp_batch_gb = _apply_batch(card_pick_net, cp_batch_gw, cp_batch_gb, cp_batch_count, lr)
            cp_batch_count = 0

        if episode % 1000 == 0 or episode == n_episodes:
            wr = sum(recent_outcomes) / len(recent_outcomes) if recent_outcomes else 0.0
            print(f"\r[{episode:>6}/{n_episodes}] win rate (last {len(recent_outcomes)}): {wr:.1%} "
                  f"self-play frac: {self_play_prob:.0%}", flush=True)
        else:
            print(f" {episode % 1000}/1000", end="\r", flush=True)

        if episode % save_every == 0:
            net.save(save_path)
            card_pick_net.save(cp_path)
            print(f" → saved to {save_path} and {cp_path}")

    net.save(save_path)
    card_pick_net.save(cp_path)
    wr = sum(recent_outcomes) / len(recent_outcomes) if recent_outcomes else 0.0
    print(f"Done. Final win rate (last {len(recent_outcomes)}): {wr:.1%}")
    return net


if __name__ == "__main__":
    train()