Source code for ewoksxrpd.tasks.utils.data_utils

import logging
import os
from fnmatch import fnmatch
from importlib.metadata import version
from numbers import Number
from typing import Mapping
from typing import Optional
from typing import Tuple
from typing import Union

import fabio.edfimage
import fabio.tifimage
import h5py
import numpy
from blissdata.h5api import dynamic_hdf5
from packaging.version import Version
from silx.io.url import DataUrl
from typing_extensions import TypeGuard

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


def hdf5_url(file_name: str, data_path: str) -> str:
    """Build a ``silx://`` URL string for an item inside an HDF5 file.

    :param file_name: path of the HDF5 file. A relative path is made absolute
        with respect to the current working directory; an absolute path is
        used as given.
    :param data_path: path of the item inside the file.
    :returns: string of the form ``silx://<file_name>?path=<data_path>``.
    """
    if os.path.isabs(file_name):
        absolute_name = file_name
    else:
        absolute_name = os.path.abspath(file_name)
    return f"silx://{absolute_name}?path={data_path}"
def split_hdf5_url_parent_data_path(url: DataUrl) -> Tuple[DataUrl, str]:
    """Split an HDF5 URL into the URL of its parent group and the item name.

    :param url: URL whose data path refers to a non-root HDF5 item.
    :returns: ``(parent_url, name)`` where ``name`` is the last component of
        the data path and ``parent_url`` refers to everything before it.
        When the item sits directly under the root, ``parent_url`` is built
        from the file path alone.
    :raises ValueError: when the data path is missing or refers to the root.
    """
    data_path = url.data_path() or "/"
    # Drop empty components: without this filter the list is never empty, so
    # the root check below could not trigger and a trailing slash would
    # produce an empty item name.
    parts = [s for s in data_path.split("/") if s]
    if not parts:
        raise ValueError(f"{url.path()!r} needs to refer to a non-root HDF5 group")

    *parent_parts, name = parts
    if not parent_parts:
        return DataUrl(url.file_path()), name

    parent_data_path = "/".join(parent_parts)
    if data_path.startswith("/"):
        # Keep the parent path absolute when the original data path was.
        parent_data_path = f"/{parent_data_path}"
    return DataUrl(f"{url.file_path()}::{parent_data_path}"), name
[docs] def is_data(data) -> TypeGuard[Union[numpy.ndarray, Number, str, list]]: if isinstance(data, (numpy.ndarray, Number)): return True if isinstance(data, (str, list)) and data: return True return False
def is_same_file(filename1: str, filename2: str) -> bool:
    """Tell whether two file names designate the same path.

    Both names are normalized and made absolute before being compared as
    strings.

    :param filename1: first file name.
    :param filename2: second file name.
    :returns: ``True`` when both names resolve to the same absolute path.
    """
    canonical1, canonical2 = (
        os.path.abspath(os.path.normpath(name)) for name in (filename1, filename2)
    )
    return canonical1 == canonical2
def data_from_storage(data, remove_numpy=True):
    """Convert stored data to plain values.

    :param data: a numpy array, a mapping (handled recursively) or any other
        value.
    :param remove_numpy: when ``True``, numpy arrays are converted to native
        Python objects: a 0-dimensional array becomes a scalar, any other
        array becomes a (nested) list. When ``False``, arrays are returned
        untouched.
    :returns: the converted value. Mappings are returned as a new ``dict``
        without the entries whose key starts with ``"@"``. Values that are
        neither arrays nor mappings are returned as they are.
    """
    if isinstance(data, numpy.ndarray):
        if not remove_numpy:
            return data
        return data.item() if data.ndim == 0 else data.tolist()

    if not isinstance(data, Mapping):
        return data

    converted = {}
    for key, value in data.items():
        if key.startswith("@"):
            continue
        converted[key] = data_from_storage(value, remove_numpy=remove_numpy)
    return converted
def _get_hdf5_filename(file_obj) -> str:
    """Return the file name of an opened HDF5 file object.

    :param file_obj: object exposing ``filename``, or an object that wraps
        the actual file behind ``_retry_handler.file_obj``.
    :returns: the ``filename`` attribute of the file.
    """
    try:
        filename = file_obj.filename
    except AttributeError:
        # to be fixed in blissdata
        filename = file_obj._retry_handler.file_obj.filename
    return filename


def _normalize_hdf5_item_name(*parts) -> str:
    """Join path fragments into an absolute HDF5 item name.

    Each fragment is split on ``"/"`` and empty components are dropped, so
    leading, trailing and repeated slashes are ignored.

    :param parts: path fragments (strings).
    :returns: the non-empty components joined by ``"/"`` with a leading
        ``"/"``; just ``"/"`` when there is no component at all.
    """
    components = []
    for part in parts:
        components.extend(piece for piece in part.split("/") if piece)
    return "/" + "/".join(components)
def convert_to_3d(data: Union[numpy.ndarray, Number, str, list]):
    """Convert data to a numpy array with at least three dimensions.

    The data is first converted with ``numpy.array``. Arrays with fewer than
    three dimensions get singleton axes prepended until they have exactly
    three; arrays with three or more dimensions are returned as converted.

    :param data: value to convert.
    :returns: numpy array with ``ndim >= 3``.
    """
    data_arr = numpy.array(data)
    missing_axes = 3 - data_arr.ndim
    if missing_axes <= 0:
        return data_arr
    target_shape = (1,) * missing_axes + data_arr.shape
    return data_arr.reshape(target_shape)
def save_image(
    data: numpy.ndarray,
    save_path: str,
    save_name: str,
    monitor_data: Optional[Number] = None,
    metadata: Optional[dict] = None,
    ext: str = "edf",
) -> str:
    """Write an image to an EDF or TIFF file with fabio.

    :param data: image data.
    :param save_path: directory in which the file is written.
    :param save_name: file name without extension.
    :param monitor_data: when given and non-zero, ``data`` is divided by this
        value before saving and the value is stored in the header under
        ``Summed_monitor_counts``. Otherwise the data is saved as is and the
        header entry is ``"nan"``.
    :param metadata: additional header entries.
    :param ext: file extension (case-insensitive): ``"edf"``, ``"tiff"`` or
        ``"tif"``.
    :returns: path of the written file, ``<save_path>/<save_name>.<ext>`` with
        the extension in lower case.
    :raises ValueError: when ``ext`` is not a supported extension.
    """
    if monitor_data:
        normalized_data = data / monitor_data
        monitor_counts = str(monitor_data)
    else:
        normalized_data = data
        monitor_counts = "nan"
    header = dict(Summed_monitor_counts=monitor_counts, **(metadata or {}))

    ext = ext.lower()
    img_filepath = f"{save_path}/{save_name}.{ext}"

    # Image classes are looked up only for the branch that is taken.
    if ext == "edf":
        image_class = fabio.edfimage.EdfImage
    elif ext in ("tiff", "tif"):
        image_class = fabio.tifimage.TifImage
    else:
        raise ValueError(f"Unsupported ext {ext}. Only supports EDF and TIFF.")

    image_class(data=normalized_data, header=header).write(img_filepath)
    return img_filepath
def get_opening_mode(url_to_read: DataUrl, url_to_write: DataUrl):
    """Return the mode in which HDF5 files are opened.

    All files are opened in ``"a"`` mode to get around
    https://gitlab.esrf.fr/workflow/ewoksapps/ewoksxrpd/-/issues/93,
    so both arguments are currently unused.

    :param url_to_read: URL of the data to read.
    :param url_to_write: URL of the data to write.
    :returns: always ``"a"``.
    """
    # Ideally, this should be the logic:
    #   if is_same_file(url_to_read.file_path(), url_to_write.file_path()):
    #       return "a"
    #   else:
    #       return "r"
    return "a"


# Version of the installed silx package, resolved once at import time.
SILX_VERSION = Version(version("silx"))


def _is_silx_compatible() -> bool:
    """Tell whether the installed silx version is at least 2.2.0."""
    minimum_version = Version("2.2.0")
    return SILX_VERSION >= minimum_version
def check_if_external_nxprocess_can_be_written():
    """Ensure the installed silx version allows writing an external NXprocess.

    With older silx versions, opening `external_nxprocess_url` raises:
    ``RuntimeError: Unable to synchronously open file (file locking 'ignore
    disabled locks' flag values don't match)``.

    :raises ValueError: when the installed silx version is below 2.2.0.
    """
    if _is_silx_compatible():
        return
    raise ValueError(
        f"Silx version must be above or equal 2.2.0 to use this task. The current version is {SILX_VERSION}."
    )