Source code for ewoksxrpd.tasks.worker
import logging
from collections import OrderedDict
from contextlib import contextmanager
from typing import Dict
from typing import Iterable
from typing import Mapping
import pyFAI
import pyFAI.worker
from ewokscore.hashing import uhash
from packaging.version import Version
from .. import pyfai_api
from .utils import pyfai_utils
_WORKER_POOL = None
logger = logging.getLogger(__name__)
[docs]
class WorkerPool:
"""Pool with one worker per configuration up to a maximum number of workers."""
def __init__(self, nworkers: int = 1) -> None:
self._workers: Dict[int, pyFAI.worker.Worker] = OrderedDict()
self.nworkers = nworkers
@staticmethod
def _worker_id(*args):
return uhash(args)
@property
def nworkers(self):
return self._nworkers
@nworkers.setter
def nworkers(self, value: int):
self._nworkers = value
self._check_pool_size()
def _check_pool_size(self):
while self._workers and len(self._workers) > self.nworkers:
self._workers.popitem(last=False)
[docs]
@contextmanager
def worker(
self, ewoks_pyfai_options: Mapping, demo: bool = False
) -> Iterable[pyFAI.worker.Worker]:
# TODO: deal with threads and subprocesses
worker_options, integration_options = (
pyfai_utils.split_worker_and_integration_options(ewoks_pyfai_options)
)
logger.info("Pyfai worker options: %s", worker_options)
logger.info("Pyfai integration options: %s", integration_options)
worker_id = self._worker_id(worker_options, integration_options, demo)
worker = self._workers.pop(worker_id, None)
if worker is None:
logger.info("Creating a new pyfai worker")
worker = self._create_worker(worker_options, integration_options, demo)
self._workers[worker_id] = worker
self._check_pool_size()
logger.info("Pyfai integration method: %s", worker._method)
yield worker
@staticmethod
def _create_worker(
worker_options: Mapping, integration_options: Mapping, demo: bool
) -> pyFAI.worker.Worker:
if demo:
return DemoWorker(integration_options, worker_options)
return EwoksWorker(integration_options, worker_options)
def _get_global_pool() -> WorkerPool:
global _WORKER_POOL
if _WORKER_POOL is None:
_WORKER_POOL = WorkerPool()
return _WORKER_POOL
[docs]
def set_maximum_persistent_workers(nworkers: int) -> None:
pool = _get_global_pool()
pool.nworkers = nworkers
[docs]
class EwoksWorker(pyFAI.worker.Worker):
def __init__(self, integration_options: Mapping, worker_options: Mapping) -> None:
super().__init__(**worker_options)
self.output = "raw"
self._i = 0
integration_options, mask, flatfield, darkcurrent = (
pyfai_utils.extract_images_from_integration_options(integration_options)
)
provided = set(integration_options)
self.set_config(integration_options, consume_keys=True)
unused = {k: v for k, v in integration_options.items() if k in provided}
if unused:
logger.warning("Unused pyFAI integration options: %s", unused)
else:
logger.info("All pyFAI integration options were used")
# Flat/dark correction:
# Icor = (I - darkcurrent) / flatfield
if mask is not None:
self.ai.detector.set_mask(mask)
if flatfield is not None:
self.ai.detector.set_flatfield(flatfield)
if darkcurrent is not None:
self.ai.detector.set_darkcurrent(darkcurrent)
[docs]
def set_config(self, integration_options, consume_keys=False):
if pyfai_api.PYFAI_VERSION < Version("2025.1.0"):
# In older pyFAI versions, the "method" is required
# when upgrading version 2 to 3.
version = integration_options.get("version", 1)
if version == 2:
if self.integrator_name and "clip" in self.integrator_name:
method = "csr"
else:
method = ""
integration_options["method"] = method
super().set_config(integration_options, consume_keys=consume_keys)
[docs]
class DemoWorker(EwoksWorker):
[docs]
def process(self, data, *args, **kwargs):
return super().process(data[:-1, :-1], *args, **kwargs)
[docs]
@contextmanager
def persistent_worker(
ewoks_pyfai_options: Mapping, demo: bool = False
) -> Iterable[pyFAI.worker.Worker]:
"""Get a worker for a particular configuration that stays in memory."""
pool = _get_global_pool()
with pool.worker(ewoks_pyfai_options, demo) as worker:
yield worker