# wiki-tcg/backend/ai.py — AI opponent: deck building, turn planning, turn execution.
import asyncio
import logging
import random
from dataclasses import dataclass
from enum import Enum
from itertools import combinations, permutations

import numpy as np

from card import Card
from game import action_play_card, action_sacrifice, action_end_turn, BOARD_SIZE, STARTING_LIFE, PlayerState

logger = logging.getLogger("app")
# Sentinel user id under which the AI participates in games.
AI_USER_ID = "ai"
class AIPersonality(Enum):
    """Play-style archetypes; each biases both deck building and plan scoring."""
    AGGRESSIVE = "aggressive"  # Prefers high attack cards, plays aggressively
    DEFENSIVE = "defensive"    # Prefers high defense cards, plays conservatively
    BALANCED = "balanced"      # Mix of offense and defense
    GREEDY = "greedy"          # Prioritizes high cost cards, willing to sacrifice
    SWARM = "swarm"            # Prefers low cost cards, fills board quickly
    CONTROL = "control"        # Focuses on board control and efficiency
    SHOCKER = "shocker"        # Cheap high-defense walls + a few powerful high-attack finishers
    ARBITRARY = "arbitrary"    # Just does whatever
def get_random_personality() -> AIPersonality:
    """Pick one AI personality uniformly at random."""
    members = list(AIPersonality)
    return random.choice(members)
def calculate_exact_cost(attack: int, defense: int) -> float:
    """Exact (pre-rounding) cost of a stat line; mirrors the formula in card.py.

    The result is clamped to the [1.0, 11.0] range.
    """
    raw = ((attack * attack + defense * defense) ** 0.18) / 1.5
    return min(11.0, max(1.0, raw))
def get_power_curve_value(card) -> float:
    """How far above the power curve *card* sits.

    Positive means the card's stats are worth more than its printed cost,
    i.e. a better-than-expected deal.
    """
    return calculate_exact_cost(card.attack, card.defense) - card.cost
def choose_cards(cards: list[Card], difficulty: int, personality: AIPersonality) -> list[Card]:
    """Build the AI's deck: score the pool by personality, then spend a fixed
    cost budget in two passes (cheap early-game cards first, then the
    best-scoring remaining cards that still fit the budget).
    """
    BUDGET = 50  # total printed-cost budget for the whole deck
    # Higher difficulties unlock more expensive cards; below D6 the cap stays at 6.
    if difficulty >= 6:
        max_card_cost = difficulty + 1
    else:
        max_card_cost = 6
    # If the cost filter empties the pool, fall back to the full pool.
    allowed = [c for c in cards if c.cost <= max_card_cost] or list(cards)
    # Vectorized scoring over all allowed cards at once
    atk = np.array([c.attack for c in allowed], dtype=np.float32)
    defn = np.array([c.defense for c in allowed], dtype=np.float32)
    cost = np.array([c.cost for c in allowed], dtype=np.float32)
    # Elementwise mirror of calculate_exact_cost().
    exact_cost = np.minimum(11.0, np.maximum(1.0, ((atk**2 + defn**2)**0.18) / 1.5))
    pcv_norm = np.clip(exact_cost - cost, 0.0, 1.0)  # power-curve value, clipped to [0, 1]
    cost_norm = cost / max_card_cost                 # relative cost in [0, 1]
    totals = atk + defn
    atk_ratio = np.where(totals > 0, atk / totals, 0.5)  # attack share of total stats
    # 1.0 for cards whose defense is not 1 (not one-hit fragile), else 0.0.
    def_not_one = np.where(defn != 1, 1.0, 0.0)
    if personality == AIPersonality.AGGRESSIVE:
        # (1-cost_norm) penalizes expensive cards. High-attack cards are inherently expensive,
        # so without this the second pass drifts toward costly cards at higher difficulty,
        # shrinking the deck. The bonus grows with max_card_cost and exactly offsets that drift.
        scores = 0.50 * atk_ratio + 0.35 * pcv_norm + 0.15 * (1.0 - cost_norm) + 0.10 * def_not_one
    elif personality == AIPersonality.DEFENSIVE:
        # Small (1-cost_norm) for the same anti-shrinkage reason; lighter because high-defense
        # cards don't correlate as strongly with cost as high-attack cards do.
        # NOTE(review): the comment above says (1-cost_norm) but the code uses cost_norm —
        # one of the two looks wrong; confirm the intended direction.
        scores = 0.10 * (1.0 - atk_ratio) + 0.80 * pcv_norm + 0.10 * cost_norm
    elif personality == AIPersonality.GREEDY:
        # Small cost_norm keeps flavour without causing severe deck shrinkage at D10
        scores = 0.20 * cost_norm + 0.80 * pcv_norm
    elif personality == AIPersonality.SWARM:
        scores = 0.40 * (1.0 - cost_norm) + 0.35 * atk_ratio + 0.20 * pcv_norm + 0.05 * def_not_one
    elif personality == AIPersonality.CONTROL:
        # Small cost_norm keeps flavour without causing severe deck shrinkage at D10
        scores = 0.85 * pcv_norm + 0.15 * cost_norm
    elif personality == AIPersonality.BALANCED:
        scores = 0.60 * pcv_norm + 0.25 * atk_ratio + 0.15 * (1.0 - atk_ratio)
    elif personality == AIPersonality.SHOCKER:
        # Both cheap walls and expensive finishers want high attack.
        # (1-cost_norm) drives first-pass cheap-card selection; pcv_norm drives second-pass finishers.
        # def_not_one zeros out cards with defense==1 on the first term so fragile walls are excluded.
        # cost-11 cards have pcv=0 so they score near-zero and never shrink the deck.
        scores = atk_ratio * (1.0 - cost_norm) * def_not_one + atk_ratio * pcv_norm
    else:  # ARBITRARY
        # Randomness dominates at low difficulty; pcv weight grows linearly with difficulty.
        w = 0.05 * difficulty
        scores = w * pcv_norm + (1.0 - w) * np.random.random(len(allowed)).astype(np.float32)
    # Small noise floor at D10 prevents fully deterministic deck building.
    # A locked-in deck loses every game against counters; tiny randomness avoids this.
    noise = max(0.03, (10 - difficulty) / 9.0) * 0.50
    scores = scores + np.random.normal(0, noise, len(allowed)).astype(np.float32)
    order = np.argsort(-scores)  # best score first
    sorted_cards = [allowed[i] for i in order]
    # Budget reserved for cheap (cost <= 3) cards in the first pass, per personality.
    early_budget = {
        AIPersonality.GREEDY: 20,  # cheap cards are sacrifice fodder for big plays
        AIPersonality.SWARM: 12,
        AIPersonality.AGGRESSIVE: 18,  # raised: ensures cheap high-attack fodder regardless of difficulty
        AIPersonality.DEFENSIVE: 15,  # raised: stable cheap-card base across difficulty levels
        AIPersonality.CONTROL: 8,
        AIPersonality.BALANCED: 25,  # spread the deck across all cost levels
        AIPersonality.SHOCKER: 15,  # ~15 cost-1 shields, then expensive attackers fill remaining budget
        AIPersonality.ARBITRARY: 8,
    }[personality]
    selected: list[Card] = []
    total_cost = 0
    # First pass: secure early-game cards
    cheap_spent = 0
    for card in sorted_cards:
        if cheap_spent >= early_budget:
            break
        if card.cost > 3 or total_cost + card.cost > BUDGET:
            continue
        selected.append(card)
        total_cost += card.cost
        cheap_spent += card.cost
    # Second pass: fill remaining budget greedily by score
    taken = {id(c) for c in selected}
    for card in sorted_cards:
        if total_cost >= BUDGET:
            break
        if id(card) in taken or total_cost + card.cost > BUDGET:
            continue
        selected.append(card)
        total_cost += card.cost
    return selected
# ==================== Turn planning ====================
@dataclass
class MovePlan:
    """A candidate turn: which board slots to sacrifice, then which hand cards to play where."""
    # Indices of the AI's own board slots to sacrifice before playing.
    sacrifice_slots: list[int]
    plays: list[tuple]  # (CardInstance, board_slot: int)
    label: str = ""  # short human-readable tag for logging
def _affordable_subsets(hand, energy, start=0):
"""Yield every subset of cards from hand whose total cost fits within energy."""
yield []
for i in range(start, len(hand)):
card = hand[i]
if card.cost <= energy:
for rest in _affordable_subsets(hand, energy - card.cost, i + 1):
yield [card] + rest
def _plans_for_sacrifice(player, opponent, sacrifice_slots):
    """Generate one plan per affordable card subset for a given sacrifice set.

    Simulates the sacrifices on a copy of the board (each sacrifice frees its
    slot and refunds 1 energy), then pairs every affordable hand subset with
    every placement permutation over the resulting empty slots.

    `opponent` is unused here but kept for signature parity with callers.
    (Removed the dead local `en_board = opponent.board`.)
    """
    board = list(player.board)
    energy = player.energy
    for slot in sacrifice_slots:
        if board[slot] is not None:
            board[slot] = None
            energy += 1
    hand = list(player.hand)
    empty_slots = [i for i, c in enumerate(board) if c is None]
    # permutations() grows factorially with the number of empty slots, but the
    # board is small so the plan count stays bounded.
    return [
        MovePlan(
            sacrifice_slots=list(sacrifice_slots),
            plays=list(zip(cards, scoring_slots)),
            label=f"sac{len(sacrifice_slots)}_play{len(cards)}",
        )
        for cards in _affordable_subsets(hand, energy)
        for scoring_slots in permutations(empty_slots, len(cards))
    ]
def generate_plans(player, opponent) -> list[MovePlan]:
    """Generate diverse candidate move plans covering a range of strategies."""
    candidates: list[MovePlan] = []
    # Every subset of occupied slots (including the empty set) is a sacrifice option.
    occupied = [s for s in range(BOARD_SIZE) if player.board[s] is not None]
    for count in range(len(occupied) + 1):
        for combo in combinations(occupied, count):
            candidates.extend(_plans_for_sacrifice(player, opponent, list(combo)))
    # Always keep the do-nothing fallback.
    candidates.append(MovePlan(sacrifice_slots=[], plays=[], label="idle"))
    return candidates
# ==================== Turn execution ====================
def score_plans_batch(
    plans: list[MovePlan],
    player: PlayerState,
    opponent: PlayerState,
    personality: AIPersonality,
) -> np.ndarray:
    """Score every candidate plan at once.

    Returns an array of shape (len(plans),); scores are designed to live in
    [0, 1] (personality weights sum to 1.0) so the context adjustments below
    (the 0.95 lethal floor / 0.05 suicide cap) behave consistently.
    """
    n = len(plans)
    # Pre-compute PCV for every hand card once
    pcv_cache = {
        id(c): max(0.0, min(1.0, get_power_curve_value(c)))
        for c in player.hand
    }
    # Build board-state arrays with one Python loop (unavoidable)
    board_atk = np.zeros((n, BOARD_SIZE), dtype=np.float32)
    board_occ = np.zeros((n, BOARD_SIZE), dtype=np.bool_)
    n_sac = np.zeros(n, dtype=np.float32)
    sac_val = np.zeros(n, dtype=np.float32)        # total cost of sacrificed cards
    play_val = np.zeros(n, dtype=np.float32)       # total cost of played cards
    pcv_score = np.full(n, 0.5, dtype=np.float32)  # neutral default when a plan plays nothing
    for idx, plan in enumerate(plans):
        board = list(player.board)
        for slot in plan.sacrifice_slots:
            board_slot = board[slot]
            if board_slot is not None:
                sac_val[idx] += board_slot.cost
                board[slot] = None
        n_sac[idx] = len(plan.sacrifice_slots)
        for card, slot in plan.plays:
            board[slot] = card
            play_val[idx] += card.cost
        for slot in range(BOARD_SIZE):
            board_slot = board[slot]
            if board_slot is not None:
                board_atk[idx, slot] = board_slot.attack
                board_occ[idx, slot] = True
        if plan.plays:
            pcv_vals = [pcv_cache.get(id(c), 0.5) for c, _ in plan.plays]
            pcv_score[idx] = sum(pcv_vals) / len(pcv_vals)
    # Enemy board — same for every plan
    en_atk = np.array([c.attack if c else 0 for c in opponent.board], dtype=np.float32)
    en_def = np.array([c.defense if c else 0 for c in opponent.board], dtype=np.float32)
    en_occ = np.array([c is not None for c in opponent.board], dtype=np.bool_)
    enemy_occupied = int(en_occ.sum())
    # --- Metrics (all shape (n,)) ---
    direct_damage = (board_atk * ~en_occ).sum(axis=1)  # attack facing empty enemy slots
    blocking = board_occ & en_occ  # (n, 5)
    blocking_slots = blocking.sum(axis=1).astype(np.float32)
    cards_on_board = board_occ.sum(axis=1).astype(np.float32)
    cards_destroyed = ((board_atk >= en_def) & blocking).sum(axis=1).astype(np.float32)
    unblocked_in = (en_atk * ~board_occ).sum(axis=1)   # enemy attack we are not blocking
    atk_score = np.minimum(1.0, direct_damage / max(opponent.life, 1))
    block_score = blocking_slots / enemy_occupied if enemy_occupied > 0 else np.ones(n, dtype=np.float32)
    open_slots = BOARD_SIZE - enemy_occupied
    cover_score = (
        (cards_on_board - blocking_slots) / open_slots
        if open_slots > 0
        else np.ones(n, dtype=np.float32)
    )
    destroy_score = cards_destroyed / enemy_occupied if enemy_occupied > 0 else np.zeros(n, dtype=np.float32)
    threat_score = 1.0 - np.minimum(1.0, unblocked_in / max(player.life, 1))
    opp_cards_left = len(opponent.deck) + len(opponent.hand) + enemy_occupied
    my_cards_left = len(player.deck) + len(player.hand) + blocking_slots
    attrition_score = my_cards_left / (my_cards_left + max(opp_cards_left, 1))
    net_value = play_val - sac_val
    net_value_norm = np.clip((net_value + 10) / 20, 0.0, 1.0)
    # --- Sacrifice penalty ---
    energy_leftover = player.energy + n_sac - play_val
    wasted_energy = np.maximum(0, np.minimum(n_sac, energy_leftover))
    wasted_penalty = np.where(n_sac > 0, wasted_energy / np.maximum(n_sac, 1), 0.0)
    swap_penalty = np.clip((n_sac - net_value) / np.maximum(n_sac, 1), 0.0, 1.0)
    sac_penalty = np.where(n_sac > 0, 0.65 * wasted_penalty + 0.35 * swap_penalty, 0.0)
    # --- Personality weights (each set sums to 1.0) ---
    if personality == AIPersonality.AGGRESSIVE:
        score = (0.30 * atk_score + 0.07 * block_score + 0.15 * cover_score +
                 0.08 * net_value_norm + 0.25 * destroy_score +
                 0.08 * attrition_score + 0.04 * pcv_score + 0.03 * threat_score)
    elif personality == AIPersonality.DEFENSIVE:
        score = (0.12 * atk_score + 0.20 * block_score + 0.18 * cover_score +
                 0.04 * net_value_norm + 0.18 * destroy_score +
                 0.15 * attrition_score + 0.05 * pcv_score + 0.08 * threat_score)
    elif personality == AIPersonality.SWARM:
        score = (0.25 * atk_score + 0.10 * block_score + 0.35 * cover_score +
                 0.05 * net_value_norm + 0.05 * destroy_score +
                 0.10 * attrition_score + 0.05 * pcv_score + 0.05 * threat_score)
    elif personality == AIPersonality.GREEDY:
        score = (0.15 * atk_score + 0.05 * block_score + 0.18 * cover_score +
                 0.38 * net_value_norm + 0.05 * destroy_score +
                 0.09 * attrition_score + 0.05 * pcv_score + 0.05 * threat_score)
    elif personality == AIPersonality.CONTROL:
        score = (0.10 * atk_score + 0.05 * block_score + 0.05 * cover_score +
                 0.20 * net_value_norm + 0.05 * destroy_score +
                 0.10 * attrition_score + 0.40 * pcv_score + 0.05 * threat_score)
    elif personality == AIPersonality.BALANCED:
        score = (0.12 * atk_score + 0.13 * block_score + 0.15 * cover_score +
                 0.10 * net_value_norm + 0.12 * destroy_score +
                 0.15 * attrition_score + 0.12 * pcv_score + 0.11 * threat_score)
    elif personality == AIPersonality.SHOCKER:
        # Fixed: attrition weight was 0.8 (weights summed to 1.72, blowing past
        # the [0, 1] score range every other personality respects); 0.08 makes
        # the weights sum to exactly 1.0 like all the others.
        score = (0.25 * destroy_score + 0.33 * cover_score + 0.18 * atk_score +
                 0.05 * block_score + 0.08 * attrition_score + 0.02 * threat_score +
                 0.05 * net_value_norm + 0.04 * pcv_score)
    else:  # ARBITRARY
        score = (0.60 * np.random.random(n).astype(np.float32) +
                 0.05 * atk_score + 0.05 * block_score + 0.05 * cover_score +
                 0.05 * net_value_norm + 0.05 * destroy_score +
                 0.05 * attrition_score + 0.05 * pcv_score + 0.05 * threat_score)
    # --- Context adjustments ---
    # Lethal this turn: force the plan near the top regardless of weights.
    score = np.where(direct_damage >= opponent.life, np.maximum(score, 0.95), score)
    # Plan leaves us dead to unblocked damage: force it near the bottom.
    score = np.where(unblocked_in >= player.life, np.minimum(score, 0.05), score)
    if opponent.deck_type in ("God Card", "Pantheon"):
        score = np.minimum(1.0, score + 0.08 * cover_score)
    if opponent.deck_type in ("Aggro", "Rush"):
        score = np.minimum(1.0, score + 0.06 * block_score + 0.04 * threat_score)
    if opponent.deck_type == "Wall":
        score = np.minimum(1.0, score + 0.06 * atk_score)
    if opponent.life < STARTING_LIFE * 0.3:
        score = np.minimum(1.0, score + 0.06 * atk_score)
    if player.life < STARTING_LIFE * 0.3:
        score = np.minimum(1.0, score + 0.06 * threat_score + 0.04 * block_score)
    if opp_cards_left <= 5:
        score = np.where(cards_on_board > 0, np.minimum(1.0, score + 0.05), score)
    return np.maximum(0.0, score - sac_penalty)
async def choose_plan(player: PlayerState, opponent: PlayerState, personality: AIPersonality, difficulty: int) -> MovePlan:
    """Pick the highest-scoring plan, with difficulty-scaled decision noise.

    Lower difficulty adds larger Gaussian noise to the scores, making the AI
    pick sub-optimal plans more often; the noise reaches 0 at difficulty 10.
    """
    plans = generate_plans(player, opponent)
    scores = score_plans_batch(plans, player, opponent, personality)
    # Clamped at 0.0: for difficulty > 10 the old expression went negative,
    # and np.random.normal raises ValueError for a negative scale.
    noise_scale = max(0.0, (max(0, 11 - difficulty) ** 2) * 0.01 - 0.01)
    noise = np.random.normal(0, noise_scale, len(scores)).astype(np.float32)
    return plans[int(np.argmax(scores + noise))]
async def run_ai_turn(game_id: str):
    """Run one full AI turn: wait for the human client, pick a plan, execute
    sacrifices and plays with animation pacing, end the turn, and re-schedule
    itself if the AI still holds priority afterwards."""
    # Imported lazily to avoid a circular import with game_manager.
    from game_manager import (
        active_games, connections, active_deck_ids,
        serialize_state, record_game_result, calculate_combat_animation_time
    )
    state = active_games.get(game_id)
    if not state or state.result:
        return
    if state.active_player_id != AI_USER_ID:
        return
    human_id = state.opponent_id(AI_USER_ID)
    # Wait up to 10s for the human's websocket so they see the moves live.
    waited = 0
    while not connections[game_id].get(human_id) and waited < 10:
        await asyncio.sleep(0.5)
        waited += 0.5
    # Let any pending combat animation on the client finish before acting.
    await asyncio.sleep(calculate_combat_animation_time(state.last_combat_events))
    player = state.players[AI_USER_ID]
    opponent = state.players[human_id]
    difficulty = state.ai_difficulty
    personality = (
        AIPersonality(state.ai_personality)
        if state.ai_personality
        else AIPersonality.BALANCED
    )
    ws = connections[game_id].get(human_id)
    async def send_state(s):
        # Best-effort push; the socket may drop mid-turn.
        if ws:
            try:
                await ws.send_json({"type": "state", "state": serialize_state(s, human_id)})
            except Exception:
                pass
    async def send_sacrifice_anim(instance_id):
        # Best-effort push; the socket may drop mid-turn.
        if ws:
            try:
                await ws.send_json({"type": "sacrifice_animation", "instance_id": instance_id})
            except Exception:
                pass
    # --- Generate and score candidate plans ---
    best_plan = await choose_plan(player, opponent, personality, difficulty)
    # Fixed: the old message interpolated `plans`, a name not defined in this
    # function (plan generation lives in choose_plan), raising NameError.
    logger.info(
        f"AI turn: d={difficulty} p={personality.value} plan={best_plan.label} "
        f"sac={best_plan.sacrifice_slots} plays={[c.name for c, _ in best_plan.plays]}"
    )
    # --- Execute sacrifices ---
    for slot in best_plan.sacrifice_slots:
        card_slot = player.board[slot]
        if card_slot is None:
            continue
        await send_sacrifice_anim(card_slot.instance_id)
        await asyncio.sleep(0.65)
        action_sacrifice(state, slot)
        await send_state(state)
        await asyncio.sleep(0.35)
    # --- Execute plays ---
    # Shuffle play order so the AI doesn't always fill slots left-to-right
    plays = list(best_plan.plays)
    random.shuffle(plays)
    for card, slot in plays:
        # Re-look up hand index each time (hand shrinks as cards are played)
        hand_idx = next((i for i, c in enumerate(player.hand) if c is card), None)
        if hand_idx is None:
            continue
        if player.board[slot] is not None:
            continue
        if card.cost > player.energy:
            continue
        action_play_card(state, hand_idx, slot)
        await send_state(state)
        await asyncio.sleep(0.5)
    action_end_turn(state)
    await send_state(state)
    if state.result:
        # Game over: persist the result, push the final state, then tear down
        # all per-game bookkeeping.
        from database import SessionLocal
        db = SessionLocal()
        try:
            record_game_result(state, db)
            if ws:
                await ws.send_json({
                    "type": "state",
                    "state": serialize_state(state, human_id),
                })
        finally:
            db.close()
        active_deck_ids.pop(human_id, None)
        active_deck_ids.pop(AI_USER_ID, None)
        active_games.pop(game_id, None)
        connections.pop(game_id, None)
        return
    # AI kept priority (e.g. gets another turn) — schedule the next one.
    if state.active_player_id == AI_USER_ID:
        asyncio.create_task(run_ai_turn(game_id))