1 change: 1 addition & 0 deletions qcrboxtools/analyse/quality/base.py
@@ -79,6 +79,7 @@ def ascending_levels2func(levels: Tuple[float, ...]) -> Callable[[float], int]:
"""
return lambda x: next((i for i, v in enumerate(levels) if x < v))


def descending_levels2func(levels: Tuple[float, ...]) -> Callable[[float], int]:
"""
Create a function that maps a value to its corresponding level index in descending order.
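As a quick reference, here is a minimal, self-contained sketch of how the level-mapping helper above behaves; the lambda body is copied from the hunk, and the thresholds and expected indices are the same ones used by the parametrized tests at the bottom of this diff.

import numpy as np

def ascending_levels2func(levels):
    # Returns the index of the first threshold that the value is still below.
    return lambda x: next((i for i, v in enumerate(levels) if x < v))

func = ascending_levels2func((1.0, 2.0, 3.0, 4.0, np.inf))
assert func(0.5) == 0   # below the first threshold
assert func(1.5) == 1   # between the first and second threshold
assert func(5.0) == 4   # only np.inf is still larger, so the last index is returned

The descending variant (whose body is collapsed here) is expected to mirror this with the comparison reversed, as the descending test cases below suggest.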
32 changes: 9 additions & 23 deletions qcrboxtools/analyse/quality/precision.py
@@ -105,7 +105,7 @@ def precision_all_data(cif_block: block, indicators: Optional[List[str]] = None)
results_overall["Completeness"] = int_nonsym.completeness()

if not already_merged:
int_merged = int_nonsym.merge_equivalents()
int_merged = int_nonsym.merge_equivalents()
if "Mean Redundancy" in indicators:
results_overall["Mean Redundancy"] = int_merged.redundancies().as_double().mean()
if "R_meas" in indicators:
@@ -118,19 +118,12 @@ def precision_all_data(cif_block: block, indicators: Optional[List[str]] = None)
results_overall["R_sigma"] = int_merged.r_sigma()
if "CC1/2" in indicators:
results_overall["CC1/2"] = int_nonsym.cc_one_half()
else:
non_sensical_entries = [
"Mean Redundancy",
"R_meas",
"R_pim",
"R_int",
"R_sigma",
"CC1/2"
]
else:
non_sensical_entries = ["Mean Redundancy", "R_meas", "R_pim", "R_int", "R_sigma", "CC1/2"]
for indicator in non_sensical_entries:
if indicator in indicators:
results_overall[indicator] = 'N/A'
results_overall[indicator] = "N/A"

return results_overall


@@ -164,7 +157,7 @@ def precision_all_data_quality(results_overall: Dict[str, float]) -> Dict[str, D
}
quality_values = {}
for indicator, value in results_overall.items():
if indicator == "d_min lower" or value == 'N/A':
if indicator == "d_min lower" or value == "N/A":
quality_values[indicator] = DataQuality.INFORMATION
else:
operation = value2level_dict[indicator]
@@ -222,15 +215,8 @@ def precision_vs_resolution(
intensity_array = cif_block2intensity_array(cif_block)

if intensity_array.is_unique_set_under_symmetry():
non_sensical_entries = [
"Mean Redundancy",
"R_meas",
"R_pim",
"R_int",
"R_sigma",
"CC1/2"
]
indicators = [ind for ind in indicators if ind not in non_sensical_entries]
non_sensical_entries = ["Mean Redundancy", "R_meas", "R_pim", "R_int", "R_sigma", "CC1/2"]
indicators = [ind for ind in indicators if ind not in non_sensical_entries]

intensity_array.setup_binner(n_bins=n_bins)

@@ -242,7 +228,7 @@
bin_array = intensity_array.select(sel)
bin_merged = bin_array.merge_equivalents()
lowlim, highlim = intensity_array.binner().bin_d_range(i_bin)

if "d_min lower" in indicators:
results_binned["d_min lower"][array_index] = lowlim
if "d_min upper" in indicators:
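The hunks above mostly collapse the non_sensical_entries list onto one line and switch the sentinel from 'N/A' to "N/A". For orientation, a compressed sketch of the logic those lines implement; the cctbx calls are the ones visible in the hunks, while the wrapper function and variable names are illustrative only.

MERGED_ONLY = ["Mean Redundancy", "R_meas", "R_pim", "R_int", "R_sigma", "CC1/2"]

def summarize(int_nonsym, indicators, already_merged):
    results = {}
    if not already_merged:
        int_merged = int_nonsym.merge_equivalents()
        if "Mean Redundancy" in indicators:
            results["Mean Redundancy"] = int_merged.redundancies().as_double().mean()
        if "R_sigma" in indicators:
            results["R_sigma"] = int_merged.r_sigma()
    else:
        # Redundancy and merging statistics are meaningless for data that is
        # already merged, so they are reported with the "N/A" sentinel.
        for indicator in MERGED_ONLY:
            if indicator in indicators:
                results[indicator] = "N/A"
    return results

precision_all_data_quality (third hunk) then maps any "N/A" value to DataQuality.INFORMATION instead of grading it against numeric thresholds, and precision_vs_resolution (fourth hunk) uses the same list to drop these indicators entirely for already-merged data.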
10 changes: 5 additions & 5 deletions qcrboxtools/cif/cif2cif/base.py
@@ -53,12 +53,12 @@ def cif_file_to_unified(


def cif_model_to_specific(
cif_model: model,
cif_model: model.cif,
required_entries: Optional[List[str]] = None,
optional_entries: Optional[List[str]] = None,
custom_categories: Optional[List[str]] = None,
merge_su: bool = False,
) -> model:
) -> model.cif:
"""
Filters and processes an iotbx CIF model to include only specific entries.

@@ -116,9 +116,9 @@ def cif_file_to_specific(
def cif_file_to_specific(
input_cif_path: Union[str, Path],
output_cif_path: Union[str, Path],
required_entries: List[str] = None,
optional_entries: List[str] = None,
custom_categories: List[str] = None,
required_entries: Optional[List[str]] = None,
optional_entries: Optional[List[str]] = None,
custom_categories: Optional[List[str]] = None,
merge_su: bool = False,
):
"""
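The changes in this file are type hints only: cif_model_to_specific is now annotated as taking and returning an iotbx model.cif object, and the defaults of cif_file_to_specific are marked Optional. A hypothetical call of the model-level function is sketched below; the entry names and custom category are invented, and the cif.reader(input_string=...) call mirrors the one used in yaml.py further down.

from pathlib import Path
from iotbx import cif  # assumed import; the diff only shows the cif.reader / model usage
from qcrboxtools.cif.cif2cif.base import cif_model_to_specific  # assumed import path

cif_text = Path("input.cif").read_text(encoding="UTF-8")
cif_model = cif.reader(input_string=cif_text).model()
specific_model = cif_model_to_specific(
    cif_model,
    required_entries=["_cell.length_a", "_cell.angle_alpha"],
    optional_entries=["_diffrn.ambient_temperature"],
    custom_categories=["qcrbox"],
    merge_su=False,
)
specific_text = str(specific_model)  # an iotbx cif model serialises back to CIF text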
100 changes: 80 additions & 20 deletions qcrboxtools/cif/cif2cif/yaml.py
@@ -15,7 +15,7 @@
from ..read import cifdata_str_or_index, read_cif_safe
from ..trim import trim_cif_block
from ..uncertainties import split_su_block
from .base import cif_file_to_specific
from .base import cif_model_to_specific


class NoKeywordsError(BaseException):
@@ -516,6 +516,51 @@ def yml_entries_resolve_special(
raise ValueError("yml_entry must be of type YmlCifInputSettings or YmlCifOutputSettings.")


def cif_text_to_specific_by_yml(input_cif_text: str, yml_path: Union[str, Path], command: str, parameter: str) -> str:
"""
Processes a CIF text based on instructions defined in a YAML configuration, applying
specified keyword transformations defined in a commands parameter as well as its standard
uncertainty merge settings.

Parameters
----------
input_cif_text : str
The CIF text to be processed.
yml_path : Union[str, Path]
The file path to the YAML file containing processing instructions.
command : str
The specific command within the YAML file to follow for processing the CIF text.
parameter : str
The specific parameter within the command to follow for processing the CIF text.

Returns
-------
str
The processed CIF text.
"""
with open(yml_path, "r", encoding="UTF-8") as fobj:
yml_dict = yaml.safe_load(fobj)

yml_input_settings = cif_input_entries_from_yml(yml_dict, command, parameter)

model = cif.reader(input_string=input_cif_text).model()
block, _ = cifdata_str_or_index(model, 0)

yml_input_settings = yml_entries_resolve_special(yml_input_settings, block)

cif_model = cif.reader(input_string=input_cif_text).model()

specific_cif_model = cif_model_to_specific(
cif_model,
yml_input_settings.required_entries,
yml_input_settings.optional_entries,
yml_input_settings.custom_categories,
yml_input_settings.merge_su,
)

return str(specific_cif_model)


def cif_file_to_specific_by_yml(
input_cif_path: Union[str, Path],
output_cif_path: Union[str, Path],
@@ -546,23 +591,11 @@ def cif_file_to_specific_by_yml(
This file was developed for exposing commands within QCrBox. See this project or the
test of this function for an example of how such a yml file might look like.
"""
with open(yml_path, "r", encoding="UTF-8") as fobj:
yml_dict = yaml.safe_load(fobj)

yml_input_settings = cif_input_entries_from_yml(yml_dict, command, parameter)

block, _ = cifdata_str_or_index(read_cif_safe(input_cif_path), "0")
input_cif_text = Path(input_cif_path).read_text(encoding="UTF-8")

yml_input_settings = yml_entries_resolve_special(yml_input_settings, block)
output_cif_text = cif_text_to_specific_by_yml(input_cif_text, yml_path, command, parameter)

cif_file_to_specific(
input_cif_path,
output_cif_path,
yml_input_settings.required_entries,
yml_input_settings.optional_entries,
yml_input_settings.custom_categories,
yml_input_settings.merge_su,
)
Path(output_cif_path).write_text(output_cif_text, encoding="UTF-8")


def cif_file_merge_to_unified_by_yml(
@@ -598,18 +631,45 @@ def cif_file_merge_to_unified_by_yml(
This file was developed for exposing commands within QCrBox. See this project or the
test of this function for an example of how such a yml file might look like.
"""
input_cif_text = Path(input_cif_path).read_text(encoding="UTF-8")
merge_cif_text = Path(merge_cif_path).read_text(encoding="UTF-8") if merge_cif_path else None

output_cif_text = cif_text_merge_to_unified_by_yml(input_cif_text, merge_cif_text, yml_path, command, parameter)

Path(output_cif_path).write_text(output_cif_text, encoding="UTF-8")


def cif_text_merge_to_unified_by_yml(input_cif_text, merge_cif_text, yml_path, command, parameter):
"""
Merges two CIF texts into a unified format based on YAML configuration.

Parameters
----------
input_cif_text : str
The CIF text to be processed.
merge_cif_text : str
The CIF text to be merged with the input CIF text.
yml_path : str
The file path to the YAML file containing processing instructions.
command : str
The specific command within the YAML file to follow for processing the CIF texts.
parameter : str
The specific parameter within the command to follow for processing the CIF texts.

"""

with open(yml_path, "r", encoding="UTF-8") as fobj:
yml_dict = yaml.safe_load(fobj)
yml_output_settings = cif_output_entries_from_yml(yml_dict, command, parameter)

input_cif = read_cif_safe(input_cif_path)
input_cif = cif.reader(input_string=input_cif_text).model()
# dataset name will be overwritten if merge_cif is not None
input_block, dataset_name = cifdata_str_or_index(input_cif, yml_output_settings.select_block)
if merge_cif_path is None:
if merge_cif_text is None:
merge_block = cif.model.block()
else:
merge_block, dataset_name = cifdata_str_or_index(
read_cif_safe(merge_cif_path), "0"
cif.reader(input_string=merge_cif_text).model(), 0
) # QCrBox cif files have only one block

yml_output_settings = yml_entries_resolve_special(yml_output_settings, input_block)
@@ -644,7 +704,7 @@ def cif_file_merge_to_unified_by_yml(
output_cif = cif.model.cif()
output_cif[dataset_name] = output_cif_block

Path(output_cif_path).write_text(str(output_cif), encoding="UTF-8")
return str(output_cif)


def can_run_command(yml_path: Path, command: str, input_cif_path: Path):
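The substance of this file's changes is a split into text-based helpers: cif_text_to_specific_by_yml and cif_text_merge_to_unified_by_yml do all the work on strings, while the existing file-based functions shrink to read-text / delegate / write-text wrappers. A sketch of how the pieces fit together after the refactor; the paths, command name, and parameter names are hypothetical, and the import assumes the helpers are available from the module path shown above.

from pathlib import Path
from qcrboxtools.cif.cif2cif.yaml import (  # assumed import path
    cif_text_to_specific_by_yml,
    cif_text_merge_to_unified_by_yml,
)

input_text = Path("work/input.cif").read_text(encoding="UTF-8")

# Text in, text out; no file I/O inside the helper:
output_text = cif_text_to_specific_by_yml(
    input_text, "config/commands.yml", command="refine_structure", parameter="input_cif"
)
Path("work/output.cif").write_text(output_text, encoding="UTF-8")

# The merge variant accepts merge_cif_text=None, in which case an empty
# cif.model.block() is merged in (see the hunk above).
unified_text = cif_text_merge_to_unified_by_yml(
    input_text, None, "config/commands.yml", "refine_structure", "output_cif"
)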
2 changes: 1 addition & 1 deletion qcrboxtools/cif/entries/entry_conversion.py
@@ -31,7 +31,7 @@ def entry_to_unified_keyword(old_name: str, custom_categories: List[str]) -> str
cut_name = old_name[1:]
for category in custom_categories:
if cut_name.startswith(category):
return f"_{category}.{cut_name[len(category)+1:]}"
return f"_{category}.{cut_name[len(category) + 1 :]}"
return "_" + aliases.get(cut_name, cut_name)


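The change above is only formatter spacing inside the slice, but the slice is easy to misread, so here is a worked example with invented names; only the f-string construction and the alias fallback come from the hunk.

custom_categories = ["mycat"]            # hypothetical custom category
old_name = "_mycat_some_value"
cut_name = old_name[1:]                  # "mycat_some_value"
category = custom_categories[0]
# Drop the category prefix plus the separator character after it, then
# rejoin with "." to form the unified keyword:
unified = f"_{category}.{cut_name[len(category) + 1 :]}"
assert unified == "_mycat.some_value"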
6 changes: 3 additions & 3 deletions qcrboxtools/cif/entries/entry_lookup.py
@@ -88,9 +88,9 @@ def generate_aliases(
aliases = {key: val for key, val in aliases.items() if key != val}

# ensure there are no circular translations
assert not any(
val in aliases for val in values
), "One of the common references has gotten an alias, creating a circle"
assert not any(val in aliases for val in values), (
"One of the common references has gotten an alias, creating a circle"
)

return aliases

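The assert above was only re-wrapped by the formatter. For clarity, a tiny illustration of the circular-translation case it guards against, with invented entry names: if a unified target name also appeared as an alias key, lookups could chase aliases in a circle.

aliases = {"old_a": "unified_x", "old_b": "unified_y"}
values = set(aliases.values())                     # the unified target names
assert not any(val in aliases for val in values)   # passes: no target is itself a key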
12 changes: 6 additions & 6 deletions qcrboxtools/cif/file_converter/shelxl/cif2shelx_ins.py
@@ -187,7 +187,7 @@ def block2header(cif_block: block) -> str:

ins_lines.append("CONF\nBOND $H\nL.S. 10\nLIST 4\nACTA\nBOND\nFMAP 2\nMORE -1")
ins_lines.append(block2wght(cif_block))
ins_lines.append(f'FVAR {cif_block["_qcrbox.shelx.scale_factor"]}')
ins_lines.append(f"FVAR {cif_block['_qcrbox.shelx.scale_factor']}")

return "\n".join(ins_lines)

@@ -227,10 +227,10 @@ def create_atom_string(
atom_string = f"{start} {float(-uiso_mult): 4.2f}"
elif (atom_site_aniso_loop is not None) and (label in atom_site_aniso_loop["_atom_site_aniso.label"]):
index_aniso = list(atom_site_aniso_loop["_atom_site_aniso.label"]).index(label)
uijs = [f'{float(atom_site_aniso_loop[f"_atom_site_aniso.u_{ij}"][index_aniso]): 9.5f}' for ij in uij_indexes]
uijs = [f"{float(atom_site_aniso_loop[f'_atom_site_aniso.u_{ij}'][index_aniso]): 9.5f}" for ij in uij_indexes]
atom_string = start + " " + " ".join(uijs)
else:
atom_string = f'{start} {float(atom_site_loop["_atom_site.u_iso_or_equiv"][index]): 9.5f}'
atom_string = f"{start} {float(atom_site_loop['_atom_site.u_iso_or_equiv'][index]): 9.5f}"
return " =\n ".join(wrap(atom_string))


@@ -322,9 +322,9 @@ def create_atom_list(cif_block: block) -> str:
if attached_atom == ".":
continue
indexes = np.nonzero(atom_site_loop["_atom_site.calc_attached_atom"] == attached_atom)[0]
assert all(
(psn_id[int(i)] == psn_id[int(indexes[0])] for i in indexes[1:])
), f"not all constrain posn ids are equal for {attached_atom}"
assert all((psn_id[int(i)] == psn_id[int(indexes[0])] for i in indexes[1:])), (
f"not all constrain posn ids are equal for {attached_atom}"
)
attached_collect[attached_atom] = list(
sorted(indexes, key=lambda x: atom_site_loop["_atom_site.qcrbox_constraint_posn_index"][int(x)])
)
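These hunks only flip the outer f-string quotes so that the nested CIF entry lookups can use single quotes inside. As a quick sanity check of the first one, with a made-up scale factor; the _qcrbox.shelx.scale_factor entry name itself is taken from the hunk.

cif_block = {"_qcrbox.shelx.scale_factor": "0.23145"}     # stand-in for the iotbx block
print(f"FVAR {cif_block['_qcrbox.shelx.scale_factor']}")  # -> FVAR 0.23145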
8 changes: 4 additions & 4 deletions qcrboxtools/robots/eval/eval_files.py
@@ -828,7 +828,7 @@ def extract_data(self, text: str):
for key, pattern in patterns.items():
matches = re.findall(pattern, text, re.DOTALL)
for i, match in enumerate(matches):
formatted_key = f"{key}_{i+1}" if key == "QVEC" and len(matches) > 1 else key
formatted_key = f"{key}_{i + 1}" if key == "QVEC" and len(matches) > 1 else key
if key in second_entry:
extracted_data[second_entry[key]] = match[0].strip()
extracted_data[formatted_key] = self.convert_to_numpy(match[1])
@@ -854,15 +854,15 @@ def value_as_string(self, key: str) -> str:
value = self[key]
if isinstance(value, np.ndarray) and value.ndim == 2:
if key == "RMAT":
key_string = f'RMAT {self["CENTRING"]}\n'
key_string = f"RMAT {self['CENTRING']}\n"
elif key == "TMAT":
key_string = f'TMAT {self["CENTRING"]} {self["POINTGROUP"]}\n'
key_string = f"TMAT {self['CENTRING']} {self['POINTGROUP']}\n"
else:
key_string = key + " "
return key_string + "\n".join("".join(f"{num: 12.7f}" for num in row) for row in value)
elif isinstance(value, np.ndarray) and value.ndim == 1:
key = "QVEC" if key.startswith("QVEC") else key
return f'{key} {" ".join(f"{num:.5f}" for num in value)}'
return f"{key} {' '.join(f'{num:.5f}' for num in value)}"
else:
return key + " " + " ".join(value)

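Again a pure quote-style change in value_as_string; the format specifiers are untouched. A small illustration of the output they produce, with an invented 3x3 matrix and q-vector (the centring letter "P" after RMAT is hypothetical).

import numpy as np

rmat = np.array([[ 0.0123456, -0.0234567,  0.0345678],
                 [ 0.1234567,  0.2345678,  0.3456789],
                 [-0.0012345,  0.0023456,  0.0034567]])
# Each number is padded to width 12 with 7 decimals, three per row:
print("RMAT P\n" + "\n".join("".join(f"{num: 12.7f}" for num in row) for row in rmat))

qvec = np.array([0.5, 0.0, 0.25])
print("QVEC " + " ".join(f"{num:.5f}" for num in qvec))   # -> QVEC 0.50000 0.00000 0.25000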
4 changes: 2 additions & 2 deletions qcrboxtools/robots/eval/eval_robots.py
@@ -798,7 +798,7 @@ def run(
if focus_type is None:
focus_type = "synchrotron"
if focus_type not in possible_focusses:
raise ValueError(f'Invalid focus type, choose one of: {", ".join(possible_focusses)}')
raise ValueError(f"Invalid focus type, choose one of: {', '.join(possible_focusses)}")

if polarisation_type is None:
polarisation_type = "none"
@@ -820,7 +820,7 @@
"o",
)
if polarisation_type not in possible_polarisations:
raise ValueError(f'Invalid polarisation, choose one of: {", ".join(possible_polarisations)}')
raise ValueError(f"Invalid polarisation, choose one of: {', '.join(possible_polarisations)}")

if self.p4p_file is None:
command_base = (
6 changes: 3 additions & 3 deletions qcrboxtools/robots/olex2.py
@@ -100,9 +100,9 @@ def structure_path(self, path: str):
self.wait_for_completion(2000, "startup", cmd)

load_cmds = [
f'file {path.with_suffix(".ins").name}',
f'export {path.with_suffix(".hkl").name}',
f'reap {path.with_suffix(".ins").name}',
f"file {path.with_suffix('.ins').name}",
f"export {path.with_suffix('.hkl').name}",
f"reap {path.with_suffix('.ins').name}",
]
try:
self.send_command("\n".join(load_cmds))
8 changes: 5 additions & 3 deletions tests/analyse/quality/test_base.py
@@ -15,18 +15,20 @@
def test_data_quality_from_level(input_level, result):
assert data_quality_from_level(input_level) is result


@pytest.mark.parametrize(
"input_value, levels, expected_index",
[
(1.5, (1.0, 2.0, 3.0, 4.0, np.inf), 1),
(3.5, (1.0, 2.0, 3.0, 4.0, np.inf), 3),
(5.0, (1.0, 2.0, 3.0, 4.0, np.inf), 4), # Should return index of last level
(0.5, (1.0, 2.0, 3.0, 4.0, np.inf), 0), # Should return index of first level
]
],
)
def test_ascending_levels2func(input_value, levels, expected_index):
func = ascending_levels2func(levels)
assert func(input_value) == expected_index
assert func(input_value) == expected_index


@pytest.mark.parametrize(
"input_value, levels, expected_index",
Expand All @@ -35,7 +37,7 @@ def test_ascending_levels2func(input_value, levels, expected_index):
(3.5, (4.0, 3.0, 2.0, 1.0, -1.0), 1),
(5.0, (4.0, 3.0, 2.0, 1.0, -1.0), 0),
(0.5, (4.0, 3.0, 2.0, 1.0, -1.0), 4),
]
],
)
def test_descending_levels2func(input_value, levels, expected_index):
func = descending_levels2func(levels)