Mehdi Lakbar
Initial demo of Lina-speech (pardi-speech)
56cfa73
raw
history blame
7.35 kB
import torch
from torch import nn
class ConvNeXtBlock(nn.Module):
"""ConvNeXt Block adapted from https://github.com/facebookresearch/ConvNeXt to 1D audio signal.
Args:
dim (int): Number of input channels.
intermediate_dim (int): Dimensionality of the intermediate layer.
layer_scale_init_value (float, optional): Initial value for the layer scale. None means no scaling.
Defaults to None.
adanorm_num_embeddings (int, optional): Number of embeddings for AdaLayerNorm.
None means non-conditional LayerNorm. Defaults to None.
"""
def __init__(
self,
dim: int,
intermediate_dim: int | None = None,
layer_scale_init_value: float = 0.0,
elementwise_affine_ln: bool = True,
is_causal: bool = False,
):
super().__init__()
intermediate_dim = intermediate_dim if intermediate_dim is not None else dim * 3
self.dwconv = nn.Conv1d(
dim, dim, kernel_size=7, padding=0 if is_causal else 3, groups=dim
) # depthwise conv
self.norm = nn.LayerNorm(
dim, eps=1e-6, elementwise_affine=elementwise_affine_ln
)
self.pwconv1 = nn.Linear(
dim, intermediate_dim
) # pointwise/1x1 convs, implemented with linear layers
self.act = nn.GELU()
self.pwconv2 = nn.Linear(intermediate_dim, dim)
self.gamma = (
nn.Parameter(layer_scale_init_value * torch.ones(dim), requires_grad=True)
if layer_scale_init_value > 0
else None
)
self.is_causal = is_causal
def forward(
self,
x: torch.Tensor,
scale_shift: tuple[torch.Tensor, torch.Tensor] | None = None,
gate: torch.Tensor | None = None,
) -> torch.Tensor:
residual = x
if self.is_causal:
x = torch.nn.functional.pad(x, (6, 0))
x = self.dwconv(x)
x = x.transpose(1, 2) # (B, C, T) -> (B, T, C)
x = self.norm(x)
if scale_shift is not None:
scale, shift = scale_shift
x = x * scale[:, None] + shift[:, None]
x = self.pwconv1(x)
x = self.act(x)
x = self.pwconv2(x)
if self.gamma is not None:
x = self.gamma * x
if gate is not None:
x = gate[:, None] * x
x = x.transpose(1, 2) # (B, T, C) -> (B, C, T)
x = residual + x
return x
class ConvNextNet(nn.Module):
def __init__(self, n_layers, dim, intermediate_dim: int | None = None):
super().__init__()
self.net = nn.Sequential(
*[
ConvNeXtBlock(
dim,
intermediate_dim,
)
for _ in range(n_layers)
]
)
def forward(self, x):
return self.net(x)
class ConvNextPatchEncoder(nn.Module):
def __init__(
self,
patch_sizes: list[int],
n_layers_per_patch: int,
patch_expansion_factor: float = 1.5,
is_decoder: bool = False,
):
super().__init__()
patch_to_dim = []
convnext = []
for i, patch_size in enumerate(patch_sizes):
in_dim = int((patch_expansion_factor if i > 0 else 1.0) * patch_size)
out_dim = int(patch_expansion_factor * patch_size)
if is_decoder:
in_dim, out_dim = out_dim, in_dim
patch_to_dim.append(
nn.Linear(
in_dim,
out_dim,
)
)
convnext += [
nn.Sequential(
*[
ConvNeXtBlock(int(patch_size * patch_expansion_factor))
for _ in range(n_layers_per_patch)
]
)
]
self.is_decoder = is_decoder
self.patch_sizes = patch_sizes
self.patch_expansion_factor = patch_expansion_factor
self.patch_to_dim = nn.ModuleList(patch_to_dim)
self.convnext = nn.ModuleList(convnext)
def forward(self, x):
if self.is_decoder:
for i, patch_size in reversed(list(enumerate(self.patch_sizes))):
B, P, N = x.shape
patch_expansion_factor_maybe = (
self.patch_expansion_factor if i > 0 else 1.0
)
x = x.reshape(B, int(patch_size * self.patch_expansion_factor), -1)
x = self.convnext[i](x)
x = self.patch_to_dim[i](x.transpose(1, 2)).transpose(1, 2)
else:
for i, patch_size in enumerate(self.patch_sizes):
B, P, N = x.shape
patch_expansion_factor_maybe = (
self.patch_expansion_factor if i > 0 else 1.0
)
x = x.reshape(B, int(patch_size * patch_expansion_factor_maybe), -1)
x = self.patch_to_dim[i](x.transpose(1, 2)).transpose(1, 2)
x = self.convnext[i](x)
return x
class ConvNextEncoder(nn.Module):
def __init__(
self,
in_dim: int,
dim: int,
n_layers: int,
intermediate_dim: int | None = None,
stride: int = 1,
):
super().__init__()
self.in_proj = nn.Linear(in_dim, dim)
if stride > 1:
self.stride = nn.Conv1d(
in_channels=dim,
out_channels=dim,
kernel_size=(stride * 2) + 1,
stride=stride,
padding=stride // 2,
)
else:
self.stride = nn.Identity()
self.net = ConvNextNet(n_layers, dim, intermediate_dim)
def forward(self, x):
x = self.in_proj(x.transpose(1, 2)).transpose(1, 2)
x = self.stride(x)
return self.net(x)
class ConvNextDecoder(nn.Module):
def __init__(
self,
out_dim: int,
dim: int,
n_layers: int,
intermediate_dim: int | None = None,
stride: int = 1,
stride_position: str = "before",
):
super().__init__()
self.out_proj = nn.Linear(dim, out_dim)
if stride > 1:
self.stride = nn.ConvTranspose1d(
in_channels=dim,
out_channels=dim,
kernel_size=(stride * 2) + 1,
stride=stride,
padding=stride // 2,
output_padding=stride // 2,
)
else:
self.stride = nn.Identity()
self.stride_position = stride_position
self.net = ConvNextNet(n_layers, dim, intermediate_dim)
def forward(self, x):
if self.stride_position == "before":
x = self.stride(x)
x = self.net(x)
if self.stride_position == "after":
x = self.stride(x)
return self.out_proj(x.transpose(1, 2)).transpose(1, 2)
class SwiGLU(nn.Module):
def __init__(self, d_model: int, ffn_expansion_factor: int = 4):
super().__init__()
self.p_in = nn.Linear(d_model, (d_model * ffn_expansion_factor // 3) * 2)
self.p_out = nn.Linear(d_model * ffn_expansion_factor // 3, d_model)
def forward(self, x):
gate, x = self.p_in(x).chunk(2, dim=-1)
return self.p_out(nn.functional.silu(gate) * x)