279 lines
9.6 KiB
Python
279 lines
9.6 KiB
Python
import os
|
|
import random
|
|
import uuid
|
|
from collections import deque
|
|
|
|
import numpy as np
|
|
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
|
|
from game.card import compute_deck_type
|
|
from ai.engine import AIPersonality, choose_cards, choose_plan
|
|
from game.rules import PlayerState, GameState, action_play_card, action_sacrifice, action_end_turn
|
|
from ai.simulate import get_simulation_cards, _make_instances, MAX_TURNS
|
|
from ai.nn import NeuralNet, NeuralPlayer
|
|
from ai.card_pick_nn import CardPickPlayer, N_CARD_FEATURES, CARD_PICK_WEIGHTS_PATH
|
|
|
|
# Default location of the plan network's weights: a JSON file next to this module.
NN_WEIGHTS_PATH = os.path.join(os.path.dirname(__file__), "nn_weights.json")

# Seat identifiers; used as player ids and as keys of GameState.players.
P1 = "p1"
P2 = "p2"

# Personalities that may be drawn as scripted training opponents: every
# AIPersonality member except the two listed here.
_EXCLUDED_PERSONALITIES = (AIPersonality.ARBITRARY, AIPersonality.JEBRASKA)
FIXED_PERSONALITIES = [
    personality
    for personality in AIPersonality
    if personality not in _EXCLUDED_PERSONALITIES
]
|
|
|
|
|
|
def _build_player(pid: str, name: str, cards: list, difficulty: int, personality: AIPersonality,
                  deck_pool: dict | None = None) -> PlayerState:
    """Create the PlayerState for a scripted (personality-driven) player.

    When ``deck_pool`` is non-empty and has an entry for ``personality``, one
    of the pre-built decks stored under that key is picked at random.
    Otherwise a deck is built on the spot with ``choose_cards``.  The deck is
    turned into card instances, which are shuffled before being handed to the
    new PlayerState.  ``deck_type`` falls back to "Balanced" when
    ``compute_deck_type`` returns a falsy value.
    """
    has_pooled_deck = bool(deck_pool) and personality in deck_pool
    chosen_deck = (
        random.choice(deck_pool[personality])
        if has_pooled_deck
        else choose_cards(cards, difficulty, personality)
    )

    shuffled_instances = _make_instances(chosen_deck)
    random.shuffle(shuffled_instances)

    return PlayerState(
        user_id=pid,
        username=name,
        deck_type=compute_deck_type(chosen_deck) or "Balanced",
        deck=shuffled_instances,
    )
|
|
|
|
|
|
def _build_nn_player(pid: str, name: str, cards: list, difficulty: int,
                     card_pick_player: CardPickPlayer) -> PlayerState:
    """Create a PlayerState whose deck is chosen by the card-pick network.

    Cards are first filtered by cost: the cap is ``difficulty + 1`` for
    difficulty 6 and above, and 6 otherwise.  If the filter leaves nothing,
    the full card list is offered instead.  The deck returned by
    ``card_pick_player.choose_cards`` is instantiated and shuffled; the
    ``deck_type`` falls back to "Balanced" when ``compute_deck_type`` returns
    a falsy value.
    """
    if difficulty >= 6:
        cost_cap = difficulty + 1
    else:
        cost_cap = 6

    candidates = list(filter(lambda c: c.cost <= cost_cap, cards))
    if not candidates:
        # Nothing is cheap enough — let the network choose from everything.
        candidates = list(cards)

    picked_deck = card_pick_player.choose_cards(candidates, difficulty)

    shuffled_instances = _make_instances(picked_deck)
    random.shuffle(shuffled_instances)

    return PlayerState(
        user_id=pid,
        username=name,
        deck_type=compute_deck_type(picked_deck) or "Balanced",
        deck=shuffled_instances,
    )
|
|
|
|
|
|
def run_episode(
    p1_state: PlayerState,
    p2_state: PlayerState,
    p1_ctrl,  # callable: (player, opponent) -> MovePlan
    p2_ctrl,  # callable: (player, opponent) -> MovePlan
) -> str | None:
    """Play one game between two controllers and report the winner.

    Each turn the active seat's controller is called with
    ``(player, opponent)`` and returns a plan.  The plan's sacrifices are
    applied first (only for board slots that are still occupied), then its
    plays in random order, skipping any play whose card is no longer in hand,
    whose target slot is occupied, or whose cost exceeds the player's current
    energy.  The turn is then ended.

    Returns ``state.result.winner_id`` once the game has a result, or ``None``
    if no result was produced within ``MAX_TURNS`` turns.
    """
    # Both seats get their initial energy-cap bump; only P1 (the opening
    # player) is refilled and draws a hand here.
    for seat in (p1_state, p2_state):
        seat.increment_energy_cap()
    p1_state.refill_energy()
    p1_state.draw_to_full()

    controllers = {P1: p1_ctrl, P2: p2_ctrl}
    game = GameState(
        game_id=str(uuid.uuid4()),
        players={P1: p1_state, P2: p2_state},
        player_order=[P1, P2],
        active_player_id=P1,
        phase="main",
        turn=1,
    )

    turns_left = MAX_TURNS
    while turns_left > 0 and not game.result:
        turns_left -= 1

        mover_id = game.active_player_id
        mover = game.players[mover_id]
        rival = game.players[game.opponent_id(mover_id)]

        plan = controllers[mover_id](mover, rival)

        # Sacrifices first; a slot that is already empty is skipped.
        for slot in plan.sacrifice_slots:
            if mover.board[slot] is None:
                continue
            action_sacrifice(game, slot)

        # Plays are attempted in random order.
        pending_plays = list(plan.plays)
        random.shuffle(pending_plays)
        for card, slot in pending_plays:
            # Locate the card in hand by identity, not equality.
            hand_idx = None
            for idx, held in enumerate(mover.hand):
                if held is card:
                    hand_idx = idx
                    break

            if hand_idx is None:
                continue
            if mover.board[slot] is not None:
                continue
            if card.cost > mover.energy:
                continue
            action_play_card(game, hand_idx, slot)

        action_end_turn(game)

    if game.result:
        return game.result.winner_id
    return None
|
|
|
|
|
|
def _zeroed_grad_buffers(net) -> tuple[list, list]:
    """Return fresh zero-filled (weight, bias) accumulators shaped like ``net``'s parameters."""
    return (
        [np.zeros_like(w) for w in net.weights],
        [np.zeros_like(b) for b in net.biases],
    )


def _accumulate_grads(acc_gw: list, acc_gb: list, grads) -> int:
    """Add one player's gradients into the batch accumulators, in place.

    ``grads`` is whatever a player's ``compute_grads`` returned: ``None`` or a
    ``(weight_grads, bias_grads)`` pair.  Returns 1 if the gradients were
    added and 0 if ``grads`` was ``None``, so the result can be added
    directly to the running sample count.
    """
    if grads is None:
        return 0
    gw, gb = grads
    for i, (w_grad, b_grad) in enumerate(zip(gw, gb)):
        acc_gw[i] += w_grad
        acc_gb[i] += b_grad
    return 1


def _apply_batch(net, acc_gw: list, acc_gb: list, count: int, lr: float) -> None:
    """Average the accumulated gradients over ``count`` samples and take one Adam step."""
    for i in range(len(acc_gw)):
        acc_gw[i] /= count
        acc_gb[i] /= count
    net.adam_update(acc_gw, acc_gb, lr=lr)


def train(
    n_episodes: int = 50_000,
    self_play_start: int = 0,
    self_play_max_frac: float = 0.9,
    lr: float = 1e-3,
    opp_difficulty: int = 10,
    temperature: float = 1.0,
    batch_size: int = 500,
    save_every: int = 5_000,
    save_path: str = NN_WEIGHTS_PATH,
) -> NeuralNet:
    """Train the plan network and the card-pick network with batched policy gradients.

    Every episode is one game.  With probability ``self_play_prob`` the game
    is self-play (both seats use the networks being trained); otherwise the
    networks play one seat against a scripted opponent with a randomly chosen
    personality from ``FIXED_PERSONALITIES``.  Each learning seat's gradients
    are computed from ``outcome - baseline`` and summed into a batch; once a
    network has collected ``batch_size`` gradient samples they are averaged
    and applied with one Adam step.

    A game with no winner (turn limit reached) counts as a loss (-1.0) for
    every learning seat.

    Args:
        n_episodes: Number of games to play.
        self_play_start: Episode from which self-play becomes possible; before
            it the self-play probability is 0.
        self_play_max_frac: Self-play probability reached at the last episode;
            it ramps linearly from ``self_play_start``.
        lr: Learning rate passed to ``adam_update`` of both networks.
        opp_difficulty: Difficulty used for the scripted opponent's deck
            building and move planning.
        temperature: Temperature passed to ``NeuralPlayer`` and
            ``CardPickPlayer``.
        batch_size: Gradient samples accumulated per network before an update.
        save_every: Checkpoint interval in episodes; 0 disables periodic
            checkpoints (the final save still happens).
        save_path: Plan-network weights file.  Loaded at start if it exists.
            The card-pick network always uses ``CARD_PICK_WEIGHTS_PATH``.

    Returns:
        The trained plan network.
    """
    cards = get_simulation_cards()

    # Pre-build a pool of opponent decks per personality to avoid rebuilding
    # from scratch each episode.
    deck_pool_size = 100
    opp_deck_pool: dict[AIPersonality, list] = {
        p: [choose_cards(cards, opp_difficulty, p) for _ in range(deck_pool_size)]
        for p in FIXED_PERSONALITIES
    }

    if os.path.exists(save_path):
        print(f"Resuming plan net from {save_path}")
        net = NeuralNet.load(save_path)
    else:
        print("Initializing new plan network")
        net = NeuralNet(seed=42)

    cp_path = CARD_PICK_WEIGHTS_PATH
    if os.path.exists(cp_path):
        print(f"Resuming card-pick net from {cp_path}")
        card_pick_net = NeuralNet.load(cp_path)
    else:
        print("Initializing new card-pick network")
        card_pick_net = NeuralNet(n_features=N_CARD_FEATURES, hidden=(32, 16), seed=43)

    # Rolling window for the win-rate display; only games against scripted
    # opponents are recorded (self-play has no "NN vs. other" winner).
    recent_outcomes: deque[int] = deque(maxlen=1000)
    baseline = 0.0  # EMA of recent outcomes; subtracted before each update
    baseline_alpha = 0.99  # decay — roughly a 100-episode window

    batch_gw, batch_gb = _zeroed_grad_buffers(net)
    batch_count = 0

    cp_batch_gw, cp_batch_gb = _zeroed_grad_buffers(card_pick_net)
    cp_batch_count = 0

    for episode in range(1, n_episodes + 1):
        # Ramp self-play fraction linearly from 0 to self_play_max_frac.
        if episode >= self_play_start:
            progress = (episode - self_play_start) / max(1, n_episodes - self_play_start)
            self_play_prob = self_play_max_frac * progress
        else:
            self_play_prob = 0.0

        # Seat assignment for games against a scripted opponent.  Drawn every
        # episode so the random stream does not depend on the episode type.
        nn_goes_first = random.random() < 0.5

        if random.random() < self_play_prob:
            nn1 = NeuralPlayer(net, training=True, temperature=temperature)
            nn2 = NeuralPlayer(net, training=True, temperature=temperature)
            cp1 = CardPickPlayer(card_pick_net, training=True, temperature=temperature)
            cp2 = CardPickPlayer(card_pick_net, training=True, temperature=temperature)

            # Both seats are driven by the same networks, so the seats are
            # never swapped: nn1/cp1 always own P1 and nn2/cp2 always own P2.
            # This keeps each state's id, its deck picker and its controller
            # aligned for credit assignment.
            p1_state = _build_nn_player(P1, "NN1", cards, 10, cp1)
            p2_state = _build_nn_player(P2, "NN2", cards, 10, cp2)

            winner = run_episode(p1_state, p2_state, nn1.choose_plan, nn2.choose_plan)

            # Score each seat on its own result so that a timeout
            # (winner is None) is a loss for both rather than a win for P2.
            p1_outcome = 1.0 if winner == P1 else -1.0
            p2_outcome = 1.0 if winner == P2 else -1.0

            # Advantages use the baseline from *before* this episode.
            p1_advantage = p1_outcome - baseline
            p2_advantage = p2_outcome - baseline
            baseline = baseline_alpha * baseline + (1 - baseline_alpha) * p1_outcome

            batch_count += _accumulate_grads(batch_gw, batch_gb, nn1.compute_grads(p1_advantage))
            batch_count += _accumulate_grads(batch_gw, batch_gb, nn2.compute_grads(p2_advantage))
            cp_batch_count += _accumulate_grads(
                cp_batch_gw, cp_batch_gb, cp1.compute_grads(p1_advantage))
            cp_batch_count += _accumulate_grads(
                cp_batch_gw, cp_batch_gb, cp2.compute_grads(p2_advantage))

        else:
            opp_personality = random.choice(FIXED_PERSONALITIES)
            nn_player = NeuralPlayer(net, training=True, temperature=temperature)
            cp_player = CardPickPlayer(card_pick_net, training=True, temperature=temperature)

            # Defaults bind this episode's personality/difficulty at definition time.
            def opp_ctrl(p, o, pers=opp_personality, diff=opp_difficulty):
                return choose_plan(p, o, pers, diff)

            if nn_goes_first:
                nn_id = P1
                p1_state = _build_nn_player(P1, "NN", cards, 10, cp_player)
                p2_state = _build_player(P2, "OPP", cards, opp_difficulty, opp_personality,
                                         opp_deck_pool)
                p1_ctrl, p2_ctrl = nn_player.choose_plan, opp_ctrl
            else:
                nn_id = P2
                p1_state = _build_player(P1, "OPP", cards, opp_difficulty, opp_personality,
                                         opp_deck_pool)
                p2_state = _build_nn_player(P2, "NN", cards, 10, cp_player)
                p1_ctrl, p2_ctrl = opp_ctrl, nn_player.choose_plan

            winner = run_episode(p1_state, p2_state, p1_ctrl, p2_ctrl)

            nn_won = winner == nn_id
            nn_outcome = 1.0 if nn_won else -1.0

            # Same pre-update baseline for both networks.
            advantage = nn_outcome - baseline
            baseline = baseline_alpha * baseline + (1 - baseline_alpha) * nn_outcome

            batch_count += _accumulate_grads(
                batch_gw, batch_gb, nn_player.compute_grads(advantage))
            cp_batch_count += _accumulate_grads(
                cp_batch_gw, cp_batch_gb, cp_player.compute_grads(advantage))

            recent_outcomes.append(1 if nn_won else 0)

        # The `> 0` guard prevents dividing by a zero count when batch_size <= 0.
        if batch_count > 0 and batch_count >= batch_size:
            _apply_batch(net, batch_gw, batch_gb, batch_count, lr)
            batch_gw, batch_gb = _zeroed_grad_buffers(net)
            batch_count = 0

        if cp_batch_count > 0 and cp_batch_count >= batch_size:
            _apply_batch(card_pick_net, cp_batch_gw, cp_batch_gb, cp_batch_count, lr)
            cp_batch_gw, cp_batch_gb = _zeroed_grad_buffers(card_pick_net)
            cp_batch_count = 0

        if episode % 1000 == 0 or episode == n_episodes:
            wr = sum(recent_outcomes) / len(recent_outcomes) if recent_outcomes else 0.0
            print(f"\r[{episode:>6}/{n_episodes}] win rate (last {len(recent_outcomes)}): {wr:.1%} "
                  f"self-play frac: {self_play_prob:.0%}", flush=True)
        else:
            print(f" {episode % 1000}/1000", end="\r", flush=True)

        # save_every == 0 disables periodic checkpoints instead of raising on the modulo.
        if save_every and episode % save_every == 0:
            net.save(save_path)
            card_pick_net.save(cp_path)
            print(f" → saved to {save_path} and {cp_path}")

    net.save(save_path)
    card_pick_net.save(cp_path)
    wr = sum(recent_outcomes) / len(recent_outcomes) if recent_outcomes else 0.0
    print(f"Done. Final win rate (last {len(recent_outcomes)}): {wr:.1%}")
    return net
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: start a training run with the default settings.
    train()
|