Source code for ewoksxrpd.tasks.subtract_pattern

import logging

import h5py
import numpy
from silx.io.url import DataUrl

from .data_access import TaskWithDataAccess
from .utils import pyfai_utils
from .utils.data_utils import check_if_external_nxprocess_can_be_written
from .utils.data_utils import create_hdf5_link
from .utils.data_utils import get_opening_mode
from .utils.data_utils import split_hdf5_url_parent_data_path
from .utils.nexus_utils import IntegratedPattern
from .utils.nexus_utils import read_nexus_integrated_patterns

_logger = logging.getLogger(__name__)


def _assert_background_can_be_subtracted(
    pattern: IntegratedPattern, background_pattern: IntegratedPattern
):
    if pattern.radial_units != background_pattern.radial_units:
        raise ValueError(
            f"Background pattern units {background_pattern.radial_units} are not compatible with data pattern units {pattern.radial_units}."
        )

    if pattern.intensity.shape != background_pattern.intensity.shape:
        raise ValueError(
            f"Background pattern intensity shape {background_pattern.intensity.shape} are not compatible with data pattern shape {pattern.intensity.shape}."
        )


[docs] class SubtractBackgroundPattern( TaskWithDataAccess, input_names=["nxdata_url", "background_nxdata_url", "output_nxprocess_url"], optional_input_names=["enabled", "background_factor", "external_nxprocess_url"], output_names=["nxdata_url"], ): """ Subtract an integrated pattern from patterns stored in NeXus and saves the result in a new NeXus process group .. code: I_pattern_corrected = I_pattern - (I_background_pattern * background_factor) If errors are available in the background and in the input data, errors are propagated and saved as well. .. code: E_I_pattern_corrected = \\sqrt{E_I_pattern^2 + (E_I_background_pattern * background_factor)^2} """
[docs] def run(self): check_if_external_nxprocess_can_be_written() if not self.get_input_value("enabled", True): _logger.info( f"Task {self.__class__.__qualname__} is disabled: no pattern was subtracted" ) self.outputs.nxdata_url = self.inputs.nxdata_url return if not isinstance(self.inputs.background_nxdata_url, str): raise ValueError( f"background_nxdata_url should be a str. Got {type(self.inputs.background_nxdata_url)} instead." ) background_nxdata_url = DataUrl(self.inputs.background_nxdata_url) external_nxprocess_url = DataUrl( self.get_input_value( "external_nxprocess_url", self.inputs.output_nxprocess_url ) ) mode = get_opening_mode( url_to_read=background_nxdata_url, url_to_write=external_nxprocess_url ) with self.open_h5item(background_nxdata_url, mode=mode) as background_nxdata: if not isinstance(background_nxdata, h5py.Group): raise TypeError( f"{background_nxdata_url.path()} should point towards a NXData Group." ) background_patterns = list( pattern for _, pattern in read_nexus_integrated_patterns(background_nxdata) ) background_pattern = background_patterns[0] nxdata_url = DataUrl(self.inputs.nxdata_url) output_nxprocess_url = DataUrl(self.inputs.output_nxprocess_url) output_nxprocess_parent_url, output_nxprocess_name = ( split_hdf5_url_parent_data_path(output_nxprocess_url) ) mode = get_opening_mode( url_to_read=nxdata_url, url_to_write=external_nxprocess_url ) with self.open_h5item(self.inputs.nxdata_url, mode=mode) as nxdata: if not isinstance(nxdata, h5py.Group): raise TypeError( f"{self.inputs.nxdata_url} should point towards a NXData Group." ) background_factor = float(self.get_input_value("background_factor", 1)) background_intensity = background_pattern.intensity * background_factor if background_pattern.intensity_errors is None: background_error = None else: background_error = ( background_pattern.intensity_errors * background_factor ) subtracted_intensity = numpy.empty( nxdata["intensity"].shape, dtype=nxdata["intensity"].dtype ) if "intensity_errors" in nxdata and background_error is not None: subtracted_intensity_errors = numpy.empty( nxdata["intensity_errors"].shape, dtype=nxdata["intensity_errors"].dtype, ) else: subtracted_intensity_errors = None pattern = None for i, pattern in read_nexus_integrated_patterns(nxdata): _assert_background_can_be_subtracted(pattern, background_pattern) subtracted_intensity[i] = pattern.intensity - background_intensity if ( background_error is not None and subtracted_intensity_errors is not None and pattern.intensity_errors is not None ): subtracted_intensity_errors[i] = numpy.sqrt( pattern.intensity_errors**2 + background_error**2 ) if pattern is None: raise ValueError(f"No integrated patterns in {nxdata.name}") with self.open_h5item( external_nxprocess_url, create=True, mode="a" ) as external_process: output_nxdata = pyfai_utils.save_nexus_processed_pattern( external_process, subtracted_intensity, subtracted_intensity_errors, radial=pattern.radial, radial_name=pattern.radial_name, radial_units=pattern.radial_units, config=self.get_input_values(), ) output_nxdata.create_dataset("background", data=background_intensity) if "points" in nxdata: create_hdf5_link(output_nxdata, "points", nxdata["points"]) axes = output_nxdata.attrs["axes"] output_nxdata.attrs["axes"] = ["points", *axes[1:]] if output_nxprocess_url != external_nxprocess_url: with self.open_h5item( output_nxprocess_parent_url, create=True, mode="a" ) as output_nxprocess_parent: create_hdf5_link( output_nxprocess_parent, output_nxprocess_name, external_process, ) self.outputs.nxdata_url = ( f"{output_nxdata.file.filename}::{output_nxdata.name}" )