import importlib.metadata
import logging
from importlib.metadata import version
import h5py
import numpy
from ewoksdata.data.nexus import select_default_plot
from packaging.version import Version
from silx.io.url import DataUrl
from .data_access import TaskWithDataAccess
from .utils import pyfai_utils
from .utils.data_utils import create_hdf5_link
from .utils.data_utils import split_hdf5_url_parent_data_path
from .utils.nexus_utils import IntegratedPattern
from .utils.nexus_utils import read_nexus_integrated_patterns
_logger = logging.getLogger(__name__)
SILX_VERSION = Version(version("silx"))
[docs]
def is_silx_compatible() -> bool:
# With older silx version, opening `external_nxprocess_url` raises the following error:
# RuntimeError: Unable to synchronously open file (file locking 'ignore disabled locks' flag values don't match)
return SILX_VERSION >= Version("2.2.0")
def _assert_background_can_be_subtracted(
pattern: IntegratedPattern, background_pattern: IntegratedPattern
):
if pattern.radial_units != background_pattern.radial_units:
raise ValueError(
f"Background pattern units {background_pattern.radial_units} are not compatible with data pattern units {pattern.radial_units}."
)
if pattern.intensity.shape != background_pattern.intensity.shape:
raise ValueError(
f"Background pattern intensity shape {background_pattern.intensity.shape} are not compatible with data pattern shape {pattern.intensity.shape}."
)
def _get_opening_mode(url_to_read: DataUrl, url_to_write: DataUrl):
# Get around https://gitlab.esrf.fr/workflow/ewoksapps/ewoksxrpd/-/issues/93 by opening all files in `a` mode
return "a"
# Ideally, this should be the logic:
# if is_same_file(url_to_read.file_path(), url_to_write.file_path()):
# return "a"
# else:
# return "r"
[docs]
class SubtractBackgroundPattern(
TaskWithDataAccess,
input_names=["nxdata_url", "background_nxdata_url", "output_nxprocess_url"],
optional_input_names=["enabled", "background_factor", "external_nxprocess_url"],
output_names=["nxdata_url"],
):
"""
Subtract an integrated pattern from patterns stored in NeXus and saves the result in a new NeXus process group
.. code:
I_pattern_corrected = I_pattern - (I_background_pattern * background_factor)
If errors are available in the background and in the input data, errors are propagated and saved as well.
.. code:
E_I_pattern_corrected = \\sqrt{E_I_pattern^2 + (E_I_background_pattern * background_factor)^2}
"""
[docs]
def run(self):
if not is_silx_compatible():
raise ValueError(
f"Silx version must be above or equal 2.2.0 to use this task. The current version is {SILX_VERSION}."
)
if not self.get_input_value("enabled", True):
_logger.info(
f"Task {self.__class__.__qualname__} is disabled: no pattern was subtracted"
)
self.outputs.nxdata_url = self.inputs.nxdata_url
return
if not isinstance(self.inputs.background_nxdata_url, str):
raise ValueError(
f"background_nxdata_url should be a str. Got {type(self.inputs.background_nxdata_url)} instead."
)
background_nxdata_url = DataUrl(self.inputs.background_nxdata_url)
external_nxprocess_url = DataUrl(
self.get_input_value(
"external_nxprocess_url", self.inputs.output_nxprocess_url
)
)
mode = _get_opening_mode(
url_to_read=background_nxdata_url, url_to_write=external_nxprocess_url
)
with self.open_h5item(background_nxdata_url, mode=mode) as background_nxdata:
if not isinstance(background_nxdata, h5py.Group):
raise TypeError(
f"{background_nxdata_url.path()} should point towards a NXData Group."
)
background_patterns = list(
pattern
for _, pattern in read_nexus_integrated_patterns(background_nxdata)
)
background_pattern = background_patterns[0]
nxdata_url = DataUrl(self.inputs.nxdata_url)
output_nxprocess_url = DataUrl(self.inputs.output_nxprocess_url)
output_nxprocess_parent_url, output_nxprocess_name = (
split_hdf5_url_parent_data_path(output_nxprocess_url)
)
mode = _get_opening_mode(
url_to_read=nxdata_url, url_to_write=external_nxprocess_url
)
with self.open_h5item(self.inputs.nxdata_url, mode=mode) as nxdata:
if not isinstance(nxdata, h5py.Group):
raise TypeError(
f"{self.inputs.nxdata_url} should point towards a NXData Group."
)
background_factor = float(self.get_input_value("background_factor", 1))
background_intensity = background_pattern.intensity * background_factor
if background_pattern.intensity_errors is None:
background_error = None
else:
background_error = (
background_pattern.intensity_errors * background_factor
)
subtracted_intensity = numpy.empty(
nxdata["intensity"].shape, dtype=nxdata["intensity"].dtype
)
if "intensity_errors" in nxdata and background_error is not None:
substracted_intensity_errors = numpy.empty(
nxdata["intensity_errors"].shape,
dtype=nxdata["intensity_errors"].dtype,
)
else:
substracted_intensity_errors = None
pattern = None
for i, pattern in read_nexus_integrated_patterns(nxdata):
_assert_background_can_be_subtracted(pattern, background_pattern)
subtracted_intensity[i] = pattern.intensity - background_intensity
if (
background_error is not None
and substracted_intensity_errors is not None
and pattern.intensity_errors is not None
):
substracted_intensity_errors[i] = numpy.sqrt(
pattern.intensity_errors**2 + background_error**2
)
if pattern is None:
raise ValueError(f"No integrated patterns in {nxdata.name}")
with self.open_h5item(
external_nxprocess_url, create=True, mode="a"
) as external_process:
external_process["program"] = "ewoksxrpd"
external_process["version"] = importlib.metadata.version("ewoksxrpd")
config_group = external_process.create_group("configuration")
config_group.attrs["NX_class"] = "NXparameters"
for name, value in self.get_input_values().items():
if value is not None:
config_group[name] = value
output_nxdata = pyfai_utils.create_integration_results_nxdata(
external_process,
subtracted_intensity.ndim,
pattern.radial,
f"{pattern.radial_name}_{pattern.radial_units}",
azimuthal=None,
azimuthal_units="",
)
if "points" in nxdata:
create_hdf5_link(output_nxdata, "points", nxdata["points"])
axes = output_nxdata.attrs["axes"]
output_nxdata.attrs["axes"] = ["points", *axes[1:]]
output_nxdata.create_dataset("intensity", data=subtracted_intensity)
output_nxdata.attrs["signal"] = "intensity"
output_nxdata["intensity"].attrs["interpretation"] = "spectrum"
if substracted_intensity_errors is not None:
output_nxdata.create_dataset(
"intensity_errors", data=substracted_intensity_errors
)
output_nxdata.create_dataset("background", data=background_intensity)
select_default_plot(output_nxdata)
if output_nxprocess_url != external_nxprocess_url:
with self.open_h5item(
output_nxprocess_parent_url, create=True, mode="a"
) as output_nxprocess_parent:
create_hdf5_link(
output_nxprocess_parent,
output_nxprocess_name,
external_process,
)
self.outputs.nxdata_url = (
f"{output_nxdata.file.filename}::{output_nxdata.name}"
)