import os
import h5py
import numpy as np
import pytest
from types import SimpleNamespace
import ewoksscxrd.tasks.utils as utils
# --- Tests for create_run_file ---
@pytest.fixture
def stub_crysalis(monkeypatch, tmp_path):
    """
    Replace ``utils.crysalis`` with fakes that record how they are called.

    The fixture swaps the ``crysalis`` attribute of the ``utils`` module for a
    ``SimpleNamespace`` exposing ``RunHeader``, ``RunDscr``, ``SCAN_AXIS``,
    ``saveRun`` and ``saveCrysalisExpSettings``. Every fake stores its
    arguments in a shared dictionary, which is returned so tests can inspect
    it. The fake ``saveRun`` additionally writes ``<runname>.run`` to disk.

    ``tmp_path`` is requested but not used by the fixture body itself.
    """
    calls = {}

    def fake_RunHeader(name, dirpath, one):
        # Only the most recent call is kept.
        calls["RunHeader"] = (name, dirpath, one)
        return SimpleNamespace()

    def fake_RunDscr(val):
        # Accumulate every value so the number of calls can be asserted.
        calls.setdefault("RunDscr", []).append(val)
        return SimpleNamespace()

    fake_scan_axis = {"OMEGA": 42}

    def fake_saveRun(runname, header, runFile):
        calls["saveRun"] = (runname, header, runFile)
        # Create a dummy .run file so callers can check it exists.
        with open(runname + ".run", "w") as f:
            f.write("# run file")

    def fake_saveCrysalisExpSettings(dirpath):
        calls["saveCrysalisExpSettings"] = dirpath

    monkeypatch.setattr(
        utils,
        "crysalis",
        SimpleNamespace(
            RunHeader=fake_RunHeader,
            RunDscr=fake_RunDscr,
            SCAN_AXIS=fake_scan_axis,
            saveRun=fake_saveRun,
            saveCrysalisExpSettings=fake_saveCrysalisExpSettings,
        ),
    )
    return calls
def test_create_run_file_success(stub_crysalis, tmp_path):
    """
    Check the calls ``create_run_file`` makes on the stubbed crysalis module.

    Verifies that ``RunHeader`` receives the encoded basename and directory,
    that ``RunDscr`` is called once with ``0``, that ``saveRun`` is given
    ``<crys_dir>/<basename>`` (and the stub's ``.run`` file exists), and that
    ``saveCrysalisExpSettings`` is called with the output directory.
    """
    scans = [
        [{"kappa": 1, "omega_start": 0, "omega_end": 10, "domega": 0.5, "count": 5}]
    ]
    crys_dir = str(tmp_path / "crys")
    basename = "run1"
    os.makedirs(crys_dir, exist_ok=True)

    utils.create_run_file(scans, crys_dir, basename)

    calls = stub_crysalis
    # RunHeader called with encoded (bytes) names
    assert calls["RunHeader"][0] == basename.encode()
    assert calls["RunHeader"][1] == crys_dir.encode()
    # RunDscr called once with 0
    assert calls["RunDscr"] == [0]
    # saveRun called with the expected run name; the other two recorded
    # arguments are not inspected by this test.
    runname, _header, _run_file = calls["saveRun"]
    assert runname == os.path.join(crys_dir, basename)
    # .run file created by the fake saveRun
    assert os.path.exists(runname + ".run")
    # saveCrysalisExpSettings called with the output directory
    assert calls["saveCrysalisExpSettings"] == crys_dir
# --- Tests for create_par_file ---
@pytest.fixture
def sample_par(tmp_path):
    """
    Write a three-line ``orig.par`` file and return its path as a string.

    The second line is a ``FILE CHIP`` entry; the file is encoded as
    iso-8859-1.
    """
    path = tmp_path / "orig.par"
    content = [
        "LINE1\n",
        "FILE CHIP oldfile.ext\n",
        "LINE3\n",
    ]
    path.write_text("".join(content), encoding="iso-8859-1")
    return str(path)
def test_create_par_file(sample_par, tmp_path):
    """
    Check that ``create_par_file`` writes the new file and updates FILE CHIP.

    The output is expected at ``<dest_dir>/<basename>``, with the first and
    third lines unchanged and the second line starting with
    ``FILE CHIP new.ccd``.
    """
    dest_dir = str(tmp_path / "out")
    basename = "new.par"
    os.makedirs(dest_dir, exist_ok=True)

    utils.create_par_file(sample_par, dest_dir, basename)

    new_path = os.path.join(dest_dir, basename)
    assert os.path.exists(new_path)
    with open(new_path, encoding="iso-8859-1") as f:
        text = f.readlines()
    # First and third lines unchanged
    assert text[0] == "LINE1\n"
    # FILE CHIP line replaced
    assert text[1].startswith("FILE CHIP new.ccd")
    assert text[2] == "LINE3\n"
# --- Tests for read_dataset ---
@pytest.fixture
def sample_h5(tmp_path):
    """
    Create ``data.h5`` holding a (2, 3, 4) array and return ``(path, array)``.

    The array is stored as dataset ``data`` inside the group
    ``entry_0000/measurement``; the path is returned as a string.
    """
    path = tmp_path / "data.h5"
    arr = np.arange(24).reshape(2, 3, 4)
    with h5py.File(path, "w") as f:
        grp = f.require_group("entry_0000/measurement")
        grp.create_dataset("data", data=arr)
    return str(path), arr
def test_read_dataset(sample_h5):
    """Check that ``read_dataset`` returns the stored array as an ndarray."""
    path, arr = sample_h5
    out = utils.read_dataset(path)
    assert isinstance(out, np.ndarray)
    assert out.shape == arr.shape
    assert np.array_equal(out, arr)
# --- Tests for subtract_frame_inplace ---
@pytest.fixture
def sample_frame():
    """
    Build a uint32 array of shape (2, 2, 2) used by the subtraction test.

    ``frame[0]`` is ``[[100, 0], [0, 0]]`` and ``frame[1]`` is
    ``[[1, 10], [uint32 max, 2]]``.
    """
    # frame shape (2, H, W)
    H, W = 2, 2
    dtype = np.uint32
    frame = np.zeros((2, H, W), dtype=dtype)
    frame[0] = np.array([[100, 0], [0, 0]], dtype=dtype)
    frame[1] = np.array([[1, 10], [np.iinfo(dtype).max, 2]], dtype=dtype)
    return frame
def test_subtract_frame_inplace(sample_frame):
    """
    Test subtract_frame_inplace with threshold set to max uint32.

    Works on a copy of the fixture array and checks the four pixels of
    ``frame[0]`` after the call with ``scale=2``.
    """
    frame = sample_frame.copy()
    scale = 2
    maxv = np.iinfo(np.uint32).max
    thresh = maxv  # threshold always max uint32

    utils.subtract_frame_inplace(frame, scale, thresh)

    # pixel [0,0]: frame1=1<thresh => sub: 100-2*1 = 98
    assert frame[0, 0, 0] == 98
    # [0,1]: frame1=10<thresh => sub: 0-2*10 = -20 clamped to 0
    assert frame[0, 0, 1] == 0
    # [1,0]: frame1=max => saturate => max
    assert frame[0, 1, 0] == maxv
    # [1,1]: frame1=2<thresh => sub: 0-2*2 = -4 clamped to 0
    assert frame[0, 1, 1] == 0
def test_subtract_images_parallel_valid():
    """
    Check output shape and values of ``subtract_images_inplace_parallel``.

    Input is a (2, 2, 2, 2) uint32 array: ``data[:, 0]`` is filled with 100,
    ``data[0, 1]`` with 1 and ``data[1, 1]`` with 10. The call uses
    ``scale_factor=1`` and ``masking_value=5``.
    """
    # create data shape (2,2,2,2)
    data = np.zeros((2, 2, 2, 2), dtype=np.uint32)
    # fill frame0
    data[:, 0] = 100
    # frame1 values: first frame below thresh, second above
    data[0, 1] = 1
    data[1, 1] = 10

    out = utils.subtract_images_inplace_parallel(data, scale_factor=1, masking_value=5)

    # out shape: (2,2,2)
    assert out.shape == (2, 2, 2)
    # first frame: 100-1=99
    assert out[0, 0, 0] == 99
    # second frame: 100+10=110
    # NOTE(review): the expected value for a frame1 pixel above masking_value
    # is taken from the original test; confirm it against the implementation.
    assert out[1, 0, 0] == 110
@pytest.mark.parametrize(
    "arr",
    [
        np.zeros((2, 2, 2), dtype=np.uint32),  # 3D
        np.zeros((2, 1, 2, 2), dtype=np.uint32),  # too small dim1
    ],
)
def test_subtract_images_parallel_invalid(arr):
    """Check that a 3D input or a size-1 second axis raises ``ValueError``."""
    with pytest.raises(ValueError):
        utils.subtract_images_inplace_parallel(arr, 1, 5)
# --- Tests for create_header_from_file ---
# --- Tests for write_lima_images ---
def test_write_lima_images(tmp_path, monkeypatch):
    """
    Check the file written by ``write_lima_images``.

    Writes a (2, 3, 4) uint32 stack with a sample header, then verifies the
    dataset at ``/entry_0000/ESRF-ID15B/mydet/data`` (shape, dtype and
    ``interpretation`` attribute) and that ``/entry_0000`` carries a
    ``default`` attribute.
    """
    # Prepare data and header
    data = np.arange(2 * 3 * 4, dtype=np.uint32).reshape(2, 3, 4)
    header = {
        "title": "test",
        "instrument": "ESRF-ID15B",
        "detector": "mydet",
        "exposure_time": 0.5,
        "latency_time": 0.1,
        "mode": "Single",
        "trigger_mode": "Trig",
        "model": "Mod",
        "type": "Typ",
        "pixel_size_x": 0.1,
        "pixel_size_y": 0.2,
        "image_roi": "<0,0>-<3x4>",
    }
    out = tmp_path / "out.nxs"
    # Ensure hdf5plugin is None to force gzip
    monkeypatch.setattr(utils, "hdf5plugin", None)

    utils.write_lima_images(data, str(out), header)

    # All checks happen while the file is open: h5py objects become invalid
    # once the file is closed.
    with h5py.File(str(out), "r") as f:
        # path '/entry_0000/ESRF-ID15B/mydet/data'
        d = f["/entry_0000/ESRF-ID15B/mydet/data"]
        assert d.shape == (2, 3, 4)
        assert d.dtype == np.uint32
        # verify attribute
        assert d.attrs["interpretation"] == "image"
        # Check entry default attr
        entry = f["/entry_0000"]
        assert "default" in entry.attrs