from __future__ import annotations
import logging
import warnings
from numbers import Number
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union
import pyFAI.calibrant
import pyFAI.detectors
import pyFAI.geometry
import pyFAI.geometryRefinement
import pyFAI.goniometer
from .. import pyfai_api
from .data_access import TaskWithDataAccess
from .utils import calibrate_utils
from .utils import data_utils
from .utils import xrpd_utils
__all__ = ["CalibrateSingle", "CalibrateMulti", "CalculateGeometry"]
logger = logging.getLogger(__name__)
[docs]
class CalibrateSingle(
TaskWithDataAccess,
input_names=["image", "detector", "calibrant", "geometry", "energy"],
optional_input_names=[
"detector_config",
"fixed",
"max_rings",
"robust",
"ring_detector",
],
output_names=["geometry", "energy", "detector", "detector_config", "rings", "chi2"],
):
"""Single distance and energy calibration. Accepts multiple values for max_rings for sequential refinement."""
[docs]
def run(self):
detector = data_utils.data_from_storage(self.inputs.detector)
detector_config = data_utils.data_from_storage(
self.get_input_value("detector_config", None)
)
detector_object = pyFAI.detectors.detector_factory(
detector, config=detector_config
)
calibrant = data_utils.data_from_storage(self.inputs.calibrant)
calibrant_object = pyFAI.calibrant.get_calibrant(calibrant)
geometry0 = data_utils.data_from_storage(self.inputs.geometry)
xrpd_utils.validate_geometry(geometry0)
wavelength0 = xrpd_utils.energy_wavelength(self.inputs.energy)
# Find diffraction rings and fit
geometry_refinement = self._refine(
dict(geometry0),
wavelength0,
calibrant_object,
detector_object,
)
# Parse results
results = geometry_refinement.get_config()
geometry = {
k: v for k, v in results.items() if k in xrpd_utils.GEOMETRY_PARAMETERS
}
energy = xrpd_utils.energy_wavelength(results["wavelength"])
rings = xrpd_utils.points_to_rings(geometry_refinement.data, calibrant_object)
self.outputs.geometry = geometry
self.outputs.energy = energy
self.outputs.detector = detector
self.outputs.detector_config = detector_config
self.outputs.rings = rings
if geometry_refinement.data.ndim != 2:
self.outputs.chi2 = float("nan")
else:
self.outputs.chi2 = geometry_refinement.chi2()
def _refine(
self,
geometry0: dict,
wavelength0: float,
calibrant: pyFAI.calibrant.Calibrant,
detector: pyFAI.detectors.Detector,
) -> pyFAI.geometryRefinement.GeometryRefinement:
"""Find diffraction rings and fit"""
parametrization = {"energy": ["wavelength"]}
always_fixed_params = calibrate_utils.parse_fixed(
self.inputs.fixed, parametrization
)
all_parameters = xrpd_utils.GEOMETRY_PARAMETERS | {"wavelength"}
if self.inputs.robust:
incremental_release_groups = [
{"dist"},
{"poni1", "poni2"},
{"rot1", "rot2", "rot3"},
{"wavelength"},
]
else:
incremental_release_groups = [set(all_parameters)]
# Initial fit parameters
geometry = geometry0
wavelength = wavelength0
max_rings_list = calibrate_utils.parse_max_rings(self.inputs.max_rings)
fixed_parameter_sets = _fixed_parameters_sets(
all_parameters, always_fixed_params, incremental_release_groups
)
n_rings = len(max_rings_list)
n_releases = len(fixed_parameter_sets)
for i, max_rings in enumerate(max_rings_list):
for j, fixed_parameters in enumerate(fixed_parameter_sets):
# Detect experimental rings
cps = self._extract_control_points(
geometry, calibrant, wavelength, detector, max_rings
)
# Fit parameters to detected rings
setup = pyFAI.goniometer.SingleGeometry(
"single_user_detector",
detector=detector,
geometry=geometry,
calibrant=calibrant,
control_points=cps,
)
if pyfai_api.USE_PROPERTY_API:
setup.wavelength = wavelength
else:
setup.set_wavelength(wavelength)
if cps is None:
logger.warning("No rings detected for calibration")
return setup.geometry_refinement
_refine_geometry(
setup.geometry_refinement, fixed_parameters=fixed_parameters
)
# Update parameters with fit result
results = setup.geometry_refinement.get_config()
geometry = {
k: v
for k, v in results.items()
if k in xrpd_utils.GEOMETRY_PARAMETERS
}
wavelength = results["wavelength"]
self.progress = (i * n_rings + j) / (n_releases * n_rings) * 100
return setup.geometry_refinement
def _extract_control_points(
self,
geometry: dict,
calibrant: pyFAI.calibrant.Calibrant,
wavelength: float,
detector: pyFAI.detectors.Detector,
max_rings: int | None,
) -> pyFAI.goniometer.ControlPoints | None:
ring_detector_name = self.get_input_value("ring_detector", None)
if ring_detector_name is None:
ring_detector = detector
else:
ring_detector = pyFAI.detectors.detector_factory(ring_detector_name)
setup = pyFAI.goniometer.SingleGeometry(
"single_ring_detector",
image=self.get_image(self.inputs.image),
detector=ring_detector,
geometry=geometry,
calibrant=calibrant,
)
if pyfai_api.USE_PROPERTY_API:
setup.wavelength = wavelength
else:
setup.set_wavelength(wavelength)
setup.extract_cp(max_rings=max_rings)
if setup.geometry_refinement.data.ndim != 2:
logger.warning("No rings detected for calibration")
return
return setup.control_points
[docs]
class CalibrateMulti(
TaskWithDataAccess,
input_names=[
"images",
"positions",
"detector",
"calibrant",
"geometry",
"energy",
],
optional_input_names=[
"reference_position",
"sample_position",
"positionunits_in_meter",
"detector_config",
"fixed",
"max_rings",
"robust",
],
output_names=[
"geometry",
"energy",
"detector",
"detector_config",
"rings",
"chi2",
"parametrization",
"parameters",
],
):
"""Single energy, multi distance calibration with 1 distance motor.
The `images` and `positions` are the diffraction patterns and the motor positions at which those were measured.
The `reference_position` is the detector position that corresponds to `geometry` (`positions[0]` by default).
The `sample_position` is the position where sample and detector would theoretically coincide (0 by default).
The output `energy` and `geometry` correspond to `sample_position`.
The units of `positions`, `sample_position` and `reference_position` are defined by `positionunits_in_meter`
(1e-3 by default which means the unit is in millimeter).
The parametrization allow deriving energy+geometry from the detector position.
.. code-block:: python
energy, geometry = parametrization(parameters, position)
"""
[docs]
def run(self):
if len(self.inputs.images) != len(self.inputs.positions):
raise ValueError("number of 'images' and 'positions' must be the same")
detector = data_utils.data_from_storage(self.inputs.detector)
detector_config = data_utils.data_from_storage(
self.get_input_value("detector_config", None)
)
detector_object = pyFAI.detectors.detector_factory(
detector, config=detector_config
)
calibrant = data_utils.data_from_storage(self.inputs.calibrant)
calibrant_object = pyFAI.calibrant.get_calibrant(calibrant)
energy0 = self.inputs.energy
wavelength0 = xrpd_utils.energy_wavelength(energy0)
geometry0 = data_utils.data_from_storage(self.inputs.geometry)
xrpd_utils.validate_geometry(geometry0)
# Parameterize the detector geometry
trans_function, initial_guess = self._reparametrization(geometry0, energy0)
# Define setup
setup = pyFAI.goniometer.GoniometerRefinement(
initial_guess,
pos_function=lambda x: x,
trans_function=trans_function,
detector=detector_object,
wavelength=wavelength0,
)
for i, (image, xi) in enumerate(zip(self.inputs.images, self.inputs.positions)):
label = f"position{i}"
image = self.get_image(image)
xi = self.get_data(xi)
setup.new_geometry(
label, image=image, metadata=xi, calibrant=calibrant_object
)
# Find diffraction rings and fit
self._refine(setup)
# Parse results
geometry, energy = self._get_fit_results(setup)
paramerization = self._get_parameterization(setup)
parameters = self._get_parameters(setup)
rings = self._get_rings(setup, calibrant_object)
chi2 = setup.chi2()
self.outputs.geometry = geometry
self.outputs.energy = energy
self.outputs.detector = detector
self.outputs.detector_config = detector_config
self.outputs.rings = rings
self.outputs.chi2 = chi2
self.outputs.parametrization = paramerization
self.outputs.parameters = parameters
def _reparametrization(
self, geometry0: Dict, energy0: float
) -> Tuple[pyFAI.goniometer.ExtendedTransformation, List[Number]]:
# xunits are the position units
# dunits are the distance units of the fit parameters
# aunits are the angular units of the fit parameters
dunits_per_m = 100 # dunits/m
aunits_per_rad = 10 # aunits/rad
m_per_xunits = self._get_position_units() # m/xunits
trans_function = pyFAI.goniometer.ExtendedTransformation(
param_names=[
"dist_offset", # dunits
"dist_scale", # dunits/xunits
"poni1_offset", # dunits
"poni1_scale", # dunits/xunits
"poni2_offset", # dunits
"poni2_scale", # dunits/xunits
"arot1", # aunits
"arot2", # aunits
"arot3", # aunits
"energy", # keV
],
pos_names=["x"], # xunits
dist_expr=f"dist_scale / {dunits_per_m} * x + dist_offset / {dunits_per_m}",
poni1_expr=f"poni1_scale / {dunits_per_m} * x + poni1_offset / {dunits_per_m}",
poni2_expr=f"poni2_scale / {dunits_per_m} * x + poni2_offset / {dunits_per_m}",
rot1_expr=f"arot1 / {aunits_per_rad}",
rot2_expr=f"arot2 / {aunits_per_rad}",
rot3_expr=f"arot3 / {aunits_per_rad}",
wavelength_expr="hc*1e-10/energy",
)
xref = self._get_reference_position() # xunits
dist_offset = self._get_sample_position() * m_per_xunits # m
dist_scale = (geometry0["dist"] - dist_offset) / xref # m/xunits
poni1_scale = 0.0 # m/xunits
poni1_offset = geometry0["poni1"] # m
poni2_scale = 0.0 # m/xunits
poni2_offset = geometry0["poni2"] # m
initial_guess = [
dist_offset * dunits_per_m,
dist_scale * dunits_per_m,
poni1_offset * dunits_per_m,
poni1_scale * dunits_per_m,
poni2_offset * dunits_per_m,
poni2_scale * dunits_per_m,
geometry0["rot1"] * aunits_per_rad,
geometry0["rot2"] * aunits_per_rad,
geometry0["rot3"] * aunits_per_rad,
energy0,
]
return trans_function, initial_guess
@property
def _max_rings(self):
max_rings = self.inputs.max_rings
if not max_rings:
return None
if max_rings < 0:
return None
return max_rings
def _refine(self, setup: pyFAI.goniometer.GoniometerRefinement) -> None:
"""Find diffraction rings and fit"""
parametrization = {
"dist": ["dist_offset", "dist_scale"],
"poni1": ["poni1_offset", "poni1_scale"],
"poni2": ["poni2_offset", "poni2_scale"],
"rot1": ["arot1"],
"rot2": ["arot2"],
"rot3": ["arot3"],
}
always_fixed_params = calibrate_utils.parse_fixed(
self.inputs.fixed, parametrization
)
all_parameters = set(setup.trans_function.param_names)
if self.inputs.robust:
incremental_release_groups = [
{"dist_offset", "dist_scale"},
{"poni1_offset", "poni2_offset"},
{"arot1", "arot2", "arot3"},
{"poni1_scale", "poni2_scale"},
{"energy"},
]
else:
incremental_release_groups = [set(all_parameters)]
fixed_parameter_sets = _fixed_parameters_sets(
all_parameters, always_fixed_params, incremental_release_groups
)
n_releases = len(fixed_parameter_sets)
for i, fixed_parameters in enumerate(fixed_parameter_sets, 1):
# Detect experimental rings
for setup_single_dist in setup.single_geometries.values():
setup_single_dist.extract_cp(max_rings=self._max_rings)
if setup_single_dist.geometry_refinement.data.ndim != 2:
logger.warning("No rings detected for calibration")
return
# Fit parameters to detected rings
_refine_geometry(setup, fixed_parameters=fixed_parameters)
self.progress = i / n_releases * 100
def _get_rings(
self,
setup: pyFAI.goniometer.GoniometerRefinement,
calibrant: pyFAI.calibrant.Calibrant,
) -> Dict[str, dict]:
rings = dict()
for i, setup_single_dist in enumerate(setup.single_geometries.values()):
points = setup_single_dist.geometry_refinement.data
rings[str(i)] = xrpd_utils.points_to_rings(points, calibrant)
return rings
def _get_fit_results(
self, setup: pyFAI.goniometer.GoniometerRefinement
) -> Tuple[dict, Number]:
parameters = setup.param
xref = self._get_reference_position()
results = setup.trans_function(parameters, xref)
geometry = {k: getattr(results, k) for k in xrpd_utils.GEOMETRY_PARAMETERS}
energy = xrpd_utils.energy_wavelength(results.wavelength)
return geometry, energy
def _get_parameterization(
self, setup: pyFAI.goniometer.GoniometerRefinement
) -> dict:
parametrization = dict(setup.trans_function.to_dict())
parametrization.pop("constants")
parametrization.pop("content")
return parametrization
def _get_parameters(self, setup: pyFAI.goniometer.GoniometerRefinement) -> dict:
parameters = setup.param
names = setup.trans_function.param_names
return dict(zip(names, parameters))
def _get_reference_position(self):
"""first image position by default (xunits)"""
if self.missing_inputs.reference_position:
return self.get_data(self.inputs.positions[0])
else:
return self.get_data(self.inputs.reference_position)
def _get_sample_position(self):
"""0 by default (xunits)"""
if self.missing_inputs.sample_position:
return 0
else:
return self.get_data(self.inputs.sample_position)
def _get_position_units(self):
"""millimeter by default"""
if self.inputs.positionunits_in_meter:
return self.inputs.positionunits_in_meter # m/xunits
else:
return 1e-3 # m/xunits
[docs]
def calculate_geometry(
parametrization: dict, parameters: dict, position: Number
) -> Tuple[dict, Number]:
trans_function = pyFAI.goniometer.ExtendedTransformation(**parametrization)
parameters = [parameters[name] for name in trans_function.param_names]
trans_function(parameters, position)
results = trans_function(parameters, position)
geometry = {k: getattr(results, k) for k in xrpd_utils.GEOMETRY_PARAMETERS}
energy = xrpd_utils.energy_wavelength(results.wavelength)
return geometry, energy
[docs]
class CalculateGeometry(
TaskWithDataAccess,
input_names=["parametrization", "parameters", "position"],
output_names=["geometry", "energy"],
):
"""Calculate energy and geometry from pyFAI parametrization"""
[docs]
def run(self):
parametrization = data_utils.data_from_storage(self.inputs.parametrization)
geometry, energy = calculate_geometry(
parametrization,
self.inputs.parameters,
self.get_data(self.inputs.position),
)
self.outputs.geometry = geometry
self.outputs.energy = energy
def _fixed_parameters_sets(
all_params: Set[str],
always_fixed_params: Set[str],
incremental_release_groups: Sequence[Set[str]],
) -> List[Optional[Set[str]]]:
"""
Returns a sequence of fixed parameter sets based on incremental parameter releases.
:param all_params: The complete set of parameters.
:param always_fixed_params: Parameters that are always fixed, regardless of release groups.
:param incremental_release_groups: A sequence of parameter groups that are released incrementally.
:returns: A list where each element is a set of parameters that remain fixed
after the corresponding release step.
"""
fixed_parameters = list()
accumulate_released_parameters = set()
for release_group in incremental_release_groups:
accumulate_released_parameters |= release_group - always_fixed_params
if not accumulate_released_parameters:
continue
fixed = all_params - accumulate_released_parameters
if not fixed:
fixed = None
if fixed in fixed_parameters:
continue
fixed_parameters.append(fixed)
return fixed_parameters
def _refine_geometry(
geometry_refinement: Union[
pyFAI.goniometer.GoniometerRefinement,
pyFAI.geometryRefinement.GeometryRefinement,
],
fixed_parameters: Optional[Sequence] = None,
) -> bool:
"""
Refines the given geometry within the default parameter bounds.
:param geometry_refinement: A pyFAI object that parameterizes a single or multi-distances geometry.
:param fixed_parameters: Parameters that are not allowed to change during refinement.
:return: True if refinement is successful within the default parameter bounds, False otherwise.
"""
with warnings.catch_warnings(record=True) as caught_warnings:
warnings.filterwarnings(
"always",
message="Values in x were outside bounds during a minimize step, clipping to bounds",
category=RuntimeWarning,
)
geometry_refinement.refine3(fix=fixed_parameters) # method="slsqp"
if not caught_warnings:
return True
return False