diff --git a/common/src/apis.rs b/common/src/apis.rs
index 18fb91dd..af6bdedd 100644
--- a/common/src/apis.rs
+++ b/common/src/apis.rs
@@ -95,7 +95,7 @@ pub struct Application {
     pub command: Option<String>,
     pub arguments: Vec<String>,
     pub environments: HashMap<String, String>,
-    pub working_directory: String,
+    pub working_directory: Option<String>,
     pub max_instances: u32,
     pub delay_release: Duration,
     pub schema: Option<ApplicationSchema>,
@@ -111,7 +111,7 @@ pub struct ApplicationAttributes {
     pub command: Option<String>,
     pub arguments: Vec<String>,
     pub environments: HashMap<String, String>,
-    pub working_directory: String,
+    pub working_directory: Option<String>,
     pub max_instances: u32,
     pub delay_release: Duration,
     pub schema: Option<ApplicationSchema>,
@@ -128,7 +128,7 @@ impl Default for ApplicationAttributes {
             command: None,
             arguments: vec![],
             environments: HashMap::new(),
-            working_directory: "/tmp".to_string(),
+            working_directory: None,
             max_instances: DEFAULT_MAX_INSTANCES,
             delay_release: DEFAULT_DELAY_RELEASE,
             schema: Some(ApplicationSchema::default()),
@@ -868,7 +868,7 @@ impl TryFrom<&rpc::Application> for Application {
                 .into_iter()
                 .map(|e| (e.name, e.value))
                 .collect(),
-            working_directory: spec.working_directory.unwrap_or(String::default()),
+            working_directory: spec.working_directory,
             max_instances: spec.max_instances.unwrap_or(DEFAULT_MAX_INSTANCES),
             delay_release: spec
                 .delay_release
@@ -901,7 +901,7 @@ impl From<&Application> for rpc::Application {
                 .into_iter()
                 .map(|(k, v)| rpc::Environment { name: k, value: v })
                 .collect(),
-            working_directory: Some(app.working_directory.clone()),
+            working_directory: app.working_directory.clone(),
             max_instances: Some(app.max_instances),
             delay_release: Some(app.delay_release.num_seconds()),
             schema: app.schema.clone().map(rpc::ApplicationSchema::from),
@@ -939,7 +939,8 @@ impl From<rpc::Application> for ApplicationAttributes {
                 .into_iter()
                 .map(|e| (e.name, e.value))
                 .collect(),
-            working_directory: spec.working_directory.clone().unwrap_or_default(),
+            // Treat empty string as None due to protobuf limitation
+            working_directory: spec.working_directory.clone().filter(|wd| !wd.is_empty()),
             max_instances: spec.max_instances.unwrap_or(DEFAULT_MAX_INSTANCES),
             delay_release: spec
                 .delay_release
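Protobuf 3 cannot distinguish an unset string field from an empty one, which is why the conversion above filters empty strings into `None`. A minimal sketch of that normalization rule, using a hypothetical standalone helper rather than the crate's actual `From` impls:

```rust
/// Sketch: treat an empty protobuf string as "unset".
/// `normalize_working_directory` is a hypothetical helper, not crate API.
fn normalize_working_directory(wd: Option<String>) -> Option<String> {
    wd.filter(|s| !s.is_empty())
}

fn main() {
    assert_eq!(normalize_working_directory(None), None);
    assert_eq!(normalize_working_directory(Some(String::new())), None);
    assert_eq!(
        normalize_working_directory(Some("/opt/app".to_string())),
        Some("/opt/app".to_string())
    );
}
```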
diff --git a/common/src/lib.rs b/common/src/lib.rs
index a36a4cf7..80931aab 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -215,8 +215,7 @@ pub fn default_applications() -> HashMap<String, ApplicationAttributes> {
                 "The Flame Runner application for executing customized Python applications."
                     .to_string(),
             ),
-            working_directory: "/tmp".to_string(),
-            command: Some("/usr/bin/uv".to_string()),
+            command: Some("/bin/uv".to_string()),
             arguments: vec![
                 "run".to_string(),
                 "--with".to_string(),
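With the `Default` impl no longer pinning `/tmp`, a freshly built `ApplicationAttributes` leaves the working directory unset and defers the choice to the executor shim. A sketch of what a caller now observes; the `common::apis` import path is an assumption:

```rust
use common::apis::ApplicationAttributes; // assumed module path into the crate

fn main() {
    let attrs = ApplicationAttributes {
        command: Some("/bin/uv".to_string()),
        ..ApplicationAttributes::default()
    };
    // working_directory now defaults to None instead of "/tmp";
    // the host shim picks a per-executor directory at spawn time.
    assert_eq!(attrs.working_directory, None);
}
```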
diff --git a/docker/Dockerfile.console b/docker/Dockerfile.console
index 6b0b809c..180151ee 100644
--- a/docker/Dockerfile.console
+++ b/docker/Dockerfile.console
@@ -14,7 +14,7 @@
 COPY --from=builder /usr/local/cargo/bin/flmping /usr/local/bin/flmping
 COPY --from=builder /usr/local/cargo/bin/flmctl /usr/local/bin/flmctl
 COPY --from=builder /usr/local/cargo/bin/flmexec /usr/local/bin/flmexec
 
-COPY --from=ghcr.io/astral-sh/uv:0.9.18 /uv /uvx /bin/
+COPY --from=ghcr.io/astral-sh/uv:0.9.26 /uv /uvx /bin/
 
 RUN chmod +x /usr/local/bin/*
diff --git a/docker/Dockerfile.fem b/docker/Dockerfile.fem
index 5346bf35..401c9834 100644
--- a/docker/Dockerfile.fem
+++ b/docker/Dockerfile.fem
@@ -12,11 +12,11 @@ FROM ubuntu:24.04
 
 RUN apt-get update && apt-get install -y python3-pip
 
-RUN mkdir -p /usr/local/flame/bin /usr/local/flame/work /usr/local/flame/sdk
+RUN mkdir -p /usr/local/flame/bin /usr/local/flame/work/tmp /usr/local/flame/sdk
 
 WORKDIR /usr/local/flame/work
 
-COPY --from=ghcr.io/astral-sh/uv:0.9.18 /uv /uvx /bin/
+COPY --from=ghcr.io/astral-sh/uv:0.9.26 /uv /uvx /bin/
 
 COPY --from=builder /usr/local/cargo/bin/flame-executor-manager /usr/local/flame/bin/flame-executor-manager
 COPY --from=builder /usr/local/cargo/bin/flmping-service /usr/local/flame/bin/flmping-service
diff --git a/e2e/tests/test_flmrun.py b/e2e/tests/test_flmrun.py
index 1dc8ae5e..dc9e2dd3 100644
--- a/e2e/tests/test_flmrun.py
+++ b/e2e/tests/test_flmrun.py
@@ -79,8 +79,7 @@ def test_flmrun_application_registered():
     assert flmrun.name == FLMRUN_E2E_APP
     assert flmrun.shim == flamepy.Shim.Host
     assert flmrun.state == flamepy.ApplicationState.ENABLED
-    assert flmrun.command == "/usr/bin/uv"
-    assert flmrun.working_directory == "/tmp"
+    assert flmrun.command == "/bin/uv"
 
 
 def test_flmrun_sum_function():
diff --git a/examples/ps/.python-version b/examples/ps/.python-version
new file mode 100644
index 00000000..e4fba218
--- /dev/null
+++ b/examples/ps/.python-version
@@ -0,0 +1 @@
+3.12
diff --git a/examples/ps/README.md b/examples/ps/README.md
new file mode 100644
index 00000000..e69de29b
diff --git a/examples/ps/dist/ps-example.tar.gz b/examples/ps/dist/ps-example.tar.gz
new file mode 100644
index 00000000..23ad3ea4
Binary files /dev/null and b/examples/ps/dist/ps-example.tar.gz differ
diff --git a/examples/ps/main.py b/examples/ps/main.py
new file mode 100644
index 00000000..ef5bb7b6
--- /dev/null
+++ b/examples/ps/main.py
@@ -0,0 +1,26 @@
+from ps import ConvNet, get_data_loader, ParameterServer, DataWorker, evaluate
+from flamepy.rl import Runner
+
+
+if __name__ == "__main__":
+    model = ConvNet()
+    test_loader = get_data_loader()[1]
+    print("Running synchronous parameter server training.")
+
+    with Runner("ps-example") as rr:
+        ps_svc = rr.service(ParameterServer(1e-2))
+        workers_svc = [rr.service(DataWorker) for _ in range(4)]
+
+        current_weights = ps_svc.get_weights().get()
+        for i in range(20):
+            gradients = [worker.compute_gradients(current_weights) for worker in workers_svc]
+            # Calculate update after all gradients are available.
+            current_weights = ps_svc.apply_gradients(*gradients).get()
+
+            if i % 10 == 0:
+                # Evaluate the current model.
+                model.set_weights(current_weights)
+                accuracy = evaluate(model, test_loader)
+                print("Iter {}: \taccuracy is {:.1f}".format(i, accuracy))
+
+        print("Final accuracy is {:.1f}.".format(accuracy))
diff --git a/examples/ps/ps.py b/examples/ps/ps.py
new file mode 100644
index 00000000..d45a3a8f
--- /dev/null
+++ b/examples/ps/ps.py
@@ -0,0 +1,119 @@
+import os
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+from torchvision import datasets, transforms
+from filelock import FileLock
+import numpy as np
+
+
+def get_data_loader():
+    """Safely downloads data. Returns training/validation set dataloader."""
+    mnist_transforms = transforms.Compose(
+        [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
+    )
+
+    # We add FileLock here because multiple workers will want to
+    # download data, and this may cause overwrites since
+    # DataLoader is not threadsafe.
+    with FileLock(os.path.expanduser("~/data.lock")):
+        train_loader = torch.utils.data.DataLoader(
+            datasets.MNIST(
+                "~/data", train=True, download=True, transform=mnist_transforms
+            ),
+            batch_size=128,
+            shuffle=True,
+        )
+        test_loader = torch.utils.data.DataLoader(
+            datasets.MNIST("~/data", train=False, transform=mnist_transforms),
+            batch_size=128,
+            shuffle=True,
+        )
+    return train_loader, test_loader
+
+
+def evaluate(model, test_loader):
+    """Evaluates the accuracy of the model on a validation dataset."""
+    model.eval()
+    correct = 0
+    total = 0
+    with torch.no_grad():
+        for batch_idx, (data, target) in enumerate(test_loader):
+            # This is only set to finish evaluation faster.
+            if batch_idx * len(data) > 1024:
+                break
+            outputs = model(data)
+            _, predicted = torch.max(outputs.data, 1)
+            total += target.size(0)
+            correct += (predicted == target).sum().item()
+    return 100.0 * correct / total
+
+
+class ConvNet(nn.Module):
+    """Small ConvNet for MNIST."""
+
+    def __init__(self):
+        super(ConvNet, self).__init__()
+        self.conv1 = nn.Conv2d(1, 3, kernel_size=3)
+        self.fc = nn.Linear(192, 10)
+
+    def forward(self, x):
+        x = F.relu(F.max_pool2d(self.conv1(x), 3))
+        x = x.view(-1, 192)
+        x = self.fc(x)
+        return F.log_softmax(x, dim=1)
+
+    def get_weights(self):
+        return {k: v.cpu() for k, v in self.state_dict().items()}
+
+    def set_weights(self, weights):
+        self.load_state_dict(weights)
+
+    def get_gradients(self):
+        grads = []
+        for p in self.parameters():
+            grad = None if p.grad is None else p.grad.data.cpu().numpy()
+            grads.append(grad)
+        return grads
+
+    def set_gradients(self, gradients):
+        for g, p in zip(gradients, self.parameters()):
+            if g is not None:
+                p.grad = torch.from_numpy(g)
+
+
+class ParameterServer(object):
+    def __init__(self, lr):
+        self.model = ConvNet()
+        self.optimizer = torch.optim.SGD(self.model.parameters(), lr=lr)
+
+    def apply_gradients(self, *gradients):
+        summed_gradients = [
+            np.stack(gradient_zip).sum(axis=0) for gradient_zip in zip(*gradients)
+        ]
+        self.optimizer.zero_grad()
+        self.model.set_gradients(summed_gradients)
+        self.optimizer.step()
+        return self.model.get_weights()
+
+    def get_weights(self):
+        return self.model.get_weights()
+
+
+class DataWorker(object):
+    def __init__(self):
+        self.model = ConvNet()
+        self.data_iterator = iter(get_data_loader()[0])
+
+    def compute_gradients(self, weights):
+        self.model.set_weights(weights)
+        try:
+            data, target = next(self.data_iterator)
+        except StopIteration:  # When the epoch ends, start a new epoch.
+            self.data_iterator = iter(get_data_loader()[0])
+            data, target = next(self.data_iterator)
+        self.model.zero_grad()
+        output = self.model(data)
+        loss = F.nll_loss(output, target)
+        loss.backward()
+        return self.model.get_gradients()
\ No newline at end of file
diff --git a/examples/ps/pyproject.toml b/examples/ps/pyproject.toml
new file mode 100644
index 00000000..48e417a3
--- /dev/null
+++ b/examples/ps/pyproject.toml
@@ -0,0 +1,22 @@
+[project]
+name = "ps-example"
+version = "0.1.0"
+description = "Parameter Server by flamepy.Runner"
+readme = "README.md"
+requires-python = ">=3.12"
+dependencies = [
+    "torch",
+    "torchvision",
+    "numpy",
+    "filelock"
+]
+
+[build-system]
+requires = ["setuptools>=61.0", "wheel"]
+build-backend = "setuptools.build_meta"
+
+[tool.setuptools]
+py-modules = ["ps", "main"]
+
+[tool.uv.sources]
+flamepy = { path = "/usr/local/flame/sdk/python" }
\ No newline at end of file
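The new example implements the classic synchronous parameter-server loop: each of the four workers computes a gradient at the same snapshot of the weights, and `ParameterServer.apply_gradients` sums them before taking a single SGD step. In equation form, with learning rate $\eta$ (`1e-2` in `main.py`) and per-worker gradients $g_i$:

$$w_{t+1} = w_t - \eta \sum_{i=1}^{4} g_i(w_t)$$

The sum corresponds to the `np.stack(gradient_zip).sum(axis=0)` in `ps.py`, applied parameter-by-parameter before `optimizer.step()`.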
diff --git a/executor_manager/src/shims/host_shim.rs b/executor_manager/src/shims/host_shim.rs
index a32c1d1b..f97ddaf7 100644
--- a/executor_manager/src/shims/host_shim.rs
+++ b/executor_manager/src/shims/host_shim.rs
@@ -15,6 +15,7 @@ use std::env;
 use std::fs::{self, create_dir_all, File, OpenOptions};
 use std::future::Future;
 use std::os::unix::process::CommandExt;
+use std::path::Path;
 use std::pin::Pin;
 use std::process::{self, Command, Stdio};
 use std::sync::Arc;
@@ -99,34 +100,65 @@ impl HostShim {
         // Spawn child process
         let mut cmd = tokio::process::Command::new(&command);
 
-        let cur_dir = app
-            .working_directory
-            .clone()
-            .unwrap_or(FLAME_WORKING_DIRECTORY.to_string());
+        // If application doesn't specify working_directory, use executor manager's working directory with executor ID
+        let cur_dir = match app.working_directory.clone() {
+            Some(wd) => Path::new(&wd).to_path_buf(),
+            None => env::current_dir()
+                .unwrap_or(Path::new(FLAME_WORKING_DIRECTORY).to_path_buf())
+                .join(executor.id.as_str()),
+        };
 
-        tracing::debug!("Current directory of application instance: {cur_dir}");
+        let work_dir = cur_dir.clone();
+        let tmp_dir = cur_dir.join("tmp");
 
-        // Create the working directory if it doesn't exist
-        create_dir_all(&cur_dir).map_err(|e| {
-            FlameError::Internal(format!("failed to create working directory {cur_dir}: {e}"))
+        tracing::debug!(
+            "Working directory of application instance: {}",
+            work_dir.display()
+        );
+        tracing::debug!(
+            "Temporary directory of application instance: {}",
+            tmp_dir.display()
+        );
+
+        // Create the working & temporary directories if they don't exist
+        create_dir_all(&work_dir).map_err(|e| {
+            FlameError::Internal(format!(
+                "failed to create working directory {}: {e}",
+                work_dir.display()
+            ))
+        })?;
+        create_dir_all(&tmp_dir).map_err(|e| {
+            FlameError::Internal(format!(
+                "failed to create temporary directory {}: {e}",
+                tmp_dir.display()
+            ))
         })?;
 
-        let log_file = OpenOptions::new()
+        // Set temporary directory for the application instance
+        envs.insert("TMPDIR".to_string(), tmp_dir.to_string_lossy().to_string());
+
+        let log_out = OpenOptions::new()
+            .create(true)
+            .read(true)
+            .write(true)
+            .truncate(true)
+            .open(work_dir.join(format!("{}.out", executor.id)))
+            .map_err(|e| FlameError::Internal(format!("failed to open stdout log file: {e}")))?;
+
+        let log_err = OpenOptions::new()
             .create(true)
             .read(true)
             .write(true)
             .truncate(true)
-            .open(format!("{cur_dir}/{}.log", executor.id))
-            .map_err(|e| FlameError::Internal(format!("failed to open log file: {e}")))?;
+            .open(work_dir.join(format!("{}.err", executor.id)))
+            .map_err(|e| FlameError::Internal(format!("failed to open stderr log file: {e}")))?;
 
         let mut child = cmd
             .envs(envs)
             .args(args)
-            .current_dir(cur_dir)
-            .stdout(Stdio::from(log_file.try_clone().map_err(|e| {
-                FlameError::Internal(format!("failed to clone log file: {e}"))
-            })?))
-            .stderr(Stdio::from(log_file))
+            .current_dir(&work_dir)
+            .stdout(Stdio::from(log_out))
+            .stderr(Stdio::from(log_err))
             .process_group(0)
             .spawn()
            .map_err(|e| {
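A condensed sketch of the directory selection the shim now performs: an app-supplied path wins; otherwise the manager's current directory plus the executor ID is used, with a `tmp/` subdirectory exported as `TMPDIR`. The constant's value and the executor ID below are illustrative, not taken from the code:

```rust
use std::env;
use std::path::{Path, PathBuf};

// Illustrative fallback root; the real FLAME_WORKING_DIRECTORY constant
// lives in the executor manager crate.
const FLAME_WORKING_DIRECTORY: &str = "/usr/local/flame/work";

/// Sketch of the fallback rule in HostShim above.
fn instance_dirs(app_wd: Option<&str>, executor_id: &str) -> (PathBuf, PathBuf) {
    let work_dir = match app_wd {
        Some(wd) => Path::new(wd).to_path_buf(),
        None => env::current_dir()
            .unwrap_or_else(|_| PathBuf::from(FLAME_WORKING_DIRECTORY))
            .join(executor_id),
    };
    // Per-instance temporary directory, exported to the child as TMPDIR.
    let tmp_dir = work_dir.join("tmp");
    (work_dir, tmp_dir)
}

fn main() {
    let (work, tmp) = instance_dirs(None, "executor-1");
    println!("work_dir={} TMPDIR={}", work.display(), tmp.display());
}
```

This pairs with the Dockerfile.fem change above, which pre-creates a `tmp/` directory under the manager's default working directory.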
diff --git a/sdk/python/src/flamepy/rl/runpy.py b/sdk/python/src/flamepy/rl/runpy.py
index bd512d58..e5e0721d 100644
--- a/sdk/python/src/flamepy/rl/runpy.py
+++ b/sdk/python/src/flamepy/rl/runpy.py
@@ -15,6 +15,7 @@
 import inspect
 import logging
 import os
+import shutil
 import site
 import subprocess
 import sys
@@ -120,7 +121,12 @@ def _extract_archive(self, archive_path: str, extract_to: str) -> str:
         logger.info(f"Extracting archive: {archive_path} to {extract_to}")
 
         try:
-            # Create extraction directory if it doesn't exist
+            # Remove old extracted directory if it exists to ensure clean extraction
+            if os.path.exists(extract_to):
+                logger.info(f"Removing existing extracted directory: {extract_to}")
+                shutil.rmtree(extract_to)
+
+            # Create extraction directory
             os.makedirs(extract_to, exist_ok=True)
 
             # Determine archive type and extract
@@ -193,27 +199,50 @@ def _install_package_from_url(self, url: str) -> None:
                 install_path = extracted_dir
                 logger.info(f"Will install from extracted directory: {install_path}")
 
+            # Debug: List contents of extracted directory
+            try:
+                contents = os.listdir(install_path)
+                logger.debug(f"Extracted directory contents: {contents}")
+
+                # Check for pyproject.toml or setup.py
+                if "pyproject.toml" in contents:
+                    pyproject_path = os.path.join(install_path, "pyproject.toml")
+                    with open(pyproject_path, "r") as f:
+                        pyproject_content = f.read()
+                    logger.debug(f"pyproject.toml content:\n{pyproject_content}")
+                if "setup.py" in contents:
+                    logger.debug("Found setup.py in extracted directory")
+            except Exception as e:
+                logger.warning(f"Failed to list extracted directory contents: {e}")
+
             # Use sys.executable -m pip to install into the current virtual environment
+            # pip install will upgrade the package if it's already installed
             logger.info(f"Installing package: {install_path}")
-            install_args = [sys.executable, "-m", "pip", "install", install_path]
+            logger.debug(f"Python executable: {sys.executable}")
+            logger.debug(f"Current working directory: {os.getcwd()}")
+            install_args = [sys.executable, "-m", "pip", "install", "--upgrade", install_path]
+            logger.debug(f"Install command: {' '.join(install_args)}")
 
             try:
                 result = subprocess.run(install_args, capture_output=True, text=True, check=True)
-                logger.info(f"Package installation output: {result.stdout}")
+                logger.info("Package installation succeeded")
+                logger.debug(f"Package installation stdout:\n{result.stdout}")
                 if result.stderr:
-                    logger.warning(f"Package installation stderr: {result.stderr}")
+                    logger.debug(f"Package installation stderr:\n{result.stderr}")
                 logger.info(f"Successfully installed package from: {install_path}")
 
                 # Reload site packages to make the newly installed package available
                 # This is necessary because the Python interpreter has already started
                 logger.info("Reloading site packages to pick up newly installed package")
                 importlib.reload(site)
-                logger.info(f"Updated sys.path: {sys.path}")
+                logger.debug(f"Updated sys.path: {sys.path}")
             except subprocess.CalledProcessError as e:
                 logger.error(f"Failed to install package: {e}")
-                logger.error(f"stdout: {e.stdout}")
logger.error(f"stderr: {e.stderr}") + logger.error(f"Return code: {e.returncode}") + logger.error(f"Install command was: {' '.join(install_args)}") + logger.error(f"Package installation stdout:\n{e.stdout}") + logger.error(f"Package installation stderr:\n{e.stderr}") raise RuntimeError(f"Package installation failed: {e}") finally: # Clean up extracted directory if it was created diff --git a/sdk/rust/src/client/mod.rs b/sdk/rust/src/client/mod.rs index bb4f18de..614ed1b2 100644 --- a/sdk/rust/src/client/mod.rs +++ b/sdk/rust/src/client/mod.rs @@ -574,7 +574,8 @@ impl From for ApplicationAttributes { .into_iter() .map(|env| (env.name, env.value)) .collect(), - working_directory: app.working_directory.clone(), + // Treat empty string as None due to protobuf limitation + working_directory: app.working_directory.clone().filter(|wd| !wd.is_empty()), max_instances: app.max_instances, delay_release: app.delay_release.map(Duration::seconds), schema: app.schema.clone().map(ApplicationSchema::from), diff --git a/session_manager/src/scheduler/mod.rs b/session_manager/src/scheduler/mod.rs index 99842b8b..2d46ddce 100644 --- a/session_manager/src/scheduler/mod.rs +++ b/session_manager/src/scheduler/mod.rs @@ -84,7 +84,7 @@ mod tests { description: None, labels: Vec::new(), arguments: Vec::new(), - working_directory: "/tmp".to_string(), + working_directory: Some("/tmp".to_string()), environments: HashMap::new(), shim: Shim::Host, max_instances: 10, diff --git a/session_manager/src/storage/engine/sqlite.rs b/session_manager/src/storage/engine/sqlite.rs index 29adb8b0..7510e1cb 100644 --- a/session_manager/src/storage/engine/sqlite.rs +++ b/session_manager/src/storage/engine/sqlite.rs @@ -803,7 +803,7 @@ mod tests { command: Some("run-agent".to_string()), arguments: vec!["--test".to_string(), "--agent".to_string()], environments: HashMap::from([("TEST".to_string(), "true".to_string())]), - working_directory: "/tmp".to_string(), + working_directory: Some("/tmp".to_string()), max_instances: 10, delay_release: Duration::seconds(0), schema: None, @@ -825,7 +825,7 @@ mod tests { app_2.environments, HashMap::from([("TEST".to_string(), "true".to_string())]) ); - assert_eq!(app_2.working_directory, "/tmp".to_string()); + assert_eq!(app_2.working_directory, Some("/tmp".to_string())); assert_eq!(app_2.max_instances, 10); assert_eq!(app_2.delay_release, Duration::seconds(0)); assert!(app_2.schema.is_none()); @@ -916,7 +916,7 @@ mod tests { command: Some("my-agent".to_string()), arguments: vec!["--test".to_string(), "--agent".to_string()], environments: HashMap::from([("TEST".to_string(), "true".to_string())]), - working_directory: "/tmp".to_string(), + working_directory: Some("/tmp".to_string()), max_instances: 10, delay_release: Duration::seconds(0), schema: Some(ApplicationSchema { @@ -937,7 +937,7 @@ mod tests { command: None, arguments: vec![], environments: HashMap::new(), - working_directory: "/tmp".to_string(), + working_directory: Some("/tmp".to_string()), max_instances: 10, delay_release: Duration::seconds(0), schema: None, @@ -1003,7 +1003,7 @@ mod tests { "flamepy.rl.runpy".to_string(), ], environments: HashMap::new(), - working_directory: "/tmp".to_string(), + working_directory: Some("/tmp".to_string()), max_instances: 5, delay_release: Duration::seconds(10), schema: None, @@ -1053,7 +1053,7 @@ mod tests { command: Some("/usr/bin/test".to_string()), arguments: vec![], environments: HashMap::new(), - working_directory: "/tmp".to_string(), + working_directory: Some("/tmp".to_string()), max_instances: 5, 
diff --git a/session_manager/src/storage/engine/types.rs b/session_manager/src/storage/engine/types.rs
index e3450257..a1bfcc74 100644
--- a/session_manager/src/storage/engine/types.rs
+++ b/session_manager/src/storage/engine/types.rs
@@ -192,7 +192,7 @@ impl TryFrom<&ApplicationDao> for Application {
                 .clone()
                 .map(|envs| envs.0)
                 .unwrap_or_default(),
-            working_directory: app.working_directory.clone().unwrap_or("/tmp".to_string()),
+            working_directory: app.working_directory.clone(),
             max_instances: app.max_instances as u32,
             delay_release: Duration::seconds(app.delay_release),
             schema: app.schema.clone().map(|arg| arg.0.into()),
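Downstream of these changes, a `NULL` `working_directory` column now surfaces to callers as `None` rather than being rewritten to `/tmp`. A trimmed illustration of the pass-through, using a stand-in struct rather than the real `ApplicationDao`:

```rust
// Stand-in for the storage DAO; only the field relevant here.
struct ApplicationDao {
    working_directory: Option<String>,
}

fn main() {
    let dao = ApplicationDao { working_directory: None };
    // Before this change: dao.working_directory.clone().unwrap_or("/tmp".to_string())
    // After: the Option is propagated untouched, so the host shim decides.
    let wd: Option<String> = dao.working_directory.clone();
    assert_eq!(wd, None);
}
```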