Source code for cuvis.Worker

import asyncio as a
from dataclasses import dataclass
from typing import Callable, Awaitable
from typing import Optional

from ._cuvis_il import cuvis_il
from .Measurement import Measurement
from .Viewer import Viewer, ImageData
from .cuvis_aux import SDKException, WorkerState
from .AcquisitionContext import AcquisitionContext
from .ProcessingContext import ProcessingContext
from .Export import Exporter
from .SessionFile import SessionFile
from .Measurement import Measurement
from .FileWriteSettings import WorkerSettings
from .doc import copydoc


@dataclass
class WorkerResult:
    """One result taken from a :class:`Worker`: a measurement and its view."""

    # Measurement wrapped around the handle returned by the SDK.
    mesu: Measurement
    # View data for the measurement; the Worker stores None here when no
    # viewer has been set on it.
    view: Optional[ImageData]
class Worker(object):
    """Wrapper around a cuvis SDK worker handle.

    The instance keeps the integer handle returned by
    ``cuvis_il.cuvis_worker_create`` and forwards every operation to the
    matching ``cuvis_il.cuvis_worker_*`` function. Any call that does not
    return ``cuvis_il.status_ok`` is reported as :class:`SDKException`.

    Methods decorated with ``@copydoc`` take their docstring from the wrapped
    SDK function, so they are documented here with ``#`` comments only.
    """

    def __init__(self, args: WorkerSettings):
        """Create the SDK worker described by ``args``.

        Raises:
            SDKException: if ``cuvis_worker_create`` does not report success.
        """
        self._exporter_set = False
        self._acquisition_set = False
        self._processing_set = False
        self._viewer_set = False
        self._session_file_set = False
        self._worker_poll_task = None
        # Stays None until creation succeeded, so __del__ knows whether there
        # is anything to free.
        self._handle = None

        _ptr = cuvis_il.new_p_int()
        settings = args._get_internal()
        self._check(cuvis_il.cuvis_worker_create(_ptr, settings))
        self._handle = cuvis_il.p_int_value(_ptr)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _check(status) -> None:
        """Raise SDKException unless ``status`` equals ``cuvis_il.status_ok``."""
        if cuvis_il.status_ok != status:
            raise SDKException()

    def _attach(self, setter, component) -> bool:
        """Pass ``component``'s handle (or 0 for None) to ``setter``.

        Returns True when a component was passed and False when None was
        passed, so callers can store the result as their ``_*_set`` flag.
        The flag is only produced after the SDK call succeeded.
        """
        component_handle = 0 if component is None else component._handle
        self._check(setter(self._handle, component_handle))
        return component is not None

    def _read(self, getter, new_ptr, ptr_value):
        """Call ``getter(handle, out)`` and return the value written to ``out``."""
        out = new_ptr()
        self._check(getter(self._handle, out))
        return ptr_value(out)

    def _read_int(self, getter) -> int:
        """Read an ``int`` out-parameter from ``getter``."""
        return self._read(getter, cuvis_il.new_p_int, cuvis_il.p_int_value)

    def _read_ulong(self, getter) -> int:
        """Read an ``unsigned long`` out-parameter from ``getter``."""
        return self._read(getter, cuvis_il.new_p_ulong,
                          cuvis_il.p_ulong_value)

    def _make_result(self, ptr_mesu, ptr_view) -> WorkerResult:
        """Wrap the handles written by ``cuvis_worker_get_next_result``."""
        mesu = Measurement(cuvis_il.p_int_value(ptr_mesu))
        if self._viewer_set:
            view = Viewer._create_view_data(
                None, cuvis_il.p_int_value(ptr_view))
        else:
            # The view handle is only read when a viewer has been set.
            view = None
        return WorkerResult(mesu, view)

    # ------------------------------------------------------------------
    # pipeline configuration
    # ------------------------------------------------------------------

    @copydoc(cuvis_il.cuvis_worker_set_acq_cont)
    def set_acquisition_context(
            self, base: Optional[AcquisitionContext] = None) -> None:
        # None passes handle 0 to the SDK and clears the local flag
        # (presumably this detaches the context; confirm in the SDK docs).
        self._acquisition_set = self._attach(
            cuvis_il.cuvis_worker_set_acq_cont, base)

    @copydoc(cuvis_il.cuvis_worker_set_proc_cont)
    def set_processing_context(
            self, base: Optional[ProcessingContext] = None) -> None:
        # None passes handle 0 to the SDK and clears the local flag.
        self._processing_set = self._attach(
            cuvis_il.cuvis_worker_set_proc_cont, base)

    @copydoc(cuvis_il.cuvis_worker_set_exporter)
    def set_exporter(self, base: Optional[Exporter] = None) -> None:
        # None passes handle 0 to the SDK and clears the local flag.
        self._exporter_set = self._attach(
            cuvis_il.cuvis_worker_set_exporter, base)

    @copydoc(cuvis_il.cuvis_worker_set_viewer)
    def set_viewer(self, base: Optional[Viewer] = None) -> None:
        # None passes handle 0 to the SDK and clears the local flag. The flag
        # decides whether results carry view data (see _make_result).
        self._viewer_set = self._attach(
            cuvis_il.cuvis_worker_set_viewer, base)

    # ------------------------------------------------------------------
    # input
    # ------------------------------------------------------------------

    @copydoc(cuvis_il.cuvis_worker_ingest_session_file)
    def ingest_session_file(self, session: SessionFile,
                            frame_selection: str = 'all') -> None:
        self._check(cuvis_il.cuvis_worker_ingest_session_file(
            self._handle, session._handle, frame_selection))

    @copydoc(cuvis_il.cuvis_worker_ingest_mesu)
    def ingest_mesu(self, mesu: Measurement) -> None:
        self._check(cuvis_il.cuvis_worker_ingest_mesu(
            self._handle, mesu._handle))

    @property
    @copydoc(cuvis_il.cuvis_worker_query_session_progress)
    def query_session_progress(self) -> float:
        return self._read(cuvis_il.cuvis_worker_query_session_progress,
                          cuvis_il.new_p_double, cuvis_il.p_double_value)

    # ------------------------------------------------------------------
    # results
    # ------------------------------------------------------------------

    @copydoc(cuvis_il.cuvis_worker_has_next_result)
    def has_next_result(self) -> bool:
        return self._read_int(cuvis_il.cuvis_worker_has_next_result) != 0

    @copydoc(cuvis_il.cuvis_worker_get_next_result)
    def get_next_result(self, timeout: int) -> WorkerResult:
        # ``timeout`` is forwarded unchanged to the SDK call.
        ptr_mesu = cuvis_il.new_p_int()
        ptr_view = cuvis_il.new_p_int()
        self._check(cuvis_il.cuvis_worker_get_next_result(
            self._handle, ptr_mesu, ptr_view, timeout))
        return self._make_result(ptr_mesu, ptr_view)

    async def get_next_result_async(self, timeout: int) -> WorkerResult:
        """Wait for the next result without blocking the event loop.

        The worker is polled every 100 ms. It is always polled at least once,
        even when ``timeout`` is 0.

        Args:
            timeout: Maximum time to wait, in milliseconds.

        Returns:
            The next :class:`WorkerResult`.

        Raises:
            asyncio.TimeoutError: if no result became available in time.
            SDKException: if an SDK call does not report success.
        """
        poll_interval_ms = 100
        waited_ms = 0
        while True:
            if self.has_next_result():
                # Yield to the event loop once before the SDK call.
                await a.sleep(0)
                ptr_mesu = cuvis_il.new_p_int()
                ptr_view = cuvis_il.new_p_int()
                self._check(cuvis_il.cuvis_worker_get_next_result(
                    self._handle, ptr_mesu, ptr_view, poll_interval_ms))
                return self._make_result(ptr_mesu, ptr_view)
            if waited_ms >= timeout:
                # Without this the out-pointers would be read although the
                # SDK never wrote to them.
                raise a.TimeoutError(
                    'No worker result became available within '
                    f'{timeout} ms')
            await a.sleep(poll_interval_ms / 1000)
            waited_ms += poll_interval_ms

    # ------------------------------------------------------------------
    # status
    # ------------------------------------------------------------------

    @property
    @copydoc(cuvis_il.cuvis_worker_get_input_queue_limit)
    def input_queue_limit(self) -> int:
        return self._read_ulong(cuvis_il.cuvis_worker_get_input_queue_limit)

    @property
    @copydoc(cuvis_il.cuvis_worker_get_mandatory_queue_limit)
    def mandatory_queue_limit(self) -> int:
        return self._read_ulong(
            cuvis_il.cuvis_worker_get_mandatory_queue_limit)

    @property
    @copydoc(cuvis_il.cuvis_worker_get_supplementary_queue_limit)
    def supplementary_queue_limit(self) -> int:
        return self._read_ulong(
            cuvis_il.cuvis_worker_get_supplementary_queue_limit)

    @property
    @copydoc(cuvis_il.cuvis_worker_get_output_queue_limit)
    def output_queue_limit(self) -> int:
        return self._read_ulong(cuvis_il.cuvis_worker_get_output_queue_limit)

    @property
    @copydoc(cuvis_il.cuvis_worker_get_queue_used)
    def queue_used(self) -> int:
        return self._read_int(cuvis_il.cuvis_worker_get_queue_used)

    @property
    @copydoc(cuvis_il.cuvis_worker_get_can_drop_results)
    def can_drop_results(self) -> bool:
        return bool(
            self._read_int(cuvis_il.cuvis_worker_get_can_drop_results))

    @property
    @copydoc(cuvis_il.cuvis_worker_get_can_skip_measurements)
    def can_skip_measurements(self) -> bool:
        return bool(
            self._read_int(cuvis_il.cuvis_worker_get_can_skip_measurements))

    @property
    @copydoc(cuvis_il.cuvis_worker_get_can_skip_supplementary)
    def can_skip_supplementary(self) -> bool:
        return bool(
            self._read_int(cuvis_il.cuvis_worker_get_can_skip_supplementary))

    @property
    @copydoc(cuvis_il.cuvis_worker_is_processing_mandatory)
    def is_processing_mandatory(self) -> bool:
        return bool(
            self._read_int(cuvis_il.cuvis_worker_is_processing_mandatory))

    @property
    @copydoc(cuvis_il.cuvis_worker_is_processing)
    def is_processing(self) -> bool:
        return bool(self._read_int(cuvis_il.cuvis_worker_is_processing))

    @property
    @copydoc(cuvis_il.cuvis_worker_get_threads_busy)
    def threads_busy(self) -> int:
        return self._read_int(cuvis_il.cuvis_worker_get_threads_busy)

    @property
    @copydoc(cuvis_il.cuvis_worker_get_state)
    def state(self) -> WorkerState:
        # The SDK fills a state struct, which is then converted to the
        # Python-side WorkerState.
        val = cuvis_il.cuvis_worker_state_t()
        self._check(cuvis_il.cuvis_worker_get_state(self._handle, val))
        return WorkerState._from_internal(val)

    # ------------------------------------------------------------------
    # control
    # ------------------------------------------------------------------

    @copydoc(cuvis_il.cuvis_worker_start)
    def start_processing(self) -> None:
        self._check(cuvis_il.cuvis_worker_start(self._handle))

    @copydoc(cuvis_il.cuvis_worker_stop)
    def stop_processing(self) -> None:
        self._check(cuvis_il.cuvis_worker_stop(self._handle))

    @copydoc(cuvis_il.cuvis_worker_drop_all_queued)
    def drop_all_queued(self) -> None:
        self._check(cuvis_il.cuvis_worker_drop_all_queued(self._handle))

    # ------------------------------------------------------------------
    # asyncio callback
    # ------------------------------------------------------------------

    def register_worker_callback(
            self,
            callback: Callable[[WorkerResult], Awaitable[None]]) -> None:
        """Start a background task that feeds every result to ``callback``.

        A previously registered callback is cancelled first. The poll task is
        created with ``asyncio.create_task``, so this must be called while an
        event loop is running.

        Args:
            callback: Coroutine function called with each
                :class:`WorkerResult`; every call runs as its own task.
        """
        self.reset_worker_callback()
        poll_time = 0.001
        # asyncio keeps only weak references to tasks; hold strong ones until
        # each callback finishes so none is garbage-collected mid-run.
        running_callbacks = set()

        async def _internal_worker_loop():
            while True:
                if not self.has_next_result():
                    await a.sleep(poll_time)
                    continue
                try:
                    worker_result = await self.get_next_result_async(1000)
                except a.TimeoutError:
                    # The result was gone before it could be fetched; keep
                    # polling instead of ending the loop.
                    continue
                callback_task = a.create_task(callback(worker_result))
                # TODO limit number of created task objects like in the cpp
                # wrapper
                running_callbacks.add(callback_task)
                callback_task.add_done_callback(running_callbacks.discard)

        self._worker_poll_task = a.create_task(_internal_worker_loop())

    def reset_worker_callback(self) -> None:
        """Cancel the polling task started by register_worker_callback."""
        # getattr: also called from __del__, possibly on an instance whose
        # __init__ never ran.
        poll_task = getattr(self, '_worker_poll_task', None)
        if poll_task is not None:
            poll_task.cancel()
            self._worker_poll_task = None

    # ------------------------------------------------------------------
    # lifetime
    # ------------------------------------------------------------------

    def __del__(self):
        """Stop the callback task and free the SDK worker handle, if any."""
        try:
            self.reset_worker_callback()
        finally:
            handle = getattr(self, '_handle', None)
            # No handle exists when creation failed in __init__; there is
            # nothing to free then.
            if handle is not None:
                _ptr = cuvis_il.new_p_int()
                cuvis_il.p_int_assign(_ptr, handle)
                cuvis_il.cuvis_worker_free(_ptr)
                # Keep whatever value the SDK left in the pointer.
                self._handle = cuvis_il.p_int_value(_ptr)

    def __deepcopy__(self, memo):
        '''This function is not permitted because the class only keeps a
        handle that is managed by the cuvis SDK.'''
        raise TypeError('Deep copying is not supported for Worker')

    def __copy__(self):
        '''This function is not permitted because the class only keeps a
        handle that is managed by the cuvis SDK.'''
        raise TypeError('Shallow copying is not supported for Worker')