|
|
| from dataclasses import dataclass, field |
| from multiprocessing import Process, Queue |
| from multiprocessing import Event |
| from logging import getLogger |
|
|
| logger = getLogger(__name__) |
|
|
|
|
| @dataclass |
| class Segment: |
| t0: int |
| t1: int |
| text: str |
|
|
| @dataclass |
| class MetaItem: |
| segments: list[Segment] = field(default_factory=list) |
| source_audio: bytes = b"" |
| audio: bytes = b'' |
| transcribe_content: str = '' |
| translate_content: str = '' |
| source_language: str = 'zh' |
| destination_language: str = 'en' |
| speech_status: str = 'END' |
| |
|
|
| class BasePipe(Process): |
| def __init__(self, in_queue=None, out_queue=None) -> None: |
| super().__init__() |
| self._in_queue = in_queue if in_queue else Queue() |
| self._out_queue = out_queue if out_queue else Queue() |
| self._ready = Event() |
|
|
| def set_ready(self): |
| self._ready.set() |
| |
| def is_ready(self): |
| return self._ready.is_set() |
| |
| def wait(self): |
| self._ready.wait() |
|
|
| @property |
| def output_queue(self): |
| return self._out_queue |
| |
| @property |
| def input_queue(self): |
| return self._in_queue |
| |
| def process(self, in_data: MetaItem) -> MetaItem: |
| raise NotImplementedError("Subclasses should implement this method.") |
|
|
|
|
| @classmethod |
| def init(cls): |
| raise NotImplementedError |
|
|
| def run(self): |
| logger.info(f"start initial {self.__class__.__name__}") |
| self.init() |
| logger.info(f"finish initial {self.__class__.__name__}") |
| self.set_ready() |
| while True: |
| item = self.input_queue.get() |
| if item is None: |
| break |
| out_item = self.process(item) |
| if out_item: |
| self.output_queue.put(out_item) |
|
|