File size: 15,749 Bytes
40ee6b4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
"""
Tactical Data Augmentation Module.

Provides domain-specific data augmentation techniques for:
- Cybersecurity threat scenarios
- Military tactical situations
- Multi-step reasoning problems

These augmentations help increase training data diversity and improve
model robustness for tactical analysis tasks.
"""

import logging
import random
import re
from dataclasses import dataclass

from .dataset_loader import DatasetSample

logger = logging.getLogger(__name__)


@dataclass
class AugmentationResult:
    """
    Result of augmenting a single dataset sample.

    Returned by ``TacticalAugmenter.augment_sample``; groups the source
    sample with the augmented copies derived from it.
    """

    # The sample the augmentations were derived from.
    original: DatasetSample
    # Augmented copies, in the order they were generated.
    augmented: list[DatasetSample]
    # Name of the technique applied to each entry of ``augmented`` (same order).
    augmentation_types: list[str]


class TacticalAugmenter:
    """
    Domain-specific data augmentation for tactical analysis.

    Augmentation techniques (the names accepted by ``augment_sample``):
    - ``urgency_variation``: prefix the text with an urgency marker
    - ``parameter_substitution``: swap a known threat actor, attack vector
      or military objective found in the text for another one
    - ``constraint_addition``: append a bracketed operating constraint
    - ``temporal_shift``: prepend a temporal context phrase
    - ``perspective_change``: prepend an analytical-perspective phrase

    All random choices are drawn from the instance's own ``rng``.
    """

    # Urgency markers grouped by level; ``_augment_urgency`` picks a level and
    # then one marker from that level.
    URGENCY_MODIFIERS = {
        "high": ["IMMEDIATE", "CRITICAL", "URGENT", "TIME-SENSITIVE"],
        "medium": ["PRIORITY", "IMPORTANT", "ATTENTION REQUIRED"],
        "low": ["ROUTINE", "STANDARD", "WHEN POSSIBLE"],
    }

    # Threat-actor names that ``_augment_parameters`` looks for and swaps.
    THREAT_ACTORS = [
        "APT28",
        "APT29",
        "Lazarus Group",
        "Cozy Bear",
        "Fancy Bear",
        "Unknown Actor",
        "Nation-State Actor",
        "Criminal Organization",
    ]

    # Attack-vector phrases that ``_augment_parameters`` looks for and swaps.
    ATTACK_VECTORS = [
        "phishing",
        "spear-phishing",
        "watering hole",
        "supply chain compromise",
        "zero-day exploit",
        "credential stuffing",
        "brute force",
        "social engineering",
    ]

    # Objective phrases swapped by ``_augment_parameters`` for military text.
    MILITARY_OBJECTIVES = [
        "secure perimeter",
        "establish forward position",
        "conduct reconnaissance",
        "neutralize threat",
        "protect assets",
        "maintain operational security",
        "coordinate with allied forces",
        "execute tactical withdrawal",
    ]

    # Not referenced by this class's own methods; read by
    # ``MilitaryTacticalAugmenter.augment_with_force_composition``.
    ENVIRONMENTAL_CONDITIONS = [
        "night operations",
        "adverse weather",
        "limited visibility",
        "urban terrain",
        "mountainous region",
        "coastal area",
        "contested airspace",
        "electronic warfare environment",
    ]

    def __init__(self, seed: int = 42):
        """
        Initialize augmenter.

        Args:
            seed: Random seed for reproducibility
        """
        self.rng = random.Random(seed)
        self._augmentation_count = 0

    def augment_sample(
        self,
        sample: DatasetSample,
        num_augmentations: int = 3,
        techniques: list[str] | None = None,
    ) -> AugmentationResult:
        """
        Augment a single sample.

        Args:
            sample: Original dataset sample
            num_augmentations: Number of augmented versions to create
            techniques: Specific techniques to use (None for random selection)

        Returns:
            AugmentationResult with augmented samples
        """
        available_techniques = [
            "urgency_variation",
            "parameter_substitution",
            "constraint_addition",
            "temporal_shift",
            "perspective_change",
        ]

        if techniques:
            available_techniques = [t for t in techniques if t in available_techniques]

        augmented_samples = []
        used_techniques = []

        for _i in range(num_augmentations):
            technique = self.rng.choice(available_techniques)
            used_techniques.append(technique)

            augmented_text = self._apply_technique(sample.text, sample.domain, technique)

            aug_sample = DatasetSample(
                id=f"{sample.id}_aug_{self._augmentation_count}",
                text=augmented_text,
                metadata={
                    **sample.metadata,
                    "augmentation": technique,
                    "original_id": sample.id,
                },
                labels=sample.labels,
                difficulty=sample.difficulty,
                domain=sample.domain,
                reasoning_steps=sample.reasoning_steps,
            )

            augmented_samples.append(aug_sample)
            self._augmentation_count += 1

        return AugmentationResult(
            original=sample,
            augmented=augmented_samples,
            augmentation_types=used_techniques,
        )

    def _apply_technique(self, text: str, domain: str | None, technique: str) -> str:
        """Apply specific augmentation technique."""
        if technique == "urgency_variation":
            return self._augment_urgency(text)
        elif technique == "parameter_substitution":
            return self._augment_parameters(text, domain)
        elif technique == "constraint_addition":
            return self._augment_constraints(text, domain)
        elif technique == "temporal_shift":
            return self._augment_temporal(text)
        elif technique == "perspective_change":
            return self._augment_perspective(text, domain)
        else:
            return text

    def _augment_urgency(self, text: str) -> str:
        """Vary urgency level in the text."""
        urgency_level = self.rng.choice(list(self.URGENCY_MODIFIERS.keys()))
        modifier = self.rng.choice(self.URGENCY_MODIFIERS[urgency_level])

        # Add urgency prefix
        if urgency_level == "high":
            return f"[{modifier}] {text}"
        elif urgency_level == "medium":
            return f"{modifier}: {text}"
        else:
            return f"({modifier}) {text}"

    def _augment_parameters(self, text: str, domain: str | None) -> str:
        """Substitute domain-specific parameters."""
        if domain == "cybersecurity" or "cyber" in text.lower():
            # Substitute threat actors
            for actor in self.THREAT_ACTORS:
                if actor in text:
                    new_actor = self.rng.choice([a for a in self.THREAT_ACTORS if a != actor])
                    text = text.replace(actor, new_actor)
                    break

            # Substitute attack vectors
            for vector in self.ATTACK_VECTORS:
                if vector in text.lower():
                    new_vector = self.rng.choice([v for v in self.ATTACK_VECTORS if v != vector])
                    text = text.replace(vector, new_vector)
                    break

        elif domain == "military" or any(kw in text.lower() for kw in ["tactical", "military", "reconnaissance"]):
            # Substitute objectives
            for obj in self.MILITARY_OBJECTIVES:
                if obj in text.lower():
                    new_obj = self.rng.choice([o for o in self.MILITARY_OBJECTIVES if o != obj])
                    text = text.replace(obj, new_obj)
                    break

        return text

    def _augment_constraints(self, text: str, domain: str | None) -> str:
        """Add additional constraints to the scenario."""
        constraints = []

        if domain == "cybersecurity":
            constraints = [
                "with limited network visibility",
                "under active attack",
                "with compromised credentials",
                "during maintenance window",
                "with restricted access to logs",
            ]
        elif domain == "military":
            constraints = [
                "with limited ammunition",
                "under communication blackout",
                "with reduced personnel",
                "in contested environment",
                "with time constraint of 2 hours",
            ]
        else:
            constraints = [
                "with incomplete information",
                "under time pressure",
                "with resource constraints",
                "considering multiple stakeholders",
                "with conflicting objectives",
            ]

        if constraints:
            constraint = self.rng.choice(constraints)
            return f"{text} [{constraint}]"

        return text

    def _augment_temporal(self, text: str) -> str:
        """Shift temporal context."""
        temporal_contexts = [
            "In the past 24 hours, ",
            "Over the next week, ",
            "Immediately, ",
            "During the upcoming operation, ",
            "Following initial assessment, ",
        ]

        context = self.rng.choice(temporal_contexts)
        return f"{context}{text.lower()}" if text else text

    def _augment_perspective(self, text: str, domain: str | None) -> str:
        """Change analytical perspective."""
        perspectives = {
            "cybersecurity": [
                "From a threat hunter's perspective: ",
                "Considering the attacker's viewpoint: ",
                "For incident response purposes: ",
                "From a risk management standpoint: ",
            ],
            "military": [
                "From the commander's perspective: ",
                "Considering enemy capabilities: ",
                "For tactical planning purposes: ",
                "From a logistics standpoint: ",
            ],
            "default": [
                "From an analytical perspective: ",
                "Considering all factors: ",
                "For decision-making purposes: ",
                "From a strategic viewpoint: ",
            ],
        }

        domain_perspectives = perspectives.get(domain or "default", perspectives["default"])
        perspective = self.rng.choice(domain_perspectives)

        return f"{perspective}{text}"

    def augment_batch(
        self,
        samples: list[DatasetSample],
        augmentations_per_sample: int = 2,
    ) -> list[DatasetSample]:
        """
        Expand a batch with augmented copies of every sample.

        Args:
            samples: Original samples to augment
            augmentations_per_sample: How many augmented copies to derive
                from each original sample

        Returns:
            New list holding the originals first, followed by the augmented
            copies grouped per original in input order
        """
        # Start from a copy so the caller's list is never mutated.
        combined = [*samples]

        for source in samples:
            combined += self.augment_sample(source, num_augmentations=augmentations_per_sample).augmented

        total = len(combined)
        added = total - len(samples)
        logger.info(
            f"Augmented {len(samples)} samples to {total} (+{added} augmented)"
        )

        return combined

    def create_tactical_scenarios(self, base_samples: list[DatasetSample]) -> list[DatasetSample]:
        """
        Create tactical scenario variations from base samples.

        For every base sample this adds:
        - a "high_stakes" variant: the text tagged with a high-urgency marker
          and an extra constraint, with difficulty forced to "hard"
        - with probability of about one half, a "multi_perspective" variant:
          the text prefixed with an analytical perspective

        Args:
            base_samples: Base dataset samples

        Returns:
            New list with the base samples first, followed by the generated
            scenario variations
        """
        scenarios = list(base_samples)

        for sample in base_samples:
            # High-stakes variant: draw the marker from the "high" level only,
            # so the variant is never labelled e.g. "(ROUTINE)".
            modifier = self.rng.choice(self.URGENCY_MODIFIERS["high"])
            high_stakes = self._augment_constraints(f"[{modifier}] {sample.text}", sample.domain)
            scenarios.append(
                # High stakes scenarios are harder.
                self._build_scenario_variant(sample, high_stakes, "high_stakes", difficulty="hard")
            )

            # Multi-perspective variant: created for a random half of the samples.
            if self.rng.random() > 0.5:
                multi_perspective = self._augment_perspective(sample.text, sample.domain)
                scenarios.append(
                    self._build_scenario_variant(
                        sample, multi_perspective, "multi_perspective", difficulty=sample.difficulty
                    )
                )

        logger.info(f"Created {len(scenarios) - len(base_samples)} tactical scenarios")
        return scenarios

    def _build_scenario_variant(
        self,
        sample: DatasetSample,
        text: str,
        scenario_type: str,
        difficulty: str | None,
    ) -> DatasetSample:
        """Clone ``sample`` with new text, a fresh ID and scenario metadata, then advance the ID counter."""
        # ID tags drop the underscore: "high_stakes" -> "highstakes".
        id_tag = scenario_type.replace("_", "")
        variant = DatasetSample(
            id=f"{sample.id}_{id_tag}_{self._augmentation_count}",
            text=text,
            metadata={
                **sample.metadata,
                "scenario_type": scenario_type,
                "original_id": sample.id,
            },
            labels=sample.labels,
            difficulty=difficulty,
            domain=sample.domain,
            reasoning_steps=sample.reasoning_steps,
        )
        self._augmentation_count += 1
        return variant


class CyberSecurityAugmenter(TacticalAugmenter):
    """
    Specialized augmenter for cybersecurity scenarios.

    Adds to the inherited techniques a tagging step that labels a sample
    with a MITRE ATT&CK tactic and a severity level.
    """

    # Subset of the ATT&CK Enterprise tactics (Reconnaissance, Resource
    # Development and Command and Control are not included).
    MITRE_TACTICS = [
        "Initial Access",
        "Execution",
        "Persistence",
        "Privilege Escalation",
        "Defense Evasion",
        "Credential Access",
        "Discovery",
        "Lateral Movement",
        "Collection",
        "Exfiltration",
        "Impact",
    ]

    SEVERITY_LEVELS = ["LOW", "MEDIUM", "HIGH", "CRITICAL"]

    def augment_with_mitre_context(self, sample: DatasetSample) -> DatasetSample:
        """
        Add MITRE ATT&CK context to sample.

        The tactic and severity are drawn at random; they are not inferred
        from the sample text. The returned sample's domain is always set to
        "cybersecurity".

        Args:
            sample: Original sample

        Returns:
            New sample whose text is prefixed with the tactic and severity
            tags and whose metadata records both, plus the original ID
        """
        tactic = self.rng.choice(self.MITRE_TACTICS)
        severity = self.rng.choice(self.SEVERITY_LEVELS)

        augmented_text = f"[MITRE ATT&CK: {tactic}] [Severity: {severity}] {sample.text}"

        augmented = DatasetSample(
            id=f"{sample.id}_mitre_{self._augmentation_count}",
            text=augmented_text,
            metadata={
                **sample.metadata,
                "mitre_tactic": tactic,
                "severity": severity,
                # Traceability back to the source, as in the other generators.
                "original_id": sample.id,
            },
            labels=sample.labels,
            difficulty=sample.difficulty,
            domain="cybersecurity",
            reasoning_steps=sample.reasoning_steps,
        )
        # Advance the counter so repeated calls never reuse an ID.
        self._augmentation_count += 1
        return augmented


class MilitaryTacticalAugmenter(TacticalAugmenter):
    """
    Specialized augmenter for military tactical scenarios.

    Adds to the inherited techniques a tagging step that labels a sample
    with a force composition and an environmental condition.
    """

    FORCE_COMPOSITIONS = [
        "infantry platoon",
        "mechanized company",
        "special operations team",
        "combined arms battalion",
        "air assault element",
    ]

    def augment_with_force_composition(self, sample: DatasetSample) -> DatasetSample:
        """
        Add force composition context to sample.

        The force and the environmental condition are drawn at random; they
        are not inferred from the sample text. The returned sample's domain
        is always set to "military".

        Args:
            sample: Original sample

        Returns:
            New sample whose text is prefixed with the force and condition
            tags and whose metadata records both, plus the original ID
        """
        force = self.rng.choice(self.FORCE_COMPOSITIONS)
        condition = self.rng.choice(self.ENVIRONMENTAL_CONDITIONS)

        augmented_text = f"[Force: {force}] [Conditions: {condition}] {sample.text}"

        augmented = DatasetSample(
            id=f"{sample.id}_tactical_{self._augmentation_count}",
            text=augmented_text,
            metadata={
                **sample.metadata,
                "force_composition": force,
                "environmental_conditions": condition,
                # Traceability back to the source, as in the other generators.
                "original_id": sample.id,
            },
            labels=sample.labels,
            difficulty=sample.difficulty,
            domain="military",
            reasoning_steps=sample.reasoning_steps,
        )
        # Advance the counter so repeated calls never reuse an ID.
        self._augmentation_count += 1
        return augmented