Source code for ewoksxrpd.tasks.worker

import logging
from collections import OrderedDict
from contextlib import contextmanager
from typing import Dict
from typing import Iterable
from typing import Mapping

import pyFAI
import pyFAI.worker

try:
    from pyFAI.worker import WorkerFiber
except ImportError:
    WorkerFiber = pyFAI.worker.Worker

from ewokscore.hashing import uhash
from packaging.version import Version

from .. import pyfai_api
from .utils import pyfai_utils

_WORKER_POOL = None


logger = logging.getLogger(__name__)


[docs] class WorkerPool: """Pool with one worker per configuration up to a maximum number of workers.""" def __init__(self, nworkers: int = 1) -> None: self._workers: Dict[int, pyFAI.worker.Worker] = OrderedDict() self.nworkers = nworkers @staticmethod def _worker_id(*args): return uhash(args) @property def nworkers(self): return self._nworkers @nworkers.setter def nworkers(self, value: int): self._nworkers = value self._check_pool_size() def _check_pool_size(self): while self._workers and len(self._workers) > self.nworkers: self._workers.popitem(last=False)
[docs] @contextmanager def worker( self, ewoks_pyfai_options: Mapping, demo: bool = False ) -> Iterable[pyFAI.worker.Worker]: # TODO: deal with threads and subprocesses worker_options, integration_options = ( pyfai_utils.split_worker_and_integration_options(ewoks_pyfai_options) ) logger.info("Pyfai worker options: %s", worker_options) logger.info("Pyfai integration options: %s", integration_options) worker_id = self._worker_id(worker_options, integration_options, demo) worker = self._workers.pop(worker_id, None) if worker is None: logger.info("Creating a new pyfai worker") worker = self._create_worker(worker_options, integration_options, demo) self._workers[worker_id] = worker self._check_pool_size() logger.info("Pyfai integration method: %s", worker._method) yield worker
@staticmethod def _create_worker( worker_options: Mapping, integration_options: Mapping, demo: bool ) -> pyFAI.worker.Worker: if demo: return DemoWorker(integration_options, worker_options) integrator_class = integration_options.get( "integrator_class", "AzimuthalIntegrator" ) if integrator_class == "AzimuthalIntegrator": return EwoksWorker(integration_options, worker_options) elif integrator_class == "FiberIntegrator": if WorkerFiber is not None: return EwoksWorkerFiber(integration_options, worker_options) else: raise RuntimeError( f"WorkerFiber is not available for PyFAI {pyfai_api.PYFAI_VERSION}. Needed at least 2025.12" ) else: raise TypeError( f"{integrator_class} is not a valid integrator class for PyFAI" )
def _get_global_pool() -> WorkerPool: global _WORKER_POOL if _WORKER_POOL is None: _WORKER_POOL = WorkerPool() return _WORKER_POOL
[docs] def set_maximum_persistent_workers(nworkers: int) -> None: pool = _get_global_pool() pool.nworkers = nworkers
[docs] class EwoksWorker(pyFAI.worker.Worker): def __init__(self, integration_options: Mapping, worker_options: Mapping) -> None: super().__init__(**worker_options) self.output = "raw" self._i = 0 integration_options, mask, flatfield, darkcurrent = ( pyfai_utils.extract_images_from_integration_options(integration_options) ) provided = set(integration_options) self.set_config(integration_options, consume_keys=True) unused = {k: v for k, v in integration_options.items() if k in provided} if unused: logger.warning("Unused pyFAI integration options: %s", unused) else: logger.info("All pyFAI integration options were used") # Flat/dark correction: # Icor = (I - darkcurrent) / flatfield if mask is not None: if pyfai_api.USE_PROPERTY_API: self.ai.detector.mask = mask else: self.ai.detector.set_mask(mask) if flatfield is not None: if pyfai_api.USE_PROPERTY_API: self.ai.detector.flatfield = flatfield else: self.ai.detector.set_flatfield(flatfield) if darkcurrent is not None: if pyfai_api.USE_PROPERTY_API: self.ai.detector.darkcurrent = darkcurrent else: self.ai.detector.set_darkcurrent(darkcurrent)
[docs] def set_config(self, integration_options, consume_keys=False): if pyfai_api.PYFAI_VERSION < Version("2025.1.0"): # In older pyFAI versions, the "method" is required # when upgrading version 2 to 3. version = integration_options.get("version", 1) if version == 2: if self.integrator_name and "clip" in self.integrator_name: method = "csr" else: method = "" integration_options["method"] = method super().set_config(integration_options, consume_keys=consume_keys)
[docs] class EwoksWorkerFiber(WorkerFiber): def __init__(self, integration_options: Mapping, worker_options: Mapping) -> None: if pyfai_api.PYFAI_VERSION < Version("2025.12"): raise RuntimeError( f"To use WorkerFiber, PyFAI version ({pyfai_api.PYFAI_VERSION}) should be at least 2025.12" ) super().__init__(**worker_options) self.output = "raw" self._i = 0 integration_options, mask, flatfield, darkcurrent = ( pyfai_utils.extract_images_from_integration_options(integration_options) ) provided = set(integration_options) self.set_config(integration_options, consume_keys=True) unused = {k: v for k, v in integration_options.items() if k in provided} if unused: logger.warning("Unused pyFAI integration options: %s", unused) else: logger.info("All pyFAI integration options were used") if mask is not None: if pyfai_api.USE_PROPERTY_API: self.ai.detector.mask = mask else: self.ai.detector.set_mask(mask) if flatfield is not None: if pyfai_api.USE_PROPERTY_API: self.ai.detector.flatfield = flatfield else: self.ai.detector.set_flatfield(flatfield) if darkcurrent is not None: if pyfai_api.USE_PROPERTY_API: self.ai.detector.darkcurrent = darkcurrent else: self.ai.detector.set_darkcurrent(darkcurrent)
[docs] class DemoWorker(EwoksWorker):
[docs] def process(self, data, *args, **kwargs): return super().process(data[:-1, :-1], *args, **kwargs)
[docs] @contextmanager def persistent_worker( ewoks_pyfai_options: Mapping, demo: bool = False ) -> Iterable[pyFAI.worker.Worker]: """Get a worker for a particular configuration that stays in memory.""" pool = _get_global_pool() with pool.worker(ewoks_pyfai_options, demo) as worker: yield worker