Source code for ewoksxrpd.tasks.ascii

from __future__ import annotations

import logging
import os
import zipfile
from typing import Any
from typing import Dict
from typing import Iterator
from typing import Mapping
from typing import Sequence
from typing import Tuple
from typing import Union

import h5py
import numpy
from ewokscore import Task

from .data_access import TaskWithDataAccess
from .utils import zip_utils
from .utils.ascii_utils import ensure_parent_folder
from .utils.ascii_utils import save_pattern_as_ascii
from .utils.nexus_utils import IntegratedPattern
from .utils.nexus_utils import read_nexus_integrated_patterns

logger = logging.getLogger(__name__)


__all__ = [
    "SaveAsciiPattern1D",
    "SaveAsciiMultiPattern1D",
    "SaveNexusPatternsAsAscii",
]


[docs] class SaveAsciiPattern1D( Task, input_names=["filename", "x", "y", "xunits"], optional_input_names=["header", "yerror", "metadata"], output_names=["saved"], ): """Save single diffractogram in ASCII format"""
[docs] def run(self): header = self.get_input_value("header", dict()) metadata = self.get_input_value("metadata", dict()) yerror = self.get_input_value("yerror", None) save_pattern_as_ascii( self.inputs.filename, self.inputs.x, self.inputs.y, self.inputs.xunits, yerror, header, metadata, ) self.outputs.saved = True
[docs] class SaveAsciiMultiPattern1D( Task, input_names=["filenames", "x_list", "y_list", "xunits_list"], optional_input_names=["header_list", "yerror_list", "metadata_list"], output_names=["saved"], ):
[docs] def run(self): filenames: Sequence[str] = self.inputs.filenames x_list: Sequence[numpy.ndarray] = self.inputs.x_list y_list: Sequence[numpy.ndarray] = self.inputs.y_list xunits_list: Sequence[str] = self.inputs.xunits_list header_list: Sequence[Mapping] = self.get_input_value( "header_list", len(filenames) * [dict()] ) yerror_list: Sequence[numpy.ndarray] | Sequence[None] = self.get_input_value( "yerror_list", len(filenames) * [None] ) metadata_list: Sequence[Dict[str, Any]] = self.get_input_value( "metadata_list", len(filenames) * [dict()] ) for args in zip( filenames, x_list, y_list, xunits_list, yerror_list, header_list, metadata_list, ): save_pattern_as_ascii(*args) self.outputs.saved = True
[docs] class SaveNexusPatternsAsAscii( TaskWithDataAccess, input_names=["nxdata_url", "output_filename_template"], optional_input_names=[ "header", "enabled", "output_archive_filename", "overwrite", "sector_suffix_template", "counter_urls", ], output_names=["filenames"], ): """Convert azimuthal integration results from NeXus to ASCII files One ASCII file is created for each integration pattern in nxdata_url. ASCII files are named from the output_filename_template and the pattern index. If output_archive_filename is provided, all ASCII files are stored in a single ZIP file. In this case, output_filename_template is the template path of ASCII files inside the ZIP file. Required inputs: - nxdata_url (str): The url of the NXData group storing the azimuthal integration results - output_filename_template (str): A string template containing one '%d' field. It is used to generate the filename from the frame number. Optional inputs: - header (dict): Information to store in ASCII file header (default: {}) - enabled (bool): True to enable saving as ASCII files, False to skip task (default: True) - output_archive_filename (str): Filename of the ZIP file containing all ASCII files. If this is None (default) or the empty string, ZIP compression is disabled. - overwrite (bool): True to allow overwriting existing ASCII/ZIP files (default: False) - sector_suffix_template (str): A string template containing one '%d' field. Used to generate the string appended at the end of the filename showing the sector number. Defaults to "sector%04d". Only used if 2D integrated patterns are present. - counter_urls (list[str]): Urls to datasets with counter values to save in the file header. The dataset should be scalar or have a length equal to the number of integrated frames from the nxdata_url. If this is None (default), no counter value is added to the header. Outputs: - filenames (tuple[str]): The names of the created ASCII files or ZIP file """
[docs] def run(self): if not self.get_input_value("enabled", True): logger.info( f"Task {self.__class__.__qualname__} is disabled: No file is saved" ) self.outputs.filenames = tuple() return output_archive_filename = self.get_input_value("output_archive_filename", None) overwrite = self.get_input_value("overwrite", False) if output_archive_filename: ensure_parent_folder(output_archive_filename) mode = "w" if overwrite else "x" with zipfile.ZipFile( output_archive_filename, mode=mode, compression=zipfile.ZIP_DEFLATED ) as zipf: with self.open_h5item(self.inputs.nxdata_url) as group: for filename, pattern, metadata in self._export_data(group): with zip_utils.open_in_zipfile( zipf, filename, mode="w" ) as file: save_pattern_as_ascii( file, x=pattern.radial, y=pattern.intensity, xunits=pattern.radial_units, yerror=pattern.intensity_errors, header=self.get_input_value("header", {}), metadata=metadata, ) self.outputs.filenames = (output_archive_filename,) else: filenames = [] with self.open_h5item(self.inputs.nxdata_url) as group: for filename, pattern, metadata in self._export_data(group): if not overwrite and os.path.exists(filename): raise FileExistsError(f"File exists: {filename}") save_pattern_as_ascii( filename, x=pattern.radial, y=pattern.intensity, xunits=pattern.radial_units, yerror=pattern.intensity_errors, header=self.get_input_value("header", {}), metadata=metadata, ) filenames.append(filename) self.outputs.filenames = tuple(filenames)
def _export_data(self, group) -> Iterator[Tuple[str, IntegratedPattern, dict]]: sector_suffix_template = self.get_input_value( "sector_suffix_template", "sector%04d" ) counter_data = self._get_counter_data() for frame_index, pattern in read_nexus_integrated_patterns(group): filename = self.inputs.output_filename_template % frame_index metadata = {} if pattern.point is not None: metadata["point"] = pattern.point for name, data in counter_data.items(): metadata[name] = data[frame_index] if data.ndim >= 1 else data if pattern.azimuthal_point is not None: metadata[pattern.azimuthal_point.name] = pattern.azimuthal_point.value root, ext = os.path.splitext(filename) filename = f"{root}_{sector_suffix_template % pattern.azimuthal_point.sector_index}{ext}" yield filename, pattern, metadata def _get_counter_data(self) -> Dict[str, Union[numpy.ndarray, numpy.generic]]: counter_urls = self.get_input_value("counter_urls", None) if counter_urls is None: return {} counter_data: Dict[str, Union[numpy.ndarray, numpy.generic]] = {} for counter_url in counter_urls: with self.open_h5item(counter_url) as counter: if not isinstance(counter, h5py.Dataset): logger.warning( f"Cannot save data from {counter_url} in header: not a dataset" ) continue if counter.ndim > 1: logger.warning( f"Cannot save data from {counter_url} in header: not a scalar or 1D" ) continue counter_name = counter.name.split("/")[-1] counter_data[counter_name] = counter[()] return counter_data