Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
08c0a4b
added comments, no code changes
msh-yi Apr 24, 2021
537fc2b
updated referenced script for automatic solvation to solvate.py
msh-yi Apr 19, 2021
d46c2af
remove interactive bash when calling packmol, prevent lmod reload
msh-yi Apr 19, 2021
848ce70
added comments, no code changes
msh-yi Apr 24, 2021
4098b42
Merge remote-tracking branch 'origin/master'
msh-yi Apr 24, 2021
6b86109
catch oldchk=None, prevent ambiguous error message
msh-yi Apr 25, 2021
1d32730
make folder for parallel remd
msh-yi Apr 26, 2021
4fd9aa1
parallelized with mpi4py, pre-testing
msh-yi Apr 26, 2021
ae16582
update parallel run script
msh-yi May 1, 2021
2791af3
Merge remote-tracking branch 'upstream/master'
msh-yi May 1, 2021
e1f33f9
added partest
msh-yi May 31, 2021
873fd14
change undeclared variable
msh-yi May 31, 2021
11ca8ff
bugfix idxs required to be string, not list
msh-yi May 31, 2021
6bdaf7f
add mpi-parallelized remd
msh-yi Jun 1, 2021
24b3ea7
cleanup parallel remd testing files
msh-yi Jun 1, 2021
3609d03
add mpi run script
msh-yi Jun 1, 2021
b45fd2b
add feature: slurm-parallelized remd
msh-yi Jun 4, 2021
c7b8c64
add documentation for slurm-parallel remd
msh-yi Jun 4, 2021
623e348
update parallel remd documentation
msh-yi Jun 4, 2021
fe4851c
require additional Slurm submit script
msh-yi Jun 8, 2021
c7818d8
change final remd report location
msh-yi Jun 9, 2021
9193419
add feature: trajectory demux and cluster analysis
msh-yi Jun 11, 2021
697e719
update docs for demux, cluster analysis
msh-yi Jun 15, 2021
383f15c
load only last two frames during REMD restart
msh-yi Jun 17, 2021
cd8e615
update demuxing script to accept incomplete trajs
msh-yi Jun 25, 2021
68e7b28
allow demux to work on incomplete remd runs
msh-yi Jul 13, 2021
30071ae
Merge branch 'master' from upstream
msh-yi Jul 19, 2021
4a85040
add remd tutorial
msh-yi Jul 20, 2021
8bd83eb
remove yale cluster-specific info
msh-yi Jul 20, 2021
4a12f07
add cluster analysis dependencies
msh-yi Jul 20, 2021
015617a
fix demuxing script
msh-yi Jul 24, 2021
23fdf4c
update remd tutorial
msh-yi Jul 26, 2021
ef73182
make compatible with cctk
msh-yi Aug 12, 2021
b9c0931
Merge branch 'master' of https://github.com/corinwagen/presto
msh-yi Aug 12, 2021
0fce3ea
Merge remote-tracking branch 'upstream/master'
Jan 30, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion presto/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from presto.build import build
import presto.build
import presto.config
import presto.constants
import presto.calculators
Expand Down
2 changes: 1 addition & 1 deletion presto/build.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import configparser, os, re, pathlib, yaml, cctk, h5py, logging
import os, re, yaml, cctk, h5py, logging
import numpy as np
import presto

Expand Down
5 changes: 2 additions & 3 deletions presto/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import configparser, os, re, pathlib, yaml, cctk, h5py, logging, shutil
import numpy as np
import configparser, os, re, pathlib, logging
import presto

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -86,4 +85,4 @@ def check_exec(executable):

# backwards-compatibility with v0.2.4 and before
def build(*args, **kwargs):
return presto.build(*args, **kwargs)
return presto.build.build(*args, **kwargs)
30 changes: 30 additions & 0 deletions presto/config.py.rej
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
diff a/presto/config.py b/presto/config.py (rejected hunks)
@@ -237,23 +237,18 @@ def build_bath_scheduler(settings):

delta = settings["start_temp"] - settings["target_temp"]

- def sched(time):
- if time > settings["target_time"]:
- return settings["target_temp"]
- else:
- return settings["target_temp"] + delta * (1 - time / settings["target_time"])
+ if time > settings["target_time"]:
+ return settings["target_temp"]
+ else:
+ return settings["target_temp"] + delta * (1 - time / settings["target_time"])

- return sched

elif settings["type"].lower() == "constant":
assert "target_temp" in settings, "Need `target_temp` for this bath scheduler!"
assert isinstance(settings["target_temp"], (float, int)), "`target_temp` must be numeric!"
assert settings["target_temp"] > 0, "`target_temp` must be positive!"

- def sched(time):
- return settings["target_temp"]
-
- return sched
+ return settings["target_temp"]

else:
raise ValueError(f"Unknown bath scheduler type {settings['type']}! Allowed options are `linear` or `constant`.")
204 changes: 189 additions & 15 deletions presto/replica_exchange.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
import numpy as np
import math, copy, cctk, os, re, logging, time, yaml, random
import multiprocessing as mp
# presto/presto/replica_exchange.py
# Jul 2021

import numpy as np
import logging
import random
import sys
import subprocess
import presto
import dill
import re
import os
import yaml
import multiprocessing as mp

logger = logging.getLogger(__name__)

MIN_CHECKPOINT_INTERVAL = 50

class MissingSbatchFlags(Exception):
pass

class ReplicaExchange():
"""
Runs several trajectories and manages interconversions between them.
Expand All @@ -23,15 +35,15 @@ class ReplicaExchange():
finished (bool):
current_idx (int):
"""
def __init__(self, trajectories, checkpoint_filename="remd.chk", swap_interval=10):
temps = np.zeros(shape=len(trajectories))
for idx, traj in enumerate(trajectories):
assert isinstance(traj, presto.trajectory.Trajectory), "all trajectories must be EquilibrationTrajectories"
def __init__(self, trajectories, checkpoint_filename="remd.chk", swap_interval=100):
for traj in trajectories:
assert isinstance(traj, presto.trajectory.EquilibrationTrajectory), "all trajectories must be EquilibrationTrajectories"
assert traj.timestep == trajectories[0].timestep, "all trajectories must have same ``timestep``"
assert traj.stop_time == trajectories[0].stop_time, "all trajectories must have same ``stop_time``"
assert np.array_equal(traj.high_atoms, trajectories[0].high_atoms), "all trajectories must have same ``high_atoms``"
assert np.array_equal(traj.active_atoms, trajectories[0].active_atoms), "all trajectories must have same ``active_atoms``"

# sort by temp
self.trajectories = sorted(trajectories, key=lambda x: x.bath_scheduler(0))
self.stop_time = trajectories[0].stop_time

Expand All @@ -45,13 +57,15 @@ def __init__(self, trajectories, checkpoint_filename="remd.chk", swap_interval=1
self.finished = False
self.current_idx = 0

self.load()
# prevent parallel class from loading
if not isinstance(self, ReplicaExchangeParallel):
self.load()

def __str__(self):
return f"ReplicaExchange({len(trajectories)} trajectories, swap_interval={swap_interval} fs, checkpoint_file={checkpoint_filename})"
return f"ReplicaExchange({len(self.trajectories)} trajectories, swap_interval={self.swap_interval} fs, checkpoint_file={self.checkpoint_filename})"

def __repr__(self):
return f"ReplicaExchange({len(trajectories)} trajectories, swap_interval={swap_interval} fs, checkpoint_file={checkpoint_filename})"
return f"ReplicaExchange({len(self.trajectories)} trajectories, swap_interval={self.swap_interval} fs, checkpoint_file={self.checkpoint_filename})"

def save(self):
file_dict = {
Expand Down Expand Up @@ -86,12 +100,16 @@ def run(self):
processes = [None]*len(self.trajectories)
start_idx = self.current_idx

# we break the runs up into small chunks
# we break the runs up into small chunks of size swap_interval
# each trajectory is basically run in parallel for time swap_interval in each iter
# we make as many mp.Process objects as there are trajectories
for current_idx in range(start_idx, int(self.stop_time/self.swap_interval)):
next_idx = current_idx + 1
target_time = next_idx * self.swap_interval

for idx, traj in enumerate(self.trajectories):
# how long does each traj need to run for?
# the last iter may have time_remaining \neq swap_interval
time_remaining = max(0, target_time - traj.last_time_run())

if time_remaining:
Expand All @@ -100,24 +118,30 @@ def run(self):
"checkpoint_interval": min(MIN_CHECKPOINT_INTERVAL, self.swap_interval),
})
processes[idx].start()

for process in processes:
if process is not None:
process.join()
# all the chunks in this iter are done

# run saves frames to chk file, so this ensures that run is done and saved properly
for traj in self.trajectories:
traj.load_from_checkpoint()
assert traj.last_time_run() == target_time

self.exchange(next_idx*self.swap_interval)
self.current_idx = next_idx
self.exchange() # TODO: make sure calling exchange without time is ok
self.save()
self.finished = True
return self

def exchange(self, time):
def exchange(self):
""" time is from 0
"""
time = self.current_idx * self.swap_interval
kB = presto.constants.BOLTZMANN_CONSTANT


# make one pass from low to high T
for i in range(len(self.trajectories)-1):
j = i+1

Expand All @@ -132,6 +156,7 @@ def exchange(self, time):
logger.info(f"E{i} & E{j}\tp={p}")

if p > random.random():
# or momenta scaling
v_i_scaling = np.sqrt(T_j/T_i)
v_j_scaling = np.sqrt(T_i/T_j)

Expand All @@ -148,6 +173,7 @@ def exchange(self, time):

def report(self):
counts = np.zeros(shape=len(self.trajectories))
# len of counts is no of temps
possible = self.current_idx * (len(self.trajectories) - 1)

for swap in self.swaps:
Expand All @@ -159,3 +185,151 @@ def report(self):
text += f"\tReplica {i} <=> Replica {i+1} \t{counts[i]/self.current_idx:.2%}\n"

return text

class ReplicaExchangeParallel(ReplicaExchange):
    """
    Runs several trajectories and manages interconversions between them.
    Slurm-parallelized version: each segment of each trajectory runs as one
    task of a Slurm job array, and the driver object is round-tripped
    through a ``dill`` checkpoint between segments.

    Based on:
    http://www.math.pitt.edu/~cbsg/Materials/Earl_ParallelTempering.pdf

    Attributes:
        stop_time (int):
        swaps (list of dict):
        swap_interval (int):
        checkpoint_filename (str):
        finished (bool):
        current_idx (int):
    """

    def __init__(self, *args, **kwargs):
        # the parent __init__ deliberately skips self.load() for this subclass;
        # restoring state is done explicitly via the ``load`` classmethod.
        super().__init__(*args, **kwargs)

    def update_trajs(self):
        """
        Updates all trajectories from their respective chk files on a newly
        unpickled remd object. Not run in the first iteration.
        Does not modify any files.

        Raises:
            AssertionError: if any trajectory did not end at the expected time.
        """
        self.current_idx += 1

        for traj in self.trajectories:
            # only the last two frames are needed to perform an exchange
            traj.load_from_checkpoint(frames=slice(-2, None, None))
            # ensure all the trajectories were run up to the same time
            assert traj.last_time_run() == self.current_idx * self.swap_interval, "ensure that trajectories all ended at the same time"

        if self.current_idx * self.swap_interval == self.stop_time:
            self.finished = True

    def run(self, slurm=True):
        """
        Generates a (Slurm) job array for the trajectories in the next segment
        and submits it. Always call ``self.update_trajs()`` before ``self.run()``.

        Args:
            slurm (bool): submit via a Slurm job array. This is the only
                implemented mode; with ``slurm=False`` nothing is submitted
                and ``None`` is returned (preserved from the original).

        Returns:
            int: Slurm job id of the submitted array, or ``self`` if the
            replica exchange has already finished.
        """
        if self.finished:
            logger.info("Replica exchange already finished!")
            return self

        next_idx = self.current_idx + 1
        target_time = next_idx * self.swap_interval

        # number of threads each array task needs, read from the calculator config
        if isinstance(self.trajectories[0].calculator, presto.calculators.XTBCalculator):
            n_threads = self.trajectories[0].calculator.parallel
        elif isinstance(self.trajectories[0].calculator, presto.calculators.GaussianCalculator):
            n_threads = int(self.trajectories[0].calculator.link0["nprocshared"])
        else:
            # previously n_threads was left undefined here, causing a confusing
            # NameError further down -- fail early with a clear message instead
            raise ValueError(f"Unsupported calculator type {type(self.trajectories[0].calculator).__name__} for parallel REMD!")

        if slurm:
            slurm_script = 'traj_array.sh'

            assert os.path.exists(slurm_script), "The Slurm submit script traj_array.sh is required but not found"

            # dict to ensure that (some) necessary sbatch options are specified
            sbatch_flags = {'-J': False, '-c': False, '--array': False}
            with open(slurm_script, 'r') as f:
                source = f.read().splitlines()  # no newline in source

            with open(slurm_script, 'w') as f:
                for line in source:
                    if "job-name" in line or '-J ' in line:
                        # update the job name to reflect this segment's target time
                        line = re.sub(r'array_(.*?)fs', f'array_{target_time}fs', line)
                        sbatch_flags['-J'] = True
                    elif "cpus-per-task" in line or '#SBATCH -c' in line:
                        # check that cpus-per-task equals n_threads; fix it if not
                        sbatch_flags['-c'] = True
                        thread_match = re.search(r'(\d+)$', line)
                        input_threads = int(thread_match.group())
                        if input_threads != n_threads:
                            line = re.sub(r'(\d+)$', str(n_threads), line)
                            logger.warning(
                                "--cpus-per-task in traj_array.sh is not equal to number of processors specified in config file, automatically changing")
                    elif "--array" in line:
                        # check that the array size equals the number of trajectories
                        # (array indices run 0..N-1, hence the "- 1")
                        input_trajs = int(re.search(r'(\d+)$', line).group())
                        sbatch_flags['--array'] = True
                        if input_trajs != len(self.trajectories) - 1:
                            line = re.sub(r'(\d+)$', str(len(self.trajectories) - 1), line)
                            logger.warning(
                                "--array size in traj_array.sh is not equal to number of trajectories specified when calling remd_par_manager.py, automatically changing")

                    f.write(line + '\n')

            for flag, seen in sbatch_flags.items():
                if not seen:
                    logger.error(f"did not specify sbatch option {flag} in traj_array.sh, exiting now")
                    sys.exit(1)

            try:
                sbatch_msg = subprocess.check_output(['sbatch', slurm_script])
                if 'Submitted' not in sbatch_msg.decode('utf-8'):
                    # the original bare ``raise subprocess.CalledProcessError`` (no
                    # args) raised TypeError instead -- construct it properly
                    raise subprocess.CalledProcessError(returncode=1, cmd='sbatch ' + slurm_script)
            except subprocess.CalledProcessError:
                logger.error("Could not submit slurm job array script")
                sys.exit(1)
            logger.info("Wrote and submitted traj_array.sh")
            return int(sbatch_msg.split()[-1])  # slurm job id of the submitted array

    def exchange(self):
        """
        Attempts temperature swaps between adjacent replicas (parent-class logic),
        then saves the updated trajectories to their trajectory chkfiles.
        """
        super().exchange()

        for traj in self.trajectories:
            traj.save()

    def save(self):
        """
        Saves this ReplicaExchange object to its chkfile (via dill), keeping only
        the last frame of each trajectory to keep the pickle small.

        Note: truncates ``traj.frames`` in place as a side effect.
        """
        for traj in self.trajectories:
            traj.frames = [traj.frames[-1]]

        with open(self.checkpoint_filename, "wb") as f:
            dill.dump(self, f, protocol=-1)  # HIGHEST_PROTOCOL

        logger.info(f"Saved ReplicaExchange to {self.checkpoint_filename}.")

    @classmethod
    def load(cls, checkpoint):
        """
        Loads a ReplicaExchange object from a chkfile.

        Args:
            checkpoint (str): path to the dill checkpoint file.

        Returns:
            ReplicaExchange object
        """
        try:
            with open(checkpoint, "rb") as f:
                remd = dill.load(f)
        except FileNotFoundError:
            # previously fell through after printing and hit UnboundLocalError on
            # ``remd`` -- exit cleanly with the explanatory message instead
            print("checkpoint file not found, do not call this script with --spawn on the first instance")
            sys.exit(1)
        logger.info(f"Loaded ReplicaExchange from {checkpoint}.")

        return remd
24 changes: 24 additions & 0 deletions presto/test/300k.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# presto test configuration: constant-temperature (300 K) equilibration run,
# used by presto/test/save_empty_traj.py together with dce.xyz.
type: equilibration

timestep: 1          # integration timestep (units per presto convention -- presumably fs, confirm)
high_atoms: 1-8      # all 8 atoms of the dce.xyz geometry treated as high-level
stop_time: 250       # total simulation length (same units as timestep)

anchor: 1

integrator:
  type: langevin
  viscosity: 0.0001

potential:
  type: spherical_harmonic
  radius: 8

calculator:
  type: xtb          # GFN2-xTB, run on 8 parallel threads
  gfn : 2
  parallel: 8

bath_scheduler:
  type: constant     # hold the bath at a fixed temperature
  target_temp: 300.00
10 changes: 10 additions & 0 deletions presto/test/dce.xyz
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
8
C2H4Cl2
C -1.259345218 0.000000000 -1.676453798
H -0.376267218 -0.509845000 -2.075709798
H -2.142423218 -0.509845000 -2.075709798
C -1.259345218 0.000000000 -0.146465798
H -2.142423218 0.509845000 0.252790202
H -0.376267218 0.509845000 0.252790202
Cl -1.259345218 -1.648164277 0.498867074
Cl -1.259345218 1.648164277 -2.321786671
6 changes: 6 additions & 0 deletions presto/test/save_empty_traj.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Smoke test: build a trajectory from the 300 K test config, save it to a
# checkpoint file, then reload it from that checkpoint.
import presto

# construct a trajectory from 300k.yaml with the dichloroethane geometry
traj = presto.config.build("300k.yaml", checkpoint="300k.chk", geometry="dce.xyz")
traj.save()

# reload from the checkpoint just written
# (second argument presumably a frame index/count -- confirm against presto API)
traj2 = presto.trajectory.Trajectory.new_from_checkpoint("300k.chk", -1)
Loading