"""
Dataset Tester for ML Inference Service

Tests the generated PyArrow datasets against the running ML inference service.
Validates API requests/responses and measures performance metrics.
"""

import argparse
import json
import statistics
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd
import pyarrow.parquet as pq
import requests
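
# Example invocations (flag names match the argparse options defined in main();
# the script filename "dataset_tester.py" is assumed here for illustration):
#
#   python dataset_tester.py --quick
#   python dataset_tester.py --category edge_case --max-samples 10
#   python dataset_tester.py --base-url http://127.0.0.1:8000 --datasets-dir test_datasets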


class DatasetTester:
    """Run stored API requests from Parquet test datasets against the inference service."""

    def __init__(self, base_url: str = "http://127.0.0.1:8000", datasets_dir: str = "test_datasets"):
        self.base_url = base_url.rstrip('/')
        self.datasets_dir = Path(datasets_dir)
        self.endpoint = f"{self.base_url}/predict"
        self.results = []

    def load_dataset(self, dataset_path: Path) -> pd.DataFrame:
        """Load a PyArrow dataset."""
        table = pq.read_table(dataset_path)
        return table.to_pandas()

    def test_api_connection(self) -> bool:
        """Test if the API is running and accessible."""
        try:
            response = requests.get(f"{self.base_url}/docs", timeout=5)
            return response.status_code == 200
        except requests.RequestException:
            return False

    def send_prediction_request(self, api_request_json: str) -> Dict[str, Any]:
        """Send a single prediction request to the API."""
        try:
            request_data = json.loads(api_request_json)
            start_time = time.time()

            response = requests.post(
                self.endpoint,
                json=request_data,
                headers={"Content-Type": "application/json"},
                timeout=30
            )

            end_time = time.time()
            latency_ms = (end_time - start_time) * 1000

            return {
                "success": response.status_code == 200,
                "status_code": response.status_code,
                "response": response.json() if response.status_code == 200 else response.text,
                "latency_ms": round(latency_ms, 2),
                "error": None
            }

        except requests.RequestException as e:
            return {
                "success": False,
                "status_code": None,
                "response": None,
                "latency_ms": None,
                "error": str(e)
            }
        except json.JSONDecodeError as e:
            return {
                "success": False,
                "status_code": None,
                "response": None,
                "latency_ms": None,
                "error": f"JSON decode error: {str(e)}"
            }

    def validate_response(self, actual_response: Dict[str, Any],
                          expected_response_json: str) -> Dict[str, Any]:
        """Validate API response against expected response."""
        try:
            # Parse the expected response; currently used only to confirm it is valid JSON
            expected = json.loads(expected_response_json)

            validation = {
                "structure_valid": True,
                "field_errors": []
            }

            # Check that every required field is present in the response
            required_fields = ["prediction", "confidence", "predicted_label", "model", "mediaType"]
            for field in required_fields:
                if field not in actual_response:
                    validation["structure_valid"] = False
                    validation["field_errors"].append(f"Missing field: {field}")

            # Check field types and value ranges
            if "confidence" in actual_response:
                if not isinstance(actual_response["confidence"], (int, float)):
                    validation["field_errors"].append("confidence must be numeric")
                elif not (0 <= actual_response["confidence"] <= 1):
                    validation["field_errors"].append("confidence must be between 0 and 1")

            if "predicted_label" in actual_response:
                if not isinstance(actual_response["predicted_label"], int):
                    validation["field_errors"].append("predicted_label must be an integer")

            return validation

        except json.JSONDecodeError:
            return {
                "structure_valid": False,
                "field_errors": ["Invalid expected response JSON"]
            }
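
    # Illustration only: a hypothetical response body that would satisfy the
    # structural checks above. Field names mirror required_fields; the values
    # are invented and are not taken from the real service.
    #
    #   {
    #       "prediction": [0.1, 0.9],
    #       "confidence": 0.9,
    #       "predicted_label": 1,
    #       "model": "example-model",
    #       "mediaType": "application/json"
    #   }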

    def test_dataset(self, dataset_path: Path, max_samples: Optional[int] = None) -> Dict[str, Any]:
        """Test a single dataset."""
        print(f"📊 Testing dataset: {dataset_path.name}")

        try:
            df = self.load_dataset(dataset_path)
            if max_samples:
                df = df.head(max_samples)

            results = {
                "dataset_name": dataset_path.stem,
                "total_samples": len(df),
                "tested_samples": 0,
                "successful_requests": 0,
                "failed_requests": 0,
                "validation_errors": 0,
                "latencies_ms": [],
                "errors": [],
                "category": df['test_category'].iloc[0] if not df.empty else "unknown"
            }

            for i, (_, row) in enumerate(df.iterrows(), start=1):
                print(f" Testing sample {i}/{len(df)}", end="\r")

                # Replay the stored API request for this sample
                api_result = self.send_prediction_request(row['api_request'])
                results["tested_samples"] += 1

                if api_result["success"]:
                    results["successful_requests"] += 1
                    results["latencies_ms"].append(api_result["latency_ms"])

                    # Check the response structure against the stored expectation
                    validation = self.validate_response(
                        api_result["response"],
                        row['expected_response']
                    )

                    if not validation["structure_valid"]:
                        results["validation_errors"] += 1
                        results["errors"].append({
                            "sample_id": row['image_id'],
                            "type": "validation_error",
                            "details": validation["field_errors"]
                        })

                else:
                    results["failed_requests"] += 1
                    results["errors"].append({
                        "sample_id": row['image_id'],
                        "type": "request_failed",
                        "status_code": api_result["status_code"],
                        "error": api_result["error"]
                    })

            # Aggregate latency statistics for this dataset
            if results["latencies_ms"]:
                results["avg_latency_ms"] = round(statistics.mean(results["latencies_ms"]), 2)
                results["min_latency_ms"] = round(min(results["latencies_ms"]), 2)
                results["max_latency_ms"] = round(max(results["latencies_ms"]), 2)
                results["median_latency_ms"] = round(statistics.median(results["latencies_ms"]), 2)
            else:
                results.update({
                    "avg_latency_ms": None,
                    "min_latency_ms": None,
                    "max_latency_ms": None,
                    "median_latency_ms": None
                })

            results["success_rate"] = round(
                results["successful_requests"] / results["tested_samples"] * 100, 2
            ) if results["tested_samples"] > 0 else 0

            print(f"\n ✅ Completed: {results['success_rate']}% success rate")
            return results

        except Exception as e:
            print(f"\n ❌ Failed to test dataset: {str(e)}")
            return {
                "dataset_name": dataset_path.stem,
                "error": str(e),
                "success_rate": 0
            }

    def test_all_datasets(self, max_samples_per_dataset: Optional[int] = None,
                          category_filter: Optional[str] = None) -> Dict[str, Any]:
        """Test all datasets, optionally filtered by category."""
        if not self.test_api_connection():
            print("❌ API is not accessible. Please start the service first:")
            print("   uvicorn main:app --reload")
            return {"error": "API not accessible"}

        print(f"Starting dataset testing against {self.endpoint}")

        parquet_files = list(self.datasets_dir.glob("*.parquet"))
        if not parquet_files:
            print(f"❌ No datasets found in {self.datasets_dir}")
            return {"error": "No datasets found"}

        if category_filter:
            parquet_files = [f for f in parquet_files if category_filter in f.name]
            if not parquet_files:
                print(f"❌ No datasets match category filter '{category_filter}'")
                return {"error": "No datasets found"}

        print(f"Found {len(parquet_files)} datasets to test")

        all_results = []
        start_time = time.time()

        for dataset_file in parquet_files:
            result = self.test_dataset(dataset_file, max_samples_per_dataset)
            all_results.append(result)

        end_time = time.time()
        total_time = end_time - start_time

        summary = self.generate_summary(all_results, total_time)

        self.save_results(summary, all_results)

        return summary

    def generate_summary(self, results: List[Dict[str, Any]], total_time: float) -> Dict[str, Any]:
        """Generate summary of all test results."""
        successful_datasets = [r for r in results if r.get("success_rate", 0) > 0]
        failed_datasets = [r for r in results if r.get("error") or r.get("success_rate", 0) == 0]

        total_samples = sum(r.get("tested_samples", 0) for r in results)
        total_successful = sum(r.get("successful_requests", 0) for r in results)
        total_failed = sum(r.get("failed_requests", 0) for r in results)

        all_latencies = []
        for r in results:
            all_latencies.extend(r.get("latencies_ms", []))

        summary = {
            "test_summary": {
                "total_datasets": len(results),
                "successful_datasets": len(successful_datasets),
                "failed_datasets": len(failed_datasets),
                "total_samples_tested": total_samples,
                "total_successful_requests": total_successful,
                "total_failed_requests": total_failed,
                "overall_success_rate": round(
                    total_successful / total_samples * 100, 2
                ) if total_samples > 0 else 0,
                "total_test_time_seconds": round(total_time, 2)
            },
            "performance_metrics": {
                "avg_latency_ms": round(statistics.mean(all_latencies), 2) if all_latencies else None,
                "median_latency_ms": round(statistics.median(all_latencies), 2) if all_latencies else None,
                "min_latency_ms": round(min(all_latencies), 2) if all_latencies else None,
                "max_latency_ms": round(max(all_latencies), 2) if all_latencies else None,
                "requests_per_second": round(
                    total_successful / total_time, 2
                ) if total_time > 0 else 0
            },
            "category_breakdown": {},
            "failed_datasets": [r["dataset_name"] for r in failed_datasets]
        }

        categories = {}
        for result in results:
            category = result.get("category", "unknown")
            if category not in categories:
                categories[category] = {
                    "count": 0,
                    "success_rates": [],
                    "avg_success_rate": 0
                }
            categories[category]["count"] += 1
            categories[category]["success_rates"].append(result.get("success_rate", 0))

        for category, data in categories.items():
            data["avg_success_rate"] = round(
                statistics.mean(data["success_rates"]), 2
            ) if data["success_rates"] else 0

        summary["category_breakdown"] = categories

        return summary

    def save_results(self, summary: Dict[str, Any], detailed_results: List[Dict[str, Any]]):
        """Save test results to files."""
        results_dir = Path("test_results")
        results_dir.mkdir(exist_ok=True)

        timestamp = int(time.time())

        # Summary report
        summary_path = results_dir / f"test_summary_{timestamp}.json"
        with open(summary_path, 'w') as f:
            json.dump(summary, f, indent=2)

        # Detailed per-dataset results
        detailed_path = results_dir / f"test_detailed_{timestamp}.json"
        with open(detailed_path, 'w') as f:
            json.dump(detailed_results, f, indent=2)

        print("Results saved:")
        print(f"  Summary: {summary_path}")
        print(f"  Details: {detailed_path}")

    def print_summary(self, summary: Dict[str, Any]):
        """Print test summary to console."""
        print("\n" + "=" * 60)
        print("🏁 DATASET TESTING SUMMARY")
        print("=" * 60)

        ts = summary["test_summary"]
        print(f"Datasets tested: {ts['total_datasets']}")
        print(f"Successful datasets: {ts['successful_datasets']}")
        print(f"Failed datasets: {ts['failed_datasets']}")
        print(f"Total samples: {ts['total_samples_tested']}")
        print(f"Overall success rate: {ts['overall_success_rate']}%")
        print(f"Test duration: {ts['total_test_time_seconds']}s")

        pm = summary["performance_metrics"]
        if pm["avg_latency_ms"] is not None:
            print("\nPerformance:")
            print(f"  Avg latency: {pm['avg_latency_ms']}ms")
            print(f"  Median latency: {pm['median_latency_ms']}ms")
            print(f"  Min latency: {pm['min_latency_ms']}ms")
            print(f"  Max latency: {pm['max_latency_ms']}ms")
            print(f"  Requests/sec: {pm['requests_per_second']}")

        print("\nCategory breakdown:")
        for category, data in summary["category_breakdown"].items():
            print(f"  {category}: {data['count']} datasets, {data['avg_success_rate']}% avg success")

        if summary["failed_datasets"]:
            print(f"\nFailed datasets: {', '.join(summary['failed_datasets'])}")


def main():
    parser = argparse.ArgumentParser(description="Test PyArrow datasets against ML inference service")
    parser.add_argument("--base-url", default="http://127.0.0.1:8000", help="Base URL of the API")
    parser.add_argument("--datasets-dir", default="test_datasets", help="Directory containing datasets")
    parser.add_argument("--max-samples", type=int, help="Max samples per dataset to test")
    parser.add_argument("--category", help="Filter datasets by category (standard, edge_case, performance, model_comparison)")
    parser.add_argument("--quick", action="store_true", help="Quick test with max 5 samples per dataset")

    args = parser.parse_args()

    tester = DatasetTester(args.base_url, args.datasets_dir)

    max_samples = args.max_samples
    if args.quick:
        max_samples = 5

    results = tester.test_all_datasets(max_samples, args.category)

    if "error" not in results:
        tester.print_summary(results)

        if results["test_summary"]["overall_success_rate"] > 90:
            print("\n🎉 Excellent! API is working great with the datasets!")
        elif results["test_summary"]["overall_success_rate"] > 70:
            print("\n👍 Good! API works well, minor issues detected.")
        else:
            print("\n⚠️ Warning: Several issues detected. Check the detailed results.")


if __name__ == "__main__":
    main()