from typing import cast, Union

import PIL.Image
import torch
from diffusers import AutoencoderKL
from diffusers.image_processor import VaeImageProcessor


class EndpointHandler:
    """Endpoint handler that decodes latents into images with an ``AutoencoderKL``.

    The handler loads the VAE once at construction time and, on each call,
    optionally unpacks patch-packed latents, optionally rescales them with the
    VAE's configured scaling/shift factors, decodes them and post-processes
    the result into the requested output format.
    """

    def __init__(self, path=""):
        """Load the VAE from ``path`` and build the matching image processor.

        Args:
            path: Location handed to ``AutoencoderKL.from_pretrained``.
        """
        self.device = "cuda"
        self.dtype = torch.bfloat16
        vae = AutoencoderKL.from_pretrained(path, torch_dtype=self.dtype)
        self.vae = cast(AutoencoderKL, vae.to(self.device, self.dtype).eval())

        # Derived from the number of entries in the VAE's block_out_channels:
        # a factor of two for every block after the first.
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)

    @staticmethod
    def _unpack_latents(latents, height, width, vae_scale_factor):
        """Turn packed ``(batch, num_patches, channels)`` latents into a 4-D grid.

        The channel axis of every patch is split into ``channels // 4`` groups
        of 2x2 values, and those 2x2 cells are interleaved back into the
        spatial axes.

        Args:
            latents: 3-D tensor of shape ``(batch, num_patches, channels)``.
            height: Target height before downscaling (presumably in pixels);
                the latent height is ``2 * (height // (vae_scale_factor * 2))``.
            width: Target width, converted the same way as ``height``.
            vae_scale_factor: Downscaling factor between the target size and
                the latent grid.

        Returns:
            Tensor of shape ``(batch, channels // 4, latent_height, latent_width)``.
        """
        batch_size, num_patches, channels = latents.shape

        # Latent dimensions are forced to be even because each patch covers a
        # 2x2 cell of the latent grid.
        height = 2 * (int(height) // (vae_scale_factor * 2))
        width = 2 * (int(width) // (vae_scale_factor * 2))

        # (batch, rows, cols, channel_group, 2, 2)
        latents = latents.view(batch_size, height // 2, width // 2, channels // 4, 2, 2)
        # -> (batch, channel_group, rows, 2, cols, 2) so the 2x2 cells sit next
        # to the spatial axes they expand.
        latents = latents.permute(0, 3, 1, 4, 2, 5)

        return latents.reshape(batch_size, channels // (2 * 2), height, width)

    @torch.no_grad()
    def __call__(self, data) -> "Union[torch.Tensor, PIL.Image.Image]":
        """Decode the latents in ``data`` and return them in the requested format.

        Args:
            data: Mapping with:
                ``"inputs"``: the latent tensor, either 4-D or packed 3-D
                (``(batch, num_patches, channels)``).
                ``"parameters"`` (optional): mapping with any of
                ``height`` / ``width`` (required when ``inputs`` is 3-D),
                ``do_scaling`` (default ``True``),
                ``output_type`` (default ``"pil"``) and
                ``partial_postprocess`` (default ``False``).

        Returns:
            * ``partial_postprocess=True``: a ``uint8`` tensor with the channel
              axis moved last and values in ``[0, 255]``.
            * ``output_type="pil"``: the first image of the batch as a PIL
              image (any further batch entries are not returned).
            * otherwise: the decoded tensor exactly as produced by the VAE.

        Raises:
            ValueError: If ``inputs`` is 3-D and ``height`` or ``width`` is
                missing from the parameters.
        """
        tensor = cast(torch.Tensor, data["inputs"])
        # An explicit ``"parameters": None`` is treated like a missing key.
        parameters = cast(dict, data.get("parameters") or {})
        if tensor.ndim == 3 and ("height" not in parameters or "width" not in parameters):
            raise ValueError("Expected `height` and `width` in parameters.")
        height = cast(int, parameters.get("height", 0))
        width = cast(int, parameters.get("width", 0))
        do_scaling = cast(bool, parameters.get("do_scaling", True))
        output_type = cast(str, parameters.get("output_type", "pil"))
        partial_postprocess = cast(bool, parameters.get("partial_postprocess", False))
        if partial_postprocess:
            # Partial post-processing always yields a tensor.
            output_type = "pt"

        tensor = tensor.to(self.device, self.dtype)
        if tensor.ndim == 3:
            tensor = self._unpack_latents(tensor, height, width, self.vae_scale_factor)

        if do_scaling:
            config = self.vae.config
            tensor = tensor / config.scaling_factor
            # Some VAE configs carry no shift factor (None); only apply the
            # shift when one is actually configured.
            shift_factor = getattr(config, "shift_factor", None)
            if shift_factor is not None:
                tensor = tensor + shift_factor

        # Gradient tracking is already disabled by the decorator.
        image = cast(torch.Tensor, self.vae.decode(tensor, return_dict=False)[0])

        if partial_postprocess:
            # Map [-1, 1] to [0, 1], move channels last, then quantize to bytes.
            image = (image * 0.5 + 0.5).clamp(0, 1)
            image = image.permute(0, 2, 3, 1).contiguous().float()
            image = (image * 255).round().to(torch.uint8)
        elif output_type == "pil":
            image = cast(PIL.Image.Image, self.image_processor.postprocess(image, output_type="pil")[0])

        return image