| import os | |
| import sys | |
| from typing import Dict, Optional, Union | |
| import numpy as np | |
| from safetensors import deserialize, safe_open, serialize, serialize_file | |
def _tobytes(tensor: np.ndarray) -> bytes:
    """
    Returns the raw bytes of `tensor`, byte-swapping first when the array is not little-endian.

    Args:
        tensor (`np.ndarray`):
            The array to convert.

    Returns:
        `bytes`: The result of `tobytes()` on the array, or on a byte-swapped copy of it
        when `_is_little_endian(tensor)` is false.
    """
    if _is_little_endian(tensor):
        return tensor.tobytes()
    # `inplace=False` yields a swapped copy, so the caller's array is left untouched.
    swapped = tensor.byteswap(inplace=False)
    return swapped.tobytes()
def save(
    tensor_dict: Dict[str, np.ndarray], metadata: Optional[Dict[str, str]] = None
) -> bytes:
    """
    Serializes a dictionary of numpy arrays to safetensors-formatted bytes.

    Args:
        tensor_dict (`Dict[str, np.ndarray]`):
            The incoming tensors. Tensors need to be contiguous and dense.
        metadata (`Dict[str, str]`, *optional*, defaults to `None`):
            Optional text only metadata you might want to save in your header.
            For instance it can be useful to specify more about the underlying
            tensors. This is purely informative and does not affect tensor loading.

    Returns:
        `bytes`: The raw bytes representing the format

    Example:

    ```python
    from safetensors.numpy import save
    import numpy as np

    tensors = {"embedding": np.zeros((512, 1024)), "attention": np.zeros((256, 256))}
    byte_data = save(tensors)
    ```
    """
    # Describe every array by dtype name, shape and raw payload before handing
    # the whole mapping to `serialize`.
    flattened = {}
    for name, array in tensor_dict.items():
        flattened[name] = {
            "dtype": array.dtype.name,
            "shape": array.shape,
            "data": _tobytes(array),
        }
    return bytes(serialize(flattened, metadata=metadata))
def save_file(
    tensor_dict: Dict[str, np.ndarray],
    filename: Union[str, os.PathLike],
    metadata: Optional[Dict[str, str]] = None,
) -> None:
    """
    Saves a dictionary of tensors into a file in safetensors format.

    Args:
        tensor_dict (`Dict[str, np.ndarray]`):
            The incoming tensors. Tensors need to be contiguous and dense.
        filename (`str`, or `os.PathLike`):
            The filename we're saving into.
        metadata (`Dict[str, str]`, *optional*, defaults to `None`):
            Optional text only metadata you might want to save in your header.
            For instance it can be useful to specify more about the underlying
            tensors. This is purely informative and does not affect tensor loading.

    Returns:
        `None`

    Example:

    ```python
    from safetensors.numpy import save_file
    import numpy as np

    tensors = {"embedding": np.zeros((512, 1024)), "attention": np.zeros((256, 256))}
    save_file(tensors, "model.safetensors")
    ```
    """
    # Describe every array by dtype name, shape and raw payload; `serialize_file`
    # receives that mapping together with the destination filename.
    flattened = {}
    for name, array in tensor_dict.items():
        flattened[name] = {
            "dtype": array.dtype.name,
            "shape": array.shape,
            "data": _tobytes(array),
        }
    serialize_file(flattened, filename, metadata=metadata)
def load(data: bytes) -> Dict[str, np.ndarray]:
    """
    Decodes the in-memory content of a safetensors file into numpy arrays.

    Args:
        data (`bytes`):
            The content of a safetensors file

    Returns:
        `Dict[str, np.ndarray]`: dictionary that contains name as key, value as `np.ndarray`

    Example:

    ```python
    from safetensors.numpy import load

    file_path = "./my_folder/bert.safetensors"
    with open(file_path, "rb") as f:
        data = f.read()

    loaded = load(data)
    ```
    """
    # `deserialize` parses the bytes; `_view2np` turns each parsed entry into an array.
    return _view2np(deserialize(data))
def load_file(filename: Union[str, os.PathLike]) -> Dict[str, np.ndarray]:
    """
    Reads every tensor stored in a safetensors file and returns them as numpy arrays.

    Args:
        filename (`str`, or `os.PathLike`):
            The name of the file which contains the tensors

    Returns:
        `Dict[str, np.ndarray]`: dictionary that contains name as key, value as `np.ndarray`

    Example:

    ```python
    from safetensors.numpy import load_file

    file_path = "./my_folder/bert.safetensors"
    loaded = load_file(file_path)
    ```
    """
    with safe_open(filename, framework="np") as handle:
        # NOTE(review): `offset_keys()` presumably yields tensor names in file-offset
        # order — confirm against the `safe_open` API.
        return {name: handle.get_tensor(name) for name in handle.offset_keys()}
# Lookup table from the dtype tag stored with each tensor (e.g. "F32") to the
# scalar type handed to `np.frombuffer` when decoding that tensor's payload.
_TYPES = {
    "F64": np.float64,
    "F32": np.float32,
    "F16": np.float16,
    "I64": np.int64,
    "U64": np.uint64,
    "I32": np.int32,
    "U32": np.uint32,
    "I16": np.int16,
    "U16": np.uint16,
    "I8": np.int8,
    "U8": np.uint8,
    "BOOL": bool,
}


def _getdtype(dtype_str: str) -> type:
    """
    Resolves a dtype tag to the scalar type registered for it in `_TYPES`.

    Args:
        dtype_str (`str`):
            The dtype tag to look up, e.g. `"F32"` or `"BOOL"`.

    Returns:
        `type`: The scalar type registered for `dtype_str`.

    Raises:
        `KeyError`: If `dtype_str` has no entry in `_TYPES`. The message names the
        offending tag and lists the supported ones.
    """
    try:
        return _TYPES[dtype_str]
    except KeyError:
        supported = ", ".join(_TYPES)
        # Still a KeyError so existing handlers keep working; `from None` hides the
        # uninformative bare-key error that triggered it.
        raise KeyError(
            f"Unsupported dtype {dtype_str!r} for numpy; supported dtypes are: {supported}"
        ) from None
def _view2np(safeview) -> Dict[str, np.ndarray]:
    """
    Converts deserialized tensor views into numpy arrays.

    Args:
        safeview:
            An iterable of `(name, view)` pairs, where each `view` exposes the
            `"dtype"`, `"shape"` and `"data"` entries.

    Returns:
        `Dict[str, np.ndarray]`: dictionary that contains name as key, value as `np.ndarray`
    """
    # `_tobytes` in this module writes payloads in little-endian order, while the
    # scalar types returned by `_getdtype` are native-order. On a big-endian host
    # the decoded values must therefore be byte-swapped to read back correctly.
    needs_swap = sys.byteorder == "big"
    result = {}
    for name, view in safeview:
        dtype = _getdtype(view["dtype"])
        arr = np.frombuffer(view["data"], dtype=dtype).reshape(view["shape"])
        if needs_swap:
            # Swapped copy keeps the native dtype but fixes the values.
            arr = arr.byteswap(inplace=False)
        result[name] = arr
    return result
def _is_little_endian(tensor: np.ndarray) -> bool:
    """
    Tells whether the memory of `tensor` is laid out in little-endian byte order.

    Args:
        tensor (`np.ndarray`):
            The array whose `dtype.byteorder` is inspected.

    Returns:
        `bool`: `True` when the byte order is `"<"` or `"|"`, or when it is `"="` and
        `sys.byteorder` is `"little"`; `False` otherwise.

    Raises:
        `ValueError`: If `dtype.byteorder` is not one of `"="`, `"|"`, `"<"`, `">"`.
    """
    byteorder = tensor.dtype.byteorder
    if byteorder == "=":
        # "=" is the host's native order, so defer to the interpreter's byte order.
        return sys.byteorder == "little"
    if byteorder in ("|", "<"):
        # "|" is treated the same as explicit little-endian: nothing to swap.
        return True
    if byteorder == ">":
        return False
    raise ValueError(f"Unexpected byte order {byteorder}")