🐐
This commit is contained in:
269
backend/nn.py
Normal file
269
backend/nn.py
Normal file
@@ -0,0 +1,269 @@
|
||||
import numpy as np
|
||||
import json
|
||||
|
||||
# Layout: [state(8) | my_board(15) | opp_board(15) | plan(3) | result_board(15) | opp_deck_type(8)]
# Total input width of the plan-scoring network: 8 + 15 + 15 + 3 + 15 + 8 = 64.
N_FEATURES = 64

# Opponent deck archetypes, one-hot encoded into the tail of the feature vector.
_DECK_TYPES = ["Balanced", "Aggro", "Wall", "Rush", "Control", "God Card", "Pantheon", "Unplayable"]
# Reverse lookup: deck-type name -> one-hot index.
_DECK_TYPE_IDX = {dt: i for i, dt in enumerate(_DECK_TYPES)}

# Normalization divisors so raw card/deck numbers land in roughly [0, 1].
# NOTE(review): presumed upper bounds from the game rules (max attack 50,
# max defense 100, deck size 30) — confirm against game.py.
_MAX_ATK = 50.0
_MAX_DEF = 100.0
_MAX_DECK = 30.0
||||
def _softmax(x: np.ndarray) -> np.ndarray:
|
||||
e = np.exp(x - x.max())
|
||||
return e / e.sum()
|
||||
|
||||
|
||||
class NeuralNet:
    """
    Fully-connected plan scorer: n_features → 64 → 32 → 1
    Pure numpy so it can be pickled into worker processes.
    Optimizer: Adam.

    forward() caches activations; backward() consumes that cache, so the two
    must be called in forward-then-backward order on the same batch.
    Parameter updates ADD the gradient (ascent) — see adam_update().
    """

    def __init__(self, n_features: int = N_FEATURES, hidden: tuple = (64, 32), seed: int | None = None):
        # Legacy RandomState keeps initialization reproducible per seed.
        rng = np.random.RandomState(seed)
        # Layer widths, input first, scalar output last: e.g. [64, 64, 32, 1].
        sizes = [n_features] + list(hidden) + [1]

        self.weights: list[np.ndarray] = []  # one (fan_in, fan_out) matrix per layer
        self.biases: list[np.ndarray] = []   # one (fan_out,) vector per layer
        # Adam first (m_*) and second (v_*) moment accumulators, shaped like the params.
        self.m_w: list[np.ndarray] = []
        self.v_w: list[np.ndarray] = []
        self.m_b: list[np.ndarray] = []
        self.v_b: list[np.ndarray] = []
        # Adam step counter used for bias correction.
        self.t = 0

        for fan_in, fan_out in zip(sizes, sizes[1:]):
            # He initialization (sqrt(2/fan_in)) — appropriate for ReLU layers.
            w = rng.randn(fan_in, fan_out).astype(np.float32) * np.sqrt(2.0 / fan_in)
            b = np.zeros(fan_out, dtype=np.float32)
            self.weights.append(w)
            self.biases.append(b)
            self.m_w.append(np.zeros_like(w))
            self.v_w.append(np.zeros_like(w))
            self.m_b.append(np.zeros_like(b))
            self.v_b.append(np.zeros_like(b))

        # Caches filled by forward() and read by backward().
        self._acts: list[np.ndarray] = []      # layer inputs/outputs, len = n_layers + 1
        self._pre_acts: list[np.ndarray] = []  # pre-ReLU values, len = n_layers

    def forward(self, X: np.ndarray) -> np.ndarray:
        """X: (n, n_features) → scores: (n,)

        ReLU on every hidden layer; the final layer is linear. Caches
        activations and pre-activations for a subsequent backward() call.
        """
        h = X.astype(np.float32)
        self._acts = [h]
        self._pre_acts = []
        for i, (W, b) in enumerate(zip(self.weights, self.biases)):
            z = h @ W + b
            self._pre_acts.append(z)
            # ReLU everywhere except the final (output) layer.
            h = np.maximum(0.0, z) if i < len(self.weights) - 1 else z
            self._acts.append(h)
        # (n, 1) → (n,)
        return h.squeeze(-1)

    def backward(self, upstream: np.ndarray) -> tuple[list, list]:
        """
        upstream: (n,) — dJ/d(scores), gradient for ascent.
        Returns (grads_w, grads_b).

        Requires forward() to have been called on the same batch first
        (reads self._acts / self._pre_acts). Gradients are averaged over
        the batch of n rows.
        """
        n = len(upstream)
        delta = upstream[:, None]  # (n, 1)
        grads_w = [None] * len(self.weights)
        grads_b = [None] * len(self.biases)
        # Walk layers from output back to input.
        for i in range(len(self.weights) - 1, -1, -1):
            h_in = self._acts[i]  # (n, in_size)
            grads_w[i] = h_in.T @ delta / n
            grads_b[i] = delta.mean(axis=0)
            if i > 0:
                # Propagate through the layer, then through the ReLU mask
                # of the previous layer's pre-activation.
                delta = (delta @ self.weights[i].T) * (self._pre_acts[i - 1] > 0)
        return grads_w, grads_b

    def adam_update(self, grads_w: list, grads_b: list,
                    lr: float = 1e-3, beta1: float = 0.9,
                    beta2: float = 0.999, eps: float = 1e-8,
                    grad_clip: float = 1.0) -> None:
        """Apply one Adam step in the ASCENT direction (params += lr * step)."""
        # Global gradient norm clipping
        all_grads = [g for g in grads_w + grads_b if g is not None]
        global_norm = np.sqrt(sum(np.sum(g * g) for g in all_grads))
        if global_norm > grad_clip:
            scale = grad_clip / global_norm
            grads_w = [g * scale for g in grads_w]
            grads_b = [g * scale for g in grads_b]

        self.t += 1
        # Bias-correction denominators: m/bc1 and v/bc2 are the standard
        # bias-corrected moment estimates (Kingma & Ba).
        bc1 = 1 - beta1 ** self.t
        bc2 = 1 - beta2 ** self.t
        for i, (gw, gb) in enumerate(zip(grads_w, grads_b)):
            self.m_w[i] = beta1 * self.m_w[i] + (1 - beta1) * gw
            self.v_w[i] = beta2 * self.v_w[i] + (1 - beta2) * gw * gw
            # += (not -=): gradient ascent on the objective.
            self.weights[i] += lr * (self.m_w[i] / bc1) / (np.sqrt(self.v_w[i] / bc2) + eps)

            self.m_b[i] = beta1 * self.m_b[i] + (1 - beta1) * gb
            self.v_b[i] = beta2 * self.v_b[i] + (1 - beta2) * gb * gb
            self.biases[i] += lr * (self.m_b[i] / bc1) / (np.sqrt(self.v_b[i] / bc2) + eps)

    def save(self, path: str) -> None:
        """Serialize weights and full optimizer state to JSON at *path*."""
        data = {
            "weights": [w.tolist() for w in self.weights],
            "biases": [b.tolist() for b in self.biases],
            "m_w": [m.tolist() for m in self.m_w],
            "v_w": [v.tolist() for v in self.v_w],
            "m_b": [m.tolist() for m in self.m_b],
            "v_b": [v.tolist() for v in self.v_b],
            "t": self.t,
        }
        with open(path, "w") as f:
            json.dump(data, f)

    @classmethod
    def load(cls, path: str) -> "NeuralNet":
        """Reconstruct a NeuralNet (including Adam state) from a save() file.

        Bypasses __init__ via __new__ so no random initialization happens.
        """
        with open(path) as f:
            data = json.load(f)
        net = cls.__new__(cls)
        net.weights = [np.array(w, dtype=np.float32) for w in data["weights"]]
        net.biases = [np.array(b, dtype=np.float32) for b in data["biases"]]
        net.m_w = [np.array(m, dtype=np.float32) for m in data["m_w"]]
        net.v_w = [np.array(v, dtype=np.float32) for v in data["v_w"]]
        net.m_b = [np.array(m, dtype=np.float32) for m in data["m_b"]]
        net.v_b = [np.array(v, dtype=np.float32) for v in data["v_b"]]
        net.t = data["t"]
        net._acts = []
        net._pre_acts = []
        return net
|
||||
|
||||
|
||||
# ==================== Feature extraction ====================
|
||||
|
||||
def extract_plan_features(plans: list, player, opponent) -> np.ndarray:
    """
    Build the feature matrix for a batch of candidate plans.

    Returns a (n_plans, N_FEATURES) float32 array with layout
    [state(8) | my_board(15) | opp_board(15) | plan(3) | result_board(15) | opp_deck_type(8)]
    (the previous docstring omitted the trailing opp_deck_type segment,
    which this function does append — see N_FEATURES = 64).

    plans: candidate plan objects exposing .sacrifice_slots and .plays.
    player / opponent: game-side player objects (life, energy, hand, deck,
    board, deck_type are read — see game.py for their definitions).
    """
    from game import BOARD_SIZE, HAND_SIZE, MAX_ENERGY_CAP, STARTING_LIFE

    n = len(plans)

    # ---- state (same for every plan), each entry normalized to ~[0, 1] ----
    state = np.array([
        player.life / STARTING_LIFE,
        opponent.life / STARTING_LIFE,
        player.energy / MAX_ENERGY_CAP,
        player.energy_cap / MAX_ENERGY_CAP,
        len(player.hand) / HAND_SIZE,
        len(opponent.hand) / HAND_SIZE,
        len(player.deck) / _MAX_DECK,
        len(opponent.deck) / _MAX_DECK,
    ], dtype=np.float32)

    # ---- current boards (same for every plan) ----
    # 3 features per slot: attack, defense, occupied-flag.
    my_board = np.zeros(BOARD_SIZE * 3, dtype=np.float32)
    opp_board = np.zeros(BOARD_SIZE * 3, dtype=np.float32)
    for slot in range(BOARD_SIZE):
        c = player.board[slot]
        if c is not None:
            my_board[slot * 3] = c.attack / _MAX_ATK
            my_board[slot * 3 + 1] = c.defense / _MAX_DEF
            my_board[slot * 3 + 2] = 1.0
        c = opponent.board[slot]
        if c is not None:
            opp_board[slot * 3] = c.attack / _MAX_ATK
            opp_board[slot * 3 + 1] = c.defense / _MAX_DEF
            opp_board[slot * 3 + 2] = 1.0

    # ---- per-plan features: [n_sacrifices, n_plays, total_cost, result board] ----
    plan_part = np.zeros((n, 3 + BOARD_SIZE * 3), dtype=np.float32)
    for idx, plan in enumerate(plans):
        # Simulate the board after this plan: sacrifices clear slots,
        # then plays fill them.
        result = list(player.board)
        for slot in plan.sacrifice_slots:
            result[slot] = None
        for card, slot in plan.plays:
            result[slot] = card

        # sum() of an empty iterable is already 0 — no conditional needed.
        total_cost = sum(c.cost for c, _ in plan.plays)
        plan_part[idx, 0] = len(plan.sacrifice_slots) / BOARD_SIZE
        plan_part[idx, 1] = len(plan.plays) / HAND_SIZE
        plan_part[idx, 2] = total_cost / (MAX_ENERGY_CAP + BOARD_SIZE)

        for slot in range(BOARD_SIZE):
            c = result[slot]
            if c is not None:
                plan_part[idx, 3 + slot * 3] = c.attack / _MAX_ATK
                plan_part[idx, 3 + slot * 3 + 1] = c.defense / _MAX_DEF
                plan_part[idx, 3 + slot * 3 + 2] = 1.0

    # ---- opponent deck type one-hot (same for every plan) ----
    # Unknown deck types fall back to index 0 ("Balanced").
    opp_deck_oh = np.zeros(len(_DECK_TYPES), dtype=np.float32)
    opp_deck_oh[_DECK_TYPE_IDX.get(opponent.deck_type, 0)] = 1.0

    # Replicate the shared segments across all n rows and concatenate.
    state_t = np.tile(state, (n, 1))
    my_board_t = np.tile(my_board, (n, 1))
    opp_board_t = np.tile(opp_board, (n, 1))
    opp_deck_t = np.tile(opp_deck_oh, (n, 1))

    return np.concatenate([state_t, my_board_t, opp_board_t, plan_part, opp_deck_t], axis=1)
|
||||
|
||||
|
||||
# ==================== Neural player ====================
|
||||
|
||||
class NeuralPlayer:
    """
    Wraps a NeuralNet for use in game simulation.

    In training mode, samples plans stochastically from
    softmax(scores / temperature) and records the trajectory for a
    REINFORCE update after the game ends.
    In inference mode, picks the highest-scoring plan deterministically.
    """

    def __init__(self, net: NeuralNet, training: bool = False, temperature: float = 1.0):
        self.net = net
        self.training = training
        # Sampling temperature: >1 flattens the plan distribution (more
        # exploration), <1 sharpens it. Only used when training=True.
        self.temperature = temperature
        # One (features, chosen_idx) pair per decision made this game.
        self.trajectory: list[tuple[np.ndarray, int]] = []

    def choose_plan(self, player, opponent):
        """Score every candidate plan and return one (sampled or argmax)."""
        from ai import generate_plans
        plans = generate_plans(player, opponent)
        features = extract_plan_features(plans, player, opponent)
        scores = self.net.forward(features)

        if self.training:
            # float64 + clip + renormalize so np.random.choice never rejects
            # probabilities that fail its sum-to-1 check.
            probs = _softmax((scores / self.temperature).astype(np.float64))
            probs = np.clip(probs, 1e-10, None)
            probs /= probs.sum()
            chosen_idx = int(np.random.choice(len(plans), p=probs))
            self.trajectory.append((features, chosen_idx))
        else:
            chosen_idx = int(np.argmax(scores))

        return plans[chosen_idx]

    def compute_grads(self, outcome: float) -> tuple[list, list] | None:
        """
        Computes averaged REINFORCE gradients for this trajectory without updating weights.
        outcome: centered reward (win/loss minus baseline).
        Returns (grads_w, grads_b), or None if trajectory is empty.
        Clears the trajectory as a side effect.
        """
        if not self.trajectory:
            return None

        acc_gw = [np.zeros_like(w) for w in self.net.weights]
        acc_gb = [np.zeros_like(b) for b in self.net.biases]

        # BUG FIX: the gradient must be taken w.r.t. the same tempered policy
        # softmax(scores / T) that choose_plan sampled from; the previous code
        # used softmax(scores), which was only correct at T == 1.
        # d/d(scores) log softmax(scores / T)[a] = (onehot(a) - probs) / T.
        # Identical to the old behavior at the default temperature = 1.0.
        inv_temp = 1.0 / self.temperature
        for features, chosen_idx in self.trajectory:
            # Re-run forward so the net caches activations for backward().
            scores = self.net.forward(features)
            probs = _softmax((scores * inv_temp).astype(np.float64)).astype(np.float32)
            upstream = -probs.copy()
            upstream[chosen_idx] += 1.0
            upstream *= outcome * inv_temp
            gw, gb = self.net.backward(upstream)
            for i in range(len(acc_gw)):
                acc_gw[i] += gw[i]
                acc_gb[i] += gb[i]

        # Average over all decisions in the game.
        n = len(self.trajectory)
        for i in range(len(acc_gw)):
            acc_gw[i] /= n
            acc_gb[i] /= n

        self.trajectory.clear()
        return acc_gw, acc_gb
|
||||
Reference in New Issue
Block a user