| """
|
| Handles parallel distribution of numpy arrays across tensor cores and SMs.
|
| Uses direct hardware simulation at electron speed without Python threading limitations.
|
| """
|
| import numpy as np
|
| from typing import List, Tuple, Optional, Dict, Any
|
| import threading
|
| import time
|
| import logging
|
| from http_storage import LocalStorage
|
| from config import get_db_url
|
|
|
|
|
# Configure the root logger once at import time so every message from this
# module carries a timestamp and severity level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
|
|
|
class ParallelArrayDistributor:
    """Split numpy arrays into per-core chunks, stage them through a storage
    backend, apply a caller-supplied operation to each chunk, and reassemble
    the results in the input's layout.

    The (SM, core) grid is simulated: chunks are processed one per
    (sm_id, core_id) slot, sequentially within this process.
    """

    def __init__(self, num_sms: int, cores_per_sm: int):
        """Record the grid shape and connect to the storage backend.

        Args:
            num_sms: Number of streaming multiprocessors in the simulated grid.
            cores_per_sm: Number of tensor cores per SM.

        Raises:
            RuntimeError: If the storage backend cannot be reached after
                three attempts spaced one second apart.
        """
        self.num_sms = num_sms
        self.cores_per_sm = cores_per_sm
        self.total_cores = num_sms * cores_per_sm

        max_retries = 3
        retry_delay = 1.0

        for attempt in range(max_retries):
            try:
                self.storage = LocalStorage(db_url=get_db_url())
                if not self.storage.wait_for_connection(timeout=10):
                    raise RuntimeError("Storage connection timeout")
                logging.info("Successfully connected to storage backend")
                break
            except Exception as e:
                if attempt == max_retries - 1:
                    # Chain the cause so the original storage error is not lost.
                    raise RuntimeError(f"Failed to initialize storage after {max_retries} attempts: {str(e)}") from e
                logging.warning(f"Storage initialization attempt {attempt + 1} failed: {str(e)}")
                time.sleep(retry_delay)

    def split_array(self, arr: np.ndarray) -> List[np.ndarray]:
        """Split ``arr`` into a flat list of at most ``total_cores`` chunks.

        1-D arrays are split into ``total_cores`` pieces.  2-D arrays are
        split on a k x k grid with k = floor(sqrt(total_cores)), returned in
        row-major order (some cores stay idle when total_cores is not a
        perfect square).  Higher-rank arrays are split along axis 0 only.

        Returns:
            A flat list of ndarray chunks.
        """
        if arr.ndim == 1:
            return np.array_split(arr, self.total_cores)
        if arr.ndim == 2:
            splits_per_dim = int(np.sqrt(self.total_cores))
            row_chunks = np.array_split(arr, splits_per_dim, axis=0)
            # BUG FIX: this branch previously returned a nested
            # List[List[ndarray]], which broke distribute_to_cores() and
            # parallel_process() -- both expect a flat list of arrays.
            # Flatten row-major so chunk order matches the reassembly order
            # used by _reassemble().
            return [piece
                    for row_chunk in row_chunks
                    for piece in np.array_split(row_chunk, splits_per_dim, axis=1)]
        # N-D fallback: split along the leading axis only.
        return np.array_split(arr, self.total_cores, axis=0)

    def distribute_to_cores(self, chunks: List[np.ndarray]) -> Dict[Tuple[int, int], np.ndarray]:
        """Assign chunks to (sm_id, core_id) slots in row-major order.

        When there are fewer chunks than cores, the trailing cores are left
        idle; chunks beyond ``total_cores`` would be silently dropped (the
        split helpers never produce more than ``total_cores`` chunks).

        Returns:
            Mapping from (sm_id, core_id) to the chunk assigned to that slot.
        """
        distribution = {}
        chunk_idx = 0
        for sm_id in range(self.num_sms):
            for core_id in range(self.cores_per_sm):
                if chunk_idx < len(chunks):
                    distribution[(sm_id, core_id)] = chunks[chunk_idx]
                    chunk_idx += 1
        return distribution

    def get_processing_status(self, array_id: int) -> Dict[str, Any]:
        """Report how many stored chunks of ``array_id`` have been processed.

        Args:
            array_id: The ``id()`` of the source array recorded in chunk
                metadata by parallel_process().

        Returns:
            On success: dict with 'array_id', 'total_chunks',
            'processed_chunks', 'completion_percentage', 'timestamp'.
            On any storage failure: dict with 'array_id', 'error',
            'timestamp' -- status queries are best-effort and never raise.
        """
        try:
            # query_tensors may return None; treat that as "no chunks".
            chunks = self.storage.query_tensors(
                metadata_filter={'array_id': array_id}
            ) or []

            total_chunks = len(chunks)
            processed_chunks = sum(
                1 for c in chunks if c.get('metadata', {}).get('processed', False)
            )

            return {
                'array_id': array_id,
                'total_chunks': total_chunks,
                'processed_chunks': processed_chunks,
                'completion_percentage': (processed_chunks / total_chunks * 100) if total_chunks > 0 else 0,
                'timestamp': time.time()
            }
        except Exception as e:
            # Surface the failure in the payload instead of raising.
            return {
                'array_id': array_id,
                'error': str(e),
                'timestamp': time.time()
            }

    def parallel_process(self, arr: np.ndarray, operation_func) -> np.ndarray:
        """Process ``arr`` chunk-by-chunk across the simulated core grid.

        Each chunk is persisted to storage, loaded back, transformed by
        ``operation_func(chunk, sm_id, core_id)``, and the per-chunk result
        is stored with provenance metadata.  Results are then stitched back
        together in the input's layout.

        Args:
            arr: Input array of any rank (1-D and 2-D get grid-aware splits).
            operation_func: Callable ``(chunk, sm_id, core_id) -> ndarray``;
                must preserve the chunk's extent along the concatenation axes.

        Returns:
            The reassembled result array.
        """
        chunks = self.split_array(arr)
        distribution = self.distribute_to_cores(chunks)

        results = {}
        chunk_ids = {}

        # Stage every chunk in the storage backend first, tagged with its
        # (sm, core) slot so progress is observable via get_processing_status().
        for (sm_id, core_id), chunk in distribution.items():
            chunk_id = f"chunk_{sm_id}_{core_id}_{time.time_ns()}"
            chunk_ids[(sm_id, core_id)] = chunk_id

            metadata = {
                'sm_id': sm_id,
                'core_id': core_id,
                'shape': chunk.shape,
                'dtype': str(chunk.dtype),
                # NOTE(review): id() is only unique while arr stays alive;
                # callers must hold a reference for status queries to be valid.
                'array_id': id(arr),
                'total_cores': self.total_cores,
                'timestamp': time.time()
            }
            self.storage.store_tensor(chunk_id, chunk, metadata)

        # "Dispatch" phase: load each staged chunk back, run the operation,
        # and persist the per-core result.  This loop runs sequentially.
        for (sm_id, core_id), chunk_id in chunk_ids.items():
            chunk_data = self.storage.load_tensor(chunk_id)
            if chunk_data is not None:
                chunk, _ = chunk_data
                result = operation_func(chunk, sm_id, core_id)
                results[(sm_id, core_id)] = result

                result_id = f"result_{chunk_id}"
                self.storage.store_tensor(result_id, result, {
                    'parent_chunk': chunk_id,
                    'sm_id': sm_id,
                    'core_id': core_id,
                    'timestamp': time.time()
                })

        # Collect results in (sm, core) row-major order -- the same order
        # split_array() produced the chunks in.
        ordered_results = [results[(sm_id, core_id)]
                           for sm_id in range(self.num_sms)
                           for core_id in range(self.cores_per_sm)
                           if (sm_id, core_id) in results]

        return self._reassemble(arr.ndim, ordered_results)

    def _reassemble(self, ndim: int, ordered_results: List[np.ndarray]) -> np.ndarray:
        """Stitch per-chunk results back into one array, mirroring split_array().

        1-D results are concatenated end-to-end; 2-D results are joined into
        grid rows (axis=1) and then stacked (axis=0); higher ranks
        concatenate along axis 0.
        """
        if ndim == 1:
            return np.concatenate(ordered_results)
        if ndim == 2:
            splits_per_dim = int(np.sqrt(self.total_cores))
            rows = [np.concatenate(ordered_results[i:i + splits_per_dim], axis=1)
                    for i in range(0, len(ordered_results), splits_per_dim)]
            return np.concatenate(rows, axis=0)
        return np.concatenate(ordered_results, axis=0)
|
|
|