try:
    import numpy as np
except ImportError:
    # FIX: pip.main() was removed from pip's public API in pip 10; the
    # supported way to install programmatically is to invoke pip as a
    # subprocess against the running interpreter.
    import subprocess
    import sys

    subprocess.check_call([sys.executable, "-m", "pip", "install", "numpy"])
    import numpy as np
|
|
|
| from typing import Dict, Any, List, Optional, Tuple
|
| import time
|
| import json
|
| import logging
|
| import duckdb
|
| from huggingface_hub import HfApi, HfFileSystem
|
| from .hal.hal import HardwareAbstractionLayer, HardwareType
|
| from .memory.duckdb_memory_manager import DuckDBMemoryManager
|
| from .commands.command_processor import CommandProcessor
|
| from .graphics.graphics_api import GraphicsAPI
|
| from .memory.memory_pool import MemoryPool, SharedMemoryPool
|
| from .graphics.texture_buffer_manager import (
|
| Texture, Buffer, Framebuffer, TextureFormat, FilterMode,
|
| WrapMode, BufferType, MSAASamples, AttachmentType
|
| )
|
| from .graphics.rasterizer import Rasterizer
|
|
|
| from tensor_core import TensorCore
|
| from config import get_db_url, get_hf_token_cached
|
| from multi_gpu_system_http import MultiGPUSystem
|
| from gpu_parallel_distributor import GPUParallelDistributor
|
| from parallel_array_distributor import ParallelArrayDistributor
|
| from streaming_multiprocessor import StreamingMultiprocessor
|
| from electron_speed import max_switch_freq, GATE_DELAY
|
| from logic_gates import LogicGate, NANDGate, ANDGate
|
| from .warp import Warp
|
|
|
|
|
# Default virtual-GPU topology.
DEFAULT_NUM_GPUS = 8
DEFAULT_SMS_PER_GPU = 1500
DEFAULT_CORES_PER_SM = 128
# Threads executed in lock-step as one warp.
THREADS_PER_WARP = 32

# Per-SM resource defaults (bytes / register count).
DEFAULT_SHARED_MEMORY_PER_SM = 32 * 1024
DEFAULT_REGISTERS_PER_SM = 65536
# NOTE(review): this is cores divided by warp width (= warps executing
# concurrently), not a resident-warp hardware limit — confirm the intended
# semantics before relying on it as a capacity bound.
DEFAULT_MAX_WARPS_PER_SM = DEFAULT_CORES_PER_SM // THREADS_PER_WARP

# Weights for the SM scheduling heuristic: data locality dominates load.
# Used by _select_best_sm and _select_optimal_sm.
DEFAULT_LOCALITY_WEIGHT = 0.7
DEFAULT_LOAD_WEIGHT = 0.3
|
|
|
class GPUError(Exception):
    """Base class for all GPU-related errors raised by this driver."""
|
|
|
class MemoryError(GPUError):
    """Memory allocation or access error.

    NOTE(review): this deliberately shadows the builtin ``MemoryError``
    inside this module — code catching it must reference this class, not
    the builtin.
    """
|
|
|
class StreamError(GPUError):
    """Raised when a stream operation fails or a stream id is invalid."""
|
|
|
class TensorError(GPUError):
    """Raised when a tensor operation or tensor allocation fails."""
|
|
|
| class VirtualGPUDriver:
|
| DB_URL = "hf://datasets/Fred808/helium/storage.json"
|
|
|
    def __init__(self, num_gpus: int = DEFAULT_NUM_GPUS,
                 num_sms_per_gpu: int = DEFAULT_SMS_PER_GPU,
                 cores_per_sm: int = DEFAULT_CORES_PER_SM,
                 db_url: Optional[str] = None):
        """Initialize GPU driver with configurable architecture parameters.

        Args:
            num_gpus: number of virtual GPU chips to model.
            num_sms_per_gpu: streaming multiprocessors per chip.
            cores_per_sm: cores per SM.
            db_url: persistence database URL; defaults to ``DB_URL``.

        NOTE(review): attributes used elsewhere in the class
        (``initialized``, ``memory_pools``, ``stream_manager``,
        ``local_storage``, ``gate_delay`` …) are NOT set here — they are
        assigned at the tail of ``_initialize_command_processors``, which
        looks misplaced. Confirm the intended construction order.
        """
        # Topology parameters.
        self.num_gpus = num_gpus
        self.num_sms_per_gpu = num_sms_per_gpu
        self.cores_per_sm = cores_per_sm
        self.threads_per_warp = THREADS_PER_WARP

        # Persistence connection (retried; sets self.conn).
        self.db_url = db_url or self.DB_URL
        self.max_retries = 3
        self._connect_with_retries()

        # Core subsystems.
        self.hal = HardwareAbstractionLayer(db_url=self.db_url)
        self.multi_gpu_system = MultiGPUSystem(num_gpus=self.num_gpus, db_url=self.db_url)
        self.parallel_distributor = GPUParallelDistributor(num_gpus=self.num_gpus)

        # chip_id -> {sm_id -> StreamingMultiprocessor}; inner dicts start
        # empty and are populated elsewhere.
        self.streaming_multiprocessors = {}

        self._setup_database()

        for chip_id in range(num_gpus):
            self.streaming_multiprocessors[chip_id] = {}

        self._initialize_graphics_subsystem()
|
|
|
| def _connect_with_retries(self):
|
| """Establish database connection with retry logic"""
|
| for attempt in range(self.max_retries):
|
| try:
|
| self.conn = self._init_db_connection()
|
| return
|
| except Exception as e:
|
| if attempt == self.max_retries - 1:
|
| raise RuntimeError(f"Failed to initialize database after {self.max_retries} attempts: {str(e)}")
|
| time.sleep(1)
|
|
|
| def _init_db_connection(self) -> duckdb.DuckDBPyConnection:
|
| """Initialize database connection with HuggingFace configuration"""
|
|
|
| _, _, owner, dataset, db_file = self.db_url.split('/', 4)
|
| db_path = f"s3://datasets-cached/{owner}/{dataset}/{db_file}"
|
|
|
|
|
| conn = duckdb.connect(db_path)
|
| conn.execute("INSTALL httpfs;")
|
| conn.execute("LOAD httpfs;")
|
| conn.execute("SET s3_endpoint='s3.us-east-1.amazonaws.com';")
|
| conn.execute("SET s3_use_ssl=true;")
|
| conn.execute("SET s3_url_style='path';")
|
| conn.execute(f"SET s3_access_key_id='{self.HF_TOKEN}';")
|
| conn.execute(f"SET s3_secret_access_key='{self.HF_TOKEN}';")
|
| return conn
|
|
|
    def _setup_database(self):
        """Initialize database tables.

        Creates (if absent) the four tables that persist driver state:
        per-GPU snapshots, per-SM state, per-warp execution state, and
        shared memory pool bookkeeping.
        """
        # Per-chip aggregate state (utilization, thermals, power) + JSON blob.
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS gpu_state (
                chip_id INTEGER PRIMARY KEY,
                num_sms INTEGER,
                cores_per_sm INTEGER,
                memory_size BIGINT,
                utilization DOUBLE,
                temperature DOUBLE,
                power_usage DOUBLE,
                last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                state_json JSON
            )
        """)

        # Per-SM occupancy and resource usage.
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS streaming_multiprocessors (
                sm_id VARCHAR PRIMARY KEY,
                chip_id INTEGER,
                num_cores INTEGER,
                active_warps INTEGER,
                shared_memory_used BIGINT,
                registers_used INTEGER,
                last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                state_json JSON
            )
        """)

        # Per-warp execution tracking; references the owning SM.
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS warp_execution (
                warp_id VARCHAR PRIMARY KEY,
                sm_id VARCHAR,
                chip_id INTEGER,
                num_threads INTEGER,
                program_counter BIGINT,
                status VARCHAR,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP,
                state_json JSON,
                FOREIGN KEY (sm_id) REFERENCES streaming_multiprocessors(sm_id)
            )
        """)

        # Cross-GPU shared memory pool bookkeeping.
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS shared_memory_pools (
                pool_id VARCHAR PRIMARY KEY,
                gpu_id INTEGER,
                size BIGINT,
                allocated_size BIGINT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_accessed TIMESTAMP,
                last_modified TIMESTAMP,
                state_json JSON
            )
        """)
|
|
|
| def _initialize_graphics_subsystem(self):
|
| """Initialize graphics-related subsystems"""
|
| from src.graphics.texture_buffer_manager import (
|
| Texture, Buffer, Framebuffer, TextureFormat, FilterMode,
|
| WrapMode, BufferType, MSAASamples
|
| )
|
| from src.graphics.rasterizer import Rasterizer
|
|
|
|
|
| self.TextureFormat = TextureFormat
|
| self.FilterMode = FilterMode
|
| self.WrapMode = WrapMode
|
| self.BufferType = BufferType
|
| self.MSAASamples = MSAASamples
|
|
|
|
|
| self.texture_manager = {}
|
| self.buffer_manager = {}
|
| self.framebuffer_manager = {}
|
| self.next_texture_id = 0
|
| self.next_buffer_id = 0
|
| self.next_fb_id = 0
|
|
|
|
|
| self.rasterizers = {
|
| chip_id: Rasterizer(self)
|
| for chip_id in range(self.num_gpus)
|
| }
|
|
|
| def create_texture(self, width: int, height: int,
|
| format: TextureFormat = None,
|
| filter_mode: FilterMode = None,
|
| wrap_mode: WrapMode = None,
|
| generate_mipmaps: bool = True,
|
| aniso_level: int = 1) -> int:
|
| """
|
| Create a new texture with specified parameters
|
| Returns texture_id
|
| """
|
| format = format or self.TextureFormat.RGBA8
|
| filter_mode = filter_mode or self.FilterMode.BILINEAR
|
| wrap_mode = wrap_mode or self.WrapMode.REPEAT
|
|
|
| texture = Texture(width, height, format, filter_mode,
|
| wrap_mode, generate_mipmaps, aniso_level)
|
| texture_id = self.next_texture_id
|
| self.texture_manager[texture_id] = texture
|
| self.next_texture_id += 1
|
| return texture_id
|
|
|
| def create_buffer(self, data: np.ndarray,
|
| buffer_type: BufferType = None,
|
| dynamic: bool = False,
|
| map_write: bool = False) -> int:
|
| """
|
| Create a new buffer with specified parameters
|
| Returns buffer_id
|
| """
|
| buffer_type = buffer_type or self.BufferType.VERTEX
|
| buffer = Buffer(data, buffer_type, dynamic, map_write)
|
| buffer_id = self.next_buffer_id
|
| self.buffer_manager[buffer_id] = buffer
|
| self.next_buffer_id += 1
|
| return buffer_id
|
|
|
| def create_framebuffer(self, width: int, height: int,
|
| color_formats: List[TextureFormat] = None,
|
| depth_format: Optional[TextureFormat] = None,
|
| stencil_format: Optional[TextureFormat] = None,
|
| samples: MSAASamples = None) -> int:
|
| """
|
| Create a new framebuffer with specified parameters
|
| Returns framebuffer_id
|
| """
|
| color_formats = color_formats or [self.TextureFormat.RGBA8]
|
| samples = samples or self.MSAASamples.MSAA_1X
|
|
|
| fb = Framebuffer(width, height, color_formats,
|
| depth_format, stencil_format, samples)
|
| fb_id = self.next_fb_id
|
| self.framebuffer_manager[fb_id] = fb
|
| self.next_fb_id += 1
|
| return fb_id
|
|
|
| def upload_texture_data(self, texture_id: int, data: np.ndarray,
|
| generate_mipmaps: bool = True):
|
| """Upload new data to existing texture"""
|
| if texture_id not in self.texture_manager:
|
| raise GPUError(f"Invalid texture ID: {texture_id}")
|
| texture = self.texture_manager[texture_id]
|
| texture.upload(data, generate_mipmaps)
|
|
|
| def upload_buffer_data(self, buffer_id: int, data: np.ndarray):
|
| """Upload new data to existing buffer"""
|
| if buffer_id not in self.buffer_manager:
|
| raise GPUError(f"Invalid buffer ID: {buffer_id}")
|
| buffer = self.buffer_manager[buffer_id]
|
| buffer.upload(data)
|
|
|
| def map_buffer(self, buffer_id: int) -> Optional[np.ndarray]:
|
| """Map buffer for CPU access"""
|
| if buffer_id not in self.buffer_manager:
|
| raise GPUError(f"Invalid buffer ID: {buffer_id}")
|
| buffer = self.buffer_manager[buffer_id]
|
| return buffer.map()
|
|
|
| def unmap_buffer(self, buffer_id: int):
|
| """Unmap previously mapped buffer"""
|
| if buffer_id not in self.buffer_manager:
|
| raise GPUError(f"Invalid buffer ID: {buffer_id}")
|
| buffer = self.buffer_manager[buffer_id]
|
| buffer.unmap()
|
|
|
    def draw_triangles(self, vertex_buffer_id: int,
                       index_buffer_id: Optional[int],
                       framebuffer_id: int,
                       shader_program: Dict,
                       num_vertices: int,
                       start_vertex: int = 0,
                       chip_id: int = 0):
        """
        Draw triangles using specified buffers and shader program

        Args:
            vertex_buffer_id: Buffer containing vertex data
            index_buffer_id: Optional buffer containing indices
            framebuffer_id: Target framebuffer
            shader_program: Dict containing vertex and fragment shader code
            num_vertices: Number of vertices to draw
            start_vertex: Starting vertex offset
            chip_id: GPU to use for rendering

        Raises:
            GPUError: if any resource handle is unknown.
        """
        # Validate all resource handles up front.
        if vertex_buffer_id not in self.buffer_manager:
            raise GPUError(f"Invalid vertex buffer ID: {vertex_buffer_id}")
        # NOTE(review): truthiness tests here treat index buffer id 0 as
        # "no index buffer" — confirm handles are meant to start at 0.
        if index_buffer_id and index_buffer_id not in self.buffer_manager:
            raise GPUError(f"Invalid index buffer ID: {index_buffer_id}")
        if framebuffer_id not in self.framebuffer_manager:
            raise GPUError(f"Invalid framebuffer ID: {framebuffer_id}")

        vertex_buffer = self.buffer_manager[vertex_buffer_id]
        index_buffer = (self.buffer_manager[index_buffer_id]
                        if index_buffer_id else None)
        framebuffer = self.framebuffer_manager[framebuffer_id]

        # Assemble the vertex stream, indexed or sequential.
        if index_buffer:
            indices = index_buffer.data[start_vertex:start_vertex + num_vertices]
            vertices = vertex_buffer.data[indices]
        else:
            vertices = vertex_buffer.data[start_vertex:start_vertex + num_vertices]

        # Process one triangle (three vertices) per iteration.
        for i in range(0, len(vertices), 3):
            v0, v1, v2 = vertices[i:i+3]

            # Rasterize into candidate fragments.
            fragments = self.rasterizers[chip_id].rasterize_triangle(
                v0, v1, v2,
                framebuffer.width,
                framebuffer.height,
                framebuffer.samples
            )

            # Shade fragments with early-Z / hierarchical-Z rejection.
            processed = self.rasterizers[chip_id].process_fragments(
                fragments,
                shader_program,
                chip_id,
                early_z=True,
                hierarchical_z=True
            )

            # Depth-test against the depth attachment (if present).
            # NOTE(review): the returned depth_buffer is never written back
            # to the attachment — confirm whether depth updates are expected.
            passed, depth_buffer = self.rasterizers[chip_id].depth_test(
                processed,
                framebuffer.depth_attachment.data.tobytes()
                if framebuffer.depth_attachment else None,
                framebuffer.width
            )

            # Blend surviving fragments into every color attachment, going
            # through bytes and back to preserve the attachment dtype/shape.
            for target_idx, target in enumerate(framebuffer.color_attachments):
                target.data = np.frombuffer(
                    self.rasterizers[chip_id].write_to_framebuffer(
                        passed,
                        target.data.tobytes(),
                        framebuffer.width,
                        blend_enable=True
                    ),
                    dtype=target.data.dtype
                ).reshape(target.data.shape)
|
|
|
    def _initialize_hal_schema(self):
        """Initialize HAL database schema for multi-GPU tracking.

        Creates tables for cross-GPU operations, per-address memory
        coherence, and synchronization points, then commits.

        NOTE(review): ``AUTOINCREMENT`` is SQLite syntax; DuckDB uses
        sequences / identity columns instead — confirm these DDL statements
        run against the intended engine.
        """
        # Log of operations spanning two chips, including the NVLink route.
        self.hal.conn.execute("""
            CREATE TABLE IF NOT EXISTS cross_gpu_operations (
                operation_id INTEGER PRIMARY KEY AUTOINCREMENT,
                operation_type TEXT,
                source_chip INTEGER,
                target_chip INTEGER,
                nvlink_path TEXT,
                start_time TIMESTAMP,
                end_time TIMESTAMP,
                state_json JSON
            )
        """)

        # Versioned per-(address, chip) coherence state with a dirty flag.
        self.hal.conn.execute("""
            CREATE TABLE IF NOT EXISTS memory_coherence (
                address INTEGER,
                chip_id INTEGER,
                version INTEGER,
                last_modified TIMESTAMP,
                dirty BOOLEAN,
                PRIMARY KEY (address, chip_id)
            )
        """)

        # Barrier-style sync points: which chips must arrive, which have.
        self.hal.conn.execute("""
            CREATE TABLE IF NOT EXISTS sync_points (
                sync_id INTEGER PRIMARY KEY AUTOINCREMENT,
                operation_id INTEGER,
                chips_involved JSON,
                reached_by JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        self.hal.conn.commit()
|
|
|
    def _initialize_memory_management(self, num_chips: int, vram_size_gb: int):
        """Initialize cross-GPU memory management.

        Builds one global VRAM pool per chip, one shared pool per unordered
        chip pair, and the table tracking inter-GPU transfers.
        """
        for chip_id in range(num_chips):
            # Per-GPU global VRAM pool sized in bytes.
            pool_id = f"gpu_{chip_id}_global"
            self.memory_pools[pool_id] = MemoryPool(
                size_bytes=vram_size_gb * 1024 * 1024 * 1024,
                chip_id=chip_id
            )

            # A 1 GiB shared staging pool for every unordered pair of GPUs.
            for other_chip in range(chip_id + 1, num_chips):
                shared_pool_id = f"shared_pool_{chip_id}_{other_chip}"
                self.shared_pools[shared_pool_id] = SharedMemoryPool(
                    size_bytes=1 * 1024 * 1024 * 1024,
                    gpu_a=chip_id,
                    gpu_b=other_chip
                )

        # Transfer log for NVLink bandwidth accounting.
        # NOTE(review): AUTOINCREMENT is SQLite syntax — verify against DuckDB.
        self.hal.conn.execute("""
            CREATE TABLE IF NOT EXISTS memory_transfers (
                transfer_id INTEGER PRIMARY KEY AUTOINCREMENT,
                source_chip INTEGER,
                target_chip INTEGER,
                size_bytes INTEGER,
                nvlink_path TEXT,
                start_time TIMESTAMP,
                end_time TIMESTAMP,
                bandwidth_used FLOAT
            )
        """)
|
|
|
    def _initialize_command_processors(self, num_chips: int):
        """Initialize command processors for cross-GPU operations.

        Registers one DMA queue per GPU pair and one compute queue per GPU,
        builds every SM / array distributor / warp, then assigns core
        subsystem handles.

        NOTE(review): the tail of this method (re-assigning ``self.hal``,
        ``self.tensor_core``, ``self.local_storage``, ``self.initialized``,
        the pools and logic gates) reads like ``__init__`` code pasted here —
        it clobbers the db_url-configured HAL created in ``__init__``.
        Confirm the intended placement.
        """
        # One DMA queue per unordered GPU pair, registered on the lower chip.
        for chip_a in range(num_chips):
            for chip_b in range(chip_a + 1, num_chips):
                queue_id = f"xfer_{chip_a}_{chip_b}"
                self.hal.conn.execute("""
                    INSERT INTO hardware_queues (
                        queue_id, hardware_type, chip_id, sm_id, instructions
                    ) VALUES (?, ?, ?, ?, ?)
                """, (
                    queue_id,
                    'DMA_ENGINE',
                    chip_a,
                    None,
                    json.dumps([])
                ))

        # One compute queue per GPU.
        for chip_id in range(num_chips):
            self.hal.conn.execute("""
                INSERT INTO hardware_queues (
                    queue_id, hardware_type, chip_id, sm_id, instructions
                ) VALUES (?, ?, ?, ?, ?)
            """, (
                f"compute_{chip_id}",
                'COMPUTE_UNIT',
                chip_id,
                None,
                json.dumps([])
            ))

        # Build every SM and persist its initial state.
        for chip_id in range(num_chips):
            # NOTE(review): self.num_sms_per_chip is never assigned in this
            # file — __init__ sets self.num_sms_per_gpu. As written this
            # raises AttributeError; confirm the intended attribute.
            for sm_id in range(self.num_sms_per_chip):
                sm = StreamingMultiprocessor(
                    sm_id=sm_id,
                    chip_id=chip_id,
                    num_cores=self.cores_per_sm,
                    db_url=self.db_url
                )
                self.streaming_multiprocessors[chip_id][sm_id] = sm

                self.hal.conn.execute("""
                    INSERT INTO hardware_states (
                        component_id, hardware_type, chip_id, sm_id, state_json
                    ) VALUES (?, ?, ?, ?, ?)
                """, (
                    f"sm_{chip_id}_{sm_id}",
                    'STREAMING_MULTIPROCESSOR',
                    chip_id,
                    sm_id,
                    json.dumps(sm.sm_state)
                ))

        # Per-GPU array distributors wired to this driver's storage and SMs.
        self.array_distributors = {}
        for chip_id in range(self.num_gpus):
            distributor = ParallelArrayDistributor(
                num_sms=self.num_sms_per_gpu,
                cores_per_sm=self.cores_per_sm
            )
            self.array_distributors[chip_id] = distributor

            # NOTE(review): self.local_storage is only assigned further down
            # in this method — this read happens first. Verify ordering.
            distributor.storage = self.local_storage
            distributor.streaming_multiprocessors = self.streaming_multiprocessors[chip_id]

        # Pre-create warps for every SM and enqueue an initial descriptor.
        self.warps = {}
        for chip_id in range(self.num_gpus):
            self.warps[chip_id] = {}
            for sm_id in range(self.num_sms_per_gpu):
                self.warps[chip_id][sm_id] = []
                sm = self.streaming_multiprocessors[chip_id][sm_id]

                # Number of warps that fit the SM's cores at warp width.
                warps_per_sm = self.cores_per_sm // self.threads_per_warp
                for warp_id in range(warps_per_sm):
                    warp = Warp(warp_id)
                    self.warps[chip_id][sm_id].append(warp)

                    sm.schedule_warp(str(warp_id), {
                        'warp_id': warp_id,
                        'status': 'initialized',
                        'priority': 0,
                        'dependencies': [],
                        'resource_requirements': {
                            'tensor_cores': 1,
                            'shared_memory': 32768
                        }
                    })

        # Core subsystem handles.
        # NOTE(review): LocalStorage is not imported anywhere in this
        # module's visible imports — this line would raise NameError.
        self.hal = HardwareAbstractionLayer()
        self.tensor_core = TensorCore()
        self.local_storage = LocalStorage()
        self.memory_manager = DuckDBMemoryManager(get_db_url())
        self.stream_manager = {}

        # Runtime bookkeeping flags and pool registries.
        self.initialized = False
        self._sync_primitives = {}
        self.memory_pools = {}
        self.shared_pools = {}

        # Electrical timing model and simple logic-gate instances.
        self.gate_delay = GATE_DELAY
        self.max_switch_freq = max_switch_freq
        self.logic_gates = {
            'nand': NANDGate(),
            'and': ANDGate()
        }
|
|
|
    def allocate_memory(self, size_bytes, chip_id=None, key=None, tensor_shape=None, dtype=None):
        """
        Allocate memory with support for tensor operations and local storage across multiple GPUs.
        Args:
            size_bytes: Size of memory to allocate
            chip_id: Target GPU chip (if None, will be automatically selected)
            key: Optional persistent storage key
            tensor_shape: Shape for tensor allocation
            dtype: Data type for tensor allocation

        Raises:
            RuntimeError: if the driver is not initialized.
            TensorError: when a tensor allocation fails.
            MemoryError: when a plain allocation fails.

        NOTE(review): a later ``allocate_memory(self, size_bytes, chip_id=0)``
        defined near the end of this class redefines the attribute, so this
        richer implementation is unreachable at runtime — confirm which one
        should survive.
        """
        if not self.initialized:
            raise RuntimeError("Driver not initialized.")

        try:
            # Auto-select the chip with the most free memory when unspecified.
            if chip_id is None:
                gpu_states = self.multi_gpu_system.system_state["global_memory_state"]["allocation_map"]

                # Missing "free_memory" counts as infinite, making that GPU win.
                available_memory = {gpu: state.get("free_memory", float('inf'))
                                    for gpu, state in gpu_states.items()}
                chip_id = max(available_memory.items(), key=lambda x: x[1])[0]

            # NOTE(review): this bound check assumes chip_id is an int and the
            # allocation-map keys above are ints too — verify the key type.
            if chip_id >= len(self.streaming_multiprocessors):
                raise MemoryError(f"Invalid chip_id: {chip_id}")

            address = None

            # Tensor allocation: pick SMs, allocate, optionally persist by key.
            if tensor_shape is not None:
                if dtype is None:
                    dtype = np.float32

                # Let the per-chip distributor choose SM placement.
                array_distributor = self.array_distributors[chip_id]
                sm_assignments = array_distributor.get_optimal_sm_assignment(tensor_shape)

                address = self.memory_manager.allocate_tensor(
                    tensor_shape,
                    str(dtype),
                    chip_id,
                    sm_assignments=sm_assignments
                )

                if key:
                    # Persist tensor metadata for later lookup by key.
                    self.local_storage.store(key, {
                        'address': address,
                        'shape': tensor_shape,
                        'dtype': str(dtype),
                        'chip_id': chip_id,
                        'sm_assignments': sm_assignments
                    })

                # Record the allocation in the global memory bookkeeping.
                self.multi_gpu_system.update_memory_allocation(
                    chip_id,
                    size_bytes,
                    tensor=True,
                    sm_assignments=sm_assignments
                )

            # Keyed (non-tensor) allocation.
            elif key is not None:
                address = self.memory_manager.allocate_with_key(
                    size_bytes,
                    key,
                    chip_id,
                    tensor_shape=tensor_shape,
                    dtype=str(dtype) if dtype else None
                )
                self.local_storage.store(key, {
                    'address': address,
                    'chip_id': chip_id
                })
                self.multi_gpu_system.update_memory_allocation(chip_id, size_bytes)

            # Plain anonymous allocation.
            else:
                address = self.memory_manager.allocate(size_bytes, chip_id)
                self.multi_gpu_system.update_memory_allocation(chip_id, size_bytes)

            return address

        except Exception as e:
            # Re-wrap so callers can distinguish tensor failures.
            if tensor_shape is not None:
                raise TensorError(f"Failed to allocate tensor: {str(e)}")
            raise MemoryError(f"Failed to allocate memory: {str(e)}")
|
|
|
| def list_memory_keys(self):
|
| """List all string keys for persistent memory blocks"""
|
| if hasattr(self.memory_manager, 'list_keys'):
|
| return self.memory_manager.list_keys()
|
| return []
|
|
|
    def _calculate_data_locality(self, sm_id: int, operation_data: Dict[str, Any]) -> float:
        """
        Calculate data locality score for an SM based on input data location
        Returns score between 0 and 1 (1 = all data is local)

        NOTE(review): a later method of the same name (taking an SM object)
        redefines this attribute on the class, so this version is shadowed
        at runtime — callers such as _select_best_sm that pass an int sm_id
        will actually invoke the other implementation. Confirm which is
        intended.
        """
        total_data_size = 0
        local_data_size = 0

        # operation_data["inputs"] is iterated as tensor descriptors exposing
        # at least "address" and optionally "size" (bytes).
        for tensor_info in operation_data["inputs"]:
            size = tensor_info.get("size", 0)
            total_data_size += size

            if self._is_data_local(sm_id, tensor_info["address"]):
                local_data_size += size

        # Avoid division by zero for empty/zero-sized input sets.
        return local_data_size / total_data_size if total_data_size > 0 else 0.0
|
|
|
    def _is_data_local(self, sm_id: int, address: int) -> bool:
        """Check if data at address is local to the given SM.

        "Local" means present in the SM's L1 cache or shared memory state.

        NOTE(review): ``self.active_chip_id`` is never assigned anywhere in
        this module's visible code — confirm where it is set before this
        method is called.
        """
        sm = self.streaming_multiprocessors[self.active_chip_id][sm_id]

        # L1 cache is keyed by the raw address...
        if address in sm.sm_state["l1_cache"]:
            return True

        # ...while shared memory is keyed by the stringified address.
        if str(address) in sm.sm_state["shared_memory"]:
            return True

        return False
|
|
|
| def _select_best_sm(self, operation_data: Dict[str, Any]) -> Tuple[int, int]:
|
| """
|
| Select best SM for operation based on locality and load
|
| Returns (chip_id, sm_id)
|
| """
|
| best_score = -1
|
| best_sm = None
|
| best_chip = None
|
|
|
|
|
| for chip_id in range(self.num_gpus):
|
| for sm_id in range(self.num_sms_per_gpu):
|
| sm = self.streaming_multiprocessors[chip_id][sm_id]
|
|
|
|
|
| load = len(sm.current_tensor_ops)
|
|
|
|
|
| locality_score = self._calculate_data_locality(sm_id, operation_data)
|
|
|
|
|
| load_score = 1.0 / (load + 1)
|
| final_score = (locality_score * DEFAULT_LOCALITY_WEIGHT +
|
| load_score * DEFAULT_LOAD_WEIGHT)
|
|
|
| if final_score > best_score:
|
| best_score = final_score
|
| best_sm = sm_id
|
| best_chip = chip_id
|
|
|
| return best_chip, best_sm
|
|
|
| def schedule_tensor_operation(self, operation_data: Dict[str, Any]) -> Tuple[int, int]:
|
| """
|
| Schedule a tensor operation on the best SM considering:
|
| - Data locality (70% weight)
|
| - Current SM load (30% weight)
|
| Returns (chip_id, sm_id) of selected SM
|
| """
|
|
|
| if "preferred_sm" in operation_data:
|
| chip_id = operation_data.get("preferred_chip", 0)
|
| sm_id = operation_data["preferred_sm"]
|
|
|
|
|
| sm = self.streaming_multiprocessors[chip_id][sm_id]
|
| if len(sm.current_tensor_ops) < DEFAULT_MAX_WARPS_PER_SM:
|
| return chip_id, sm_id
|
|
|
|
|
| return self._select_best_sm(operation_data)
|
|
|
| def create_stream(self, priority=0):
|
| """Create a new execution stream"""
|
| stream_id = len(self.stream_manager)
|
| self.stream_manager[stream_id] = {
|
| 'priority': priority,
|
| 'pending_ops': [],
|
| 'sync_points': set()
|
| }
|
| return stream_id
|
|
|
| def stream_synchronize(self, stream_id):
|
| """Wait for all operations in a stream to complete"""
|
| if stream_id not in self.stream_manager:
|
| raise StreamError(f"Invalid stream ID: {stream_id}")
|
| stream = self.stream_manager[stream_id]
|
|
|
|
|
| while stream['pending_ops']:
|
| op = stream['pending_ops'].pop(0)
|
| try:
|
| self._execute_stream_op(op)
|
| except Exception as e:
|
| raise StreamError(f"Stream operation failed: {str(e)}")
|
|
|
|
|
| stream['sync_points'].clear()
|
|
|
| def _execute_stream_op(self, op):
|
| """Execute a single stream operation"""
|
| op_type = op.get('type')
|
| if op_type == 'tensor':
|
| self._execute_tensor_op(op)
|
| elif op_type == 'memory':
|
| self._execute_memory_op(op)
|
| elif op_type == 'sync':
|
| self._handle_sync_point(op)
|
| else:
|
| raise StreamError(f"Unknown operation type: {op_type}")
|
|
|
| def add_stream_sync_point(self, stream_id, sync_point_id):
|
| """Add a synchronization point to a stream"""
|
| if stream_id not in self.stream_manager:
|
| raise StreamError(f"Invalid stream ID: {stream_id}")
|
| self.stream_manager[stream_id]['sync_points'].add(sync_point_id)
|
| self._sync_primitives[sync_point_id] = False
|
|
|
    def execute_tensor_op(self, op_type, inputs, output, stream_id=None):
        """Execute a tensor operation with parallel distribution across multiple GPUs.

        Args:
            op_type: operation tag understood by the tensor core.
            inputs: dict mapping address -> array-like exposing ``.nbytes``.
            output: destination address/descriptor.
            stream_id: if given, queue the op on that stream (returns None)
                instead of executing immediately.

        Raises:
            StreamError: for an unknown stream id.
            TensorError: wrapping any failure during distribution/execution.
        """
        if stream_id is not None and stream_id not in self.stream_manager:
            raise StreamError(f"Invalid stream ID: {stream_id}")

        try:
            # Choose target GPUs from per-input byte sizes.
            # NOTE(review): _select_optimal_gpus is not defined in this
            # module's visible code — confirm it exists on the class.
            input_sizes = {addr: inp.nbytes for addr, inp in inputs.items()}
            target_gpus = self._select_optimal_gpus(input_sizes)

            # Split the operation into per-GPU sub-operations.
            distributed_ops = self.parallel_distributor.distribute_operation({
                'type': op_type,
                'inputs': inputs,
                'output': output,
                'input_size': sum(input_sizes.values()),
                'target_gpus': target_gpus
            })

            # Track the cross-GPU operation and set up coherence entries.
            op_id = self._register_cross_gpu_operation(op_type, distributed_ops)

            self._setup_memory_coherence(distributed_ops)

            # NOTE(review): op_id, start_time, expected_compute_time and the
            # `operations` list below are computed but never used.
            start_time = time.time()
            expected_compute_time = len(distributed_ops) * self.gate_delay

            operations = []
            for dist_op in distributed_ops:
                gpu_id = dist_op['gpu_id']
                sm_id = self._select_optimal_sm(gpu_id, dist_op)

                warp = self._get_available_warp(gpu_id, sm_id)

                op = {
                    'type': 'tensor',
                    'op_type': op_type,
                    'gpu_id': gpu_id,
                    'sm_id': sm_id,
                    'warp_id': warp.warp_id,
                    'inputs': dist_op['inputs'],
                    'output_slice': dist_op.get('output_slice'),
                    'stream_id': stream_id,
                    'all_inputs': inputs,
                    'all_output': output
                }

                # NOTE(review): returning inside the loop means only the
                # FIRST distributed sub-op is ever queued or executed —
                # confirm whether all sub-ops were meant to be processed.
                if stream_id is not None:
                    # Defer execution until stream_synchronize().
                    self.stream_manager[stream_id]['pending_ops'].append(op)
                    return None
                else:
                    # Execute immediately.
                    return self._execute_tensor_op(op)

        except Exception as e:
            raise TensorError(f"Tensor operation failed: {str(e)}")
|
|
|
    def _execute_tensor_op(self, op):
        """Validate tensor addresses and run the op on the tensor core.

        NOTE(review): ops built by execute_tensor_op carry 'all_output' /
        'output_slice' keys but no 'output' key, so ``op['output']`` below
        raises KeyError for those — confirm the expected op schema.
        """
        op_type = op['op_type']
        inputs = op['inputs']
        output = op['output']

        # Every input address must refer to a registered tensor.
        for addr in inputs:
            if not self.memory_manager.is_tensor(addr):
                raise TensorError(f"Invalid tensor address: {addr}")

        if not self.memory_manager.is_tensor(output):
            raise TensorError(f"Invalid output tensor address: {output}")

        return self.tensor_core.execute_op(op_type, inputs, output)
|
|
|
| def read_memory_by_key(self, key):
|
| """
|
| Read memory block by string key (if supported).
|
| """
|
| if hasattr(self.memory_manager, 'read_by_key'):
|
| return self.memory_manager.read_by_key(key)
|
| raise NotImplementedError("Underlying memory manager does not support read_by_key.")
|
|
|
|
|
| def layernorm(self, x, gamma, beta, eps=1e-5, chip_id=0, sm_id=0):
|
|
|
| mean = np.mean(x, axis=-1, keepdims=True)
|
| var = np.var(x, axis=-1, keepdims=True)
|
| x_norm = (x - mean) / np.sqrt(var + eps)
|
| return gamma * x_norm + beta
|
|
|
| def gelu(self, x, chip_id=0, sm_id=0):
|
|
|
| return 0.5 * x * (1 + np.tanh(np.sqrt(2 / np.pi) * (x + 0.044715 * np.power(x, 3))))
|
|
|
| def softmax(self, x, axis=-1, chip_id=0, sm_id=0):
|
|
|
| x_max = np.max(x, axis=axis, keepdims=True)
|
| e_x = np.exp(x - x_max)
|
| return e_x / np.sum(e_x, axis=axis, keepdims=True)
|
|
|
| def matmul(self, A, B, chip_id=0, sm_id=0):
|
|
|
| return self.hal.v2_tensor_matmul(chip_id, A, B)
|
|
|
|
|
    def initialize(self, num_chips=8, vram_size_gb=16, num_sms_per_chip=1500,
                   num_cores_per_sm=128, threads_per_core=700000, threads_per_block=32,
                   num_tensor_cores_per_sm=8):
        """Initialize the virtual GPU driver with full multi-GPU support.

        Validates the requested configuration against the electron-speed
        timing model, builds the hardware description, initializes the HAL,
        every SM, the distributors, and all memory pools, then marks the
        driver initialized.

        Raises:
            ValueError: if the requested thread count exceeds the switching
                limit implied by max_switch_freq * gate_delay.
        """
        print("Initializing Virtual GPU Driver...")

        # Physical feasibility check: total threads vs switching capacity.
        total_threads = num_chips * num_sms_per_chip * num_cores_per_sm * threads_per_core
        max_total_threads = self.max_switch_freq * self.gate_delay
        if total_threads > max_total_threads:
            raise ValueError(
                f"Configuration exceeds maximum possible threads at current electron speed.\n"
                f"Requested: {total_threads:,} threads\n"
                f"Maximum possible: {max_total_threads:,} threads"
            )

        # Derived memory/power figures: bandwidth = clock * bus_width / 8,
        # expressed in GB/s; per-GPU power capped at 350 W.
        memory_clock_hz = 2132000000
        memory_bus_width = 384
        memory_bandwidth_per_gpu = (memory_clock_hz * memory_bus_width) / (8 * 1e9)
        watts_per_sm = 25
        total_power_per_gpu = min(watts_per_sm * num_sms_per_chip, 350)

        # Full hardware description handed to the HAL and distributors.
        self.hardware_config = {

            'num_chips': num_chips,
            'vram_size_gb': vram_size_gb,
            'num_sms_per_chip': num_sms_per_chip,
            'num_cores_per_sm': num_cores_per_sm,
            'threads_per_core': threads_per_core,
            'threads_per_block': threads_per_block,
            'num_tensor_cores_per_sm': num_tensor_cores_per_sm,
            'total_threads': total_threads,

            # Electrical timing model.
            'gate_delay': self.gate_delay,
            'max_switch_freq': self.max_switch_freq,

            'memory_config': {
                'memory_clock_hz': memory_clock_hz,
                'memory_bus_width': memory_bus_width,
                'bandwidth_gb_per_sec': memory_bandwidth_per_gpu,
                'nvlink_bandwidth_gbps': 900,
                'l1_cache_size_kb': 128,
                'l2_cache_size_mb': 6,
                'shared_memory_per_sm_kb': 164
            },

            'power_config': {
                'watts_per_sm': watts_per_sm,
                'total_power_limit': total_power_per_gpu,
                'thermal_limit_celsius': 85,
                'total_system_power': total_power_per_gpu * num_chips
            },

            'compute_capability': {
                'major': 9,
                'minor': 0,
                'features': {
                    'tensor_cores': True,
                    'ray_tracing_cores': True,
                    'optical_flow': True,
                    'concurrent_kernels': 128,
                    'max_shared_memory_per_sm': 164 * 1024,
                    'max_registers_per_thread': 255,
                    'max_warps_per_sm': 64
                }
            }
        }

        # Hand the configuration to the hardware abstraction layer.
        self.hal.initialize_hardware(
            num_chips=num_chips,
            vram_size_gb=vram_size_gb,
            num_sms_per_chip=num_sms_per_chip,
            num_cores_per_sm=num_cores_per_sm,
            threads_per_core=threads_per_core,
            threads_per_block=threads_per_block,
            num_tensor_cores_per_sm=num_tensor_cores_per_sm,
            memory_config=self.hardware_config['memory_config'],
            power_config=self.hardware_config['power_config'],
            compute_capability=self.hardware_config['compute_capability']
        )

        # Initialize every SM and the per-GPU array distributors.
        # NOTE(review): __init__ leaves self.streaming_multiprocessors[chip]
        # as empty dicts and nothing on this path populates them (that
        # happens in _initialize_command_processors, not called here), so
        # the lookup below raises KeyError — confirm intended call order.
        for chip_id in range(num_chips):

            for sm_id in range(num_sms_per_chip):
                sm = self.streaming_multiprocessors[chip_id][sm_id]
                sm.initialize(
                    threads_per_core=threads_per_core,
                    threads_per_block=threads_per_block,
                    num_tensor_cores=num_tensor_cores_per_sm
                )

            self.array_distributors[chip_id].initialize(
                threads_per_core=threads_per_core,
                gate_delay=self.gate_delay
            )

        # Wire the parallel distributor to the NVLink topology.
        self.parallel_distributor.initialize(
            hardware_config=self.hardware_config,
            nvlink_topology=self.multi_gpu_system.system_state["nvlink_state"]["connections"]
        )

        # One global VRAM pool per chip.
        for chip_id in range(num_chips):
            pool_id = f"gpu_{chip_id}_global"
            self.memory_pools[pool_id] = MemoryPool(
                size_bytes=vram_size_gb * 1024 * 1024 * 1024,
                chip_id=chip_id
            )

        # A 1 GiB shared pool per unordered chip pair.
        for i in range(num_chips):
            for j in range(i + 1, num_chips):
                pool_id = f"shared_pool_{i}_{j}"
                self.shared_pools[pool_id] = SharedMemoryPool(
                    size_bytes=1 * 1024 * 1024 * 1024,
                    gpu_a=i,
                    gpu_b=j
                )

        self.initialized = True
        print(f"Virtual GPU Driver initialized with:")
        print(f"- {num_chips} GPUs")
        print(f"- {num_sms_per_chip} SMs per GPU")
        print(f"- {vram_size_gb}GB VRAM per GPU")
        print(f"- {self.hardware_config['total_threads']:,} total threads")
        print(f"- {len(self.shared_pools)} shared memory pools")
        print(f"- Gate delay: {self.gate_delay:.2e} seconds")
        print(f"- Max switching frequency: {self.max_switch_freq:.2e} Hz")
|
|
|
| def shutdown(self):
|
| print("Shutting down Virtual GPU Driver...")
|
| self.hal.shutdown_hardware()
|
|
|
|
|
| for chip_id in range(len(self.streaming_multiprocessors)):
|
| for sm_id in self.streaming_multiprocessors[chip_id]:
|
| self.streaming_multiprocessors[chip_id][sm_id].shutdown()
|
|
|
| self.multi_gpu_system.store_system_state()
|
| self.initialized = False
|
| print("Virtual GPU Driver shut down.")
|
|
|
| def _select_optimal_sm(self, gpu_id: int, operation: Dict[str, Any]) -> int:
|
| """Select the optimal SM for an operation based on load and locality"""
|
| sms = self.streaming_multiprocessors[gpu_id]
|
|
|
|
|
| sm_loads = {
|
| sm_id: len(sm.current_tensor_ops)
|
| for sm_id, sm in sms.items()
|
| }
|
|
|
|
|
| sm_locality_scores = {
|
| sm_id: self._calculate_data_locality(sm, operation)
|
| for sm_id, sm in sms.items()
|
| }
|
|
|
|
|
| sm_scores = {
|
| sm_id: (sm_locality_scores[sm_id] * DEFAULT_LOCALITY_WEIGHT +
|
| (1.0 / (load + 1)) * DEFAULT_LOAD_WEIGHT)
|
| for sm_id, load in sm_loads.items()
|
| }
|
|
|
| return max(sm_scores.items(), key=lambda x: x[1])[0]
|
|
|
    def _calculate_data_locality(self, sm: StreamingMultiprocessor, operation: Dict[str, Any]) -> float:
        """Calculate data locality score for an SM based on input data location.

        Returns a raw count of inputs resident in the SM's local memory —
        NOT the 0-1 ratio produced by the earlier method of the same name,
        which this definition shadows at runtime. NOTE(review): confirm the
        duplicate name and differing semantics are intended.
        """
        locality_score = 0.0

        # operation['inputs'] maps address -> array-like; only inputs that
        # expose an .address attribute can be locality-checked.
        for input_data in operation['inputs'].values():
            if hasattr(input_data, 'address'):
                if sm.has_data_in_local_memory(input_data.address):
                    locality_score += 1.0

        return locality_score
|
|
|
def _get_available_warp(self, gpu_id: int, sm_id: int) -> Warp:
    """Get an available warp from the specified SM"""
    # Least-loaded warp wins; min() keeps the first minimum, matching the
    # original index-of-min selection on ties.
    return min(self.warps[gpu_id][sm_id],
               key=lambda warp: len(warp.get_active_threads()))
|
|
|
def allocate_memory(self, size_bytes, chip_id=0):
    """Reserve `size_bytes` on chip `chip_id` via the memory manager and return its address."""
    if not self.initialized:
        raise RuntimeError("Driver not initialized.")
    address = self.memory_manager.allocate(size_bytes, chip_id)
    return address
|
|
|
|
|
|
|
def batch_allocate_memory(self, allocations: list[tuple[int, str]], chip_id=0):
    """Allocate several keyed regions in one call.

    Uses the manager's native batch API when available; otherwise falls
    back to allocating each (size_bytes, key) pair individually and
    returning the list of addresses.
    """
    if not self.initialized:
        raise RuntimeError("Driver not initialized.")
    batch = getattr(self.memory_manager, 'batch_allocate_with_keys', None)
    if batch is not None:
        return batch(allocations, chip_id)
    return [self.memory_manager.allocate_with_key(size_bytes, key, chip_id)
            for size_bytes, key in allocations]
|
|
|
def free_memory(self, address, chip_id=0):
    """Free allocated memory and drop any cached copy held in local storage.

    Tensor allocations evict their "tensor_<address>" cache entry before the
    free; raw allocations free first, then evict "mem_<address>". Any failure
    is re-raised as MemoryError.
    """
    if not self.initialized:
        raise RuntimeError("Driver not initialized.")
    try:
        if self.memory_manager.get_tensor_info(address):
            # Tensor path: evict the cached tensor, then release the memory.
            cache_key = f"tensor_{address}"
            if self.local_storage.exists(cache_key):
                self.local_storage.delete(cache_key)
            self.memory_manager.free(address, chip_id)
            return

        # Raw path: release the memory first, then evict any cached bytes.
        self.memory_manager.free(address, chip_id)
        cache_key = f"mem_{address}"
        if self.local_storage.exists(cache_key):
            self.local_storage.delete(cache_key)

    except Exception as e:
        raise MemoryError(f"Failed to free memory at {address}: {str(e)}")
|
|
|
def write_memory(self, address, data, chip_id=0):
    """Write data to allocated memory.

    Tensor allocations are shape-checked against the registered tensor info
    and serialized via numpy; anything else is written as raw bytes
    (bytes-like data passes through, other values are coerced with bytes()).
    Failures are re-raised as MemoryError.
    """
    if not self.initialized:
        raise RuntimeError("Driver not initialized.")
    try:
        info = self.memory_manager.get_tensor_info(address)
        if info:
            arr = np.asarray(data)
            if arr.shape != info['shape']:
                raise ValueError(f"Data shape {arr.shape} does not match tensor shape {info['shape']}")
            self.memory_manager.write_data(address, arr.tobytes(), chip_id)
            return

        payload = data if isinstance(data, (bytes, bytearray)) else bytes(data)
        self.memory_manager.write_data(address, payload, chip_id)

    except Exception as e:
        raise MemoryError(f"Failed to write memory at {address}: {str(e)}")
|
|
|
def read_memory(self, address, size_bytes, chip_id=0):
    """Read data from allocated memory.

    Tensor allocations come back as an ndarray reconstructed from the
    registered dtype/shape; raw allocations come back as the manager's
    bytes. Failures are re-raised as MemoryError.
    """
    if not self.initialized:
        raise RuntimeError("Driver not initialized.")
    try:
        # Same call order as before: metadata lookup, then the read itself.
        info = self.memory_manager.get_tensor_info(address)
        raw = self.memory_manager.read_data(address, size_bytes, chip_id)
        if info:
            return np.frombuffer(raw, dtype=info['dtype']).reshape(info['shape'])
        return raw

    except Exception as e:
        raise MemoryError(f"Failed to read memory at {address}: {str(e)}")
|
|
|
def add_command(self, command_type, **kwargs):
    """Queue a command of `command_type` (with its kwargs) on the command processor."""
    if not self.initialized:
        raise RuntimeError("Driver not initialized.")
    self.command_processor.add_command(command_type, **kwargs)
|
|
|
def submit_commands(self, chip_id=0):
    """Flush queued commands to chip `chip_id`; returns the processor's result."""
    if not self.initialized:
        raise RuntimeError("Driver not initialized.")
    result = self.command_processor.submit_commands(chip_id)
    return result
|
|
|
def clear_commands(self):
    """Drop all commands currently queued on the command processor."""
    if not self.initialized:
        raise RuntimeError("Driver not initialized.")
    self.command_processor.clear_commands()
|
|
|
def create_memory_pool(self, size_bytes: int, shared: bool = False) -> str:
    """Create a new memory pool and register it with the driver.

    Args:
        size_bytes: capacity of the pool in bytes.
        shared: if True, create a SharedMemoryPool (registered in
            self.shared_pools); otherwise a plain MemoryPool
            (registered in self.memory_pools).

    Returns:
        The new pool's identifier ("pool_<n>").

    Raises:
        RuntimeError: if the driver is not initialized.
        MemoryError: if pool construction fails.
    """
    if not self.initialized:
        raise RuntimeError("Driver not initialized.")

    try:
        # BUG FIX: the id was derived from len(self.memory_pools) alone, so
        # two shared pools created back-to-back both received "pool_0" and
        # the second silently overwrote the first. Probe both registries
        # until an unused id is found.
        index = len(self.memory_pools) + len(self.shared_pools)
        while (f"pool_{index}" in self.memory_pools
               or f"pool_{index}" in self.shared_pools):
            index += 1
        pool_id = f"pool_{index}"

        if shared:
            self.shared_pools[pool_id] = SharedMemoryPool(size_bytes)
        else:
            self.memory_pools[pool_id] = MemoryPool(size_bytes)

        return pool_id

    except Exception as e:
        raise MemoryError(f"Failed to create memory pool: {str(e)}")
|
|
|
def allocate_from_pool(self, pool_id: str, size_bytes: int) -> int:
    """Carve `size_bytes` out of pool `pool_id` and return the address.

    Shared pools are consulted first and use their atomic allocator. A None
    address from a pool, an unknown pool id, or any other failure surfaces
    as MemoryError.
    """
    if not self.initialized:
        raise RuntimeError("Driver not initialized.")

    try:
        shared = self.shared_pools.get(pool_id)
        if shared is not None:
            address = shared.atomic_allocate(size_bytes)
        else:
            plain = self.memory_pools.get(pool_id)
            if plain is None:
                raise ValueError(f"Invalid pool ID: {pool_id}")
            address = plain.allocate(size_bytes)

        if address is None:
            raise MemoryError(f"Failed to allocate {size_bytes} bytes from pool {pool_id}")

        return address

    except Exception as e:
        raise MemoryError(f"Failed to allocate from pool: {str(e)}")
|
|
|
def free_pool_memory(self, pool_id: str, address: int):
    """Return a previously allocated address to its owning pool.

    Shared pools are consulted first and use their atomic free; an unknown
    pool id or any underlying failure surfaces as MemoryError.
    """
    if not self.initialized:
        raise RuntimeError("Driver not initialized.")

    try:
        shared = self.shared_pools.get(pool_id)
        if shared is not None:
            shared.atomic_free(address)
        elif pool_id in self.memory_pools:
            self.memory_pools[pool_id].free(address)
        else:
            raise ValueError(f"Invalid pool ID: {pool_id}")

    except Exception as e:
        raise MemoryError(f"Failed to free pool memory: {str(e)}")
|
|
|
def get_pool_stats(self, pool_id: str) -> dict:
    """Report fragmentation, total size, and shared-flag for one pool.

    Raises MemoryError for unknown pool ids or any underlying failure.
    """
    if not self.initialized:
        raise RuntimeError("Driver not initialized.")

    try:
        is_shared = pool_id in self.shared_pools
        if is_shared:
            pool = self.shared_pools[pool_id]
        elif pool_id in self.memory_pools:
            pool = self.memory_pools[pool_id]
        else:
            raise ValueError(f"Invalid pool ID: {pool_id}")

        return {
            'fragmentation': pool.get_fragmentation(),
            'total_size': pool.total_size,
            'is_shared': is_shared,
        }

    except Exception as e:
        raise MemoryError(f"Failed to get pool stats: {str(e)}")
|
|
|
def delete_pool(self, pool_id: str):
    """Remove a pool from whichever registry holds it (shared checked first).

    Raises MemoryError for unknown pool ids or any underlying failure.
    """
    if not self.initialized:
        raise RuntimeError("Driver not initialized.")

    try:
        for registry in (self.shared_pools, self.memory_pools):
            if pool_id in registry:
                del registry[pool_id]
                return
        raise ValueError(f"Invalid pool ID: {pool_id}")

    except Exception as e:
        raise MemoryError(f"Failed to delete pool: {str(e)}")
|
|
|
def execute_kernel(self, chip_id, sm_id, core_id, thread_block_config, kernel_func, *args, **kwargs):
    """
    Execute a kernel function across multiple thread blocks.
    thread_block_config: {
        'block_dim': (x, y, z), # threads per block
        'grid_dim': (x, y, z), # blocks per grid
        'shared_memory_size': int # bytes per block
    }
    """
    if not self.initialized:
        raise RuntimeError("Driver not initialized")

    grid = thread_block_config['grid_dim']
    block = thread_block_config['block_dim']
    blocks_per_grid = grid[0] * grid[1] * grid[2]
    threads_per_block = block[0] * block[1] * block[2]
    total_threads = blocks_per_grid * threads_per_block

    # Reject launches that would exceed the configured per-core thread budget.
    if total_threads > self.hardware_config['threads_per_core']:
        raise ValueError(f"Total threads {total_threads} exceeds maximum threads per core {self.hardware_config['threads_per_core']}")

    self.add_command(
        "execute_kernel",
        chip_id=chip_id,
        sm_id=sm_id,
        core_id=core_id,
        thread_block_config=thread_block_config,
        kernel_func=kernel_func,
        args=args,
        kwargs=kwargs
    )
    print(f"Added kernel execution command for Chip {chip_id}, SM {sm_id}, Core {core_id} "
          f"with {blocks_per_grid} blocks × {threads_per_block} threads = {total_threads} total threads")
|
|
|
def matmul(self, chip_id, sm_id, A, B):
    """Queue a matrix-multiply of A and B on the given chip/SM."""
    self.add_command("matmul", chip_id=chip_id, sm_id=sm_id, A=A, B=B)
    print(f"Added matmul command for Chip {chip_id}, SM {sm_id}.")
|
|
|
def block_barrier(self, chip_id, sm_id, core_id, block_id):
    """Queue a barrier synchronizing every thread in one block."""
    self.add_command("block_barrier", chip_id=chip_id, sm_id=sm_id,
                     core_id=core_id, block_id=block_id)
|
|
|
def core_barrier(self, chip_id, sm_id, core_id):
    """Queue a barrier synchronizing every thread in one core."""
    self.add_command("core_barrier", chip_id=chip_id,
                     sm_id=sm_id, core_id=core_id)
|
|
|
def global_barrier(self, chip_id=0):
    """Queue a device-wide barrier on chip `chip_id`."""
    self.add_command("global_barrier", chip_id=chip_id)
    print(f"Added global barrier command for Chip {chip_id}.")
|
|
|
def shared_memory_barrier(self, chip_id, sm_id):
    """Queue a barrier over one SM's shared memory."""
    self.add_command("shared_memory_barrier", chip_id=chip_id, sm_id=sm_id)
    print(f"Added shared memory barrier command for Chip {chip_id}, SM {sm_id}.")
|
|
|
def atomic_operation(self, chip_id, sm_id, address, operation, value):
    """Queue an atomic read-modify-write of `value` at `address` on one SM."""
    self.add_command("atomic_operation", chip_id=chip_id, sm_id=sm_id,
                     address=address, operation=operation, value=value)
    print(f"Added atomic operation command for Chip {chip_id}, SM {sm_id}.")
|
|
|
|
|
def create_buffer(self, size_bytes, buffer_type="vertex"):
    """Delegate buffer creation to the graphics API; returns its buffer id.

    NOTE(review): unlike the memory/command APIs, the graphics wrappers do
    not check self.initialized — confirm this is intentional.
    """
    api = self.graphics_api
    return api.create_buffer(size_bytes, buffer_type)
|
|
|
def delete_buffer(self, buffer_id):
    """Delegate buffer deletion to the graphics API."""
    api = self.graphics_api
    api.delete_buffer(buffer_id)
|
|
|
def buffer_data(self, buffer_id, data):
    """Delegate uploading `data` into buffer `buffer_id` to the graphics API."""
    api = self.graphics_api
    api.buffer_data(buffer_id, data)
|
|
|
def draw_arrays(self, mode, first, count):
    """Delegate a non-indexed draw call to the graphics API."""
    api = self.graphics_api
    api.draw_arrays(mode, first, count)
|
|
|
def draw_indexed(self, mode, count, index_buffer_id, index_offset=0):
    """Delegate an indexed draw call to the graphics API."""
    api = self.graphics_api
    api.draw_indexed(mode, count, index_buffer_id, index_offset)
|
|
|
def compile_shader(self, shader_source, shader_type="vertex"):
    """Delegate shader compilation to the graphics API; returns its shader id."""
    api = self.graphics_api
    return api.compile_shader(shader_source, shader_type)
|
|
|
def use_program(self, program_id):
    """Delegate binding of shader program `program_id` to the graphics API."""
    api = self.graphics_api
    api.use_program(program_id)
|
|
|
def create_framebuffer(self, width, height):
    """Delegate framebuffer creation to the graphics API; returns its id."""
    api = self.graphics_api
    return api.create_framebuffer(width, height)
|
|
|
def bind_framebuffer(self, framebuffer_id):
    """Delegate framebuffer binding to the graphics API."""
    api = self.graphics_api
    api.bind_framebuffer(framebuffer_id)
|
|
|
def clear_color(self, r, g, b, a):
    """Delegate the RGBA clear-color setting to the graphics API."""
    api = self.graphics_api
    api.clear_color(r, g, b, a)
|
|
|
def clear_depth(self, depth):
    """Delegate the depth-clear value setting to the graphics API."""
    api = self.graphics_api
    api.clear_depth(depth)
|
|
|
|
|
|
|