# Source code for ewoksscxrd.tasks.createxdsinp

"""Create an XDS.INP file from explicit task inputs."""

from __future__ import annotations

import logging
import re

from pathlib import Path
from typing import Iterable

from ewokscore import Task

logger = logging.getLogger(__name__)

# Matches an "{index}" placeholder, optionally with a "d" format spec such as
# "{index:04d}"; group 1 captures the field width when one is present.
_INDEX_TOKEN_RE = re.compile(r"\{index(?::0?(\d+)d)?\}")

# Commented-out JOB lines written at the very top of the generated file when
# the caller does not provide "job_comment_lines".
_DEFAULT_JOB_COMMENT_LINES = [
    "!JOB= XYCORR INIT COLSPOT IDXREF DEFPIX INTEGRATE CORRECT",
    "!JOB= XYCORR INIT COLSPOT IDXREF",
]

# Commented-out placeholder emitted when no unit cell is supplied.
_DEFAULT_UNIT_CELL_CONSTANTS = (
    "!UNIT_CELL_CONSTANTS= 10.317 10.317 7.3378 90 90 120"
    " ! put correct values if known"
)

# Default REFINE(...) values. NOTE: the IDXREF default carries two spaces
# before the "!" while the other two carry one; kept exactly as-is.
_DEFAULT_REFINE_IDXREF = "CELL ORIENTATION AXIS  ! BEAM POSITION"
_DEFAULT_REFINE_INTEGRATE = "CELL ORIENTATION AXIS ! BEAM POSITION"
_DEFAULT_REFINE_CORRECT = "CELL ORIENTATION AXIS ! BEAM POSITION"

# Default UNTRUSTED_RECTANGLE entries, four integers each.
# NOTE(review): presumably ordered X1 X2 Y1 Y2 as in the XDS keyword and
# describing the gaps of one specific detector -- confirm before reusing.
_DEFAULT_UNTRUSTED_RECTANGLES = [
    # Narrow bands spanning 0..3262 in the last two fields.
    *(
        [low, high, 0, 3262]
        for low, high in (
            (513, 514),
            (1028, 1039),
            (1553, 1554),
            (2068, 2079),
            (2593, 2594),
        )
    ),
    # Narrow bands spanning 0..3108 in the first two fields.
    *(
        [0, 3108, low, high]
        for low, high in (
            (512, 549),
            (1062, 1099),
            (1612, 1649),
            (2162, 2199),
            (2712, 2749),
        )
    ),
]


def _format_number(value) -> str:
    """Render a number as a compact plain-decimal string.

    Whole values are written without a fractional part (``3.0`` -> ``"3"``);
    other values are written with up to eight decimals and trailing zeros
    removed (``0.75`` -> ``"0.75"``).

    Args:
        value: Anything accepted by ``float()`` (numbers or numeric strings).

    Returns:
        The formatted text.

    Raises:
        TypeError, ValueError: If ``value`` cannot be converted by ``float()``.
    """
    # Python integers (bool included) are rendered exactly: a round-trip
    # through float silently rounds magnitudes above 2**53.
    if isinstance(value, int):
        return str(int(value))
    number = float(value)
    if number.is_integer():
        return str(int(number))
    text = f"{number:.8f}".rstrip("0").rstrip(".")
    # A tiny negative value rounds to "-0.00000000" and would otherwise be
    # emitted as "-0"; write a plain zero instead.
    return "0" if text == "-0" else text


def _format_value(value) -> str:
    """Format an input value for insertion after an XDS keyword.

    Strings are passed through untouched, lists and tuples become their
    items formatted with ``_format_number`` and joined by single spaces, and
    any other value is formatted with ``_format_number`` directly.
    """
    if isinstance(value, str):
        return value
    # Treat a scalar as a one-item sequence so both cases share one path.
    items = value if isinstance(value, (list, tuple)) else (value,)
    return " ".join(map(_format_number, items))


def _format_fixed(value, decimals: int) -> str:
    """Format ``value`` with exactly ``decimals`` digits after the point.

    Strings are returned unchanged so callers can supply pre-formatted text;
    everything else is converted with ``float()`` first.
    """
    if not isinstance(value, str):
        return format(float(value), f".{decimals}f")
    return value


def _normalize_lines(value) -> list[str]:
    """Coerce ``value`` into a list of text lines.

    * ``None`` gives an empty list.
    * A string is split on line boundaries; a string that yields no lines
      (the empty string) is kept as a single entry.
    * Any other iterable gives ``str()`` of each item.
    * Anything else gives a one-item list holding ``str(value)``.
    """
    if value is None:
        return []
    if isinstance(value, str):
        pieces = value.splitlines()
        return pieces if pieces else [value]
    if not isinstance(value, Iterable):
        return [str(value)]
    return list(map(str, value))


def _format_untrusted_rectangle(rectangle) -> str:
    """Build one ``UNTRUSTED_RECTANGLE=`` line from a sequence of values.

    Every item is converted with ``int()``. The first value is right-aligned
    in five columns and the next three in four columns each; items beyond
    the fourth are ignored and fewer than four raise ``IndexError``.
    """
    corners = [int(item) for item in rectangle]
    return "UNTRUSTED_RECTANGLE={:5d} {:4d} {:4d} {:4d}".format(*corners)


def _resolve_output_dir(output: str | Path) -> Path:
    """Return the directory in which the generated file should be written.

    ``output`` is treated as a file path, and its parent directory is
    returned, when it contains an ``{index`` placeholder or ends in a file
    suffix. Otherwise ``output`` itself is taken to be the directory.

    Args:
        output: Output location as a string or a path object.

    Returns:
        The resolved directory as a ``Path``.
    """
    output_path = Path(output)
    # Test the token against the path's text rather than the raw argument so
    # that path objects are accepted as well as plain strings (the ``in``
    # operator raises TypeError on a Path).
    points_to_file = "{index" in str(output_path) or bool(output_path.suffix)
    # Path.parent is never falsy (a bare file name gives Path(".")), so no
    # fallback is needed.
    return output_path.parent if points_to_file else output_path


def _template_from_output(output: str) -> str | None:
    """Derive a ``?``-wildcard frame-name template from ``output``.

    Only the final path component is considered. Each ``{index}`` token is
    replaced by ``?`` and each ``{index:0Nd}``-style token by ``N`` question
    marks. ``None`` is returned when the substitution leaves the name
    unchanged, i.e. when no token was found.
    """
    basename = Path(output).name

    def _wildcards(token: re.Match[str]) -> str:
        digits = token.group(1)
        if digits:
            return "?" * int(digits)
        return "?"

    substituted = _INDEX_TOKEN_RE.sub(_wildcards, basename)
    if substituted == basename:
        return None
    return substituted


class CreateXDSInp(
    Task,
    input_names=[
        "output",
        "detector",
        "nx",
        "ny",
        "qx",
        "qy",
        "data_range",
        "oscillation_range",
    ],
    optional_input_names=[
        "filename",
        "job",
        "job_comment_lines",
        "library",
        "beam",
        "distance",
        "wavelength",
        "overload",
        "space_group_number",
        "unit_cell_constants",
        "friedels_law",
        "fraction_of_polarization",
        "minimum_valid_pixel_value",
        "trusted_region",
        "polarization_plane_normal",
        "direction_of_detector_x_axis",
        "direction_of_detector_y_axis",
        "name_template_of_data_frames",
        "spot_range",
        "background_range",
        "untrusted_rectangles",
        "rotation_axis",
        "incident_beam_direction",
        "sensor_thickness",
        "silicon",
        "starting_angle",
        "starting_frame",
        "orgx",
        "orgy",
        "detector_distance",
        "minimum_number_of_pixels_in_a_spot",
        "maximum_number_of_strong_pixels",
        "background_pixel",
        "signal_pixel",
        "refine_idxref",
        "refine_integrate",
        "refine_correct",
        "exclude_resolution_range",
        "include_resolution_range",
        "beam_divergence",
        "beam_divergence_esd",
        "reflecting_range",
        "reflecting_range_esd",
        "xds_extra",
    ],
    output_names=["saved_files_path"],
):
    """Create an XDS.INP file using explicit experiment parameters.

    The file is written into the directory derived from ``output`` by
    ``_resolve_output_dir`` under the name ``filename`` (default
    ``"XDS.INP"``). The ``saved_files_path`` output is a one-element list
    holding the written path as a string.

    Although declared optional, three quantities must resolve to a value:

    * the beam centre, from ``beam`` (an ``(orgx, orgy)`` pair) or from the
      separate ``orgx`` and ``orgy`` inputs;
    * the detector distance, from ``distance`` or else
      ``detector_distance``;
    * ``wavelength``.

    ``name_template_of_data_frames`` defaults to the template derived from
    ``output`` by ``_template_from_output`` and must be given explicitly
    when ``output`` carries no ``{index}`` token. ``spot_range`` and
    ``background_range`` default to ``data_range``; ``starting_frame``
    defaults to the first element of ``data_range``.
    """

    @staticmethod
    def _keyword_line(*pairs) -> str:
        """Join ``(keyword, value)`` pairs into one line with a leading space.

        Pairs whose value is ``None`` are left out, so a keyword is only
        written when the caller actually supplied its value.
        """
        present = [
            f"{keyword} {_format_value(value)}"
            for keyword, value in pairs
            if value is not None
        ]
        return " " + " ".join(present)

    def run(self):
        """Validate the inputs, assemble the file content and write it.

        Raises:
            NotADirectoryError: If the resolved output directory exists but
                is not a directory.
            ValueError: If no frame-name template can be determined, or if
                the beam centre, distance or wavelength is missing.
        """
        get = self.get_input_value

        output_value = get("output")
        output_dir = _resolve_output_dir(output_value)
        if output_dir.exists() and not output_dir.is_dir():
            raise NotADirectoryError(f"Output must be a directory: {output_dir}")
        output_dir.mkdir(parents=True, exist_ok=True)
        xds_path = output_dir / get("filename", "XDS.INP")

        data_range = get("data_range")
        spot_range = get("spot_range", data_range)
        background_range = get("background_range", data_range)

        beam = get("beam", None)
        if beam is not None:
            orgx, orgy = beam
        else:
            # An explicit None default is required for the "missing" check
            # below to work. NOTE(review): without it ewokscore presumably
            # returns its own missing-data sentinel rather than None --
            # confirm against the ewokscore Task API.
            orgx = get("orgx", None)
            orgy = get("orgy", None)
        distance = get("distance", get("detector_distance", None))
        wavelength = get("wavelength", None)
        starting_frame = get("starting_frame", data_range[0])

        name_template = get(
            "name_template_of_data_frames",
            _template_from_output(output_value),
        )
        if name_template is None:
            raise ValueError(
                "name_template_of_data_frames is required when output has no {index} token"
            )

        missing = []
        if orgx is None or orgy is None:
            missing.append("beam")
        if distance is None:
            missing.append("distance")
        if wavelength is None:
            missing.append("wavelength")
        if missing:
            raise ValueError("Missing required XDS inputs: " + ", ".join(missing))

        untrusted_rectangles = get(
            "untrusted_rectangles", list(_DEFAULT_UNTRUSTED_RECTANGLES)
        )
        xds_extra = _normalize_lines(get("xds_extra", []))
        starting_angle = get("starting_angle", None)
        unit_cell_constants = get("unit_cell_constants", None)
        trusted_region = get("trusted_region", None)
        minimum_valid_pixel_value = get("minimum_valid_pixel_value", None)

        # --- Job selection and frame template -------------------------------
        lines = [
            *_normalize_lines(
                get("job_comment_lines", list(_DEFAULT_JOB_COMMENT_LINES))
            ),
            f"JOB= {get('job', 'XYCORR INIT COLSPOT IDXREF DEFPIX INTEGRATE CORRECT')}",
            "",
            f"NAME_TEMPLATE_OF_DATA_FRAMES= {name_template}",
        ]
        library = get("library", None)
        if library is not None:
            # Written as a commented-out line, exactly as the "!" prefix says.
            lines.append(f"!LIB= {library}")

        # --- Crystal description --------------------------------------------
        lines.extend(
            [
                "",
                f"SPACE_GROUP_NUMBER={_format_value(get('space_group_number', 0))} ! 0 if unknown",
                (
                    f"UNIT_CELL_CONSTANTS={_format_value(unit_cell_constants)}"
                    if unit_cell_constants is not None
                    else _DEFAULT_UNIT_CELL_CONSTANTS
                ),
                "",
            ]
        )

        # --- Geometry and rotation ------------------------------------------
        lines.extend(
            [
                (
                    f"FRIEDEL'S_LAW={get('friedels_law', 'FALSE')} "
                    "! This acts only on the CORRECT step"
                ),
                "",
                f"FRACTION_OF_POLARIZATION={_format_value(get('fraction_of_polarization', 0.99))}",
                (
                    "ROTATION_AXIS="
                    f"{_format_value(get('rotation_axis', [0.0, -1.0, 0.0]))}"
                ),
                (
                    "POLARIZATION_PLANE_NORMAL="
                    f"{_format_value(get('polarization_plane_normal', [0.0, 1.0, 0.0]))}"
                ),
                (
                    "DIRECTION_OF_DETECTOR_X-AXIS="
                    f"{_format_value(get('direction_of_detector_x_axis', [1.0, 0.0, 0.0]))}"
                ),
                (
                    "DIRECTION_OF_DETECTOR_Y-AXIS="
                    f"{_format_value(get('direction_of_detector_y_axis', [0.0, 1.0, 0.0]))}"
                ),
                (
                    "INCIDENT_BEAM_DIRECTION="
                    f"{_format_value(get('incident_beam_direction', [0.0, 0.0, 1.0]))}"
                ),
                f"OSCILLATION_RANGE= {_format_value(get('oscillation_range'))}",
            ]
        )
        if starting_angle is not None:
            lines.append(f"STARTING_ANGLE= {_format_value(starting_angle)}")
        lines.append(f"STARTING_FRAME= {_format_value(starting_frame)}")

        # --- Pixel validity -------------------------------------------------
        lines.extend(
            [
                "",
                f"OVERLOAD={_format_value(get('overload', 100000000))}",
            ]
        )
        if minimum_valid_pixel_value is not None:
            lines.append(
                f"MINIMUM_VALID_PIXEL_VALUE= {_format_value(minimum_valid_pixel_value)}"
            )
        if trusted_region is not None:
            lines.append(f"TRUSTED_REGION= {_format_value(trusted_region)}")

        # --- Frame ranges and detector --------------------------------------
        lines.extend(
            [
                "",
                f"DATA_RANGE= {_format_value(data_range)}",
                f"SPOT_RANGE= {_format_value(spot_range)}",
                f"BACKGROUND_RANGE= {_format_value(background_range)}",
                "",
                (
                    f"NX= {_format_value(get('nx'))} "
                    f"NY= {_format_value(get('ny'))} "
                    f"QX= {_format_fixed(get('qx'), 6)} "
                    f"QY= {_format_fixed(get('qy'), 6)}"
                ),
                f"DETECTOR= {get('detector')}",
                f"DETECTOR_DISTANCE= {_format_value(distance)}",
                f"ORGX= {_format_value(orgx)} ORGY= {_format_value(orgy)}",
                f"X-RAY_WAVELENGTH={_format_value(wavelength)}",
                "",
            ]
        )
        lines.extend(
            _format_untrusted_rectangle(rectangle)
            for rectangle in untrusted_rectangles
        )

        # --- Sensor, spot finding and refinement ----------------------------
        lines.extend(
            [
                "",
                f"SENSOR_THICKNESS={_format_value(get('sensor_thickness', 0.75))} !mm",
                f"SILICON= {_format_value(get('silicon', 18.023))} !1/mm",
                "",
                (
                    "MINIMUM_NUMBER_OF_PIXELS_IN_A_SPOT="
                    f"{_format_value(get('minimum_number_of_pixels_in_a_spot', 4))} "
                    "! default of 6 is sometimes too high"
                ),
                (
                    "MAXIMUM_NUMBER_OF_STRONG_PIXELS="
                    f"{_format_value(get('maximum_number_of_strong_pixels', 18000))} "
                    "! total number of strong pixels used for indexation"
                ),
                (
                    "BACKGROUND_PIXEL="
                    f"{_format_fixed(get('background_pixel', 2.0), 1)} "
                    "! used by COLSPOT and INTEGRATE"
                ),
                (
                    "SIGNAL_PIXEL="
                    f"{_format_fixed(get('signal_pixel', 3.0), 1)} "
                    "! needs to be lager than BACKGROUND_PIXEL, specifies standard deviation, "
                    "used in COLSPOT and INTEGRATE"
                ),
                "",
                f"REFINE(IDXREF)= {get('refine_idxref', _DEFAULT_REFINE_IDXREF)}",
                f"REFINE(INTEGRATE)= {get('refine_integrate', _DEFAULT_REFINE_INTEGRATE)}",
                f"REFINE(CORRECT)= {get('refine_correct', _DEFAULT_REFINE_CORRECT)}",
            ]
        )

        # --- Optional trailing section --------------------------------------
        exclude_resolution_range = get("exclude_resolution_range", None)
        include_resolution_range = get("include_resolution_range", None)
        beam_divergence = get("beam_divergence", None)
        beam_divergence_esd = get("beam_divergence_esd", None)
        reflecting_range = get("reflecting_range", None)
        reflecting_range_esd = get("reflecting_range_esd", None)

        has_optional_keywords = any(
            value is not None
            for value in (
                exclude_resolution_range,
                include_resolution_range,
                beam_divergence,
                beam_divergence_esd,
                reflecting_range,
                reflecting_range_esd,
            )
        )
        if has_optional_keywords or xds_extra:
            lines.extend(["", ""])
        if exclude_resolution_range is not None:
            lines.append(
                f"EXCLUDE_RESOLUTION_RANGE= {_format_value(exclude_resolution_range)}"
            )
        if include_resolution_range is not None:
            lines.append(
                f"INCLUDE_RESOLUTION_RANGE={_format_value(include_resolution_range)}"
            )
        # Each half of a value/e.s.d. pair is written only when supplied;
        # formatting an absent half would fail on float(None).
        if beam_divergence is not None or beam_divergence_esd is not None:
            lines.append(
                self._keyword_line(
                    ("BEAM_DIVERGENCE=", beam_divergence),
                    ("BEAM_DIVERGENCE_E.S.D.=", beam_divergence_esd),
                )
            )
        if reflecting_range is not None or reflecting_range_esd is not None:
            lines.append(
                self._keyword_line(
                    ("REFLECTING_RANGE=", reflecting_range),
                    ("REFLECTING_RANGE_E.S.D.=", reflecting_range_esd),
                )
            )
        if xds_extra:
            # Separate the free-form lines from the optional keywords above.
            if has_optional_keywords:
                lines.append("")
            lines.extend(xds_extra)

        xds_path.write_text("\n".join(lines) + "\n", encoding="ascii")
        self.outputs.saved_files_path = [str(xds_path)]
        logger.info("Created XDS input file at %s", xds_path)