Source code for ewoksxrpd.tasks.utils.data_utils

import logging
import os
from numbers import Number
from typing import Mapping
from typing import Optional
from typing import Tuple
from typing import Union

import fabio.edfimage
import fabio.tifimage
import h5py
import numpy
from blissdata.h5api import dynamic_hdf5
from silx.io.url import DataUrl
from typing_extensions import TypeGuard

logger = logging.getLogger(__name__)


[docs] def hdf5_url(file_name: str, data_path: str) -> str: if not os.path.isabs(file_name): file_name = os.path.abspath(file_name) return f"silx://{file_name}?path={data_path}"
[docs] def split_hdf5_url_parent_data_path(url: DataUrl) -> Tuple[DataUrl, str]: data_path = url.data_path() or "/" parts = [s for s in data_path.split("/")] if not parts: raise ValueError(f"{url.path()!r} needs to refer to a non-root HDF5 group") name = parts[-1] parent_data_path = "/".join(parts[:-1]) if parent_data_path: parent_url = DataUrl(f"{url.file_path()}::{parent_data_path}") else: parent_url = DataUrl(url.file_path()) return parent_url, name
[docs] def is_data(data) -> TypeGuard[Union[numpy.ndarray, Number, str, list]]: if isinstance(data, (numpy.ndarray, Number)): return True if isinstance(data, (str, list)) and data: return True return False
[docs] def is_same_file(filename1: str, filename2: str) -> bool: return os.path.abspath(os.path.normpath(filename1)) == os.path.abspath( os.path.normpath(filename2) )
[docs] def data_from_storage(data, remove_numpy=True): if isinstance(data, numpy.ndarray): if not remove_numpy: return data elif data.ndim == 0: return data.item() else: return data.tolist() elif isinstance(data, Mapping): return { k: data_from_storage(v, remove_numpy=remove_numpy) for k, v in data.items() if not k.startswith("@") } else: return data
def _get_hdf5_filename(file_obj) -> str: try: return file_obj.filename except AttributeError: # to be fixed in blissdata return file_obj._retry_handler.file_obj.filename def _normalize_hdf5_item_name(*parts) -> str: name = "/".join([s for part in parts for s in part.split("/") if s]) return f"/{name}"
[docs] def convert_to_3d(data: Union[numpy.ndarray, Number, str, list]): data_arr = numpy.array(data) if data_arr.ndim >= 3: return data_arr if data_arr.ndim == 2: return data_arr.reshape(1, *data_arr.shape) if data_arr.ndim == 1: return data_arr.reshape(1, 1, *data_arr.shape) return data_arr.reshape(1, 1, 1)
[docs] def save_image( data: numpy.ndarray, save_path: str, save_name: str, monitor_data: Optional[Number] = None, metadata: Optional[dict] = None, ext: str = "edf", ) -> str: normalized_data = data / monitor_data if monitor_data else data header = dict( Summed_monitor_counts=str(monitor_data) if monitor_data else "nan", **(metadata if metadata else {}), ) ext = ext.lower() img_filepath = f"{save_path}/{save_name}.{ext}" if ext.lower() == "edf": Image = fabio.edfimage.EdfImage elif ext == "tiff" or ext == "tif": Image = fabio.tifimage.TifImage else: raise ValueError(f"Unsupported ext {ext}. Only supports EDF and TIFF.") img = Image(data=normalized_data, header=header) img.write(img_filepath) return img_filepath