Source code for ewoksxrpd.tasks.subtract_pattern
import logging
import h5py
import numpy
from silx.io.url import DataUrl
from .data_access import TaskWithDataAccess
from .utils import pyfai_utils
from .utils.data_utils import check_if_external_nxprocess_can_be_written
from .utils.data_utils import create_hdf5_link
from .utils.data_utils import get_opening_mode
from .utils.data_utils import split_hdf5_url_parent_data_path
from .utils.nexus_utils import IntegratedPattern
from .utils.nexus_utils import read_nexus_integrated_patterns
_logger = logging.getLogger(__name__)
def _assert_background_can_be_subtracted(
pattern: IntegratedPattern, background_pattern: IntegratedPattern
):
if pattern.radial_units != background_pattern.radial_units:
raise ValueError(
f"Background pattern units {background_pattern.radial_units} are not compatible with data pattern units {pattern.radial_units}."
)
if pattern.intensity.shape != background_pattern.intensity.shape:
raise ValueError(
f"Background pattern intensity shape {background_pattern.intensity.shape} are not compatible with data pattern shape {pattern.intensity.shape}."
)
[docs]
class SubtractBackgroundPattern(
TaskWithDataAccess,
input_names=["nxdata_url", "background_nxdata_url", "output_nxprocess_url"],
optional_input_names=["enabled", "background_factor", "external_nxprocess_url"],
output_names=["nxdata_url"],
):
"""
Subtract an integrated pattern from patterns stored in NeXus and saves the result in a new NeXus process group
.. code:
I_pattern_corrected = I_pattern - (I_background_pattern * background_factor)
If errors are available in the background and in the input data, errors are propagated and saved as well.
.. code:
E_I_pattern_corrected = \\sqrt{E_I_pattern^2 + (E_I_background_pattern * background_factor)^2}
"""
[docs]
def run(self):
check_if_external_nxprocess_can_be_written()
if not self.get_input_value("enabled", True):
_logger.info(
f"Task {self.__class__.__qualname__} is disabled: no pattern was subtracted"
)
self.outputs.nxdata_url = self.inputs.nxdata_url
return
if not isinstance(self.inputs.background_nxdata_url, str):
raise ValueError(
f"background_nxdata_url should be a str. Got {type(self.inputs.background_nxdata_url)} instead."
)
background_nxdata_url = DataUrl(self.inputs.background_nxdata_url)
external_nxprocess_url = DataUrl(
self.get_input_value(
"external_nxprocess_url", self.inputs.output_nxprocess_url
)
)
mode = get_opening_mode(
url_to_read=background_nxdata_url, url_to_write=external_nxprocess_url
)
with self.open_h5item(background_nxdata_url, mode=mode) as background_nxdata:
if not isinstance(background_nxdata, h5py.Group):
raise TypeError(
f"{background_nxdata_url.path()} should point towards a NXData Group."
)
background_patterns = list(
pattern
for _, pattern in read_nexus_integrated_patterns(background_nxdata)
)
background_pattern = background_patterns[0]
nxdata_url = DataUrl(self.inputs.nxdata_url)
output_nxprocess_url = DataUrl(self.inputs.output_nxprocess_url)
output_nxprocess_parent_url, output_nxprocess_name = (
split_hdf5_url_parent_data_path(output_nxprocess_url)
)
mode = get_opening_mode(
url_to_read=nxdata_url, url_to_write=external_nxprocess_url
)
with self.open_h5item(self.inputs.nxdata_url, mode=mode) as nxdata:
if not isinstance(nxdata, h5py.Group):
raise TypeError(
f"{self.inputs.nxdata_url} should point towards a NXData Group."
)
background_factor = float(self.get_input_value("background_factor", 1))
background_intensity = background_pattern.intensity * background_factor
if background_pattern.intensity_errors is None:
background_error = None
else:
background_error = (
background_pattern.intensity_errors * background_factor
)
subtracted_intensity = numpy.empty(
nxdata["intensity"].shape, dtype=nxdata["intensity"].dtype
)
if "intensity_errors" in nxdata and background_error is not None:
subtracted_intensity_errors = numpy.empty(
nxdata["intensity_errors"].shape,
dtype=nxdata["intensity_errors"].dtype,
)
else:
subtracted_intensity_errors = None
pattern = None
for i, pattern in read_nexus_integrated_patterns(nxdata):
_assert_background_can_be_subtracted(pattern, background_pattern)
subtracted_intensity[i] = pattern.intensity - background_intensity
if (
background_error is not None
and subtracted_intensity_errors is not None
and pattern.intensity_errors is not None
):
subtracted_intensity_errors[i] = numpy.sqrt(
pattern.intensity_errors**2 + background_error**2
)
if pattern is None:
raise ValueError(f"No integrated patterns in {nxdata.name}")
with self.open_h5item(
external_nxprocess_url, create=True, mode="a"
) as external_process:
output_nxdata = pyfai_utils.save_nexus_processed_pattern(
external_process,
subtracted_intensity,
subtracted_intensity_errors,
radial=pattern.radial,
radial_name=pattern.radial_name,
radial_units=pattern.radial_units,
config=self.get_input_values(),
)
output_nxdata.create_dataset("background", data=background_intensity)
if "points" in nxdata:
create_hdf5_link(output_nxdata, "points", nxdata["points"])
axes = output_nxdata.attrs["axes"]
output_nxdata.attrs["axes"] = ["points", *axes[1:]]
if output_nxprocess_url != external_nxprocess_url:
with self.open_h5item(
output_nxprocess_parent_url, create=True, mode="a"
) as output_nxprocess_parent:
create_hdf5_link(
output_nxprocess_parent,
output_nxprocess_name,
external_process,
)
self.outputs.nxdata_url = (
f"{output_nxdata.file.filename}::{output_nxdata.name}"
)