"""
Dataset Tester for ML Inference Service

Tests the generated PyArrow datasets against the running ML inference service.
Validates API requests/responses and measures performance metrics.
"""
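
# Example invocations (a sketch; the filename "dataset_tester.py" is an assumption,
# adjust it to however this script is saved; the flags are defined in main() below):
#   python dataset_tester.py --quick
#   python dataset_tester.py --category edge_case --max-samples 10
#   python dataset_tester.py --base-url http://127.0.0.1:8000 --datasets-dir test_datasets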

import json
import time
import statistics
from pathlib import Path
from typing import Dict, List, Any, Optional
import argparse

import pyarrow.parquet as pq
import requests
import pandas as pd


class DatasetTester:
    def __init__(self, base_url: str = "http://127.0.0.1:8000", datasets_dir: str = "test_datasets"):
        self.base_url = base_url.rstrip('/')
        self.datasets_dir = Path(datasets_dir)
        self.endpoint = f"{self.base_url}/predict"
        self.results = []

    def load_dataset(self, dataset_path: Path) -> pd.DataFrame:
        """Load a PyArrow dataset."""
        table = pq.read_table(dataset_path)
        return table.to_pandas()

    def test_api_connection(self) -> bool:
        """Test if the API is running and accessible."""
        try:
            response = requests.get(f"{self.base_url}/docs", timeout=5)
            return response.status_code == 200
        except requests.RequestException:
            return False

    def send_prediction_request(self, api_request_json: str) -> Dict[str, Any]:
        """Send a single prediction request to the API."""
        try:
            request_data = json.loads(api_request_json)
            start_time = time.time()

            response = requests.post(
                self.endpoint,
                json=request_data,
                headers={"Content-Type": "application/json"},
                timeout=30
            )

            end_time = time.time()
            latency_ms = (end_time - start_time) * 1000

            return {
                "success": response.status_code == 200,
                "status_code": response.status_code,
                "response": response.json() if response.status_code == 200 else response.text,
                "latency_ms": round(latency_ms, 2),
                "error": None
            }

        except requests.RequestException as e:
            return {
                "success": False,
                "status_code": None,
                "response": None,
                "latency_ms": None,
                "error": str(e)
            }
        except json.JSONDecodeError as e:
            return {
                "success": False,
                "status_code": None,
                "response": None,
                "latency_ms": None,
                "error": f"JSON decode error: {str(e)}"
            }

    def validate_response(self, actual_response: Dict[str, Any],
                          expected_response_json: str) -> Dict[str, Any]:
        """Validate API response against expected response."""
        try:
            # Parsing the expected response only confirms it is valid JSON;
            # the checks below are structural, against the actual response.
            expected = json.loads(expected_response_json)

            validation = {
                "structure_valid": True,
                "field_errors": []
            }

            # Check that all required top-level fields are present
            required_fields = ["prediction", "confidence", "predicted_label", "model", "mediaType"]
            for field in required_fields:
                if field not in actual_response:
                    validation["structure_valid"] = False
                    validation["field_errors"].append(f"Missing field: {field}")

            # Type and range checks on individual fields
            if "confidence" in actual_response:
                if not isinstance(actual_response["confidence"], (int, float)):
                    validation["field_errors"].append("confidence must be numeric")
                elif not (0 <= actual_response["confidence"] <= 1):
                    validation["field_errors"].append("confidence must be between 0 and 1")

            if "predicted_label" in actual_response:
                if not isinstance(actual_response["predicted_label"], int):
                    validation["field_errors"].append("predicted_label must be integer")

            return validation

        except json.JSONDecodeError:
            return {
                "structure_valid": False,
                "field_errors": ["Invalid expected response JSON"]
            }

    def test_dataset(self, dataset_path: Path, max_samples: Optional[int] = None) -> Dict[str, Any]:
        """Test a single dataset."""
        print(f"Testing dataset: {dataset_path.name}")

        try:
            df = self.load_dataset(dataset_path)
            if max_samples:
                df = df.head(max_samples)

            results = {
                "dataset_name": dataset_path.stem,
                "total_samples": len(df),
                "tested_samples": 0,
                "successful_requests": 0,
                "failed_requests": 0,
                "validation_errors": 0,
                "latencies_ms": [],
                "errors": [],
                "category": df['test_category'].iloc[0] if not df.empty else "unknown"
            }

            for idx, row in df.iterrows():
                print(f"  Testing sample {idx + 1}/{len(df)}", end="\r")

                # Send the stored API request and record the outcome
                api_result = self.send_prediction_request(row['api_request'])
                results["tested_samples"] += 1

                if api_result["success"]:
                    results["successful_requests"] += 1
                    results["latencies_ms"].append(api_result["latency_ms"])

                    # Check the response structure against the expected response
                    validation = self.validate_response(
                        api_result["response"],
                        row['expected_response']
                    )

                    if not validation["structure_valid"]:
                        results["validation_errors"] += 1
                        results["errors"].append({
                            "sample_id": row['image_id'],
                            "type": "validation_error",
                            "details": validation["field_errors"]
                        })

                else:
                    results["failed_requests"] += 1
                    results["errors"].append({
                        "sample_id": row['image_id'],
                        "type": "request_failed",
                        "status_code": api_result["status_code"],
                        "error": api_result["error"]
                    })

            # Aggregate latency statistics over successful requests
            if results["latencies_ms"]:
                results["avg_latency_ms"] = round(statistics.mean(results["latencies_ms"]), 2)
                results["min_latency_ms"] = round(min(results["latencies_ms"]), 2)
                results["max_latency_ms"] = round(max(results["latencies_ms"]), 2)
                results["median_latency_ms"] = round(statistics.median(results["latencies_ms"]), 2)
            else:
                results.update({
                    "avg_latency_ms": None,
                    "min_latency_ms": None,
                    "max_latency_ms": None,
                    "median_latency_ms": None
                })

            results["success_rate"] = round(
                results["successful_requests"] / results["tested_samples"] * 100, 2
            ) if results["tested_samples"] > 0 else 0

            print(f"\n  Completed: {results['success_rate']}% success rate")
            return results

        except Exception as e:
            print(f"\n  Failed to test dataset: {str(e)}")
            return {
                "dataset_name": dataset_path.stem,
                "error": str(e),
                "success_rate": 0
            }

    def test_all_datasets(self, max_samples_per_dataset: Optional[int] = None,
                          category_filter: Optional[str] = None) -> Dict[str, Any]:
        """Test all datasets or filtered by category."""
        if not self.test_api_connection():
            print("API is not accessible. Please start the service first:")
            print("  uvicorn main:app --reload")
            return {"error": "API not accessible"}

        print(f"Starting dataset testing against {self.endpoint}")

        parquet_files = list(self.datasets_dir.glob("*.parquet"))
        if not parquet_files:
            print(f"No datasets found in {self.datasets_dir}")
            return {"error": "No datasets found"}

        if category_filter:
            parquet_files = [f for f in parquet_files if category_filter in f.name]
            if not parquet_files:
                print(f"No datasets match category filter '{category_filter}'")
                return {"error": "No datasets match category filter"}

        print(f"Found {len(parquet_files)} datasets to test")

        all_results = []
        start_time = time.time()

        for dataset_file in parquet_files:
            result = self.test_dataset(dataset_file, max_samples_per_dataset)
            all_results.append(result)

        end_time = time.time()
        total_time = end_time - start_time

        summary = self.generate_summary(all_results, total_time)

        self.save_results(summary, all_results)

        return summary

    def generate_summary(self, results: List[Dict[str, Any]], total_time: float) -> Dict[str, Any]:
        """Generate summary of all test results."""
        successful_datasets = [r for r in results if r.get("success_rate", 0) > 0]
        failed_datasets = [r for r in results if r.get("error") or r.get("success_rate", 0) == 0]

        total_samples = sum(r.get("tested_samples", 0) for r in results)
        total_successful = sum(r.get("successful_requests", 0) for r in results)
        total_failed = sum(r.get("failed_requests", 0) for r in results)

        all_latencies = []
        for r in results:
            all_latencies.extend(r.get("latencies_ms", []))

        summary = {
            "test_summary": {
                "total_datasets": len(results),
                "successful_datasets": len(successful_datasets),
                "failed_datasets": len(failed_datasets),
                "total_samples_tested": total_samples,
                "total_successful_requests": total_successful,
                "total_failed_requests": total_failed,
                "overall_success_rate": round(
                    total_successful / total_samples * 100, 2
                ) if total_samples > 0 else 0,
                "total_test_time_seconds": round(total_time, 2)
            },
            "performance_metrics": {
                "avg_latency_ms": round(statistics.mean(all_latencies), 2) if all_latencies else None,
                "median_latency_ms": round(statistics.median(all_latencies), 2) if all_latencies else None,
                "min_latency_ms": round(min(all_latencies), 2) if all_latencies else None,
                "max_latency_ms": round(max(all_latencies), 2) if all_latencies else None,
                "requests_per_second": round(
                    total_successful / total_time, 2
                ) if total_time > 0 else 0
            },
            "category_breakdown": {},
            "failed_datasets": [r["dataset_name"] for r in failed_datasets]
        }

        categories = {}
        for result in results:
            category = result.get("category", "unknown")
            if category not in categories:
                categories[category] = {
                    "count": 0,
                    "success_rates": [],
                    "avg_success_rate": 0
                }
            categories[category]["count"] += 1
            categories[category]["success_rates"].append(result.get("success_rate", 0))

        for category, data in categories.items():
            data["avg_success_rate"] = round(
                statistics.mean(data["success_rates"]), 2
            ) if data["success_rates"] else 0

        summary["category_breakdown"] = categories

        return summary

    def save_results(self, summary: Dict[str, Any], detailed_results: List[Dict[str, Any]]):
        """Save test results to files."""
        results_dir = Path("test_results")
        results_dir.mkdir(exist_ok=True)

        timestamp = int(time.time())

        # Write the high-level summary
        summary_path = results_dir / f"test_summary_{timestamp}.json"
        with open(summary_path, 'w') as f:
            json.dump(summary, f, indent=2)

        # Write the per-dataset detailed results
        detailed_path = results_dir / f"test_detailed_{timestamp}.json"
        with open(detailed_path, 'w') as f:
            json.dump(detailed_results, f, indent=2)

        print("Results saved:")
        print(f"  Summary: {summary_path}")
        print(f"  Details: {detailed_path}")

    def print_summary(self, summary: Dict[str, Any]):
        """Print test summary to console."""
        print("\n" + "="*60)
        print("DATASET TESTING SUMMARY")
        print("="*60)

        ts = summary["test_summary"]
        print(f"Datasets tested: {ts['total_datasets']}")
        print(f"Successful datasets: {ts['successful_datasets']}")
        print(f"Failed datasets: {ts['failed_datasets']}")
        print(f"Total samples: {ts['total_samples_tested']}")
        print(f"Overall success rate: {ts['overall_success_rate']}%")
        print(f"Test duration: {ts['total_test_time_seconds']}s")

        pm = summary["performance_metrics"]
        if pm["avg_latency_ms"] is not None:
            print("\nPerformance:")
            print(f"  Avg latency: {pm['avg_latency_ms']}ms")
            print(f"  Median latency: {pm['median_latency_ms']}ms")
            print(f"  Min latency: {pm['min_latency_ms']}ms")
            print(f"  Max latency: {pm['max_latency_ms']}ms")
            print(f"  Requests/sec: {pm['requests_per_second']}")

        print("\nCategory breakdown:")
        for category, data in summary["category_breakdown"].items():
            print(f"  {category}: {data['count']} datasets, {data['avg_success_rate']}% avg success")

        if summary["failed_datasets"]:
            print(f"\nFailed datasets: {', '.join(summary['failed_datasets'])}")


def main():
    parser = argparse.ArgumentParser(description="Test PyArrow datasets against ML inference service")
    parser.add_argument("--base-url", default="http://127.0.0.1:8000", help="Base URL of the API")
    parser.add_argument("--datasets-dir", default="test_datasets", help="Directory containing datasets")
    parser.add_argument("--max-samples", type=int, help="Max samples per dataset to test")
    parser.add_argument("--category", help="Filter datasets by category (standard, edge_case, performance, model_comparison)")
    parser.add_argument("--quick", action="store_true", help="Quick test with max 5 samples per dataset")

    args = parser.parse_args()

    tester = DatasetTester(args.base_url, args.datasets_dir)

    max_samples = args.max_samples
    if args.quick:
        max_samples = 5

    results = tester.test_all_datasets(max_samples, args.category)

    if "error" not in results:
        tester.print_summary(results)

        if results["test_summary"]["overall_success_rate"] > 90:
            print("\nExcellent! API is working great with the datasets!")
        elif results["test_summary"]["overall_success_rate"] > 70:
            print("\nGood! API works well, minor issues detected.")
        else:
            print("\nWarning: Several issues detected. Check the detailed results.")


if __name__ == "__main__":
    main()