Spaces:
Running
Running
| """ | |
| Tactical Data Augmentation Module. | |
| Provides domain-specific data augmentation techniques for: | |
| - Cybersecurity threat scenarios | |
| - Military tactical situations | |
| - Multi-step reasoning problems | |
| These augmentations help increase training data diversity and improve | |
| model robustness for tactical analysis tasks. | |
| """ | |
import logging
import random
import re
from dataclasses import dataclass

from .dataset_loader import DatasetSample
| logger = logging.getLogger(__name__) | |
@dataclass
class AugmentationResult:
    """Result of augmenting a single dataset sample.

    Declared as a dataclass so it can be built with keyword arguments, which
    is how ``TacticalAugmenter.augment_sample`` constructs it.

    Attributes:
        original: The sample the augmentations were derived from.
        augmented: The augmented samples, in the order they were generated.
        augmentation_types: Name of the technique used for each augmented
            sample, index-aligned with ``augmented``.
    """

    original: DatasetSample
    augmented: list[DatasetSample]
    augmentation_types: list[str]
class TacticalAugmenter:
    """
    Domain-specific data augmentation for tactical analysis.

    Implemented augmentation techniques (names accepted by ``augment_sample``):
    - ``urgency_variation``: prefix the text with an urgency marker
    - ``parameter_substitution``: swap a known threat actor, attack vector or
      military objective for a different one from the same catalogue
    - ``constraint_addition``: append a bracketed operating constraint
    - ``temporal_shift``: prefix the text with a temporal context phrase
    - ``perspective_change``: prefix the text with an analytical perspective

    All randomness comes from a private ``random.Random`` instance, so two
    augmenters created with the same seed and driven with the same calls
    produce the same output.
    """

    # Tactical scenario templates
    URGENCY_MODIFIERS = {
        "high": ["IMMEDIATE", "CRITICAL", "URGENT", "TIME-SENSITIVE"],
        "medium": ["PRIORITY", "IMPORTANT", "ATTENTION REQUIRED"],
        "low": ["ROUTINE", "STANDARD", "WHEN POSSIBLE"],
    }
    THREAT_ACTORS = [
        "APT28",
        "APT29",
        "Lazarus Group",
        "Cozy Bear",
        "Fancy Bear",
        "Unknown Actor",
        "Nation-State Actor",
        "Criminal Organization",
    ]
    ATTACK_VECTORS = [
        "phishing",
        "spear-phishing",
        "watering hole",
        "supply chain compromise",
        "zero-day exploit",
        "credential stuffing",
        "brute force",
        "social engineering",
    ]
    MILITARY_OBJECTIVES = [
        "secure perimeter",
        "establish forward position",
        "conduct reconnaissance",
        "neutralize threat",
        "protect assets",
        "maintain operational security",
        "coordinate with allied forces",
        "execute tactical withdrawal",
    ]
    ENVIRONMENTAL_CONDITIONS = [
        "night operations",
        "adverse weather",
        "limited visibility",
        "urban terrain",
        "mountainous region",
        "coastal area",
        "contested airspace",
        "electronic warfare environment",
    ]

    def __init__(self, seed: int = 42):
        """
        Initialize augmenter.

        Args:
            seed: Random seed for reproducibility
        """
        self.rng = random.Random(seed)
        # Monotonic counter appended to generated sample ids to keep them unique.
        self._augmentation_count = 0

    def augment_sample(
        self,
        sample: DatasetSample,
        num_augmentations: int = 3,
        techniques: list[str] | None = None,
    ) -> AugmentationResult:
        """
        Augment a single sample.

        Args:
            sample: Original dataset sample
            num_augmentations: Number of augmented versions to create
            techniques: Specific techniques to use (None or empty for random
                selection among all techniques). Unknown names are ignored
                with a warning; repeated names increase that technique's
                selection weight.

        Returns:
            AugmentationResult with augmented samples

        Raises:
            ValueError: If ``techniques`` is non-empty but contains no known
                technique name.
        """
        known_techniques = [
            "urgency_variation",
            "parameter_substitution",
            "constraint_addition",
            "temporal_shift",
            "perspective_change",
        ]
        pool = known_techniques
        if techniques:
            unknown = [t for t in techniques if t not in known_techniques]
            if unknown:
                logger.warning("Ignoring unknown augmentation techniques: %s", unknown)
            pool = [t for t in techniques if t in known_techniques]
            if not pool:
                # Fail with an actionable message instead of the IndexError
                # that rng.choice() would raise on an empty pool.
                raise ValueError(
                    f"No valid augmentation techniques in {techniques!r}; "
                    f"expected any of {known_techniques!r}"
                )

        augmented_samples = []
        used_techniques = []
        for _ in range(num_augmentations):
            technique = self.rng.choice(pool)
            used_techniques.append(technique)
            augmented_text = self._apply_technique(sample.text, sample.domain, technique)
            augmented_samples.append(
                self._derive_sample(
                    sample,
                    "aug",
                    augmented_text,
                    {"augmentation": technique},
                )
            )

        return AugmentationResult(
            original=sample,
            augmented=augmented_samples,
            augmentation_types=used_techniques,
        )

    def _derive_sample(
        self,
        sample: DatasetSample,
        tag: str,
        text: str,
        extra_metadata: dict,
        difficulty: str | None = None,
    ) -> DatasetSample:
        """Build a new sample derived from ``sample`` and advance the id counter.

        The new id is ``"{sample.id}_{tag}_{counter}"``. Metadata is the
        original metadata overlaid with ``extra_metadata`` and an
        ``original_id`` entry. Labels, domain and reasoning steps are carried
        over unchanged; ``difficulty`` is inherited unless overridden.
        """
        derived = DatasetSample(
            id=f"{sample.id}_{tag}_{self._augmentation_count}",
            text=text,
            metadata={
                **sample.metadata,
                **extra_metadata,
                "original_id": sample.id,
            },
            labels=sample.labels,
            difficulty=sample.difficulty if difficulty is None else difficulty,
            domain=sample.domain,
            reasoning_steps=sample.reasoning_steps,
        )
        self._augmentation_count += 1
        return derived

    def _apply_technique(self, text: str, domain: str | None, technique: str) -> str:
        """Apply the named augmentation technique; unknown names return ``text`` unchanged."""
        if technique == "urgency_variation":
            return self._augment_urgency(text)
        if technique == "parameter_substitution":
            return self._augment_parameters(text, domain)
        if technique == "constraint_addition":
            return self._augment_constraints(text, domain)
        if technique == "temporal_shift":
            return self._augment_temporal(text)
        if technique == "perspective_change":
            return self._augment_perspective(text, domain)
        return text

    def _augment_urgency(self, text: str, level: str | None = None) -> str:
        """Prefix the text with an urgency marker.

        Args:
            text: Scenario text.
            level: Key of ``URGENCY_MODIFIERS`` ("high", "medium" or "low").
                When None, a level is drawn at random.

        Returns:
            The text prefixed with ``[MODIFIER]`` (high), ``MODIFIER:``
            (medium) or ``(MODIFIER)`` (low).

        Raises:
            KeyError: If ``level`` is not a key of ``URGENCY_MODIFIERS``.
        """
        if level is None:
            level = self.rng.choice(list(self.URGENCY_MODIFIERS))
        modifier = self.rng.choice(self.URGENCY_MODIFIERS[level])

        if level == "high":
            return f"[{modifier}] {text}"
        if level == "medium":
            return f"{modifier}: {text}"
        return f"({modifier}) {text}"

    def _substitute_term(self, text: str, terms: list[str], ignore_case: bool) -> str:
        """Replace the first catalogue term found in ``text`` with a different one.

        Terms are tried in catalogue order; all occurrences of the first term
        that matches are replaced by one randomly chosen alternative. Matching
        and replacement use the same pattern, so anything detected is replaced.
        """
        flags = re.IGNORECASE if ignore_case else 0
        for term in terms:
            # Require a non-word, non-hyphen character before the term so that
            # "phishing" does not match inside "spear-phishing".
            pattern = re.compile(rf"(?<![\w-]){re.escape(term)}(?!\w)", flags)
            if not pattern.search(text):
                continue
            replacement = self.rng.choice([t for t in terms if t != term])

            def _swap(match: re.Match, replacement: str = replacement) -> str:
                # Keep sentence-initial capitalisation for lower-case catalogues.
                if ignore_case and match.group(0)[:1].isupper():
                    return replacement[:1].upper() + replacement[1:]
                return replacement

            return pattern.sub(_swap, text)
        return text

    def _augment_parameters(self, text: str, domain: str | None) -> str:
        """Substitute domain-specific parameters.

        For cybersecurity text (``domain == "cybersecurity"`` or the text
        mentions "cyber"), at most one threat actor and one attack vector are
        swapped. Otherwise, for military text (``domain == "military"`` or the
        text mentions "tactical", "military" or "reconnaissance"), at most one
        objective is swapped. Text that matches neither is returned unchanged.
        """
        lowered = text.lower()
        if domain == "cybersecurity" or "cyber" in lowered:
            # Actor names are proper nouns: match them exactly as catalogued.
            text = self._substitute_term(text, self.THREAT_ACTORS, ignore_case=False)
            text = self._substitute_term(text, self.ATTACK_VECTORS, ignore_case=True)
        elif domain == "military" or any(
            kw in lowered for kw in ("tactical", "military", "reconnaissance")
        ):
            text = self._substitute_term(text, self.MILITARY_OBJECTIVES, ignore_case=True)
        return text

    def _augment_constraints(self, text: str, domain: str | None) -> str:
        """Append one randomly chosen, domain-appropriate constraint in brackets."""
        if domain == "cybersecurity":
            constraints = [
                "with limited network visibility",
                "under active attack",
                "with compromised credentials",
                "during maintenance window",
                "with restricted access to logs",
            ]
        elif domain == "military":
            constraints = [
                "with limited ammunition",
                "under communication blackout",
                "with reduced personnel",
                "in contested environment",
                "with time constraint of 2 hours",
            ]
        else:
            constraints = [
                "with incomplete information",
                "under time pressure",
                "with resource constraints",
                "considering multiple stakeholders",
                "with conflicting objectives",
            ]
        constraint = self.rng.choice(constraints)
        return f"{text} [{constraint}]"

    @staticmethod
    def _lower_sentence_start(text: str) -> str:
        """Lower-case the first letter of ``text`` unless its first word is an acronym.

        A first word with any upper-case character after its first character
        (e.g. "APT28", "[CRITICAL]") is left untouched.
        """
        words = text.split(maxsplit=1)
        if not words:
            return text
        first_word = words[0]
        if first_word[1:] != first_word[1:].lower():
            return text
        start = len(text) - len(text.lstrip())
        return f"{text[:start]}{text[start].lower()}{text[start + 1:]}"

    def _augment_temporal(self, text: str) -> str:
        """Shift temporal context by prefixing a time phrase.

        Only the first letter of the original text is lower-cased so it reads
        as a continuation of the prefix; the rest of the text keeps its case
        so identifiers such as actor names are preserved. Empty text is
        returned unchanged.
        """
        temporal_contexts = [
            "In the past 24 hours, ",
            "Over the next week, ",
            "Immediately, ",
            "During the upcoming operation, ",
            "Following initial assessment, ",
        ]
        # Drawn before the emptiness check so the RNG stream does not depend
        # on whether the text is empty.
        context = self.rng.choice(temporal_contexts)
        if not text:
            return text
        return f"{context}{self._lower_sentence_start(text)}"

    def _augment_perspective(self, text: str, domain: str | None) -> str:
        """Change analytical perspective by prefixing a viewpoint phrase.

        Domains other than "cybersecurity" and "military" (including None)
        use the generic perspective list.
        """
        perspectives = {
            "cybersecurity": [
                "From a threat hunter's perspective: ",
                "Considering the attacker's viewpoint: ",
                "For incident response purposes: ",
                "From a risk management standpoint: ",
            ],
            "military": [
                "From the commander's perspective: ",
                "Considering enemy capabilities: ",
                "For tactical planning purposes: ",
                "From a logistics standpoint: ",
            ],
            "default": [
                "From an analytical perspective: ",
                "Considering all factors: ",
                "For decision-making purposes: ",
                "From a strategic viewpoint: ",
            ],
        }
        domain_perspectives = perspectives.get(domain or "default", perspectives["default"])
        perspective = self.rng.choice(domain_perspectives)
        return f"{perspective}{text}"

    def augment_batch(
        self,
        samples: list[DatasetSample],
        augmentations_per_sample: int = 2,
    ) -> list[DatasetSample]:
        """
        Augment a batch of samples.

        Args:
            samples: List of original samples
            augmentations_per_sample: Number of augmentations per sample

        Returns:
            List of all samples (original + augmented); originals come first,
            in their input order.
        """
        all_samples = list(samples)  # Keep originals
        for sample in samples:
            result = self.augment_sample(sample, num_augmentations=augmentations_per_sample)
            all_samples.extend(result.augmented)

        logger.info(
            "Augmented %d samples to %d (+%d augmented)",
            len(samples),
            len(all_samples),
            len(all_samples) - len(samples),
        )
        return all_samples

    def create_tactical_scenarios(self, base_samples: list[DatasetSample]) -> list[DatasetSample]:
        """
        Create tactical scenario variations from base samples.

        For every base sample a "high_stakes" variant is always produced
        (high-urgency prefix plus an added constraint, difficulty "hard"),
        and with probability one half a "multi_perspective" variant is
        produced as well.

        Args:
            base_samples: Base dataset samples

        Returns:
            Extended list with tactical scenario variations; the base samples
            come first.
        """
        scenarios = list(base_samples)
        for sample in base_samples:
            # High-stakes variant: pin the urgency level to "high" so the
            # prefix agrees with the scenario type and the "hard" difficulty.
            high_stakes = self._augment_urgency(sample.text, level="high")
            high_stakes = self._augment_constraints(high_stakes, sample.domain)
            scenarios.append(
                self._derive_sample(
                    sample,
                    "highstakes",
                    high_stakes,
                    {"scenario_type": "high_stakes"},
                    difficulty="hard",  # High stakes scenarios are harder
                )
            )

            # Multi-perspective variant for roughly half of the samples
            if self.rng.random() > 0.5:
                multi_perspective = self._augment_perspective(sample.text, sample.domain)
                scenarios.append(
                    self._derive_sample(
                        sample,
                        "multiperspective",
                        multi_perspective,
                        {"scenario_type": "multi_perspective"},
                    )
                )

        logger.info("Created %d tactical scenarios", len(scenarios) - len(base_samples))
        return scenarios
class CyberSecurityAugmenter(TacticalAugmenter):
    """
    Specialized augmenter for cybersecurity scenarios.

    Adds a randomly chosen MITRE ATT&CK tactic name and a severity level to a
    sample, on top of the techniques inherited from ``TacticalAugmenter``.
    """

    MITRE_TACTICS = [
        "Initial Access",
        "Execution",
        "Persistence",
        "Privilege Escalation",
        "Defense Evasion",
        "Credential Access",
        "Discovery",
        "Lateral Movement",
        "Collection",
        "Exfiltration",
        "Impact",
    ]
    SEVERITY_LEVELS = ["LOW", "MEDIUM", "HIGH", "CRITICAL"]

    def augment_with_mitre_context(self, sample: DatasetSample) -> DatasetSample:
        """
        Add MITRE ATT&CK context to sample.

        The tactic and severity are drawn at random, prefixed to the text as
        ``[MITRE ATT&CK: ...] [Severity: ...]`` and recorded in the metadata.
        The returned sample's domain is always "cybersecurity".

        Args:
            sample: Original sample

        Returns:
            Augmented sample with MITRE context
        """
        tactic = self.rng.choice(self.MITRE_TACTICS)
        severity = self.rng.choice(self.SEVERITY_LEVELS)
        augmented_text = f"[MITRE ATT&CK: {tactic}] [Severity: {severity}] {sample.text}"

        # Consume the counter value so repeated calls on the same sample do
        # not produce duplicate ids.
        augmented_id = f"{sample.id}_mitre_{self._augmentation_count}"
        self._augmentation_count += 1

        return DatasetSample(
            id=augmented_id,
            text=augmented_text,
            metadata={
                **sample.metadata,
                "mitre_tactic": tactic,
                "severity": severity,
                # Same provenance key the base-class augmentations record.
                "original_id": sample.id,
            },
            labels=sample.labels,
            difficulty=sample.difficulty,
            domain="cybersecurity",
            reasoning_steps=sample.reasoning_steps,
        )
class MilitaryTacticalAugmenter(TacticalAugmenter):
    """
    Specialized augmenter for military tactical scenarios.

    Adds a randomly chosen force composition and environmental condition to a
    sample, on top of the techniques inherited from ``TacticalAugmenter``.
    """

    FORCE_COMPOSITIONS = [
        "infantry platoon",
        "mechanized company",
        "special operations team",
        "combined arms battalion",
        "air assault element",
    ]

    def augment_with_force_composition(self, sample: DatasetSample) -> DatasetSample:
        """
        Add force composition context to sample.

        The force and environmental condition are drawn at random, prefixed
        to the text as ``[Force: ...] [Conditions: ...]`` and recorded in the
        metadata. The returned sample's domain is always "military".

        Args:
            sample: Original sample

        Returns:
            Augmented sample with force composition
        """
        force = self.rng.choice(self.FORCE_COMPOSITIONS)
        condition = self.rng.choice(self.ENVIRONMENTAL_CONDITIONS)
        augmented_text = f"[Force: {force}] [Conditions: {condition}] {sample.text}"

        # Consume the counter value so repeated calls on the same sample do
        # not produce duplicate ids.
        augmented_id = f"{sample.id}_tactical_{self._augmentation_count}"
        self._augmentation_count += 1

        return DatasetSample(
            id=augmented_id,
            text=augmented_text,
            metadata={
                **sample.metadata,
                "force_composition": force,
                "environmental_conditions": condition,
                # Same provenance key the base-class augmentations record.
                "original_id": sample.id,
            },
            labels=sample.labels,
            difficulty=sample.difficulty,
            domain="military",
            reasoning_steps=sample.reasoning_steps,
        )