""" 更新的游戏引擎 适配新的3条战线战场系统 """ from typing import Dict, List, Optional, Any, Union from uuid import UUID from ..battlefield.battlefield_v2 import Battlefield, HQ from ..units.unit import Unit from ..abilities.keywords import TriggerTiming, KeywordRegistry from ..abilities.abilities import AbilityManager from .enums import GamePhase, LineType class GameEvent: """游戏事件""" def __init__(self, event_type: str, source: Optional[Unit] = None, target: Optional[Union[Unit, HQ]] = None, context: Dict[str, Any] = None): self.event_type = event_type self.source = source self.target = target self.context = context or {} self.timestamp = 0 class EventManager: """事件管理器""" def __init__(self): self.event_queue = [] self.event_listeners = {} self.event_counter = 0 def add_event(self, event: GameEvent) -> None: """添加事件到队列""" event.timestamp = self.event_counter self.event_counter += 1 self.event_queue.append(event) def process_events(self, battlefield: Battlefield) -> List[Dict[str, Any]]: """处理所有事件""" results = [] while self.event_queue: event = self.event_queue.pop(0) result = self._process_single_event(event, battlefield) if result: results.append(result) return results def _process_single_event(self, event: GameEvent, battlefield: Battlefield) -> Optional[Dict[str, Any]]: """处理单个事件""" timing_map = { "unit_deployed": TriggerTiming.ON_DEPLOY, "unit_attacks": TriggerTiming.ON_ATTACK, "unit_defends": TriggerTiming.ON_DEFEND, "unit_takes_damage": TriggerTiming.ON_DAMAGE_TAKEN, "unit_dies": TriggerTiming.ON_DEATH, "turn_start": TriggerTiming.ON_TURN_START, "turn_end": TriggerTiming.ON_TURN_END, "before_combat": TriggerTiming.BEFORE_COMBAT, "after_combat": TriggerTiming.AFTER_COMBAT } trigger_timing = timing_map.get(event.event_type) if not trigger_timing: return None # 触发所有相关单位的能力 all_units = battlefield.get_all_units() ability_manager = AbilityManager() results = [] for unit in all_units: # 触发关键词效果 self._trigger_keyword_effects(unit, battlefield, trigger_timing, event.context) # 触发能力 unit_results = ability_manager.trigger_abilities(unit, battlefield, trigger_timing, event.context) results.extend(unit_results) return { "event": event.event_type, "results": results } if results else None def _trigger_keyword_effects(self, unit: Unit, battlefield: Battlefield, timing: TriggerTiming, context: Dict[str, Any]) -> None: """触发单位的关键词效果""" for keyword_name in unit.keywords: keyword_effect = KeywordRegistry.create_keyword(keyword_name) if keyword_effect: keyword_effect.apply_effect(unit, battlefield, timing, context) class CombatResolver: """战斗解算器""" def __init__(self, event_manager: EventManager): self.event_manager = event_manager def resolve_combat(self, attacker: Unit, target: Union[Unit, HQ], battlefield: Battlefield) -> Dict[str, Any]: """解算战斗""" # 战斗前事件 self.event_manager.add_event(GameEvent( "before_combat", source=attacker, target=target, context={"combat_type": "attack"} )) # HQ战斗特殊处理 if isinstance(target, HQ): return self._resolve_hq_attack(attacker, target, battlefield) # 单位间战斗 return self._resolve_unit_combat(attacker, target, battlefield) def _resolve_hq_attack(self, attacker: Unit, hq: HQ, battlefield: Battlefield) -> Dict[str, Any]: """解算对HQ的攻击""" base_damage = attacker.get_effective_attack() actual_damage = hq.take_damage(base_damage) # HQ不会反击 result = { "attacker_id": attacker.id, "target_type": "hq", "target_player": hq.player_id, "damage_dealt": actual_damage, "counter_damage": 0, "hq_destroyed": hq.is_destroyed(), "combat_resolved": True } self.event_manager.add_event(GameEvent( "after_combat", source=attacker, target=hq, context={"damage_dealt": actual_damage, "counter_damage": 0} )) return result def _resolve_unit_combat(self, attacker: Unit, target: Unit, battlefield: Battlefield) -> Dict[str, Any]: """解算单位间战斗""" # 处理伏击 if target.has_keyword("AMBUSH"): ambush_result = self._resolve_ambush(target, attacker, battlefield) if ambush_result.get("attacker_destroyed"): return ambush_result # 计算最终伤害 base_damage = attacker.get_effective_attack() final_damage = self._calculate_damage(base_damage, target) # 应用伤害 actual_damage = target.stats.take_damage(final_damage) # 触发受伤事件 self.event_manager.add_event(GameEvent( "unit_takes_damage", source=attacker, target=target, context={"damage": actual_damage} )) # 反击 counter_damage = 0 if target.stats.is_alive() and self._can_counter_attack(target, attacker): counter_base_damage = target.get_effective_attack() counter_final_damage = self._calculate_damage(counter_base_damage, attacker) counter_damage = attacker.stats.take_damage(counter_final_damage) if counter_damage > 0: self.event_manager.add_event(GameEvent( "unit_takes_damage", source=target, target=attacker, context={"damage": counter_damage, "is_counter": True} )) # 检查单位死亡 units_destroyed = [] if not target.stats.is_alive(): self.event_manager.add_event(GameEvent("unit_dies", target=target)) battlefield.remove_unit(target) units_destroyed.append(target.id) if not attacker.stats.is_alive(): self.event_manager.add_event(GameEvent("unit_dies", target=attacker)) battlefield.remove_unit(attacker) units_destroyed.append(attacker.id) # 战斗后事件 self.event_manager.add_event(GameEvent( "after_combat", source=attacker, target=target, context={"damage_dealt": actual_damage, "counter_damage": counter_damage} )) return { "attacker_id": attacker.id, "target_id": target.id, "damage_dealt": actual_damage, "counter_damage": counter_damage, "units_destroyed": units_destroyed, "combat_resolved": True } def _resolve_ambush(self, defender: Unit, attacker: Unit, battlefield: Battlefield) -> Dict[str, Any]: """解算伏击""" ambush_damage = defender.get_effective_attack() actual_damage = attacker.stats.take_damage(ambush_damage) result = { "ambush_triggered": True, "ambush_damage": actual_damage, "attacker_destroyed": not attacker.stats.is_alive() } if not attacker.stats.is_alive(): self.event_manager.add_event(GameEvent("unit_dies", target=attacker)) battlefield.remove_unit(attacker) return result def _calculate_damage(self, base_damage: int, target: Unit) -> int: """计算最终伤害""" final_damage = base_damage # 重甲减伤 for keyword in target.keywords: if keyword.startswith("HEAVY_ARMOR_"): try: armor_value = int(keyword.split("_")[-1]) final_damage = max(0, final_damage - armor_value) break except ValueError: continue return final_damage def _can_counter_attack(self, defender: Unit, attacker: Unit) -> bool: """检查是否可以反击""" # 轰炸机攻击时目标不能反击(除非目标也是战斗机) if attacker.unit_type.name == "BOMBER" and defender.unit_type.name != "FIGHTER": return False # 被压制的单位不能反击 if defender.has_keyword("PINNED"): return False return True class GameEngineV2: """更新的游戏引擎主类""" def __init__(self, player1_id: str, player2_id: str): self.battlefield = Battlefield(player1_id, player2_id) self.event_manager = EventManager() self.combat_resolver = CombatResolver(self.event_manager) self.current_turn = 1 self.active_player = player1_id self.game_phase = GamePhase.PLAYER_TURN # 指挥点资源管理 self.player1_command_points = 1 # 第一回合1点 self.player2_command_points = 1 self.max_command_points = 12 # 最大12点 self.turn_history = [] def get_command_points(self, player_id: str) -> int: """获取玩家当前指挥点""" if player_id == self.battlefield.player1_id: return self.player1_command_points elif player_id == self.battlefield.player2_id: return self.player2_command_points return 0 def spend_command_points(self, player_id: str, cost: int) -> bool: """消耗指挥点""" current_points = self.get_command_points(player_id) if current_points >= cost: if player_id == self.battlefield.player1_id: self.player1_command_points -= cost elif player_id == self.battlefield.player2_id: self.player2_command_points -= cost return True return False def deploy_unit_to_support(self, unit: Unit, player_id: str) -> Dict[str, Any]: """部署单位到支援线(消耗指挥点)""" if player_id != self.active_player: return {"success": False, "reason": "Not your turn"} # 检查指挥点是否足够 if not self.spend_command_points(player_id, unit.stats.operation_cost): return {"success": False, "reason": "Insufficient command points"} success = self.battlefield.deploy_unit_to_support(unit, player_id) if success: # 触发部署事件 self.event_manager.add_event(GameEvent( "unit_deployed", source=unit, context={"deployment_type": "support"} )) # 处理事件 self.event_manager.process_events(self.battlefield) return {"success": True, "unit_deployed": unit.id, "location": "support"} return {"success": False, "reason": "Failed to deploy to support line"} def deploy_unit_to_front(self, unit: Unit, player_id: str) -> Dict[str, Any]: """部署单位到前线(消耗指挥点)""" if player_id != self.active_player: return {"success": False, "reason": "Not your turn"} # 检查指挥点是否足够 if not self.spend_command_points(player_id, unit.stats.operation_cost): return {"success": False, "reason": "Insufficient command points"} success = self.battlefield.deploy_unit_to_front(unit, player_id) if success: # 触发部署事件 self.event_manager.add_event(GameEvent( "unit_deployed", source=unit, context={"deployment_type": "front"} )) # 处理事件 self.event_manager.process_events(self.battlefield) return {"success": True, "unit_deployed": unit.id, "location": "front"} return {"success": False, "reason": "Failed to deploy to front line"} def attack_target(self, attacker_id: UUID, target_id: UUID) -> Dict[str, Any]: """单位攻击目标(单位或HQ)""" attacker = self.battlefield.find_unit(attacker_id) if not attacker: return {"success": False, "reason": "Attacker not found"} if attacker.owner != self.active_player: return {"success": False, "reason": "Not your unit"} # 查找目标(可能是单位或HQ) target = self.battlefield.find_unit(target_id) if not target: # 检查是否是HQ目标 player1_hq = self.battlefield.player1_hq player2_hq = self.battlefield.player2_hq if str(target_id) == "hq1": target = player1_hq elif str(target_id) == "hq2": target = player2_hq else: return {"success": False, "reason": "Target not found"} # 检查攻击有效性 valid_targets = self.battlefield.get_valid_attack_targets_for_unit(attacker) if target not in valid_targets: return {"success": False, "reason": "Cannot attack this target"} # 检查单位是否已经行动过 if attacker.has_operated_this_turn and not attacker.has_keyword("FURY"): return {"success": False, "reason": "Unit already operated this turn"} # 触发攻击事件 self.event_manager.add_event(GameEvent( "unit_attacks", source=attacker, target=target )) # 解算战斗 combat_result = self.combat_resolver.resolve_combat(attacker, target, self.battlefield) # 标记单位已行动 attacker.has_operated_this_turn = True # 处理所有事件 event_results = self.event_manager.process_events(self.battlefield) return { **combat_result, "event_results": event_results } def move_unit_to_front(self, unit_id: UUID) -> Dict[str, Any]: """将支援线单位移动到前线""" unit = self.battlefield.find_unit(unit_id) if not unit or unit.owner != self.active_player: return {"success": False, "reason": "Unit not found or not yours"} if not unit.position: return {"success": False, "reason": "Unit not on battlefield"} line_type, _ = unit.position if line_type == LineType.FRONT: return {"success": False, "reason": "Unit already on front line"} # 只有坦克可以同回合移动并攻击 if unit.unit_type.name != "TANK" and unit.has_operated_this_turn: return {"success": False, "reason": "Unit already operated this turn"} # 检查前线是否有空位 if self.battlefield.front_line.is_full(): return {"success": False, "reason": "Front line is full"} # 从当前位置移除并部署到前线 old_line = self.battlefield.get_line_by_type(line_type) if old_line: old_line.remove_unit(unit) success = self.battlefield.front_line.add_unit(unit) if success: # 对于非坦克单位,标记为已行动 if unit.unit_type.name != "TANK": unit.has_operated_this_turn = True # 更新前线控制权 self.battlefield._update_front_line_control() return {"success": True, "unit_moved": unit.id, "to": "front"} return {"success": False, "reason": "Failed to move to front"} def end_turn(self) -> Dict[str, Any]: """结束回合""" # 触发回合结束事件 self.event_manager.add_event(GameEvent( "turn_end", context={"player": self.active_player, "turn": self.current_turn} )) # 处理事件 self.event_manager.process_events(self.battlefield) # 切换玩家 self.active_player = (self.battlefield.player2_id if self.active_player == self.battlefield.player1_id else self.battlefield.player1_id) # 增加指挥点并重置 if self.active_player == self.battlefield.player1_id: self.player1_command_points = min(self.max_command_points, self.current_turn + 1) else: self.player2_command_points = min(self.max_command_points, self.current_turn + 1) # 重置单位状态 for unit in self.battlefield.get_all_units(self.active_player): unit.reset_operation_status() # 触发回合开始事件 self.event_manager.add_event(GameEvent( "turn_start", context={"player": self.active_player, "turn": self.current_turn} )) # 处理事件 self.event_manager.process_events(self.battlefield) self.current_turn += 1 self.game_phase = GamePhase.PLAYER_TURN # 检查游戏是否结束 game_over, winner = self.battlefield.is_game_over() return { "turn_ended": True, "new_active_player": self.active_player, "turn_number": self.current_turn, "game_over": game_over, "winner": winner, "front_line_controller": self.battlefield.front_line_controller } def get_game_state(self) -> Dict[str, Any]: """获取游戏状态""" return { "turn": self.current_turn, "active_player": self.active_player, "phase": self.game_phase.value, "battlefield": str(self.battlefield), "player1_hq": self.battlefield.player1_hq.current_defense, "player2_hq": self.battlefield.player2_hq.current_defense, "front_line_controller": self.battlefield.front_line_controller, "units_count": { "player1_total": len(self.battlefield.get_all_units(self.battlefield.player1_id)), "player2_total": len(self.battlefield.get_all_units(self.battlefield.player2_id)), "player1_support": len(self.battlefield.player1_support.get_all_units()), "player1_front": len([u for u in self.battlefield.front_line.get_all_units() if u.owner == self.battlefield.player1_id]), "player2_support": len(self.battlefield.player2_support.get_all_units()), "player2_front": len([u for u in self.battlefield.front_line.get_all_units() if u.owner == self.battlefield.player2_id]) } }