Files
wiki-tcg/backend/ai/engine.py
2026-04-01 18:31:33 +02:00

460 lines
17 KiB
Python

import asyncio
import logging
import os
import random
from dataclasses import dataclass
from enum import Enum
from itertools import combinations, permutations
import numpy as np
from game.card import Card
from game.rules import action_play_card, action_sacrifice, action_end_turn, BOARD_SIZE, STARTING_LIFE, PlayerState
# Application-wide logger; this module uses it to record the plan chosen on each AI turn.
logger = logging.getLogger("app")
# Player id under which the AI side is stored in a game's player table and turn marker.
AI_USER_ID = "ai"
class AIPersonality(Enum):
    """Play styles; each selects a weight set for deck building and for scoring turn plans."""

    # Weights attack ratio / direct damage and destroying enemy cards most heavily.
    AGGRESSIVE = "aggressive"
    # Weights blocking, board cover and power-curve value most heavily.
    DEFENSIVE = "defensive"
    # Spreads weight roughly evenly over all scoring metrics.
    BALANCED = "balanced"
    GREEDY = "greedy" # prioritizes high cost cards, willing to sacrifice
    # Weights cheap cards (deck building) and board cover (plan scoring) most heavily.
    SWARM = "swarm"
    # Weights power-curve value most heavily.
    CONTROL = "control"
    # Mixes a uniform random component into its scores.
    ARBITRARY = "arbitrary"
    JEBRASKA = "jebraska" # trained neural network plan scorer
def get_random_personality() -> AIPersonality:
    """Pick one member of AIPersonality uniformly at random."""
    candidates = list(AIPersonality)
    return random.choice(candidates)
def calculate_exact_cost(attack: int, defense: int) -> float:
    """Return the unrounded card cost for the given stats, clamped to [1.0, 10.0].

    The raw value is ``(attack**2 + defense**2) ** 0.18 / 1.5``; it is meant to
    mirror the cost formula used in card.py.
    """
    stat_magnitude = attack**2 + defense**2
    raw_cost = (stat_magnitude**0.18) / 1.5
    # Lower clamp first, then upper clamp — same order as max() nested inside min().
    floored = raw_cost if raw_cost > 1.0 else 1.0
    return floored if floored < 10.0 else 10.0
def get_power_curve_value(card) -> float:
    """
    Measure how far a card's stats exceed what its printed cost pays for.

    The result is the unrounded cost implied by the card's attack and defense
    minus ``card.cost``; a positive result means the card delivers more stats
    than its cost would suggest.
    """
    implied_cost = calculate_exact_cost(card.attack, card.defense)
    surplus = implied_cost - card.cost
    return surplus
def choose_cards(cards: list[Card], difficulty: int, personality: AIPersonality) -> list[Card]:
    """Build the AI's deck from ``cards`` within a fixed total-cost budget of 50.

    Cards costing more than ``difficulty + 1`` (or more than 6 when
    ``difficulty`` is below 6) are excluded; if that leaves nothing, the full
    pool is used instead. Every remaining card gets a personality-specific
    score plus Gaussian noise whose standard deviation is
    ``max(0, 12 - difficulty) ** 2 * 0.008``. Cards are then taken in
    descending score order in two passes:

    1. cards of cost <= 3 until a personality-specific "early budget" is spent;
    2. any not-yet-taken card that still fits in the remaining budget.

    JEBRASKA delegates the whole choice to the card-pick neural network when
    its weights file exists, and otherwise uses the BALANCED formula.

    Args:
        cards: Pool of candidate cards (objects exposing attack, defense, cost).
        difficulty: AI difficulty level; controls the cost cap and the noise.
        personality: Which scoring formula and early budget to apply.

    Returns:
        The selected cards, first-pass picks followed by second-pass picks.
    """
    BUDGET = 50
    if difficulty >= 6:
        max_card_cost = difficulty + 1
    else:
        max_card_cost = 6
    allowed = [c for c in cards if c.cost <= max_card_cost] or list(cards)
    # Vectorized scoring over all allowed cards at once
    atk = np.array([c.attack for c in allowed], dtype=np.float32)
    defn = np.array([c.defense for c in allowed], dtype=np.float32)
    cost = np.array([c.cost for c in allowed], dtype=np.float32)
    exact_cost = np.minimum(10.0, np.maximum(1.0, ((atk**2 + defn**2)**0.18) / 1.5))
    pcv_norm = np.clip(exact_cost - cost, 0.0, 1.0)
    cost_norm = cost / max_card_cost
    totals = atk + defn
    # Divide only where the denominator is positive; other entries keep the neutral 0.5.
    # (np.where would evaluate atk / totals everywhere and warn on 0/0 cards.)
    atk_ratio = np.divide(atk, totals, out=np.full_like(atk, 0.5), where=totals > 0)
    def_not_one = np.where(defn != 1, 1.0, 0.0)
    if personality == AIPersonality.AGGRESSIVE:
        # (1-cost_norm) penalizes expensive cards. High-attack cards are inherently expensive,
        # so without this the second pass drifts toward costly cards at higher difficulty,
        # shrinking the deck. The bonus grows with max_card_cost and exactly offsets that drift.
        scores = 0.50 * atk_ratio + 0.35 * pcv_norm + 0.15 * (1.0 - cost_norm) + 0.10 * def_not_one
    elif personality == AIPersonality.DEFENSIVE:
        # Mostly power-curve value, with a small bonus for low attack ratio and a small
        # cost_norm term.
        # NOTE(review): an earlier comment described this term as (1 - cost_norm) for
        # anti-shrinkage, but the code weights cost_norm itself — confirm which is intended.
        scores = 0.10 * (1.0 - atk_ratio) + 0.80 * pcv_norm + 0.10 * cost_norm
    elif personality == AIPersonality.GREEDY:
        # Small cost_norm keeps flavour without causing severe deck shrinkage at D10
        scores = 0.20 * cost_norm + 0.80 * pcv_norm
    elif personality == AIPersonality.SWARM:
        scores = 0.40 * (1.0 - cost_norm) + 0.35 * atk_ratio + 0.20 * pcv_norm + 0.05 * def_not_one
    elif personality == AIPersonality.CONTROL:
        # Small cost_norm keeps flavour without causing severe deck shrinkage at D10
        scores = 0.85 * pcv_norm + 0.15 * cost_norm
    elif personality == AIPersonality.BALANCED:
        scores = 0.60 * pcv_norm + 0.25 * atk_ratio + 0.15 * (1.0 - atk_ratio)
    elif personality == AIPersonality.JEBRASKA:
        # Delegate entirely to the card-pick NN; skip the heuristic scoring path.
        from ai.card_pick_nn import CardPickPlayer, CARD_PICK_WEIGHTS_PATH
        from ai.nn import NeuralNet
        # The loaded network (or None when no weights file exists) is cached on the
        # function object so the file is read at most once per process.
        if not hasattr(choose_cards, "_card_pick_net"):
            choose_cards._card_pick_net = (
                NeuralNet.load(CARD_PICK_WEIGHTS_PATH)
                if os.path.exists(CARD_PICK_WEIGHTS_PATH) else None
            )
        net = choose_cards._card_pick_net
        if net is not None:
            return CardPickPlayer(net, training=False).choose_cards(allowed, difficulty)
        # Fall through to BALANCED heuristic if weights aren't trained yet.
        scores = 0.60 * pcv_norm + 0.25 * atk_ratio + 0.15 * (1.0 - atk_ratio)
    else:  # ARBITRARY
        w = 0.09 * difficulty
        scores = w * pcv_norm + (1.0 - w) * np.random.random(len(allowed)).astype(np.float32)
    # Small noise floor at D10 prevents fully deterministic deck building.
    noise = (max(0, 12 - difficulty)**2) * 0.008
    scores = scores + np.random.normal(0, noise, len(allowed)).astype(np.float32)
    order = np.argsort(-scores)
    sorted_cards = [allowed[i] for i in order]
    early_budget = {
        AIPersonality.GREEDY: 20,      # cheap cards are sacrifice fodder for big plays
        AIPersonality.SWARM: 12,
        AIPersonality.AGGRESSIVE: 18,  # raised: ensures cheap high-attack fodder regardless of difficulty
        AIPersonality.DEFENSIVE: 15,   # raised: stable cheap-card base across difficulty levels
        AIPersonality.CONTROL: 8,
        AIPersonality.BALANCED: 25,    # spread the deck across all cost levels
        AIPersonality.JEBRASKA: 25,    # fallback (no trained weights yet)
        AIPersonality.ARBITRARY: 8,
    }[personality]
    selected: list[Card] = []
    total_cost = 0
    # First pass: secure early-game cards
    cheap_spent = 0
    for card in sorted_cards:
        if cheap_spent >= early_budget:
            break
        if card.cost > 3 or total_cost + card.cost > BUDGET:
            continue
        selected.append(card)
        total_cost += card.cost
        cheap_spent += card.cost
    # Second pass: fill remaining budget greedily by score
    # Identity (not equality) marks cards as taken, so equal-valued duplicates stay eligible.
    taken = {id(c) for c in selected}
    for card in sorted_cards:
        if total_cost >= BUDGET:
            break
        if id(card) in taken or total_cost + card.cost > BUDGET:
            continue
        selected.append(card)
        total_cost += card.cost
    return selected
@dataclass
class MovePlan:
    """One candidate turn for the AI: board cards to sacrifice, then hand cards to play."""

    # Board slot indices whose cards are sacrificed before any card is played.
    sacrifice_slots: list[int]
    # Hand cards paired with the board slot each one is placed into.
    plays: list[tuple] # (CardInstance, board_slot: int)
    # Short tag for the plan ("idle" or "sac<N>_play<M>"); it appears in the AI turn log line.
    label: str = ""
def _affordable_subsets(hand, energy, start=0):
"""Yield every subset of cards from hand whose total cost fits within energy."""
yield []
for i in range(start, len(hand)):
card = hand[i]
if card.cost <= energy:
for rest in _affordable_subsets(hand, energy - card.cost, i + 1):
yield [card] + rest
def _plans_for_sacrifice(player, opponent, sacrifice_slots):
    """Generate every play plan that is possible after sacrificing ``sacrifice_slots``.

    The sacrifices are simulated on a copy of the player's board: each
    occupied sacrificed slot is cleared and grants one energy. One MovePlan is
    then produced for every affordable subset of the hand combined with every
    ordered assignment of that subset to the empty board slots. Subsets larger
    than the number of empty slots yield no plan.

    Args:
        player: State of the acting player (board, hand and energy are read).
        opponent: Unused here; kept so the signature stays stable for callers.
        sacrifice_slots: Board slot indices to sacrifice.

    Returns:
        A list of MovePlan objects labelled ``"sac<N>_play<M>"``.
    """
    board = list(player.board)
    energy = player.energy
    for slot in sacrifice_slots:
        if board[slot] is not None:
            board[slot] = None
            energy += 1
    hand = list(player.hand)
    empty_slots = [i for i, c in enumerate(board) if c is None]
    return [
        MovePlan(
            sacrifice_slots=list(sacrifice_slots),
            plays=list(zip(cards, scoring_slots)),
            label=f"sac{len(sacrifice_slots)}_play{len(cards)}",
        )
        for cards in _affordable_subsets(hand, energy)
        for scoring_slots in permutations(empty_slots, len(cards))
    ]
def generate_plans(player, opponent) -> list[MovePlan]:
    """Enumerate every candidate MovePlan for the player's current turn.

    All subsets of the occupied board slots are tried as sacrifice sets (from
    none up to all of them), and each contributes its play plans. An explicit
    plan labelled "idle" is appended last, in addition to the
    no-sacrifice/no-play plan the enumeration already produces.
    """
    filled_slots = [slot for slot in range(BOARD_SIZE) if player.board[slot] is not None]
    candidates: list[MovePlan] = []
    # Sacrifice sets of every size, smallest first
    for count in range(len(filled_slots) + 1):
        for chosen in combinations(filled_slots, count):
            candidates.extend(_plans_for_sacrifice(player, opponent, list(chosen)))
    # Always offer the option of doing nothing
    do_nothing = MovePlan(sacrifice_slots=[], plays=[], label="idle")
    candidates.append(do_nothing)
    return candidates
def score_plans_batch(
    plans: list[MovePlan],
    player: PlayerState,
    opponent: PlayerState,
    personality: AIPersonality,
) -> np.ndarray:
    """Score every plan at once and return one non-negative score per plan.

    Each plan is simulated on a copy of the player's board (sacrifices
    removed, plays placed). Board-level metrics are then derived with NumPy
    and combined using per-personality weights, adjusted for game context,
    and finally reduced by a penalty for sacrifices that waste energy or
    trade down in cost.

    Args:
        plans: Candidate plans to evaluate.
        player: State of the acting player.
        opponent: State of the opposing player.
        personality: Selects the weight set. Any value without its own branch
            below (including JEBRASKA) is scored with the ARBITRARY weights.

    Returns:
        Array with one entry per plan, in the same order as ``plans``.
    """
    n = len(plans)
    # Pre-compute PCV for every hand card once
    pcv_cache = {
        id(c): max(0.0, min(1.0, get_power_curve_value(c)))
        for c in player.hand
    }
    # Build board-state arrays
    board_atk = np.zeros((n, BOARD_SIZE), dtype=np.float32)
    board_occ = np.zeros((n, BOARD_SIZE), dtype=np.bool_)
    n_sac = np.zeros(n, dtype=np.float32)
    sac_val = np.zeros(n, dtype=np.float32)
    play_val = np.zeros(n, dtype=np.float32)
    # Plans that play nothing keep the neutral PCV score of 0.5.
    pcv_score = np.full(n, 0.5, dtype=np.float32)
    for idx, plan in enumerate(plans):
        # Simulate the plan on a private copy of the board.
        board = list(player.board)
        for slot in plan.sacrifice_slots:
            board_slot = board[slot]
            if board_slot is not None:
                sac_val[idx] += board_slot.cost
                board[slot] = None
        n_sac[idx] = len(plan.sacrifice_slots)
        for card, slot in plan.plays:
            board[slot] = card
            play_val[idx] += card.cost
        for slot in range(BOARD_SIZE):
            board_slot = board[slot]
            if board_slot is not None:
                board_atk[idx, slot] = board_slot.attack
                board_occ[idx, slot] = True
        if plan.plays:
            # Average clamped PCV of the cards this plan plays.
            pcv_vals = [pcv_cache.get(id(c), 0.5) for c, _ in plan.plays]
            pcv_score[idx] = sum(pcv_vals) / len(pcv_vals)
    # Enemy board — same for every plan
    en_atk = np.array([c.attack if c else 0 for c in opponent.board], dtype=np.float32)
    en_def = np.array([c.defense if c else 0 for c in opponent.board], dtype=np.float32)
    en_occ = np.array([c is not None for c in opponent.board], dtype=np.bool_)
    enemy_occupied = int(en_occ.sum())
    # --- Metrics (all shape (n,)) ---
    # Attack summed over slots where the enemy has no card.
    direct_damage = (board_atk * ~en_occ).sum(axis=1)
    # Slots where both sides have a card.
    blocking = board_occ & en_occ # (n, 5)
    blocking_slots = blocking.sum(axis=1).astype(np.float32)
    cards_on_board = board_occ.sum(axis=1).astype(np.float32)
    # Contested slots where our attack is at least the enemy card's defense.
    cards_destroyed = ((board_atk >= en_def) & blocking).sum(axis=1).astype(np.float32)
    # Enemy attack summed over slots the plan leaves empty on our side.
    unblocked_in = (en_atk * ~board_occ).sum(axis=1)
    atk_score = np.minimum(1.0, direct_damage / max(opponent.life, 1))
    block_score = blocking_slots / enemy_occupied if enemy_occupied > 0 else np.ones(n, dtype=np.float32)
    open_slots = BOARD_SIZE - enemy_occupied
    # Share of enemy-free slots that the plan fills with our own cards.
    cover_score = (
        (cards_on_board - blocking_slots) / open_slots
        if open_slots > 0
        else np.ones(n, dtype=np.float32)
    )
    destroy_score = cards_destroyed / enemy_occupied if enemy_occupied > 0 else np.zeros(n, dtype=np.float32)
    threat_score = 1.0 - np.minimum(1.0, unblocked_in / max(player.life, 1))
    opp_cards_left = len(opponent.deck) + len(opponent.hand) + enemy_occupied
    # NOTE(review): the opponent total counts every card on their board, while this one
    # adds only blocking_slots rather than cards_on_board — confirm that is intended.
    my_cards_left = len(player.deck) + len(player.hand) + blocking_slots
    attrition_score = my_cards_left / (my_cards_left + max(opp_cards_left, 1))
    net_value = play_val - sac_val
    net_value_norm = np.clip((net_value + 10) / 20, 0.0, 1.0)
    # --- Sacrifice penalty ---
    # Energy gained from sacrifices that the plan does not spend counts as wasted.
    energy_leftover = player.energy + n_sac - play_val
    wasted_energy = np.maximum(0, np.minimum(n_sac, energy_leftover))
    wasted_penalty = np.where(n_sac > 0, wasted_energy / np.maximum(n_sac, 1), 0.0)
    # Grows when the cost played does not exceed the cost sacrificed by at least n_sac.
    swap_penalty = np.clip((n_sac - net_value) / np.maximum(n_sac, 1), 0.0, 1.0)
    sac_penalty = np.where(n_sac > 0, 0.65 * wasted_penalty + 0.35 * swap_penalty, 0.0)
    # --- Personality weights ---
    if personality == AIPersonality.AGGRESSIVE:
        score = (0.30 * atk_score + 0.07 * block_score + 0.15 * cover_score +
                 0.08 * net_value_norm + 0.25 * destroy_score +
                 0.08 * attrition_score + 0.04 * pcv_score + 0.03 * threat_score)
    elif personality == AIPersonality.DEFENSIVE:
        score = (0.12 * atk_score + 0.20 * block_score + 0.18 * cover_score +
                 0.04 * net_value_norm + 0.18 * destroy_score +
                 0.15 * attrition_score + 0.05 * pcv_score + 0.08 * threat_score)
    elif personality == AIPersonality.SWARM:
        score = (0.25 * atk_score + 0.10 * block_score + 0.35 * cover_score +
                 0.05 * net_value_norm + 0.05 * destroy_score +
                 0.10 * attrition_score + 0.05 * pcv_score + 0.05 * threat_score)
    elif personality == AIPersonality.GREEDY:
        score = (0.15 * atk_score + 0.05 * block_score + 0.18 * cover_score +
                 0.38 * net_value_norm + 0.05 * destroy_score +
                 0.09 * attrition_score + 0.05 * pcv_score + 0.05 * threat_score)
    elif personality == AIPersonality.CONTROL:
        score = (0.10 * atk_score + 0.05 * block_score + 0.05 * cover_score +
                 0.20 * net_value_norm + 0.05 * destroy_score +
                 0.10 * attrition_score + 0.40 * pcv_score + 0.05 * threat_score)
    elif personality == AIPersonality.BALANCED:
        score = (0.12 * atk_score + 0.13 * block_score + 0.15 * cover_score +
                 0.10 * net_value_norm + 0.12 * destroy_score +
                 0.15 * attrition_score + 0.12 * pcv_score + 0.11 * threat_score)
    else: # ARBITRARY
        score = (0.50 * np.random.random(n).astype(np.float32) +
                 0.06 * atk_score + 0.06 * block_score + 0.08 * cover_score +
                 0.05 * net_value_norm + 0.06 * destroy_score +
                 0.08 * attrition_score + 0.06 * pcv_score + 0.05 * threat_score)
    # --- Context adjustments ---
    # A plan whose direct damage covers the opponent's remaining life scores at least 0.95.
    score = np.where(direct_damage >= opponent.life, np.maximum(score, 0.95), score)
    # A plan that lets through damage covering our remaining life scores at most 0.05.
    score = np.where(unblocked_in >= player.life, np.minimum(score, 0.05), score)
    if opponent.deck_type in ("God Card", "Pantheon"):
        score = np.minimum(1.0, score + 0.08 * cover_score)
    if opponent.deck_type in ("Aggro", "Rush"):
        score = np.minimum(1.0, score + 0.06 * block_score + 0.04 * threat_score)
    if opponent.deck_type == "Wall":
        score = np.minimum(1.0, score + 0.06 * atk_score)
    if opponent.life < STARTING_LIFE * 0.3:
        score = np.minimum(1.0, score + 0.06 * atk_score)
    if player.life < STARTING_LIFE * 0.3:
        score = np.minimum(1.0, score + 0.06 * threat_score + 0.04 * block_score)
    if opp_cards_left <= 5:
        score = np.where(cards_on_board > 0, np.minimum(1.0, score + 0.05), score)
    # The sacrifice penalty is applied last and the result is floored at zero.
    return np.maximum(0.0, score - sac_penalty)
def choose_plan(player: PlayerState, opponent: PlayerState, personality: AIPersonality, difficulty: int) -> MovePlan:
    """Pick the plan the AI will execute this turn.

    All candidate plans are generated and scored, Gaussian noise is added to
    the scores, and the highest-scoring plan is returned. JEBRASKA scores
    plans with the neural network stored in ``nn_weights.json`` next to this
    module and falls back to the BALANCED heuristic when that file is missing;
    every other personality uses the heuristic scorer directly.

    The noise standard deviation is ``((max(0, 12 - difficulty) ** 2) - 4) *
    0.008`` floored at zero, so it reaches zero at difficulty 10 and stays
    there for any higher difficulty.

    Args:
        player: State of the acting (AI) player.
        opponent: State of the opposing player.
        personality: Scoring personality.
        difficulty: AI difficulty level; higher values mean less noise.

    Returns:
        The selected MovePlan.
    """
    plans = generate_plans(player, opponent)
    if personality == AIPersonality.JEBRASKA:
        from ai.nn import NeuralNet
        _weights = os.path.join(os.path.dirname(__file__), "nn_weights.json")
        # The loaded network (or None when no weights file exists) is cached on the
        # function object so the file is read at most once per process.
        if not hasattr(choose_plan, "_neural_net"):
            choose_plan._neural_net = NeuralNet.load(_weights) if os.path.exists(_weights) else None
        net = choose_plan._neural_net
        if net is not None:
            from ai.nn import extract_plan_features
            scores = net.forward(extract_plan_features(plans, player, opponent))
        else:  # fallback to BALANCED if weights not found
            scores = score_plans_batch(plans, player, opponent, AIPersonality.BALANCED)
    else:
        scores = score_plans_batch(plans, player, opponent, personality)
    # Without the floor the scale is negative for difficulty > 10 and
    # np.random.normal raises ValueError on a negative scale.
    noise_scale = max(0.0, ((max(0, 12 - difficulty) ** 2) - 4) * 0.008)
    noise = np.random.normal(0, noise_scale, len(scores)).astype(np.float32)
    return plans[int(np.argmax(scores + noise))]
async def run_ai_turn(game_id: str):
    """Play one full AI turn for ``game_id`` and push each step to the human's websocket.

    Does nothing when the game is unknown, already has a result, or it is not
    the AI's turn. Otherwise it waits up to about 10 seconds for the human's
    connection, waits for the previous combat animation, chooses a plan,
    performs the sacrifices and plays with short pauses in between, and ends
    the turn. If the game now has a result, the result is recorded and the
    game is removed from the in-memory registries; if the AI is still the
    active player, another AI turn is scheduled.

    All websocket sends are best-effort: a failed send is logged at debug
    level and never interrupts the turn or the end-of-game cleanup.

    Args:
        game_id: Key of the game in the manager's in-memory registries.
    """
    from game.manager import (
        active_games, connections, active_deck_ids,
        serialize_state, record_game_result, calculate_combat_animation_time
    )
    state = active_games.get(game_id)
    if not state or state.result:
        return
    if state.active_player_id != AI_USER_ID:
        return
    human_id = state.opponent_id(AI_USER_ID)
    # Poll for the human's connection in 0.5 s steps, giving up after ~10 s.
    waited = 0.0
    while not connections[game_id].get(human_id) and waited < 10:
        await asyncio.sleep(0.5)
        waited += 0.5
    # Let the client finish animating the previous combat before acting.
    await asyncio.sleep(calculate_combat_animation_time(state.last_combat_events))
    player = state.players[AI_USER_ID]
    opponent = state.players[human_id]
    difficulty = state.ai_difficulty
    personality = (
        AIPersonality(state.ai_personality)
        if state.ai_personality
        else AIPersonality.BALANCED
    )
    ws = connections[game_id].get(human_id)

    async def send_state(s):
        """Best-effort push of the serialized game state to the human player."""
        if not ws:
            return
        try:
            await ws.send_json({"type": "state", "state": serialize_state(s, human_id)})
        except Exception:
            # The AI keeps playing even if the socket is gone; keep a trace for debugging.
            logger.debug("AI turn: could not send state for game %s", game_id, exc_info=True)

    async def send_sacrifice_anim(instance_id):
        """Best-effort request for the client to animate a sacrifice."""
        if not ws:
            return
        try:
            await ws.send_json({"type": "sacrifice_animation", "instance_id": instance_id})
        except Exception:
            logger.debug("AI turn: could not send sacrifice animation for game %s", game_id, exc_info=True)

    best_plan = choose_plan(player, opponent, personality, difficulty)
    logger.info(
        f"AI turn: d={difficulty} p={personality.value} plan={best_plan.label} " +
        f"sac={best_plan.sacrifice_slots} plays={[c.name for c, _ in best_plan.plays]}"
    )
    for slot in best_plan.sacrifice_slots:
        card_slot = player.board[slot]
        if card_slot is None:
            continue
        await send_sacrifice_anim(card_slot.instance_id)
        await asyncio.sleep(0.65)
        action_sacrifice(state, slot)
        await send_state(state)
        await asyncio.sleep(0.35)
    # Shuffle play order so the AI doesn't always fill slots left-to-right
    plays = list(best_plan.plays)
    random.shuffle(plays)
    for card, slot in plays:
        # Re-look up hand index each time (hand shrinks as cards are played)
        hand_idx = next((i for i, c in enumerate(player.hand) if c is card), None)
        if hand_idx is None:
            continue
        if player.board[slot] is not None:
            continue
        if card.cost > player.energy:
            continue
        action_play_card(state, hand_idx, slot)
        await send_state(state)
        await asyncio.sleep(0.5)
    action_end_turn(state)
    await send_state(state)
    if state.result:
        from core.database import SessionLocal
        db = SessionLocal()
        try:
            record_game_result(state, db)
            # Re-send the state after recording. This goes through the best-effort
            # helper so a closed socket cannot skip the registry cleanup below.
            await send_state(state)
        finally:
            db.close()
        active_deck_ids.pop(human_id, None)
        active_deck_ids.pop(AI_USER_ID, None)
        active_games.pop(game_id, None)
        connections.pop(game_id, None)
        return
    if state.active_player_id == AI_USER_ID:
        # The event loop holds only weak references to tasks, so keep a strong
        # reference until the follow-up turn finishes.
        follow_up = asyncio.create_task(run_ai_turn(game_id))
        pending = run_ai_turn.__dict__.setdefault("_pending_tasks", set())
        pending.add(follow_up)
        follow_up.add_done_callback(pending.discard)