"""Create an XDS.INP file from explicit task inputs."""
from __future__ import annotations
import logging
import re
from pathlib import Path
from typing import Iterable
from ewokscore import Task
logger = logging.getLogger(__name__)
_INDEX_TOKEN_RE = re.compile(r"\{index(?::0?(\d+)d)?\}")
_DEFAULT_JOB_COMMENT_LINES = [
"!JOB= XYCORR INIT COLSPOT IDXREF DEFPIX INTEGRATE CORRECT",
"!JOB= XYCORR INIT COLSPOT IDXREF",
]
_DEFAULT_UNIT_CELL_CONSTANTS = (
"!UNIT_CELL_CONSTANTS= 10.317 10.317 7.3378 90 90 120 ! put correct values if known"
)
_DEFAULT_REFINE_IDXREF = "CELL ORIENTATION AXIS ! BEAM POSITION"
_DEFAULT_REFINE_INTEGRATE = "CELL ORIENTATION AXIS ! BEAM POSITION"
_DEFAULT_REFINE_CORRECT = "CELL ORIENTATION AXIS ! BEAM POSITION"
_DEFAULT_UNTRUSTED_RECTANGLES = [
[513, 514, 0, 3262],
[1028, 1039, 0, 3262],
[1553, 1554, 0, 3262],
[2068, 2079, 0, 3262],
[2593, 2594, 0, 3262],
[0, 3108, 512, 549],
[0, 3108, 1062, 1099],
[0, 3108, 1612, 1649],
[0, 3108, 2162, 2199],
[0, 3108, 2712, 2749],
]
def _format_number(value) -> str:
number = float(value)
if number.is_integer():
return str(int(number))
return f"{number:.8f}".rstrip("0").rstrip(".")
def _format_value(value) -> str:
if isinstance(value, str):
return value
if isinstance(value, (list, tuple)):
return " ".join(_format_number(item) for item in value)
return _format_number(value)
def _format_fixed(value, decimals: int) -> str:
if isinstance(value, str):
return value
return f"{float(value):.{decimals}f}"
def _normalize_lines(value) -> list[str]:
if value is None:
return []
if isinstance(value, str):
return value.splitlines() or [value]
if isinstance(value, Iterable):
return [str(item) for item in value]
return [str(value)]
def _format_untrusted_rectangle(rectangle) -> str:
values = [int(item) for item in rectangle]
return (
f"UNTRUSTED_RECTANGLE={values[0]:5d} {values[1]:4d} "
f"{values[2]:4d} {values[3]:4d}"
)
def _resolve_output_dir(output: str) -> Path:
output_path = Path(output)
if "{index" in output or output_path.suffix:
return output_path.parent or Path(".")
return output_path
def _template_from_output(output: str) -> str | None:
    """Derive a ``?``-wildcard frame name template from ``output``.

    Only the final path component is considered. Each ``{index}`` token in
    it is replaced by a single ``?``; a token with a width (``{index:04d}``)
    is replaced by that many ``?`` characters. ``None`` is returned when the
    name contains no such token.
    """
    basename = Path(output).name

    def wildcards(token: re.Match[str]) -> str:
        digits = token.group(1)
        if not digits:
            return "?"
        return "?" * int(digits)

    # Every substitution alters the text, so a zero count means "no token".
    template, replaced = _INDEX_TOKEN_RE.subn(wildcards, basename)
    if replaced == 0:
        return None
    return template
# ewoks task definition
class CreateXDSInp(
    Task,
    input_names=[
        "output",
        "detector",
        "nx",
        "ny",
        "qx",
        "qy",
        "data_range",
        "oscillation_range",
    ],
    optional_input_names=[
        "filename",
        "job",
        "job_comment_lines",
        "library",
        "beam",
        "distance",
        "wavelength",
        "overload",
        "space_group_number",
        "unit_cell_constants",
        "friedels_law",
        "fraction_of_polarization",
        "minimum_valid_pixel_value",
        "trusted_region",
        "polarization_plane_normal",
        "direction_of_detector_x_axis",
        "direction_of_detector_y_axis",
        "name_template_of_data_frames",
        "spot_range",
        "background_range",
        "untrusted_rectangles",
        "rotation_axis",
        "incident_beam_direction",
        "sensor_thickness",
        "silicon",
        "starting_angle",
        "starting_frame",
        "orgx",
        "orgy",
        "detector_distance",
        "minimum_number_of_pixels_in_a_spot",
        "maximum_number_of_strong_pixels",
        "background_pixel",
        "signal_pixel",
        "refine_idxref",
        "refine_integrate",
        "refine_correct",
        "exclude_resolution_range",
        "include_resolution_range",
        "beam_divergence",
        "beam_divergence_esd",
        "reflecting_range",
        "reflecting_range_esd",
        "xds_extra",
    ],
    output_names=["saved_files_path"],
):
    """Create an XDS.INP file using explicit experiment parameters.

    Besides the declared required inputs, :meth:`run` also needs

    * the beam centre: ``beam`` (a pair) or both ``orgx`` and ``orgy``;
    * the detector distance: ``distance`` or, as fallback,
      ``detector_distance``;
    * ``wavelength``;
    * a frame name template: ``name_template_of_data_frames`` or an
      ``output`` whose file name contains an ``{index}`` token.

    The file is written to the directory resolved from ``output`` under the
    name given by ``filename`` (default ``XDS.INP``). The output
    ``saved_files_path`` is a one-element list holding that path as a string.
    """

    def run(self):
        """Assemble the keyword lines and write the XDS input file.

        :raises NotADirectoryError: the resolved output location exists but
            is not a directory.
        :raises ValueError: no frame name template can be determined, or the
            beam centre, distance or wavelength is missing.
        """
        get = self.get_input_value

        # --- output location -------------------------------------------------
        output_value = get("output")
        output_dir = _resolve_output_dir(output_value)
        if output_dir.exists() and not output_dir.is_dir():
            raise NotADirectoryError(f"Output must be a directory: {output_dir}")
        output_dir.mkdir(parents=True, exist_ok=True)
        xds_path = output_dir / get("filename", "XDS.INP")

        # --- frame ranges (spot/background fall back to the data range) ------
        data_range = get("data_range")
        spot_range = get("spot_range", data_range)
        background_range = get("background_range", data_range)
        starting_frame = get("starting_frame", data_range[0])

        # --- geometry ---------------------------------------------------------
        beam = get("beam", None)
        if beam is not None:
            orgx, orgy = beam
        else:
            # An explicit None default is required here so that the
            # "missing" check below can recognise an absent beam centre.
            orgx = get("orgx", None)
            orgy = get("orgy", None)
        distance = get("distance", get("detector_distance", None))
        wavelength = get("wavelength", None)

        name_template = get(
            "name_template_of_data_frames",
            _template_from_output(output_value),
        )
        if name_template is None:
            raise ValueError(
                "name_template_of_data_frames is required when output has no {index} token"
            )

        missing = []
        if orgx is None or orgy is None:
            missing.append("beam")
        if distance is None:
            missing.append("distance")
        if wavelength is None:
            missing.append("wavelength")
        if missing:
            raise ValueError("Missing required XDS inputs: " + ", ".join(missing))

        # --- remaining optional inputs ----------------------------------------
        untrusted_rectangles = get(
            "untrusted_rectangles", list(_DEFAULT_UNTRUSTED_RECTANGLES)
        )
        xds_extra = _normalize_lines(get("xds_extra", []))
        starting_angle = get("starting_angle", None)
        unit_cell_constants = get("unit_cell_constants", None)
        trusted_region = get("trusted_region", None)
        minimum_valid_pixel_value = get("minimum_valid_pixel_value", None)
        library = get("library", None)

        friedels_law = get("friedels_law", "FALSE")
        if isinstance(friedels_law, bool):
            # Write Python booleans in the upper-case TRUE/FALSE spelling that
            # the string default already uses.
            friedels_law = "TRUE" if friedels_law else "FALSE"

        # --- header -----------------------------------------------------------
        job = get("job", "XYCORR INIT COLSPOT IDXREF DEFPIX INTEGRATE CORRECT")
        lines = [
            *_normalize_lines(
                get("job_comment_lines", list(_DEFAULT_JOB_COMMENT_LINES))
            ),
            f"JOB= {job}",
            "",
            f"NAME_TEMPLATE_OF_DATA_FRAMES= {name_template}",
        ]
        if library is not None:
            lines.append(f"!LIB= {library}")

        # --- crystal ----------------------------------------------------------
        space_group_number = _format_value(get("space_group_number", 0))
        lines.extend(
            [
                "",
                f"SPACE_GROUP_NUMBER={space_group_number} ! 0 if unknown",
                (
                    f"UNIT_CELL_CONSTANTS={_format_value(unit_cell_constants)}"
                    if unit_cell_constants is not None
                    else _DEFAULT_UNIT_CELL_CONSTANTS
                ),
                "",
            ]
        )

        # --- beam / goniometer / detector orientation -------------------------
        fraction_of_polarization = _format_value(
            get("fraction_of_polarization", 0.99)
        )
        rotation_axis = _format_value(get("rotation_axis", [0.0, -1.0, 0.0]))
        polarization_plane_normal = _format_value(
            get("polarization_plane_normal", [0.0, 1.0, 0.0])
        )
        detector_x_axis = _format_value(
            get("direction_of_detector_x_axis", [1.0, 0.0, 0.0])
        )
        detector_y_axis = _format_value(
            get("direction_of_detector_y_axis", [0.0, 1.0, 0.0])
        )
        incident_beam_direction = _format_value(
            get("incident_beam_direction", [0.0, 0.0, 1.0])
        )
        lines.extend(
            [
                f"FRIEDEL'S_LAW={friedels_law} ! This acts only on the CORRECT step",
                "",
                f"FRACTION_OF_POLARIZATION={fraction_of_polarization}",
                f"ROTATION_AXIS={rotation_axis}",
                f"POLARIZATION_PLANE_NORMAL={polarization_plane_normal}",
                f"DIRECTION_OF_DETECTOR_X-AXIS={detector_x_axis}",
                f"DIRECTION_OF_DETECTOR_Y-AXIS={detector_y_axis}",
                f"INCIDENT_BEAM_DIRECTION={incident_beam_direction}",
                f"OSCILLATION_RANGE= {_format_value(get('oscillation_range'))}",
            ]
        )
        if starting_angle is not None:
            lines.append(f"STARTING_ANGLE= {_format_value(starting_angle)}")
        lines.append(f"STARTING_FRAME= {_format_value(starting_frame)}")

        # --- pixel validity ---------------------------------------------------
        lines.extend(
            [
                "",
                f"OVERLOAD={_format_value(get('overload', 100000000))}",
            ]
        )
        if minimum_valid_pixel_value is not None:
            lines.append(
                f"MINIMUM_VALID_PIXEL_VALUE= {_format_value(minimum_valid_pixel_value)}"
            )
        if trusted_region is not None:
            lines.append(f"TRUSTED_REGION= {_format_value(trusted_region)}")

        # --- ranges and detector description ----------------------------------
        lines.extend(
            [
                "",
                f"DATA_RANGE= {_format_value(data_range)}",
                f"SPOT_RANGE= {_format_value(spot_range)}",
                f"BACKGROUND_RANGE= {_format_value(background_range)}",
                "",
                (
                    f"NX= {_format_value(get('nx'))} "
                    f"NY= {_format_value(get('ny'))} "
                    f"QX= {_format_fixed(get('qx'), 6)} "
                    f"QY= {_format_fixed(get('qy'), 6)}"
                ),
                f"DETECTOR= {get('detector')}",
                f"DETECTOR_DISTANCE= {_format_value(distance)}",
                f"ORGX= {_format_value(orgx)} ORGY= {_format_value(orgy)}",
                f"X-RAY_WAVELENGTH={_format_value(wavelength)}",
                "",
            ]
        )
        lines.extend(
            _format_untrusted_rectangle(rectangle)
            for rectangle in untrusted_rectangles
        )

        # --- sensor, spot finding and refinement ------------------------------
        min_pixels_in_spot = _format_value(
            get("minimum_number_of_pixels_in_a_spot", 4)
        )
        max_strong_pixels = _format_value(
            get("maximum_number_of_strong_pixels", 18000)
        )
        lines.extend(
            [
                "",
                f"SENSOR_THICKNESS={_format_value(get('sensor_thickness', 0.75))} !mm",
                f"SILICON= {_format_value(get('silicon', 18.023))} !1/mm",
                "",
                (
                    f"MINIMUM_NUMBER_OF_PIXELS_IN_A_SPOT={min_pixels_in_spot} "
                    "! default of 6 is sometimes too high"
                ),
                (
                    f"MAXIMUM_NUMBER_OF_STRONG_PIXELS={max_strong_pixels} "
                    "! total number of strong pixels used for indexation"
                ),
                (
                    "BACKGROUND_PIXEL="
                    f"{_format_fixed(get('background_pixel', 2.0), 1)} "
                    "! used by COLSPOT and INTEGRATE"
                ),
                (
                    "SIGNAL_PIXEL="
                    f"{_format_fixed(get('signal_pixel', 3.0), 1)} "
                    "! needs to be lager than BACKGROUND_PIXEL, specifies standard deviation, "
                    "used in COLSPOT and INTEGRATE"
                ),
                "",
                f"REFINE(IDXREF)= {get('refine_idxref', _DEFAULT_REFINE_IDXREF)}",
                f"REFINE(INTEGRATE)= {get('refine_integrate', _DEFAULT_REFINE_INTEGRATE)}",
                f"REFINE(CORRECT)= {get('refine_correct', _DEFAULT_REFINE_CORRECT)}",
            ]
        )

        # --- optional trailing keywords ---------------------------------------
        tail_lines = []
        exclude_resolution_range = get("exclude_resolution_range", None)
        if exclude_resolution_range is not None:
            tail_lines.append(
                f"EXCLUDE_RESOLUTION_RANGE= {_format_value(exclude_resolution_range)}"
            )
        include_resolution_range = get("include_resolution_range", None)
        if include_resolution_range is not None:
            tail_lines.append(
                f"INCLUDE_RESOLUTION_RANGE={_format_value(include_resolution_range)}"
            )
        # Each value / e.s.d. pair shares one line. Only the members that were
        # actually supplied are written, so giving just one of the two no
        # longer fails while formatting the absent one.
        for keyword, esd_keyword in (
            ("BEAM_DIVERGENCE", "BEAM_DIVERGENCE_E.S.D."),
            ("REFLECTING_RANGE", "REFLECTING_RANGE_E.S.D."),
        ):
            value = get(keyword.lower(), None)
            esd = get(f"{keyword.lower()}_esd", None)
            parts = []
            if value is not None:
                parts.append(f"{keyword}= {_format_value(value)}")
            if esd is not None:
                parts.append(f"{esd_keyword}= {_format_value(esd)}")
            if parts:
                tail_lines.append(" " + " ".join(parts))

        if tail_lines or xds_extra:
            lines.extend(["", ""])
            lines.extend(tail_lines)
        if xds_extra:
            if tail_lines:
                # Blank separator between generated keywords and user lines.
                lines.append("")
            lines.extend(xds_extra)

        # --- write ------------------------------------------------------------
        xds_path.write_text("\n".join(lines) + "\n", encoding="ascii")
        self.outputs.saved_files_path = [str(xds_path)]
        logger.info("Created XDS input file at %s", xds_path)