diff --git a/servers/convexhull/README.md b/servers/convexhull/README.md new file mode 100644 index 0000000..d8f4a41 --- /dev/null +++ b/servers/convexhull/README.md @@ -0,0 +1,17 @@ +# ConvexHullMCP Server + +A tool to build convex hulls and check the thermal stability of materials + +## Overview + +This MCP server builds the convex hull for the given structures and checks their corresponding thermal stability. + +## How to cite? + +```bibtex +@software{ConvexHullMCP, + title = {ConvexHullMCP}, + author = {AI4Scient}, + year = {2025} +} +``` diff --git a/servers/convexhull/metadata.json b/servers/convexhull/metadata.json new file mode 100644 index 0000000..4a215cb --- /dev/null +++ b/servers/convexhull/metadata.json @@ -0,0 +1,6 @@ +{ +"name": "SuperconductorServer", +"description": "Superconductor critical temperature prediction", +"author": "@liuyuxiang92", +"category": "materials" +} diff --git a/servers/convexhull/pyproject.toml b/servers/convexhull/pyproject.toml new file mode 100644 index 0000000..a3ac3da --- /dev/null +++ b/servers/convexhull/pyproject.toml @@ -0,0 +1,16 @@ +[project] +name = "superconductor-mcp-server" +version = "0.1.0" +description = "Superconductor critical temperature prediction" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "dpdata", + "numpy", + "ase", + "pymatgen", + "deepmd-kit", + "pandas", + "mcp", + "fastmcp" +] diff --git a/servers/convexhull/server.py b/servers/convexhull/server.py new file mode 100644 index 0000000..fc17fa8 --- /dev/null +++ b/servers/convexhull/server.py @@ -0,0 +1,395 @@ +from typing import Optional, List, Union, Dict +from pathlib import Path +import logging +import os +import glob +import shutil +import subprocess +import numpy as np +from ase import io +import dpdata +import pandas as pd +from deepmd.pt.infer.deep_eval import DeepProperty +from dp.agent.server import CalculationMCPServer +from typing_extensions import TypedDict +import csv + +import random +import shutil + +from pymatgen.core 
# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# Initialize MCP server
mcp = CalculationMCPServer(
    "ConvexHullServer",
    host="0.0.0.0",
    port=50004
)

# Installation directory of the external geometry-optimization / convex-hull scripts.
_GEO_OPT_DIR = Path("/opt/agents/convex_hull/geo_opt")

# Per-element reference values subtracted (composition-weighted, per atom) from the
# predicted enthalpy under ambient conditions.
# NOTE(review): units and provenance are not stated in this file -- presumably
# eV/atom of the elemental reference phases; confirm with the model authors.
_ENERGY_REF = {
    "Ne": -0.0259, "He": -0.0091, "Ar": -0.0688, "F": -1.9115, "O": -4.9467,
    "Cl": -1.8485, "N": -8.3365, "Kr": -0.0567, "Br": -1.553, "I": -1.4734,
    "Xe": -0.0362, "S": -4.1364, "Se": -3.4959, "C": -9.2287, "Au": -3.2739,
    "W": -12.9581, "Pb": -3.7126, "Rh": -7.3643, "Pt": -6.0711, "Ru": -9.2744,
    "Pd": -5.1799, "Os": -11.2274, "Ir": -8.8384, "H": -3.3927, "P": -5.4133,
    "As": -4.6591, "Mo": -10.8457, "Te": -3.1433, "Sb": -4.129, "B": -6.6794,
    "Bi": -3.8405, "Ge": -4.623, "Hg": -0.3037, "Sn": -4.0096, "Ag": -2.8326,
    "Ni": -5.7801, "Tc": -10.3606, "Si": -5.4253, "Re": -12.4445, "Cu": -4.0992,
    "Co": -7.1083, "Fe": -8.47, "Ga": -3.0281, "In": -2.7517, "Cd": -0.9229,
    "Cr": -9.653, "Zn": -1.2597, "V": -9.0839, "Tl": -2.3626, "Al": -3.7456,
    "Nb": -10.1013, "Be": -3.7394, "Mn": -9.162, "Ti": -7.8955, "Ta": -11.8578,
    "Pa": -9.5147, "U": -11.2914, "Sc": -6.3325, "Np": -12.9478, "Zr": -8.5477,
    "Mg": -1.6003, "Th": -7.4139, "Hf": -9.9572, "Pu": -14.2678, "Lu": -4.521,
    "Tm": -4.4758, "Er": -4.5677, "Ho": -4.5824, "Y": -6.4665, "Dy": -4.6068,
    "Gd": -14.0761, "Eu": -10.257, "Sm": -4.7186, "Nd": -4.7681, "Pr": -4.7809,
    "Pm": -4.7505, "Ce": -5.9331, "Yb": -1.5396, "Tb": -4.6344, "La": -4.936,
    "Ac": -4.1212, "Ca": -2.0056, "Li": -1.9089, "Sr": -1.6895, "Na": -1.3225,
    "Ba": -1.919, "Rb": -0.9805, "K": -1.1104, "Cs": -0.8954,
}


class RunOptimizationResult(TypedDict):
    """Return value of run_optimization."""
    optimized_poscar_paths: Path  # directory that holds the POSCAR_<n> files
    message: str


def run_optimization(
    structure_path: Path,
    ambient: bool
) -> RunOptimizationResult:
    """
    Optimize structures with the DP model at ambient or high-pressure condition.

    The work is delegated to the external scripts ``opt_multi.py`` and
    ``parse_traj.py``; their ``deepmd_npy/*/`` output in the current working
    directory is then converted to ``optimized_poscar/POSCAR_<n>``.

    Args:
    - structure_path (Path): A structure file or a directory. For a file, its
      parent directory is searched; POSCAR* / *.cif / *.CIF files are collected
      recursively.
    - ambient (bool): Whether to use the ambient condition (pressure argument 0)
      instead of the high-pressure one (pressure argument 200).
    Return:
    - optimized_poscar_paths (Path): Directory with the optimized POSCAR files
    - message (str): Status message
    Raises:
    - RuntimeError: if the optimized frames cannot be collected or none exist.
      (Previously this surfaced as a NameError or as a false success message.)
    """
    opt_py = _GEO_OPT_DIR / "opt_multi.py"
    parse_py = _GEO_OPT_DIR / "parse_traj.py"

    fmax = 0.0005
    pressure = 0 if ambient else 200
    nsteps = 2000

    base = Path(structure_path)
    search_root = base.parent if base.is_file() else base
    structures = [
        p
        for pattern in ("POSCAR*", "*.cif", "*.CIF")
        for p in search_root.rglob(pattern)
    ]
    logging.info("Found %d structure file(s) under %s", len(structures), search_root)

    # The two external steps stay best-effort (as before): a non-zero exit status is
    # logged and the collection step below decides whether usable output exists.
    try:
        subprocess.run(
            ["python", str(opt_py), str(fmax), str(pressure), str(ambient), str(nsteps)]
            + [str(p) for p in structures],
            check=True,
        )
    except (OSError, subprocess.SubprocessError):
        logging.exception("Geometry Optimization failed!")

    try:
        subprocess.run(["python", str(parse_py)], check=True)
    except (OSError, subprocess.SubprocessError):
        logging.exception("Collect optimized failed!")

    # Created before the try block so the name is always bound.
    optimized_dir = Path("optimized_poscar")
    optimized_dir.mkdir(parents=True, exist_ok=True)

    count = 0
    try:
        multisys = dpdata.MultiSystems()
        for frame_dir in glob.glob("deepmd_npy/*/"):
            multisys.append(dpdata.System(frame_dir, "deepmd/npy"))

        for system in multisys:
            for frame in system:
                # Write the frame itself: system.to_vasp_poscar() always dumps
                # frame 0, which duplicated the first frame for every index.
                frame.to_vasp_poscar(optimized_dir / f"POSCAR_{count}")
                count += 1
    except Exception as err:
        logging.exception("Collect POSCAR failed!")
        raise RuntimeError("Collect POSCAR failed!") from err

    if count == 0:
        raise RuntimeError("Geometry optimization produced no structures")

    return {
        "optimized_poscar_paths": optimized_dir,
        "message": "Geometry Optimization successfully"
    }


# ====================== Tool to calculate structure enthalpy ======================

class BuildConvexHullResult(TypedDict):
    """Results about enthalpy prediction"""
    enthalpy_file: Path
    message: str


def _write_enthalpy_csv(
    prediction_file: Path,
    enthalpy_file: Path,
    ambient: bool,
) -> Optional[str]:
    """Convert the whitespace-separated prediction table into ``enthalpy.csv``.

    Returns the formula of the last non-blank row that was parsed (whether or not
    the row was written), or None when the table contains no rows.
    """
    last_formula = None
    with open(enthalpy_file, "w") as ef:
        ef.write("Number,formula,enthalpy\n")
        with prediction_file.open("r") as pf:
            for line in pf:
                if not line.strip():
                    continue

                parts = line.split()
                file_name = parts[0]  # Column 1: POSCAR or structure file name
                enthalpy = parts[2]   # Column 3: enthalpy H0
                formula = parts[5]    # Column 6: element composition
                # Column 4 selects rows by condition -- presumably the pressure
                # label written by predict_enthalpy.py; TODO confirm.
                selector = float(parts[3])
                last_formula = formula

                if ambient:
                    if abs(selector) >= 0.001:
                        continue
                    comp = Composition(formula)
                    total_atoms = sum(comp.get_el_amt_dict().values())
                    reference = sum(
                        comp[ele] * _ENERGY_REF[str(ele)] for ele in comp
                    ) / total_atoms
                    enthalpy = float(enthalpy) - reference
                elif not 1.2473 < selector < 1.2493:
                    continue

                ef.write(f"{file_name},{formula},{enthalpy}\n")
    return last_formula


@mcp.tool()
def build_convex_hull(
    structure_path: Path,
) -> BuildConvexHullResult:
    """
    Build Convex Hull for given structure.

    Args:
    - structure_path (Path): Path to the structure files (e.g. POSCAR)

    Return:
    BuildConvexHullResult with keys:
    - enthalpy_file (Path): The ``outputs`` directory. On success it holds
      enthalpy.csv, input.dat, convexhull.png and e_above_hull.csv. When a step
      fails the value is an empty list instead.
    - message (str): Message about calculation results.
    """
    enthalpy_dir = Path("outputs")
    enthalpy_dir.mkdir(parents=True, exist_ok=True)
    ambient = True

    def failure(message):
        # Only called from ``except`` blocks, so the traceback is logged too.
        logging.exception(message)
        # The empty list (not a Path) is the failure marker callers already get.
        return {"enthalpy_file": [], "message": message}

    try:
        try:
            results = run_optimization(structure_path, ambient)
            optimized_structures = list(
                results["optimized_poscar_paths"].rglob("POSCAR*")
            )
        except Exception:
            return failure("Geometry Optimization failed!")

        try:
            subprocess.run(
                ["python", str(_GEO_OPT_DIR / "predict_enthalpy.py"), str(ambient)]
                + [str(poscar) for poscar in optimized_structures],
                check=True,
            )
        except Exception:
            return failure("Enthalpy Predictions failed!")

        enthalpy_file = enthalpy_dir / "enthalpy.csv"
        try:
            formula = _write_enthalpy_csv(
                Path("prediction") / "prediction.all.out", enthalpy_file, ambient
            )
        except Exception:
            return failure("Enthalpy file save failed!")
        if formula is None:
            # No rows at all: previously this hit a NameError on ``formula`` and was
            # reported as an input.dat failure.
            logging.error("prediction/prediction.all.out contains no predictions")
            return {"enthalpy_file": [], "message": "Enthalpy Predictions failed!"}

        try:
            reference_name = (
                "convexhull_ambient.csv" if ambient else "convexhull_high_pressure.csv"
            )
            convexhull_file = _GEO_OPT_DIR / reference_name
            data_lines = enthalpy_file.read_text().splitlines()[1:]  # drop the header
            # NOTE(review): this appends to the shared reference file, so entries
            # accumulate across calls -- confirm that this is intended.
            with convexhull_file.open("a") as f:
                f.writelines(line + "\n" for line in data_lines)
            des = _GEO_OPT_DIR / "convexhull.csv"
            logging.info("convexhull_file = %s, des = %s", convexhull_file, des)
            shutil.copy(convexhull_file, des)
        except Exception:
            return failure("Convexhull.csv file save failed!")

        try:
            subprocess.run(
                ["python", str(_GEO_OPT_DIR / "update_input.py"), str(formula)],
                cwd=enthalpy_dir,
                check=True,
            )
            shutil.copy(_GEO_OPT_DIR / "input.dat", enthalpy_dir)
        except Exception:
            return failure("Update input.dat failed!")

        try:
            results_dir = _GEO_OPT_DIR / "results"
            subprocess.run(
                ["python", "cak3.py", "--plotch"], cwd=results_dir, check=True
            )
            shutil.copy(results_dir / "convexhull.png", enthalpy_dir)
            shutil.copy(
                results_dir / "e_above_hull_50meV.csv",
                enthalpy_dir / "e_above_hull.csv",
            )
        except Exception:
            return failure("Convex hull build failed")

        return {
            "enthalpy_file": enthalpy_dir,
            "message": f"Entalpy calculated successfully and saved in {enthalpy_file}"
        }

    except Exception:
        # Keep the result shape complete even on unexpected errors.
        return failure("Convex Hull build failed!")


# ====== Run Server ======

if __name__ == "__main__":
    logging.info("Starting ConvexHullServer on port 50004...")
    mcp.run(transport="sse")
+ +```bibtex +@software{FinetuneDPAMCP, + title = {FinetuneDPAMCP}, + author = {AI4Scient}, + year = {2025} +} +``` diff --git a/servers/finetune_dpa/metadata.json b/servers/finetune_dpa/metadata.json new file mode 100644 index 0000000..4a215cb --- /dev/null +++ b/servers/finetune_dpa/metadata.json @@ -0,0 +1,6 @@ +{ +"name": "SuperconductorServer", +"description": "Superconductor critical temperature prediction", +"author": "@liuyuxiang92", +"category": "materials" +} diff --git a/servers/finetune_dpa/pyproject.toml b/servers/finetune_dpa/pyproject.toml new file mode 100644 index 0000000..a3ac3da --- /dev/null +++ b/servers/finetune_dpa/pyproject.toml @@ -0,0 +1,16 @@ +[project] +name = "superconductor-mcp-server" +version = "0.1.0" +description = "Superconductor critical temperature prediction" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "dpdata", + "numpy", + "ase", + "pymatgen", + "deepmd-kit", + "pandas", + "mcp", + "fastmcp" +] diff --git a/servers/finetune_dpa/server.py b/servers/finetune_dpa/server.py new file mode 100644 index 0000000..05fd59d --- /dev/null +++ b/servers/finetune_dpa/server.py @@ -0,0 +1,1104 @@ +from typing import Optional, List, Union, Dict +from pathlib import Path +import logging +import os +import glob +import subprocess +import numpy as np +import dpdata +from dp.agent.server import CalculationMCPServer +from typing_extensions import TypedDict +import csv + +import random +import shutil + +from pymatgen.core import Composition + +# Setup logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s" +) + +# Initialize MCP server +mcp = CalculationMCPServer( + "DPAFinetune", + host="0.0.0.0", + port=50003 +) + + +#=========Function to revise dpa3 input.json======= +import json +from typing import Any, Dict, Mapping, Optional, Tuple, Literal + +# ------------- helpers ------------- + +def _set(d: Dict[str, Any], path: Tuple[str, ...], value: Any) -> None: + cur = d + 
# ------------- helpers -------------

def _set(d: Dict[str, Any], path: Tuple[str, ...], value: Any) -> None:
    """Assign ``value`` at the nested key ``path`` of ``d``.

    Missing intermediate levels are created; an intermediate value that is not a
    dict is replaced by an empty dict.
    """
    cur = d
    for k in path[:-1]:
        if k not in cur or not isinstance(cur[k], dict):
            cur[k] = {}
        cur = cur[k]
    cur[path[-1]] = value


def _get_section(d: Dict[str, Any], path: Tuple[str, ...]) -> Dict[str, Any]:
    """Return the nested dict at ``path``, creating (or replacing non-dict) levels."""
    cur = d
    for k in path:
        if k not in cur or not isinstance(cur[k], dict):
            cur[k] = {}
        cur = cur[k]
    return cur


def _merge_whitelisted(
    data: Dict[str, Any],
    base_path: Tuple[str, ...],
    incoming: Optional[Mapping[str, Any]],
    allowed_keys: Optional[set] = None,
) -> None:
    """Copy entries of ``incoming`` into the section of ``data`` at ``base_path``.

    With ``allowed_keys`` given, only those keys are copied; ``None`` copies all.
    The section is not created when nothing passes the filter.
    """
    if not incoming:
        return
    accepted = {
        k: v
        for k, v in incoming.items()
        if allowed_keys is None or k in allowed_keys
    }
    if accepted:
        _get_section(data, base_path).update(accepted)


# ------------- allowed key sets -------------

DPA3_REPFLOW_KEYS = {
    "n_dim", "e_dim", "a_dim", "nlayers",
    "e_rcut", "e_rcut_smth", "e_sel",
    "a_rcut", "a_rcut_smth", "a_sel",
    "axis_neuron", "fix_stat_std",
    "a_compress_rate", "a_compress_e_rate", "a_compress_use_split",
    "update_angle", "smooth_edge_update", "use_dynamic_sel",
    "sel_reduce_factor", "use_exp_switch",
    "update_style", "update_residual", "update_residual_init",
}

DPA3_TOP_KEYS = {"activation_function", "use_tebd_bias", "precision", "concat_output_tebd"}

DPA3_FITTING_NET_KEYS = {"neuron", "dim_case_embd", "resnet_dt", "precision", "activation_function", "seed"}

DPA2_REPINIT_KEYS = {
    "tebd_dim", "rcut", "rcut_smth", "nsel", "neuron",
    "axis_neuron", "three_body_neuron", "activation_function",
    "three_body_sel", "three_body_rcut", "three_body_rcut_smth", "use_three_body",
}

DPA2_REPFORMER_KEYS = {
    "rcut", "rcut_smth", "nsel", "nlayers",
    "g1_dim", "g2_dim", "attn2_hidden", "attn2_nhead", "attn1_hidden", "attn1_nhead",
    "axis_neuron",
    "update_h2", "update_g1_has_conv", "update_g1_has_grrg", "update_g1_has_drrd", "update_g1_has_attn",
    "update_g2_has_g1g1", "update_g2_has_attn",
    "update_style", "update_residual", "update_residual_init",
    "attn2_has_gate", "use_sqrt_nnei", "g1_out_conv", "g1_out_mlp",
    "activation_function",
}

DPA2_FITTING_NET_KEYS = {"neuron", "activation_function", "resnet_dt", "precision", "dim_case_embd", "seed", "_comment"}

DPA2_TOP_KEYS = {"use_tebd_bias", "precision", "add_tebd_to_repinit_out"}

# ------------- main -------------

def update_dpa_train_json(
    template_path: str,
    version: Literal["dpa2","dpa3"],
    # global switch
    default_model: bool = True,
    # -------- common knobs (always allowed) --------
    lr_type: Optional[str] = None,
    decay_steps: Optional[int] = None,
    start_lr: Optional[float] = None,
    stop_lr: Optional[float] = None,
    loss_type: Optional[str] = None,
    start_pref_e: Optional[float] = None,
    limit_pref_e: Optional[float] = None,
    start_pref_f: Optional[float] = None,
    limit_pref_f: Optional[float] = None,
    start_pref_v: Optional[float] = None,
    limit_pref_v: Optional[float] = None,
    numb_steps: Optional[int] = None,
    warmup_steps: Optional[int] = None,
    # -------- DPA-3 extras (used only when default_model=False) --------
    dpa3_repflow: Optional[Mapping[str, Any]] = None,
    dpa3_top: Optional[Mapping[str, Any]] = None,          # keys in DPA3_TOP_KEYS
    dpa3_fitting_net: Optional[Mapping[str, Any]] = None,  # keys in DPA3_FITTING_NET_KEYS
    # -------- DPA-2 extras (used only when default_model=False) --------
    dpa2_repinit: Optional[Mapping[str, Any]] = None,
    dpa2_repformer: Optional[Mapping[str, Any]] = None,
    dpa2_fitting_net: Optional[Mapping[str, Any]] = None,
    dpa2_top: Optional[Mapping[str, Any]] = None,          # keys in DPA2_TOP_KEYS
    # output
    output_path: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Load a DPA-2 or DPA-3 training JSON, apply updates, write the result, return it.

    Behavior:
      - Arguments left as ``None`` are not touched in the template.
      - default_model=True  -> only learning-rate / loss / schedule keys are updated.
      - default_model=False -> additionally merges the whitelisted keys:
          DPA-3: model.descriptor.repflow (DPA3_REPFLOW_KEYS),
                 model.<key> for DPA3_TOP_KEYS, model.fitting_net (DPA3_FITTING_NET_KEYS)
          DPA-2: model.descriptor.repinit, model.descriptor.repformer,
                 model.fitting_net, and DPA2_TOP_KEYS at the root of the JSON.

    Args:
        template_path: JSON template to read.
        version: "dpa2" or "dpa3" (case and surrounding whitespace are ignored;
            a str-valued enum member is accepted as well).
        output_path: Destination file; defaults to ``train_dpa3.json`` or
            ``train_dpa2.json`` in the current working directory.

    Returns:
        The updated configuration dict that was written to ``output_path``.

    Raises:
        ValueError: if ``version`` is neither "dpa2" nor "dpa3". This is now checked
            for every call, not only when ``default_model`` is False.
    """
    # Validate first: an unknown version used to fall through to train_dpa2.json
    # whenever default_model was True.
    version_key = str(getattr(version, "value", version)).strip().lower()
    if version_key not in ("dpa2", "dpa3"):
        raise ValueError("version must be 'dpa2' or 'dpa3'")

    with open(template_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    # ---- always-allowed (both versions) ----
    common_updates = {
        ("learning_rate", "type"): lr_type,
        ("learning_rate", "decay_steps"): decay_steps,
        ("learning_rate", "start_lr"): start_lr,
        ("learning_rate", "stop_lr"): stop_lr,
        ("loss", "type"): loss_type,
        ("loss", "start_pref_e"): start_pref_e,
        ("loss", "limit_pref_e"): limit_pref_e,
        ("loss", "start_pref_f"): start_pref_f,
        ("loss", "limit_pref_f"): limit_pref_f,
        ("loss", "start_pref_v"): start_pref_v,
        ("loss", "limit_pref_v"): limit_pref_v,
        ("training", "numb_steps"): numb_steps,
        ("training", "warmup_steps"): warmup_steps,
    }
    for key_path, value in common_updates.items():
        if value is not None:
            _set(data, key_path, value)

    # ---- extended edits only when default_model=False ----
    if not default_model:
        if version_key == "dpa3":
            _merge_whitelisted(
                data, ("model", "descriptor", "repflow"), dpa3_repflow, DPA3_REPFLOW_KEYS
            )
            # top-level model extras
            _merge_whitelisted(data, ("model",), dpa3_top, DPA3_TOP_KEYS)
            _merge_whitelisted(
                data, ("model", "fitting_net"), dpa3_fitting_net, DPA3_FITTING_NET_KEYS
            )
        else:
            _merge_whitelisted(
                data, ("model", "descriptor", "repinit"), dpa2_repinit, DPA2_REPINIT_KEYS
            )
            _merge_whitelisted(
                data, ("model", "descriptor", "repformer"), dpa2_repformer, DPA2_REPFORMER_KEYS
            )
            _merge_whitelisted(
                data, ("model", "fitting_net"), dpa2_fitting_net, DPA2_FITTING_NET_KEYS
            )
            # NOTE(review): these keys are written at the ROOT of the JSON, unlike
            # the DPA-3 extras which go under "model" -- confirm against the template.
            if dpa2_top:
                for k, v in dpa2_top.items():
                    if k in DPA2_TOP_KEYS:
                        _set(data, (k,), v)

    # ---- write out ----
    if output_path is None:
        output_path = "train_dpa3.json" if version_key == "dpa3" else "train_dpa2.json"
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    return data
def split_train_valid(
    input_path: str,
    valid_ratio: float = 0.10
    ):
    """
    Split dpdata into ./data_train and ./data_valid for later model fine-tuning.

    The data is shuffled frame by frame with a fixed seed (123); the first
    ``int(valid_ratio * n_frames)`` frames form the validation set and the rest the
    training set. Both sets are written in the "deepmd/npy/mixed" layout.

    Args:
        input_path (str): Path to the whole dpdata set: a directory, or an archive
            (zip / tar variants) which is unpacked to a temporary directory first.
        valid_ratio (float): Validation fraction in [0, 1]; default 0.10.

    Raises:
        ValueError: if ``valid_ratio`` is outside [0, 1] or no frame could be loaded.
    """
    import os
    import random
    import shutil
    import tempfile

    import dpdata

    archive_suffixes = (".zip", ".tar", ".tar.gz", ".tgz", ".tar.bz2", ".tbz2", ".tar.xz", ".txz")

    if not 0.0 <= valid_ratio <= 1.0:
        raise ValueError(f"valid_ratio must be within [0, 1], got {valid_ratio!r}")

    def _load(root, scan_type_raw):
        """Load ``root`` as MultiSystems; mixed layout is detected by real_atom_types.npy."""
        has_real = next(Path(root).rglob("real_atom_types.npy"), None) is not None
        if has_real:
            return dpdata.MultiSystems().load_systems_from_file(root, fmt="deepmd/npy/mixed")
        if not scan_type_raw:
            return dpdata.MultiSystems().load_systems_from_file(root, fmt="deepmd/npy")
        # Extracted archives may nest systems arbitrarily deep: load every directory
        # that owns a type.raw and skip the ones dpdata cannot read.
        loaded = dpdata.MultiSystems()
        for sys_dir in sorted({p.parent.resolve() for p in Path(root).rglob("type.raw")}):
            try:
                labeled = dpdata.LabeledSystem(str(sys_dir), fmt="deepmd/npy")
            except Exception as e:
                logging.warning("Skipping %s: %s", sys_dir, e)
                continue
            loaded.append(labeled)
        return loaded

    def _split_and_write(ms):
        """Shuffle all frames, split them by ``valid_ratio`` and write both sets."""
        frames = [frame for system in ms.systems.values() for frame in system]
        logging.info("Loaded %d frame(s) from %s", len(frames), input_path)
        if not frames:
            raise ValueError(f"No dpdata systems found in {input_path!r}")

        # A private generator gives the same order as random.seed(123) followed by
        # random.shuffle, without reseeding the process-wide RNG.
        random.Random(123).shuffle(frames)
        split_idx = int(valid_ratio * len(frames))

        ms_valid = dpdata.MultiSystems()
        ms_train = dpdata.MultiSystems()
        # MultiSystems.append takes systems only; the directory branch used to pass
        # the formula key as a second argument, which dpdata rejects.
        for frame in frames[:split_idx]:
            ms_valid.append(frame)
        for frame in frames[split_idx:]:
            ms_train.append(frame)

        ms_valid.to_deepmd_npy_mixed("./data_valid")
        ms_train.to_deepmd_npy_mixed("./data_train")

    logging.info("input_path=%s", input_path)

    if str(input_path).lower().endswith(archive_suffixes):
        with tempfile.TemporaryDirectory(prefix="dpdata_extract_") as tmpd:
            shutil.unpack_archive(input_path, tmpd)
            # if a single top-level directory exists after extraction, use it
            entries = [os.path.join(tmpd, e) for e in os.listdir(tmpd) if not e.startswith("__MACOSX")]
            root = entries[0] if len(entries) == 1 and os.path.isdir(entries[0]) else tmpd
            # Everything must be written before the temporary directory is removed.
            _split_and_write(_load(root, scan_type_raw=True))
    else:
        _split_and_write(_load(input_path, scan_type_raw=False))
import logging
import re
from enum import Enum
from typing import Any


class StrEnum(str, Enum):
    """String-valued enum base whose value lookup ignores case and outer whitespace."""

    @classmethod
    def _missing_(cls, value):
        # Enum calls this hook only after the exact-value lookup has failed.
        if isinstance(value, str):
            wanted = value.strip().lower()
            for member in cls:
                if member.value.lower() == wanted:
                    return member
        return None


class ModelType(StrEnum):
    """Supported DPA model generations."""
    DPA2 = "dpa2"
    DPA3 = "dpa3"


# Alternative spellings accepted for LRType, mapped to the canonical value.
# Kept at module level on purpose: a dict assigned inside the Enum body becomes an
# enum *member*, which is why the aliases never resolved before.
_LR_TYPE_ALIASES = {
    "exponential": "exp",
    "expdecay": "exp",
    "exponential_decay": "exp",
    "cosine": "exp",  # optional: treat cosine as exp fallback
}


class LRType(StrEnum):
    """Learning-rate schedule type; accepts the spellings in _LR_TYPE_ALIASES."""
    EXP = "exp"

    @classmethod
    def _missing_(cls, value):
        if isinstance(value, str):
            canonical = _LR_TYPE_ALIASES.get(value.strip().lower())
            if canonical is not None:
                try:
                    return cls(canonical)
                except ValueError:
                    return None

        return super()._missing_(value)


class LossType(StrEnum):
    """Loss types selectable for training."""
    ENER = "ener"
    ENER_SPIN = "ener_spin"
    DOS = "dos"
    PROPERTY = "property"
    TENSOR = "tensor"


class PrecisionType(StrEnum):
    FLOAT32 = "float32"


class ActivationType(StrEnum):
    TANH = "tanh"
    SILUT_3 = "silut:3.0"


class UpdateStyleType(StrEnum):
    RES_RESIDUAL = "res_residual"


class ResidualInitType(StrEnum):
    CONST = "const"
    NORM = "norm"


def coerce_enum(
    enum_cls: type[StrEnum],
    raw_value: Any,
    default: StrEnum,
    param_name: str,
) -> StrEnum:
    """Convert ``raw_value`` to a member of ``enum_cls``.

    ``None`` and ``""`` return ``default`` silently. A member of ``enum_cls`` is
    returned unchanged. A string is looked up through the enum (including its
    ``_missing_`` hook). Anything else logs a warning naming ``param_name`` and
    returns ``default``.
    """
    if raw_value is None or raw_value == "":
        return default

    if isinstance(raw_value, enum_cls):
        return raw_value

    if isinstance(raw_value, str):
        try:
            return enum_cls(raw_value)
        except ValueError:
            pass

    logging.warning(
        "Invalid value for %s=%r, falling back to default %r",
        param_name,
        raw_value,
        default.value,
    )
    return default
import math


def coerce_float(raw_value: Any, default: float, param_name: str) -> float:
    """Convert ``raw_value`` to a finite float.

    ``None`` returns ``default`` silently. Numbers and numeric strings are
    converted. Anything unparsable or non-finite (NaN, +/-inf) logs a warning
    naming ``param_name`` and returns ``default``.
    """
    if raw_value is None:
        return default

    parsed = None
    if isinstance(raw_value, (int, float)):
        try:
            parsed = float(raw_value)
        except OverflowError:  # int too large for a float
            parsed = None
    elif isinstance(raw_value, str):
        try:
            parsed = float(raw_value.strip())
        except ValueError:
            parsed = None

    # NaN/inf cannot be written as standard JSON and are never valid settings here.
    if parsed is not None and math.isfinite(parsed):
        return parsed

    logging.warning(
        "Invalid float for %s=%r, falling back to default %r",
        param_name,
        raw_value,
        default,
    )
    return default


def coerce_int(raw_value: Any, default: int, param_name: str) -> int:
    """
    Enhanced int parser for LLM/agent-produced values.

    Examples that will all parse to 3000:
      "3000", " 3000 ", "3,000", "3000 steps",
      "train 3000 steps", "numb_steps = 3000"

    ``None`` returns ``default`` silently. Ints are returned unchanged and finite
    floats are rounded. For strings, commas are removed and the first integer found
    is used. Everything else, including NaN/inf, logs a warning naming
    ``param_name`` and returns ``default``.
    """
    if raw_value is None:
        return default

    if isinstance(raw_value, int):
        return raw_value

    if isinstance(raw_value, float):
        # round() raises on NaN (ValueError) and inf (OverflowError).
        if math.isfinite(raw_value):
            return int(round(raw_value))
    elif isinstance(raw_value, str):
        match = re.search(r"[-+]?\d+", raw_value.replace(",", ""))
        if match:
            try:
                return int(match.group(0))
            except ValueError:
                pass

    logging.warning(
        "Invalid int for %s=%r, falling back to default %r",
        param_name,
        raw_value,
        default,
    )
    return default


def coerce_positive_float(raw_value: Any, default: float, param_name: str, eps: float = 1e-12) -> float:
    """
    Like coerce_float(), but additionally enforces value > eps.

    Values that are <= eps or not finite fall back to ``default`` with a warning.
    """
    v = coerce_float(raw_value, default, param_name)
    # ``not (v > eps)`` also rejects NaN, which compares False against everything.
    if not (math.isfinite(v) and v > eps):
        logging.warning(
            "Invalid (non-positive) value for %s=%r, falling back to default %r",
            param_name,
            raw_value,
            default,
        )
        return default
    return v


def is_user_provided_model_path(p: Optional[Path]) -> bool:
    """
    Tell whether the user explicitly supplied a model path.

    None, "", and the strings "none" / "null" (any case, surrounding whitespace
    ignored) count as NOT provided; every other value returns True.
    """
    if p is None:
        return False
    text = str(p).strip()
    return text != "" and text.lower() not in {"none", "null"}
+ """ + if p is None: + return False + s = str(p).strip() + if s == "" or s.lower() in {"none", "null"}: + return False + return True + +import inspect + +def reset_params_to_signature_defaults(func, local_vars: dict, prefixes: tuple[str, ...]) -> None: + """ + For any parameter whose name starts with one of `prefixes`, + reset it to the default value in function signature. + """ + sig = inspect.signature(func) + for name, p in sig.parameters.items(): + if any(name.startswith(pref) for pref in prefixes): + if p.default is not inspect._empty: + local_vars[name] = p.default + +class Finetuned_model(TypedDict): + results: Path + message: str + +@mcp.tool() +def finetune_dpa_model( + input_path: Path, + model_type: str = "dpa3", + model_path: Optional[Path] = None, + valid_ratio: float = 0.1, + # learning rate + lr_type: str = "exp", + decay_steps: int = 5000, + start_lr: float = 0.001, + stop_lr: float = 3e-5, + # loss + loss_type: str = "ener", + start_pref_e: float = 0.2, + limit_pref_e: float = 20.0, + start_pref_f: float = 100.0, + limit_pref_f: float = 60.0, + start_pref_v: float = 0.02, + limit_pref_v: float = 1.0, + # training schedule + numb_steps: int = 3000, + warmup_steps: int = 2000, + # ---------- dpa3: repflow ---------- + dpa3_repflow_n_dim: int = 128, + dpa3_repflow_e_dim: int = 64, + dpa3_repflow_a_dim: int = 32, + dpa3_repflow_nlayers: int = 16, + dpa3_repflow_e_rcut: float = 6.0, + dpa3_repflow_e_rcut_smth: float = 3.5, + dpa3_repflow_e_sel: int = 1200, + dpa3_repflow_a_rcut: float = 4.0, + dpa3_repflow_a_rcut_smth: float = 3.5, + dpa3_repflow_a_sel: int = 300, + dpa3_repflow_axis_neuron: int = 4, + dpa3_repflow_fix_stat_std: float = 0.3, + dpa3_repflow_a_compress_rate: int = 1, + dpa3_repflow_a_compress_e_rate: int = 2, + dpa3_repflow_a_compress_use_split: bool = True, + dpa3_repflow_update_angle: bool = True, + dpa3_repflow_smooth_edge_update: bool = True, + dpa3_repflow_use_dynamic_sel: bool = True, + dpa3_repflow_sel_reduce_factor: float = 10.0, 
+ dpa3_repflow_use_exp_switch: bool = True, + dpa3_repflow_update_style: str = "res_residual", + dpa3_repflow_update_residual: float = 0.1, + dpa3_repflow_update_residual_init: str = "const", + # ---------- dpa3: top ---------- + dpa3_top_activation_function: str = "silut:3.0", + dpa3_top_use_tebd_bias: bool = False, + dpa3_top_precision: str = "float32", + dpa3_top_concat_output_tebd: bool = False, + # ---------- dpa3: fitting_net ---------- + dpa3_fitting_net_neuron: Optional[List[int]] = None, # e.g. [240, 240, 240] + dpa3_fitting_net_dim_case_embd: int = 31, + dpa3_fitting_net_resnet_dt: bool = True, + dpa3_fitting_net_precision: str = "float32", + dpa3_fitting_net_activation_function: str = "tanh", + dpa3_fitting_net_seed: int = 1, + # ---------- dpa2: repinit ---------- + dpa2_repinit_tebd_dim: int = 80, + dpa2_repinit_rcut: float = 4.0, + dpa2_repinit_rcut_smth: float = 3.5, + dpa2_repinit_nsel: int = 40, + dpa2_repinit_neuron: int = 32, + dpa2_repinit_axis_neuron: int = 4, + dpa2_repinit_three_body_neuron: int = 32, + dpa2_repinit_activation_function: str = "tanh", + dpa2_repinit_three_body_sel: int = 40, + dpa2_repinit_three_body_rcut: float = 4.0, + dpa2_repinit_three_body_rcut_smt: float = 3.5, + dpa2_repinit_use_three_body: bool = True, + # ---------- dpa2: repformer ---------- + dpa2_repformer_rcut: float = 4.0, + dpa2_repformer_rcut_smth: float = 3.5, + dpa2_repformer_nsel: int = 40, + dpa2_repformer_nlayers: int = 6, + dpa2_repformer_g1_dim: int = 384, + dpa2_repformer_g2_dim: int = 96, + dpa2_repformer_attn2_hidden: int = 24, + dpa2_repformer_attn2_nhead: int = 4, + dpa2_repformer_attn1_hidden: int = 128, + dpa2_repformer_attn1_nhead: int = 4, + dpa2_repformer_axis_neuron: int = 4, + dpa2_repformer_update_h2: bool = False, + dpa2_repformer_update_g1_has_conv: bool = True, + dpa2_repformer_update_g1_has_grrg: bool = True, + dpa2_repformer_update_g1_has_drrd: bool = True, + dpa2_repformer_update_g1_has_attn: bool = False, + 
dpa2_repformer_update_g2_has_g1g1: bool = False, + dpa2_repformer_update_g2_has_attn: bool = True, + dpa2_repformer_update_style: str = "res_residual", + dpa2_repformer_update_residual: float = 0.01, + dpa2_repformer_update_residual_init: str = "norm", + dpa2_repformer_attn2_has_gate: bool = True, + dpa2_repformer_use_sqrt_nnei: bool = True, + dpa2_repformer_g1_out_conv: bool = True, + dpa2_repformer_g1_out_mlp: bool = True, + dpa2_repformer_activation_function: str = "tanh", + # ---------- dpa2: fitting_net ---------- + dpa2_fitting_net_neuron: Optional[List[int]] = None, # e.g. [240, 240, 240] + dpa2_fitting_net_activation_function: str = "tanh", + dpa2_fitting_net_resnet_dt: bool = True, + dpa2_fitting_net_precision: str = "float32", + dpa2_fitting_net_dim_case_embd: int = 37, + dpa2_fitting_net_seed: int = 1, + # ---------- dpa2: top ---------- + dpa2_top_use_tebd_bias: bool = False, + dpa2_top_precision: str = "float32", + dpa2_top_add_tebd_to_repinit_out: bool = False, + # ---- NEW: add at the very end ---- + user_text: Optional[str] = None, + training_steps: Optional[str] = None, + total_steps: Optional[str] = None, + steps: Optional[str] = None, +) -> Finetuned_model: + + """ + Finetune the DPA3 model based on user requirment. If use do not provide model, please just use our dpa2 and dpa3 default model to be fined tuned. + Args: + --input_path (Path): the path to fine tune model data + --valid_ratio (float): the ratio to split data into train and validation sets + --model_type (str): the version of DPA model, could be dpa2 or dpa3. + --model_path (Path): the path to model which need to be finetuned further. If user did not provide model, just use dpa2 or dpa3 default model + --lr_type (str): The type of the learning rate + --decay_steps (int): The learning rate is decaying every this number of training steps. + --start_lr float: The learning rate at the start of the training + --stop_lr (float):The desired learning rate at the end of the training. 
+ --loss_type (str): The type of the loss. possible choices: ener, ener_spin, dos, property, tensor + --start_pref_e (float):The prefactor of energy loss at the start of the training. + --limit_pref_e (float): The prefactor of energy loss at the limit of the training. Should be larger than or equal to 0. + --start_pref_f (float): The prefactor of force loss at the start of the training. Should be larger than or equal to 0 + --limit_pref_f (float): The prefactor of force loss at the limit of the training. Should be larger than or equal to 0. + --start_pref_v (float): The prefactor of virial loss at the start of the training. Should be larger than or equal to 0. + --limit_pref_v (float): The prefactor of virial loss at the limit of the training. Should be larger than or equal to 0. + --numb_steps (int): Number of training batch. + --warmup_steps (int): The number of steps for learning rate warmup. + --dpa3_repflow (Mapping[str,Any]): For dpa3 model type, if user provide with model, it could be defined by user about repflow part + --dpa3_top (Mapping[str,Any]): For dpa3 model type, if user provide with model, it could be defined by user about top part + --dpa3_fitting_net (Mapping[str,Any]): For dpa3 model type, if user provide with model, it could be defined by user about fitting_net part + --dpa2_repinit (Mapping[str,Any]): For dpa2 model type, if user provide with model, it could be defined by user about repinit part + --dpa2_repformer (Mapping[str,Any]): For dpa2 model type, if user provide with model, it could be defined by user about repformer part + --dpa2_top (Mapping[str,Any]): For dpa2 model type, if user provide with model, it could be defined by user about top part + --dpa2_fitting_net (Mapping[str,Any]): For dpa2 model type, if user provide with model, it could be defined by user about fitting_net part + Return: + Finetuned_model with keys: + --results (Path): Path to finetuned model + --message: Message about operation results + + """ + import re + from 
def extract_training_steps(user_text: str) -> Optional[int]:
    """Return the total number of training steps stated in ``user_text``.

    The caller lets a non-None result override the agent-supplied
    ``numb_steps``, so this helper must only report a *total* step count.

    Recognized forms (case-insensitive, thousands separators allowed):
      * English key/value: ``steps 3000``, ``training steps: 3000``,
        ``total steps = 3000``, ``numb_steps=3000``, ``max_steps 3000``.
      * Chinese key/value: ``训练总步数`` / ``总步数`` / ``训练步数`` / ``训练步``
        followed by an optional ``:``, ``=`` or full-width colon and digits.
      * Chinese verb form: ``跑`` or ``训练`` + digits + ``步`` / ``step(s)``.

    A ``steps`` key qualified by ``warmup`` / ``warm-up`` / ``warm_up`` or
    ``decay`` (e.g. ``warmup_steps=200``, ``decay steps: 5000``) is skipped:
    those configure the learning-rate schedule, not the training length.

    Args:
        user_text: Free-form user request; ``None``, empty, or non-string
            input yields ``None``.

    Returns:
        The first matching step count as ``int``, or ``None`` if the text
        does not state one.
    """
    if not isinstance(user_text, str) or not user_text:
        return None

    # Drop thousands separators so "3,000" is read as 3000.
    text = user_text.replace(",", "")

    # 1) English "<...>steps [:=] N". Every candidate is inspected in order so
    #    that a leading "warmup_steps=200" does not hide a later "numb_steps=5000".
    for m in re.finditer(r"steps\s*[:=]?\s*(\d+)", text, flags=re.IGNORECASE):
        schedule_qualifier = re.search(
            r"(?:warm[\s_-]*up|decay)[\s_-]*$",
            text[: m.start()],
            flags=re.IGNORECASE,
        )
        if schedule_qualifier is None:
            return int(m.group(1))

    # 2) Chinese phrasings, in the original priority order.
    chinese_patterns = (
        r"(?:训练总步数|总步数|训练步数|训练步)\s*[:=:]?\s*(\d+)",
        r"(?:跑|训练)\s*(\d+)\s*(?:步|steps?)",
    )
    for pat in chinese_patterns:
        m = re.search(pat, text, flags=re.IGNORECASE)
        if m:
            return int(m.group(1))

    return None
dpa3_fitting_net_activation_function, + ActivationType.TANH, + "dpa3_fitting_net_activation_function", + ).value + + # dpa2 repinit / repformer / fitting / top + dpa2_repinit_activation_function = coerce_enum( + ActivationType, + dpa2_repinit_activation_function, + ActivationType.TANH, + "dpa2_repinit_activation_function", + ).value + + dpa2_repformer_update_style = coerce_enum( + UpdateStyleType, + dpa2_repformer_update_style, + UpdateStyleType.RES_RESIDUAL, + "dpa2_repformer_update_style", + ).value + + dpa2_repformer_update_residual_init = coerce_enum( + ResidualInitType, + dpa2_repformer_update_residual_init, + ResidualInitType.NORM, + "dpa2_repformer_update_residual_init", + ).value + + dpa2_repformer_activation_function = coerce_enum( + ActivationType, + dpa2_repformer_activation_function, + ActivationType.TANH, + "dpa2_repformer_activation_function", + ).value + + dpa2_fitting_net_activation_function = coerce_enum( + ActivationType, + dpa2_fitting_net_activation_function, + ActivationType.TANH, + "dpa2_fitting_net_activation_function", + ).value + + dpa2_fitting_net_precision = coerce_enum( + PrecisionType, + dpa2_fitting_net_precision, + PrecisionType.FLOAT32, + "dpa2_fitting_net_precision", + ).value + + dpa2_top_precision = coerce_enum( + PrecisionType, + dpa2_top_precision, + PrecisionType.FLOAT32, + "dpa2_top_precision", + ).value + + # numeric hyper-parameters + valid_ratio = coerce_float(valid_ratio, 0.1, "valid_ratio") + decay_steps = coerce_int(decay_steps, 5000, "decay_steps") + start_lr = coerce_float(start_lr, 0.001, "start_lr") + stop_lr = coerce_float(stop_lr, 3e-5, "stop_lr") + + # Prefactors MUST be > 0; agent sometimes injects 0 which kills gradients. 
+ start_pref_e = coerce_positive_float(start_pref_e, 0.2, "start_pref_e") + start_pref_f = coerce_positive_float(start_pref_f, 100.0, "start_pref_f") + start_pref_v = coerce_positive_float(start_pref_v, 0.02, "start_pref_v") + + # Limits should also be > 0 (and usually >= start) + limit_pref_e = coerce_positive_float(limit_pref_e, 20.0, "limit_pref_e") + limit_pref_f = coerce_positive_float(limit_pref_f, 60.0, "limit_pref_f") + limit_pref_v = coerce_positive_float(limit_pref_v, 1.0, "limit_pref_v") + + # Ensure limit >= start (safety) + limit_pref_e = max(limit_pref_e, start_pref_e) + limit_pref_f = max(limit_pref_f, start_pref_f) + limit_pref_v = max(limit_pref_v, start_pref_v) + + if (start_pref_e <= 1e-12) and (start_pref_f <= 1e-12) and (start_pref_v <= 1e-12): + logging.warning("All start_pref_* are ~0. Resetting to safe defaults.") + start_pref_e, start_pref_f, start_pref_v = 0.2, 100.0, 0.02 + + + # ====== Solve "could not read steps" problem ====== + DEFAULT_NUMB_STEPS = 3000 + DEFAULT_WARMUP_STEPS = 2000 + + raw_numb_steps = numb_steps + raw_warmup_steps = warmup_steps + + # Normalize both (handles "2000 steps", "3,000", etc.) 
+ numb_steps = coerce_int(numb_steps, DEFAULT_NUMB_STEPS, "numb_steps") + warmup_steps = coerce_int(warmup_steps, DEFAULT_WARMUP_STEPS, "warmup_steps") + + # If user_text contains an explicit total steps, let it override (highest priority) + parsed_total = extract_training_steps(user_text) if user_text else None + if parsed_total is not None: + if parsed_total != numb_steps: + logging.info( + "DEBUG steps override: user_text specifies total_steps=%d, overriding agent numb_steps=%d", + parsed_total, + numb_steps, + ) + numb_steps = parsed_total + + # --------- Critical safety clamp for warmup_steps ---------- + # DeepMD requires: warmup_steps < numb_steps OR warmup_steps == 0 + if numb_steps <= 0: + # Defensive: never allow non-positive total steps + logging.warning("Invalid numb_steps=%d; forcing to 1 and warmup_steps=0", numb_steps) + numb_steps = 1 + warmup_steps = 0 + else: + if warmup_steps < 0: + logging.warning("Invalid warmup_steps=%d; forcing to 0", warmup_steps) + warmup_steps = 0 + + if warmup_steps >= numb_steps: + # Strategy: keep warmup small and valid + # Option A (recommended): set warmup to 0 when total steps is small + new_warmup = 0 if numb_steps < 50 else max(1, int(0.1 * numb_steps)) + # Ensure strictly less than numb_steps + new_warmup = min(new_warmup, numb_steps - 1) + logging.warning( + "Warmup steps (%d) >= total steps (%d). 
Adjusting warmup_steps -> %d", + warmup_steps, + numb_steps, + new_warmup, + ) + warmup_steps = new_warmup + + logging.info( + "DEBUG final schedule: numb_steps raw=%r -> %d, warmup_steps raw=%r -> %d", + raw_numb_steps, numb_steps, raw_warmup_steps, warmup_steps + ) + + # debug log: see what actually happened + logging.info( + "DEBUG hyper: model_type raw=%r final=%r, " + "lr_type raw=%r final=%r, " + "loss_type raw=%r final=%r, " + "numb_steps raw=%r final=%r, " + "warmup_steps raw=%r final=%r", + raw_model_type, + model_type, + raw_lr_type, + lr_type, + raw_loss_type, + loss_type, + raw_numb_steps, + numb_steps, + raw_warmup_steps, + warmup_steps, + ) + + user_provided_model = is_user_provided_model_path(model_path) + + if not user_provided_model: + default_model = True + if model_type == "dpa3": + input_json = "/opt/agents/dpa_finetune/input_dpa3.json" + model_path = Path("/opt/agents/dpa_finetune/models/dpa3.pt") + else: + input_json = "/opt/agents/dpa_finetune/input_dpa2.json" + model_path = Path("/opt/agents/dpa_finetune/dpa2.pt") + else: + default_model = False + if model_type == "dpa3": + input_json = "/opt/agents/dpa_finetune/input_dpa3.json" + else: + input_json = "/opt/agents/dpa_finetune/input_dpa2.json" + + # Lock architecture-related params when using built-in default model + if default_model: + reset_params_to_signature_defaults( + finetune_dpa_model, + locals(), + prefixes=("dpa3_", "dpa2_"), + ) + logging.info("DEBUG: default_model=True -> locked all dpa3_* / dpa2_* params to signature defaults.") + + + dpa3_repflow = { + "n_dim": dpa3_repflow_n_dim, + "e_dim": dpa3_repflow_e_dim, + "a_dim": dpa3_repflow_a_dim, + "nlayers": dpa3_repflow_nlayers, + "e_rcut": dpa3_repflow_e_rcut, + "e_rcut_smth": dpa3_repflow_e_rcut_smth, + "e_sel": dpa3_repflow_e_sel, + "a_rcut": dpa3_repflow_a_rcut, + "a_rcut_smth": dpa3_repflow_a_rcut_smth, + "a_sel": dpa3_repflow_a_sel, + "axis_neuron": dpa3_repflow_axis_neuron, + "fix_stat_std": dpa3_repflow_fix_stat_std, + 
"a_compress_rate": dpa3_repflow_a_compress_rate, + "a_compress_e_rate": dpa3_repflow_a_compress_e_rate, + "a_compress_use_split": dpa3_repflow_a_compress_use_split, + "update_angle": dpa3_repflow_update_angle, + "smooth_edge_update": dpa3_repflow_smooth_edge_update, + "use_dynamic_sel": dpa3_repflow_use_dynamic_sel, + "sel_reduce_factor": dpa3_repflow_sel_reduce_factor, + "use_exp_switch": dpa3_repflow_use_exp_switch, + "update_style": dpa3_repflow_update_style, + "update_residual": dpa3_repflow_update_residual, + "update_residual_init": dpa3_repflow_update_residual_init, + } + dpa3_top = { + "activation_function": dpa3_top_activation_function, + "use_tebd_bias": dpa3_top_use_tebd_bias, + "precision": dpa3_top_precision, + "concat_output_tebd": dpa3_top_concat_output_tebd, + } + dpa3_fitting_net = { + "neuron":dpa3_fitting_net_neuron, + "dim_case_embd":dpa3_fitting_net_dim_case_embd, + "resnet_dt": dpa3_fitting_net_resnet_dt, + "precision":dpa3_fitting_net_precision, + "activation_function":dpa3_fitting_net_activation_function, + "seed":dpa3_fitting_net_seed, + } + dpa2_repinit = { + "tebd_dim":dpa2_repinit_tebd_dim, + "rcut":dpa2_repinit_rcut, + "rcut_smth":dpa2_repinit_rcut_smth, + "nsel":dpa2_repinit_nsel, + "neuron":dpa2_repinit_neuron, + "axis_neuron":dpa2_repinit_axis_neuron, + "three_body_neuron":dpa2_repinit_three_body_neuron, + "activation_function":dpa2_repinit_activation_function, + "three_body_sel":dpa2_repinit_three_body_sel, + "three_body_rcut":dpa2_repinit_three_body_rcut, + "three_body_rcut_smth":dpa2_repinit_three_body_rcut_smt, + "use_three_body": dpa2_repinit_use_three_body, + } + dpa2_repformer = { + "rcut":dpa2_repformer_rcut, + "rcut_smth":dpa2_repformer_rcut_smth, + "nsel":dpa2_repformer_nsel, + "nlayers": dpa2_repformer_nlayers, + "g1_dim": dpa2_repformer_g1_dim, + "g2_dim": dpa2_repformer_g2_dim, + "attn2_hidden": dpa2_repformer_attn2_hidden, + "attn2_nhead": dpa2_repformer_attn2_nhead, + "attn1_hidden": dpa2_repformer_attn1_hidden, + 
"attn1_nhead": dpa2_repformer_attn1_nhead, + "axis_neuron": dpa2_repformer_axis_neuron, + "update_h2": dpa2_repformer_update_h2, + "update_g1_has_conv": dpa2_repformer_update_g1_has_conv, + "update_g1_has_grrg": dpa2_repformer_update_g1_has_grrg, + "update_g1_has_drrd": dpa2_repformer_update_g1_has_drrd, + "update_g1_has_attn": dpa2_repformer_update_g1_has_attn, + "update_g2_has_g1g1": dpa2_repformer_update_g2_has_g1g1, + "update_g2_has_attn": dpa2_repformer_update_g2_has_attn, + "update_style":dpa2_repformer_update_style, + "update_residual":dpa2_repformer_update_residual, + "update_residual_init":dpa2_repformer_update_residual_init, + "attn2_has_gate": dpa2_repformer_attn2_has_gate, + "use_sqrt_nnei": dpa2_repformer_use_sqrt_nnei, + "g1_out_conv": dpa2_repformer_g1_out_conv, + "g1_out_mlp": dpa2_repformer_g1_out_mlp, + "activation_function":dpa2_repformer_activation_function, + } + dpa2_fitting_net = { + "neuron": dpa2_fitting_net_neuron, + "activation_function": dpa2_fitting_net_activation_function, + "resnet_dt": dpa2_fitting_net_resnet_dt, + "precision": dpa2_fitting_net_precision, + "dim_case_embd": dpa2_fitting_net_dim_case_embd, + "seed": dpa2_fitting_net_seed, + } + dpa2_top = { + "use_tebd_bias": dpa2_top_use_tebd_bias, + "precision": dpa2_top_precision, + "add_tebd_to_repinit_out": dpa2_top_add_tebd_to_repinit_out, + } + #input_json = "bohrium://13756/501205/store/upload/523748fd-7d2e-4e94-9304-8d49e710e8ba/input_dpa3.json" + update_dpa_train_json( + input_json, + model_type, + default_model, + lr_type, + decay_steps, + start_lr, + stop_lr, + loss_type, + start_pref_e, + limit_pref_e, + start_pref_f, + limit_pref_f, + start_pref_v, + limit_pref_v, + numb_steps, + warmup_steps, + dpa3_repflow, + dpa3_top, + dpa3_fitting_net, + dpa2_repinit, + dpa2_repformer, + dpa2_fitting_net, + dpa2_top, + "train.json" + ) + print(f"checkcheck lr_type={lr_type}") +#Split dpdata into train and valid two sets + split_train_valid(str(input_path), valid_ratio) + #model_path 
= "bohrium://13756/501205/store/upload/6ea62778-1c8a-4ff6-9b12-85288b1912aa/dpa3.pt" + if model_type == "dpa3": + cmd = [ "dp", + "--pt", + "train", + "train.json", + "--finetune", + str(model_path), + ] + else: + cmd = [ "dp", + "--pt", + "train", + "train.json", + "--finetune", + str(model_path), + "--model-branch", + "Omat24" + ] + + subprocess.run(cmd, check=True) + + + finetuned_model = './model.ckpt.pt' + finetuned_model_path = Path(finetuned_model) + return{ + "results": finetuned_model_path, + "message": "Fine tune model successfully!" + } + +# ====== Run Server ====== + +if __name__ == "__main__": + logging.info("Starting FinetuneDPAServer on port 50003...") + mcp.run(transport="sse") + diff --git a/servers/finetune_dpa/uv.lock b/servers/finetune_dpa/uv.lock new file mode 100644 index 0000000..a274c1e --- /dev/null +++ b/servers/finetune_dpa/uv.lock @@ -0,0 +1,8 @@ +version = 1 +revision = 2 +requires-python = ">=3.12" + +[[package]] +name = "superconductor-mcp-server" +version = "0.1.0" +source = { virtual = "." } diff --git a/servers/superconductor/README.md b/servers/superconductor/README.md index db11e69..87f7855 100644 --- a/servers/superconductor/README.md +++ b/servers/superconductor/README.md @@ -1,4 +1,4 @@ -# ThermoelectricMCP Server +# SuperconductorMCP Server A tool to predict supercondutor related properties and screen promising supercondutors. 
diff --git a/servers/superconductor/superconductor_mcp_server.py b/servers/superconductor/server.py similarity index 57% rename from servers/superconductor/superconductor_mcp_server.py rename to servers/superconductor/server.py index 2a05341..806bcf8 100644 --- a/servers/superconductor/superconductor_mcp_server.py +++ b/servers/superconductor/server.py @@ -19,36 +19,12 @@ from pymatgen.core import Composition -import argparse - # Setup logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -def parse_args(): - """Parse command line arguments for MCP server.""" - parser = argparse.ArgumentParser(description="DPA Calculator MCP Server") - parser.add_argument('--port', type=int, default=50001, help='Server port (default: 50001)') - parser.add_argument('--host', default='0.0.0.0', help='Server host (default: 0.0.0.0)') - parser.add_argument('--log-level', default='INFO', - choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], - help='Logging level (default: INFO)') - try: - args = parser.parse_args() - except SystemExit: - class Args: - port = 50001 - host = '0.0.0.0' - log_level = 'INFO' - args = Args() - return args - -args = parse_args() - - - # Initialize MCP server mcp = CalculationMCPServer( "SuperconductorServer", @@ -56,16 +32,19 @@ class Args: port=50002 ) - -def run_optimization( - structures: List[Path], +class RunOptimizationResult(TypedDict): + optimized_poscar_paths: Path + message: str +@mcp.tool() +def run_superconductor_optimization( + structure_path: Path, ambient: bool -) -> List[Path]: +) -> RunOptimizationResult: """ Optimize structures with DP model at ambient or high pressure condition. 
Args: - - structure (Path): Path to access structures need to be optimized + - structure_path (Path): Path to access structures need to be optimized - ambient (bool): Wether consider ambient condition Return: - optimized_structure_path (Path): Path to access optimized structures @@ -79,6 +58,13 @@ def run_optimization( pressure = 200 nsteps = 2000 + + base = Path(structure_path) + structure_path = base.parent if base.is_file() else base + + structures = list(p for pat in ("POSCAR*", "*.cif", "*.CIF") + for p in structure_path.rglob(pat)) + print(f"The length of structures {len(structures)}") try: # Build command: use the actual path to opt_py, not the literal string "opt_py" @@ -123,11 +109,14 @@ def run_optimization( for frame in system: system.to_vasp_poscar(optimized_dir / f'POSCAR_{count}') count+=1 - optimized_structures = list(optimized_dir.rglob("POSCAR*")) + #optimized_structures = list(optimized_dir.rglob("POSCAR*")) except Exception as e: print("Collect POSCAR failed!") - return optimized_structures + return{ + "optimized_poscar_paths": optimized_dir, + "message": "Geometry Optimization successfully" + } ### Tool generate structures with Calypso class GenerateCalypsoStructureResult(TypedDict): @@ -183,255 +172,266 @@ class GenerateCalypsoStructureResult(TypedDict): } -@mcp.tool() -def generate_calypso_structure( - species: List[str], - n_tot: int - )->GenerateCalypsoStructureResult: - """ - Generate n_tot CALYPSO structures using specified species. - If user did not mention species and total number structures to generate, please remind the user to provide these information. - - Args: - species (List[str]): A list of chemical element symbols (e.g., ["Mg", "O", "Si"]). These elements will be used as building blocks in the CALYPSO structure generation. - All element symbols must be from the supported element list internally defined in the tool. - - n_tot (int): The number of CALYPSO structure configurations to generate. 
Each structure will be generated in a separate subdirectory (e.g., generated_calypso/0/, generated_calypso/1/, etc.) - Return: - GenerateCalypsoStructureResult with keys: - - poscar_paths (Path): Path to access generated structures POSCAR. All structures are saved in outputs/poscars_for_optimization/ - - message (str): Message about calculation results information. - """ - - def get_props(s_list): - """ - Get atomic number, atomic radius, and atomic volume infomation for interested species - - Args: - s_list: species list needed to get atomic number, atomic radius, and atomic volume infomation - - Return: - z_list (List): atomic number list for given species list, - r_list (List): atomic radius list for given species list, - v_list (List): atomic volume list for given species list. - """ - - z_list, r_list, v_list = [], [], [] - for s in s_list: - if s not in ELEMENT_PROPS: - raise ValueError(f"Unsupported element: {s}") - props = ELEMENT_PROPS[s] - z_list.append(props["Z"]) - r_list.append(props["r"]) - v_list.append(props["v"]) - return z_list, r_list, v_list - - def generate_counts(n): - return [random.randint(1, 10) for _ in range(n)] - - def write_input(path, species, z_list, n_list, r_mat, volume): - """ - Write calypso input files for given species combination with atomic number, number of each species, radius matrix and total volume - - Args: - - path (Path): Path to save input file, - - species (List[str]): Species list - - z_list (List[int]): atomic number list - - n_list (List[int]): number of each species list - - r_mat: radius matrix - - volume (float): total volume - """ - - # Step 1: reorder all based on atomic number - sorted_indices = sorted(range(len(z_list)), key=lambda i: z_list[i]) - species = [species[i] for i in sorted_indices] - z_list = [z_list[i] for i in sorted_indices] - n_list = [n_list[i] for i in sorted_indices] - r_mat = r_mat[np.ix_(sorted_indices, sorted_indices)] # reorder matrix - - # Step 2: write input.dat - with open(path / 
"input.dat", "w") as f: - f.write(f"SystemName = {' '.join(species)}\n") - f.write(f"NumberOfSpecies = {len(species)}\n") - f.write(f"NameOfAtoms = {' '.join(species)}\n") - f.write("@DistanceOfIon\n") - for i in range(len(species)): - row = " ".join(f"{r_mat[i][j]:.3f}" for j in range(len(species))) - f.write(row + "\n") - f.write("@End\n") - f.write(f"AtomicNumber = {' '.join(str(z) for z in z_list)}\n") - f.write(f"NumberOfAtoms = {' '.join(str(n) for n in n_list)}\n") - f.write("""Ialgo = 2 -PsoRatio = 0.5 -PopSize = 1 -GenType = 1 -ICode = 15 -NumberOfLbest = 4 -NumberOfLocalOptim = 3 -Command = sh submit.sh -MaxTime = 9000 -MaxStep = 1 -PickUp = F -PickStep = 1 -Parallel = F -NumberOfParallel = 4 -Split = T -PSTRESS = 2000 -fmax = 0.01 -FixCell = F -""") - - - - #===== Step 1: Generate calypso input files ========== - outdir = Path("generated_calypso") - outdir.mkdir(parents=True, exist_ok=True) - - - z_list, r_list, v_list = get_props(species) - for i in range(n_tot): - try: - n_list = generate_counts(len(species)) - volume = sum(n * v for n, v in zip(n_list, v_list)) - r_mat = np.add.outer(r_list, r_list) * 0.529 # bohr → Å - - struct_dir = outdir / f"{i}" - if not struct_dir.exists(): - struct_dir.mkdir(parents=True, exist_ok=True) - - #Prepare calypso input.dat - write_input(struct_dir, species, z_list, n_list, r_mat, volume) - except Exception as e: - return{ - "poscar_paths" : None, - "message": "Input files generations for calypso failed!" - } - - #Execuate calypso calculation and screening - flim_ase_path = Path("/opt/agents/thermal_properties/flim_ase/flim_ase.py") - command = f"/opt/agents/thermal_properties/calypso/calypso.x >> tmp_log && python {flim_ase_path}" - if not flim_ase_path.exists(): - return{ - "poscar_paths": None, - "message": "flim_ase.py did not found!" - - } - try: - subprocess.run(command, cwd=struct_dir, shell=True) - except Exception as e: - return{ - "poscar_paths": None, - "message": "calypso.x execute failed!" 
- } - - #Clean struct_dir only save input.dat and POSCAR_1 - for file in struct_dir.iterdir(): - if file.name not in ["input.dat", "POSCAR_1"]: - if file.is_file(): - file.unlink() - elif file.is_dir(): - shutil.rmtree(file) - - # Step 3: Collect POSCAR_1 into POSCAR_n format - try: - output_dir = Path("outputs") - output_dir.mkdir(parents=True, exist_ok=True) - final_dir = output_dir / "poscars_for_optimization" - final_dir.mkdir(parents=True, exist_ok=True) - counter = 0 - for struct_dir in outdir.iterdir(): - poscar_path = struct_dir / "POSCAR_1" - if poscar_path.exists(): - new_name = final_dir / f"POSCAR_{counter}" - shutil.copy(poscar_path, new_name) - counter += 1 - - return{ - "poscar_paths": Path(final_dir), - "message": f"Calypso generated {n_tot} structures with {species} successfully!" - } - except Exception as e: - return{ - "poscar_paths": None, - "message": "Calypso generated POSCAR files collected failed!" - } - - - -#================ Tool to generate structures with conditional properties via CrystalFormer =================== -class GenerateCryFormerStructureResult(TypedDict): - poscar_paths: Path - message: str - -@mcp.tool() -def generate_crystalformer_structures( - space_group: int, - ambient: bool, - target_values: List[float], - n_tot: int -)->GenerateCryFormerStructureResult: - """ - Generate n_tot conditional superconductor structures with target critical temperature and space group number. - If ambient condition, please using /opt/agents/superconductor/models/ambient_pressure/model.ckpt-1000000.pt model predicts critical temperature. - If high pressure condition, please using /opt/agents/superconductor/models/high_pressure/model.ckpt-100000.pt model predicts critical temperature. - If user did not mention space group number requirement, pressure condition, please reminder user to give instruction. - - Args: - space_group (int): Target space group number for generated structures. 
- ambient (bool): Wether consider ambient condition superconductor. - target_values (List[float]): Target critical temperature - n_tot (int): Total number of structures generated - Returns: - poscar_paths (Path): Path to generated POSCAR. - message (str): Message about calculation results. - """ - try: - if ambient: - model = Path("/opt/agents/superconductor/models/ambient_pressure/model.ckpt-1000000.pt") - else: - model = Path("/opt/agents/superconductor/models/high_pressure/model.ckpt-100000.pt") - - try: - - #activate uv - workdir = Path("/opt/agents/crystalformer_gpu") - outputs = workdir/ "outputs" - - - alpha = [0.5] - mc_steps = 2000 - cmd = [ - "uv", "run", "python", - "crystalformer_mcp.py", - "--cond_model_path", str(model), - "--target", str(target_values), - "--alpha", str(alpha), - "--spacegroup", str(space_group), - "--mc_steps", str(mc_steps), - "--num_samples", str(n_tot), - "--output_path", str(outputs) - ] - subprocess.run(cmd, cwd=workdir, check=True) - - output_path = Path("outputs") - if output_path.exists(): - shutil.rmtree(output_path) - shutil.copytree(outputs, output_path) - return { - "poscar_paths": output_path, - "message": "CrystalFormer structure generation successfully!" - } - except Exception as e: - return { - "poscar_paths": None, - "message": "CrystalFormer Execution failed!" - } - - except Exception as e: - return { - "poscar_paths": None, - "message": "CrystalFormer Generation failed!" - } +#@mcp.tool() +#def generate_calypso_superconductor_structure( +# species: List[str], +# n_tot: int +# )->GenerateCalypsoStructureResult: +# """ +# Generate n_tot CALYPSO structures using specified species. This tool is element-guided structure generation tool, user only need +# provide chemical element information and total number is fine. +# +# If user did not mention species and total number structures to generate, please remind the user to provide these information. 
+# +# Args: +# species (List[str]): A list of chemical element symbols (e.g., ["Mg", "O", "Si"]). These elements will be used as building blocks in the CALYPSO structure generation. +# All element symbols must be from the supported element list internally defined in the tool. +# +# n_tot (int): The number of CALYPSO structure configurations to generate. Each structure will be generated in a separate subdirectory (e.g., generated_calypso/0/, generated_calypso/1/, etc.) +# Return: +# GenerateCalypsoStructureResult with keys: +# - poscar_paths (Path): Path to access generated structures POSCAR. All structures are saved in outputs/poscars_for_optimization/ +# - message (str): Message about calculation results information. +# """ +# +# def get_props(s_list): +# """ +# Get atomic number, atomic radius, and atomic volume infomation for interested species +# +# Args: +# s_list: species list needed to get atomic number, atomic radius, and atomic volume infomation +# +# Return: +# z_list (List): atomic number list for given species list, +# r_list (List): atomic radius list for given species list, +# v_list (List): atomic volume list for given species list. 
+# """ +# +# z_list, r_list, v_list = [], [], [] +# for s in s_list: +# if s not in ELEMENT_PROPS: +# raise ValueError(f"Unsupported element: {s}") +# props = ELEMENT_PROPS[s] +# z_list.append(props["Z"]) +# r_list.append(props["r"]) +# v_list.append(props["v"]) +# return z_list, r_list, v_list +# +# def generate_counts(n): +# return [random.randint(1, 10) for _ in range(n)] +# +# def write_input(path, species, z_list, n_list, r_mat, volume): +# """ +# Write calypso input files for given species combination with atomic number, number of each species, radius matrix and total volume +# +# Args: +# - path (Path): Path to save input file, +# - species (List[str]): Species list +# - z_list (List[int]): atomic number list +# - n_list (List[int]): number of each species list +# - r_mat: radius matrix +# - volume (float): total volume +# """ +# +# # Step 1: reorder all based on atomic number +# sorted_indices = sorted(range(len(z_list)), key=lambda i: z_list[i]) +# species = [species[i] for i in sorted_indices] +# z_list = [z_list[i] for i in sorted_indices] +# n_list = [n_list[i] for i in sorted_indices] +# r_mat = r_mat[np.ix_(sorted_indices, sorted_indices)] # reorder matrix +# +# # Step 2: write input.dat +# with open(path / "input.dat", "w") as f: +# f.write(f"SystemName = {' '.join(species)}\n") +# f.write(f"NumberOfSpecies = {len(species)}\n") +# f.write(f"NameOfAtoms = {' '.join(species)}\n") +# f.write("@DistanceOfIon\n") +# for i in range(len(species)): +# row = " ".join(f"{r_mat[i][j]:.3f}" for j in range(len(species))) +# f.write(row + "\n") +# f.write("@End\n") +# f.write(f"AtomicNumber = {' '.join(str(z) for z in z_list)}\n") +# f.write(f"NumberOfAtoms = {' '.join(str(n) for n in n_list)}\n") +# f.write("""Ialgo = 2 +#PsoRatio = 0.5 +#PopSize = 1 +#GenType = 1 +#ICode = 15 +#NumberOfLbest = 4 +#NumberOfLocalOptim = 3 +#Command = sh submit.sh +#MaxTime = 9000 +#MaxStep = 1 +#PickUp = F +#PickStep = 1 +#Parallel = F +#NumberOfParallel = 4 +#Split = T +#PSTRESS 
= 2000 +#fmax = 0.01 +#FixCell = F +#""") +# +# +# +# #===== Step 1: Generate calypso input files ========== +# outdir = Path("generated_calypso") +# outdir.mkdir(parents=True, exist_ok=True) +# +# +# z_list, r_list, v_list = get_props(species) +# for i in range(n_tot): +# try: +# n_list = generate_counts(len(species)) +# volume = sum(n * v for n, v in zip(n_list, v_list)) +# r_mat = np.add.outer(r_list, r_list) * 0.529 # bohr → Å +# +# struct_dir = outdir / f"{i}" +# if not struct_dir.exists(): +# struct_dir.mkdir(parents=True, exist_ok=True) +# +# #Prepare calypso input.dat +# write_input(struct_dir, species, z_list, n_list, r_mat, volume) +# except Exception as e: +# return{ +# "poscar_paths" : None, +# "message": "Input files generations for calypso failed!" +# } +# +# #Execuate calypso calculation and screening +# flim_ase_path = Path("/opt/agents/thermal_properties/flim_ase/flim_ase.py") +# command = f"/opt/agents/thermal_properties/calypso/calypso.x >> tmp_log && python {flim_ase_path}" +# if not flim_ase_path.exists(): +# return{ +# "poscar_paths": None, +# "message": "flim_ase.py did not found!" +# +# } +# try: +# subprocess.run(command, cwd=struct_dir, shell=True) +# except Exception as e: +# return{ +# "poscar_paths": None, +# "message": "calypso.x execute failed!" 
+# } +# +# #Clean struct_dir only save input.dat and POSCAR_1 +# for file in struct_dir.iterdir(): +# if file.name not in ["input.dat", "POSCAR_1"]: +# if file.is_file(): +# file.unlink() +# elif file.is_dir(): +# shutil.rmtree(file) +# +# # Step 3: Collect POSCAR_1 into POSCAR_n format +# try: +# output_dir = Path("outputs") +# output_dir.mkdir(parents=True, exist_ok=True) +# final_dir = output_dir / "poscars_for_optimization" +# final_dir.mkdir(parents=True, exist_ok=True) +# counter = 0 +# for struct_dir in outdir.iterdir(): +# poscar_path = struct_dir / "POSCAR_1" +# if poscar_path.exists(): +# new_name = final_dir / f"POSCAR_{counter}" +# shutil.copy(poscar_path, new_name) +# counter += 1 +# +# return{ +# "poscar_paths": Path(final_dir), +# "message": f"Calypso generated {n_tot} structures with {species} successfully!" +# } +# except Exception as e: +# return{ +# "poscar_paths": None, +# "message": "Calypso generated POSCAR files collected failed!" +# } + + + +##================ Tool to generate structures with conditional properties via CrystalFormer =================== +#class GenerateCryFormerStructureResult(TypedDict): +# poscar_paths: Path +# message: str +# +#@mcp.tool() +#def generate_crystalformer_superconductor_structures( +# space_group: int, +# ambient: bool, +# target_values: float, +# comparison_ops:Optional[str], +# n_tot: int +#)->GenerateCryFormerStructureResult: +# """ +# Generate n_tot conditional superconductor structures with target critical temperature and space group number. This tool is property-guided structure generation tool. User need to define the desired property and corresponding target values and comparison operator. +# If ambient condition, please using /opt/agents/superconductor/models/ambient_pressure/model.ckpt-1000000.pt model predicts critical temperature. +# If high pressure condition, please using /opt/agents/superconductor/models/high_pressure/model.ckpt-100000.pt model predicts critical temperature. 
+# If user did not mention space group number requirement, pressure condition, please reminder user to give instruction. +# If user did not mentioned the comparison operator comparison_ops, please remind the user to give a value +# +# Args: +# space_group (int): Target space group number for generated structures. +# ambient (bool): Wether consider ambient condition superconductor. +# target_values (float): Target critical temperature. +# comparison_ops (Optional[str]): One per target_prop; each must be one of "greater", "less", "equal", "minimize". If none, please use greater for all target_props. +# n_tot (int): Total number of structures generated +# Returns: +# poscar_paths (Path): Path to generated POSCAR. +# message (str): Message about calculation results. +# """ +# try: +# if ambient: +# target_prop = "ambient_pressure" +# else: +# target_prop = "high_pressure" +# +# try: +# +# #activate uv +# workdir = Path("/opt/agents/mcp_tool") +# outputs = workdir/ "target" +# +# +# mc_steps = 2000 +# upper=min(space_group, n_tot) +# random_spacegroup_num = random.randint(1,upper) +# +# cmd = [ +# "uv", "run", "python", +# "mcp_tool.py", +# "--mode", 'single', +# "--cond_model_type", target_prop, +# "--target", str(target_values), +# "--target_type", str(comparison_ops), +# "--alpha", '10', +# "--spacegroup", str(space_group), +# "--random_spacegroup_num", "1", #str(random_spacegroup_num), +# "--init_sample_num", str(n_tot), +# "--mc_steps", str(mc_steps), +# ] +# +# print(f"cmd = {cmd}") +# +# subprocess.run(cmd, cwd=workdir, check=True) +# +# output_path = Path("outputs") +# if output_path.exists(): +# shutil.rmtree(output_path) +# shutil.copytree(outputs, output_path) +# return { +# "poscar_paths": output_path, +# "message": "CrystalFormer structure generation successfully!" +# } +# except Exception as e: +# return { +# "poscar_paths": None, +# "message": "CrystalFormer Generation failed!" 
+# } +# except Exception as e: +# return { +# "poscar_paths": None, +# "message": "CrystalFormer Generation failed!" +# } @@ -448,7 +448,7 @@ class CalculateEntalpyResult(TypedDict): message: str #======================Tool to calculate structure enthalpy====================== @mcp.tool() -def calculate_enthalpy( +def calculate_superconductor_enthalpy( structure_path: Path, threshold: float, ambient: bool @@ -566,9 +566,14 @@ def calculate_enthalpy( enthalpy_dir.mkdir(parents=True, exist_ok=True) try: - poscar_files = list(structure_path.rglob("POSCAR*")) + #poscar_files = list(structure_path.rglob("POSCAR*")) + base = Path(structure_path) + structure_path = base.parent if base.is_file() else base + try: - optimized_structures = run_optimization(list(poscar_files), ambient) + results = run_superconductor_optimization(structure_path,ambient) + optimized_structure_path = results["optimized_poscar_paths"] + optimized_structures = list(optimized_structure_path.rglob("POSCAR*")) except Exception as e: return{ "enthalpy_file": [], @@ -705,7 +710,8 @@ def calculate_enthalpy( shutil.copy(src, enthalpy_dir) src = Path("/opt/agents/superconductor/geo_opt/results/e_above_hull_50meV.csv") - shutil.copy(src, enthalpy_dir) + dest = enthalpy_dir / f"e_above_hull.csv" + shutil.copy(src, dest) except Exception as e: return{ @@ -719,8 +725,9 @@ def calculate_enthalpy( on_hull_optimized_structures = enthalpy_dir / "e_above_hull_structures" on_hull_optimized_structures.mkdir(parents=True, exist_ok=True) - e_above_hull_file = enthalpy_dir / "e_above_hull_50meV.csv" - e_above_hull_output = enthalpy_dir / "e_above_hull.csv" + threshold_meV = int(threshold*1000) + e_above_hull_output = enthalpy_dir / f"e_above_hull_{threshold_meV}meV.csv" + e_above_hull_file = enthalpy_dir / "e_above_hull.csv" with e_above_hull_file.open("r") as f, e_above_hull_output.open("w") as fout: # write header for new CSV @@ -781,7 +788,7 @@ def calculate_enthalpy( "message": "Enthalpy prediction failed!" 
} -#### Tool to predict superconductor critical temperature #### + class SuperconductorCriticalTemperatures(TypedDict): Tc: float path: str @@ -795,11 +802,130 @@ class SuperconductorTcResult(TypedDict): @mcp.tool() def predict_superconductor_Tc( structure_path: Path, - above_hull_file: Path, ambient: bool ) -> SuperconductorTcResult: """ - Predict superconductor critical temperature at different pressure conditions with pretrained dpa model. + Predict material critical temperature at different pressure conditions with pretrained dpa model. + If at ambient condition, using /opt/agents/superconductor/models/ambient_pressure/model.ckpt-1000000.pt model predicts critical temperature. + If at high pressure condition, using /opt/agents/superconductor/models/high_pressure/model.ckpt-100000.pt model predicts critical temperature. + + If user did not mention pressure condition, please remind user to choose ambient or high pressure condition. + + + Args: + structure_path (Path): Path to either structure file (POSCAR/CIF) + ambient (bool): Wether consider ambient condition + + Returns: + SuperconductorTcResult: Dictionary with keys: + - results_file (Path): Path to access result files critical_temperature.csv, which saved in outputs/superconductor_critical_temperature.csv + - message (str): Message about calculations results. + """ + try: + + + structure_path = Path(structure_path) + base = Path(structure_path) + structure_path = base.parent if base.is_file() else base + if not structure_path.exists(): + return { + "results_file": {}, + "message": f"Structure path not found: {structure_path}" + } + + + #Determine used model for critical temperature prediction + if ambient: + used_model = Path("/opt/agents/superconductor/models/ambient_pressure/model.ckpt-1000000.pt") + else: + used_model = Path("/opt/agents/superconductor/models/high_pressure/model.ckpt-100000.pt") + + if not used_model.exists(): + return { + "results_file": {}, + "message": f"{used_model} not exists!" 
+ } + #find all structures + results = run_superconductor_optimization(structure_path, ambient) + optimized_structure_path = results["optimized_poscar_paths"] + optimized_structures = list(optimized_structure_path.rglob("POSCAR*")) + + superconductor_data: SuperconductorData ={} + for structure in optimized_structures: + #Convert structure into ase format + try: + + atom = io.read(str(structure)) + formula = atom.get_chemical_formula() + + #information for critical temperature predictions + coords = atom.get_positions() + cells = atom.get_cell() + atom_numbers = atom.get_atomic_numbers() + atom_types = [x - 1 for x in atom_numbers] + + except Exception as e: + return{ + "results_file": {}, + "message": f"Structure {structure} read failed!" + } + + try: + if ambient: + dp_property = DeepProperty(model_file=str(used_model)) + else: + dp_property = DeepProperty(model_file=str(used_model), head="tc") + result = dp_property.eval(coords=coords, cells=cells, atom_types=atom_types)[0][0][0] + except Exception as e: + return{ + "results_file": {}, + "message": f"Structure {structure} critical temperature prediction failed!" + } + + superconductor_Tc: SuperconductorCriticalTemperatures = {} + + try: + superconductor_Tc["Tc"] = result + superconductor_Tc["path"] = str(structure) + except Exception as e: + return{ + "results_file": {}, + "message": f"Structure {structure} superconductor_Tc save failed!" 
+ } + + superconductor_data[formula] = superconductor_Tc + + output_dir = Path("outputs") + output_dir.mkdir(parents=True, exist_ok=True) + results_file = output_dir / "critical_temperature.csv" + with open(results_file, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["formula", "Tc", "path"]) # header + for formula, props in superconductor_data.items(): + fname = Path(structure).name + writer.writerow([formula, props["Tc"], props["path"]]) + + return { + "results_file": results_file, + "message": f"Material critical temperature predictions are saved in {results_file}" + } + + + except Exception as e: + return { + "Tc_List": -1.0, + "message": f"Unexpected error: {str(e)}" + } + +#### Tool to screen promising superconductor candidates #### + +@mcp.tool() +def screen_superconductor( + structure_path: Path, + ambient: bool +) -> SuperconductorTcResult: + """ + Screen promising superconductors from above-hull structures at ambient or high pressure condition. If at ambient condition, using /opt/agents/superconductor/models/ambient_pressure/model.ckpt-1000000.pt model predicts critical temperature. If at high pressure condition, using /opt/agents/superconductor/models/high_pressure/model.ckpt-100000.pt model predicts critical temperature. @@ -813,13 +939,16 @@ def predict_superconductor_Tc( Returns: SuperconductorTcResult: Dictionary with keys: - - results_file (Path): Path to access result files superconductor_critical_temperature.csv, which saved in outputs/superconductor_critical_temperature.csv + - results_file (Path): Path to the result file superconductor.csv, which is saved in outputs/superconductor.csv - message (str): Message about calculations results. 
""" try: structure_path = Path(structure_path) + base = Path(structure_path) + structure_path = base.parent if base.is_file() else base + if not structure_path.exists(): return { "results_file": {}, @@ -837,8 +966,12 @@ def predict_superconductor_Tc( "results_file": {}, "message": f"{used_model} not exists!" } - + threshold = 0.05 + results = calculate_superconductor_enthalpy(structure_path, threshold, ambient) + optimized_structure_path = results["e_above_hull_structures"] + above_hull_file = results["e_above_hull_values"] + # --- 0) load above-hull energies --- above_hull_map: dict[str, float] = {} if above_hull_file.is_file(): @@ -848,10 +981,12 @@ def predict_superconductor_Tc( # normalize to basename so lookups by Path(...).name match key = Path(row["structure"]).name # ← normalize above_hull_map[key] = float(row["energy"]) + print(f"key = {key}") #find all structures - structures = sorted(structure_path.rglob("POSCAR*")) + sorted(structure_path.rglob("*.cif")) - optimized_structures = run_optimization(list(structures), ambient) + + structures = list(optimized_structure_path.rglob("POSCAR*")) + superconductor_data: SuperconductorData ={} for structure in structures: #Convert structure into ase format @@ -899,12 +1034,21 @@ def predict_superconductor_Tc( output_dir = Path("outputs") output_dir.mkdir(parents=True, exist_ok=True) - results_file = output_dir / "superconductor_critical_temperature.csv" + results_file = output_dir / "superconductor.csv" + + if not superconductor_data: + with results_file.open("w") as f: + f.write("No promising candidates found.\n") + return { + "results_file": results_file, + "message": "No promising candidates found." 
+ } + with open(results_file, "w", newline="") as f: writer = csv.writer(f) writer.writerow(["formula", "Tc", "path", "e_above_hull"]) # header for formula, props in superconductor_data.items(): - fname = Path(structure).name + fname = Path(props["path"]).name eh = above_hull_map.get(fname, "") writer.writerow([formula, props["Tc"], props["path"], eh]) @@ -925,7 +1069,6 @@ def predict_superconductor_Tc( # ====== Run Server ====== if __name__ == "__main__": - # Get transport type from environment variable, default to SSE - transport_type = os.getenv('MCP_TRANSPORT', 'sse') - mcp.run(transport=transport_type) + logging.info("Starting SuperconductorServer on port 50002...") + mcp.run(transport="sse") diff --git a/servers/thermoelectric/server.py b/servers/thermoelectric/server.py index 6309caf..6b9ee7e 100644 --- a/servers/thermoelectric/server.py +++ b/servers/thermoelectric/server.py @@ -36,39 +36,17 @@ import json import csv -import argparse - # Setup logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -def parse_args(): - """Parse command line arguments for MCP server.""" - parser = argparse.ArgumentParser(description="DPA Calculator MCP Server") - parser.add_argument('--port', type=int, default=50001, help='Server port (default: 50001)') - parser.add_argument('--host', default='0.0.0.0', help='Server host (default: 0.0.0.0)') - parser.add_argument('--log-level', default='INFO', - choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], - help='Logging level (default: INFO)') - try: - args = parser.parse_args() - except SystemExit: - class Args: - port = 50001 - host = '0.0.0.0' - log_level = 'INFO' - args = Args() - return args - -args = parse_args() - # Initialize MCP server mcp = CalculationMCPServer( "ThermoelectricMaterialsServer", - host=args.host, - port=args.port + host="0.0.0.0", + port=50001 ) # ====== Tool 1: Predict Material Thermoelectronic Properties ====== @@ -185,12 +163,18 @@ def eval_properties( #Define 
props for atom results: MaterialData = {} props_results: MaterialProperties = {} - + structure_file = Path(structure_file) if not structure_file.exists(): return {"results": {}, "properties": {}, "message": f"Structure file not found: {structure_file}"} - structures = sorted(structure_file.rglob("POSCAR*")) + sorted(structure_file.rglob("*.cif")) + fmax=0.0005 + nsteps = 2000 + pressure = 0.0 + result = run_pressure_optimization(structure_file,fmax,nsteps,pressure) + optimized_structure_path = result["optimized_poscar_paths"] + + structures = list(optimized_structure_path.rglob("POSCAR*")) for structure in structures: try: if structure.name.upper().startswith("POSCAR"): @@ -328,261 +312,371 @@ class GenerateCalypsoStructureResult(TypedDict): #========== Tool generate calypso structures =========== +#@mcp.tool() +#def generate_calypso_thermoele_structures( +# species: List[str], +# n_tot: int +# )->GenerateCalypsoStructureResult: +# """ +# Generate n_tot CALYPSO structures using specified species. +# If user did not mention species and total number structures to generate, please remind the user to provide these information. +# +# Args: +# species (List[str]): A list of chemical element symbols (e.g., ["Mg", "O", "Si"]). These elements will be used as building blocks in the CALYPSO structure generation. +# All element symbols must be from the supported element list internally defined in the tool. +# +# n_tot (int): The number of CALYPSO structure configurations to generate. Each structure will be generated in a separate subdirectory (e.g., generated_calypso/0/, generated_calypso/1/, etc.) +# Return: +# GenerateCalypsoStructureResult with keys: +# - poscar_paths (Path): Path to access generated structures POSCAR. All structures are saved in outputs/poscars_for_optimization/ +# - message (str): Message about calculation results information. 
+# """ +# +# def get_props(s_list): +# """ +# Get atomic number, atomic radius, and atomic volume infomation for interested species +# +# Args: +# s_list: species list needed to get atomic number, atomic radius, and atomic volume infomation +# +# Return: +# z_list (List): atomic number list for given species list, +# r_list (List): atomic radius list for given species list, +# v_list (List): atomic volume list for given species list. +# """ +# +# z_list, r_list, v_list = [], [], [] +# for s in s_list: +# if s not in ELEMENT_PROPS: +# raise ValueError(f"Unsupported element: {s}") +# props = ELEMENT_PROPS[s] +# z_list.append(props["Z"]) +# r_list.append(props["r"]) +# v_list.append(props["v"]) +# return z_list, r_list, v_list +# +# def generate_counts(n): +# return [random.randint(1, 10) for _ in range(n)] +# +# def write_input(path, species, z_list, n_list, r_mat, volume): +# """ +# Write calypso input files for given species combination with atomic number, number of each species, radius matrix and total volume +# +# Args: +# - path (Path): Path to save input file, +# - species (List[str]): Species list +# - z_list (List[int]): atomic number list +# - n_list (List[int]): number of each species list +# - r_mat: radius matrix +# - volume (float): total volume +# """ +# +# # Step 1: reorder all based on atomic number +# sorted_indices = sorted(range(len(z_list)), key=lambda i: z_list[i]) +# species = [species[i] for i in sorted_indices] +# z_list = [z_list[i] for i in sorted_indices] +# n_list = [n_list[i] for i in sorted_indices] +# r_mat = r_mat[np.ix_(sorted_indices, sorted_indices)] # reorder matrix +# +# # Step 2: write input.dat +# with open(path / "input.dat", "w") as f: +# f.write(f"SystemName = {' '.join(species)}\n") +# f.write(f"NumberOfSpecies = {len(species)}\n") +# f.write(f"NameOfAtoms = {' '.join(species)}\n") +# f.write("@DistanceOfIon\n") +# for i in range(len(species)): +# row = " ".join(f"{r_mat[i][j]:.3f}" for j in range(len(species))) +# 
f.write(row + "\n") +# f.write("@End\n") +# f.write(f"Volume = {volume:.2f}\n") +# f.write(f"AtomicNumber = {' '.join(str(z) for z in z_list)}\n") +# f.write(f"NumberOfAtoms = {' '.join(str(n) for n in n_list)}\n") +# f.write("""Ialgo = 2 +#PsoRatio = 0.5 +#PopSize = 1 +#GenType = 1 +#ICode = 15 +#NumberOfLbest = 4 +#NumberOfLocalOptim = 3 +#Command = sh submit.sh +#MaxTime = 9000 +#MaxStep = 1 +#PickUp = F +#PickStep = 1 +#Parallel = F +#NumberOfParallel = 4 +#Split = T +#PSTRESS = 2000 +#fmax = 0.01 +#FixCell = F +#""") +# +# +# +# #===== Step 1: Generate calypso input files ========== +# outdir = Path("generated_calypso") +# outdir.mkdir(parents=True, exist_ok=True) +# +# +# z_list, r_list, v_list = get_props(species) +# for i in range(n_tot): +# try: +# n_list = generate_counts(len(species)) +# volume = sum(n * v for n, v in zip(n_list, v_list)) +# r_mat = np.add.outer(r_list, r_list) * 0.529 # bohr → Å +# +# struct_dir = outdir / f"{i}" +# if not struct_dir.exists(): +# struct_dir.mkdir(parents=True, exist_ok=True) +# +# #Prepare calypso input.dat +# write_input(struct_dir, species, z_list, n_list, r_mat, volume) +# except Exception as e: +# return{ +# "poscar_paths" : None, +# "message": "Input files generations for calypso failed!" +# } +# +# #Execuate calypso calculation and screening +# flim_ase_path = Path("/opt/agents/thermal_properties/flim_ase/flim_ase.py") +# command = f"/opt/agents/thermal_properties/calypso/calypso.x >> tmp_log && python {flim_ase_path}" +# if not flim_ase_path.exists(): +# return{ +# "poscar_paths": None, +# "message": "flim_ase.py did not found!" +# +# } +# try: +# subprocess.run(command, cwd=struct_dir, shell=True) +# except Exception as e: +# return{ +# "poscar_paths": None, +# "message": "calypso.x execute failed!" 
+# } +# +# #Clean struct_dir only save input.dat and POSCAR_1 +# for file in struct_dir.iterdir(): +# if file.name not in ["input.dat", "POSCAR_1"]: +# if file.is_file(): +# file.unlink() +# elif file.is_dir(): +# shutil.rmtree(file) +# +# # Step 3: Collect POSCAR_1 into POSCAR_n format +# try: +# output_dir = Path("outputs") +# output_dir.mkdir(parents=True, exist_ok=True) +# final_dir = output_dir / "poscars_for_optimization" +# final_dir.mkdir(parents=True, exist_ok=True) +# counter = 0 +# for struct_dir in outdir.iterdir(): +# poscar_path = struct_dir / "POSCAR_1" +# if poscar_path.exists(): +# new_name = final_dir / f"POSCAR_{counter}" +# shutil.copy(poscar_path, new_name) +# counter += 1 +# +# return{ +# "poscar_paths": Path(final_dir), +# "message": f"Calypso generated {n_tot} structures with {species} successfully!" +# } +# except Exception as e: +# return{ +# "poscar_paths": None, +# "message": "Calypso generated POSCAR files collected failed!" +# } + + +##======== Tool generate CrystalFormer Structures ========== +#class GenerateCryFormerStructureResult(TypedDict): +# poscar_paths: Path +# message: str +# +#@mcp.tool() +#def generate_crystalformer_thermoele_structures( +# space_group: int, +# target_props: Optional[List[str]], +# target_values: Optional[List[float]], +# comparison_ops: Optional[List[str]], +# n_tot: int +#)->GenerateCryFormerStructureResult: +# """ +# Generate conditional structures with target properties and space group number. Different target properties used different property model. +# If user did not mention target property, please use band gap as target_prop. If user did not mention the space group please remind the user to +# give a value. If user did not mentioned the comparison operator comparison_ops, please remind the user to give a value +# +# Args: +# space_group (int): Target space group number for generated structures. +# target_props (Optional[List[str]]): Target properties for generated structures. 
+# - "bandgap": hse functional band gap as target property, +# - "sound": sound velocity as target property +# If none, please use bandgap as target property directly. +# target_values (Optional[List[float]]): Target property values for target properties +# comparison_ops (Optional[List[str]]): One per target_prop; each must be one of "greater", "less", "equal", "minimize". If none, please use greater for all target_props. +# n_tot (int): Total number of structures generated +# Returns: +# poscar_paths (Path): Path to generated POSCAR. +# message (str): Message about calculation results. +# """ +# try: +# if space_group is None or space_group <= 0: +# raise ValueError("Please provide a positive integer for `space_group`.") +# +# if not target_values or len(target_values) != len(target_props): +# raise ValueError("`target_values` must be provided and match the length of `target_props`.") +# +# valid_ops = ("greater", "less", "equal", "minimize") +# if len(comparison_ops) != len(target_props): +# raise ValueError("`comparison_ops` length must match `target_props` length.") +# for op in comparison_ops: +# if op not in valid_ops: +# raise ValueError(f"Invalid comparison op '{op}'. 
Must be one of {valid_ops}.") +# +# workdir = Path("/opt/agents/mcp_tool") +# outputs = workdir/ "target" +# +# mode = "multi" if len(target_props) > 1 else "single" +# +# #total space group number to generated +# upper=min(space_group, n_tot) +# random_spacegroup_num = random.randint(1,upper) +# print(f"n_tot = {n_tot}") +# print(f"random_spacegroup_num = {random_spacegroup_num}") +# cmd = [ +# "uv", "run", "python", +# "mcp_tool.py", +# "--mode", str(mode), +# "--cond_model_type", *target_props, +# "--target", *map(str, target_values), +# "--target_type", *comparison_ops, +# "--alpha", "10", +# "--spacegroup", str(space_group), +# "--random_spacegroup_num", "1", #str(random_spacegroup_num), +# "--init_sample_num", str(n_tot), #str(int(n_tot/random_spacegroup_num)), +# "--mc_steps", "2000" +# ] +# +# print(f"cmd = {cmd}") +# subprocess.run(cmd, cwd=workdir, check=True) +# output_path = Path("outputs") +# if output_path.exists(): +# shutil.rmtree(output_path) +# shutil.copytree(outputs, output_path) +# return { +# "poscar_paths": output_path, +# "message": "CrystalFormer structure generation successfully!" +# } +# except Exception as e: +# return { +# "poscar_paths": None, +# "message": "CrystalFormer Generation failed!" +# } + +#====== Tool to do geometry optimization========= +class RunOptimizationResult(TypedDict): + optimized_poscar_paths: Path + message: str + @mcp.tool() -def generate_calypso_structures( - species: List[str], - n_tot: int - )->GenerateCalypsoStructureResult: +def run_pressure_optimization( + structure_path: Path, + fmax:float, + nsteps:int, + pressure:float +) -> RunOptimizationResult: """ - Generate n_tot CALYPSO structures using specified species. - If user did not mention species and total number structures to generate, please remind the user to provide these information. + Optimize structures with DP model at ambient or high pressure condition. - Args: - species (List[str]): A list of chemical element symbols (e.g., ["Mg", "O", "Si"]). 
These elements will be used as building blocks in the CALYPSO structure generation. - All element symbols must be from the supported element list internally defined in the tool. - - n_tot (int): The number of CALYPSO structure configurations to generate. Each structure will be generated in a separate subdirectory (e.g., generated_calypso/0/, generated_calypso/1/, etc.) - Return: - GenerateCalypsoStructureResult with keys: - - poscar_paths (Path): Path to access generated structures POSCAR. All structures are saved in outputs/poscars_for_optimization/ - - message (str): Message about calculation results information. + Args: + - structure_path (Path): Path to the structures that need to be optimized + - fmax (float), nsteps (int), pressure (float): Values forwarded on the command line to opt_multi.py + Return: + - optimized_poscar_paths (Path): Directory holding the optimized POSCAR files """ + opt_py = Path("/opt/agents/thermal_properties/geo_opt/opt_multi.py") - def get_props(s_list): - """ - Get atomic number, atomic radius, and atomic volume infomation for interested species - - Args: - s_list: species list needed to get atomic number, atomic radius, and atomic volume infomation - - Return: - z_list (List): atomic number list for given species list, - r_list (List): atomic radius list for given species list, - v_list (List): atomic volume list for given species list. 
- """ - - z_list, r_list, v_list = [], [], [] - for s in s_list: - if s not in ELEMENT_PROPS: - raise ValueError(f"Unsupported element: {s}") - props = ELEMENT_PROPS[s] - z_list.append(props["Z"]) - r_list.append(props["r"]) - v_list.append(props["v"]) - return z_list, r_list, v_list - - def generate_counts(n): - return [random.randint(4, 4) for _ in range(n)] - - def write_input(path, species, z_list, n_list, r_mat, volume): - """ - Write calypso input files for given species combination with atomic number, number of each species, radius matrix and total volume - - Args: - - path (Path): Path to save input file, - - species (List[str]): Species list - - z_list (List[int]): atomic number list - - n_list (List[int]): number of each species list - - r_mat: radius matrix - - volume (float): total volume - """ - - # Step 1: reorder all based on atomic number - sorted_indices = sorted(range(len(z_list)), key=lambda i: z_list[i]) - species = [species[i] for i in sorted_indices] - z_list = [z_list[i] for i in sorted_indices] - n_list = [n_list[i] for i in sorted_indices] - r_mat = r_mat[np.ix_(sorted_indices, sorted_indices)] # reorder matrix - - # Step 2: write input.dat - with open(path / "input.dat", "w") as f: - f.write(f"SystemName = {' '.join(species)}\n") - f.write(f"NumberOfSpecies = {len(species)}\n") - f.write(f"NameOfAtoms = {' '.join(species)}\n") - f.write("@DistanceOfIon\n") - for i in range(len(species)): - row = " ".join(f"{r_mat[i][j]:.3f}" for j in range(len(species))) - f.write(row + "\n") - f.write("@End\n") - f.write(f"Volume = {volume:.2f}\n") - f.write(f"AtomicNumber = {' '.join(str(z) for z in z_list)}\n") - f.write(f"NumberOfAtoms = {' '.join(str(n) for n in n_list)}\n") - f.write("""Ialgo = 2 -PsoRatio = 0.5 -PopSize = 1 -GenType = 1 -ICode = 15 -NumberOfLbest = 4 -NumberOfLocalOptim = 3 -Command = sh submit.sh -MaxTime = 9000 -MaxStep = 1 -PickUp = F -PickStep = 1 -Parallel = F -NumberOfParallel = 4 -Split = T -PSTRESS = 2000 -fmax = 0.01 
-FixCell = F -""") - - - - #===== Step 1: Generate calypso input files ========== - outdir = Path("generated_calypso") - outdir.mkdir(parents=True, exist_ok=True) - - - z_list, r_list, v_list = get_props(species) - for i in range(n_tot): - try: - n_list = generate_counts(len(species)) - volume = sum(n * v for n, v in zip(n_list, v_list)) - r_mat = np.add.outer(r_list, r_list) * 0.529 # bohr → Å - - struct_dir = outdir / f"{i}" - if not struct_dir.exists(): - struct_dir.mkdir(parents=True, exist_ok=True) - - #Prepare calypso input.dat - write_input(struct_dir, species, z_list, n_list, r_mat, volume) - except Exception as e: - return{ - "poscar_paths" : None, - "message": "Input files generations for calypso failed!" - } - - #Execuate calypso calculation and screening - flim_ase_path = Path("/opt/agents/thermal_properties/flim_ase/flim_ase.py") - command = f"/opt/agents/thermal_properties/calypso/calypso.x >> tmp_log && python {flim_ase_path}" - if not flim_ase_path.exists(): - return{ - "poscar_paths": None, - "message": "flim_ase.py did not found!" 
- - } - try: - subprocess.run(command, cwd=struct_dir, shell=True) - except Exception as e: + try: + #delete previous calculations results first + optimized_dir = Path("optimized_poscar") + if optimized_dir.exists(): + if optimized_dir.is_dir() and not optimized_dir.is_symlink(): + shutil.rmtree(optimized_dir) + else: + optimized_dir.unlink() + deep_md = Path("deepmd_npy") + if deep_md.exists(): + if deep_md.is_dir() and not deep_md.is_symlink(): + shutil.rmtree(deep_md) # delete the whole folder + else: + deep_md.unlink() + traj_path = Path("traj") + if traj_path.exists(): + if traj_path.is_dir() and not traj_path.is_symlink(): + shutil.rmtree(traj_path) # delete the whole folder + else: + traj_path.unlink() + + + base = Path(structure_path) + structure_path = base.parent if base.is_file() else base + poscar_files = list(p for pat in ("POSCAR*", "*.cif", "*.CIF") + for p in structure_path.rglob(pat)) + #poscar_files = list(structure_path.rglob("POSCAR*")) + if not poscar_files: + print(f"No POSCAR files found under: {structure_path}. Exit.") return{ - "poscar_paths": None, - "message": "calypso.x execute failed!" 
+ "optimized_poscar_paths": None, + "message": f"No POSCAR files found under: {structure_path}" } + # Build command: use the actual path to opt_py, not the literal string "opt_py" + cmd = [ + "python", + str(opt_py), # <— use the variable here + str(fmax), + str(pressure), + str(pressure), + str(nsteps), + ] + [str(p) for p in poscar_files] + + # Run and check for errors + subprocess.run(cmd, check=True) - #Clean struct_dir only save input.dat and POSCAR_1 - for file in struct_dir.iterdir(): - if file.name not in ["input.dat", "POSCAR_1"]: - if file.is_file(): - file.unlink() - elif file.is_dir(): - shutil.rmtree(file) - - # Step 3: Collect POSCAR_1 into POSCAR_n format + except Exception as e: + print("Geometry Optimization failed!") try: - output_dir = Path("outputs") - output_dir.mkdir(parents=True, exist_ok=True) - final_dir = output_dir / "poscars_for_optimization" - final_dir.mkdir(parents=True, exist_ok=True) - counter = 0 - for struct_dir in outdir.iterdir(): - poscar_path = struct_dir / "POSCAR_1" - if poscar_path.exists(): - new_name = final_dir / f"POSCAR_{counter}" - shutil.copy(poscar_path, new_name) - counter += 1 + parse_py = Path("/opt/agents/thermal_properties/geo_opt/parse_traj.py") + cmd = [ + "python", + str(parse_py) + ] - return{ - "poscar_paths": Path(final_dir), - "message": f"Calypso generated {n_tot} structures with {species} successfully!" - } + # Run and check for errors + subprocess.run(cmd, check=True) except Exception as e: - return{ - "poscar_paths": None, - "message": "Calypso generated POSCAR files collected failed!" 
- } - + print("Collect optimized structures failed!") + try: + frames =glob.glob('deepmd_npy/*/') + multisys = dpdata.MultiSystems() + for frame in frames: + sys = dpdata.System(frame,'deepmd/npy') + multisys.append(sys) + + optimized_dir.mkdir(parents=True, exist_ok=True) # Create the directory if it doesn't exist + + count=0 + for frame in multisys: + for system in frame: + system.to_vasp_poscar(optimized_dir / f'POSCAR_{count}') + count+=1 +# optimized_structures = list(optimized_dir.rglob("POSCAR*")) + except Exception as e: + print("Convert dpdata to POSCAR failed!") -#======== Tool generate CrystalFormer Structures ========== -class GenerateCryFormerStructureResult(TypedDict): - poscar_paths: Path - message: str + return{ + "optimized_poscar_paths": optimized_dir, + "message": "Geometry Optimization successfully" + } -@mcp.tool() -def generate_crystalformer_structures( - space_group: int, - target_props: Optional[List[str]], - target_values: Optional[List[float]], - n_tot: int -)->GenerateCryFormerStructureResult: - """ - Generate conditional structures with target properties and space group number. Different target properties used different property model. - If user did not mention target property, please use band gap as target_prop. If user did not mention the space group please remind the usr to - give a value. - - Args: - space_group (int): Target space group number for generated structures. - target_props (Optional[List[str]]): Target properties for generated structures. - - "band_gap": hse functional band gap as target property, - - "sound_vel": sound velocity as target property - If none, please use band_gap as target property directly. - target_values (Optional[List[float]]): Target property values for target properties - n_tot (int): Total number of structures generated - Returns: - poscar_paths (Path): Path to generated POSCAR. - message (str): Message about calculation results. 
- """ - try: - supported_props = ["band_gap", "sound_vel"] - - model = Path("/opt/agents/thermal_properties/models") - model_dirs = { - "band_gap": model / "bandgap" / "model.ckpt.pt", - "sound_vel": model / "shear_modulus" / "model.ckpt.pt", - } - - try: - - #activate uv - workdir = Path("/opt/agents/crystalformer_gpu") - outputs = workdir/ "outputs" - - - alpha = [0.5] - mc_steps = 2000 - for prop in target_props: - cmd = [ - "uv", "run", "python", - "crystalformer_mcp.py", - "--cond_model_path", str(model_dirs[prop]), - "--target", str(target_values), - "--alpha", str(alpha), - "--spacegroup", str(space_group), - "--mc_steps", str(mc_steps), - "--num_samples", str(n_tot), - "--output_path", str(outputs) - ] - subprocess.run(cmd, cwd=workdir, check=True) - - output_path = Path("outputs") - if output_path.exists(): - shutil.rmtree(output_path) - shutil.copytree(outputs, output_path) - return { - "poscar_paths": output_path, - "message": "CrystalFormer structure generation successfully!" - } - except Exception as e: - return { - "poscar_paths": None, - "message": "CrystalFormer Execution failed!" - } - - except Exception as e: - return { - "poscar_paths": None, - "message": "CrystalFormer Generation failed!" - } #======== Tool predict enthalp and select on hull structures========= @@ -594,19 +688,19 @@ class CalculateEntalpyResult(TypedDict): message: str @mcp.tool() -def calculate_enthalpy( +def calculate_thermoele_enthalpy( structure_path: Path, threshold: float, pressure: float )->CalculateEntalpyResult: """ Optimize the crystal structure using DP at a given pressure, then evaluate the enthalpy of the optimized structure, - and finally screen for structures above convex hull with a value of threshold. + and finally screen for structures above convex hull with a value of threshold (the unit of this threshold is eV). 
When user call cal_enthalpy reminder user to give pressure condition and threshold value to screen structures Args: - structure_file (Path): Path to the structure files (e.g. POSCAR) - - threshold (float): Upper limit for energy above hull. Only structures with energy-above-hull values smaller than this threshold will be selected. + - threshold (float): Upper limit for energy above hull. Only structures with energy-above-hull values smaller than this threshold will be selected. The unit is eV. - pressure (float): Pressure used in geometry optimization process Return: @@ -626,73 +720,13 @@ def calculate_enthalpy( enthalpy_dir.mkdir(parents=True, exist_ok=True) try: - poscar_files = list(structure_path.rglob("POSCAR*")) - opt_py = Path("/opt/agents/thermal_properties/geo_opt/opt_multi.py") - - try: - # Build command: use the actual path to opt_py, not the literal string "opt_py" - cmd = [ - "python", - str(opt_py), # <— use the variable here - str(fmax), - str(pressure), - str(pressure), - str(nsteps), - ] + [str(p) for p in poscar_files] - - # Run and check for errors - subprocess.run(cmd, check=True) - - except Exception as e: - return{ - "enthalpy_file": [], - "e_above_hull_structures": [], - "e_above_hull_values": [], - "message": "Geometry Optimization failed!" - } - - try: - parse_py = Path("/opt/agents/thermal_properties/geo_opt/parse_traj.py") - cmd = [ - "python", - str(parse_py) - ] - - # Run and check for errors - subprocess.run(cmd, check=True) - except Exception as e: - return{ - "enthalpy_file": [], - "e_above_hull_structures": [], - "e_above_hull_values": [], - "message": "Screen traj failed!"
- } - - try: - frames =glob.glob('deepmd_npy/*/') - multisys = dpdata.MultiSystems() - for frame in frames: - sys = dpdata.System(frame,'deepmd/npy') - multisys.append(sys) - - optimized_dir = Path("optimized_poscar") - optimized_dir.mkdir(parents=True, exist_ok=True) # Create the directory if it doesn't exist - - count=0 - for frame in multisys: - for system in frame: - system.to_vasp_poscar(optimized_dir / f'POSCAR_{count}') - count+=1 - except Exception as e: - return{ - "enthalpy_file": [], - "e_above_hull_structures": [], - "e_above_hull_values": [], - "message": "Convert optimized structure to POSCAR failed!" - } try: - poscar_files = list(optimized_dir.rglob("POSCAR*")) + #poscar_files = list(structure_path.rglob("POSCAR*")) + results = run_pressure_optimization(structure_path,fmax,nsteps,pressure) + optimized_structure_path = results["optimized_poscar_paths"] + poscar_files = list(optimized_structure_path.rglob("POSCAR*")) + #poscar_files = run_pressure_optimization(structure_path,fmax,nsteps,pressure) enthalpy_py = Path("/opt/agents/thermal_properties/geo_opt/predict_enthalpy.py") cmd = [ "python", @@ -797,7 +831,8 @@ def calculate_enthalpy( shutil.copy(src, enthalpy_dir) src = Path("/opt/agents/thermal_properties/geo_opt/results/e_above_hull_50meV.csv") - shutil.copy(src, enthalpy_dir) + dest = enthalpy_dir / f"e_above_hull.csv" + shutil.copy(src, dest) except Exception as e: return{ @@ -811,8 +846,9 @@ def calculate_enthalpy( on_hull_optimized_structures = enthalpy_dir / "e_above_hull_structures" on_hull_optimized_structures.mkdir(parents=True, exist_ok=True) - e_above_hull_file = enthalpy_dir / "e_above_hull_50meV.csv" - e_above_hull_output = enthalpy_dir / "e_above_hull.csv" + e_above_hull_file = enthalpy_dir / f"e_above_hull.csv" + threshold_meV = int(threshold*1000) + e_above_hull_output = enthalpy_dir / f"e_above_hull_{threshold_meV}meV.csv" with e_above_hull_file.open("r") as f, e_above_hull_output.open("w") as fout: # write header for new CSV @@ 
-851,36 +887,6 @@ def calculate_enthalpy( # write to new CSV, including the absolute path fout.write(f"{cleaned},{energy},{on_hull_optimized_poscar.resolve()}\n") -# with e_above_hull_file.open("r") as f: -# #If there is a header, skip -# first = True -# for line in f: -# line.strip() -# if not line: -# continue -# if first and any(c.isalpha() for c in line): -# first = False -# continue -# first = False -# -# parts = line.split(' ') -# if len(parts) < 4: -# continue -# -# try: -# energy = float(parts[1]) -# except ValueError: -# continue -# -# raw = parts[3] -# cleaned = raw.strip().strip('"').strip("'") -# on_hull_optimized_poscar = Path('.') / cleaned -# -# if energy <= 0.05 and on_hull_optimized_poscar.is_file(): -# try: -# shutil.copy(on_hull_optimized_poscar, on_hull_optimized_structures) -# except Exception as e: -# print(f"Copy on hull optimized poscar {on_hull_optimized_poscar} failed!") except Exception as e: return{ "enthalpy_file": [], @@ -922,15 +928,15 @@ class ScreenThermoelectricCandidateResults(TypedDict): @mcp.tool() def screen_thermoelectric_candidate( structure_path: Path, - above_hull_file: Path + pressure: float )->ScreenThermoelectricCandidateResults: """ - Screen promising thermoelectric materials based on band gap, sound speed and space group number requirements. + Screen promising thermoelectric materials based on band gap, sound speed and space group number requirements under given pressure. 
If user did not define pressure please remind user to give a value Args: structure_file (Path): Path to structure files - above_hull_file (Path): Path to above_hull.csv file which about about hull energy information + pressure (float): working pressure in GPa Return: ScreenThermoelectricCandidateResults with keys: @@ -1016,25 +1022,37 @@ def get_space_group_number(structure): return space_group_number - # --- 0) load above-hull energies --- - above_hull_map: dict[str, float] = {} - if above_hull_file.is_file(): - with above_hull_file.open("r") as fh: - reader = csv.DictReader(fh) - for row in reader: - # normalize to basename so lookups by Path(...).name match - key = Path(row["structure"]).name # ← normalize - above_hull_map[key] = float(row["energy"]) - #Predict bandgap try: structure_path = Path(structure_path) + + ##predict enthalpy to pick up above hull structures within threshold + threshold = 0.05 + results = calculate_thermoele_enthalpy(structure_path, threshold, pressure) + + structure_path = None + structure_path = results["e_above_hull_structures"] + above_hull_file = results["e_above_hull_values"] + + #e_above_hull_structures = list(e_above_hull_structure_path.rglob("POSCAR*")) + #for e_above_hull_structure in e_above_hull_structures: + # print(f"check e_above_hull_structures = {e_above_hull_structures}") + if not structure_path.exists(): return {"thermoelectric_file": {}, "message": f"Structure path not found: {structure_path}"} - results = predict_thermoelectric_properties(structure_path, ["band_gap", "G", "K"]) - structures_properties = results["properties"] + thermoelectric_results = predict_thermoelectric_properties(structure_path, ["band_gap", "G", "K"]) + structures_properties = thermoelectric_results["properties"] + + above_hull_map: dict[str, float] = {} + if above_hull_file.is_file(): + with above_hull_file.open("r") as fh: + reader = csv.DictReader(fh) + for row in reader: + # normalize to basename so lookups by Path(...).name match + key = 
Path(row["structure"]).name + above_hull_map[key] = float(row["energy"]) thermoelectric_candidates: ThermoelectricCandidatesData = {} @@ -1091,13 +1109,6 @@ def get_space_group_number(structure): try: -# # Sort results by sound_velocity -# sorted_candidates = dict( -# sorted( -# thermoelectric_candidates.items(), -# key=lambda item: item[1]["sound_velocity"] -# ) -# ) sorted_candidates = dict( sorted( thermoelectric_candidates.items(), @@ -1116,6 +1127,14 @@ def get_space_group_number(structure): output_dir = Path("outputs") output_dir.mkdir(parents=True, exist_ok=True) results_file = output_dir / "thermoelectric_material_candidates.csv" + + if not sorted_candidates: + with results_file.open("w") as f: + f.write("No promising candidates found.\n") + return { + "thermoelectric_file": str(results_file), + "message": "No promising candidates found." + } # Collect all field names from the first thermo_props dict sample_props = next(iter(sorted_candidates.values())) @@ -1178,19 +1197,6 @@ def get_space_group_number(structure): "thermoelectric_file": results_file, "message": message } - # --- build a CSV preview of the first 10 candidates --- -# preview_lines = [] -# for structure, props in list(sorted_candidates.items())[:10]: -# # join each property into “key=value” pairs -# prop_str = ", ".join(f"{k}={v}" for k, v in props.items()) -# preview_lines.append(f"{formula}: {prop_str}") -# -# message = "Predicted properties:\n" + "\n".join(preview_lines) -# -# return{ -# "thermoelectric_file": Path(results_file), -# "message": message -# } except Exception as e: return{ "thermoelectric_file" : [], @@ -1201,6 +1207,6 @@ def get_space_group_number(structure): # ====== Run Server ====== if __name__ == "__main__": - transport_type = os.getenv('MCP_TRANSPORT', 'sse') - mcp.run(transport=transport_type) + logging.info("Starting ThermoelectricMaterialsServer on port 50001...") + mcp.run(transport="sse")