|
|
|
|
|
""" |
|
|
PyArrow Dataset Generator for ML Inference Service |
|
|
|
|
|
Generates test datasets for academic challenges and model validation. |
|
|
Creates 100 PyArrow datasets with various image types and test scenarios. |
|
|
""" |
|
|
|
|
|
import base64 |
|
|
import json |
|
|
import random |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Any, Tuple |
|
|
import io |
|
|
|
|
|
import numpy as np |
|
|
import pyarrow as pa |
|
|
import pyarrow.parquet as pq |
|
|
from PIL import Image, ImageDraw, ImageFont |
|
|
|
|
|
|
|
|
class TestDatasetGenerator:
    """Build synthetic image-classification test datasets and store them as Parquet.

    Each dataset is a dict with ``name``, ``category``, ``description`` and a
    ``records`` list.  Every record carries a JSON-encoded API request (a
    base64 image plus its media type) and a JSON-encoded expected response.
    Expected predictions are fabricated with ``random``; they are placeholders,
    not real model output.
    """

    def __init__(self, output_dir: str = "test_datasets"):
        """Create the output directory and the label table.

        Args:
            output_dir: Directory that receives the ``.parquet`` files, the
                per-dataset ``*_metadata.json`` files and the summary.
        """
        self.output_dir = Path(output_dir)
        # parents=True so a nested path such as "out/run1/datasets" also works.
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Label names used to fabricate predictions.  The position of a name
        # in this list is used as its class index (``predicted_label``).
        self.imagenet_labels = [
            "tench", "goldfish", "great_white_shark", "tiger_shark", "hammerhead",
            "electric_ray", "stingray", "cock", "hen", "ostrich", "brambling",
            "goldfinch", "house_finch", "junco", "indigo_bunting", "robin",
            "bulbul", "jay", "magpie", "chickadee", "water_ouzel", "kite",
            "bald_eagle", "vulture", "great_grey_owl", "European_fire_salamander",
            "common_newt", "eft", "spotted_salamander", "axolotl", "bullfrog",
            "tree_frog", "tailed_frog", "loggerhead", "leatherback_turtle",
            "mud_turtle", "terrapin", "box_turtle", "banded_gecko", "common_iguana",
            "American_chameleon", "whiptail", "agama", "frilled_lizard", "alligator_lizard",
            "Gila_monster", "green_lizard", "African_chameleon", "Komodo_dragon",
            "African_crocodile", "American_alligator", "triceratops", "thunder_snake"
        ]

    @staticmethod
    def _gradient_array(width: int, height: int) -> np.ndarray:
        """Return a ``(height, width, 3)`` uint8 array holding an RGB gradient.

        Channel 0 grows with the row index, channel 1 with the column index
        and channel 2 with their sum; every value stays below 255.
        """
        rows = np.arange(height).reshape(-1, 1)
        cols = np.arange(width).reshape(1, -1)
        array = np.empty((height, width, 3), dtype=np.uint8)
        # Broadcasting fills each channel in one step instead of per pixel.
        array[..., 0] = (rows * 255 // height).astype(np.uint8)
        array[..., 1] = (cols * 255 // width).astype(np.uint8)
        array[..., 2] = ((rows + cols) * 255 // (height + width)).astype(np.uint8)
        return array

    def create_synthetic_image(self, width: int = 224, height: int = 224,
                               image_type: str = "random") -> Image.Image:
        """Create a synthetic RGB image for testing.

        Args:
            width: Image width in pixels.
            height: Image height in pixels.
            image_type: ``"random"`` (uniform noise), ``"geometric"`` (3-8
                random rectangles/ellipses on white), ``"gradient"``,
                ``"text"`` (a short caption on white); any other value yields
                a single random solid colour.

        Returns:
            A PIL image of size ``width`` x ``height``.
        """
        if image_type == "random":
            noise = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
            return Image.fromarray(noise)

        if image_type == "geometric":
            img = Image.new('RGB', (width, height), color='white')
            draw = ImageDraw.Draw(img)
            # Upper bounds are clamped to 20 so canvases narrower than 40 px
            # do not hand randint() an empty range.
            max_dx = max(20, width // 2)
            max_dy = max(20, height // 2)
            for _ in range(random.randint(3, 8)):
                color = tuple(random.randint(0, 255) for _ in range(3))
                shape_type = random.choice(['rectangle', 'ellipse'])
                x1, y1 = random.randint(0, width // 2), random.randint(0, height // 2)
                x2, y2 = x1 + random.randint(20, max_dx), y1 + random.randint(20, max_dy)
                if shape_type == 'rectangle':
                    draw.rectangle([x1, y1, x2, y2], fill=color)
                else:
                    draw.ellipse([x1, y1, x2, y2], fill=color)
            return img

        if image_type == "gradient":
            return Image.fromarray(self._gradient_array(width, height))

        if image_type == "text":
            img = Image.new('RGB', (width, height), color='white')
            draw = ImageDraw.Draw(img)
            try:
                font = ImageFont.load_default()
            except Exception:
                # Best effort: with font=None the draw call below is left to
                # pick whatever font Pillow uses by default.
                font = None
            text = f"Test Image {random.randint(1, 1000)}"
            draw.text((width // 4, height // 2), text, fill='black', font=font)
            return img

        # Unknown type (the generators use "solid"): one random flat colour.
        color = tuple(random.randint(0, 255) for _ in range(3))
        return Image.new('RGB', (width, height), color=color)

    def image_to_base64(self, image: Image.Image, format: str = "JPEG") -> str:
        """Encode a PIL image in ``format`` and return it as a base64 string."""
        buffer = io.BytesIO()
        image.save(buffer, format=format)
        return base64.b64encode(buffer.getvalue()).decode('utf-8')

    def create_api_request(self, image_b64: str, media_type: str = "image/jpeg") -> Dict[str, Any]:
        """Wrap a base64 image and its media type in the request payload shape."""
        return {
            "image": {
                "mediaType": media_type,
                "data": image_b64
            }
        }

    def create_expected_response(self, model_name: str = "microsoft/resnet-18",
                                 media_type: str = "image/jpeg") -> Dict[str, Any]:
        """Fabricate an expected-response payload with a random prediction.

        The class index is drawn once and the label name is looked up from it,
        so ``predicted_label`` always indexes ``prediction`` in
        ``self.imagenet_labels``.

        Args:
            model_name: Value stored under ``"model"``.
            media_type: Value stored under ``"mediaType"``.

        Returns:
            Dict with ``prediction``, ``confidence`` (0.3-0.99, 4 decimals),
            ``predicted_label``, ``model`` and ``mediaType``.
        """
        label_index = random.randrange(len(self.imagenet_labels))
        return {
            "prediction": self.imagenet_labels[label_index],
            "confidence": round(random.uniform(0.3, 0.99), 4),
            "predicted_label": label_index,
            "model": model_name,
            "mediaType": media_type
        }

    def generate_standard_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate ``count`` datasets of 5-20 ordinary images each.

        Image type, size and encoding (JPEG or PNG) are chosen at random per
        record; the expected response carries the same media type as the
        request.
        """
        image_types = ["random", "geometric", "gradient", "text", "solid"]
        sizes = [(224, 224), (256, 256), (299, 299), (384, 384)]
        formats = [("JPEG", "image/jpeg"), ("PNG", "image/png")]

        datasets = []
        for i in range(count):
            records = []
            for j in range(random.randint(5, 20)):
                img_type = random.choice(image_types)
                size = random.choice(sizes)
                image_format, media_type = random.choice(formats)

                image = self.create_synthetic_image(size[0], size[1], img_type)
                image_b64 = self.image_to_base64(image, image_format)

                api_request = self.create_api_request(image_b64, media_type)
                expected_response = self.create_expected_response(media_type=media_type)

                records.append({
                    "dataset_id": f"standard_{i:03d}",
                    "image_id": f"img_{j:03d}",
                    "image_type": img_type,
                    "image_size": f"{size[0]}x{size[1]}",
                    "format": image_format,
                    "media_type": media_type,
                    "api_request": json.dumps(api_request),
                    "expected_response": json.dumps(expected_response),
                    "test_category": "standard",
                    "difficulty": "normal"
                })

            datasets.append({
                "name": f"standard_test_{i:03d}",
                "category": "standard",
                "description": f"Standard test dataset {i+1} with {len(records)} images",
                "records": records
            })

        return datasets

    def generate_edge_case_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate ``count`` datasets, each holding one record per edge case.

        Cases: tiny, huge, extreme aspect ratio and single-pixel images (PNG),
        plus one JPEG whose base64 tail is overwritten so it no longer
        decodes; that record expects an error response.  Each record's
        ``format``/``media_type`` reflect the encoding actually used.
        """
        edge_cases = [
            {"type": "tiny", "size": (32, 32), "difficulty": "high"},
            {"type": "huge", "size": (2048, 2048), "difficulty": "high"},
            {"type": "extreme_aspect", "size": (1000, 50), "difficulty": "medium"},
            {"type": "single_pixel", "size": (1, 1), "difficulty": "extreme"},
            {"type": "corrupted_base64", "size": (224, 224), "difficulty": "extreme"}
        ]

        datasets = []
        for i in range(count):
            records = []
            for j, edge_case in enumerate(edge_cases):
                width, height = edge_case["size"]
                if edge_case["type"] == "corrupted_base64":
                    image_format, media_type = "JPEG", "image/jpeg"
                    image = self.create_synthetic_image(224, 224, "random")
                    image_b64 = self.image_to_base64(image, image_format)
                    # Replace the tail with text that is not valid base64.
                    corrupted_b64 = image_b64[:-20] + "CORRUPTED_DATA"
                    api_request = self.create_api_request(corrupted_b64, media_type)
                    expected_response = {
                        "error": "Invalid image data",
                        "status": "failed"
                    }
                else:
                    image_format, media_type = "PNG", "image/png"
                    image = self.create_synthetic_image(width, height, "random")
                    image_b64 = self.image_to_base64(image, image_format)
                    api_request = self.create_api_request(image_b64, media_type)
                    expected_response = self.create_expected_response(media_type=media_type)

                records.append({
                    "dataset_id": f"edge_{i:03d}",
                    "image_id": f"edge_{j:03d}",
                    "image_type": edge_case["type"],
                    "image_size": f"{width}x{height}",
                    "format": image_format,
                    "media_type": media_type,
                    "api_request": json.dumps(api_request),
                    "expected_response": json.dumps(expected_response),
                    "test_category": "edge_case",
                    "difficulty": edge_case["difficulty"]
                })

            datasets.append({
                "name": f"edge_case_{i:03d}",
                "category": "edge_case",
                "description": f"Edge case dataset {i+1} with challenging scenarios",
                "records": records
            })

        return datasets

    def generate_performance_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate ``count`` datasets of 224x224 JPEG noise images.

        Each dataset picks a random batch size from a fixed list; every record
        stores that batch size and a latency budget of 100 ms per image.
        """
        batch_sizes = [1, 5, 10, 25, 50, 100]

        datasets = []
        for i in range(count):
            batch_size = random.choice(batch_sizes)

            records = []
            for j in range(batch_size):
                image = self.create_synthetic_image(224, 224, "random")
                image_b64 = self.image_to_base64(image, "JPEG")
                api_request = self.create_api_request(image_b64)
                expected_response = self.create_expected_response()

                records.append({
                    "dataset_id": f"perf_{i:03d}",
                    "image_id": f"batch_{j:03d}",
                    "image_type": "performance_test",
                    "image_size": "224x224",
                    "format": "JPEG",
                    "media_type": "image/jpeg",
                    "api_request": json.dumps(api_request),
                    "expected_response": json.dumps(expected_response),
                    "test_category": "performance",
                    "difficulty": "normal",
                    "batch_size": batch_size,
                    "expected_max_latency_ms": batch_size * 100
                })

            datasets.append({
                "name": f"performance_test_{i:03d}",
                "category": "performance",
                "description": f"Performance dataset {i+1} with batch size {batch_size}",
                "records": records
            })

        return datasets

    def generate_model_comparison_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate ``count`` datasets pairing 10 shared images with each model.

        The same 10 geometric JPEG images are reused for every model name, so
        records sharing a ``comparison_group`` carry the identical image.
        """
        model_types = [
            "microsoft/resnet-18", "microsoft/resnet-50", "google/vit-base-patch16-224",
            "facebook/convnext-tiny-224", "microsoft/swin-tiny-patch4-window7-224"
        ]

        datasets = []
        for i in range(count):
            base_images = [
                self.image_to_base64(
                    self.create_synthetic_image(224, 224, "geometric"), "JPEG"
                )
                for _ in range(10)
            ]

            records = []
            for j, model in enumerate(model_types):
                for k, image_b64 in enumerate(base_images):
                    api_request = self.create_api_request(image_b64)
                    expected_response = self.create_expected_response(model)

                    records.append({
                        "dataset_id": f"comparison_{i:03d}",
                        "image_id": f"img_{k:03d}_model_{j}",
                        "image_type": "comparison_base",
                        "image_size": "224x224",
                        "format": "JPEG",
                        "media_type": "image/jpeg",
                        "api_request": json.dumps(api_request),
                        "expected_response": json.dumps(expected_response),
                        "test_category": "model_comparison",
                        "difficulty": "normal",
                        "model_type": model,
                        "comparison_group": k
                    })

            datasets.append({
                "name": f"model_comparison_{i:03d}",
                "category": "model_comparison",
                "description": f"Model comparison dataset {i+1} testing {len(model_types)} models",
                "records": records
            })

        return datasets

    def save_dataset_to_parquet(self, dataset: Dict[str, Any]):
        """Write one dataset to ``<name>.parquet`` plus ``<name>_metadata.json``.

        Columns that only some categories provide are filled with defaults so
        every file has the same set of columns.

        Args:
            dataset: Dict with ``name``, ``category``, ``description`` and
                ``records`` as produced by the ``generate_*`` methods.
        """
        records = dataset["records"]

        required_columns = (
            "dataset_id", "image_id", "image_type", "image_size", "format",
            "media_type", "api_request", "expected_response", "test_category",
            "difficulty",
        )
        # Optional columns and the value used when a record lacks them.
        optional_columns = {
            "batch_size": 1,
            "expected_max_latency_ms": 1000,
            "model_type": "microsoft/resnet-18",
            "comparison_group": 0,
        }

        columns = {name: [r[name] for r in records] for name in required_columns}
        for name, default in optional_columns.items():
            columns[name] = [r.get(name, default) for r in records]
        table = pa.table(columns)

        output_path = self.output_dir / f"{dataset['name']}.parquet"
        pq.write_table(table, output_path)

        metadata = {
            "name": dataset["name"],
            "category": dataset["category"],
            "description": dataset["description"],
            "record_count": len(records),
            "file_size_mb": round(output_path.stat().st_size / (1024 * 1024), 2),
            "schema": [field.name for field in table.schema]
        }

        metadata_path = self.output_dir / f"{dataset['name']}_metadata.json"
        with open(metadata_path, 'w', encoding="utf-8") as f:
            json.dump(metadata, f, indent=2)

    def generate_all_datasets(self, per_category: int = 25):
        """Generate and save every category of dataset, then write the summary.

        Args:
            per_category: Number of datasets per category; the default of 25
                yields 100 datasets across the four categories.
        """
        print("Starting dataset generation...")

        stages = [
            ("standard test", self.generate_standard_datasets),
            ("edge case", self.generate_edge_case_datasets),
            ("performance", self.generate_performance_datasets),
            ("model comparison", self.generate_model_comparison_datasets),
        ]

        total = 0
        for label, generate in stages:
            print(f"Generating {label} datasets ({per_category})...")
            for dataset in generate(per_category):
                self.save_dataset_to_parquet(dataset)
                total += 1

        print(f"Generated {total} datasets in {self.output_dir}/")

        self.generate_summary()

    def generate_summary(self):
        """Write ``datasets_summary.json`` describing the datasets on disk.

        Totals and per-category counts are taken from the ``*_metadata.json``
        files that sit next to the ``.parquet`` files in the output directory.
        """
        dataset_info = []
        # Sorted so the summary order does not depend on directory listing order.
        for parquet_file in sorted(self.output_dir.glob("*.parquet")):
            metadata_file = self.output_dir / f"{parquet_file.stem}_metadata.json"
            if not metadata_file.exists():
                continue
            with open(metadata_file, 'r', encoding="utf-8") as f:
                dataset_info.append(json.load(f))

        # Seed the known categories so they always appear, even with count 0.
        categories = {
            "standard": 0,
            "edge_case": 0,
            "performance": 0,
            "model_comparison": 0
        }
        for info in dataset_info:
            category = info.get("category", "unknown")
            categories[category] = categories.get(category, 0) + 1

        summary = {
            "total_datasets": len(dataset_info),
            "categories": categories,
            "dataset_info": dataset_info,
            "usage_instructions": {
                "loading": "Use pyarrow.parquet.read_table('dataset.parquet')",
                "testing": "Run python scripts/test_datasets.py",
                "api_endpoint": "POST /predict/resnet",
                "request_format": "See api_request column in datasets"
            }
        }

        summary_path = self.output_dir / "datasets_summary.json"
        with open(summary_path, 'w', encoding="utf-8") as f:
            json.dump(summary, f, indent=2)

        print(f"Summary saved to {summary_path}")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build every dataset into the default output directory.
    TestDatasetGenerator().generate_all_datasets()