This commit is contained in:
2026-03-19 22:53:33 +01:00
parent fa05447895
commit 4fa0cadc7f
7 changed files with 2399 additions and 294 deletions

440
backend/simulate.py Normal file
View File

@@ -0,0 +1,440 @@
import json
import os
import random
import uuid
import asyncio
from concurrent.futures import ProcessPoolExecutor
from dotenv import load_dotenv
load_dotenv()
from datetime import datetime
from card import Card, CardType, CardRarity, generate_cards, compute_deck_type
from game import (
CardInstance, PlayerState, GameState,
action_play_card, action_sacrifice, action_end_turn,
)
from ai import AIPersonality, choose_cards, choose_plan
SIMULATION_CARDS_PATH = os.path.join(os.path.dirname(__file__), "simulation_cards.json")
SIMULATION_CARD_COUNT = 1000
# ==================== Card pool ====================
def _card_to_dict(card: Card) -> dict:
return {
"name": card.name,
"created_at": card.created_at.isoformat(),
"image_link": card.image_link,
"card_rarity": card.card_rarity.name,
"card_type": card.card_type.name,
"wikidata_instance": card.wikidata_instance,
"text": card.text,
"attack": card.attack,
"defense": card.defense,
"cost": card.cost,
}
def _dict_to_card(d: dict) -> Card:
return Card(
name=d["name"],
created_at=datetime.fromisoformat(d["created_at"]),
image_link=d["image_link"],
card_rarity=CardRarity[d["card_rarity"]],
card_type=CardType[d["card_type"]],
wikidata_instance=d["wikidata_instance"],
text=d["text"],
attack=d["attack"],
defense=d["defense"],
cost=d["cost"],
)
def get_simulation_cards() -> list[Card]:
if os.path.exists(SIMULATION_CARDS_PATH):
with open(SIMULATION_CARDS_PATH, "r", encoding="utf-8") as f:
data = json.load(f)
return [_dict_to_card(d) for d in data]
print(f"Generating {SIMULATION_CARD_COUNT} cards (this may take a while)...")
cards = generate_cards(SIMULATION_CARD_COUNT)
with open(SIMULATION_CARDS_PATH, "w", encoding="utf-8") as f:
json.dump([_card_to_dict(c) for c in cards], f, ensure_ascii=False, indent=2)
print(f"Saved {len(cards)} cards to {SIMULATION_CARDS_PATH}")
return cards
# ==================== Single game ====================
PLAYER1_ID = "p1"
PLAYER2_ID = "p2"
MAX_TURNS = 300 # safety cap to prevent infinite games
def _make_instances(deck: list[Card]) -> list[CardInstance]:
return [
CardInstance(
instance_id=str(uuid.uuid4()),
card_id=card.name,
name=card.name,
attack=card.attack,
defense=card.defense,
max_defense=card.defense,
cost=card.cost,
card_type=card.card_type.name,
card_rarity=card.card_rarity.name,
image_link=card.image_link or "",
text=card.text or "",
)
for card in deck
]
async def simulate_game(
cards: list[Card],
difficulty1: int,
personality1: AIPersonality,
difficulty2: int,
personality2: AIPersonality,
) -> str | None:
"""
Simulate a single game between two AIs choosing from `cards`.
Player 1 always goes first.
Returns "p1", "p2", or None if the game exceeds MAX_TURNS.
Designed to be awaited inside asyncio.gather() to run many games concurrently.
"""
deck1 = choose_cards(cards, difficulty1, personality1)
deck2 = choose_cards(cards, difficulty2, personality2)
instances1 = _make_instances(deck1)
instances2 = _make_instances(deck2)
random.shuffle(instances1)
random.shuffle(instances2)
deck_type1 = compute_deck_type(deck1) or "Balanced"
deck_type2 = compute_deck_type(deck2) or "Balanced"
p1 = PlayerState(user_id=PLAYER1_ID, username="AI1", deck_type=deck_type1, deck=instances1)
p2 = PlayerState(user_id=PLAYER2_ID, username="AI2", deck_type=deck_type2, deck=instances2)
# P1 always goes first
p1.increment_energy_cap()
p2.increment_energy_cap()
p1.refill_energy()
p1.draw_to_full()
state = GameState(
game_id=str(uuid.uuid4()),
players={PLAYER1_ID: p1, PLAYER2_ID: p2},
player_order=[PLAYER1_ID, PLAYER2_ID],
active_player_id=PLAYER1_ID,
phase="main",
turn=1,
)
configs = {
PLAYER1_ID: (difficulty1, personality1),
PLAYER2_ID: (difficulty2, personality2),
}
for _ in range(MAX_TURNS):
if state.result:
break
active_id = state.active_player_id
difficulty, personality = configs[active_id]
player = state.players[active_id]
opponent = state.players[state.opponent_id(active_id)]
plan = await choose_plan(player, opponent, personality, difficulty)
for slot in plan.sacrifice_slots:
if player.board[slot] is not None:
action_sacrifice(state, slot)
plays = list(plan.plays)
random.shuffle(plays)
for card, slot in plays:
hand_idx = next((i for i, c in enumerate(player.hand) if c is card), None)
if hand_idx is None:
continue
if player.board[slot] is not None:
continue
if card.cost > player.energy:
continue
action_play_card(state, hand_idx, slot)
action_end_turn(state)
if state.result and state.result.winner_id:
return state.result.winner_id
return None
# ==================== Process-pool worker ====================
# These must be module-level so they are picklable.
_worker_cards: list[Card] = []
def _init_worker(cards: list[Card]) -> None:
global _worker_cards
_worker_cards = cards
def _run_game_sync(args: tuple) -> str | None:
"""Synchronous entry point for a worker process."""
d1, p1_name, d2, p2_name = args
return asyncio.run(simulate_game(
_worker_cards,
d1, AIPersonality(p1_name),
d2, AIPersonality(p2_name),
))
# ==================== Tournament ====================
def _all_players(difficulties: list[int] | None = None) -> list[tuple[AIPersonality, int]]:
"""Return all (personality, difficulty) combinations for the given difficulties (default 1-10)."""
if difficulties is None:
difficulties = list(range(1, 11))
return [
(personality, difficulty)
for personality in AIPersonality
for difficulty in difficulties
]
def _player_label(personality: AIPersonality, difficulty: int) -> str:
return f"{personality.value[:3].upper()}-{difficulty}"
async def run_tournament(
cards: list[Card],
games_per_matchup: int = 5,
difficulties: list[int] | None = None,
) -> dict[tuple[int, int], int]:
"""
Pit every (personality, difficulty) pair against every other, as both
first and second player.
`difficulties` selects which difficulty levels to include (default: 1-10).
Returns a wins dict keyed by (first_player_index, second_player_index)
where the value is how many of `games_per_matchup` games the first player won.
Games run in parallel across all CPU cores via ProcessPoolExecutor.
Cards are sent to each worker once at startup, not once per game.
"""
players = _all_players(difficulties)
n = len(players)
# Build the flat list of (i, j, args) for every game
indexed_args: list[tuple[int, int, tuple]] = []
for i in range(n):
p1_personality, p1_difficulty = players[i]
for j in range(n):
p2_personality, p2_difficulty = players[j]
args = (p1_difficulty, p1_personality.value, p2_difficulty, p2_personality.value)
for _ in range(games_per_matchup):
indexed_args.append((i, j, args))
total_games = len(indexed_args)
n_workers = os.cpu_count() or 1
print(f"Running {total_games} games across {n_workers} workers "
f"({n} players, {games_per_matchup} games per ordered pair)...")
done = [0]
report_every = max(1, total_games // 200)
loop = asyncio.get_running_loop()
async def tracked(future):
result = await future
done[0] += 1
if done[0] % report_every == 0 or done[0] == total_games:
pct = done[0] / total_games * 100
print(f" {done[0]}/{total_games} games done ({pct:.1f}%)", end="\r", flush=True)
return result
with ProcessPoolExecutor(
max_workers=n_workers,
initializer=_init_worker,
initargs=(cards,),
) as executor:
futures = [
loop.run_in_executor(executor, _run_game_sync, args)
for _, _, args in indexed_args
]
results = await asyncio.gather(*[tracked(f) for f in futures])
print("\nFinished")
wins: dict[tuple[int, int], int] = {}
ties = 0
for (i, j, _), winner in zip(indexed_args, results):
key = (i, j)
if key not in wins:
wins[key] = 0
if winner == PLAYER1_ID:
wins[key] += 1
elif winner is None:
ties += 1
print(f"Ties: {ties}")
return wins
def rank_players(
wins: dict[tuple[int, int], int],
games_per_matchup: int,
players: list[tuple[AIPersonality, int]],
) -> list[int]:
"""
Rank player indices by total wins (as first + second player combined).
Returns indices sorted worst-to-best.
"""
n = len(players)
total_wins = [0] * n
for (i, j), p1_wins in wins.items():
if i == j:
continue # self-matchups are symmetric; skip to avoid double-counting
p2_wins = games_per_matchup - p1_wins
total_wins[i] += p1_wins
total_wins[j] += p2_wins
return sorted(range(n), key=lambda k: total_wins[k])
TOURNAMENT_RESULTS_PATH = os.path.join(os.path.dirname(__file__), "tournament_results.json")
def save_tournament(
wins: dict[tuple[int, int], int],
games_per_matchup: int,
players: list[tuple[AIPersonality, int]],
path: str = TOURNAMENT_RESULTS_PATH,
):
data = {
"games_per_matchup": games_per_matchup,
"players": [
{"personality": p.value, "difficulty": d}
for p, d in players
],
"wins": {f"{i},{j}": w for (i, j), w in wins.items()},
}
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
print(f"Tournament results saved to {path}")
def load_tournament(path: str = TOURNAMENT_RESULTS_PATH) -> tuple[dict[tuple[int, int], int], int, list[tuple[AIPersonality, int]]]:
"""Returns (wins, games_per_matchup, players)."""
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
wins = {
(int(k.split(",")[0]), int(k.split(",")[1])): v
for k, v in data["wins"].items()
}
players = [
(AIPersonality(p["personality"]), p["difficulty"])
for p in data["players"]
]
return wins, data["games_per_matchup"], players
def draw_grid(
wins: dict[tuple[int, int], int],
games_per_matchup: int = 5,
players: list[tuple[AIPersonality, int]] | None = None,
output_path: str = "tournament_grid.png",
):
"""
Draw a heatmap grid of tournament results.
Rows = first player
Cols = second player
Color = red if first player won more of their games in that cell
green if second player won more
× = one player swept all games in that cell
"""
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import numpy as np
if players is None:
players = _all_players()
n = len(players)
ranked = rank_players(wins, games_per_matchup, players) # worst-to-best indices
labels = [_player_label(*players[i]) for i in ranked]
# Build value matrix: (p1_wins - p2_wins) / games_per_matchup ∈ [-1, 1], NaN on diagonal
matrix = np.full((n, n), np.nan)
for row, i in enumerate(ranked):
for col, j in enumerate(ranked):
p1_wins = wins.get((i, j), 0)
matrix[row, col] = (p1_wins - (games_per_matchup - p1_wins)) / games_per_matchup
cell_size = 0.22
fig_size = n * cell_size + 3
fig, ax = plt.subplots(figsize=(fig_size, fig_size))
cmap = mcolors.LinearSegmentedColormap.from_list(
"p1_p2", ["#90EE90", "#67A2E0", "#D74E4E"] # pastel green → blue → red
)
norm = mcolors.Normalize(vmin=-1, vmax=1)
img = ax.imshow(matrix, cmap=cmap, norm=norm, aspect="equal", interpolation="none")
# × marks for sweeps
for row, i in enumerate(ranked):
for col, j in enumerate(ranked):
p1_wins = wins.get((i, j), 0)
if p1_wins == games_per_matchup or p1_wins == 0:
ax.text(col, row, "×", ha="center", va="center",
fontsize=5, color="black", fontweight="bold", zorder=3)
ax.set_xticks(range(n))
ax.set_yticks(range(n))
ax.set_xticklabels(labels, rotation=90, fontsize=4)
ax.set_yticklabels(labels, fontsize=4)
ax.xaxis.set_label_position("top")
ax.xaxis.tick_top()
ax.set_xlabel("Second player", labelpad=8, fontsize=8)
ax.set_ylabel("First player", labelpad=8, fontsize=8)
ax.set_title(
"Tournament results — red: first player wins more, green: second player wins more",
pad=14, fontsize=9,
)
plt.colorbar(img, ax=ax, fraction=0.015, pad=0.01,
label="(P1 wins - P2 wins) / games per cell")
plt.tight_layout()
plt.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close()
print(f"Grid saved to {output_path}")
if __name__ == "__main__":
import sys
GAMES_PER_MATCHUP = 50
difficulties = list(range(1, 11))
card_pool = get_simulation_cards()
players = _all_players(difficulties)
wins = asyncio.run(run_tournament(card_pool, games_per_matchup=GAMES_PER_MATCHUP, difficulties=difficulties))
save_tournament(wins, games_per_matchup=GAMES_PER_MATCHUP, players=players)
draw_grid(wins, games_per_matchup=GAMES_PER_MATCHUP, players=players)