| | from time import perf_counter |
| | import logging |
| | from rich.console import Console |
| |
|
| | logger = logging.getLogger(__name__) |
| | console = Console() |
| |
|
| |
|
| | class BaseHandler: |
| | """ |
| | Base class for pipeline parts. Each part of the pipeline has an input and an output queue. |
| | The `setup` method along with `setup_args` and `setup_kwargs` can be used to address the specific requirements of the implemented pipeline part. |
| | To stop a handler properly, set the stop_event and, to avoid queue deadlocks, place b"END" in the input queue. |
| | Objects placed in the input queue will be processed by the `process` method, and the yielded results will be placed in the output queue. |
| | The cleanup method handles stopping the handler, and b"END" is placed in the output queue. |
| | """ |
| |
|
| | def __init__(self, stop_event, queue_in, queue_out, setup_args=(), setup_kwargs={}): |
| | self.stop_event = stop_event |
| | self.queue_in = queue_in |
| | self.queue_out = queue_out |
| | self.setup(*setup_args, **setup_kwargs) |
| | self._times = [] |
| |
|
| | def setup(self): |
| | pass |
| |
|
| | def process(self): |
| | raise NotImplementedError |
| |
|
| | def run(self): |
| | while not self.stop_event.is_set(): |
| | input = self.queue_in.get() |
| | if isinstance(input, bytes) and input == b"END": |
| | |
| | logger.debug("Stopping thread") |
| | break |
| | start_time = perf_counter() |
| | for output in self.process(input): |
| | self._times.append(perf_counter() - start_time) |
| | if self.last_time > self.min_time_to_debug: |
| | console.print(f"{self.__class__.__name__}: {self.last_time: .3f} s") |
| | logger.debug(f"{self.__class__.__name__}: {self.last_time: .3f} s") |
| | self.queue_out.put(output) |
| | start_time = perf_counter() |
| |
|
| | self.cleanup() |
| | self.queue_out.put(b"END") |
| |
|
| | @property |
| | def last_time(self): |
| | return self._times[-1] |
| | |
| | @property |
| | def min_time_to_debug(self): |
| | return 0.001 |
| |
|
| | def cleanup(self): |
| | pass |
| |
|