From 184a05ea0680ce2e477e5102af1aaae6440737c9 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 12 Jan 2026 11:30:22 +0800 Subject: [PATCH 1/5] Update thermoelectric and superconductor server --- .../superconductor_mcp_server.py | 282 ++++++++++----- servers/thermoelectric/server.py | 338 ++++++++---------- 2 files changed, 341 insertions(+), 279 deletions(-) diff --git a/servers/superconductor/superconductor_mcp_server.py b/servers/superconductor/superconductor_mcp_server.py index 5b32469..03bd6a9 100644 --- a/servers/superconductor/superconductor_mcp_server.py +++ b/servers/superconductor/superconductor_mcp_server.py @@ -19,36 +19,12 @@ from pymatgen.core import Composition -import argparse - # Setup logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -def parse_args(): - """Parse command line arguments for MCP server.""" - parser = argparse.ArgumentParser(description="DPA Calculator MCP Server") - parser.add_argument('--port', type=int, default=50001, help='Server port (default: 50001)') - parser.add_argument('--host', default='0.0.0.0', help='Server host (default: 0.0.0.0)') - parser.add_argument('--log-level', default='INFO', - choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], - help='Logging level (default: INFO)') - try: - args = parser.parse_args() - except SystemExit: - class Args: - port = 50001 - host = '0.0.0.0' - log_level = 'INFO' - args = Args() - return args - -args = parse_args() - - - # Initialize MCP server mcp = CalculationMCPServer( "SuperconductorServer", @@ -56,16 +32,19 @@ class Args: port=50002 ) - -def run_optimization( - structures: List[Path], +class RunOptimizationResult(TypedDict): + optimized_poscar_paths: Path + message: str +@mcp.tool() +def run_superconductor_optimization( + structure_path: Path, ambient: bool -) -> List[Path]: +) -> RunOptimizationResult: """ Optimize structures with DP model at ambient or high pressure condition. 
Args: - - structure (Path): Path to access structures need to be optimized + - structure_path (Path): Path to access structures need to be optimized - ambient (bool): Wether consider ambient condition Return: - optimized_structure_path (Path): Path to access optimized structures @@ -79,6 +58,8 @@ def run_optimization( pressure = 200 nsteps = 2000 + + structures = list(structure_path.rglob("POSCAR*")) try: # Build command: use the actual path to opt_py, not the literal string "opt_py" @@ -123,11 +104,14 @@ def run_optimization( for frame in system: system.to_vasp_poscar(optimized_dir / f'POSCAR_{count}') count+=1 - optimized_structures = list(optimized_dir.rglob("POSCAR*")) + #optimized_structures = list(optimized_dir.rglob("POSCAR*")) except Exception as e: print("Collect POSCAR failed!") - return optimized_structures + return{ + "optimized_poscar_paths": optimized_dir, + "message": "Geometry Optimization successfully" + } ### Tool generate structures with Calypso class GenerateCalypsoStructureResult(TypedDict): @@ -184,7 +168,7 @@ class GenerateCalypsoStructureResult(TypedDict): @mcp.tool() -def generate_calypso_structure( +def generate_calypso_superconductor_structure( species: List[str], n_tot: int )->GenerateCalypsoStructureResult: @@ -364,10 +348,11 @@ class GenerateCryFormerStructureResult(TypedDict): message: str @mcp.tool() -def generate_crystalformer_structures( +def generate_crystalformer_superconductor_structures( space_group: int, ambient: bool, - target_values: List[float], + target_values: float, + comparison_ops:Optional[str], n_tot: int )->GenerateCryFormerStructureResult: """ @@ -375,58 +360,66 @@ def generate_crystalformer_structures( If ambient condition, please using /opt/agents/superconductor/models/ambient_pressure/model.ckpt-1000000.pt model predicts critical temperature. If high pressure condition, please using /opt/agents/superconductor/models/high_pressure/model.ckpt-100000.pt model predicts critical temperature. 
If user did not mention space group number requirement, pressure condition, please reminder user to give instruction. + If user did not mentioned the comparison operator comparison_ops, please remind the user to give a value Args: space_group (int): Target space group number for generated structures. ambient (bool): Wether consider ambient condition superconductor. - target_values (List[float]): Target critical temperature + target_values (float): Target critical temperature. + comparison_ops (Optional[str]): One per target_prop; each must be one of "greater", "less", "equal", "minimize". If none, please use greater for all target_props. n_tot (int): Total number of structures generated Returns: poscar_paths (Path): Path to generated POSCAR. message (str): Message about calculation results. """ try: - if ambient: - model = Path("/opt/agents/superconductor/models/ambient_pressure/model.ckpt-1000000.pt") - else: - model = Path("/opt/agents/superconductor/models/high_pressure/model.ckpt-100000.pt") - - try: - - #activate uv - workdir = Path("/opt/agents/crystalformer_gpu") - outputs = workdir/ "outputs" - - - alpha = [0.5] - mc_steps = 2000 - cmd = [ - "uv", "run", "python", - "crystalformer_mcp.py", - "--cond_model_path", str(model), - "--target", str(target_values), - "--alpha", str(alpha), - "--spacegroup", str(space_group), - "--mc_steps", str(mc_steps), - "--num_samples", str(n_tot), - "--output_path", str(outputs) - ] - subprocess.run(cmd, cwd=workdir, check=True) - - output_path = Path("outputs") - if output_path.exists(): - shutil.rmtree(output_path) - shutil.copytree(outputs, output_path) - return { - "poscar_paths": output_path, - "message": "CrystalFormer structure generation successfully!" 
- } - except Exception as e: + if ambient: + target_prop = "ambient_pressure" + else: + target_prop = "high_pressure" + + try: + + #activate uv + workdir = Path("/opt/agents/mcp_tool") + outputs = workdir/ "target" + + + mc_steps = 2000 + upper=min(space_group, n_tot) + random_spacegroup_num = random.randint(1,upper) + + cmd = [ + "uv", "run", "python", + "mcp_tool.py", + "--mode", 'single', + "--cond_model_type", target_prop, + "--target", str(target_values), + "--target_type", str(comparison_ops), + "--alpha", '10', + "--spacegroup", str(space_group), + "--random_spacegroup_num", "1", #str(random_spacegroup_num), + "--init_sample_num", str(n_tot), + "--mc_steps", str(mc_steps), + ] + + print(f"cmd = {cmd}") + + subprocess.run(cmd, cwd=workdir, check=True) + + output_path = Path("outputs") + if output_path.exists(): + shutil.rmtree(output_path) + shutil.copytree(outputs, output_path) return { - "poscar_paths": None, - "message": "CrystalFormer Execution failed!" + "poscar_paths": output_path, + "message": "CrystalFormer structure generation successfully!" } - + except Exception as e: + return { + "poscar_paths": None, + "message": "CrystalFormer Generation failed!" 
+ } except Exception as e: return { "poscar_paths": None, @@ -448,7 +441,7 @@ class CalculateEntalpyResult(TypedDict): message: str #======================Tool to calculate structure enthalpy====================== @mcp.tool() -def calculate_enthalpy( +def calculate_superconductor_enthalpy( structure_path: Path, threshold: float, ambient: bool @@ -566,9 +559,11 @@ def calculate_enthalpy( enthalpy_dir.mkdir(parents=True, exist_ok=True) try: - poscar_files = list(structure_path.rglob("POSCAR*")) + #poscar_files = list(structure_path.rglob("POSCAR*")) try: - optimized_structures = run_optimization(list(poscar_files), ambient) + results = run_superconductor_optimization(structure_path,ambient) + optimized_structure_path = results["optimized_poscar_paths"] + optimized_structures = list(optimized_structure_path.rglob("POSCAR*")) except Exception as e: return{ "enthalpy_file": [], @@ -781,7 +776,7 @@ def calculate_enthalpy( "message": "Enthalpy prediction failed!" } -#### Tool to predict superconductor critical temperature #### + class SuperconductorCriticalTemperatures(TypedDict): Tc: float path: str @@ -794,12 +789,129 @@ class SuperconductorTcResult(TypedDict): @mcp.tool() def predict_superconductor_Tc( + structure_path: Path, + ambient: bool +) -> SuperconductorTcResult: + """ + Predict material critical temperature at different pressure conditions with pretrained dpa model. + If at ambient condition, using /opt/agents/superconductor/models/ambient_pressure/model.ckpt-1000000.pt model predicts critical temperature. + If at high pressure condition, using /opt/agents/superconductor/models/high_pressure/model.ckpt-100000.pt model predicts critical temperature. + + If user did not mention pressure condition, please remind user to choose ambient or high pressure condition. 
+ + + Args: + structure_path (Path): Path to either structure file (POSCAR/CIF) + ambient (bool): Wether consider ambient condition + + Returns: + SuperconductorTcResult: Dictionary with keys: + - results_file (Path): Path to access result files critical_temperature.csv, which saved in outputs/superconductor_critical_temperature.csv + - message (str): Message about calculations results. + """ + try: + + + structure_path = Path(structure_path) + if not structure_path.exists(): + return { + "results_file": {}, + "message": f"Structure path not found: {structure_path}" + } + + #Determine used model for critical temperature prediction + if ambient: + used_model = Path("/opt/agents/superconductor/models/ambient_pressure/model.ckpt-1000000.pt") + else: + used_model = Path("/opt/agents/superconductor/models/high_pressure/model.ckpt-100000.pt") + + if not used_model.exists(): + return { + "results_file": {}, + "message": f"{used_model} not exists!" + } + #find all structures + results = run_superconductor_optimization(structure_path, ambient) + optimized_structure_path = results["optimized_poscar_paths"] + optimized_structures = list(optimized_structure_path.rglob("POSCAR*")) + + superconductor_data: SuperconductorData ={} + for structure in optimized_structures: + #Convert structure into ase format + try: + + atom = io.read(str(structure)) + formula = atom.get_chemical_formula() + + #information for critical temperature predictions + coords = atom.get_positions() + cells = atom.get_cell() + atom_numbers = atom.get_atomic_numbers() + atom_types = [x - 1 for x in atom_numbers] + + except Exception as e: + return{ + "results_file": {}, + "message": f"Structure {structure} read failed!" 
+ } + + try: + if ambient: + dp_property = DeepProperty(model_file=str(used_model)) + else: + dp_property = DeepProperty(model_file=str(used_model), head="tc") + result = dp_property.eval(coords=coords, cells=cells, atom_types=atom_types)[0][0][0] + except Exception as e: + return{ + "results_file": {}, + "message": f"Structure {structure} critical temperature prediction failed!" + } + + superconductor_Tc: SuperconductorCriticalTemperatures = {} + + try: + superconductor_Tc["Tc"] = result + superconductor_Tc["path"] = str(structure) + except Exception as e: + return{ + "results_file": {}, + "message": f"Structure {structure} superconductor_Tc save failed!" + } + + superconductor_data[formula] = superconductor_Tc + + output_dir = Path("outputs") + output_dir.mkdir(parents=True, exist_ok=True) + results_file = output_dir / "critical_temperature.csv" + with open(results_file, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["formula", "Tc", "path"]) # header + for formula, props in superconductor_data.items(): + fname = Path(structure).name + writer.writerow([formula, props["Tc"], props["path"]]) + + return { + "results_file": results_file, + "message": f"Material critical temperature predictions are saved in {results_file}" + } + + + except Exception as e: + return { + "Tc_List": -1.0, + "message": f"Unexpected error: {str(e)}" + } + +#### Tool to predict superconductor critical temperature #### + +@mcp.tool() +def screen_superconductor( structure_path: Path, above_hull_file: Path, ambient: bool ) -> SuperconductorTcResult: """ - Predict superconductor critical temperature at different pressure conditions with pretrained dpa model. + Screen promising supercondutor from above hull structures at ambient or high pressure condition If at ambient condition, using /opt/agents/superconductor/models/ambient_pressure/model.ckpt-1000000.pt model predicts critical temperature. 
If at high pressure condition, using /opt/agents/superconductor/models/high_pressure/model.ckpt-100000.pt model predicts critical temperature. @@ -813,7 +925,7 @@ def predict_superconductor_Tc( Returns: SuperconductorTcResult: Dictionary with keys: - - results_file (Path): Path to access result files superconductor_critical_temperature.csv, which saved in outputs/superconductor_critical_temperature.csv + - results_file (Path): Path to access result files superconductor.csv, which saved in outputs/superconductor_critical_temperature.csv - message (str): Message about calculations results. """ try: @@ -848,10 +960,12 @@ def predict_superconductor_Tc( # normalize to basename so lookups by Path(...).name match key = Path(row["structure"]).name # ← normalize above_hull_map[key] = float(row["energy"]) + print(f"key = {key}") #find all structures - structures = sorted(structure_path.rglob("POSCAR*")) + sorted(structure_path.rglob("*.cif")) - optimized_structures = run_optimization(list(structures), ambient) + + structures = list(structure_path.rglob("POSCAR*")) + superconductor_data: SuperconductorData ={} for structure in structures: #Convert structure into ase format @@ -899,12 +1013,12 @@ def predict_superconductor_Tc( output_dir = Path("outputs") output_dir.mkdir(parents=True, exist_ok=True) - results_file = output_dir / "superconductor_critical_temperature.csv" + results_file = output_dir / "superconductor.csv" with open(results_file, "w", newline="") as f: writer = csv.writer(f) writer.writerow(["formula", "Tc", "path", "e_above_hull"]) # header for formula, props in superconductor_data.items(): - fname = Path(structure).name + fname = Path(props["path"]).name eh = above_hull_map.get(fname, "") writer.writerow([formula, props["Tc"], props["path"], eh]) diff --git a/servers/thermoelectric/server.py b/servers/thermoelectric/server.py index 34266f2..2c2c870 100644 --- a/servers/thermoelectric/server.py +++ b/servers/thermoelectric/server.py @@ -36,34 +36,12 @@ import 
json import csv -import argparse - # Setup logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -def parse_args(): - """Parse command line arguments for MCP server.""" - parser = argparse.ArgumentParser(description="DPA Calculator MCP Server") - parser.add_argument('--port', type=int, default=50001, help='Server port (default: 50001)') - parser.add_argument('--host', default='0.0.0.0', help='Server host (default: 0.0.0.0)') - parser.add_argument('--log-level', default='INFO', - choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], - help='Logging level (default: INFO)') - try: - args = parser.parse_args() - except SystemExit: - class Args: - port = 50001 - host = '0.0.0.0' - log_level = 'INFO' - args = Args() - return args - -args = parse_args() - # Initialize MCP server mcp = CalculationMCPServer( "ThermoelectricMaterialsServer", @@ -329,12 +307,12 @@ class GenerateCalypsoStructureResult(TypedDict): #========== Tool generate calypso structures =========== @mcp.tool() -def generate_calypso_structures( +def generate_calypso_thermoele_structures( species: List[str], n_tot: int )->GenerateCalypsoStructureResult: """ - Generate n_tot CALYPSO structures using specified species. + Generate n_tot CALYPSO structures using specified species. It is element-guided structure generations, user need to provide chemical elements and total number If user did not mention species and total number structures to generate, please remind the user to provide these information. 
Args: @@ -372,7 +350,7 @@ def get_props(s_list): return z_list, r_list, v_list def generate_counts(n): - return [random.randint(4, 4) for _ in range(n)] + return [random.randint(1, 10) for _ in range(n)] def write_input(path, species, z_list, n_list, r_mat, volume): """ @@ -509,81 +487,161 @@ class GenerateCryFormerStructureResult(TypedDict): message: str @mcp.tool() -def generate_crystalformer_structures( +def generate_crystalformer_thermoele_structures( space_group: int, target_props: Optional[List[str]], target_values: Optional[List[float]], + comparison_ops: Optional[List[str]], n_tot: int )->GenerateCryFormerStructureResult: """ - Generate conditional structures with target properties and space group number. Different target properties used different property model. - If user did not mention target property, please use band gap as target_prop. If user did not mention the space group please remind the usr to - give a value. + Generate conditional structures with target properties and space group number. This is property-guided structure generation tool. Different target properties used different property model. + If user did not mention target property, please use band gap as target_prop. If user did not mention the space group please remind the user to + give a value. If user did not mentioned the comparison operator comparison_ops, please remind the user to give a value Args: space_group (int): Target space group number for generated structures. target_props (Optional[List[str]]): Target properties for generated structures. - - "band_gap": hse functional band gap as target property, - - "sound_vel": sound velocity as target property - If none, please use band_gap as target property directly. + - "bandgap": hse functional band gap as target property, + - "sound": sound velocity as target property + If none, please use bandgap as target property directly. 
target_values (Optional[List[float]]): Target property values for target properties + comparison_ops (Optional[List[str]]): One per target_prop; each must be one of "greater", "less", "equal", "minimize". If none, please use greater for all target_props. n_tot (int): Total number of structures generated Returns: poscar_paths (Path): Path to generated POSCAR. message (str): Message about calculation results. """ try: - supported_props = ["band_gap", "sound_vel"] - - model = Path("/opt/agents/thermal_properties/models") - model_dirs = { - "band_gap": model / "bandgap" / "model.ckpt.pt", - "sound_vel": model / "shear_modulus" / "model.ckpt.pt", - } - - try: - - #activate uv - workdir = Path("/opt/agents/crystalformer_gpu") - outputs = workdir/ "outputs" - - - alpha = [0.5] - mc_steps = 2000 - for prop in target_props: - cmd = [ - "uv", "run", "python", - "crystalformer_mcp.py", - "--cond_model_path", str(model_dirs[prop]), - "--target", str(target_values), - "--alpha", str(alpha), - "--spacegroup", str(space_group), - "--mc_steps", str(mc_steps), - "--num_samples", str(n_tot), - "--output_path", str(outputs) - ] - subprocess.run(cmd, cwd=workdir, check=True) - - output_path = Path("outputs") - if output_path.exists(): - shutil.rmtree(output_path) - shutil.copytree(outputs, output_path) - return { - "poscar_paths": output_path, - "message": "CrystalFormer structure generation successfully!" - } - except Exception as e: - return { - "poscar_paths": None, - "message": "CrystalFormer Execution failed!" 
- } - + if space_group is None or space_group <= 0: + raise ValueError("Please provide a positive integer for `space_group`.") + + if not target_values or len(target_values) != len(target_props): + raise ValueError("`target_values` must be provided and match the length of `target_props`.") + + valid_ops = ("greater", "less", "equal", "minimize") + if len(comparison_ops) != len(target_props): + raise ValueError("`comparison_ops` length must match `target_props` length.") + for op in comparison_ops: + if op not in valid_ops: + raise ValueError(f"Invalid comparison op '{op}'. Must be one of {valid_ops}.") + + workdir = Path("/opt/agents/mcp_tool") + outputs = workdir/ "target" + + mode = "multi" if len(target_props) > 1 else "single" + + #total space group number to generated + upper=min(space_group, n_tot) + random_spacegroup_num = random.randint(1,upper) + print(f"n_tot = {n_tot}") + print(f"random_spacegroup_num = {random_spacegroup_num}") + cmd = [ + "uv", "run", "python", + "mcp_tool.py", + "--mode", str(mode), + "--cond_model_type", *target_props, + "--target", *map(str, target_values), + "--target_type", *comparison_ops, + "--alpha", "10", + "--spacegroup", str(space_group), + "--random_spacegroup_num", "1", #str(random_spacegroup_num), + "--init_sample_num", str(n_tot), #str(int(n_tot/random_spacegroup_num)), + "--mc_steps", "2000" + ] + + print(f"cmd = {cmd}") + subprocess.run(cmd, cwd=workdir, check=True) + output_path = Path("outputs") + if output_path.exists(): + shutil.rmtree(output_path) + shutil.copytree(outputs, output_path) + return { + "poscar_paths": output_path, + "message": "CrystalFormer structure generation successfully!" + } except Exception as e: return { "poscar_paths": None, "message": "CrystalFormer Generation failed!" 
} +#====== Tool to do geometry optimization========= +class RunOptimizationResult(TypedDict): + optimized_poscar_paths: Path + message: str + +@mcp.tool() +def run_pressure_optimization( + structure_path: Path, + fmax:float, + nsteps:int, + pressure:float +) -> RunOptimizationResult: + """ + Optimize structures with DP model at ambient or high pressure condition. + + Args: + - structure_path (Path): Path to access structures need to be optimized + - ambient (bool): Wether consider ambient condition + Return: + - optimized_structure_path (Path): Path to access optimized structures + """ + opt_py = Path("/opt/agents/thermal_properties/geo_opt/opt_multi.py") + + try: + poscar_files = list(structure_path.rglob("POSCAR*")) + # Build command: use the actual path to opt_py, not the literal string "opt_py" + cmd = [ + "python", + str(opt_py), # <— use the variable here + str(fmax), + str(pressure), + str(pressure), + str(nsteps), + ] + [str(p) for p in poscar_files] + + # Run and check for errors + subprocess.run(cmd, check=True) + + except Exception as e: + print("Geometry Optimization failed!") + try: + parse_py = Path("/opt/agents/thermal_properties/geo_opt/parse_traj.py") + cmd = [ + "python", + str(parse_py) + ] + + # Run and check for errors + subprocess.run(cmd, check=True) + except Exception as e: + print("Collect optimized structures failed!") + try: + frames =glob.glob('deepmd_npy/*/') + multisys = dpdata.MultiSystems() + for frame in frames: + sys = dpdata.System(frame,'deepmd/npy') + multisys.append(sys) + + optimized_dir = Path("optimized_poscar") + optimized_dir.mkdir(parents=True, exist_ok=True) # Create the directory if it doesn't exist + + count=0 + for frame in multisys: + for system in frame: + system.to_vasp_poscar(optimized_dir / f'POSCAR_{count}') + count+=1 +# optimized_structures = list(optimized_dir.rglob("POSCAR*")) + except Exception as e: + print("Convert dpdata to POSCAR failed!") + + return{ + "optimized_poscar_paths": optimized_dir, + 
"message": "Geometry Optimization successfully" + } + + #======== Tool predict enthalp and select on hull structures========= class CalculateEntalpyResult(TypedDict): @@ -594,19 +652,19 @@ class CalculateEntalpyResult(TypedDict): message: str @mcp.tool() -def calculate_enthalpy( +def calculate_thermoele_enthalpy( structure_path: Path, threshold: float, pressure: float )->CalculateEntalpyResult: """ Optimize the crystal structure using DP at a given pressure, then evaluate the enthalpy of the optimized structure, - and finally screen for structures above convex hull with a value of threshold. + and finally screen for structures above convex hull with a value of threshold (the unit of this threshold is eV). When user call cal_enthalpy reminder user to give pressure condition and threshold value to screen structures Args: - structure_file (Path): Path to the structure files (e.g. POSCAR) - - threshold (float): Upper limit for energy above hull. Only structures with energy-above-hull values smaller than this threshold will be selected. + - threshold (float): Upper limit for energy above hull. Only structures with energy-above-hull values smaller than this threshold will be selected.The unit is eV. - pressure (float): Pressure used in geometry optimization process Return: @@ -626,73 +684,13 @@ def calculate_enthalpy( enthalpy_dir.mkdir(parents=True, exist_ok=True) try: - poscar_files = list(structure_path.rglob("POSCAR*")) - opt_py = Path("/opt/agents/thermal_properties/geo_opt/opt_multi.py") - - try: - # Build command: use the actual path to opt_py, not the literal string "opt_py" - cmd = [ - "python", - str(opt_py), # <— use the variable here - str(fmax), - str(pressure), - str(pressure), - str(nsteps), - ] + [str(p) for p in poscar_files] - - # Run and check for errors - subprocess.run(cmd, check=True) - - except Exception as e: - return{ - "enthalpy_file": [], - "e_above_hull_structures": [], - "e_above_hull_values": [], - "message": "Geometry Optimization failed!" 
- } - - try: - parse_py = Path("/opt/agents/thermal_properties/geo_opt/parse_traj.py") - cmd = [ - "python", - str(parse_py) - ] - - # Run and check for errors - subprocess.run(cmd, check=True) - except Exception as e: - return{ - "enthalpy_file": [], - "e_above_hull_structures": [], - "e_above_hull_values": [], - "message": "Screen traj failed!" - } - - try: - frames =glob.glob('deepmd_npy/*/') - multisys = dpdata.MultiSystems() - for frame in frames: - sys = dpdata.System(frame,'deepmd/npy') - multisys.append(sys) - - optimized_dir = Path("optimized_poscar") - optimized_dir.mkdir(parents=True, exist_ok=True) # Create the directory if it doesn't exist - - count=0 - for frame in multisys: - for system in frame: - system.to_vasp_poscar(optimized_dir / f'POSCAR_{count}') - count+=1 - except Exception as e: - return{ - "enthalpy_file": [], - "e_above_hull_structures": [], - "e_above_hull_values": [], - "message": "Convert optimized structure to POSCAR failed!" - } try: - poscar_files = list(optimized_dir.rglob("POSCAR*")) + #poscar_files = list(structure_path.rglob("POSCAR*")) + results = run_pressure_optimization(structure_path,fmax,nsteps,pressure) + optimized_structure_path = results["optimized_poscar_paths"] + poscar_files = list(optimized_structure_path.rglob("POSCAR*")) + #poscar_files = run_pressure_optimization(structure_path,fmax,nsteps,pressure) enthalpy_py = Path("/opt/agents/thermal_properties/geo_opt/predict_enthalpy.py") cmd = [ "python", @@ -851,36 +849,6 @@ def calculate_enthalpy( # write to new CSV, including the absolute path fout.write(f"{cleaned},{energy},{on_hull_optimized_poscar.resolve()}\n") -# with e_above_hull_file.open("r") as f: -# #If there is a header, skip -# first = True -# for line in f: -# line.strip() -# if not line: -# continue -# if first and any(c.isalpha() for c in line): -# first = False -# continue -# first = False -# -# parts = line.split(' ') -# if len(parts) < 4: -# continue -# -# try: -# energy = float(parts[1]) -# except 
ValueError: -# continue -# -# raw = parts[3] -# cleaned = raw.strip().strip('"').strip("'") -# on_hull_optimized_poscar = Path('.') / cleaned -# -# if energy <= 0.05 and on_hull_optimized_poscar.is_file(): -# try: -# shutil.copy(on_hull_optimized_poscar, on_hull_optimized_structures) -# except Exception as e: -# print(f"Copy on hull optimized poscar {on_hull_optimized_poscar} failed!") except Exception as e: return{ "enthalpy_file": [], @@ -1091,13 +1059,6 @@ def get_space_group_number(structure): try: -# # Sort results by sound_velocity -# sorted_candidates = dict( -# sorted( -# thermoelectric_candidates.items(), -# key=lambda item: item[1]["sound_velocity"] -# ) -# ) sorted_candidates = dict( sorted( thermoelectric_candidates.items(), @@ -1178,19 +1139,6 @@ def get_space_group_number(structure): "thermoelectric_file": results_file, "message": message } - # --- build a CSV preview of the first 10 candidates --- -# preview_lines = [] -# for structure, props in list(sorted_candidates.items())[:10]: -# # join each property into “key=value” pairs -# prop_str = ", ".join(f"{k}={v}" for k, v in props.items()) -# preview_lines.append(f"{formula}: {prop_str}") -# -# message = "Predicted properties:\n" + "\n".join(preview_lines) -# -# return{ -# "thermoelectric_file": Path(results_file), -# "message": message -# } except Exception as e: return{ "thermoelectric_file" : [], From 7ecd8b39013fb09fead6f54c96e4e26ff5fdfbec Mon Sep 17 00:00:00 2001 From: root Date: Mon, 12 Jan 2026 11:33:45 +0800 Subject: [PATCH 2/5] mv superconductor_mcp_server.py to server.py and update thermoelectric server.py --- ...superconductor_mcp_server.py => server.py} | 565 ++++++++-------- servers/thermoelectric/server.py | 628 ++++++++++-------- 2 files changed, 640 insertions(+), 553 deletions(-) rename servers/superconductor/{superconductor_mcp_server.py => server.py} (71%) diff --git a/servers/superconductor/superconductor_mcp_server.py b/servers/superconductor/server.py similarity index 71% 
rename from servers/superconductor/superconductor_mcp_server.py rename to servers/superconductor/server.py index 62eb17f..806bcf8 100644 --- a/servers/superconductor/superconductor_mcp_server.py +++ b/servers/superconductor/server.py @@ -59,7 +59,12 @@ def run_superconductor_optimization( nsteps = 2000 - structures = list(structure_path.rglob("POSCAR*")) + base = Path(structure_path) + structure_path = base.parent if base.is_file() else base + + structures = list(p for pat in ("POSCAR*", "*.cif", "*.CIF") + for p in structure_path.rglob(pat)) + print(f"The length of structures {len(structures)}") try: # Build command: use the actual path to opt_py, not the literal string "opt_py" @@ -167,264 +172,266 @@ class GenerateCalypsoStructureResult(TypedDict): } -@mcp.tool() -def generate_calypso_superconductor_structure( - species: List[str], - n_tot: int - )->GenerateCalypsoStructureResult: - """ - Generate n_tot CALYPSO structures using specified species. - If user did not mention species and total number structures to generate, please remind the user to provide these information. - - Args: - species (List[str]): A list of chemical element symbols (e.g., ["Mg", "O", "Si"]). These elements will be used as building blocks in the CALYPSO structure generation. - All element symbols must be from the supported element list internally defined in the tool. - - n_tot (int): The number of CALYPSO structure configurations to generate. Each structure will be generated in a separate subdirectory (e.g., generated_calypso/0/, generated_calypso/1/, etc.) - Return: - GenerateCalypsoStructureResult with keys: - - poscar_paths (Path): Path to access generated structures POSCAR. All structures are saved in outputs/poscars_for_optimization/ - - message (str): Message about calculation results information. 
- """ - - def get_props(s_list): - """ - Get atomic number, atomic radius, and atomic volume infomation for interested species - - Args: - s_list: species list needed to get atomic number, atomic radius, and atomic volume infomation - - Return: - z_list (List): atomic number list for given species list, - r_list (List): atomic radius list for given species list, - v_list (List): atomic volume list for given species list. - """ - - z_list, r_list, v_list = [], [], [] - for s in s_list: - if s not in ELEMENT_PROPS: - raise ValueError(f"Unsupported element: {s}") - props = ELEMENT_PROPS[s] - z_list.append(props["Z"]) - r_list.append(props["r"]) - v_list.append(props["v"]) - return z_list, r_list, v_list - - def generate_counts(n): - return [random.randint(1, 10) for _ in range(n)] - - def write_input(path, species, z_list, n_list, r_mat, volume): - """ - Write calypso input files for given species combination with atomic number, number of each species, radius matrix and total volume - - Args: - - path (Path): Path to save input file, - - species (List[str]): Species list - - z_list (List[int]): atomic number list - - n_list (List[int]): number of each species list - - r_mat: radius matrix - - volume (float): total volume - """ - - # Step 1: reorder all based on atomic number - sorted_indices = sorted(range(len(z_list)), key=lambda i: z_list[i]) - species = [species[i] for i in sorted_indices] - z_list = [z_list[i] for i in sorted_indices] - n_list = [n_list[i] for i in sorted_indices] - r_mat = r_mat[np.ix_(sorted_indices, sorted_indices)] # reorder matrix - - # Step 2: write input.dat - with open(path / "input.dat", "w") as f: - f.write(f"SystemName = {' '.join(species)}\n") - f.write(f"NumberOfSpecies = {len(species)}\n") - f.write(f"NameOfAtoms = {' '.join(species)}\n") - f.write("@DistanceOfIon\n") - for i in range(len(species)): - row = " ".join(f"{r_mat[i][j]:.3f}" for j in range(len(species))) - f.write(row + "\n") - f.write("@End\n") - f.write(f"AtomicNumber = 
{' '.join(str(z) for z in z_list)}\n") - f.write(f"NumberOfAtoms = {' '.join(str(n) for n in n_list)}\n") - f.write("""Ialgo = 2 -PsoRatio = 0.5 -PopSize = 1 -GenType = 1 -ICode = 15 -NumberOfLbest = 4 -NumberOfLocalOptim = 3 -Command = sh submit.sh -MaxTime = 9000 -MaxStep = 1 -PickUp = F -PickStep = 1 -Parallel = F -NumberOfParallel = 4 -Split = T -PSTRESS = 2000 -fmax = 0.01 -FixCell = F -""") - - - - #===== Step 1: Generate calypso input files ========== - outdir = Path("generated_calypso") - outdir.mkdir(parents=True, exist_ok=True) - - - z_list, r_list, v_list = get_props(species) - for i in range(n_tot): - try: - n_list = generate_counts(len(species)) - volume = sum(n * v for n, v in zip(n_list, v_list)) - r_mat = np.add.outer(r_list, r_list) * 0.529 # bohr → Å - - struct_dir = outdir / f"{i}" - if not struct_dir.exists(): - struct_dir.mkdir(parents=True, exist_ok=True) - - #Prepare calypso input.dat - write_input(struct_dir, species, z_list, n_list, r_mat, volume) - except Exception as e: - return{ - "poscar_paths" : None, - "message": "Input files generations for calypso failed!" - } - - #Execuate calypso calculation and screening - flim_ase_path = Path("/opt/agents/thermal_properties/flim_ase/flim_ase.py") - command = f"/opt/agents/thermal_properties/calypso/calypso.x >> tmp_log && python {flim_ase_path}" - if not flim_ase_path.exists(): - return{ - "poscar_paths": None, - "message": "flim_ase.py did not found!" - - } - try: - subprocess.run(command, cwd=struct_dir, shell=True) - except Exception as e: - return{ - "poscar_paths": None, - "message": "calypso.x execute failed!" 
- } - - #Clean struct_dir only save input.dat and POSCAR_1 - for file in struct_dir.iterdir(): - if file.name not in ["input.dat", "POSCAR_1"]: - if file.is_file(): - file.unlink() - elif file.is_dir(): - shutil.rmtree(file) - - # Step 3: Collect POSCAR_1 into POSCAR_n format - try: - output_dir = Path("outputs") - output_dir.mkdir(parents=True, exist_ok=True) - final_dir = output_dir / "poscars_for_optimization" - final_dir.mkdir(parents=True, exist_ok=True) - counter = 0 - for struct_dir in outdir.iterdir(): - poscar_path = struct_dir / "POSCAR_1" - if poscar_path.exists(): - new_name = final_dir / f"POSCAR_{counter}" - shutil.copy(poscar_path, new_name) - counter += 1 - - return{ - "poscar_paths": Path(final_dir), - "message": f"Calypso generated {n_tot} structures with {species} successfully!" - } - except Exception as e: - return{ - "poscar_paths": None, - "message": "Calypso generated POSCAR files collected failed!" - } - - - -#================ Tool to generate structures with conditional properties via CrystalFormer =================== -class GenerateCryFormerStructureResult(TypedDict): - poscar_paths: Path - message: str - -@mcp.tool() -def generate_crystalformer_superconductor_structures( - space_group: int, - ambient: bool, - target_values: float, - comparison_ops:Optional[str], - n_tot: int -)->GenerateCryFormerStructureResult: - """ - Generate n_tot conditional superconductor structures with target critical temperature and space group number. - If ambient condition, please using /opt/agents/superconductor/models/ambient_pressure/model.ckpt-1000000.pt model predicts critical temperature. - If high pressure condition, please using /opt/agents/superconductor/models/high_pressure/model.ckpt-100000.pt model predicts critical temperature. - If user did not mention space group number requirement, pressure condition, please reminder user to give instruction. 
- If user did not mentioned the comparison operator comparison_ops, please remind the user to give a value - - Args: - space_group (int): Target space group number for generated structures. - ambient (bool): Wether consider ambient condition superconductor. - target_values (float): Target critical temperature. - comparison_ops (Optional[str]): One per target_prop; each must be one of "greater", "less", "equal", "minimize". If none, please use greater for all target_props. - n_tot (int): Total number of structures generated - Returns: - poscar_paths (Path): Path to generated POSCAR. - message (str): Message about calculation results. - """ - try: - if ambient: - target_prop = "ambient_pressure" - else: - target_prop = "high_pressure" - - try: - - #activate uv - workdir = Path("/opt/agents/mcp_tool") - outputs = workdir/ "target" - - - mc_steps = 2000 - upper=min(space_group, n_tot) - random_spacegroup_num = random.randint(1,upper) - - cmd = [ - "uv", "run", "python", - "mcp_tool.py", - "--mode", 'single', - "--cond_model_type", target_prop, - "--target", str(target_values), - "--target_type", str(comparison_ops), - "--alpha", '10', - "--spacegroup", str(space_group), - "--random_spacegroup_num", "1", #str(random_spacegroup_num), - "--init_sample_num", str(n_tot), - "--mc_steps", str(mc_steps), - ] - - print(f"cmd = {cmd}") - - subprocess.run(cmd, cwd=workdir, check=True) - - output_path = Path("outputs") - if output_path.exists(): - shutil.rmtree(output_path) - shutil.copytree(outputs, output_path) - return { - "poscar_paths": output_path, - "message": "CrystalFormer structure generation successfully!" - } - except Exception as e: - return { - "poscar_paths": None, - "message": "CrystalFormer Generation failed!" - } - except Exception as e: - return { - "poscar_paths": None, - "message": "CrystalFormer Generation failed!" 
- } +#@mcp.tool() +#def generate_calypso_superconductor_structure( +# species: List[str], +# n_tot: int +# )->GenerateCalypsoStructureResult: +# """ +# Generate n_tot CALYPSO structures using specified species. This tool is element-guided structure generation tool, user only need +# provide chemical element information and total number is fine. +# +# If user did not mention species and total number structures to generate, please remind the user to provide these information. +# +# Args: +# species (List[str]): A list of chemical element symbols (e.g., ["Mg", "O", "Si"]). These elements will be used as building blocks in the CALYPSO structure generation. +# All element symbols must be from the supported element list internally defined in the tool. +# +# n_tot (int): The number of CALYPSO structure configurations to generate. Each structure will be generated in a separate subdirectory (e.g., generated_calypso/0/, generated_calypso/1/, etc.) +# Return: +# GenerateCalypsoStructureResult with keys: +# - poscar_paths (Path): Path to access generated structures POSCAR. All structures are saved in outputs/poscars_for_optimization/ +# - message (str): Message about calculation results information. +# """ +# +# def get_props(s_list): +# """ +# Get atomic number, atomic radius, and atomic volume infomation for interested species +# +# Args: +# s_list: species list needed to get atomic number, atomic radius, and atomic volume infomation +# +# Return: +# z_list (List): atomic number list for given species list, +# r_list (List): atomic radius list for given species list, +# v_list (List): atomic volume list for given species list. 
+# """ +# +# z_list, r_list, v_list = [], [], [] +# for s in s_list: +# if s not in ELEMENT_PROPS: +# raise ValueError(f"Unsupported element: {s}") +# props = ELEMENT_PROPS[s] +# z_list.append(props["Z"]) +# r_list.append(props["r"]) +# v_list.append(props["v"]) +# return z_list, r_list, v_list +# +# def generate_counts(n): +# return [random.randint(1, 10) for _ in range(n)] +# +# def write_input(path, species, z_list, n_list, r_mat, volume): +# """ +# Write calypso input files for given species combination with atomic number, number of each species, radius matrix and total volume +# +# Args: +# - path (Path): Path to save input file, +# - species (List[str]): Species list +# - z_list (List[int]): atomic number list +# - n_list (List[int]): number of each species list +# - r_mat: radius matrix +# - volume (float): total volume +# """ +# +# # Step 1: reorder all based on atomic number +# sorted_indices = sorted(range(len(z_list)), key=lambda i: z_list[i]) +# species = [species[i] for i in sorted_indices] +# z_list = [z_list[i] for i in sorted_indices] +# n_list = [n_list[i] for i in sorted_indices] +# r_mat = r_mat[np.ix_(sorted_indices, sorted_indices)] # reorder matrix +# +# # Step 2: write input.dat +# with open(path / "input.dat", "w") as f: +# f.write(f"SystemName = {' '.join(species)}\n") +# f.write(f"NumberOfSpecies = {len(species)}\n") +# f.write(f"NameOfAtoms = {' '.join(species)}\n") +# f.write("@DistanceOfIon\n") +# for i in range(len(species)): +# row = " ".join(f"{r_mat[i][j]:.3f}" for j in range(len(species))) +# f.write(row + "\n") +# f.write("@End\n") +# f.write(f"AtomicNumber = {' '.join(str(z) for z in z_list)}\n") +# f.write(f"NumberOfAtoms = {' '.join(str(n) for n in n_list)}\n") +# f.write("""Ialgo = 2 +#PsoRatio = 0.5 +#PopSize = 1 +#GenType = 1 +#ICode = 15 +#NumberOfLbest = 4 +#NumberOfLocalOptim = 3 +#Command = sh submit.sh +#MaxTime = 9000 +#MaxStep = 1 +#PickUp = F +#PickStep = 1 +#Parallel = F +#NumberOfParallel = 4 +#Split = T +#PSTRESS 
= 2000 +#fmax = 0.01 +#FixCell = F +#""") +# +# +# +# #===== Step 1: Generate calypso input files ========== +# outdir = Path("generated_calypso") +# outdir.mkdir(parents=True, exist_ok=True) +# +# +# z_list, r_list, v_list = get_props(species) +# for i in range(n_tot): +# try: +# n_list = generate_counts(len(species)) +# volume = sum(n * v for n, v in zip(n_list, v_list)) +# r_mat = np.add.outer(r_list, r_list) * 0.529 # bohr → Å +# +# struct_dir = outdir / f"{i}" +# if not struct_dir.exists(): +# struct_dir.mkdir(parents=True, exist_ok=True) +# +# #Prepare calypso input.dat +# write_input(struct_dir, species, z_list, n_list, r_mat, volume) +# except Exception as e: +# return{ +# "poscar_paths" : None, +# "message": "Input files generations for calypso failed!" +# } +# +# #Execuate calypso calculation and screening +# flim_ase_path = Path("/opt/agents/thermal_properties/flim_ase/flim_ase.py") +# command = f"/opt/agents/thermal_properties/calypso/calypso.x >> tmp_log && python {flim_ase_path}" +# if not flim_ase_path.exists(): +# return{ +# "poscar_paths": None, +# "message": "flim_ase.py did not found!" +# +# } +# try: +# subprocess.run(command, cwd=struct_dir, shell=True) +# except Exception as e: +# return{ +# "poscar_paths": None, +# "message": "calypso.x execute failed!" 
+# } +# +# #Clean struct_dir only save input.dat and POSCAR_1 +# for file in struct_dir.iterdir(): +# if file.name not in ["input.dat", "POSCAR_1"]: +# if file.is_file(): +# file.unlink() +# elif file.is_dir(): +# shutil.rmtree(file) +# +# # Step 3: Collect POSCAR_1 into POSCAR_n format +# try: +# output_dir = Path("outputs") +# output_dir.mkdir(parents=True, exist_ok=True) +# final_dir = output_dir / "poscars_for_optimization" +# final_dir.mkdir(parents=True, exist_ok=True) +# counter = 0 +# for struct_dir in outdir.iterdir(): +# poscar_path = struct_dir / "POSCAR_1" +# if poscar_path.exists(): +# new_name = final_dir / f"POSCAR_{counter}" +# shutil.copy(poscar_path, new_name) +# counter += 1 +# +# return{ +# "poscar_paths": Path(final_dir), +# "message": f"Calypso generated {n_tot} structures with {species} successfully!" +# } +# except Exception as e: +# return{ +# "poscar_paths": None, +# "message": "Calypso generated POSCAR files collected failed!" +# } + + + +##================ Tool to generate structures with conditional properties via CrystalFormer =================== +#class GenerateCryFormerStructureResult(TypedDict): +# poscar_paths: Path +# message: str +# +#@mcp.tool() +#def generate_crystalformer_superconductor_structures( +# space_group: int, +# ambient: bool, +# target_values: float, +# comparison_ops:Optional[str], +# n_tot: int +#)->GenerateCryFormerStructureResult: +# """ +# Generate n_tot conditional superconductor structures with target critical temperature and space group number. This tool is property-guided structure generation tool. User need to define the desired property and corresponding target values and comparison operator. +# If ambient condition, please using /opt/agents/superconductor/models/ambient_pressure/model.ckpt-1000000.pt model predicts critical temperature. +# If high pressure condition, please using /opt/agents/superconductor/models/high_pressure/model.ckpt-100000.pt model predicts critical temperature. 
+# If user did not mention space group number requirement, pressure condition, please reminder user to give instruction. +# If user did not mentioned the comparison operator comparison_ops, please remind the user to give a value +# +# Args: +# space_group (int): Target space group number for generated structures. +# ambient (bool): Wether consider ambient condition superconductor. +# target_values (float): Target critical temperature. +# comparison_ops (Optional[str]): One per target_prop; each must be one of "greater", "less", "equal", "minimize". If none, please use greater for all target_props. +# n_tot (int): Total number of structures generated +# Returns: +# poscar_paths (Path): Path to generated POSCAR. +# message (str): Message about calculation results. +# """ +# try: +# if ambient: +# target_prop = "ambient_pressure" +# else: +# target_prop = "high_pressure" +# +# try: +# +# #activate uv +# workdir = Path("/opt/agents/mcp_tool") +# outputs = workdir/ "target" +# +# +# mc_steps = 2000 +# upper=min(space_group, n_tot) +# random_spacegroup_num = random.randint(1,upper) +# +# cmd = [ +# "uv", "run", "python", +# "mcp_tool.py", +# "--mode", 'single', +# "--cond_model_type", target_prop, +# "--target", str(target_values), +# "--target_type", str(comparison_ops), +# "--alpha", '10', +# "--spacegroup", str(space_group), +# "--random_spacegroup_num", "1", #str(random_spacegroup_num), +# "--init_sample_num", str(n_tot), +# "--mc_steps", str(mc_steps), +# ] +# +# print(f"cmd = {cmd}") +# +# subprocess.run(cmd, cwd=workdir, check=True) +# +# output_path = Path("outputs") +# if output_path.exists(): +# shutil.rmtree(output_path) +# shutil.copytree(outputs, output_path) +# return { +# "poscar_paths": output_path, +# "message": "CrystalFormer structure generation successfully!" +# } +# except Exception as e: +# return { +# "poscar_paths": None, +# "message": "CrystalFormer Generation failed!" 
+# } +# except Exception as e: +# return { +# "poscar_paths": None, +# "message": "CrystalFormer Generation failed!" +# } @@ -560,6 +567,9 @@ def calculate_superconductor_enthalpy( try: #poscar_files = list(structure_path.rglob("POSCAR*")) + base = Path(structure_path) + structure_path = base.parent if base.is_file() else base + try: results = run_superconductor_optimization(structure_path,ambient) optimized_structure_path = results["optimized_poscar_paths"] @@ -700,7 +710,8 @@ def calculate_superconductor_enthalpy( shutil.copy(src, enthalpy_dir) src = Path("/opt/agents/superconductor/geo_opt/results/e_above_hull_50meV.csv") - shutil.copy(src, enthalpy_dir) + dest = enthalpy_dir / f"e_above_hull.csv" + shutil.copy(src, dest) except Exception as e: return{ @@ -714,8 +725,9 @@ def calculate_superconductor_enthalpy( on_hull_optimized_structures = enthalpy_dir / "e_above_hull_structures" on_hull_optimized_structures.mkdir(parents=True, exist_ok=True) - e_above_hull_file = enthalpy_dir / "e_above_hull_50meV.csv" - e_above_hull_output = enthalpy_dir / "e_above_hull.csv" + threshold_meV = int(threshold*1000) + e_above_hull_output = enthalpy_dir / f"e_above_hull_{threshold_meV}meV.csv" + e_above_hull_file = enthalpy_dir / "e_above_hull.csv" with e_above_hull_file.open("r") as f, e_above_hull_output.open("w") as fout: # write header for new CSV @@ -813,12 +825,15 @@ def predict_superconductor_Tc( structure_path = Path(structure_path) + base = Path(structure_path) + structure_path = base.parent if base.is_file() else base if not structure_path.exists(): return { "results_file": {}, "message": f"Structure path not found: {structure_path}" } + #Determine used model for critical temperature prediction if ambient: used_model = Path("/opt/agents/superconductor/models/ambient_pressure/model.ckpt-1000000.pt") @@ -907,7 +922,6 @@ def predict_superconductor_Tc( @mcp.tool() def screen_superconductor( structure_path: Path, - above_hull_file: Path, ambient: bool ) -> 
SuperconductorTcResult: """ @@ -932,6 +946,9 @@ def screen_superconductor( structure_path = Path(structure_path) + base = Path(structure_path) + structure_path = base.parent if base.is_file() else base + if not structure_path.exists(): return { "results_file": {}, @@ -949,8 +966,12 @@ def screen_superconductor( "results_file": {}, "message": f"{used_model} not exists!" } - + threshold = 0.05 + results = calculate_superconductor_enthalpy(structure_path, threshold, ambient) + optimized_structure_path = results["e_above_hull_structures"] + above_hull_file = results["e_above_hull_values"] + # --- 0) load above-hull energies --- above_hull_map: dict[str, float] = {} if above_hull_file.is_file(): @@ -964,7 +985,7 @@ def screen_superconductor( #find all structures - structures = list(structure_path.rglob("POSCAR*")) + structures = list(optimized_structure_path.rglob("POSCAR*")) superconductor_data: SuperconductorData ={} for structure in structures: @@ -1014,6 +1035,15 @@ def screen_superconductor( output_dir = Path("outputs") output_dir.mkdir(parents=True, exist_ok=True) results_file = output_dir / "superconductor.csv" + + if not superconductor_data: + with results_file.open("w") as f: + f.write("No promising candidates found.\n") + return { + "results_file": results_file, + "message": "No promising candidates found." 
+ } + with open(results_file, "w", newline="") as f: writer = csv.writer(f) writer.writerow(["formula", "Tc", "path", "e_above_hull"]) # header @@ -1039,7 +1069,6 @@ def screen_superconductor( # ====== Run Server ====== if __name__ == "__main__": - # Get transport type from environment variable, default to SSE - transport_type = os.getenv('MCP_TRANSPORT', 'sse') - mcp.run(transport=transport_type) + logging.info("Starting SuperconductorServer on port 50002...") + mcp.run(transport="sse") diff --git a/servers/thermoelectric/server.py b/servers/thermoelectric/server.py index 7e7aafb..6b9ee7e 100644 --- a/servers/thermoelectric/server.py +++ b/servers/thermoelectric/server.py @@ -45,8 +45,8 @@ # Initialize MCP server mcp = CalculationMCPServer( "ThermoelectricMaterialsServer", - host=args.host, - port=args.port + host="0.0.0.0", + port=50001 ) # ====== Tool 1: Predict Material Thermoelectronic Properties ====== @@ -163,12 +163,18 @@ def eval_properties( #Define props for atom results: MaterialData = {} props_results: MaterialProperties = {} - + structure_file = Path(structure_file) if not structure_file.exists(): return {"results": {}, "properties": {}, "message": f"Structure file not found: {structure_file}"} - structures = sorted(structure_file.rglob("POSCAR*")) + sorted(structure_file.rglob("*.cif")) + fmax=0.0005 + nsteps = 2000 + pressure = 0.0 + result = run_pressure_optimization(structure_file,fmax,nsteps,pressure) + optimized_structure_path = result["optimized_poscar_paths"] + + structures = list(optimized_structure_path.rglob("POSCAR*")) for structure in structures: try: if structure.name.upper().startswith("POSCAR"): @@ -306,265 +312,265 @@ class GenerateCalypsoStructureResult(TypedDict): #========== Tool generate calypso structures =========== -@mcp.tool() -def generate_calypso_thermoele_structures( - species: List[str], - n_tot: int - )->GenerateCalypsoStructureResult: - """ - Generate n_tot CALYPSO structures using specified species. 
It is element-guided structure generations, user need to provide chemical elements and total number - If user did not mention species and total number structures to generate, please remind the user to provide these information. - - Args: - species (List[str]): A list of chemical element symbols (e.g., ["Mg", "O", "Si"]). These elements will be used as building blocks in the CALYPSO structure generation. - All element symbols must be from the supported element list internally defined in the tool. - - n_tot (int): The number of CALYPSO structure configurations to generate. Each structure will be generated in a separate subdirectory (e.g., generated_calypso/0/, generated_calypso/1/, etc.) - Return: - GenerateCalypsoStructureResult with keys: - - poscar_paths (Path): Path to access generated structures POSCAR. All structures are saved in outputs/poscars_for_optimization/ - - message (str): Message about calculation results information. - """ - - def get_props(s_list): - """ - Get atomic number, atomic radius, and atomic volume infomation for interested species - - Args: - s_list: species list needed to get atomic number, atomic radius, and atomic volume infomation - - Return: - z_list (List): atomic number list for given species list, - r_list (List): atomic radius list for given species list, - v_list (List): atomic volume list for given species list. 
- """ - - z_list, r_list, v_list = [], [], [] - for s in s_list: - if s not in ELEMENT_PROPS: - raise ValueError(f"Unsupported element: {s}") - props = ELEMENT_PROPS[s] - z_list.append(props["Z"]) - r_list.append(props["r"]) - v_list.append(props["v"]) - return z_list, r_list, v_list - - def generate_counts(n): - return [random.randint(1, 10) for _ in range(n)] - - def write_input(path, species, z_list, n_list, r_mat, volume): - """ - Write calypso input files for given species combination with atomic number, number of each species, radius matrix and total volume - - Args: - - path (Path): Path to save input file, - - species (List[str]): Species list - - z_list (List[int]): atomic number list - - n_list (List[int]): number of each species list - - r_mat: radius matrix - - volume (float): total volume - """ - - # Step 1: reorder all based on atomic number - sorted_indices = sorted(range(len(z_list)), key=lambda i: z_list[i]) - species = [species[i] for i in sorted_indices] - z_list = [z_list[i] for i in sorted_indices] - n_list = [n_list[i] for i in sorted_indices] - r_mat = r_mat[np.ix_(sorted_indices, sorted_indices)] # reorder matrix - - # Step 2: write input.dat - with open(path / "input.dat", "w") as f: - f.write(f"SystemName = {' '.join(species)}\n") - f.write(f"NumberOfSpecies = {len(species)}\n") - f.write(f"NameOfAtoms = {' '.join(species)}\n") - f.write("@DistanceOfIon\n") - for i in range(len(species)): - row = " ".join(f"{r_mat[i][j]:.3f}" for j in range(len(species))) - f.write(row + "\n") - f.write("@End\n") - f.write(f"Volume = {volume:.2f}\n") - f.write(f"AtomicNumber = {' '.join(str(z) for z in z_list)}\n") - f.write(f"NumberOfAtoms = {' '.join(str(n) for n in n_list)}\n") - f.write("""Ialgo = 2 -PsoRatio = 0.5 -PopSize = 1 -GenType = 1 -ICode = 15 -NumberOfLbest = 4 -NumberOfLocalOptim = 3 -Command = sh submit.sh -MaxTime = 9000 -MaxStep = 1 -PickUp = F -PickStep = 1 -Parallel = F -NumberOfParallel = 4 -Split = T -PSTRESS = 2000 -fmax = 0.01 
-FixCell = F -""") - - - - #===== Step 1: Generate calypso input files ========== - outdir = Path("generated_calypso") - outdir.mkdir(parents=True, exist_ok=True) - - - z_list, r_list, v_list = get_props(species) - for i in range(n_tot): - try: - n_list = generate_counts(len(species)) - volume = sum(n * v for n, v in zip(n_list, v_list)) - r_mat = np.add.outer(r_list, r_list) * 0.529 # bohr → Å - - struct_dir = outdir / f"{i}" - if not struct_dir.exists(): - struct_dir.mkdir(parents=True, exist_ok=True) - - #Prepare calypso input.dat - write_input(struct_dir, species, z_list, n_list, r_mat, volume) - except Exception as e: - return{ - "poscar_paths" : None, - "message": "Input files generations for calypso failed!" - } - - #Execuate calypso calculation and screening - flim_ase_path = Path("/opt/agents/thermal_properties/flim_ase/flim_ase.py") - command = f"/opt/agents/thermal_properties/calypso/calypso.x >> tmp_log && python {flim_ase_path}" - if not flim_ase_path.exists(): - return{ - "poscar_paths": None, - "message": "flim_ase.py did not found!" - - } - try: - subprocess.run(command, cwd=struct_dir, shell=True) - except Exception as e: - return{ - "poscar_paths": None, - "message": "calypso.x execute failed!" 
- } - - #Clean struct_dir only save input.dat and POSCAR_1 - for file in struct_dir.iterdir(): - if file.name not in ["input.dat", "POSCAR_1"]: - if file.is_file(): - file.unlink() - elif file.is_dir(): - shutil.rmtree(file) - - # Step 3: Collect POSCAR_1 into POSCAR_n format - try: - output_dir = Path("outputs") - output_dir.mkdir(parents=True, exist_ok=True) - final_dir = output_dir / "poscars_for_optimization" - final_dir.mkdir(parents=True, exist_ok=True) - counter = 0 - for struct_dir in outdir.iterdir(): - poscar_path = struct_dir / "POSCAR_1" - if poscar_path.exists(): - new_name = final_dir / f"POSCAR_{counter}" - shutil.copy(poscar_path, new_name) - counter += 1 - - return{ - "poscar_paths": Path(final_dir), - "message": f"Calypso generated {n_tot} structures with {species} successfully!" - } - except Exception as e: - return{ - "poscar_paths": None, - "message": "Calypso generated POSCAR files collected failed!" - } - - -#======== Tool generate CrystalFormer Structures ========== -class GenerateCryFormerStructureResult(TypedDict): - poscar_paths: Path - message: str - -@mcp.tool() -def generate_crystalformer_thermoele_structures( - space_group: int, - target_props: Optional[List[str]], - target_values: Optional[List[float]], - comparison_ops: Optional[List[str]], - n_tot: int -)->GenerateCryFormerStructureResult: - """ - Generate conditional structures with target properties and space group number. This is property-guided structure generation tool. Different target properties used different property model. - If user did not mention target property, please use band gap as target_prop. If user did not mention the space group please remind the user to - give a value. If user did not mentioned the comparison operator comparison_ops, please remind the user to give a value - - Args: - space_group (int): Target space group number for generated structures. - target_props (Optional[List[str]]): Target properties for generated structures. 
- - "bandgap": hse functional band gap as target property, - - "sound": sound velocity as target property - If none, please use bandgap as target property directly. - target_values (Optional[List[float]]): Target property values for target properties - comparison_ops (Optional[List[str]]): One per target_prop; each must be one of "greater", "less", "equal", "minimize". If none, please use greater for all target_props. - n_tot (int): Total number of structures generated - Returns: - poscar_paths (Path): Path to generated POSCAR. - message (str): Message about calculation results. - """ - try: - if space_group is None or space_group <= 0: - raise ValueError("Please provide a positive integer for `space_group`.") - - if not target_values or len(target_values) != len(target_props): - raise ValueError("`target_values` must be provided and match the length of `target_props`.") - - valid_ops = ("greater", "less", "equal", "minimize") - if len(comparison_ops) != len(target_props): - raise ValueError("`comparison_ops` length must match `target_props` length.") - for op in comparison_ops: - if op not in valid_ops: - raise ValueError(f"Invalid comparison op '{op}'. 
Must be one of {valid_ops}.") - - workdir = Path("/opt/agents/mcp_tool") - outputs = workdir/ "target" - - mode = "multi" if len(target_props) > 1 else "single" - - #total space group number to generated - upper=min(space_group, n_tot) - random_spacegroup_num = random.randint(1,upper) - print(f"n_tot = {n_tot}") - print(f"random_spacegroup_num = {random_spacegroup_num}") - cmd = [ - "uv", "run", "python", - "mcp_tool.py", - "--mode", str(mode), - "--cond_model_type", *target_props, - "--target", *map(str, target_values), - "--target_type", *comparison_ops, - "--alpha", "10", - "--spacegroup", str(space_group), - "--random_spacegroup_num", "1", #str(random_spacegroup_num), - "--init_sample_num", str(n_tot), #str(int(n_tot/random_spacegroup_num)), - "--mc_steps", "2000" - ] - - print(f"cmd = {cmd}") - subprocess.run(cmd, cwd=workdir, check=True) - output_path = Path("outputs") - if output_path.exists(): - shutil.rmtree(output_path) - shutil.copytree(outputs, output_path) - return { - "poscar_paths": output_path, - "message": "CrystalFormer structure generation successfully!" - } - except Exception as e: - return { - "poscar_paths": None, - "message": "CrystalFormer Generation failed!" - } +#@mcp.tool() +#def generate_calypso_thermoele_structures( +# species: List[str], +# n_tot: int +# )->GenerateCalypsoStructureResult: +# """ +# Generate n_tot CALYPSO structures using specified species. +# If user did not mention species and total number structures to generate, please remind the user to provide these information. +# +# Args: +# species (List[str]): A list of chemical element symbols (e.g., ["Mg", "O", "Si"]). These elements will be used as building blocks in the CALYPSO structure generation. +# All element symbols must be from the supported element list internally defined in the tool. +# +# n_tot (int): The number of CALYPSO structure configurations to generate. 
Each structure will be generated in a separate subdirectory (e.g., generated_calypso/0/, generated_calypso/1/, etc.) +# Return: +# GenerateCalypsoStructureResult with keys: +# - poscar_paths (Path): Path to access generated structures POSCAR. All structures are saved in outputs/poscars_for_optimization/ +# - message (str): Message about calculation results information. +# """ +# +# def get_props(s_list): +# """ +# Get atomic number, atomic radius, and atomic volume infomation for interested species +# +# Args: +# s_list: species list needed to get atomic number, atomic radius, and atomic volume infomation +# +# Return: +# z_list (List): atomic number list for given species list, +# r_list (List): atomic radius list for given species list, +# v_list (List): atomic volume list for given species list. +# """ +# +# z_list, r_list, v_list = [], [], [] +# for s in s_list: +# if s not in ELEMENT_PROPS: +# raise ValueError(f"Unsupported element: {s}") +# props = ELEMENT_PROPS[s] +# z_list.append(props["Z"]) +# r_list.append(props["r"]) +# v_list.append(props["v"]) +# return z_list, r_list, v_list +# +# def generate_counts(n): +# return [random.randint(1, 10) for _ in range(n)] +# +# def write_input(path, species, z_list, n_list, r_mat, volume): +# """ +# Write calypso input files for given species combination with atomic number, number of each species, radius matrix and total volume +# +# Args: +# - path (Path): Path to save input file, +# - species (List[str]): Species list +# - z_list (List[int]): atomic number list +# - n_list (List[int]): number of each species list +# - r_mat: radius matrix +# - volume (float): total volume +# """ +# +# # Step 1: reorder all based on atomic number +# sorted_indices = sorted(range(len(z_list)), key=lambda i: z_list[i]) +# species = [species[i] for i in sorted_indices] +# z_list = [z_list[i] for i in sorted_indices] +# n_list = [n_list[i] for i in sorted_indices] +# r_mat = r_mat[np.ix_(sorted_indices, sorted_indices)] # reorder matrix 
+# +# # Step 2: write input.dat +# with open(path / "input.dat", "w") as f: +# f.write(f"SystemName = {' '.join(species)}\n") +# f.write(f"NumberOfSpecies = {len(species)}\n") +# f.write(f"NameOfAtoms = {' '.join(species)}\n") +# f.write("@DistanceOfIon\n") +# for i in range(len(species)): +# row = " ".join(f"{r_mat[i][j]:.3f}" for j in range(len(species))) +# f.write(row + "\n") +# f.write("@End\n") +# f.write(f"Volume = {volume:.2f}\n") +# f.write(f"AtomicNumber = {' '.join(str(z) for z in z_list)}\n") +# f.write(f"NumberOfAtoms = {' '.join(str(n) for n in n_list)}\n") +# f.write("""Ialgo = 2 +#PsoRatio = 0.5 +#PopSize = 1 +#GenType = 1 +#ICode = 15 +#NumberOfLbest = 4 +#NumberOfLocalOptim = 3 +#Command = sh submit.sh +#MaxTime = 9000 +#MaxStep = 1 +#PickUp = F +#PickStep = 1 +#Parallel = F +#NumberOfParallel = 4 +#Split = T +#PSTRESS = 2000 +#fmax = 0.01 +#FixCell = F +#""") +# +# +# +# #===== Step 1: Generate calypso input files ========== +# outdir = Path("generated_calypso") +# outdir.mkdir(parents=True, exist_ok=True) +# +# +# z_list, r_list, v_list = get_props(species) +# for i in range(n_tot): +# try: +# n_list = generate_counts(len(species)) +# volume = sum(n * v for n, v in zip(n_list, v_list)) +# r_mat = np.add.outer(r_list, r_list) * 0.529 # bohr → Å +# +# struct_dir = outdir / f"{i}" +# if not struct_dir.exists(): +# struct_dir.mkdir(parents=True, exist_ok=True) +# +# #Prepare calypso input.dat +# write_input(struct_dir, species, z_list, n_list, r_mat, volume) +# except Exception as e: +# return{ +# "poscar_paths" : None, +# "message": "Input files generations for calypso failed!" +# } +# +# #Execuate calypso calculation and screening +# flim_ase_path = Path("/opt/agents/thermal_properties/flim_ase/flim_ase.py") +# command = f"/opt/agents/thermal_properties/calypso/calypso.x >> tmp_log && python {flim_ase_path}" +# if not flim_ase_path.exists(): +# return{ +# "poscar_paths": None, +# "message": "flim_ase.py did not found!" 
+# +# } +# try: +# subprocess.run(command, cwd=struct_dir, shell=True) +# except Exception as e: +# return{ +# "poscar_paths": None, +# "message": "calypso.x execute failed!" +# } +# +# #Clean struct_dir only save input.dat and POSCAR_1 +# for file in struct_dir.iterdir(): +# if file.name not in ["input.dat", "POSCAR_1"]: +# if file.is_file(): +# file.unlink() +# elif file.is_dir(): +# shutil.rmtree(file) +# +# # Step 3: Collect POSCAR_1 into POSCAR_n format +# try: +# output_dir = Path("outputs") +# output_dir.mkdir(parents=True, exist_ok=True) +# final_dir = output_dir / "poscars_for_optimization" +# final_dir.mkdir(parents=True, exist_ok=True) +# counter = 0 +# for struct_dir in outdir.iterdir(): +# poscar_path = struct_dir / "POSCAR_1" +# if poscar_path.exists(): +# new_name = final_dir / f"POSCAR_{counter}" +# shutil.copy(poscar_path, new_name) +# counter += 1 +# +# return{ +# "poscar_paths": Path(final_dir), +# "message": f"Calypso generated {n_tot} structures with {species} successfully!" +# } +# except Exception as e: +# return{ +# "poscar_paths": None, +# "message": "Calypso generated POSCAR files collected failed!" +# } + + +##======== Tool generate CrystalFormer Structures ========== +#class GenerateCryFormerStructureResult(TypedDict): +# poscar_paths: Path +# message: str +# +#@mcp.tool() +#def generate_crystalformer_thermoele_structures( +# space_group: int, +# target_props: Optional[List[str]], +# target_values: Optional[List[float]], +# comparison_ops: Optional[List[str]], +# n_tot: int +#)->GenerateCryFormerStructureResult: +# """ +# Generate conditional structures with target properties and space group number. Different target properties used different property model. +# If user did not mention target property, please use band gap as target_prop. If user did not mention the space group please remind the user to +# give a value. 
If user did not mentioned the comparison operator comparison_ops, please remind the user to give a value +# +# Args: +# space_group (int): Target space group number for generated structures. +# target_props (Optional[List[str]]): Target properties for generated structures. +# - "bandgap": hse functional band gap as target property, +# - "sound": sound velocity as target property +# If none, please use bandgap as target property directly. +# target_values (Optional[List[float]]): Target property values for target properties +# comparison_ops (Optional[List[str]]): One per target_prop; each must be one of "greater", "less", "equal", "minimize". If none, please use greater for all target_props. +# n_tot (int): Total number of structures generated +# Returns: +# poscar_paths (Path): Path to generated POSCAR. +# message (str): Message about calculation results. +# """ +# try: +# if space_group is None or space_group <= 0: +# raise ValueError("Please provide a positive integer for `space_group`.") +# +# if not target_values or len(target_values) != len(target_props): +# raise ValueError("`target_values` must be provided and match the length of `target_props`.") +# +# valid_ops = ("greater", "less", "equal", "minimize") +# if len(comparison_ops) != len(target_props): +# raise ValueError("`comparison_ops` length must match `target_props` length.") +# for op in comparison_ops: +# if op not in valid_ops: +# raise ValueError(f"Invalid comparison op '{op}'. 
Must be one of {valid_ops}.") +# +# workdir = Path("/opt/agents/mcp_tool") +# outputs = workdir/ "target" +# +# mode = "multi" if len(target_props) > 1 else "single" +# +# #total space group number to generated +# upper=min(space_group, n_tot) +# random_spacegroup_num = random.randint(1,upper) +# print(f"n_tot = {n_tot}") +# print(f"random_spacegroup_num = {random_spacegroup_num}") +# cmd = [ +# "uv", "run", "python", +# "mcp_tool.py", +# "--mode", str(mode), +# "--cond_model_type", *target_props, +# "--target", *map(str, target_values), +# "--target_type", *comparison_ops, +# "--alpha", "10", +# "--spacegroup", str(space_group), +# "--random_spacegroup_num", "1", #str(random_spacegroup_num), +# "--init_sample_num", str(n_tot), #str(int(n_tot/random_spacegroup_num)), +# "--mc_steps", "2000" +# ] +# +# print(f"cmd = {cmd}") +# subprocess.run(cmd, cwd=workdir, check=True) +# output_path = Path("outputs") +# if output_path.exists(): +# shutil.rmtree(output_path) +# shutil.copytree(outputs, output_path) +# return { +# "poscar_paths": output_path, +# "message": "CrystalFormer structure generation successfully!" +# } +# except Exception as e: +# return { +# "poscar_paths": None, +# "message": "CrystalFormer Generation failed!" 
+# } #====== Tool to do geometry optimization========= class RunOptimizationResult(TypedDict): @@ -590,7 +596,38 @@ def run_pressure_optimization( opt_py = Path("/opt/agents/thermal_properties/geo_opt/opt_multi.py") try: - poscar_files = list(structure_path.rglob("POSCAR*")) + #delete previous calculations results first + optimized_dir = Path("optimized_poscar") + if optimized_dir.exists(): + if optimized_dir.is_dir() and not optimized_dir.is_symlink(): + shutil.rmtree(optimized_dir) + else: + optimized_dir.unlink() + deep_md = Path("deepmd_npy") + if deep_md.exists(): + if deep_md.is_dir() and not deep_md.is_symlink(): + shutil.rmtree(deep_md) # delete the whole folder + else: + deep_md.unlink() + traj_path = Path("traj") + if traj_path.exists(): + if traj_path.is_dir() and not traj_path.is_symlink(): + shutil.rmtree(traj_path) # delete the whole folder + else: + traj_path.unlink() + + + base = Path(structure_path) + structure_path = base.parent if base.is_file() else base + poscar_files = list(p for pat in ("POSCAR*", "*.cif", "*.CIF") + for p in structure_path.rglob(pat)) + #poscar_files = list(structure_path.rglob("POSCAR*")) + if not poscar_files: + print(f"No POSCAR files found under: {structure_path}. 
Exit.") + return{ + "optimized_poscar_paths": None, + "message": f"No POSCAR files found under: {structure_path}" + } # Build command: use the actual path to opt_py, not the literal string "opt_py" cmd = [ "python", @@ -624,7 +661,6 @@ def run_pressure_optimization( sys = dpdata.System(frame,'deepmd/npy') multisys.append(sys) - optimized_dir = Path("optimized_poscar") optimized_dir.mkdir(parents=True, exist_ok=True) # Create the directory if it doesn't exist count=0 @@ -795,7 +831,8 @@ def calculate_thermoele_enthalpy( shutil.copy(src, enthalpy_dir) src = Path("/opt/agents/thermal_properties/geo_opt/results/e_above_hull_50meV.csv") - shutil.copy(src, enthalpy_dir) + dest = enthalpy_dir / f"e_above_hull.csv" + shutil.copy(src, dest) except Exception as e: return{ @@ -809,8 +846,9 @@ def calculate_thermoele_enthalpy( on_hull_optimized_structures = enthalpy_dir / "e_above_hull_structures" on_hull_optimized_structures.mkdir(parents=True, exist_ok=True) - e_above_hull_file = enthalpy_dir / "e_above_hull_50meV.csv" - e_above_hull_output = enthalpy_dir / "e_above_hull.csv" + e_above_hull_file = enthalpy_dir / f"e_above_hull.csv" + threshold_meV = int(threshold*1000) + e_above_hull_output = enthalpy_dir / f"e_above_hull_{threshold_meV}meV.csv" with e_above_hull_file.open("r") as f, e_above_hull_output.open("w") as fout: # write header for new CSV @@ -890,15 +928,15 @@ class ScreenThermoelectricCandidateResults(TypedDict): @mcp.tool() def screen_thermoelectric_candidate( structure_path: Path, - above_hull_file: Path + pressure: float )->ScreenThermoelectricCandidateResults: """ - Screen promising thermoelectric materials based on band gap, sound speed and space group number requirements. + Screen promising thermoelectric materials based on band gap, sound speed and space group number requirements under given pressure. 
If user did not define pressure please remind user to give a value Args: structure_file (Path): Path to structure files - above_hull_file (Path): Path to above_hull.csv file which about about hull energy information + pressure (float): working pressure in GPa Return: ScreenThermoelectricCandidateResults with keys: @@ -984,25 +1022,37 @@ def get_space_group_number(structure): return space_group_number - # --- 0) load above-hull energies --- - above_hull_map: dict[str, float] = {} - if above_hull_file.is_file(): - with above_hull_file.open("r") as fh: - reader = csv.DictReader(fh) - for row in reader: - # normalize to basename so lookups by Path(...).name match - key = Path(row["structure"]).name # ← normalize - above_hull_map[key] = float(row["energy"]) - #Predict bandgap try: structure_path = Path(structure_path) + + ##predict enthalpy to pick up above hull structures within threshold + threshold = 0.05 + results = calculate_thermoele_enthalpy(structure_path, threshold, pressure) + + structure_path = None + structure_path = results["e_above_hull_structures"] + above_hull_file = results["e_above_hull_values"] + + #e_above_hull_structures = list(e_above_hull_structure_path.rglob("POSCAR*")) + #for e_above_hull_structure in e_above_hull_structures: + # print(f"check e_above_hull_structures = {e_above_hull_structures}") + if not structure_path.exists(): return {"thermoelectric_file": {}, "message": f"Structure path not found: {structure_path}"} - results = predict_thermoelectric_properties(structure_path, ["band_gap", "G", "K"]) - structures_properties = results["properties"] + thermoelectric_results = predict_thermoelectric_properties(structure_path, ["band_gap", "G", "K"]) + structures_properties = thermoelectric_results["properties"] + + above_hull_map: dict[str, float] = {} + if above_hull_file.is_file(): + with above_hull_file.open("r") as fh: + reader = csv.DictReader(fh) + for row in reader: + # normalize to basename so lookups by Path(...).name match + key = 
Path(row["structure"]).name + above_hull_map[key] = float(row["energy"]) thermoelectric_candidates: ThermoelectricCandidatesData = {} @@ -1077,6 +1127,14 @@ def get_space_group_number(structure): output_dir = Path("outputs") output_dir.mkdir(parents=True, exist_ok=True) results_file = output_dir / "thermoelectric_material_candidates.csv" + + if not sorted_candidates: + with results_file.open("w") as f: + f.write("No promising candidates found.\n") + return { + "thermoelectric_file": str(results_file), + "message": "No promising candidates found." + } # Collect all field names from the first thermo_props dict sample_props = next(iter(sorted_candidates.values())) @@ -1149,6 +1207,6 @@ def get_space_group_number(structure): # ====== Run Server ====== if __name__ == "__main__": - transport_type = os.getenv('MCP_TRANSPORT', 'sse') - mcp.run(transport=transport_type) + logging.info("Starting ThermoelectricMaterialsServer on port 50001...") + mcp.run(transport="sse") From 24022e190da7722dfa7fee667aa6dc6180af0b52 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 12 Jan 2026 11:37:07 +0800 Subject: [PATCH 3/5] Add FinetuneDPA server --- servers/finetune_dpa/README.md | 17 + servers/finetune_dpa/metadata.json | 6 + servers/finetune_dpa/pyproject.toml | 16 + servers/finetune_dpa/server.py | 1104 +++++++++++++++++++++++++++ servers/finetune_dpa/uv.lock | 8 + 5 files changed, 1151 insertions(+) create mode 100644 servers/finetune_dpa/README.md create mode 100644 servers/finetune_dpa/metadata.json create mode 100644 servers/finetune_dpa/pyproject.toml create mode 100644 servers/finetune_dpa/server.py create mode 100644 servers/finetune_dpa/uv.lock diff --git a/servers/finetune_dpa/README.md b/servers/finetune_dpa/README.md new file mode 100644 index 0000000..e10cb07 --- /dev/null +++ b/servers/finetune_dpa/README.md @@ -0,0 +1,17 @@ +# FinetuneDPAMCP Server + +A tool to finetune dpa model + +##Overview + +This MCP server could fine tune dpa2 and dpa3 pretrained model + +## How to 
cite?
+
+```bibtex
+@software{FinetuneDPAMCP,
+  title = {FinetuneDPAMCP},
+  author = {AI4Science},
+  year = {2025}
+}
+```
diff --git a/servers/finetune_dpa/metadata.json b/servers/finetune_dpa/metadata.json
new file mode 100644
index 0000000..4a215cb
--- /dev/null
+++ b/servers/finetune_dpa/metadata.json
@@ -0,0 +1,6 @@
+{
+"name": "DPAFinetune",
+"description": "Finetune DPA2/DPA3 pretrained models",
+"author": "@liuyuxiang92",
+"category": "materials"
+}
diff --git a/servers/finetune_dpa/pyproject.toml b/servers/finetune_dpa/pyproject.toml
new file mode 100644
index 0000000..a3ac3da
--- /dev/null
+++ b/servers/finetune_dpa/pyproject.toml
@@ -0,0 +1,16 @@
+[project]
+name = "finetune-dpa-mcp-server"
+version = "0.1.0"
+description = "Finetune DPA2/DPA3 pretrained models"
+readme = "README.md"
+requires-python = ">=3.12"
+dependencies = [
+    "dpdata",
+    "numpy",
+    "ase",
+    "pymatgen",
+    "deepmd-kit",
+    "pandas",
+    "mcp",
+    "fastmcp"
+]
diff --git a/servers/finetune_dpa/server.py b/servers/finetune_dpa/server.py
new file mode 100644
index 0000000..05fd59d
--- /dev/null
+++ b/servers/finetune_dpa/server.py
@@ -0,0 +1,1104 @@
+from typing import Optional, List, Union, Dict
+from pathlib import Path
+import logging
+import os
+import glob
+import subprocess
+import numpy as np
+import dpdata
+from dp.agent.server import CalculationMCPServer
+from typing_extensions import TypedDict
+import csv
+
+import random
+import shutil
+
+from pymatgen.core import Composition
+
+# Setup logging
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
+
+# Initialize MCP server
+mcp = CalculationMCPServer(
+    "DPAFinetune",
+    host="0.0.0.0",
+    port=50003
+)
+
+
+#=========Function to revise dpa3 input.json=======
+import json
+from typing import Any, Dict, Mapping, Optional, Tuple, Literal
+
+# ------------- helpers -------------
+
+def _set(d: Dict[str, Any], path: Tuple[str, ...], value: Any) -> None:
+    cur = 
d + for k in path[:-1]: + if k not in cur or not isinstance(cur[k], dict): + cur[k] = {} + cur = cur[k] + cur[path[-1]] = value + +def _get_section(d: Dict[str, Any], path: Tuple[str, ...]) -> Dict[str, Any]: + cur = d + for k in path: + if k not in cur or not isinstance(cur[k], dict): + cur[k] = {} + cur = cur[k] + return cur + +def _merge_whitelisted( + data: Dict[str, Any], + base_path: Tuple[str, ...], + incoming: Optional[Mapping[str, Any]], + allowed_keys: Optional[set] = None, +) -> None: + if not incoming: + return + section = _get_section(data, base_path) + if allowed_keys is None: + # merge all keys + for k, v in incoming.items(): + section[k] = v + else: + for k, v in incoming.items(): + if k in allowed_keys: + section[k] = v + +# ------------- allowed key sets ------------- + +DPA3_REPFLOW_KEYS = { + "n_dim","e_dim","a_dim","nlayers", + "e_rcut","e_rcut_smth","e_sel", + "a_rcut","a_rcut_smth","a_sel", + "axis_neuron","fix_stat_std", + "a_compress_rate","a_compress_e_rate","a_compress_use_split", + "update_angle","smooth_edge_update","use_dynamic_sel", + "sel_reduce_factor","use_exp_switch", + "update_style","update_residual","update_residual_init", +} + +DPA3_TOP_KEYS = {"activation_function","use_tebd_bias","precision","concat_output_tebd"} + +DPA3_FITTING_NET_KEYS = {"neuron","dim_case_embd","resnet_dt","precision","activation_function","seed"} + +DPA2_REPINIT_KEYS = { + "tebd_dim","rcut","rcut_smth","nsel","neuron", + "axis_neuron","three_body_neuron","activation_function", + "three_body_sel","three_body_rcut","three_body_rcut_smth","use_three_body", +} + +DPA2_REPFORMER_KEYS = { + "rcut","rcut_smth","nsel","nlayers", + "g1_dim","g2_dim","attn2_hidden","attn2_nhead","attn1_hidden","attn1_nhead", + "axis_neuron", + "update_h2","update_g1_has_conv","update_g1_has_grrg","update_g1_has_drrd","update_g1_has_attn", + "update_g2_has_g1g1","update_g2_has_attn", + "update_style","update_residual","update_residual_init", + 
"attn2_has_gate","use_sqrt_nnei","g1_out_conv","g1_out_mlp", + "activation_function", +} + +DPA2_FITTING_NET_KEYS = {"neuron","activation_function","resnet_dt","precision","dim_case_embd","seed","_comment"} + +DPA2_TOP_KEYS = {"use_tebd_bias","precision","add_tebd_to_repinit_out"} + +# ------------- main ------------- + +def update_dpa_train_json( + template_path: str, + version: Literal["dpa2","dpa3"], + # global switch + default_model: bool = True, + # -------- common knobs (always allowed) -------- + lr_type: Optional[str] = None, + decay_steps: Optional[int] = None, + start_lr: Optional[float] = None, + stop_lr: Optional[float] = None, + loss_type: Optional[str] = None, + start_pref_e: Optional[float] = None, + limit_pref_e: Optional[float] = None, + start_pref_f: Optional[float] = None, + limit_pref_f: Optional[float] = None, + start_pref_v: Optional[float] = None, + limit_pref_v: Optional[float] = None, + numb_steps: Optional[int] = None, + warmup_steps: Optional[int] = None, + # -------- DPA-3 extras (used only when default_model=False) -------- + dpa3_repflow: Optional[Mapping[str, Any]] = None, + dpa3_top: Optional[Mapping[str, Any]] = None, # keys in DPA3_TOP_KEYS + dpa3_fitting_net: Optional[Mapping[str, Any]] = None, # keys in DPA3_FITTING_NET_KEYS + # -------- DPA-2 extras (used only when default_model=False) -------- + dpa2_repinit: Optional[Mapping[str, Any]] = None, + dpa2_repformer: Optional[Mapping[str, Any]] = None, + dpa2_fitting_net: Optional[Mapping[str, Any]] = None, + dpa2_top: Optional[Mapping[str, Any]] = None, # keys in DPA2_TOP_KEYS + # output + output_path: Optional[str] = None, +) -> Dict[str, Any]: + """ + Loads input JSON (DPA-2 or DPA-3), applies updates, writes output, returns dict. + + Behavior: + - default_model=True -> only LR/Loss/Schedule will be updated. 
+ - default_model=False -> also allow the exact section/keys per your spec: + DPA-3: model.descriptor.repflow, model.{activation_function,use_tebd_bias,precision,concat_output_tebd}, + model.fitting_net (selected keys) + DPA-2: model.descriptor.repinit, model.descriptor.repformer, + model.fitting_net (selected keys), top {use_tebd_bias,precision,add_tebd_to_repinit_out} + """ + with open(template_path, "r", encoding="utf-8") as f: + data = json.load(f) + + # ---- always-allowed (both versions) ---- + if lr_type is not None: _set(data, ("learning_rate","type"), lr_type) + if decay_steps is not None: _set(data, ("learning_rate","decay_steps"), decay_steps) + if start_lr is not None: _set(data, ("learning_rate","start_lr"), start_lr) + if stop_lr is not None: _set(data, ("learning_rate","stop_lr"), stop_lr) + + if loss_type is not None: _set(data, ("loss","type"), loss_type) + if start_pref_e is not None: _set(data, ("loss","start_pref_e"), start_pref_e) + if limit_pref_e is not None: _set(data, ("loss","limit_pref_e"), limit_pref_e) + if start_pref_f is not None: _set(data, ("loss","start_pref_f"), start_pref_f) + if limit_pref_f is not None: _set(data, ("loss","limit_pref_f"), limit_pref_f) + if start_pref_v is not None: _set(data, ("loss","start_pref_v"), start_pref_v) + if limit_pref_v is not None: _set(data, ("loss","limit_pref_v"), limit_pref_v) + + if numb_steps is not None: _set(data, ("training","numb_steps"), numb_steps) + if warmup_steps is not None: _set(data, ("training","warmup_steps"), warmup_steps) + + # ---- extended edits only when default_model=False ---- + if not default_model: + if version == "dpa3": + # repflow + _merge_whitelisted( + data, ("model","descriptor","repflow"), dpa3_repflow, DPA3_REPFLOW_KEYS + ) + # top-level model extras + if dpa3_top: + for k, v in dpa3_top.items(): + if k in DPA3_TOP_KEYS: + _set(data, ("model", k), v) + # fitting_net + _merge_whitelisted( + data, ("model","fitting_net"), dpa3_fitting_net, DPA3_FITTING_NET_KEYS + 
) + + elif version == "dpa2": + _merge_whitelisted( + data, ("model","descriptor","repinit"), dpa2_repinit, DPA2_REPINIT_KEYS + ) + _merge_whitelisted( + data, ("model","descriptor","repformer"), dpa2_repformer, DPA2_REPFORMER_KEYS + ) + _merge_whitelisted( + data, ("model","fitting_net"), dpa2_fitting_net, DPA2_FITTING_NET_KEYS + ) + if dpa2_top: + for k, v in dpa2_top.items(): + if k in DPA2_TOP_KEYS: + _set(data, (k,), v) + else: + raise ValueError("version must be 'dpa2' or 'dpa3'") + + # ---- write out ---- + if output_path is None: + output_path = "train_dpa3.json" if version == "dpa3" else "train_dpa2.json" + with open(output_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + return data + +def split_train_valid( + input_path: str, + valid_ratio: float = 0.10 + ): + """ + Split dpdata into ./data_train and ./data_valid with given valid_ratio for further model fine tune operations. + Args: + input_path (str): Path to the whol dpdata (can be a directory or a compressed archive) + valid_ratio (float): validation data set ratio, and the default setting is 0.10 + """ + import os, shutil, tempfile, random + import dpdata + + def _is_archive(p: str) -> bool: + pl = p.lower() + return pl.endswith((".zip", ".tar", ".tar.gz", ".tgz", ".tar.bz2", ".tbz2", ".tar.xz", ".txz")) + + print(f"input_path={input_path}") + + # load MultiSystems (auto-extract if needed) + if _is_archive(input_path): + print(f"_is_archive(input_path)= {_is_archive(input_path)}") + with tempfile.TemporaryDirectory(prefix="dpdata_extract_") as tmpd: + shutil.unpack_archive(input_path, tmpd) + # if a single top-level directory exists after extraction, use it + entries = [os.path.join(tmpd, e) for e in os.listdir(tmpd) if not e.startswith("__MACOSX")] + if len(entries) == 1 and os.path.isdir(entries[0]): + root = entries[0] + else: + root = tmpd + ms = dpdata.MultiSystems() + p = Path(root) + has_real = next(p.rglob("real_atom_types.npy"), None) is not None + fmt 
= "deepmd/npy/mixed" if has_real else "deepmd/npy" + if has_real: + ms = dpdata.MultiSystems().load_systems_from_file(root,fmt=fmt) + else: + parent_dirs = sorted({p.parent.resolve() for p in Path(root).rglob("type.raw")}) + for d in parent_dirs: + try: + sys = dpdata.LabeledSystem(str(d), fmt="deepmd/npy") + except Exception as e: + print(f"[WARN] Skipping {d}: {e}") + continue + ms.append(sys) + + + + # collect all systems + systems = [] + count = 0 + for key, sys_list in ms.systems.items(): + for s in sys_list: + systems.append((key, s)) + count += 1 + + print(count) + + # random select train and validation set based on given ratio + random.seed(123) + random.shuffle(systems) + split_idx = int(valid_ratio * len(systems)) + valid_set = systems[:split_idx] + train_set = systems[split_idx:] + + ms_train = dpdata.MultiSystems() + ms_valid = dpdata.MultiSystems() + + for k, s in train_set: + ms_train.append(s) # preserve label + + for k, s in valid_set: + ms_valid.append(s) # preserve label + + ms_valid.to_deepmd_npy_mixed("./data_valid") + ms_train.to_deepmd_npy_mixed("./data_train") + return # important: exit before tmp dir is cleaned up + else: + # non-archive path + ms = dpdata.MultiSystems() + p = Path(input_path) + has_real = next(p.rglob("real_atom_types.npy"), None) is not None + fmt = "deepmd/npy/mixed" if has_real else "deepmd/npy" + ms = dpdata.MultiSystems().load_systems_from_file(input_path,fmt=fmt) + + # collect all systems + systems = [] + count = 0 + for key, sys_list in ms.systems.items(): + for s in sys_list: + systems.append((key, s)) + count += 1 + print(count) + + # random select train and validation set based on given ratio + random.seed(123) + random.shuffle(systems) + split_idx = int(valid_ratio * len(systems)) + valid_set = systems[:split_idx] + train_set = systems[split_idx:] + + ms_train = dpdata.MultiSystems() + ms_valid = dpdata.MultiSystems() + + for k, s in train_set: + ms_train.append(s, k) # preserve label + + for k, s in valid_set: + 
ms_valid.append(s, k) # preserve label + + ms_valid.to_deepmd_npy_mixed("./data_valid") + ms_train.to_deepmd_npy_mixed("./data_train") + +import logging +import re +from enum import Enum +from typing import Any + + +class StrEnum(str, Enum): + """Base enum with case-insensitive matching.""" + + @classmethod + def _missing_(cls, value): + if isinstance(value, str): + v = value.strip().lower() + for item in cls: + if item.value.lower() == v: + return item + return None + + +class ModelType(StrEnum): + DPA2 = "dpa2" + DPA3 = "dpa3" + + +class LRType(StrEnum): + EXP = "exp" + + # Must be a dict[str, str] + _ALIASES = { + "exponential": "exp", + "expdecay": "exp", + "exponential_decay": "exp", + "cosine": "exp", # optional: treat cosine as exp fallback + } + + @classmethod + def _missing_(cls, value): + if isinstance(value, str): + v = value.strip().lower() + + aliases = getattr(cls, "_ALIASES", None) + # Robust guard: if aliases is not a dict, ignore it + if isinstance(aliases, dict) and v in aliases: + mapped = aliases[v] + try: + return cls(mapped) + except ValueError: + return None + + return super()._missing_(value) + + +class LossType(StrEnum): + ENER = "ener" + ENER_SPIN = "ener_spin" + DOS = "dos" + PROPERTY = "property" + TENSOR = "tensor" + + +class PrecisionType(StrEnum): + FLOAT32 = "float32" + + +class ActivationType(StrEnum): + TANH = "tanh" + SILUT_3 = "silut:3.0" + + +class UpdateStyleType(StrEnum): + RES_RESIDUAL = "res_residual" + + +class ResidualInitType(StrEnum): + CONST = "const" + NORM = "norm" + + +def coerce_enum( + enum_cls: type[StrEnum], + raw_value: Any, + default: StrEnum, + param_name: str, +) -> StrEnum: + if raw_value is None or raw_value == "": + return default + + if isinstance(raw_value, enum_cls): + return raw_value + + if isinstance(raw_value, str): + try: + v = enum_cls(raw_value) + if v is not None: + return v + except ValueError: + pass + + logging.warning( + "Invalid value for %s=%r, falling back to default %r", + param_name, + 
raw_value, + default.value, + ) + return default + + +def coerce_float(raw_value: Any, default: float, param_name: str) -> float: + if raw_value is None: + return default + + if isinstance(raw_value, (int, float)): + return float(raw_value) + + if isinstance(raw_value, str): + try: + return float(raw_value.strip()) + except ValueError: + pass + + logging.warning( + "Invalid float for %s=%r, falling back to default %r", + param_name, + raw_value, + default, + ) + return default + + +def coerce_int(raw_value: Any, default: int, param_name: str) -> int: + """ + Enhanced int parser for LLM/agent-produced values. + Examples that will all parse to 3000: + "3000", " 3000 ", "3,000", "3000 steps", + "train 3000 steps", "numb_steps = 3000" + """ + if raw_value is None: + return default + + if isinstance(raw_value, int): + return raw_value + + if isinstance(raw_value, float): + return int(round(raw_value)) + + if isinstance(raw_value, str): + cleaned = raw_value.replace(",", "") + m = re.search(r"[-+]?\d+", cleaned) + if m: + try: + return int(m.group(0)) + except ValueError: + pass + + logging.warning( + "Invalid int for %s=%r, falling back to default %r", + param_name, + raw_value, + default, + ) + return default +def coerce_positive_float(raw_value: Any, default: float, param_name: str, eps: float = 1e-12) -> float: + """ + Like coerce_float(), but additionally enforces value > 0 (strictly). + If value is <= eps, fall back to default. + """ + v = coerce_float(raw_value, default, param_name) + if v <= eps: + logging.warning( + "Invalid (non-positive) value for %s=%r, falling back to default %r", + param_name, + raw_value, + default, + ) + return default + return v + +def is_user_provided_model_path(p: Optional[Path]) -> bool: + """ + Treat None / "" / "none" / "null" as NOT provided. + Only return True when user explicitly provides a real path/uri. 
+ """ + if p is None: + return False + s = str(p).strip() + if s == "" or s.lower() in {"none", "null"}: + return False + return True + +import inspect + +def reset_params_to_signature_defaults(func, local_vars: dict, prefixes: tuple[str, ...]) -> None: + """ + For any parameter whose name starts with one of `prefixes`, + reset it to the default value in function signature. + """ + sig = inspect.signature(func) + for name, p in sig.parameters.items(): + if any(name.startswith(pref) for pref in prefixes): + if p.default is not inspect._empty: + local_vars[name] = p.default + +class Finetuned_model(TypedDict): + results: Path + message: str + +@mcp.tool() +def finetune_dpa_model( + input_path: Path, + model_type: str = "dpa3", + model_path: Optional[Path] = None, + valid_ratio: float = 0.1, + # learning rate + lr_type: str = "exp", + decay_steps: int = 5000, + start_lr: float = 0.001, + stop_lr: float = 3e-5, + # loss + loss_type: str = "ener", + start_pref_e: float = 0.2, + limit_pref_e: float = 20.0, + start_pref_f: float = 100.0, + limit_pref_f: float = 60.0, + start_pref_v: float = 0.02, + limit_pref_v: float = 1.0, + # training schedule + numb_steps: int = 3000, + warmup_steps: int = 2000, + # ---------- dpa3: repflow ---------- + dpa3_repflow_n_dim: int = 128, + dpa3_repflow_e_dim: int = 64, + dpa3_repflow_a_dim: int = 32, + dpa3_repflow_nlayers: int = 16, + dpa3_repflow_e_rcut: float = 6.0, + dpa3_repflow_e_rcut_smth: float = 3.5, + dpa3_repflow_e_sel: int = 1200, + dpa3_repflow_a_rcut: float = 4.0, + dpa3_repflow_a_rcut_smth: float = 3.5, + dpa3_repflow_a_sel: int = 300, + dpa3_repflow_axis_neuron: int = 4, + dpa3_repflow_fix_stat_std: float = 0.3, + dpa3_repflow_a_compress_rate: int = 1, + dpa3_repflow_a_compress_e_rate: int = 2, + dpa3_repflow_a_compress_use_split: bool = True, + dpa3_repflow_update_angle: bool = True, + dpa3_repflow_smooth_edge_update: bool = True, + dpa3_repflow_use_dynamic_sel: bool = True, + dpa3_repflow_sel_reduce_factor: float = 10.0, 
+ dpa3_repflow_use_exp_switch: bool = True, + dpa3_repflow_update_style: str = "res_residual", + dpa3_repflow_update_residual: float = 0.1, + dpa3_repflow_update_residual_init: str = "const", + # ---------- dpa3: top ---------- + dpa3_top_activation_function: str = "silut:3.0", + dpa3_top_use_tebd_bias: bool = False, + dpa3_top_precision: str = "float32", + dpa3_top_concat_output_tebd: bool = False, + # ---------- dpa3: fitting_net ---------- + dpa3_fitting_net_neuron: Optional[List[int]] = None, # e.g. [240, 240, 240] + dpa3_fitting_net_dim_case_embd: int = 31, + dpa3_fitting_net_resnet_dt: bool = True, + dpa3_fitting_net_precision: str = "float32", + dpa3_fitting_net_activation_function: str = "tanh", + dpa3_fitting_net_seed: int = 1, + # ---------- dpa2: repinit ---------- + dpa2_repinit_tebd_dim: int = 80, + dpa2_repinit_rcut: float = 4.0, + dpa2_repinit_rcut_smth: float = 3.5, + dpa2_repinit_nsel: int = 40, + dpa2_repinit_neuron: int = 32, + dpa2_repinit_axis_neuron: int = 4, + dpa2_repinit_three_body_neuron: int = 32, + dpa2_repinit_activation_function: str = "tanh", + dpa2_repinit_three_body_sel: int = 40, + dpa2_repinit_three_body_rcut: float = 4.0, + dpa2_repinit_three_body_rcut_smt: float = 3.5, + dpa2_repinit_use_three_body: bool = True, + # ---------- dpa2: repformer ---------- + dpa2_repformer_rcut: float = 4.0, + dpa2_repformer_rcut_smth: float = 3.5, + dpa2_repformer_nsel: int = 40, + dpa2_repformer_nlayers: int = 6, + dpa2_repformer_g1_dim: int = 384, + dpa2_repformer_g2_dim: int = 96, + dpa2_repformer_attn2_hidden: int = 24, + dpa2_repformer_attn2_nhead: int = 4, + dpa2_repformer_attn1_hidden: int = 128, + dpa2_repformer_attn1_nhead: int = 4, + dpa2_repformer_axis_neuron: int = 4, + dpa2_repformer_update_h2: bool = False, + dpa2_repformer_update_g1_has_conv: bool = True, + dpa2_repformer_update_g1_has_grrg: bool = True, + dpa2_repformer_update_g1_has_drrd: bool = True, + dpa2_repformer_update_g1_has_attn: bool = False, + 
dpa2_repformer_update_g2_has_g1g1: bool = False, + dpa2_repformer_update_g2_has_attn: bool = True, + dpa2_repformer_update_style: str = "res_residual", + dpa2_repformer_update_residual: float = 0.01, + dpa2_repformer_update_residual_init: str = "norm", + dpa2_repformer_attn2_has_gate: bool = True, + dpa2_repformer_use_sqrt_nnei: bool = True, + dpa2_repformer_g1_out_conv: bool = True, + dpa2_repformer_g1_out_mlp: bool = True, + dpa2_repformer_activation_function: str = "tanh", + # ---------- dpa2: fitting_net ---------- + dpa2_fitting_net_neuron: Optional[List[int]] = None, # e.g. [240, 240, 240] + dpa2_fitting_net_activation_function: str = "tanh", + dpa2_fitting_net_resnet_dt: bool = True, + dpa2_fitting_net_precision: str = "float32", + dpa2_fitting_net_dim_case_embd: int = 37, + dpa2_fitting_net_seed: int = 1, + # ---------- dpa2: top ---------- + dpa2_top_use_tebd_bias: bool = False, + dpa2_top_precision: str = "float32", + dpa2_top_add_tebd_to_repinit_out: bool = False, + # ---- NEW: add at the very end ---- + user_text: Optional[str] = None, + training_steps: Optional[str] = None, + total_steps: Optional[str] = None, + steps: Optional[str] = None, +) -> Finetuned_model: + + """ + Finetune the DPA3 model based on user requirment. If use do not provide model, please just use our dpa2 and dpa3 default model to be fined tuned. + Args: + --input_path (Path): the path to fine tune model data + --valid_ratio (float): the ratio to split data into train and validation sets + --model_type (str): the version of DPA model, could be dpa2 or dpa3. + --model_path (Path): the path to model which need to be finetuned further. If user did not provide model, just use dpa2 or dpa3 default model + --lr_type (str): The type of the learning rate + --decay_steps (int): The learning rate is decaying every this number of training steps. + --start_lr float: The learning rate at the start of the training + --stop_lr (float):The desired learning rate at the end of the training. 
+ --loss_type (str): The type of the loss. possible choices: ener, ener_spin, dos, property, tensor + --start_pref_e (float):The prefactor of energy loss at the start of the training. + --limit_pref_e (float): The prefactor of energy loss at the limit of the training. Should be larger than or equal to 0. + --start_pref_f (float): The prefactor of force loss at the start of the training. Should be larger than or equal to 0 + --limit_pref_f (float): The prefactor of force loss at the limit of the training. Should be larger than or equal to 0. + --start_pref_v (float): The prefactor of virial loss at the start of the training. Should be larger than or equal to 0. + --limit_pref_v (float): The prefactor of virial loss at the limit of the training. Should be larger than or equal to 0. + --numb_steps (int): Number of training batch. + --warmup_steps (int): The number of steps for learning rate warmup. + --dpa3_repflow (Mapping[str,Any]): For dpa3 model type, if user provide with model, it could be defined by user about repflow part + --dpa3_top (Mapping[str,Any]): For dpa3 model type, if user provide with model, it could be defined by user about top part + --dpa3_fitting_net (Mapping[str,Any]): For dpa3 model type, if user provide with model, it could be defined by user about fitting_net part + --dpa2_repinit (Mapping[str,Any]): For dpa2 model type, if user provide with model, it could be defined by user about repinit part + --dpa2_repformer (Mapping[str,Any]): For dpa2 model type, if user provide with model, it could be defined by user about repformer part + --dpa2_top (Mapping[str,Any]): For dpa2 model type, if user provide with model, it could be defined by user about top part + --dpa2_fitting_net (Mapping[str,Any]): For dpa2 model type, if user provide with model, it could be defined by user about fitting_net part + Return: + Finetuned_model with keys: + --results (Path): Path to finetuned model + --message: Message about operation results + + """ + import re + from 
typing import Optional + + def extract_training_steps(user_text: str) -> Optional[int]: + if not user_text: + return None + text = user_text.replace(",", "") + + # High-confidence patterns (prefer these) + patterns = [ + r"(?:training\s*)?(?:total\s*)?steps\s*[:=]?\s*(\d+)", + r"(?:num(?:b)?_steps|max_steps|steps)\s*[:=]?\s*(\d+)", + r"(?:训练总步数|总步数|训练步数|训练步)\s*[:=:]?\s*(\d+)", + r"(?:跑|训练)\s*(\d+)\s*(?:步|steps?)", + ] + for pat in patterns: + m = re.search(pat, text, flags=re.IGNORECASE) + if m: + return int(m.group(1)) + + return None + + # ====== Normalize and debug-log all important hyper-parameters ====== + + # Remember raw values for debugging + raw_model_type = model_type + raw_lr_type = lr_type + raw_loss_type = loss_type + raw_numb_steps = numb_steps + raw_warmup_steps = warmup_steps + + model_type = coerce_enum( + ModelType, model_type, ModelType.DPA3, "model_type" + ).value + + lr_type = coerce_enum( + LRType, lr_type, LRType.EXP, "lr_type" + ).value + + loss_type = coerce_enum( + LossType, loss_type, LossType.ENER, "loss_type" + ).value + + # dpa3 repflow + dpa3_repflow_update_style = coerce_enum( + UpdateStyleType, + dpa3_repflow_update_style, + UpdateStyleType.RES_RESIDUAL, + "dpa3_repflow_update_style", + ).value + + dpa3_repflow_update_residual_init = coerce_enum( + ResidualInitType, + dpa3_repflow_update_residual_init, + ResidualInitType.CONST, + "dpa3_repflow_update_residual_init", + ).value + + # dpa3 top / fitting + dpa3_top_activation_function = coerce_enum( + ActivationType, + dpa3_top_activation_function, + ActivationType.SILUT_3, + "dpa3_top_activation_function", + ).value + + dpa3_top_precision = coerce_enum( + PrecisionType, + dpa3_top_precision, + PrecisionType.FLOAT32, + "dpa3_top_precision", + ).value + + dpa3_fitting_net_precision = coerce_enum( + PrecisionType, + dpa3_fitting_net_precision, + PrecisionType.FLOAT32, + "dpa3_fitting_net_precision", + ).value + + dpa3_fitting_net_activation_function = coerce_enum( + ActivationType, + 
dpa3_fitting_net_activation_function, + ActivationType.TANH, + "dpa3_fitting_net_activation_function", + ).value + + # dpa2 repinit / repformer / fitting / top + dpa2_repinit_activation_function = coerce_enum( + ActivationType, + dpa2_repinit_activation_function, + ActivationType.TANH, + "dpa2_repinit_activation_function", + ).value + + dpa2_repformer_update_style = coerce_enum( + UpdateStyleType, + dpa2_repformer_update_style, + UpdateStyleType.RES_RESIDUAL, + "dpa2_repformer_update_style", + ).value + + dpa2_repformer_update_residual_init = coerce_enum( + ResidualInitType, + dpa2_repformer_update_residual_init, + ResidualInitType.NORM, + "dpa2_repformer_update_residual_init", + ).value + + dpa2_repformer_activation_function = coerce_enum( + ActivationType, + dpa2_repformer_activation_function, + ActivationType.TANH, + "dpa2_repformer_activation_function", + ).value + + dpa2_fitting_net_activation_function = coerce_enum( + ActivationType, + dpa2_fitting_net_activation_function, + ActivationType.TANH, + "dpa2_fitting_net_activation_function", + ).value + + dpa2_fitting_net_precision = coerce_enum( + PrecisionType, + dpa2_fitting_net_precision, + PrecisionType.FLOAT32, + "dpa2_fitting_net_precision", + ).value + + dpa2_top_precision = coerce_enum( + PrecisionType, + dpa2_top_precision, + PrecisionType.FLOAT32, + "dpa2_top_precision", + ).value + + # numeric hyper-parameters + valid_ratio = coerce_float(valid_ratio, 0.1, "valid_ratio") + decay_steps = coerce_int(decay_steps, 5000, "decay_steps") + start_lr = coerce_float(start_lr, 0.001, "start_lr") + stop_lr = coerce_float(stop_lr, 3e-5, "stop_lr") + + # Prefactors MUST be > 0; agent sometimes injects 0 which kills gradients. 
+ start_pref_e = coerce_positive_float(start_pref_e, 0.2, "start_pref_e") + start_pref_f = coerce_positive_float(start_pref_f, 100.0, "start_pref_f") + start_pref_v = coerce_positive_float(start_pref_v, 0.02, "start_pref_v") + + # Limits should also be > 0 (and usually >= start) + limit_pref_e = coerce_positive_float(limit_pref_e, 20.0, "limit_pref_e") + limit_pref_f = coerce_positive_float(limit_pref_f, 60.0, "limit_pref_f") + limit_pref_v = coerce_positive_float(limit_pref_v, 1.0, "limit_pref_v") + + # Ensure limit >= start (safety) + limit_pref_e = max(limit_pref_e, start_pref_e) + limit_pref_f = max(limit_pref_f, start_pref_f) + limit_pref_v = max(limit_pref_v, start_pref_v) + + if (start_pref_e <= 1e-12) and (start_pref_f <= 1e-12) and (start_pref_v <= 1e-12): + logging.warning("All start_pref_* are ~0. Resetting to safe defaults.") + start_pref_e, start_pref_f, start_pref_v = 0.2, 100.0, 0.02 + + + # ====== Solve "could not read steps" problem ====== + DEFAULT_NUMB_STEPS = 3000 + DEFAULT_WARMUP_STEPS = 2000 + + raw_numb_steps = numb_steps + raw_warmup_steps = warmup_steps + + # Normalize both (handles "2000 steps", "3,000", etc.) 
+ numb_steps = coerce_int(numb_steps, DEFAULT_NUMB_STEPS, "numb_steps") + warmup_steps = coerce_int(warmup_steps, DEFAULT_WARMUP_STEPS, "warmup_steps") + + # If user_text contains an explicit total steps, let it override (highest priority) + parsed_total = extract_training_steps(user_text) if user_text else None + if parsed_total is not None: + if parsed_total != numb_steps: + logging.info( + "DEBUG steps override: user_text specifies total_steps=%d, overriding agent numb_steps=%d", + parsed_total, + numb_steps, + ) + numb_steps = parsed_total + + # --------- Critical safety clamp for warmup_steps ---------- + # DeepMD requires: warmup_steps < numb_steps OR warmup_steps == 0 + if numb_steps <= 0: + # Defensive: never allow non-positive total steps + logging.warning("Invalid numb_steps=%d; forcing to 1 and warmup_steps=0", numb_steps) + numb_steps = 1 + warmup_steps = 0 + else: + if warmup_steps < 0: + logging.warning("Invalid warmup_steps=%d; forcing to 0", warmup_steps) + warmup_steps = 0 + + if warmup_steps >= numb_steps: + # Strategy: keep warmup small and valid + # Option A (recommended): set warmup to 0 when total steps is small + new_warmup = 0 if numb_steps < 50 else max(1, int(0.1 * numb_steps)) + # Ensure strictly less than numb_steps + new_warmup = min(new_warmup, numb_steps - 1) + logging.warning( + "Warmup steps (%d) >= total steps (%d). 
Adjusting warmup_steps -> %d", + warmup_steps, + numb_steps, + new_warmup, + ) + warmup_steps = new_warmup + + logging.info( + "DEBUG final schedule: numb_steps raw=%r -> %d, warmup_steps raw=%r -> %d", + raw_numb_steps, numb_steps, raw_warmup_steps, warmup_steps + ) + + # debug log: see what actually happened + logging.info( + "DEBUG hyper: model_type raw=%r final=%r, " + "lr_type raw=%r final=%r, " + "loss_type raw=%r final=%r, " + "numb_steps raw=%r final=%r, " + "warmup_steps raw=%r final=%r", + raw_model_type, + model_type, + raw_lr_type, + lr_type, + raw_loss_type, + loss_type, + raw_numb_steps, + numb_steps, + raw_warmup_steps, + warmup_steps, + ) + + user_provided_model = is_user_provided_model_path(model_path) + + if not user_provided_model: + default_model = True + if model_type == "dpa3": + input_json = "/opt/agents/dpa_finetune/input_dpa3.json" + model_path = Path("/opt/agents/dpa_finetune/models/dpa3.pt") + else: + input_json = "/opt/agents/dpa_finetune/input_dpa2.json" + model_path = Path("/opt/agents/dpa_finetune/dpa2.pt") + else: + default_model = False + if model_type == "dpa3": + input_json = "/opt/agents/dpa_finetune/input_dpa3.json" + else: + input_json = "/opt/agents/dpa_finetune/input_dpa2.json" + + # Lock architecture-related params when using built-in default model + if default_model: + reset_params_to_signature_defaults( + finetune_dpa_model, + locals(), + prefixes=("dpa3_", "dpa2_"), + ) + logging.info("DEBUG: default_model=True -> locked all dpa3_* / dpa2_* params to signature defaults.") + + + dpa3_repflow = { + "n_dim": dpa3_repflow_n_dim, + "e_dim": dpa3_repflow_e_dim, + "a_dim": dpa3_repflow_a_dim, + "nlayers": dpa3_repflow_nlayers, + "e_rcut": dpa3_repflow_e_rcut, + "e_rcut_smth": dpa3_repflow_e_rcut_smth, + "e_sel": dpa3_repflow_e_sel, + "a_rcut": dpa3_repflow_a_rcut, + "a_rcut_smth": dpa3_repflow_a_rcut_smth, + "a_sel": dpa3_repflow_a_sel, + "axis_neuron": dpa3_repflow_axis_neuron, + "fix_stat_std": dpa3_repflow_fix_stat_std, + 
"a_compress_rate": dpa3_repflow_a_compress_rate, + "a_compress_e_rate": dpa3_repflow_a_compress_e_rate, + "a_compress_use_split": dpa3_repflow_a_compress_use_split, + "update_angle": dpa3_repflow_update_angle, + "smooth_edge_update": dpa3_repflow_smooth_edge_update, + "use_dynamic_sel": dpa3_repflow_use_dynamic_sel, + "sel_reduce_factor": dpa3_repflow_sel_reduce_factor, + "use_exp_switch": dpa3_repflow_use_exp_switch, + "update_style": dpa3_repflow_update_style, + "update_residual": dpa3_repflow_update_residual, + "update_residual_init": dpa3_repflow_update_residual_init, + } + dpa3_top = { + "activation_function": dpa3_top_activation_function, + "use_tebd_bias": dpa3_top_use_tebd_bias, + "precision": dpa3_top_precision, + "concat_output_tebd": dpa3_top_concat_output_tebd, + } + dpa3_fitting_net = { + "neuron":dpa3_fitting_net_neuron, + "dim_case_embd":dpa3_fitting_net_dim_case_embd, + "resnet_dt": dpa3_fitting_net_resnet_dt, + "precision":dpa3_fitting_net_precision, + "activation_function":dpa3_fitting_net_activation_function, + "seed":dpa3_fitting_net_seed, + } + dpa2_repinit = { + "tebd_dim":dpa2_repinit_tebd_dim, + "rcut":dpa2_repinit_rcut, + "rcut_smth":dpa2_repinit_rcut_smth, + "nsel":dpa2_repinit_nsel, + "neuron":dpa2_repinit_neuron, + "axis_neuron":dpa2_repinit_axis_neuron, + "three_body_neuron":dpa2_repinit_three_body_neuron, + "activation_function":dpa2_repinit_activation_function, + "three_body_sel":dpa2_repinit_three_body_sel, + "three_body_rcut":dpa2_repinit_three_body_rcut, + "three_body_rcut_smth":dpa2_repinit_three_body_rcut_smt, + "use_three_body": dpa2_repinit_use_three_body, + } + dpa2_repformer = { + "rcut":dpa2_repformer_rcut, + "rcut_smth":dpa2_repformer_rcut_smth, + "nsel":dpa2_repformer_nsel, + "nlayers": dpa2_repformer_nlayers, + "g1_dim": dpa2_repformer_g1_dim, + "g2_dim": dpa2_repformer_g2_dim, + "attn2_hidden": dpa2_repformer_attn2_hidden, + "attn2_nhead": dpa2_repformer_attn2_nhead, + "attn1_hidden": dpa2_repformer_attn1_hidden, + 
"attn1_nhead": dpa2_repformer_attn1_nhead, + "axis_neuron": dpa2_repformer_axis_neuron, + "update_h2": dpa2_repformer_update_h2, + "update_g1_has_conv": dpa2_repformer_update_g1_has_conv, + "update_g1_has_grrg": dpa2_repformer_update_g1_has_grrg, + "update_g1_has_drrd": dpa2_repformer_update_g1_has_drrd, + "update_g1_has_attn": dpa2_repformer_update_g1_has_attn, + "update_g2_has_g1g1": dpa2_repformer_update_g2_has_g1g1, + "update_g2_has_attn": dpa2_repformer_update_g2_has_attn, + "update_style":dpa2_repformer_update_style, + "update_residual":dpa2_repformer_update_residual, + "update_residual_init":dpa2_repformer_update_residual_init, + "attn2_has_gate": dpa2_repformer_attn2_has_gate, + "use_sqrt_nnei": dpa2_repformer_use_sqrt_nnei, + "g1_out_conv": dpa2_repformer_g1_out_conv, + "g1_out_mlp": dpa2_repformer_g1_out_mlp, + "activation_function":dpa2_repformer_activation_function, + } + dpa2_fitting_net = { + "neuron": dpa2_fitting_net_neuron, + "activation_function": dpa2_fitting_net_activation_function, + "resnet_dt": dpa2_fitting_net_resnet_dt, + "precision": dpa2_fitting_net_precision, + "dim_case_embd": dpa2_fitting_net_dim_case_embd, + "seed": dpa2_fitting_net_seed, + } + dpa2_top = { + "use_tebd_bias": dpa2_top_use_tebd_bias, + "precision": dpa2_top_precision, + "add_tebd_to_repinit_out": dpa2_top_add_tebd_to_repinit_out, + } + #input_json = "bohrium://13756/501205/store/upload/523748fd-7d2e-4e94-9304-8d49e710e8ba/input_dpa3.json" + update_dpa_train_json( + input_json, + model_type, + default_model, + lr_type, + decay_steps, + start_lr, + stop_lr, + loss_type, + start_pref_e, + limit_pref_e, + start_pref_f, + limit_pref_f, + start_pref_v, + limit_pref_v, + numb_steps, + warmup_steps, + dpa3_repflow, + dpa3_top, + dpa3_fitting_net, + dpa2_repinit, + dpa2_repformer, + dpa2_fitting_net, + dpa2_top, + "train.json" + ) + print(f"checkcheck lr_type={lr_type}") +#Split dpdata into train and valid two sets + split_train_valid(str(input_path), valid_ratio) + #model_path 
= "bohrium://13756/501205/store/upload/6ea62778-1c8a-4ff6-9b12-85288b1912aa/dpa3.pt" + if model_type == "dpa3": + cmd = [ "dp", + "--pt", + "train", + "train.json", + "--finetune", + str(model_path), + ] + else: + cmd = [ "dp", + "--pt", + "train", + "train.json", + "--finetune", + str(model_path), + "--model-branch", + "Omat24" + ] + + subprocess.run(cmd, check=True) + + + finetuned_model = './model.ckpt.pt' + finetuned_model_path = Path(finetuned_model) + return{ + "results": finetuned_model_path, + "message": "Fine tune model successfully!" + } + +# ====== Run Server ====== + +if __name__ == "__main__": + logging.info("Starting FinetuneDPAServer on port 50003...") + mcp.run(transport="sse") + diff --git a/servers/finetune_dpa/uv.lock b/servers/finetune_dpa/uv.lock new file mode 100644 index 0000000..a274c1e --- /dev/null +++ b/servers/finetune_dpa/uv.lock @@ -0,0 +1,8 @@ +version = 1 +revision = 2 +requires-python = ">=3.12" + +[[package]] +name = "superconductor-mcp-server" +version = "0.1.0" +source = { virtual = "." 
} From 4380bff862f9a8317f3ccc27a8e05cc3b373397d Mon Sep 17 00:00:00 2001 From: root Date: Mon, 12 Jan 2026 11:39:54 +0800 Subject: [PATCH 4/5] Add ConvexHull server --- servers/convexhull/README.md | 17 ++ servers/convexhull/metadata.json | 6 + servers/convexhull/pyproject.toml | 16 ++ servers/convexhull/server.py | 395 ++++++++++++++++++++++++++++++ servers/convexhull/uv.lock | 8 + 5 files changed, 442 insertions(+) create mode 100644 servers/convexhull/README.md create mode 100644 servers/convexhull/metadata.json create mode 100644 servers/convexhull/pyproject.toml create mode 100644 servers/convexhull/server.py create mode 100644 servers/convexhull/uv.lock diff --git a/servers/convexhull/README.md b/servers/convexhull/README.md new file mode 100644 index 0000000..d8f4a41 --- /dev/null +++ b/servers/convexhull/README.md @@ -0,0 +1,17 @@ +# ConvexHullMCP Server + +A tool to build convexhull and check materials thermal stability + +##Overview + +This MCP server could build convex hull for given structures and check it corresponding thermal stability + +## How to cite? 
+ +```bibtex +@software{ConvexHullMCP, + title = {ConvexHullMCP}, + author = {AI4Scient}, + year = {2025} +} +``` diff --git a/servers/convexhull/metadata.json b/servers/convexhull/metadata.json new file mode 100644 index 0000000..4a215cb --- /dev/null +++ b/servers/convexhull/metadata.json @@ -0,0 +1,6 @@ +{ +"name": "SuperconductorServer", +"description": "Superconductor critical temperature prediction", +"author": "@liuyuxiang92", +"category": "materials" +} diff --git a/servers/convexhull/pyproject.toml b/servers/convexhull/pyproject.toml new file mode 100644 index 0000000..a3ac3da --- /dev/null +++ b/servers/convexhull/pyproject.toml @@ -0,0 +1,16 @@ +[project] +name = "superconductor-mcp-server" +version = "0.1.0" +description = "Superconductor critical temperature prediction" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "dpdata", + "numpy", + "ase", + "pymatgen", + "deepmd-kit", + "pandas", + "mcp", + "fastmcp" +] diff --git a/servers/convexhull/server.py b/servers/convexhull/server.py new file mode 100644 index 0000000..fc17fa8 --- /dev/null +++ b/servers/convexhull/server.py @@ -0,0 +1,395 @@ +from typing import Optional, List, Union, Dict +from pathlib import Path +import logging +import os +import glob +import shutil +import subprocess +import numpy as np +from ase import io +import dpdata +import pandas as pd +from deepmd.pt.infer.deep_eval import DeepProperty +from dp.agent.server import CalculationMCPServer +from typing_extensions import TypedDict +import csv + +import random +import shutil + +from pymatgen.core import Composition + +# Setup logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s" +) + +# Initialize MCP server +mcp = CalculationMCPServer( + "ConvexHullServer", + host="0.0.0.0", + port=50004 +) + +class RunOptimizationResult(TypedDict): + optimized_poscar_paths: Path + message: str +def run_optimization( + structure_path: Path, + ambient: bool +) -> 
RunOptimizationResult: + """ + Optimize structures with DP model at ambient or high pressure condition. + + Args: + - structure_path (Path): Path to access structures need to be optimized + - ambient (bool): Wether consider ambient condition + Return: + - optimized_structure_path (Path): Path to access optimized structures + """ + opt_py = Path("/opt/agents/convex_hull/geo_opt/opt_multi.py") + + fmax = 0.0005 + if ambient: + pressure = 0 + else: + pressure = 200 + + nsteps = 2000 + + base = Path(structure_path) + structure_path = base.parent if base.is_file() else base + + structures = list(p for pat in ("POSCAR*", "*.cif", "*.CIF") + for p in structure_path.rglob(pat)) + print(f"The length of structures {len(structures)}") + + try: + # Build command: use the actual path to opt_py, not the literal string "opt_py" + cmd = [ + "python", + str(opt_py), # <— use the variable here + str(fmax), + str(pressure), + str(ambient), + str(nsteps), + ] + [str(p) for p in structures] + + # Run and check for errors + subprocess.run(cmd, check=True) + + except Exception as e: + print("Geometry Optimization failed!") + + try: + parse_py = Path("/opt/agents/convex_hull/geo_opt/parse_traj.py") + cmd = [ + "python", + str(parse_py) + ] + + # Run and check for errors + subprocess.run(cmd, check=True) + except Exception as e: + print("Collect optimized failed!") + try: + frames =glob.glob('deepmd_npy/*/') + multisys = dpdata.MultiSystems() + for frame in frames: + sys = dpdata.System(frame,'deepmd/npy') + multisys.append(sys) + + optimized_dir = Path("optimized_poscar") + optimized_dir.mkdir(parents=True, exist_ok=True) # Create the directory if it doesn't exist + + count=0 + for system in multisys: + for frame in system: + system.to_vasp_poscar(optimized_dir / f'POSCAR_{count}') + count+=1 + #optimized_structures = list(optimized_dir.rglob("POSCAR*")) + except Exception as e: + print("Collect POSCAR failed!") + + return{ + "optimized_poscar_paths": optimized_dir, + "message": "Geometry 
Optimization successfully" + } + + +### Tool to calculated structures enthalpy###### + +class BuildConvexHullResult(TypedDict): + """Results about enthalpy prediction""" + enthalpy_file: Path + message: str +#======================Tool to calculate structure enthalpy====================== +@mcp.tool() +def build_convex_hull( + structure_path: Path, +)->BuildConvexHullResult: + """ + Build Convex Hull for given structure. + + Args: + - structure_file (Path): Path to the structure files (e.g. POSCAR) + + Return: + BuildConvexHullResult with keys: + - enthalpy_file (Path): Path to access entalpy prediction related files, including convexhull.csv, convexhull.html, enthalpy.csv, e_above_hull_50meV.csv. + All these files are saved in outputs. + - message (str): Message about calculation results. + """ + ENERGY_REF = { + "Ne": -0.0259, + "He": -0.0091, + "Ar": -0.0688, + "F": -1.9115, + "O": -4.9467, + "Cl": -1.8485, + "N": -8.3365, + "Kr": -0.0567, + "Br": -1.553, + "I": -1.4734, + "Xe": -0.0362, + "S": -4.1364, + "Se": -3.4959, + "C": -9.2287, + "Au": -3.2739, + "W": -12.9581, + "Pb": -3.7126, + "Rh": -7.3643, + "Pt": -6.0711, + "Ru": -9.2744, + "Pd": -5.1799, + "Os": -11.2274, + "Ir": -8.8384, + "H": -3.3927, + "P": -5.4133, + "As": -4.6591, + "Mo": -10.8457, + "Te": -3.1433, + "Sb": -4.129, + "B": -6.6794, + "Bi": -3.8405, + "Ge": -4.623, + "Hg": -0.3037, + "Sn": -4.0096, + "Ag": -2.8326, + "Ni": -5.7801, + "Tc": -10.3606, + "Si": -5.4253, + "Re": -12.4445, + "Cu": -4.0992, + "Co": -7.1083, + "Fe": -8.47, + "Ga": -3.0281, + "In": -2.7517, + "Cd": -0.9229, + "Cr": -9.653, + "Zn": -1.2597, + "V": -9.0839, + "Tl": -2.3626, + "Al": -3.7456, + "Nb": -10.1013, + "Be": -3.7394, + "Mn": -9.162, + "Ti": -7.8955, + "Ta": -11.8578, + "Pa": -9.5147, + "U": -11.2914, + "Sc": -6.3325, + "Np": -12.9478, + "Zr": -8.5477, + "Mg": -1.6003, + "Th": -7.4139, + "Hf": -9.9572, + "Pu": -14.2678, + "Lu": -4.521, + "Tm": -4.4758, + "Er": -4.5677, + "Ho": -4.5824, + "Y": -6.4665, + "Dy": 
-4.6068, + "Gd": -14.0761, + "Eu": -10.257, + "Sm": -4.7186, + "Nd": -4.7681, + "Pr": -4.7809, + "Pm": -4.7505, + "Ce": -5.9331, + "Yb": -1.5396, + "Tb": -4.6344, + "La": -4.936, + "Ac": -4.1212, + "Ca": -2.0056, + "Li": -1.9089, + "Sr": -1.6895, + "Na": -1.3225, + "Ba": -1.919, + "Rb": -0.9805, + "K": -1.1104, + "Cs": -0.8954, + } + enthalpy_dir = Path("outputs") + enthalpy_dir.mkdir(parents=True, exist_ok=True) + ambient = True + try: + #poscar_files = list(structure_path.rglob("POSCAR*")) + try: + results = run_optimization(structure_path,ambient) + optimized_structure_path = results["optimized_poscar_paths"] + optimized_structures = list(optimized_structure_path.rglob("POSCAR*")) + except Exception as e: + return{ + "enthalpy_file": [], + "message": "Geometry Optimization failed!" + } + + try: + enthalpy_py = Path("/opt/agents/convex_hull/geo_opt/predict_enthalpy.py") + cmd = [ + "python", + str(enthalpy_py), + str(ambient) + ] + [str(poscar) for poscar in optimized_structures] + + # Run and check for errors + subprocess.run(cmd, check=True) + except Exception as e: + return{ + "enthalpy_file": [], + "message": "Enthalpy Predictions failed!" 
+ } + + try: + enthalpy_file = enthalpy_dir / "enthalpy.csv" + with open(enthalpy_file, 'w') as ef: + ef.write("Number,formula,enthalpy\n") + prediction_file = Path("prediction") / "prediction.all.out" + with prediction_file.open('r') as pf: + for line in pf: + if not line.strip(): + continue + + # Split the line into columns + parts = line.split() + file_name = parts[0] # Column 1: POSCAR or structure file name + enthalpy = parts[2] # Column 3: enthalpy H0 + formula = parts[5] # Column 6: element composition + + if ambient: + if abs(float(parts[3]))< 0.001: + comp = Composition(formula) + element_counts = dict(comp.get_el_amt_dict()) + enthalpy = float(enthalpy) + print(enthalpy) + total_atoms = sum(element_counts.values()) + enthalpy -= sum(comp[ele]* ENERGY_REF[str(ele)] for ele in comp)/total_atoms + + #enthalpy /= total_atoms + + # Write out: file_name, formula, enthalpy + ef.write(f"{file_name},{formula},{enthalpy}\n") + else: + if 1.2473 < float(parts[3]) < 1.2493: + # Write out: file_name, formula, enthalpy + ef.write(f"{file_name},{formula},{enthalpy}\n") + + except Exception as e: + return{ + "enthalpy_file": [], + "message": "Enthalpy file save failed!" + } + try: + if ambient: + convexhull_file = Path("/opt/agents/convex_hull/geo_opt/convexhull_ambient.csv") + else: + convexhull_file = Path("/opt/agents/convex_hull/geo_opt/convexhull_high_pressure.csv") + + #Append enthalpy_file to convexhull_file + lines = enthalpy_file.read_text().splitlines() + # drop the first line (the header) + data_lines = lines[1:] + # open convexhull.csv in append mode + with convexhull_file.open("a") as f: + for line in data_lines: + # ensure newline + f.write(line.rstrip("\n") + "\n") + des = Path("/opt/agents/convex_hull/geo_opt/convexhull.csv") + print(f"convexhull_file = {convexhull_file}") + print(f"des = {des}") + shutil.copy(convexhull_file, des) + except Exception as e: + return{ + "enthalpy_file": [], + "message": "Convexhull.csv file save failed!" 
+ } + + try: + update_input_file = Path("/opt/agents/convex_hull/geo_opt/update_input.py") + + cmd = [ + "python", + str(update_input_file), + str(formula) + ] + subprocess.run(cmd, cwd=enthalpy_dir, check=True) + + #Check updated convexhull.csv + + #src = Path("/opt/agents/convex_hull/geo_opt/convexhull.csv") + #shutil.copy(src, enthalpy_dir) + + src = Path("/opt/agents/convex_hull/geo_opt/input.dat") + shutil.copy(src, enthalpy_dir) + + except Exception as e: + return{ + "enthalpy_file": [], + "message": "Update input.dat failed!" + } + + try: + work_dir = Path("/opt/agents/convex_hull/geo_opt/results/") + + cmd = [ + "python", + "cak3.py", + "--plotch" + ] + subprocess.run(cmd, cwd=work_dir, check=True) + + src = Path("/opt/agents/convex_hull/geo_opt/results/convexhull.png") + shutil.copy(src, enthalpy_dir) + + src = Path("/opt/agents/convex_hull/geo_opt/results/e_above_hull_50meV.csv") + dest = enthalpy_dir / f"e_above_hull.csv" + shutil.copy(src, dest) + + except Exception as e: + return{ + "enthalpy_file": [], + "message": "Convex hull build failed" + } + + return{ + "enthalpy_file": enthalpy_dir, + "message": f"Entalpy calculated successfully and saved in {enthalpy_file}" + } + + + except Exception as e: + return{ + "message": "Convex Hull build failed!" + } + + + +# ====== Run Server ====== + +if __name__ == "__main__": + logging.info("Starting ConvexHullServer on port 50004...") + mcp.run(transport="sse") + diff --git a/servers/convexhull/uv.lock b/servers/convexhull/uv.lock new file mode 100644 index 0000000..a274c1e --- /dev/null +++ b/servers/convexhull/uv.lock @@ -0,0 +1,8 @@ +version = 1 +revision = 2 +requires-python = ">=3.12" + +[[package]] +name = "superconductor-mcp-server" +version = "0.1.0" +source = { virtual = "." 
} From 1e390d006e56f91ad6a46338b937b69c90682767 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 12 Jan 2026 11:40:48 +0800 Subject: [PATCH 5/5] Revise Superconductor readme file --- servers/superconductor/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servers/superconductor/README.md b/servers/superconductor/README.md index db11e69..87f7855 100644 --- a/servers/superconductor/README.md +++ b/servers/superconductor/README.md @@ -1,4 +1,4 @@ -# ThermoelectricMCP Server +# SuperconductorMCP Server A tool to predict supercondutor related properties and screen promising supercondutors.