import re
import zipfile
from pathlib import Path
import h5py
import numpy
import pytest
from ewoksorange.tests.utils import execute_task
from silx.io.dictdump import dicttonx
from orangecontrib.ewoksxrpd.ascii import OWSaveAsciiPattern1D
from ..tasks.ascii import SaveAsciiMultiPattern1D
from ..tasks.ascii import SaveNexusPatternsAsAscii
[docs]
def test_save_ascii_task(tmpdir, setup1):
assert_save_ascii(tmpdir, setup1, None)
[docs]
def assert_save_ascii(tmpdir, setup1, qtapp):
inputs = {
"filename": str(tmpdir / "result.dat"),
"x": numpy.linspace(1, 60, 60),
"y": numpy.random.random(60),
"xunits": "2th_deg",
"header": {
"energy": 10.2,
"detector": setup1.detector,
"detector_config": setup1.detector_config,
"geometry": setup1.geometry,
},
"metadata": {"name": "mysample"},
}
execute_task(
OWSaveAsciiPattern1D.ewokstaskclass if qtapp is None else OWSaveAsciiPattern1D,
inputs=inputs,
)
x, y = numpy.loadtxt(str(tmpdir / "result.dat")).T
numpy.testing.assert_array_equal(x, inputs["x"])
numpy.testing.assert_array_equal(y, inputs["y"])
with open(tmpdir / "result.dat") as f:
lines = list()
for line in f:
if not line.startswith("#"):
break
lines.append(line)
lines = "".join(lines)
for key in (
"detector",
"energy",
"distance",
"center dim0",
"center dim1",
"rot1",
"rot2",
"rot3",
"xunits",
):
assert f"{key} =" in lines
assert "name = mysample" in lines
m = re.findall("energy = (.+) keV", lines)
assert len(m) == 1
assert float(m[0]) == inputs["header"]["energy"]
[docs]
def test_save_multi_ascii(tmpdir, setup1):
inputs = {
"filenames": [str(tmpdir / "result1.dat"), str(tmpdir / "result2.dat")],
"x_list": [numpy.linspace(1, 60, 60), numpy.linspace(1, 60, 60)],
"y_list": [numpy.random.random(60), numpy.random.random(60)],
"yerror_list": [numpy.random.random(60), numpy.random.random(60)],
"xunits_list": ["2th_deg", "2th_rad"],
"header_list": [
{
"energy": 10.2,
"detector": setup1.detector,
"geometry": setup1.geometry,
},
{
"energy": 9.8,
"detector": setup1.detector,
"geometry": setup1.geometry,
},
],
"metadata_list": [{"name": "mysample"}, {"name": "mysample"}],
}
execute_task(SaveAsciiMultiPattern1D, inputs=inputs)
for (
filename,
input_x,
input_y,
input_yerror,
input_header,
input_metadata,
) in zip(
inputs["filenames"],
inputs["x_list"],
inputs["y_list"],
inputs["yerror_list"],
inputs["header_list"],
inputs["metadata_list"],
):
x, y, yerror = numpy.loadtxt(filename).T
numpy.testing.assert_array_equal(x, input_x)
numpy.testing.assert_array_equal(y, input_y)
numpy.testing.assert_array_equal(yerror, input_yerror)
with open(filename) as f:
lines = list()
for line in f:
if not line.startswith("#"):
break
lines.append(line)
lines = "".join(lines)
for key in (
"detector",
"energy",
"distance",
"center dim0",
"center dim1",
"rot1",
"rot2",
"rot3",
"xunits",
):
assert f"{key} =" in lines
for k, v in input_metadata.items():
assert f"{k} = {v}" in lines
matches = re.findall("energy = (.+) keV", lines)
assert len(matches) == 1
assert float(matches[0]) == input_header["energy"]
[docs]
def test_SaveNexusPatternsAsAscii_single_pattern(tmp_path):
"""Test with a 1D signal, no errors and no "points" axis"""
input_filename = str(tmp_path / "input.h5")
nxdata_content = {
"@NX_class": "NXdata",
"@axes": ["q"],
"@interpretation": "spectrum",
"@signal": "intensity",
"intensity": 2 * numpy.ones(100, dtype=numpy.float32),
"q": numpy.linspace(0.01, 10, 100, dtype=numpy.float64),
"q@units": "A^-1",
}
dicttonx(nxdata_content, input_filename, "/entry/integrated")
task = SaveNexusPatternsAsAscii(
inputs={
"nxdata_url": f"{input_filename}::/entry/integrated",
"output_filename_template": str(tmp_path / "output_%04d.xye"),
}
)
task.execute()
expected_filepath = tmp_path / "output_0000.xye"
assert task.outputs.filenames == (str(expected_filepath),)
# Check header
text = expected_filepath.read_text().splitlines()
assert "# xunits = A^-1" in text, "xunit is missing or wrong"
# Check data
assert numpy.array_equal(
numpy.loadtxt(expected_filepath),
# Expected array content:
numpy.transpose(
[
nxdata_content["q"],
nxdata_content["intensity"],
]
),
), "Saved data differs from original data"
[docs]
def test_SaveNexusPatternsAsAscii_multi_patterns(
tmp_path, multi_integrated_pattern_file
):
"""Test with a 2D signal, no errors and no "points" axis"""
task = SaveNexusPatternsAsAscii(
inputs={
"nxdata_url": f"{multi_integrated_pattern_file}::/entry/integrated",
"output_filename_template": str(tmp_path / "output_%04d.xye"),
"header": {"info": "test"},
}
)
task.execute()
expected_filepaths = (
tmp_path / "output_0000.xye",
tmp_path / "output_0001.xye",
)
assert task.outputs.filenames == tuple(str(p) for p in expected_filepaths)
with h5py.File(multi_integrated_pattern_file, "r") as h5file:
input_q = h5file["/entry/integrated/q"][()]
input_intensity = h5file["/entry/integrated/intensity"][()]
for index, filepath in enumerate(expected_filepaths):
# Check header
text = filepath.read_text().splitlines()
assert "# point =" not in text, "No point header expected"
assert "# xunits = A^-1" in text, "xunit is missing or wrong"
assert "# info = test" in text, "Header information is missing"
# Check data
assert numpy.array_equal(
numpy.loadtxt(filepath),
# Expected array content:
numpy.transpose(
[
input_q,
input_intensity[index],
]
),
), "Saved data differs from original data"
[docs]
def test_SaveNexusPatternsAsAscii_multi_patterns_with_errors_and_points(tmp_path):
"""Test with a 2D signal, errors and a first "points" axis"""
input_filename = str(tmp_path / "input.h5")
nxdata_content = {
"@NX_class": "NXdata",
"@axes": ["points", "q"],
"@interpretation": "spectrum",
"@signal": "intensity",
"intensity": 2 * numpy.ones((2, 100), dtype=numpy.float32),
"intensity_errors": numpy.ones((2, 100), dtype=numpy.float32),
"q": numpy.linspace(0.01, 10, 100, dtype=numpy.float64),
"q@units": "A^-1",
"points": [0, 1],
}
dicttonx(nxdata_content, input_filename, "/entry/integrated")
task = SaveNexusPatternsAsAscii(
inputs={
"nxdata_url": f"{input_filename}::/entry/integrated",
"output_filename_template": str(tmp_path / "output_%04d.xye"),
"header": {"info": "test"},
}
)
task.execute()
expected_filepaths = (
tmp_path / "output_0000.xye",
tmp_path / "output_0001.xye",
)
assert task.outputs.filenames == tuple(str(p) for p in expected_filepaths)
for index, filepath in enumerate(expected_filepaths):
# Check header
text = filepath.read_text().splitlines()
assert (
f"# point = {nxdata_content['points'][index]}" in text
), "Corresponding point value is missing or wrong"
assert "# xunits = A^-1" in text, "xunit is missing or wrong"
assert "# info = test" in text, "Header information is missing"
# Check data
assert numpy.array_equal(
numpy.loadtxt(filepath),
# Expected array content:
numpy.transpose(
[
nxdata_content["q"],
nxdata_content["intensity"][index],
nxdata_content["intensity_errors"][index],
]
),
), "Saved data differs from original data"
[docs]
def test_SaveNexusPatternsAsAscii_zip(tmp_path, multi_integrated_pattern_file):
"""Test save as a ZIP file"""
zip_filename = str(tmp_path / "archive.zip")
task = SaveNexusPatternsAsAscii(
inputs={
"nxdata_url": f"{multi_integrated_pattern_file}::/entry/integrated",
"output_filename_template": "output_%04d.xye",
"output_archive_filename": zip_filename,
"header": {"info": "test"},
}
)
task.execute()
assert task.outputs.filenames == (zip_filename,)
extracted_path = tmp_path / "zip_content"
zipf = zipfile.ZipFile(zip_filename)
zipf.extractall(path=extracted_path)
expected_filenames = "output_0000.xye", "output_0001.xye"
assert tuple(zipf.namelist()) == expected_filenames
with h5py.File(multi_integrated_pattern_file, "r") as h5file:
input_q = h5file["/entry/integrated/q"][()]
input_intensity = h5file["/entry/integrated/intensity"][()]
for index, filename in enumerate(expected_filenames):
filepath = extracted_path / filename
# Check header
text = filepath.read_text().splitlines()
assert "# point =" not in text, "No point header expected"
assert "# xunits = A^-1" in text, "xunit is missing or wrong"
assert "# info = test" in text, "Header information is missing"
# Check data
assert numpy.array_equal(
numpy.loadtxt(filepath),
# Expected array content:
numpy.transpose(
[
input_q,
input_intensity[index],
]
),
), "Saved data differs from original data"
[docs]
def test_overwrite_SaveNexusPatternsAsAscii(tmp_path, multi_integrated_pattern_file):
expected_filepaths = (
tmp_path / "output_0000.xye",
tmp_path / "output_0001.xye",
)
for filepath in expected_filepaths:
filepath.touch()
assert filepath.exists()
inputs = {
"nxdata_url": f"{multi_integrated_pattern_file}::/entry/integrated",
"output_filename_template": str(tmp_path / "output_%04d.xye"),
"header": {"info": "test"},
}
task_without_overwrite = SaveNexusPatternsAsAscii(inputs=inputs)
with pytest.raises(RuntimeError) as exc:
task_without_overwrite.execute()
assert isinstance(exc.__cause__, FileExistsError)
task = SaveNexusPatternsAsAscii(
inputs={
**inputs,
"overwrite": True,
}
)
task.execute()
assert task.outputs.filenames == tuple(str(p) for p in expected_filepaths)
[docs]
def test_overwrite_zip_SaveNexusPatternsAsAscii(
tmp_path, multi_integrated_pattern_file
):
zip_filename = tmp_path / "archive.zip"
zip_filename.touch()
assert zip_filename.exists()
inputs = {
"nxdata_url": f"{multi_integrated_pattern_file}::/entry/integrated",
"output_filename_template": "output_%04d.xye",
"output_archive_filename": zip_filename,
"header": {"info": "test"},
}
task_without_overwrite = SaveNexusPatternsAsAscii(inputs=inputs)
with pytest.raises(RuntimeError) as exc:
task_without_overwrite.execute()
assert isinstance(exc.__cause__, FileExistsError)
task = SaveNexusPatternsAsAscii(
inputs={
**inputs,
"overwrite": True,
}
)
task.execute()
assert task.outputs.filenames == (zip_filename,)
extracted_path = tmp_path / "zip_content"
zipf = zipfile.ZipFile(zip_filename)
zipf.extractall(path=extracted_path)
expected_filenames = "output_0000.xye", "output_0001.xye"
assert tuple(zipf.namelist()) == expected_filenames
[docs]
def test_SaveNexusPatternsAsAscii_single_2d_pattern(tmp_path):
"""Test with a 2D signal (azimuthal, radial)"""
input_filename = str(tmp_path / "input.h5")
nxdata_content = {
"@NX_class": "NXdata",
"@axes": ["chi", "q"],
"@interpretation": "spectrum",
"@signal": "intensity",
"intensity": 2 * numpy.ones((6, 100), dtype=numpy.float32),
"q": numpy.linspace(0.01, 10, 100, dtype=numpy.float64),
"chi": numpy.linspace(0, 360, 6, dtype=numpy.float64),
"q@units": "A^-1",
}
dicttonx(nxdata_content, input_filename, "/entry/integrated")
task = SaveNexusPatternsAsAscii(
inputs={
"nxdata_url": f"{input_filename}::/entry/integrated",
"output_filename_template": str(tmp_path / "output_%04d.xye"),
"header": {"info": "test"},
}
)
task.execute()
expected_filepaths = (
tmp_path / "output_0000_sector0000.xye",
tmp_path / "output_0000_sector0001.xye",
tmp_path / "output_0000_sector0002.xye",
tmp_path / "output_0000_sector0003.xye",
tmp_path / "output_0000_sector0004.xye",
tmp_path / "output_0000_sector0005.xye",
)
assert task.outputs.filenames == tuple(str(p) for p in expected_filepaths)
for index, filepath in enumerate(expected_filepaths):
# Check header
text = filepath.read_text().splitlines()
assert "# point =" not in text, "No point header expected"
assert "# xunits = A^-1" in text, "xunit is missing or wrong"
assert (
f"# chi = {nxdata_content['chi'][index]}" in text
), "chi is missing or wrong"
assert "# info = test" in text, "Header information is missing"
# Check data
assert numpy.array_equal(
numpy.loadtxt(filepath),
# Expected array content:
numpy.transpose(
[
nxdata_content["q"],
nxdata_content["intensity"][index],
]
),
), "Saved data differs from original data"
[docs]
def test_SaveNexusPatternsAsAscii_multiple_2d_patterns(tmp_path):
"""Test with multiple 2D signals, no errors"""
input_filename = str(tmp_path / "input.h5")
nxdata_content = {
"@NX_class": "NXdata",
"@axes": [".", "chi", "q"],
"@interpretation": "spectrum",
"@signal": "intensity",
"intensity": 2 * numpy.ones((2, 6, 100), dtype=numpy.float32),
"q": numpy.linspace(0.01, 10, 100, dtype=numpy.float64),
"chi": numpy.linspace(0, 360, 6, dtype=numpy.float64),
"q@units": "A^-1",
}
dicttonx(nxdata_content, input_filename, "/entry/integrated")
task = SaveNexusPatternsAsAscii(
inputs={
"nxdata_url": f"{input_filename}::/entry/integrated",
"output_filename_template": str(tmp_path / "output_%04d.xye"),
}
)
task.execute()
expected_filepaths = (
tmp_path / "output_0000_sector0000.xye",
tmp_path / "output_0000_sector0001.xye",
tmp_path / "output_0000_sector0002.xye",
tmp_path / "output_0000_sector0003.xye",
tmp_path / "output_0000_sector0004.xye",
tmp_path / "output_0000_sector0005.xye",
tmp_path / "output_0001_sector0000.xye",
tmp_path / "output_0001_sector0001.xye",
tmp_path / "output_0001_sector0002.xye",
tmp_path / "output_0001_sector0003.xye",
tmp_path / "output_0001_sector0004.xye",
tmp_path / "output_0001_sector0005.xye",
)
assert task.outputs.filenames == tuple(str(p) for p in expected_filepaths)
expected_filepath = Path(expected_filepaths[0])
# Check header
text = expected_filepath.read_text()
assert "# xunits = A^-1" in text, "xunit is missing or wrong"
assert "# chi = 0.0" in text, "azim value is missing or wrong"
# Check data
assert numpy.array_equal(
numpy.loadtxt(expected_filepath),
# Expected array content:
numpy.transpose(
[
nxdata_content["q"],
nxdata_content["intensity"][0, 0],
]
),
), "Saved data differs from original data"
[docs]
def test_SaveNexusPatternsAsAscii_with_counter(tmp_path):
input_filename = str(tmp_path / "input.h5")
diode_data = numpy.arange(5)
energy = 14
entry_content = {
"integrated": {
"@NX_class": "NXdata",
"@axes": ["q"],
"@interpretation": "spectrum",
"@signal": "intensity",
"intensity": 2 * numpy.ones((5, 100), dtype=numpy.float32),
"q": numpy.linspace(0.01, 10, 100, dtype=numpy.float64),
"q@units": "A^-1",
"@units": "A^-1",
},
"instrument": {
"diode": {"data": diode_data},
"primary_beam_energy": {"data": energy},
},
"measurement": {
">diode": "../instrument/diode/data",
">primary_beam_energy": "../instrument/primary_beam_energy/data",
},
}
dicttonx(entry_content, input_filename, "/entry")
task = SaveNexusPatternsAsAscii(
inputs={
"nxdata_url": f"{input_filename}::/entry/integrated",
"output_filename_template": str(tmp_path / "output_%04d.xye"),
"counter_urls": [
f"{input_filename}::/entry/measurement/diode",
f"{input_filename}::/entry/measurement/primary_beam_energy",
],
}
)
task.execute()
expected_filepaths = tuple(str(tmp_path / f"output_000{i}.xye") for i in range(5))
assert task.outputs.filenames == expected_filepaths
# Check header
for i, expected_filepath in enumerate(expected_filepaths):
text = Path(expected_filepath).read_text().splitlines()
assert f"# diode = {diode_data[i]}" in text, "diode is missing or wrong"
assert (
f"# primary_beam_energy = {energy}" in text
), "primary_beam_energy is missing or wrong"