import os
import logging
from ewokscore import Task
from .utils import create_par_file
logger = logging.getLogger(__name__)
def _transform_beam(beam, width, height, detector_square_size):
"""
Center a rectangular detector (width×height) into a square of size detector_square_size,
then flip the Y axis.
x_new = x_old + pad_x
y_new = detector_square_size - (y_old + pad_y)
where
pad_x = (detector_square_size - width) / 2
pad_y = (detector_square_size - height) / 2
"""
x, y = float(beam[0]), float(beam[1])
W, H = float(width), float(height)
pad_x = (detector_square_size - W) / 2.0
pad_y = (detector_square_size - H) / 2.0
x_new = x + pad_x
y_new = float(detector_square_size) - (y + pad_y)
return [x_new, y_new]
def _format_rotation(original_line: str, distance, beam):
parts = original_line.strip().split()
header = " ".join(parts[:2])
nums = [float(x) for x in parts[2:]]
nums[3] = float(distance)
nums[4], nums[5] = float(beam[0]), float(beam[1])
body = " ".join(f"{v:.5f}" for v in nums)
return f"{header} {body}"
[docs]
class CreateParFiles(
Task,
input_names=["output", "par_file"],
optional_input_names=[
"distance",
"wavelength",
"polarization",
"beam",
"detector_width",
"detector_height",
"detector_square_size",
],
output_names=["saved_files_path"],
):
[docs]
def run(self):
output = self.get_input_value("output")
par_file = self.get_input_value("par_file")
saved_files = []
ext = os.path.splitext(par_file)[-1].lower()
dest_basename = os.path.basename(output) + ext
dest_dir = os.path.dirname(output)
dest_path = os.path.join(dest_dir, dest_basename)
logger.info(f"Starting CreateParFiles for {par_file}")
if not os.path.exists(par_file) or ext != ".par":
logger.warning(f"Invalid .par file: {par_file}")
self.outputs.saved_files_path = []
return
# optional overrides
dist = self.get_input_value("distance", None)
wl = self.get_input_value("wavelength", None)
pol = self.get_input_value("polarization", None)
beam = self.get_input_value("beam", None)
det_w = self.get_input_value("detector_width", 2068)
det_h = self.get_input_value("detector_height", 2162)
det_sq = self.get_input_value("detector_square_size", 2164)
# transform beam if dimensions provided
if beam is not None and det_w is not None and det_h is not None:
beam = _transform_beam(beam, det_w, det_h, det_sq)
source = par_file
# rewrite file content if any override provided
if any(v is not None for v in (dist, wl, pol, beam)):
logger.info("Applying provided parameters to .par content")
with open(par_file, encoding="latin-1") as f:
lines = f.readlines()
new_lines = []
for line in lines:
stripped = line.lstrip()
# 1) DETECTOR DISTANCE
if dist is not None and stripped.startswith("§ - DETECTOR DISTANCE"):
new_lines.append(
f"§ - DETECTOR DISTANCE (MM): {float(dist):.5f}\n"
)
continue
# 2) WAVELENGTH USERSPECIFIED
if wl is not None and stripped.startswith(
"§ - WAVELENGTH USERSPECIFIED"
):
new_lines.append(
f"§ - WAVELENGTH USERSPECIFIED (ANG): "
f"A1 {float(wl):.5f} A2 {float(wl):.5f} B1 {float(wl):.5f}\n"
)
continue
# 3) CRYSTALLOGRAPHY WAVELENGTH
if wl is not None and line.startswith("CRYSTALLOGRAPHY WAVELENGTH"):
new_lines.append(
f"CRYSTALLOGRAPHY WAVELENGTH "
f"{float(wl):.5f} {float(wl):.5f} {float(wl):.5f}\n"
)
continue
# 4) POLARISATION FACTOR
if pol is not None and stripped.startswith("§ - POLARISATION FACTOR"):
new_lines.append(f"§ - POLARISATION FACTOR {float(pol):.5f}\n")
continue
# 5) DETECTOR ZERO
if beam is not None and stripped.startswith(
"§ - DETECTOR ZERO (PIX, 1X1 BINNING)"
):
new_lines.append(
f"§ - DETECTOR ZERO (PIX, 1X1 BINNING): "
f"X {float(beam[0]):.5f} Y {float(beam[1]):.5f}\n"
)
continue
# 6) ROTATION DETECTORORIENTATION
if dist is not None and line.startswith("ROTATION DETECTORORIENTATION"):
new_lines.append(_format_rotation(line, dist, beam) + "\n")
continue
# keep unchanged
new_lines.append(line)
# write temp file
os.makedirs(dest_dir, exist_ok=True)
temp = os.path.join(dest_dir, "temp.par")
with open(temp, "w", encoding="latin-1") as f:
f.writelines(new_lines)
source = temp
# forward kwargs
kwargs = {}
if dist is not None:
kwargs["distance"] = dist
if wl is not None:
kwargs["wavelength"] = wl
if pol is not None:
kwargs["polarization"] = pol
if beam is not None:
kwargs["beam"] = beam
if det_w is not None:
kwargs["detector_width"] = det_w
if det_h is not None:
kwargs["detector_height"] = det_h
if det_sq is not None:
kwargs["detector_square_size"] = det_sq
# call create_par_file
os.makedirs(dest_dir, exist_ok=True)
try:
create_par_file(source, dest_dir, dest_basename, **kwargs)
except TypeError:
create_par_file(source, dest_dir, dest_basename)
saved_files.append(dest_path)
self.outputs.saved_files_path = saved_files
logger.info(f"CreateParFiles completed: {saved_files}")