import logging
from collections import OrderedDict
from contextlib import contextmanager
from typing import Dict
from typing import Iterable
from typing import Mapping
import pyFAI
import pyFAI.worker
try:
from pyFAI.worker import WorkerFiber
except ImportError:
WorkerFiber = pyFAI.worker.Worker
from ewokscore.hashing import uhash
from packaging.version import Version
from .. import pyfai_api
from .utils import pyfai_utils
_WORKER_POOL = None
logger = logging.getLogger(__name__)
[docs]
class WorkerPool:
"""Pool with one worker per configuration up to a maximum number of workers."""
def __init__(self, nworkers: int = 1) -> None:
self._workers: Dict[int, pyFAI.worker.Worker] = OrderedDict()
self.nworkers = nworkers
@staticmethod
def _worker_id(*args):
return uhash(args)
@property
def nworkers(self):
return self._nworkers
@nworkers.setter
def nworkers(self, value: int):
self._nworkers = value
self._check_pool_size()
def _check_pool_size(self):
while self._workers and len(self._workers) > self.nworkers:
self._workers.popitem(last=False)
[docs]
@contextmanager
def worker(
self, ewoks_pyfai_options: Mapping, demo: bool = False
) -> Iterable[pyFAI.worker.Worker]:
# TODO: deal with threads and subprocesses
worker_options, integration_options = (
pyfai_utils.split_worker_and_integration_options(ewoks_pyfai_options)
)
logger.info("Pyfai worker options: %s", worker_options)
logger.info("Pyfai integration options: %s", integration_options)
worker_id = self._worker_id(worker_options, integration_options, demo)
worker = self._workers.pop(worker_id, None)
if worker is None:
logger.info("Creating a new pyfai worker")
worker = self._create_worker(worker_options, integration_options, demo)
self._workers[worker_id] = worker
self._check_pool_size()
logger.info("Pyfai integration method: %s", worker._method)
yield worker
@staticmethod
def _create_worker(
worker_options: Mapping, integration_options: Mapping, demo: bool
) -> pyFAI.worker.Worker:
if demo:
return DemoWorker(integration_options, worker_options)
integrator_class = integration_options.get(
"integrator_class", "AzimuthalIntegrator"
)
if integrator_class == "AzimuthalIntegrator":
return EwoksWorker(integration_options, worker_options)
elif integrator_class == "FiberIntegrator":
if WorkerFiber is not None:
return EwoksWorkerFiber(integration_options, worker_options)
else:
raise RuntimeError(
f"WorkerFiber is not available for PyFAI {pyfai_api.PYFAI_VERSION}. Needed at least 2025.12"
)
else:
raise TypeError(
f"{integrator_class} is not a valid integrator class for PyFAI"
)
def _get_global_pool() -> WorkerPool:
global _WORKER_POOL
if _WORKER_POOL is None:
_WORKER_POOL = WorkerPool()
return _WORKER_POOL
[docs]
def set_maximum_persistent_workers(nworkers: int) -> None:
pool = _get_global_pool()
pool.nworkers = nworkers
[docs]
class EwoksWorker(pyFAI.worker.Worker):
def __init__(self, integration_options: Mapping, worker_options: Mapping) -> None:
super().__init__(**worker_options)
self.output = "raw"
self._i = 0
integration_options, mask, flatfield, darkcurrent = (
pyfai_utils.extract_images_from_integration_options(integration_options)
)
provided = set(integration_options)
self.set_config(integration_options, consume_keys=True)
unused = {k: v for k, v in integration_options.items() if k in provided}
if unused:
logger.warning("Unused pyFAI integration options: %s", unused)
else:
logger.info("All pyFAI integration options were used")
# Flat/dark correction:
# Icor = (I - darkcurrent) / flatfield
if mask is not None:
if pyfai_api.USE_PROPERTY_API:
self.ai.detector.mask = mask
else:
self.ai.detector.set_mask(mask)
if flatfield is not None:
if pyfai_api.USE_PROPERTY_API:
self.ai.detector.flatfield = flatfield
else:
self.ai.detector.set_flatfield(flatfield)
if darkcurrent is not None:
if pyfai_api.USE_PROPERTY_API:
self.ai.detector.darkcurrent = darkcurrent
else:
self.ai.detector.set_darkcurrent(darkcurrent)
[docs]
def set_config(self, integration_options, consume_keys=False):
if pyfai_api.PYFAI_VERSION < Version("2025.1.0"):
# In older pyFAI versions, the "method" is required
# when upgrading version 2 to 3.
version = integration_options.get("version", 1)
if version == 2:
if self.integrator_name and "clip" in self.integrator_name:
method = "csr"
else:
method = ""
integration_options["method"] = method
super().set_config(integration_options, consume_keys=consume_keys)
[docs]
class EwoksWorkerFiber(WorkerFiber):
def __init__(self, integration_options: Mapping, worker_options: Mapping) -> None:
if pyfai_api.PYFAI_VERSION < Version("2025.12"):
raise RuntimeError(
f"To use WorkerFiber, PyFAI version ({pyfai_api.PYFAI_VERSION}) should be at least 2025.12"
)
super().__init__(**worker_options)
self.output = "raw"
self._i = 0
integration_options, mask, flatfield, darkcurrent = (
pyfai_utils.extract_images_from_integration_options(integration_options)
)
provided = set(integration_options)
self.set_config(integration_options, consume_keys=True)
unused = {k: v for k, v in integration_options.items() if k in provided}
if unused:
logger.warning("Unused pyFAI integration options: %s", unused)
else:
logger.info("All pyFAI integration options were used")
if mask is not None:
if pyfai_api.USE_PROPERTY_API:
self.ai.detector.mask = mask
else:
self.ai.detector.set_mask(mask)
if flatfield is not None:
if pyfai_api.USE_PROPERTY_API:
self.ai.detector.flatfield = flatfield
else:
self.ai.detector.set_flatfield(flatfield)
if darkcurrent is not None:
if pyfai_api.USE_PROPERTY_API:
self.ai.detector.darkcurrent = darkcurrent
else:
self.ai.detector.set_darkcurrent(darkcurrent)
[docs]
class DemoWorker(EwoksWorker):
[docs]
def process(self, data, *args, **kwargs):
return super().process(data[:-1, :-1], *args, **kwargs)
[docs]
@contextmanager
def persistent_worker(
ewoks_pyfai_options: Mapping, demo: bool = False
) -> Iterable[pyFAI.worker.Worker]:
"""Get a worker for a particular configuration that stays in memory."""
pool = _get_global_pool()
with pool.worker(ewoks_pyfai_options, demo) as worker:
yield worker