import json
import os
import random
import uuid
import asyncio
from concurrent.futures import ProcessPoolExecutor
from dotenv import load_dotenv

# NOTE(review): load_dotenv() is deliberately called before the project
# imports below; presumably card/game/ai read environment variables at import
# time -- confirm before reordering.
load_dotenv()

from datetime import datetime
from card import Card, CardType, CardRarity, generate_cards, compute_deck_type
from game import (
    CardInstance,
    PlayerState,
    GameState,
    action_play_card,
    action_sacrifice,
    action_end_turn,
)
from ai import AIPersonality, choose_cards, choose_plan

SIMULATION_CARDS_PATH = os.path.join(os.path.dirname(__file__), "simulation_cards.json")
SIMULATION_CARD_COUNT = 1000


# ==================== Card pool ====================

def _card_to_dict(card: Card) -> dict:
    """Convert a Card into a JSON-serialisable dict (enums by name, datetime as ISO string)."""
    return {
        "name": card.name,
        "created_at": card.created_at.isoformat(),
        "image_link": card.image_link,
        "card_rarity": card.card_rarity.name,
        "card_type": card.card_type.name,
        "wikidata_instance": card.wikidata_instance,
        "text": card.text,
        "attack": card.attack,
        "defense": card.defense,
        "cost": card.cost,
    }


def _dict_to_card(d: dict) -> Card:
    """Rebuild a Card from a dict produced by `_card_to_dict`."""
    return Card(
        name=d["name"],
        created_at=datetime.fromisoformat(d["created_at"]),
        image_link=d["image_link"],
        card_rarity=CardRarity[d["card_rarity"]],
        card_type=CardType[d["card_type"]],
        wikidata_instance=d["wikidata_instance"],
        text=d["text"],
        attack=d["attack"],
        defense=d["defense"],
        cost=d["cost"],
    )


def get_simulation_cards() -> list[Card]:
    """
    Return the card pool used for simulations.

    Loads the pool from SIMULATION_CARDS_PATH when that file exists; otherwise
    generates SIMULATION_CARD_COUNT cards, writes them to that path and returns
    them.
    """
    # EAFP: open directly rather than checking for existence first, so there is
    # no window between the check and the open.
    try:
        with open(SIMULATION_CARDS_PATH, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        pass
    else:
        return [_dict_to_card(d) for d in data]

    print(f"Generating {SIMULATION_CARD_COUNT} cards (this may take a while)...")
    cards = generate_cards(SIMULATION_CARD_COUNT)
    with open(SIMULATION_CARDS_PATH, "w", encoding="utf-8") as f:
        json.dump([_card_to_dict(c) for c in cards], f, ensure_ascii=False, indent=2)
    print(f"Saved {len(cards)} cards to {SIMULATION_CARDS_PATH}")
    return cards


# ==================== Single game ====================

PLAYER1_ID = "p1"
PLAYER2_ID = "p2"
MAX_TURNS = 300  # safety cap to prevent infinite games


def _make_instances(deck: list[Card]) -> list[CardInstance]:
    """Create one CardInstance (with a fresh UUID) for every card in `deck`, in order."""
    return [
        CardInstance(
            instance_id=str(uuid.uuid4()),
            card_id=card.name,
            name=card.name,
            attack=card.attack,
            defense=card.defense,
            max_defense=card.defense,
            cost=card.cost,
            card_type=card.card_type.name,
            card_rarity=card.card_rarity.name,
            # CardInstance gets empty strings instead of None for optional text fields.
            image_link=card.image_link or "",
            text=card.text or "",
        )
        for card in deck
    ]


async def simulate_game(
    cards: list[Card],
    difficulty1: int,
    personality1: AIPersonality,
    difficulty2: int,
    personality2: AIPersonality,
) -> str | None:
    """
    Simulate a single game between two AIs choosing from `cards`.

    Player 1 always goes first.
    Returns "p1", "p2", or None if the game exceeds MAX_TURNS (or ends without
    a winner id).

    Designed to be awaited inside asyncio.gather() to run many games concurrently.
    """
    deck1 = choose_cards(cards, difficulty1, personality1)
    deck2 = choose_cards(cards, difficulty2, personality2)

    instances1 = _make_instances(deck1)
    instances2 = _make_instances(deck2)
    random.shuffle(instances1)
    random.shuffle(instances2)

    deck_type1 = compute_deck_type(deck1) or "Balanced"
    deck_type2 = compute_deck_type(deck2) or "Balanced"

    p1 = PlayerState(user_id=PLAYER1_ID, username="AI1", deck_type=deck_type1, deck=instances1)
    p2 = PlayerState(user_id=PLAYER2_ID, username="AI2", deck_type=deck_type2, deck=instances2)

    # P1 always goes first
    p1.increment_energy_cap()
    p2.increment_energy_cap()
    p1.refill_energy()
    p1.draw_to_full()

    state = GameState(
        game_id=str(uuid.uuid4()),
        players={PLAYER1_ID: p1, PLAYER2_ID: p2},
        player_order=[PLAYER1_ID, PLAYER2_ID],
        active_player_id=PLAYER1_ID,
        phase="main",
        turn=1,
    )

    configs = {
        PLAYER1_ID: (difficulty1, personality1),
        PLAYER2_ID: (difficulty2, personality2),
    }

    # Each iteration plays out one turn of whichever player is currently active
    # and ends with a single action_end_turn() call.
    for _ in range(MAX_TURNS):
        if state.result:
            break

        active_id = state.active_player_id
        difficulty, personality = configs[active_id]
        player = state.players[active_id]
        opponent = state.players[state.opponent_id(active_id)]

        plan = await choose_plan(player, opponent, personality, difficulty)

        # Sacrifices first; slots that are already empty are skipped.
        for slot in plan.sacrifice_slots:
            if player.board[slot] is not None:
                action_sacrifice(state, slot)

        # Planned plays are applied in random order.
        plays = list(plan.plays)
        random.shuffle(plays)
        for card, slot in plays:
            # Locate the card by identity: hand indices shift as cards are played.
            hand_idx = next((i for i, c in enumerate(player.hand) if c is card), None)
            if hand_idx is None:
                continue
            if player.board[slot] is not None:
                continue
            if card.cost > player.energy:
                continue
            action_play_card(state, hand_idx, slot)

        action_end_turn(state)

    if state.result and state.result.winner_id:
        return state.result.winner_id
    return None


# ==================== Process-pool worker ====================
# These must be module-level so they are picklable.

_worker_cards: list[Card] = []


def _init_worker(cards: list[Card]) -> None:
    """Pool initializer: store the card pool in this worker's module-level global."""
    global _worker_cards
    _worker_cards = cards


def _run_game_sync(args: tuple) -> str | None:
    """
    Synchronous entry point for a worker process.

    `args` is (difficulty1, personality1_value, difficulty2, personality2_value);
    personalities travel as their enum values and are rebuilt here.
    """
    d1, p1_name, d2, p2_name = args
    return asyncio.run(simulate_game(
        _worker_cards,
        d1, AIPersonality(p1_name),
        d2, AIPersonality(p2_name),
    ))


# ==================== Tournament ====================

def _all_players(difficulties: list[int] | None = None) -> list[tuple[AIPersonality, int]]:
    """Return all (personality, difficulty) combinations for the given difficulties (default 1-10)."""
    if difficulties is None:
        difficulties = list(range(1, 11))
    return [
        (personality, difficulty)
        for personality in AIPersonality
        for difficulty in difficulties
    ]


def _player_label(personality: AIPersonality, difficulty: int) -> str:
    """Short axis label: first three characters of the personality value, upper-cased, plus the difficulty."""
    return f"{personality.value[:3].upper()}-{difficulty}"


async def run_tournament(
    cards: list[Card],
    games_per_matchup: int = 5,
    difficulties: list[int] | None = None,
) -> dict[tuple[int, int], int]:
    """
    Pit every (personality, difficulty) pair against every other (and itself),
    as both first and second player.

    `difficulties` selects which difficulty levels to include (default: 1-10).

    Returns a wins dict keyed by (first_player_index, second_player_index)
    where the value is how many of `games_per_matchup` games the first player won.
    Games that end without a winner are counted and printed as "Ties" but are
    not part of the returned dict.

    Games run in parallel across all CPU cores via ProcessPoolExecutor.
    Cards are sent to each worker once at startup, not once per game.
    """
    players = _all_players(difficulties)
    n = len(players)

    # Build the flat list of (i, j, args) for every game
    indexed_args: list[tuple[int, int, tuple]] = []
    for i in range(n):
        p1_personality, p1_difficulty = players[i]
        for j in range(n):
            p2_personality, p2_difficulty = players[j]
            # Enum values (not members) are sent to the workers.
            args = (p1_difficulty, p1_personality.value, p2_difficulty, p2_personality.value)
            for _ in range(games_per_matchup):
                indexed_args.append((i, j, args))

    total_games = len(indexed_args)
    n_workers = os.cpu_count() or 1
    print(f"Running {total_games} games across {n_workers} workers "
          f"({n} players, {games_per_matchup} games per ordered pair)...")

    done = 0
    report_every = max(1, total_games // 200)
    loop = asyncio.get_running_loop()

    async def tracked(future):
        """Await one game and update the single-line progress display."""
        nonlocal done
        result = await future
        done += 1
        if done % report_every == 0 or done == total_games:
            pct = done / total_games * 100
            print(f" {done}/{total_games} games done ({pct:.1f}%)", end="\r", flush=True)
        return result

    with ProcessPoolExecutor(
        max_workers=n_workers,
        initializer=_init_worker,
        initargs=(cards,),
    ) as executor:
        futures = [
            loop.run_in_executor(executor, _run_game_sync, args)
            for _, _, args in indexed_args
        ]
        # gather() preserves submission order, so results line up with indexed_args.
        results = await asyncio.gather(*[tracked(f) for f in futures])

    print("\nFinished")

    wins: dict[tuple[int, int], int] = {}
    ties = 0
    for (i, j, _), winner in zip(indexed_args, results):
        key = (i, j)
        wins.setdefault(key, 0)
        if winner == PLAYER1_ID:
            wins[key] += 1
        elif winner is None:
            ties += 1
    print(f"Ties: {ties}")
    return wins


def rank_players(
    wins: dict[tuple[int, int], int],
    games_per_matchup: int,
    players: list[tuple[AIPersonality, int]],
) -> list[int]:
    """
    Rank player indices by total wins (as first + second player combined).
    Returns indices sorted worst-to-best.

    NOTE: `wins` only records first-player wins, so second-player wins are
    derived as `games_per_matchup - p1_wins`. Any game that ended without a
    winner is therefore credited to the second player.
    """
    n = len(players)
    total_wins = [0] * n
    for (i, j), p1_wins in wins.items():
        if i == j:
            continue  # self-matchups are symmetric; skip to avoid double-counting
        p2_wins = games_per_matchup - p1_wins
        total_wins[i] += p1_wins
        total_wins[j] += p2_wins
    return sorted(range(n), key=lambda k: total_wins[k])


TOURNAMENT_RESULTS_PATH = os.path.join(os.path.dirname(__file__), "tournament_results.json")


def save_tournament(
    wins: dict[tuple[int, int], int],
    games_per_matchup: int,
    players: list[tuple[AIPersonality, int]],
    path: str = TOURNAMENT_RESULTS_PATH,
):
    """
    Write tournament results to `path` as JSON.

    Tuple keys of `wins` are stored as "i,j" strings because JSON object keys
    must be strings; personalities are stored by enum value.
    """
    data = {
        "games_per_matchup": games_per_matchup,
        "players": [
            {"personality": p.value, "difficulty": d}
            for p, d in players
        ],
        "wins": {f"{i},{j}": w for (i, j), w in wins.items()},
    }
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
    print(f"Tournament results saved to {path}")


def load_tournament(path: str = TOURNAMENT_RESULTS_PATH) -> tuple[dict[tuple[int, int], int], int, list[tuple[AIPersonality, int]]]:
    """
    Read a file written by `save_tournament`.

    Returns (wins, games_per_matchup, players).
    Raises ValueError if a wins key is not of the form "i,j".
    """
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)

    wins: dict[tuple[int, int], int] = {}
    for key, value in data["wins"].items():
        # Parse each "i,j" key once.
        i, j = (int(part) for part in key.split(","))
        wins[(i, j)] = value

    players = [
        (AIPersonality(p["personality"]), p["difficulty"])
        for p in data["players"]
    ]
    return wins, data["games_per_matchup"], players


def draw_grid(
    wins: dict[tuple[int, int], int],
    games_per_matchup: int = 5,
    players: list[tuple[AIPersonality, int]] | None = None,
    output_path: str = "tournament_grid.png",
):
    """
    Draw a heatmap grid of tournament results and save it to `output_path`.

    Rows = first player
    Cols = second player
    Color = red if first player won more of their games in that cell
            green if second player won more
    × = the first player won either all or none of the games in that cell

    Rows and columns are ordered worst-to-best according to `rank_players`.
    Matchups that are absent from `wins` are left as NaN and get no × mark.

    Raises ValueError if `games_per_matchup` is not positive.
    """
    if games_per_matchup <= 0:
        raise ValueError("games_per_matchup must be a positive integer")

    # Imported lazily so the rest of the module works without matplotlib/numpy.
    import matplotlib
    matplotlib.use("Agg")  # non-interactive backend: render straight to a file
    import matplotlib.pyplot as plt
    import matplotlib.colors as mcolors
    import numpy as np

    if players is None:
        players = _all_players()
    n = len(players)

    ranked = rank_players(wins, games_per_matchup, players)  # worst-to-best indices
    labels = [_player_label(*players[i]) for i in ranked]

    # Build value matrix: (p1_wins - p2_wins) / games_per_matchup ∈ [-1, 1].
    # Cells with no recorded matchup stay NaN.
    matrix = np.full((n, n), np.nan)
    sweeps: list[tuple[int, int]] = []
    for row, i in enumerate(ranked):
        for col, j in enumerate(ranked):
            p1_wins = wins.get((i, j))
            if p1_wins is None:
                continue
            matrix[row, col] = (p1_wins - (games_per_matchup - p1_wins)) / games_per_matchup
            if p1_wins == games_per_matchup or p1_wins == 0:
                sweeps.append((row, col))

    cell_size = 0.22
    fig_size = n * cell_size + 3
    fig, ax = plt.subplots(figsize=(fig_size, fig_size))

    cmap = mcolors.LinearSegmentedColormap.from_list(
        "p1_p2", ["#90EE90", "#67A2E0", "#D74E4E"]  # pastel green → blue → red
    )
    norm = mcolors.Normalize(vmin=-1, vmax=1)
    img = ax.imshow(matrix, cmap=cmap, norm=norm, aspect="equal", interpolation="none")

    # × marks for sweeps
    for row, col in sweeps:
        ax.text(col, row, "×", ha="center", va="center",
                fontsize=5, color="black", fontweight="bold", zorder=3)

    ax.set_xticks(range(n))
    ax.set_yticks(range(n))
    ax.set_xticklabels(labels, rotation=90, fontsize=4)
    ax.set_yticklabels(labels, fontsize=4)
    ax.xaxis.set_label_position("top")
    ax.xaxis.tick_top()
    ax.set_xlabel("Second player", labelpad=8, fontsize=8)
    ax.set_ylabel("First player", labelpad=8, fontsize=8)
    ax.set_title(
        "Tournament results — red: first player wins more, green: second player wins more",
        pad=14, fontsize=9,
    )

    fig.colorbar(img, ax=ax, fraction=0.015, pad=0.01,
                 label="(P1 wins - P2 wins) / games per cell")

    fig.tight_layout()
    fig.savefig(output_path, dpi=150, bbox_inches="tight")
    plt.close(fig)
    print(f"Grid saved to {output_path}")


if __name__ == "__main__":
    import sys

    GAMES_PER_MATCHUP = 50
    difficulties = list(range(1, 11))

    card_pool = get_simulation_cards()
    players = _all_players(difficulties)
    wins = asyncio.run(run_tournament(card_pool, games_per_matchup=GAMES_PER_MATCHUP, difficulties=difficulties))
    save_tournament(wins, games_per_matchup=GAMES_PER_MATCHUP, players=players)
    draw_grid(wins, games_per_matchup=GAMES_PER_MATCHUP, players=players)