import asyncio
|
|
import logging
|
|
import os
|
|
import random
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from itertools import combinations, permutations
|
|
|
|
import numpy as np
|
|
|
|
from game.card import Card
|
|
from game.rules import action_play_card, action_sacrifice, action_end_turn, BOARD_SIZE, STARTING_LIFE, PlayerState
|
|
|
|
# Shared application logger (configured elsewhere under the "app" name).
logger = logging.getLogger("app")

# Player id used for the AI participant in game state / connection maps.
AI_USER_ID = "ai"
|
|
|
|
class AIPersonality(Enum):
    """Deck-building and in-game decision styles for the AI opponent.

    The value strings are stored in game state (`state.ai_personality`) and
    round-tripped through `AIPersonality(...)`, so they must stay stable.
    """

    AGGRESSIVE = "aggressive"
    DEFENSIVE = "defensive"
    BALANCED = "balanced"
    GREEDY = "greedy"  # prioritizes high cost cards, willing to sacrifice
    SWARM = "swarm"
    CONTROL = "control"
    ARBITRARY = "arbitrary"
    JEBRASKA = "jebraska"  # trained neural network plan scorer
|
|
|
|
def get_random_personality() -> AIPersonality:
    """Pick one of the defined AI personalities uniformly at random."""
    options = list(AIPersonality)
    return random.choice(options)
|
|
|
|
def calculate_exact_cost(attack: int, defense: int) -> float:
    """Calculate the exact cost before rounding (matches card.py formula).

    The raw value is a power of the squared stat magnitude, scaled down,
    then clamped into the [1.0, 10.0] cost range.
    """
    raw = (attack**2 + defense**2) ** 0.18 / 1.5
    if raw < 1.0:
        return 1.0
    if raw > 10.0:
        return 10.0
    return raw
|
|
|
|
def get_power_curve_value(card) -> float:
    """
    Returns how much above the power curve a card is.

    Positive values mean the card is a better-than-expected deal for its cost.
    """
    return calculate_exact_cost(card.attack, card.defense) - card.cost
|
|
|
|
|
|
def choose_cards(cards: list[Card], difficulty: int, personality: AIPersonality) -> list[Card]:
    """Build the AI's deck from `cards` under a fixed total-cost budget.

    Every affordable card is scored in one vectorized pass with
    personality-specific weights (plus difficulty-scaled noise), then the
    deck is filled in two passes: a reserved budget of cheap (cost <= 3)
    cards first, then greedily by score until the budget is spent.
    """
    # Total deck cost the AI may spend across both passes.
    BUDGET = 50

    # Higher difficulties unlock more expensive cards; below difficulty 6
    # the cap stays fixed at 6.
    if difficulty >= 6:
        max_card_cost = difficulty + 1
    else:
        max_card_cost = 6

    # If the cost cap filters out everything, fall back to the full pool.
    allowed = [c for c in cards if c.cost <= max_card_cost] or list(cards)

    # Vectorized scoring over all allowed cards at once
    atk = np.array([c.attack for c in allowed], dtype=np.float32)
    defn = np.array([c.defense for c in allowed], dtype=np.float32)
    cost = np.array([c.cost for c in allowed], dtype=np.float32)

    # Same formula as calculate_exact_cost, vectorized over the pool.
    exact_cost = np.minimum(10.0, np.maximum(1.0, ((atk**2 + defn**2)**0.18) / 1.5))
    # Power-curve value (how under-priced the card is), clamped to [0, 1].
    pcv_norm = np.clip(exact_cost - cost, 0.0, 1.0)
    cost_norm = cost / max_card_cost
    totals = atk + defn
    # Attack's share of total stats; 0.5 for a 0/0 to avoid dividing by zero.
    atk_ratio = np.where(totals > 0, atk / totals, 0.5)
    # 1.0 for cards whose defense isn't exactly 1 (don't die to chip damage).
    def_not_one = np.where(defn != 1, 1.0, 0.0)

    if personality == AIPersonality.AGGRESSIVE:
        # (1-cost_norm) penalizes expensive cards. High-attack cards are inherently expensive,
        # so without this the second pass drifts toward costly cards at higher difficulty,
        # shrinking the deck. The bonus grows with max_card_cost and exactly offsets that drift.
        scores = 0.50 * atk_ratio + 0.35 * pcv_norm + 0.15 * (1.0 - cost_norm) + 0.10 * def_not_one
    elif personality == AIPersonality.DEFENSIVE:
        # Small (1-cost_norm) for the same anti-shrinkage reason; lighter because high-defense
        # cards don't correlate as strongly with cost as high-attack cards do.
        scores = 0.10 * (1.0 - atk_ratio) + 0.80 * pcv_norm + 0.10 * cost_norm
    elif personality == AIPersonality.GREEDY:
        # Small cost_norm keeps flavour without causing severe deck shrinkage at D10
        scores = 0.20 * cost_norm + 0.80 * pcv_norm
    elif personality == AIPersonality.SWARM:
        scores = 0.40 * (1.0 - cost_norm) + 0.35 * atk_ratio + 0.20 * pcv_norm + 0.05 * def_not_one
    elif personality == AIPersonality.CONTROL:
        # Small cost_norm keeps flavour without causing severe deck shrinkage at D10
        scores = 0.85 * pcv_norm + 0.15 * cost_norm
    elif personality == AIPersonality.BALANCED:
        scores = 0.60 * pcv_norm + 0.25 * atk_ratio + 0.15 * (1.0 - atk_ratio)
    elif personality == AIPersonality.JEBRASKA:
        # Delegate entirely to the card-pick NN; skip the heuristic scoring path.
        # Imported lazily so the module works when the NN package is absent
        # from other personalities' code paths.
        from ai.card_pick_nn import CardPickPlayer, CARD_PICK_WEIGHTS_PATH
        from ai.nn import NeuralNet
        # Cache the loaded network on the function object so the weights
        # file is read at most once per process.
        if not hasattr(choose_cards, "_card_pick_net"):
            choose_cards._card_pick_net = (
                NeuralNet.load(CARD_PICK_WEIGHTS_PATH)
                if os.path.exists(CARD_PICK_WEIGHTS_PATH) else None
            )
        net = choose_cards._card_pick_net
        if net is not None:
            return CardPickPlayer(net, training=False).choose_cards(allowed, difficulty)
        # Fall through to BALANCED heuristic if weights aren't trained yet.
        scores = 0.60 * pcv_norm + 0.25 * atk_ratio + 0.15 * (1.0 - atk_ratio)
    else:  # ARBITRARY
        # Blend of power-curve signal and pure randomness; the signal's
        # weight w grows linearly with difficulty.
        w = 0.09 * difficulty
        scores = w * pcv_norm + (1.0 - w) * np.random.random(len(allowed)).astype(np.float32)

    # Small noise floor at D10 prevents fully deterministic deck building.
    noise = (max(0,12 - difficulty)**2) * 0.008
    scores = scores + np.random.normal(0, noise, len(allowed)).astype(np.float32)

    # Best-scoring cards first.
    order = np.argsort(-scores)
    sorted_cards = [allowed[i] for i in order]

    # Per-personality budget reserved for cheap (cost <= 3) cards in pass one.
    early_budget = {
        AIPersonality.GREEDY: 20,  # cheap cards are sacrifice fodder for big plays
        AIPersonality.SWARM: 12,
        AIPersonality.AGGRESSIVE: 18,  # raised: ensures cheap high-attack fodder regardless of difficulty
        AIPersonality.DEFENSIVE: 15,  # raised: stable cheap-card base across difficulty levels
        AIPersonality.CONTROL: 8,
        AIPersonality.BALANCED: 25,  # spread the deck across all cost levels
        AIPersonality.JEBRASKA: 25,  # fallback (no trained weights yet)
        AIPersonality.ARBITRARY: 8,
    }[personality]

    selected: list[Card] = []
    total_cost = 0

    # First pass: secure early-game cards
    cheap_spent = 0
    for card in sorted_cards:
        if cheap_spent >= early_budget:
            break
        if card.cost > 3 or total_cost + card.cost > BUDGET:
            continue
        selected.append(card)
        total_cost += card.cost
        cheap_spent += card.cost

    # Second pass: fill remaining budget greedily by score
    # id() is used because the same Card object may not define __eq__/__hash__
    # suitably for set membership; identity is what matters here.
    taken = {id(c) for c in selected}
    for card in sorted_cards:
        if total_cost >= BUDGET:
            break
        if id(card) in taken or total_cost + card.cost > BUDGET:
            continue
        selected.append(card)
        total_cost += card.cost

    return selected
|
|
|
|
|
|
@dataclass
class MovePlan:
    """One candidate AI turn: sacrifices first, then card plays."""

    # Board slot indices to sacrifice (each refunds 1 energy when executed).
    sacrifice_slots: list[int]
    plays: list[tuple]  # (CardInstance, board_slot: int)
    # Short human-readable tag for logging/debugging (e.g. "sac1_play2").
    label: str = ""
|
|
|
|
|
|
def _affordable_subsets(hand, energy, start=0):
|
|
"""Yield every subset of cards from hand whose total cost fits within energy."""
|
|
yield []
|
|
for i in range(start, len(hand)):
|
|
card = hand[i]
|
|
if card.cost <= energy:
|
|
for rest in _affordable_subsets(hand, energy - card.cost, i + 1):
|
|
yield [card] + rest
|
|
|
|
|
|
def _plans_for_sacrifice(player, opponent, sacrifice_slots):
    """Generate one plan per affordable card subset for a given sacrifice set.

    The sacrifices are simulated on a copy of the board (each frees its slot
    and refunds 1 energy), then every affordable hand subset is paired with
    every assignment of those cards to the resulting empty slots.

    `opponent` is currently unused but kept in the signature for caller
    compatibility and future opponent-aware plan generation.
    """
    board = list(player.board)
    energy = player.energy

    # Apply the sacrifices to the simulated board: slot empties, +1 energy each.
    for slot in sacrifice_slots:
        if board[slot] is not None:
            board[slot] = None
            energy += 1

    hand = list(player.hand)
    empty_slots = [i for i, c in enumerate(board) if c is None]

    return [
        MovePlan(
            sacrifice_slots=list(sacrifice_slots),
            plays=list(zip(cards, scoring_slots)),
            label=f"sac{len(sacrifice_slots)}_play{len(cards)}",
        )
        for cards in _affordable_subsets(hand, energy)
        # Ordered slot assignments: playing the same cards into different
        # slots is a distinct plan (positioning matters for blocking).
        for scoring_slots in permutations(empty_slots, len(cards))
    ]
|
|
|
|
|
|
def generate_plans(player, opponent) -> list[MovePlan]:
    """Enumerate every candidate plan: each sacrifice subset crossed with
    each affordable play combination, plus an explicit do-nothing plan."""
    occupied = [i for i in range(BOARD_SIZE) if player.board[i] is not None]

    plans: list[MovePlan] = []
    for count in range(len(occupied) + 1):
        for combo in combinations(occupied, count):
            plans.extend(_plans_for_sacrifice(player, opponent, list(combo)))

    # Idle: do nothing
    plans.append(MovePlan(sacrifice_slots=[], plays=[], label="idle"))

    return plans
|
|
|
|
def score_plans_batch(
    plans: list[MovePlan],
    player: PlayerState,
    opponent: PlayerState,
    personality: AIPersonality,
) -> np.ndarray:
    """Score every plan in one vectorized pass; returns an array of shape (n,).

    Each plan's post-move board is simulated, a set of normalized tactical
    metrics is computed, and the metrics are blended with personality-specific
    weights. Context adjustments then nudge scores for lethal lines and
    matchup specifics; a sacrifice penalty is subtracted at the end.
    """
    n = len(plans)

    # Pre-compute PCV for every hand card once
    pcv_cache = {
        id(c): max(0.0, min(1.0, get_power_curve_value(c)))
        for c in player.hand
    }

    # Build board-state arrays
    board_atk = np.zeros((n, BOARD_SIZE), dtype=np.float32)
    board_occ = np.zeros((n, BOARD_SIZE), dtype=np.bool_)
    n_sac = np.zeros(n, dtype=np.float32)
    sac_val = np.zeros(n, dtype=np.float32)   # total cost of sacrificed cards
    play_val = np.zeros(n, dtype=np.float32)  # total cost of played cards
    # Neutral 0.5 default for plans that play nothing.
    pcv_score = np.full(n, 0.5, dtype=np.float32)

    for idx, plan in enumerate(plans):
        # Simulate this plan on a copy: remove sacrifices, then place plays.
        board = list(player.board)
        for slot in plan.sacrifice_slots:
            board_slot = board[slot]
            if board_slot is not None:
                sac_val[idx] += board_slot.cost
                board[slot] = None
        n_sac[idx] = len(plan.sacrifice_slots)
        for card, slot in plan.plays:
            board[slot] = card
            play_val[idx] += card.cost
        for slot in range(BOARD_SIZE):
            board_slot = board[slot]
            if board_slot is not None:
                board_atk[idx, slot] = board_slot.attack
                board_occ[idx, slot] = True
        if plan.plays:
            # Average power-curve value of the cards this plan plays.
            pcv_vals = [pcv_cache.get(id(c), 0.5) for c, _ in plan.plays]
            pcv_score[idx] = sum(pcv_vals) / len(pcv_vals)

    # Enemy board — same for every plan
    en_atk = np.array([c.attack if c else 0 for c in opponent.board], dtype=np.float32)
    en_def = np.array([c.defense if c else 0 for c in opponent.board], dtype=np.float32)
    en_occ = np.array([c is not None for c in opponent.board], dtype=np.bool_)
    enemy_occupied = int(en_occ.sum())

    # --- Metrics (all shape (n,)) ---
    # Attack pointed at empty enemy slots hits the opponent directly.
    direct_damage = (board_atk * ~en_occ).sum(axis=1)
    blocking = board_occ & en_occ  # (n, 5)
    blocking_slots = blocking.sum(axis=1).astype(np.float32)
    cards_on_board = board_occ.sum(axis=1).astype(np.float32)
    # Blockers whose attack meets the opposing card's defense destroy it.
    cards_destroyed = ((board_atk >= en_def) & blocking).sum(axis=1).astype(np.float32)
    # Enemy attack into our empty slots comes in unblocked.
    unblocked_in = (en_atk * ~board_occ).sum(axis=1)

    atk_score = np.minimum(1.0, direct_damage / max(opponent.life, 1))
    block_score = blocking_slots / enemy_occupied if enemy_occupied > 0 else np.ones(n, dtype=np.float32)
    open_slots = BOARD_SIZE - enemy_occupied
    cover_score = (
        (cards_on_board - blocking_slots) / open_slots
        if open_slots > 0
        else np.ones(n, dtype=np.float32)
    )
    destroy_score = cards_destroyed / enemy_occupied if enemy_occupied > 0 else np.zeros(n, dtype=np.float32)
    threat_score = 1.0 - np.minimum(1.0, unblocked_in / max(player.life, 1))

    # Card-advantage ratio: our remaining resources vs the opponent's.
    opp_cards_left = len(opponent.deck) + len(opponent.hand) + enemy_occupied
    my_cards_left = len(player.deck) + len(player.hand) + blocking_slots
    attrition_score = my_cards_left / (my_cards_left + max(opp_cards_left, 1))

    net_value = play_val - sac_val
    # Map net value from roughly [-10, 10] onto [0, 1].
    net_value_norm = np.clip((net_value + 10) / 20, 0.0, 1.0)

    # --- Sacrifice penalty ---
    # Penalize sacrificing for energy that then goes unspent, and penalize
    # trading away more board value than the plays bring back.
    energy_leftover = player.energy + n_sac - play_val
    wasted_energy = np.maximum(0, np.minimum(n_sac, energy_leftover))
    wasted_penalty = np.where(n_sac > 0, wasted_energy / np.maximum(n_sac, 1), 0.0)
    swap_penalty = np.clip((n_sac - net_value) / np.maximum(n_sac, 1), 0.0, 1.0)
    sac_penalty = np.where(n_sac > 0, 0.65 * wasted_penalty + 0.35 * swap_penalty, 0.0)

    # --- Personality weights ---
    if personality == AIPersonality.AGGRESSIVE:
        score = (0.30 * atk_score + 0.07 * block_score + 0.15 * cover_score +
                 0.08 * net_value_norm + 0.25 * destroy_score +
                 0.08 * attrition_score + 0.04 * pcv_score + 0.03 * threat_score)
    elif personality == AIPersonality.DEFENSIVE:
        score = (0.12 * atk_score + 0.20 * block_score + 0.18 * cover_score +
                 0.04 * net_value_norm + 0.18 * destroy_score +
                 0.15 * attrition_score + 0.05 * pcv_score + 0.08 * threat_score)
    elif personality == AIPersonality.SWARM:
        score = (0.25 * atk_score + 0.10 * block_score + 0.35 * cover_score +
                 0.05 * net_value_norm + 0.05 * destroy_score +
                 0.10 * attrition_score + 0.05 * pcv_score + 0.05 * threat_score)
    elif personality == AIPersonality.GREEDY:
        score = (0.15 * atk_score + 0.05 * block_score + 0.18 * cover_score +
                 0.38 * net_value_norm + 0.05 * destroy_score +
                 0.09 * attrition_score + 0.05 * pcv_score + 0.05 * threat_score)
    elif personality == AIPersonality.CONTROL:
        score = (0.10 * atk_score + 0.05 * block_score + 0.05 * cover_score +
                 0.20 * net_value_norm + 0.05 * destroy_score +
                 0.10 * attrition_score + 0.40 * pcv_score + 0.05 * threat_score)
    elif personality == AIPersonality.BALANCED:
        score = (0.12 * atk_score + 0.13 * block_score + 0.15 * cover_score +
                 0.10 * net_value_norm + 0.12 * destroy_score +
                 0.15 * attrition_score + 0.12 * pcv_score + 0.11 * threat_score)
    else:  # ARBITRARY
        score = (0.50 * np.random.random(n).astype(np.float32) +
                 0.06 * atk_score + 0.06 * block_score + 0.08 * cover_score +
                 0.05 * net_value_norm + 0.06 * destroy_score +
                 0.08 * attrition_score + 0.06 * pcv_score + 0.05 * threat_score)

    # --- Context adjustments ---
    # Guaranteed lethal for us floors the score high; guaranteed lethal
    # against us caps it low.
    score = np.where(direct_damage >= opponent.life, np.maximum(score, 0.95), score)
    score = np.where(unblocked_in >= player.life, np.minimum(score, 0.05), score)

    # Matchup-specific nudges keyed on the opponent's deck archetype.
    if opponent.deck_type in ("God Card", "Pantheon"):
        score = np.minimum(1.0, score + 0.08 * cover_score)
    if opponent.deck_type in ("Aggro", "Rush"):
        score = np.minimum(1.0, score + 0.06 * block_score + 0.04 * threat_score)
    if opponent.deck_type == "Wall":
        score = np.minimum(1.0, score + 0.06 * atk_score)
    if opponent.life < STARTING_LIFE * 0.3:
        score = np.minimum(1.0, score + 0.06 * atk_score)
    if player.life < STARTING_LIFE * 0.3:
        score = np.minimum(1.0, score + 0.06 * threat_score + 0.04 * block_score)
    if opp_cards_left <= 5:
        score = np.where(cards_on_board > 0, np.minimum(1.0, score + 0.05), score)

    return np.maximum(0.0, score - sac_penalty)
|
|
|
|
|
|
def choose_plan(player: PlayerState, opponent: PlayerState, personality: AIPersonality, difficulty: int) -> MovePlan:
    """Pick the best MovePlan for the current position.

    JEBRASKA uses the trained neural-net plan scorer when a weights file
    exists, falling back to the BALANCED heuristic otherwise; all other
    personalities use the heuristic batch scorer. Difficulty-scaled
    Gaussian noise makes lower difficulties play less optimally; the noise
    reaches zero at difficulty 10.
    """
    plans = generate_plans(player, opponent)

    if personality == AIPersonality.JEBRASKA:
        from ai.nn import NeuralNet
        _weights = os.path.join(os.path.dirname(__file__), "nn_weights.json")
        # Cache the loaded network on the function object so the weights
        # file is read at most once per process.
        if not hasattr(choose_plan, "_neural_net"):
            choose_plan._neural_net = NeuralNet.load(_weights) if os.path.exists(_weights) else None
        net = choose_plan._neural_net
        if net is not None:
            from ai.nn import extract_plan_features
            scores = net.forward(extract_plan_features(plans, player, opponent))
        else:  # fallback to BALANCED if weights not found
            scores = score_plans_batch(plans, player, opponent, AIPersonality.BALANCED)
    else:
        scores = score_plans_batch(plans, player, opponent, personality)

    # Clamp at 0: the quadratic goes negative for difficulty >= 11, and
    # np.random.normal raises ValueError on a negative scale.
    noise_scale = max(0.0, ((max(0, 12 - difficulty)**2) - 4) * 0.008)
    noise = np.random.normal(0, noise_scale, len(scores)).astype(np.float32)
    return plans[int(np.argmax(scores + noise))]
|
|
|
|
async def run_ai_turn(game_id: str):
    """Execute one full AI turn for `game_id`.

    Waits for the human client's websocket, lets the previous combat
    animation finish, chooses a plan, executes sacrifices and plays with
    paced state pushes, ends the turn, and handles game-over persistence
    and cleanup. Re-schedules itself if it is still the AI's turn after
    ending (some rule outcomes grant another turn).
    """
    # Imported lazily to avoid a circular import with game.manager.
    from game.manager import (
        active_games, connections, active_deck_ids,
        serialize_state, record_game_result, calculate_combat_animation_time
    )

    state = active_games.get(game_id)
    # Bail out if the game is gone, already decided, or it isn't our turn.
    if not state or state.result:
        return
    if state.active_player_id != AI_USER_ID:
        return

    human_id = state.opponent_id(AI_USER_ID)
    # Wait (up to ~10s) for the human's websocket so they see the turn play out.
    waited = 0
    while not connections[game_id].get(human_id) and waited < 10:
        await asyncio.sleep(0.5)
        waited += 0.5

    # Let the client finish the previous combat animation before acting.
    await asyncio.sleep(calculate_combat_animation_time(state.last_combat_events))

    player = state.players[AI_USER_ID]
    opponent = state.players[human_id]
    difficulty = state.ai_difficulty
    personality = (
        AIPersonality(state.ai_personality)
        if state.ai_personality
        else AIPersonality.BALANCED
    )

    ws = connections[game_id].get(human_id)

    async def send_state(s):
        # Best-effort push of serialized state; ignore send failures
        # (the client may have disconnected mid-turn).
        if ws:
            try:
                await ws.send_json({"type": "state", "state": serialize_state(s, human_id)})
            except Exception:
                pass

    async def send_sacrifice_anim(instance_id):
        # Best-effort trigger of the client-side sacrifice animation.
        if ws:
            try:
                await ws.send_json({"type": "sacrifice_animation", "instance_id": instance_id})
            except Exception:
                pass

    best_plan = choose_plan(player, opponent, personality, difficulty)

    logger.info(
        f"AI turn: d={difficulty} p={personality.value} plan={best_plan.label} " +
        f"sac={best_plan.sacrifice_slots} plays={[c.name for c, _ in best_plan.plays]}"
    )

    # Execute sacrifices one at a time, paced for the client animations.
    for slot in best_plan.sacrifice_slots:
        card_slot = player.board[slot]
        if card_slot is None:
            continue
        await send_sacrifice_anim(card_slot.instance_id)
        await asyncio.sleep(0.65)
        action_sacrifice(state, slot)
        await send_state(state)
        await asyncio.sleep(0.35)

    # Shuffle play order so the AI doesn't always fill slots left-to-right
    plays = list(best_plan.plays)
    random.shuffle(plays)

    for card, slot in plays:
        # Re-look up hand index each time (hand shrinks as cards are played)
        hand_idx = next((i for i, c in enumerate(player.hand) if c is card), None)
        if hand_idx is None:
            continue
        # Skip plays that became invalid while the plan was executing.
        if player.board[slot] is not None:
            continue
        if card.cost > player.energy:
            continue
        action_play_card(state, hand_idx, slot)
        await send_state(state)
        await asyncio.sleep(0.5)

    action_end_turn(state)
    await send_state(state)

    if state.result:
        # Game over: persist the result, push the final state, then tear
        # down all per-game bookkeeping.
        from core.database import SessionLocal
        db = SessionLocal()
        try:
            record_game_result(state, db)
            if ws:
                await ws.send_json({
                    "type": "state",
                    "state": serialize_state(state, human_id),
                })
        finally:
            db.close()
        active_deck_ids.pop(human_id, None)
        active_deck_ids.pop(AI_USER_ID, None)
        active_games.pop(game_id, None)
        connections.pop(game_id, None)
        return

    # Still the AI's turn (extra-turn rule outcome): schedule another pass.
    if state.active_player_id == AI_USER_ID:
        asyncio.create_task(run_ai_turn(game_id))
|