From 10b3980fdfed5973bb1c77a09b23008582cc69d3 Mon Sep 17 00:00:00 2001 From: Alex Kotliarskyi Date: Tue, 10 Mar 2026 19:13:12 -0700 Subject: [PATCH 1/4] feat(elixir): add ssh workers and live worker backends Summary: - add SSH worker execution support, host selection, and remote workspace/app-server wiring for Symphony Elixir - add Docker-backed SSH workers plus explicit local and SSH live E2E coverage in the Elixir test suite - simplify workflow worker config and document the new E2E behavior Rationale: - let Symphony run Codex work on remote SSH hosts without hardcoding host-specific config into the repo - make the live test exercise both the baseline local path and the SSH worker path with disposable infrastructure by default - keep the test and config surface smaller by removing unnecessary normalization and Docker-specific overrides Tests: - make all - env -u SYMPHONY_LIVE_SSH_WORKER_HOSTS LINEAR_API_KEY="$(tr -d '\r\n' < ~/.linear_api_key)" SYMPHONY_RUN_LIVE_E2E=1 mix test test/symphony_elixir/live_e2e_test.exs Co-authored-by: Codex --- elixir/Makefile | 10 - elixir/README.md | 21 +- elixir/lib/symphony_elixir/agent_runner.ex | 102 +++- .../lib/symphony_elixir/codex/app_server.ex | 132 ++++- .../lib/symphony_elixir/codex/dynamic_tool.ex | 19 +- elixir/lib/symphony_elixir/config.ex | 7 +- elixir/lib/symphony_elixir/config/schema.ex | 37 +- elixir/lib/symphony_elixir/orchestrator.ex | 123 ++++- elixir/lib/symphony_elixir/ssh.ex | 100 ++++ elixir/lib/symphony_elixir/workspace.ex | 269 +++++++-- elixir/lib/symphony_elixir_web/presenter.ex | 25 +- .../test/support/live_e2e_docker/Dockerfile | 22 + .../live_e2e_docker/docker-compose.yml | 20 + .../live_e2e_docker/live_worker_entrypoint.sh | 13 + .../live_e2e_docker/symphony-live-worker.conf | 7 + elixir/test/support/test_support.exs | 14 + .../test/symphony_elixir/app_server_test.exs | 137 ++++- .../symphony_elixir/dynamic_tool_test.exs | 108 +--- .../test/symphony_elixir/extensions_test.exs | 13 +- 
elixir/test/symphony_elixir/live_e2e_test.exs | 518 +++++++++++++++--- elixir/test/symphony_elixir/ssh_test.exs | 199 +++++++ .../workspace_and_config_test.exs | 70 ++- 22 files changed, 1649 insertions(+), 317 deletions(-) create mode 100644 elixir/lib/symphony_elixir/ssh.ex create mode 100644 elixir/test/support/live_e2e_docker/Dockerfile create mode 100644 elixir/test/support/live_e2e_docker/docker-compose.yml create mode 100644 elixir/test/support/live_e2e_docker/live_worker_entrypoint.sh create mode 100644 elixir/test/support/live_e2e_docker/symphony-live-worker.conf create mode 100644 elixir/test/symphony_elixir/ssh_test.exs diff --git a/elixir/Makefile b/elixir/Makefile index 331eaef5..61c40270 100644 --- a/elixir/Makefile +++ b/elixir/Makefile @@ -34,16 +34,6 @@ dialyzer: $(MIX) dialyzer --format short e2e: - @if [ -z "$$LINEAR_API_KEY" ]; then \ - echo "LINEAR_API_KEY is required for \`make e2e\`."; \ - echo "Export it first, for example:"; \ - echo " export LINEAR_API_KEY=\$$(tr -d '\\r\\n' < ~/.linear_api_key)"; \ - exit 1; \ - fi - @if ! command -v codex >/dev/null 2>&1; then \ - echo "\`codex\` must be on PATH for \`make e2e\`."; \ - exit 1; \ - fi SYMPHONY_RUN_LIVE_E2E=1 $(MIX) test test/symphony_elixir/live_e2e_test.exs ci: diff --git a/elixir/README.md b/elixir/README.md index 7c93acf0..603b4bb0 100644 --- a/elixir/README.md +++ b/elixir/README.md @@ -185,12 +185,23 @@ make e2e Optional environment variables: - `SYMPHONY_LIVE_LINEAR_TEAM_KEY` defaults to `SYME2E` -- `SYMPHONY_LIVE_CODEX_COMMAND` defaults to `codex app-server` +- `SYMPHONY_LIVE_SSH_WORKER_HOSTS` uses those SSH hosts when set, as a comma-separated list -The live test creates a temporary Linear project and issue, writes a temporary `WORKFLOW.md`, -runs a real agent turn, verifies the workspace side effect, requires Codex to comment on and close -the Linear issue, then marks the project completed so the run remains visible in Linear. 
-`make e2e` fails fast with a clear error if `LINEAR_API_KEY` is unset. +`make e2e` runs two live scenarios: +- one with a local worker +- one with SSH workers + +If `SYMPHONY_LIVE_SSH_WORKER_HOSTS` is unset, the SSH scenario uses `docker compose` to start two +disposable SSH workers on `localhost:<port>`. The live test generates a temporary SSH keypair, +mounts the host `~/.codex/auth.json` into each worker, verifies that Symphony can talk to them +over real SSH, then runs the same orchestration flow against those worker addresses. This keeps +the transport representative without depending on long-lived external machines. + +Set `SYMPHONY_LIVE_SSH_WORKER_HOSTS` if you want `make e2e` to target real SSH hosts instead. + +The live test creates a temporary Linear project and issue, writes a temporary `WORKFLOW.md`, runs +a real agent turn, verifies the workspace side effect, requires Codex to comment on and close the +Linear issue, then marks the project completed so the run remains visible in Linear. 
## FAQ diff --git a/elixir/lib/symphony_elixir/agent_runner.ex b/elixir/lib/symphony_elixir/agent_runner.ex index 482d5476..35ea8a03 100644 --- a/elixir/lib/symphony_elixir/agent_runner.ex +++ b/elixir/lib/symphony_elixir/agent_runner.ex @@ -7,28 +7,58 @@ defmodule SymphonyElixir.AgentRunner do alias SymphonyElixir.Codex.AppServer alias SymphonyElixir.{Config, Linear.Issue, PromptBuilder, Tracker, Workspace} + @type worker_host :: String.t() | nil + @spec run(map(), pid() | nil, keyword()) :: :ok | no_return() def run(issue, codex_update_recipient \\ nil, opts \\ []) do - Logger.info("Starting agent run for #{issue_context(issue)}") + worker_hosts = + candidate_worker_hosts(Keyword.get(opts, :worker_host), Config.settings!().worker.ssh_hosts) + + Logger.info("Starting agent run for #{issue_context(issue)} worker_hosts=#{inspect(worker_hosts_for_log(worker_hosts))}") + + case run_on_worker_hosts(issue, codex_update_recipient, opts, worker_hosts) do + :ok -> + :ok + + {:error, reason} -> + Logger.error("Agent run failed for #{issue_context(issue)}: #{inspect(reason)}") + raise RuntimeError, "Agent run failed for #{issue_context(issue)}: #{inspect(reason)}" + end + end - case Workspace.create_for_issue(issue) do + defp run_on_worker_hosts(issue, codex_update_recipient, opts, [worker_host | rest]) do + case run_on_worker_host(issue, codex_update_recipient, opts, worker_host) do + :ok -> + :ok + + {:error, reason} when rest != [] -> + Logger.warning("Agent run failed for #{issue_context(issue)} worker_host=#{worker_host_for_log(worker_host)} reason=#{inspect(reason)}; trying next worker host") + run_on_worker_hosts(issue, codex_update_recipient, opts, rest) + + {:error, reason} -> + {:error, reason} + end + end + + defp run_on_worker_hosts(_issue, _codex_update_recipient, _opts, []), do: {:error, :no_worker_hosts_available} + + defp run_on_worker_host(issue, codex_update_recipient, opts, worker_host) do + Logger.info("Starting worker attempt for #{issue_context(issue)} 
worker_host=#{worker_host_for_log(worker_host)}") + + case Workspace.create_for_issue(issue, worker_host) do {:ok, workspace} -> + send_worker_runtime_info(codex_update_recipient, issue, worker_host, workspace) + try do - with :ok <- Workspace.run_before_run_hook(workspace, issue), - :ok <- run_codex_turns(workspace, issue, codex_update_recipient, opts) do - :ok - else - {:error, reason} -> - Logger.error("Agent run failed for #{issue_context(issue)}: #{inspect(reason)}") - raise RuntimeError, "Agent run failed for #{issue_context(issue)}: #{inspect(reason)}" + with :ok <- Workspace.run_before_run_hook(workspace, issue, worker_host) do + run_codex_turns(workspace, issue, codex_update_recipient, opts, worker_host) end after - Workspace.run_after_run_hook(workspace, issue) + Workspace.run_after_run_hook(workspace, issue, worker_host) end {:error, reason} -> - Logger.error("Agent run failed for #{issue_context(issue)}: #{inspect(reason)}") - raise RuntimeError, "Agent run failed for #{issue_context(issue)}: #{inspect(reason)}" + {:error, reason} end end @@ -46,11 +76,27 @@ defmodule SymphonyElixir.AgentRunner do defp send_codex_update(_recipient, _issue, _message), do: :ok - defp run_codex_turns(workspace, issue, codex_update_recipient, opts) do + defp send_worker_runtime_info(recipient, %Issue{id: issue_id}, worker_host, workspace) + when is_binary(issue_id) and is_pid(recipient) and is_binary(workspace) do + send( + recipient, + {:worker_runtime_info, issue_id, + %{ + worker_host: worker_host, + workspace_path: workspace + }} + ) + + :ok + end + + defp send_worker_runtime_info(_recipient, _issue, _worker_host, _workspace), do: :ok + + defp run_codex_turns(workspace, issue, codex_update_recipient, opts, worker_host) do max_turns = Keyword.get(opts, :max_turns, Config.settings!().agent.max_turns) issue_state_fetcher = Keyword.get(opts, :issue_state_fetcher, &Tracker.fetch_issue_states_by_ids/1) - with {:ok, session} <- AppServer.start_session(workspace) do + with 
{:ok, session} <- AppServer.start_session(workspace, worker_host: worker_host) do try do do_run_codex_turns(session, workspace, issue, codex_update_recipient, opts, issue_state_fetcher, 1, max_turns) after @@ -142,6 +188,34 @@ defmodule SymphonyElixir.AgentRunner do defp active_issue_state?(_state_name), do: false + defp candidate_worker_hosts(nil, []), do: [nil] + + defp candidate_worker_hosts(preferred_host, configured_hosts) when is_list(configured_hosts) do + hosts = + configured_hosts + |> Enum.map(&String.trim/1) + |> Enum.reject(&(&1 == "")) + |> Enum.uniq() + + case preferred_host do + host when is_binary(host) and host != "" -> + [host | Enum.reject(hosts, &(&1 == host))] + + _ when hosts == [] -> + [nil] + + _ -> + hosts + end + end + + defp worker_hosts_for_log(worker_hosts) do + Enum.map(worker_hosts, &worker_host_for_log/1) + end + + defp worker_host_for_log(nil), do: "local" + defp worker_host_for_log(worker_host), do: worker_host + defp normalize_issue_state(state_name) when is_binary(state_name) do state_name |> String.trim() diff --git a/elixir/lib/symphony_elixir/codex/app_server.ex b/elixir/lib/symphony_elixir/codex/app_server.ex index ef790224..7da87ce9 100644 --- a/elixir/lib/symphony_elixir/codex/app_server.ex +++ b/elixir/lib/symphony_elixir/codex/app_server.ex @@ -4,7 +4,7 @@ defmodule SymphonyElixir.Codex.AppServer do """ require Logger - alias SymphonyElixir.{Codex.DynamicTool, Config, PathSafety} + alias SymphonyElixir.{Codex.DynamicTool, Config, PathSafety, SSH} @initialize_id 1 @thread_start_id 2 @@ -21,12 +21,13 @@ defmodule SymphonyElixir.Codex.AppServer do thread_sandbox: String.t(), turn_sandbox_policy: map(), thread_id: String.t(), - workspace: Path.t() + workspace: Path.t(), + worker_host: String.t() | nil } @spec run(Path.t(), String.t(), map(), keyword()) :: {:ok, map()} | {:error, term()} def run(workspace, prompt, issue, opts \\ []) do - with {:ok, session} <- start_session(workspace) do + with {:ok, session} <- 
start_session(workspace, opts) do try do run_turn(session, prompt, issue, opts) after @@ -35,13 +36,15 @@ defmodule SymphonyElixir.Codex.AppServer do end end - @spec start_session(Path.t()) :: {:ok, session()} | {:error, term()} - def start_session(workspace) do - with {:ok, expanded_workspace} <- validate_workspace_cwd(workspace), - {:ok, port} <- start_port(expanded_workspace) do - metadata = port_metadata(port) + @spec start_session(Path.t(), keyword()) :: {:ok, session()} | {:error, term()} + def start_session(workspace, opts \\ []) do + worker_host = Keyword.get(opts, :worker_host) - with {:ok, session_policies} <- session_policies(expanded_workspace), + with {:ok, expanded_workspace} <- validate_workspace_cwd(workspace, worker_host), + {:ok, port} <- start_port(expanded_workspace, worker_host) do + metadata = port_metadata(port, worker_host) + + with {:ok, session_policies} <- session_policies(expanded_workspace, worker_host), {:ok, thread_id} <- do_start_session(port, expanded_workspace, session_policies) do {:ok, %{ @@ -52,7 +55,8 @@ defmodule SymphonyElixir.Codex.AppServer do thread_sandbox: session_policies.thread_sandbox, turn_sandbox_policy: session_policies.turn_sandbox_policy, thread_id: thread_id, - workspace: expanded_workspace + workspace: expanded_workspace, + worker_host: worker_host }} else {:error, reason} -> @@ -140,7 +144,7 @@ defmodule SymphonyElixir.Codex.AppServer do stop_port(port) end - defp validate_workspace_cwd(workspace) when is_binary(workspace) do + defp validate_workspace_cwd(workspace, nil) when is_binary(workspace) do expanded_workspace = Path.expand(workspace) expanded_root = Path.expand(Config.settings!().workspace.root) expanded_root_prefix = expanded_root <> "/" @@ -168,7 +172,21 @@ defmodule SymphonyElixir.Codex.AppServer do end end - defp start_port(workspace) do + defp validate_workspace_cwd(workspace, worker_host) + when is_binary(workspace) and is_binary(worker_host) do + cond do + String.trim(workspace) == "" -> + 
{:error, {:invalid_workspace_cwd, :empty_remote_workspace, worker_host}} + + String.contains?(workspace, ["\n", "\r", <<0>>]) -> + {:error, {:invalid_workspace_cwd, :invalid_remote_workspace, worker_host, workspace}} + + true -> + {:ok, workspace} + end + end + + defp start_port(workspace, nil) do executable = System.find_executable("bash") if is_nil(executable) do @@ -191,13 +209,32 @@ defmodule SymphonyElixir.Codex.AppServer do end end - defp port_metadata(port) when is_port(port) do - case :erlang.port_info(port, :os_pid) do - {:os_pid, os_pid} -> - %{codex_app_server_pid: to_string(os_pid)} + defp start_port(workspace, worker_host) when is_binary(worker_host) do + remote_command = remote_launch_command(workspace) + SSH.start_port(worker_host, remote_command, line: @port_line_bytes) + end - _ -> - %{} + defp remote_launch_command(workspace) when is_binary(workspace) do + [ + "cd #{shell_escape(workspace)}", + "exec #{Config.settings!().codex.command}" + ] + |> Enum.join(" && ") + end + + defp port_metadata(port, worker_host) when is_port(port) do + base_metadata = + case :erlang.port_info(port, :os_pid) do + {:os_pid, os_pid} -> + %{codex_app_server_pid: to_string(os_pid)} + + _ -> + %{} + end + + case worker_host do + host when is_binary(host) -> Map.put(base_metadata, :worker_host, host) + _ -> base_metadata end end @@ -225,10 +262,14 @@ defmodule SymphonyElixir.Codex.AppServer do end end - defp session_policies(workspace) do + defp session_policies(workspace, nil) do Config.codex_runtime_settings(workspace) end + defp session_policies(workspace, worker_host) when is_binary(worker_host) do + Config.codex_runtime_settings(workspace, remote: true) + end + defp do_start_session(port, workspace, session_policies) do case send_initialize(port) do :ok -> start_thread(port, workspace, session_policies) @@ -243,7 +284,7 @@ defmodule SymphonyElixir.Codex.AppServer do "params" => %{ "approvalPolicy" => approval_policy, "sandbox" => thread_sandbox, - "cwd" => 
Path.expand(workspace), + "cwd" => workspace, "dynamicTools" => DynamicTool.tool_specs() } }) @@ -272,7 +313,7 @@ defmodule SymphonyElixir.Codex.AppServer do "text" => prompt } ], - "cwd" => Path.expand(workspace), + "cwd" => workspace, "title" => "#{issue.identifier}: #{issue.title}", "approvalPolicy" => approval_policy, "sandboxPolicy" => turn_sandbox_policy @@ -515,7 +556,10 @@ defmodule SymphonyElixir.Codex.AppServer do tool_name = tool_call_name(params) arguments = tool_call_arguments(params) - result = tool_executor.(tool_name, arguments) + result = + tool_name + |> tool_executor.(arguments) + |> normalize_dynamic_tool_result() send_message(port, %{ "id" => id, @@ -635,6 +679,44 @@ defmodule SymphonyElixir.Codex.AppServer do :unhandled end + defp normalize_dynamic_tool_result(%{"success" => success} = result) when is_boolean(success) do + output = + case Map.get(result, "output") do + existing_output when is_binary(existing_output) -> existing_output + _ -> dynamic_tool_output(result) + end + + content_items = + case Map.get(result, "contentItems") do + existing_items when is_list(existing_items) -> existing_items + _ -> dynamic_tool_content_items(output) + end + + result + |> Map.put("output", output) + |> Map.put("contentItems", content_items) + end + + defp normalize_dynamic_tool_result(result) do + %{ + "success" => false, + "output" => inspect(result), + "contentItems" => dynamic_tool_content_items(inspect(result)) + } + end + + defp dynamic_tool_output(%{"contentItems" => [%{"text" => text} | _]}) when is_binary(text), do: text + defp dynamic_tool_output(result), do: Jason.encode!(result, pretty: true) + + defp dynamic_tool_content_items(output) when is_binary(output) do + [ + %{ + "type" => "inputText", + "text" => output + } + ] + end + defp approve_or_require( port, id, @@ -921,7 +1003,7 @@ defmodule SymphonyElixir.Codex.AppServer do end defp metadata_from_message(port, payload) do - port |> port_metadata() |> maybe_set_usage(payload) + port |> 
port_metadata(nil) |> maybe_set_usage(payload) end defp maybe_set_usage(metadata, payload) when is_map(payload) do @@ -936,6 +1018,10 @@ defmodule SymphonyElixir.Codex.AppServer do defp maybe_set_usage(metadata, _payload), do: metadata + defp shell_escape(value) when is_binary(value) do + "'" <> String.replace(value, "'", "'\"'\"'") <> "'" + end + defp default_on_message(_message), do: :ok defp tool_call_name(params) when is_map(params) do diff --git a/elixir/lib/symphony_elixir/codex/dynamic_tool.ex b/elixir/lib/symphony_elixir/codex/dynamic_tool.ex index 716d3607..446c7fd2 100644 --- a/elixir/lib/symphony_elixir/codex/dynamic_tool.ex +++ b/elixir/lib/symphony_elixir/codex/dynamic_tool.ex @@ -118,24 +118,21 @@ defmodule SymphonyElixir.Codex.DynamicTool do _ -> true end - %{ - "success" => success, - "contentItems" => [ - %{ - "type" => "inputText", - "text" => encode_payload(response) - } - ] - } + dynamic_tool_response(success, encode_payload(response)) end defp failure_response(payload) do + dynamic_tool_response(false, encode_payload(payload)) + end + + defp dynamic_tool_response(success, output) when is_boolean(success) and is_binary(output) do %{ - "success" => false, + "success" => success, + "output" => output, "contentItems" => [ %{ "type" => "inputText", - "text" => encode_payload(payload) + "text" => output } ] } diff --git a/elixir/lib/symphony_elixir/config.ex b/elixir/lib/symphony_elixir/config.ex index 87011ac1..00e7f9b7 100644 --- a/elixir/lib/symphony_elixir/config.ex +++ b/elixir/lib/symphony_elixir/config.ex @@ -98,11 +98,12 @@ defmodule SymphonyElixir.Config do end end - @spec codex_runtime_settings(Path.t() | nil) :: {:ok, codex_runtime_settings()} | {:error, term()} - def codex_runtime_settings(workspace \\ nil) do + @spec codex_runtime_settings(Path.t() | nil, keyword()) :: + {:ok, codex_runtime_settings()} | {:error, term()} + def codex_runtime_settings(workspace \\ nil, opts \\ []) do with {:ok, settings} <- settings() do with {:ok, 
turn_sandbox_policy} <- - Schema.resolve_runtime_turn_sandbox_policy(settings, workspace) do + Schema.resolve_runtime_turn_sandbox_policy(settings, workspace, opts) do {:ok, %{ approval_policy: settings.codex.approval_policy, diff --git a/elixir/lib/symphony_elixir/config/schema.ex b/elixir/lib/symphony_elixir/config/schema.ex index c9ad948f..b9809926 100644 --- a/elixir/lib/symphony_elixir/config/schema.ex +++ b/elixir/lib/symphony_elixir/config/schema.ex @@ -100,6 +100,23 @@ defmodule SymphonyElixir.Config.Schema do end end + defmodule Worker do + @moduledoc false + use Ecto.Schema + import Ecto.Changeset + + @primary_key false + embedded_schema do + field(:ssh_hosts, {:array, :string}, default: []) + end + + @spec changeset(%__MODULE__{}, map()) :: Ecto.Changeset.t() + def changeset(schema, attrs) do + schema + |> cast(attrs, [:ssh_hosts], empty_values: []) + end + end + defmodule Agent do @moduledoc false use Ecto.Schema @@ -246,6 +263,7 @@ defmodule SymphonyElixir.Config.Schema do embeds_one(:tracker, Tracker, on_replace: :update, defaults_to_struct: true) embeds_one(:polling, Polling, on_replace: :update, defaults_to_struct: true) embeds_one(:workspace, Workspace, on_replace: :update, defaults_to_struct: true) + embeds_one(:worker, Worker, on_replace: :update, defaults_to_struct: true) embeds_one(:agent, Agent, on_replace: :update, defaults_to_struct: true) embeds_one(:codex, Codex, on_replace: :update, defaults_to_struct: true) embeds_one(:hooks, Hooks, on_replace: :update, defaults_to_struct: true) @@ -280,15 +298,15 @@ defmodule SymphonyElixir.Config.Schema do end end - @spec resolve_runtime_turn_sandbox_policy(%__MODULE__{}, Path.t() | nil) :: + @spec resolve_runtime_turn_sandbox_policy(%__MODULE__{}, Path.t() | nil, keyword()) :: {:ok, map()} | {:error, term()} - def resolve_runtime_turn_sandbox_policy(settings, workspace \\ nil) do + def resolve_runtime_turn_sandbox_policy(settings, workspace \\ nil, opts \\ []) do case 
settings.codex.turn_sandbox_policy do %{} = policy -> {:ok, policy} _ -> - default_runtime_turn_sandbox_policy(workspace || settings.workspace.root) + default_runtime_turn_sandbox_policy(workspace || settings.workspace.root, opts) end end @@ -332,6 +350,7 @@ defmodule SymphonyElixir.Config.Schema do |> cast_embed(:tracker, with: &Tracker.changeset/2) |> cast_embed(:polling, with: &Polling.changeset/2) |> cast_embed(:workspace, with: &Workspace.changeset/2) + |> cast_embed(:worker, with: &Worker.changeset/2) |> cast_embed(:agent, with: &Agent.changeset/2) |> cast_embed(:codex, with: &Codex.changeset/2) |> cast_embed(:hooks, with: &Hooks.changeset/2) @@ -471,13 +490,17 @@ defmodule SymphonyElixir.Config.Schema do } end - defp default_runtime_turn_sandbox_policy(workspace_root) when is_binary(workspace_root) do - with {:ok, canonical_workspace_root} <- PathSafety.canonicalize(workspace_root) do - {:ok, default_turn_sandbox_policy(canonical_workspace_root)} + defp default_runtime_turn_sandbox_policy(workspace_root, opts) when is_binary(workspace_root) do + if Keyword.get(opts, :remote, false) do + {:ok, default_turn_sandbox_policy(workspace_root)} + else + with {:ok, canonical_workspace_root} <- PathSafety.canonicalize(workspace_root) do + {:ok, default_turn_sandbox_policy(canonical_workspace_root)} + end end end - defp default_runtime_turn_sandbox_policy(workspace_root) do + defp default_runtime_turn_sandbox_policy(workspace_root, _opts) do {:error, {:unsafe_turn_sandbox_policy, {:invalid_workspace_root, workspace_root}}} end diff --git a/elixir/lib/symphony_elixir/orchestrator.ex b/elixir/lib/symphony_elixir/orchestrator.ex index 36ada1dc..0dedc38c 100644 --- a/elixir/lib/symphony_elixir/orchestrator.ex +++ b/elixir/lib/symphony_elixir/orchestrator.ex @@ -138,7 +138,9 @@ defmodule SymphonyElixir.Orchestrator do |> complete_issue(issue_id) |> schedule_issue_retry(issue_id, 1, %{ identifier: running_entry.identifier, - delay_type: :continuation + delay_type: 
:continuation, + worker_host: Map.get(running_entry, :worker_host), + workspace_path: Map.get(running_entry, :workspace_path) }) _ -> @@ -148,7 +150,9 @@ defmodule SymphonyElixir.Orchestrator do schedule_issue_retry(state, issue_id, next_attempt, %{ identifier: running_entry.identifier, - error: "agent exited: #{inspect(reason)}" + error: "agent exited: #{inspect(reason)}", + worker_host: Map.get(running_entry, :worker_host), + workspace_path: Map.get(running_entry, :workspace_path) }) end @@ -159,6 +163,23 @@ defmodule SymphonyElixir.Orchestrator do end end + def handle_info({:worker_runtime_info, issue_id, runtime_info}, %{running: running} = state) + when is_binary(issue_id) and is_map(runtime_info) do + case Map.get(running, issue_id) do + nil -> + {:noreply, state} + + running_entry -> + updated_running_entry = + running_entry + |> maybe_put_runtime_value(:worker_host, runtime_info[:worker_host]) + |> maybe_put_runtime_value(:workspace_path, runtime_info[:workspace_path]) + + notify_dashboard() + {:noreply, %{state | running: Map.put(running, issue_id, updated_running_entry)}} + end + end + def handle_info( {:codex_worker_update, issue_id, %{event: _, timestamp: _} = update}, %{running: running} = state @@ -392,9 +413,10 @@ defmodule SymphonyElixir.Orchestrator do %{pid: pid, ref: ref, identifier: identifier} = running_entry -> state = record_session_completion_totals(state, running_entry) + worker_host = Map.get(running_entry, :worker_host) if cleanup_workspace do - cleanup_issue_workspace(identifier) + cleanup_issue_workspace(identifier, worker_host) end if is_pid(pid) do @@ -628,10 +650,10 @@ defmodule SymphonyElixir.Orchestrator do |> MapSet.new() end - defp dispatch_issue(%State{} = state, issue, attempt \\ nil) do + defp dispatch_issue(%State{} = state, issue, attempt \\ nil, preferred_worker_host \\ nil) do case revalidate_issue_for_dispatch(issue, &Tracker.fetch_issue_states_by_ids/1, terminal_state_set()) do {:ok, %Issue{} = refreshed_issue} -> - 
do_dispatch_issue(state, refreshed_issue, attempt) + do_dispatch_issue(state, refreshed_issue, attempt, preferred_worker_host) {:skip, :missing} -> Logger.info("Skipping dispatch; issue no longer active or visible: #{issue_context(issue)}") @@ -648,16 +670,17 @@ defmodule SymphonyElixir.Orchestrator do end end - defp do_dispatch_issue(%State{} = state, issue, attempt) do + defp do_dispatch_issue(%State{} = state, issue, attempt, preferred_worker_host) do recipient = self() + worker_host = select_worker_host(state, preferred_worker_host) case Task.Supervisor.start_child(SymphonyElixir.TaskSupervisor, fn -> - AgentRunner.run(issue, recipient, attempt: attempt) + AgentRunner.run(issue, recipient, attempt: attempt, worker_host: worker_host) end) do {:ok, pid} -> ref = Process.monitor(pid) - Logger.info("Dispatching issue to agent: #{issue_context(issue)} pid=#{inspect(pid)} attempt=#{inspect(attempt)}") + Logger.info("Dispatching issue to agent: #{issue_context(issue)} pid=#{inspect(pid)} attempt=#{inspect(attempt)} worker_host=#{worker_host || "local"}") running = Map.put(state.running, issue.id, %{ @@ -665,6 +688,8 @@ defmodule SymphonyElixir.Orchestrator do ref: ref, identifier: issue.identifier, issue: issue, + worker_host: worker_host, + workspace_path: nil, session_id: nil, last_codex_message: nil, last_codex_timestamp: nil, @@ -694,7 +719,8 @@ defmodule SymphonyElixir.Orchestrator do schedule_issue_retry(state, issue.id, next_attempt, %{ identifier: issue.identifier, - error: "failed to spawn agent: #{inspect(reason)}" + error: "failed to spawn agent: #{inspect(reason)}", + worker_host: worker_host }) end end @@ -737,6 +763,8 @@ defmodule SymphonyElixir.Orchestrator do due_at_ms = System.monotonic_time(:millisecond) + delay_ms identifier = pick_retry_identifier(issue_id, previous_retry, metadata) error = pick_retry_error(previous_retry, metadata) + worker_host = pick_retry_worker_host(previous_retry, metadata) + workspace_path = 
pick_retry_workspace_path(previous_retry, metadata) if is_reference(old_timer) do Process.cancel_timer(old_timer) @@ -757,7 +785,9 @@ defmodule SymphonyElixir.Orchestrator do retry_token: retry_token, due_at_ms: due_at_ms, identifier: identifier, - error: error + error: error, + worker_host: worker_host, + workspace_path: workspace_path }) } end @@ -767,7 +797,9 @@ defmodule SymphonyElixir.Orchestrator do %{attempt: attempt, retry_token: ^retry_token} = retry_entry -> metadata = %{ identifier: Map.get(retry_entry, :identifier), - error: Map.get(retry_entry, :error) + error: Map.get(retry_entry, :error), + worker_host: Map.get(retry_entry, :worker_host), + workspace_path: Map.get(retry_entry, :workspace_path) } {:ok, attempt, metadata, %{state | retry_attempts: Map.delete(state.retry_attempts, issue_id)}} @@ -804,7 +836,7 @@ defmodule SymphonyElixir.Orchestrator do terminal_issue_state?(issue.state, terminal_states) -> Logger.info("Issue state is terminal: issue_id=#{issue_id} issue_identifier=#{issue.identifier} state=#{issue.state}; removing associated workspace") - cleanup_issue_workspace(issue.identifier) + cleanup_issue_workspace(issue.identifier, metadata[:worker_host]) {:noreply, release_issue_claim(state, issue_id)} retry_candidate_issue?(issue, terminal_states) -> @@ -822,11 +854,13 @@ defmodule SymphonyElixir.Orchestrator do {:noreply, release_issue_claim(state, issue_id)} end - defp cleanup_issue_workspace(identifier) when is_binary(identifier) do - Workspace.remove_issue_workspaces(identifier) + defp cleanup_issue_workspace(identifier, worker_host \\ nil) + + defp cleanup_issue_workspace(identifier, worker_host) when is_binary(identifier) do + Workspace.remove_issue_workspaces(identifier, worker_host) end - defp cleanup_issue_workspace(_identifier), do: :ok + defp cleanup_issue_workspace(_identifier, _worker_host), do: :ok defp run_terminal_workspace_cleanup do case Tracker.fetch_issues_by_states(Config.settings!().tracker.terminal_states) do @@ -852,7 
+886,7 @@ defmodule SymphonyElixir.Orchestrator do defp handle_active_retry(state, issue, attempt, metadata) do if retry_candidate_issue?(issue, terminal_state_set()) and dispatch_slots_available?(issue, state) do - {:noreply, dispatch_issue(state, issue, attempt)} + {:noreply, dispatch_issue(state, issue, attempt, metadata[:worker_host])} else Logger.debug("No available slots for retrying #{issue_context(issue)}; retrying again") @@ -904,6 +938,57 @@ defmodule SymphonyElixir.Orchestrator do metadata[:error] || Map.get(previous_retry, :error) end + defp pick_retry_worker_host(previous_retry, metadata) do + metadata[:worker_host] || Map.get(previous_retry, :worker_host) + end + + defp pick_retry_workspace_path(previous_retry, metadata) do + metadata[:workspace_path] || Map.get(previous_retry, :workspace_path) + end + + defp maybe_put_runtime_value(running_entry, _key, nil), do: running_entry + + defp maybe_put_runtime_value(running_entry, key, value) when is_map(running_entry) do + Map.put(running_entry, key, value) + end + + defp select_worker_host(%State{} = state, preferred_worker_host) do + case Config.settings!().worker.ssh_hosts do + [] -> + nil + + hosts -> + if preferred_worker_host_available?(preferred_worker_host, hosts) do + preferred_worker_host + else + least_loaded_worker_host(state, hosts) + end + end + end + + defp preferred_worker_host_available?(preferred_worker_host, hosts) + when is_binary(preferred_worker_host) and is_list(hosts) do + preferred_worker_host != "" and preferred_worker_host in hosts + end + + defp preferred_worker_host_available?(_preferred_worker_host, _hosts), do: false + + defp least_loaded_worker_host(%State{} = state, hosts) when is_list(hosts) do + hosts + |> Enum.with_index() + |> Enum.min_by(fn {host, index} -> + {running_worker_host_count(state.running, host), index} + end) + |> elem(0) + end + + defp running_worker_host_count(running, worker_host) when is_map(running) and is_binary(worker_host) do + Enum.count(running, fn 
+ {_issue_id, %{worker_host: ^worker_host}} -> true + _ -> false + end) + end + defp find_issue_by_id(issues, issue_id) when is_binary(issue_id) do Enum.find(issues, fn %Issue{id: ^issue_id} -> @@ -982,6 +1067,8 @@ defmodule SymphonyElixir.Orchestrator do issue_id: issue_id, identifier: metadata.identifier, state: metadata.issue.state, + worker_host: Map.get(metadata, :worker_host), + workspace_path: Map.get(metadata, :workspace_path), session_id: metadata.session_id, codex_app_server_pid: metadata.codex_app_server_pid, codex_input_tokens: metadata.codex_input_tokens, @@ -1004,7 +1091,9 @@ defmodule SymphonyElixir.Orchestrator do attempt: attempt, due_in_ms: max(0, due_at_ms - now_ms), identifier: Map.get(retry, :identifier), - error: Map.get(retry, :error) + error: Map.get(retry, :error), + worker_host: Map.get(retry, :worker_host), + workspace_path: Map.get(retry, :workspace_path) } end) diff --git a/elixir/lib/symphony_elixir/ssh.ex b/elixir/lib/symphony_elixir/ssh.ex new file mode 100644 index 00000000..0493adb0 --- /dev/null +++ b/elixir/lib/symphony_elixir/ssh.ex @@ -0,0 +1,100 @@ +defmodule SymphonyElixir.SSH do + @moduledoc false + + @spec run(String.t(), String.t(), keyword()) :: {:ok, {String.t(), non_neg_integer()}} | {:error, term()} + def run(host, command, opts \\ []) when is_binary(host) and is_binary(command) do + with {:ok, executable} <- ssh_executable() do + {:ok, System.cmd(executable, ssh_args(host, command), opts)} + end + end + + @spec start_port(String.t(), String.t(), keyword()) :: {:ok, port()} | {:error, term()} + def start_port(host, command, opts \\ []) when is_binary(host) and is_binary(command) do + with {:ok, executable} <- ssh_executable() do + line_bytes = Keyword.get(opts, :line) + + port_opts = + [ + :binary, + :exit_status, + :stderr_to_stdout, + args: Enum.map(ssh_args(host, command), &String.to_charlist/1) + ] + |> maybe_put_line_option(line_bytes) + + {:ok, Port.open({:spawn_executable, String.to_charlist(executable)}, 
port_opts)} + end + end + + @spec remote_shell_command(String.t()) :: String.t() + def remote_shell_command(command) when is_binary(command) do + "bash -lc " <> shell_escape(command) + end + + defp ssh_executable do + case System.find_executable("ssh") do + nil -> {:error, :ssh_not_found} + executable -> {:ok, executable} + end + end + + defp ssh_args(host, command) do + %{destination: destination, port: port} = parse_target(host) + + [] + |> maybe_put_config() + |> Kernel.++(["-T"]) + |> maybe_put_port(port) + |> Kernel.++([destination, remote_shell_command(command)]) + end + + defp maybe_put_line_option(port_opts, nil), do: port_opts + defp maybe_put_line_option(port_opts, line_bytes), do: Keyword.put(port_opts, :line, line_bytes) + + defp maybe_put_config(args) do + case System.get_env("SYMPHONY_SSH_CONFIG") do + config_path when is_binary(config_path) and config_path != "" -> + args ++ ["-F", config_path] + + _ -> + args + end + end + + defp maybe_put_port(args, nil), do: args + defp maybe_put_port(args, port), do: args ++ ["-p", port] + + defp parse_target(target) when is_binary(target) do + trimmed_target = String.trim(target) + + # OpenSSH does not interpret bare "host:port" as "host + port"; it treats the + # whole value as a hostname and leaves the port at 22. We split that shorthand + # here so worker config can use "localhost:2222" without requiring ssh:// URIs. 
+ case Regex.run(~r/^(.*):(\d+)$/, trimmed_target, capture: :all_but_first) do + [destination, port] -> + if valid_port_destination?(destination) do + %{destination: destination, port: port} + else + %{destination: trimmed_target, port: nil} + end + + _ -> + %{destination: trimmed_target, port: nil} + end + end + + defp valid_port_destination?(destination) when is_binary(destination) do + destination != "" and + (not String.contains?(destination, ":") or bracketed_host?(destination)) + end + + defp bracketed_host?(destination) when is_binary(destination) do + # IPv6 literals contain ":" already, so we only accept additional ":port" + # parsing when the host is explicitly bracketed, e.g. "[::1]:2222". + String.contains?(destination, "[") and String.contains?(destination, "]") + end + + defp shell_escape(value) when is_binary(value) do + "'" <> String.replace(value, "'", "'\"'\"'") <> "'" + end +end diff --git a/elixir/lib/symphony_elixir/workspace.ex b/elixir/lib/symphony_elixir/workspace.ex index 539976e9..38e8ecac 100644 --- a/elixir/lib/symphony_elixir/workspace.ex +++ b/elixir/lib/symphony_elixir/workspace.ex @@ -4,35 +4,37 @@ defmodule SymphonyElixir.Workspace do """ require Logger - alias SymphonyElixir.{Config, PathSafety} + alias SymphonyElixir.{Config, PathSafety, SSH} - @excluded_entries MapSet.new([".elixir_ls", "tmp"]) + @remote_workspace_marker "__SYMPHONY_WORKSPACE__" - @spec create_for_issue(map() | String.t() | nil) :: {:ok, Path.t()} | {:error, term()} - def create_for_issue(issue_or_identifier) do + @type worker_host :: String.t() | nil + + @spec create_for_issue(map() | String.t() | nil, worker_host()) :: + {:ok, Path.t()} | {:error, term()} + def create_for_issue(issue_or_identifier, worker_host \\ nil) do issue_context = issue_context(issue_or_identifier) try do safe_id = safe_identifier(issue_context.issue_identifier) - with {:ok, workspace} <- workspace_path_for_issue(safe_id), - :ok <- validate_workspace_path(workspace), - {:ok, created?} <- 
ensure_workspace(workspace), - :ok <- maybe_run_after_create_hook(workspace, issue_context, created?) do + with {:ok, workspace} <- workspace_path_for_issue(safe_id, worker_host), + :ok <- validate_workspace_path(workspace, worker_host), + {:ok, workspace, created?} <- ensure_workspace(workspace, worker_host), + :ok <- maybe_run_after_create_hook(workspace, issue_context, created?, worker_host) do {:ok, workspace} end rescue error in [ArgumentError, ErlangError, File.Error] -> - Logger.error("Workspace creation failed #{issue_log_context(issue_context)} error=#{Exception.message(error)}") + Logger.error("Workspace creation failed #{issue_log_context(issue_context)} worker_host=#{worker_host_for_log(worker_host)} error=#{Exception.message(error)}") {:error, error} end end - defp ensure_workspace(workspace) do + defp ensure_workspace(workspace, nil) do cond do File.dir?(workspace) -> - clean_tmp_artifacts(workspace) - {:ok, false} + {:ok, workspace, false} File.exists?(workspace) -> File.rm_rf!(workspace) @@ -43,19 +45,54 @@ defmodule SymphonyElixir.Workspace do end end + defp ensure_workspace(workspace, worker_host) when is_binary(worker_host) do + script = + [ + "set -eu", + "if [ -d #{shell_escape(workspace)} ]; then", + " created=0", + "elif [ -e #{shell_escape(workspace)} ]; then", + " rm -rf #{shell_escape(workspace)}", + " mkdir -p #{shell_escape(workspace)}", + " created=1", + "else", + " mkdir -p #{shell_escape(workspace)}", + " created=1", + "fi", + "cd #{shell_escape(workspace)}", + "printf '%s\\t%s\\t%s\\n' '#{@remote_workspace_marker}' \"$created\" \"$(pwd -P)\"" + ] + |> Enum.reject(&(&1 == "")) + |> Enum.join("\n") + + case run_remote_command(worker_host, script, Config.settings!().hooks.timeout_ms) do + {:ok, {output, 0}} -> + parse_remote_workspace_output(output) + + {:ok, {output, status}} -> + {:error, {:workspace_prepare_failed, worker_host, status, output}} + + {:error, reason} -> + {:error, reason} + end + end + defp create_workspace(workspace) 
do File.rm_rf!(workspace) File.mkdir_p!(workspace) - {:ok, true} + {:ok, workspace, true} end @spec remove(Path.t()) :: {:ok, [String.t()]} | {:error, term(), String.t()} - def remove(workspace) do + def remove(workspace), do: remove(workspace, nil) + + @spec remove(Path.t(), worker_host()) :: {:ok, [String.t()]} | {:error, term(), String.t()} + def remove(workspace, nil) do case File.exists?(workspace) do true -> - case validate_workspace_path(workspace) do + case validate_workspace_path(workspace, nil) do :ok -> - maybe_run_before_remove_hook(workspace) + maybe_run_before_remove_hook(workspace, nil) File.rm_rf(workspace) {:error, reason} -> @@ -67,24 +104,60 @@ defmodule SymphonyElixir.Workspace do end end + def remove(workspace, worker_host) when is_binary(worker_host) do + maybe_run_before_remove_hook(workspace, worker_host) + + case run_remote_command(worker_host, "rm -rf #{shell_escape(workspace)}", Config.settings!().hooks.timeout_ms) do + {:ok, {_output, 0}} -> + {:ok, []} + + {:ok, {output, status}} -> + {:error, {:workspace_remove_failed, worker_host, status, output}, ""} + + {:error, reason} -> + {:error, reason, ""} + end + end + @spec remove_issue_workspaces(term()) :: :ok - def remove_issue_workspaces(identifier) when is_binary(identifier) do + def remove_issue_workspaces(identifier), do: remove_issue_workspaces(identifier, nil) + + @spec remove_issue_workspaces(term(), worker_host()) :: :ok + def remove_issue_workspaces(identifier, worker_host) when is_binary(identifier) and is_binary(worker_host) do safe_id = safe_identifier(identifier) - case workspace_path_for_issue(safe_id) do - {:ok, workspace} -> remove(workspace) + case workspace_path_for_issue(safe_id, worker_host) do + {:ok, workspace} -> remove(workspace, worker_host) {:error, _reason} -> :ok end :ok end - def remove_issue_workspaces(_identifier) do + def remove_issue_workspaces(identifier, nil) when is_binary(identifier) do + safe_id = safe_identifier(identifier) + + case 
Config.settings!().worker.ssh_hosts do + [] -> + case workspace_path_for_issue(safe_id, nil) do + {:ok, workspace} -> remove(workspace, nil) + {:error, _reason} -> :ok + end + + worker_hosts -> + Enum.each(worker_hosts, &remove_issue_workspaces(identifier, &1)) + end + :ok end - @spec run_before_run_hook(Path.t(), map() | String.t() | nil) :: :ok | {:error, term()} - def run_before_run_hook(workspace, issue_or_identifier) when is_binary(workspace) do + def remove_issue_workspaces(_identifier, _worker_host) do + :ok + end + + @spec run_before_run_hook(Path.t(), map() | String.t() | nil, worker_host()) :: + :ok | {:error, term()} + def run_before_run_hook(workspace, issue_or_identifier, worker_host \\ nil) when is_binary(workspace) do issue_context = issue_context(issue_or_identifier) hooks = Config.settings!().hooks @@ -93,12 +166,12 @@ defmodule SymphonyElixir.Workspace do :ok command -> - run_hook(command, workspace, issue_context, "before_run") + run_hook(command, workspace, issue_context, "before_run", worker_host) end end - @spec run_after_run_hook(Path.t(), map() | String.t() | nil) :: :ok - def run_after_run_hook(workspace, issue_or_identifier) when is_binary(workspace) do + @spec run_after_run_hook(Path.t(), map() | String.t() | nil, worker_host()) :: :ok + def run_after_run_hook(workspace, issue_or_identifier, worker_host \\ nil) when is_binary(workspace) do issue_context = issue_context(issue_or_identifier) hooks = Config.settings!().hooks @@ -107,28 +180,26 @@ defmodule SymphonyElixir.Workspace do :ok command -> - run_hook(command, workspace, issue_context, "after_run") + run_hook(command, workspace, issue_context, "after_run", worker_host) |> ignore_hook_failure() end end - defp workspace_path_for_issue(safe_id) when is_binary(safe_id) do + defp workspace_path_for_issue(safe_id, nil) when is_binary(safe_id) do Config.settings!().workspace.root |> Path.join(safe_id) |> PathSafety.canonicalize() end - defp safe_identifier(identifier) do - 
String.replace(identifier || "issue", ~r/[^a-zA-Z0-9._-]/, "_") + defp workspace_path_for_issue(safe_id, worker_host) when is_binary(safe_id) and is_binary(worker_host) do + {:ok, Path.join(Config.settings!().workspace.root, safe_id)} end - defp clean_tmp_artifacts(workspace) do - Enum.each(MapSet.to_list(@excluded_entries), fn entry -> - File.rm_rf(Path.join(workspace, entry)) - end) + defp safe_identifier(identifier) do + String.replace(identifier || "issue", ~r/[^a-zA-Z0-9._-]/, "_") end - defp maybe_run_after_create_hook(workspace, issue_context, created?) do + defp maybe_run_after_create_hook(workspace, issue_context, created?, worker_host) do hooks = Config.settings!().hooks case created? do @@ -138,7 +209,7 @@ defmodule SymphonyElixir.Workspace do :ok command -> - run_hook(command, workspace, issue_context, "after_create") + run_hook(command, workspace, issue_context, "after_create", worker_host) end false -> @@ -146,7 +217,7 @@ defmodule SymphonyElixir.Workspace do end end - defp maybe_run_before_remove_hook(workspace) do + defp maybe_run_before_remove_hook(workspace, nil) do hooks = Config.settings!().hooks case File.dir?(workspace) do @@ -160,7 +231,8 @@ defmodule SymphonyElixir.Workspace do command, workspace, %{issue_id: nil, issue_identifier: Path.basename(workspace)}, - "before_remove" + "before_remove", + nil ) |> ignore_hook_failure() end @@ -170,13 +242,50 @@ defmodule SymphonyElixir.Workspace do end end + defp maybe_run_before_remove_hook(workspace, worker_host) when is_binary(worker_host) do + hooks = Config.settings!().hooks + + case hooks.before_remove do + nil -> + :ok + + command -> + script = + [ + "if [ -d #{shell_escape(workspace)} ]; then", + " cd #{shell_escape(workspace)}", + " #{command}", + "fi" + ] + |> Enum.join("\n") + + run_remote_command(worker_host, script, Config.settings!().hooks.timeout_ms) + |> case do + {:ok, {output, status}} -> + handle_hook_command_result( + {output, status}, + workspace, + %{issue_id: nil, 
issue_identifier: Path.basename(workspace)}, + "before_remove" + ) + + {:error, {:workspace_hook_timeout, "before_remove", _timeout_ms} = reason} -> + {:error, reason} + + {:error, reason} -> + {:error, reason} + end + |> ignore_hook_failure() + end + end + defp ignore_hook_failure(:ok), do: :ok defp ignore_hook_failure({:error, _reason}), do: :ok - defp run_hook(command, workspace, issue_context, hook_name) do + defp run_hook(command, workspace, issue_context, hook_name, nil) do timeout_ms = Config.settings!().hooks.timeout_ms - Logger.info("Running workspace hook hook=#{hook_name} #{issue_log_context(issue_context)} workspace=#{workspace}") + Logger.info("Running workspace hook hook=#{hook_name} #{issue_log_context(issue_context)} workspace=#{workspace} worker_host=local") task = Task.async(fn -> @@ -190,12 +299,29 @@ defmodule SymphonyElixir.Workspace do nil -> Task.shutdown(task, :brutal_kill) - Logger.warning("Workspace hook timed out hook=#{hook_name} #{issue_log_context(issue_context)} workspace=#{workspace} timeout_ms=#{timeout_ms}") + Logger.warning("Workspace hook timed out hook=#{hook_name} #{issue_log_context(issue_context)} workspace=#{workspace} worker_host=local timeout_ms=#{timeout_ms}") {:error, {:workspace_hook_timeout, hook_name, timeout_ms}} end end + defp run_hook(command, workspace, issue_context, hook_name, worker_host) when is_binary(worker_host) do + timeout_ms = Config.settings!().hooks.timeout_ms + + Logger.info("Running workspace hook hook=#{hook_name} #{issue_log_context(issue_context)} workspace=#{workspace} worker_host=#{worker_host}") + + case run_remote_command(worker_host, "cd #{shell_escape(workspace)} && #{command}", timeout_ms) do + {:ok, cmd_result} -> + handle_hook_command_result(cmd_result, workspace, issue_context, hook_name) + + {:error, {:workspace_hook_timeout, ^hook_name, _timeout_ms} = reason} -> + {:error, reason} + + {:error, reason} -> + {:error, reason} + end + end + defp handle_hook_command_result({_output, 0}, 
_workspace, _issue_id, _hook_name) do :ok end @@ -220,7 +346,7 @@ defmodule SymphonyElixir.Workspace do end end - defp validate_workspace_path(workspace) when is_binary(workspace) do + defp validate_workspace_path(workspace, nil) when is_binary(workspace) do expanded_workspace = Path.expand(workspace) expanded_root = Path.expand(Config.settings!().workspace.root) expanded_root_prefix = expanded_root <> "/" @@ -248,6 +374,67 @@ defmodule SymphonyElixir.Workspace do end end + defp validate_workspace_path(workspace, worker_host) + when is_binary(workspace) and is_binary(worker_host) do + cond do + String.trim(workspace) == "" -> + {:error, {:workspace_path_unreadable, workspace, :empty}} + + String.contains?(workspace, ["\n", "\r", <<0>>]) -> + {:error, {:workspace_path_unreadable, workspace, :invalid_characters}} + + true -> + :ok + end + end + + defp parse_remote_workspace_output(output) do + lines = String.split(IO.iodata_to_binary(output), "\n", trim: true) + + payload = + Enum.find_value(lines, fn line -> + case String.split(line, "\t", parts: 3) do + [@remote_workspace_marker, created, path] when created in ["0", "1"] and path != "" -> + {created == "1", path} + + _ -> + nil + end + end) + + case payload do + {created?, workspace} when is_boolean(created?) 
and is_binary(workspace) -> + {:ok, workspace, created?} + + _ -> + {:error, {:workspace_prepare_failed, :invalid_output, output}} + end + end + + defp run_remote_command(worker_host, script, timeout_ms) + when is_binary(worker_host) and is_binary(script) and is_integer(timeout_ms) and timeout_ms > 0 do + task = + Task.async(fn -> + SSH.run(worker_host, script, stderr_to_stdout: true) + end) + + case Task.yield(task, timeout_ms) do + {:ok, result} -> + result + + nil -> + Task.shutdown(task, :brutal_kill) + {:error, {:workspace_hook_timeout, "remote_command", timeout_ms}} + end + end + + defp shell_escape(value) when is_binary(value) do + "'" <> String.replace(value, "'", "'\"'\"'") <> "'" + end + + defp worker_host_for_log(nil), do: "local" + defp worker_host_for_log(worker_host), do: worker_host + defp issue_context(%{id: issue_id, identifier: identifier}) do %{ issue_id: issue_id, diff --git a/elixir/lib/symphony_elixir_web/presenter.ex b/elixir/lib/symphony_elixir_web/presenter.ex index dc78ab32..1063cf7a 100644 --- a/elixir/lib/symphony_elixir_web/presenter.ex +++ b/elixir/lib/symphony_elixir_web/presenter.ex @@ -66,7 +66,8 @@ defmodule SymphonyElixirWeb.Presenter do issue_id: issue_id_from_entries(running, retry), status: issue_status(running, retry), workspace: %{ - path: Path.join(Config.settings!().workspace.root, issue_identifier) + path: workspace_path(issue_identifier, running, retry), + host: workspace_host(running, retry) }, attempts: %{ restart_count: restart_count(retry), @@ -99,6 +100,8 @@ defmodule SymphonyElixirWeb.Presenter do issue_id: entry.issue_id, issue_identifier: entry.identifier, state: entry.state, + worker_host: Map.get(entry, :worker_host), + workspace_path: Map.get(entry, :workspace_path), session_id: entry.session_id, turn_count: Map.get(entry, :turn_count, 0), last_event: entry.last_codex_event, @@ -119,12 +122,16 @@ defmodule SymphonyElixirWeb.Presenter do issue_identifier: entry.identifier, attempt: entry.attempt, due_at: 
due_at_iso8601(entry.due_in_ms), - error: entry.error + error: entry.error, + worker_host: Map.get(entry, :worker_host), + workspace_path: Map.get(entry, :workspace_path) } end defp running_issue_payload(running) do %{ + worker_host: Map.get(running, :worker_host), + workspace_path: Map.get(running, :workspace_path), session_id: running.session_id, turn_count: Map.get(running, :turn_count, 0), state: running.state, @@ -144,10 +151,22 @@ defmodule SymphonyElixirWeb.Presenter do %{ attempt: retry.attempt, due_at: due_at_iso8601(retry.due_in_ms), - error: retry.error + error: retry.error, + worker_host: Map.get(retry, :worker_host), + workspace_path: Map.get(retry, :workspace_path) } end + defp workspace_path(issue_identifier, running, retry) do + (running && Map.get(running, :workspace_path)) || + (retry && Map.get(retry, :workspace_path)) || + Path.join(Config.settings!().workspace.root, issue_identifier) + end + + defp workspace_host(running, retry) do + (running && Map.get(running, :worker_host)) || (retry && Map.get(retry, :worker_host)) + end + defp recent_events_payload(running) do [ %{ diff --git a/elixir/test/support/live_e2e_docker/Dockerfile b/elixir/test/support/live_e2e_docker/Dockerfile new file mode 100644 index 00000000..974625c1 --- /dev/null +++ b/elixir/test/support/live_e2e_docker/Dockerfile @@ -0,0 +1,22 @@ +FROM node:20-bookworm-slim + +RUN apt-get update && apt-get install -y --no-install-recommends \ + bash \ + ca-certificates \ + git \ + openssh-server \ + python3 \ + ripgrep \ + && rm -rf /var/lib/apt/lists/* + +RUN install -d -m 700 /root/.ssh /root/.codex /run/symphony/ssh /var/run/sshd + +RUN npm install --global @openai/codex + +COPY symphony-live-worker.conf /etc/ssh/sshd_config.d/symphony-live-worker.conf +COPY live_worker_entrypoint.sh /usr/local/bin/symphony-live-worker +RUN chmod 755 /usr/local/bin/symphony-live-worker + +EXPOSE 22 + +ENTRYPOINT ["/usr/local/bin/symphony-live-worker"] diff --git 
a/elixir/test/support/live_e2e_docker/docker-compose.yml b/elixir/test/support/live_e2e_docker/docker-compose.yml new file mode 100644 index 00000000..31584538 --- /dev/null +++ b/elixir/test/support/live_e2e_docker/docker-compose.yml @@ -0,0 +1,20 @@ +services: + worker1: + build: + context: . + dockerfile: Dockerfile + ports: + - "${SYMPHONY_LIVE_DOCKER_WORKER_1_PORT}:22" + volumes: + - ${SYMPHONY_LIVE_DOCKER_AUTHORIZED_KEY}:/run/symphony/ssh/authorized_key.pub:ro + - ${SYMPHONY_LIVE_DOCKER_AUTH_JSON}:/root/.codex/auth.json:ro + + worker2: + build: + context: . + dockerfile: Dockerfile + ports: + - "${SYMPHONY_LIVE_DOCKER_WORKER_2_PORT}:22" + volumes: + - ${SYMPHONY_LIVE_DOCKER_AUTHORIZED_KEY}:/run/symphony/ssh/authorized_key.pub:ro + - ${SYMPHONY_LIVE_DOCKER_AUTH_JSON}:/root/.codex/auth.json:ro diff --git a/elixir/test/support/live_e2e_docker/live_worker_entrypoint.sh b/elixir/test/support/live_e2e_docker/live_worker_entrypoint.sh new file mode 100644 index 00000000..3b70e6f4 --- /dev/null +++ b/elixir/test/support/live_e2e_docker/live_worker_entrypoint.sh @@ -0,0 +1,13 @@ +#!/bin/sh +set -eu + +install -d -m 700 /root/.ssh /root/.codex + +if [ ! 
-s /run/symphony/ssh/authorized_key.pub ]; then + echo "missing authorized key at /run/symphony/ssh/authorized_key.pub" >&2 + exit 1 +fi + +install -m 600 /run/symphony/ssh/authorized_key.pub /root/.ssh/authorized_keys + +exec /usr/sbin/sshd -D -e diff --git a/elixir/test/support/live_e2e_docker/symphony-live-worker.conf b/elixir/test/support/live_e2e_docker/symphony-live-worker.conf new file mode 100644 index 00000000..45cc12dc --- /dev/null +++ b/elixir/test/support/live_e2e_docker/symphony-live-worker.conf @@ -0,0 +1,7 @@ +PubkeyAuthentication yes +PasswordAuthentication no +KbdInteractiveAuthentication no +ChallengeResponseAuthentication no +UsePAM no +PermitRootLogin yes +AuthorizedKeysFile .ssh/authorized_keys diff --git a/elixir/test/support/test_support.exs b/elixir/test/support/test_support.exs index bea30f2c..81162f08 100644 --- a/elixir/test/support/test_support.exs +++ b/elixir/test/support/test_support.exs @@ -101,6 +101,7 @@ defmodule SymphonyElixir.TestSupport do tracker_terminal_states: ["Closed", "Cancelled", "Canceled", "Duplicate", "Done"], poll_interval_ms: 30_000, workspace_root: Path.join(System.tmp_dir!(), "symphony_workspaces"), + worker_ssh_hosts: [], max_concurrent_agents: 10, max_turns: 20, max_retry_backoff_ms: 300_000, @@ -136,6 +137,7 @@ defmodule SymphonyElixir.TestSupport do tracker_terminal_states = Keyword.get(config, :tracker_terminal_states) poll_interval_ms = Keyword.get(config, :poll_interval_ms) workspace_root = Keyword.get(config, :workspace_root) + worker_ssh_hosts = Keyword.get(config, :worker_ssh_hosts) max_concurrent_agents = Keyword.get(config, :max_concurrent_agents) max_turns = Keyword.get(config, :max_turns) max_retry_backoff_ms = Keyword.get(config, :max_retry_backoff_ms) @@ -174,6 +176,7 @@ defmodule SymphonyElixir.TestSupport do " interval_ms: #{yaml_value(poll_interval_ms)}", "workspace:", " root: #{yaml_value(workspace_root)}", + worker_yaml(worker_ssh_hosts), "agent:", " max_concurrent_agents: 
#{yaml_value(max_concurrent_agents)}", " max_turns: #{yaml_value(max_turns)}", @@ -235,6 +238,17 @@ defmodule SymphonyElixir.TestSupport do |> Enum.join("\n") end + defp worker_yaml([]), do: nil + defp worker_yaml(nil), do: nil + + defp worker_yaml(ssh_hosts) do + [ + "worker:", + " ssh_hosts: #{yaml_value(ssh_hosts)}" + ] + |> Enum.join("\n") + end + defp observability_yaml(enabled, refresh_ms, render_interval_ms) do [ "observability:", diff --git a/elixir/test/symphony_elixir/app_server_test.exs b/elixir/test/symphony_elixir/app_server_test.exs index 3b98c443..b7fab152 100644 --- a/elixir/test/symphony_elixir/app_server_test.exs +++ b/elixir/test/symphony_elixir/app_server_test.exs @@ -825,9 +825,8 @@ defmodule SymphonyElixir.AppServerTest do payload["id"] == 101 and get_in(payload, ["result", "success"]) == false and - get_in(payload, ["result", "contentItems", Access.at(0), "type"]) == "inputText" and String.contains?( - get_in(payload, ["result", "contentItems", Access.at(0), "text"]), + get_in(payload, ["result", "output"]), "Unsupported dynamic tool" ) else @@ -950,7 +949,7 @@ defmodule SymphonyElixir.AppServerTest do payload["id"] == 102 and get_in(payload, ["result", "success"]) == true and - get_in(payload, ["result", "contentItems", Access.at(0), "text"]) == + get_in(payload, ["result", "output"]) == ~s({"data":{"viewer":{"id":"usr_123"}}}) else false @@ -1199,4 +1198,136 @@ defmodule SymphonyElixir.AppServerTest do File.rm_rf(test_root) end end + + test "app server launches over ssh for remote workers" do + test_root = + Path.join( + System.tmp_dir!(), + "symphony-elixir-app-server-remote-ssh-#{System.unique_integer([:positive])}" + ) + + previous_path = System.get_env("PATH") + previous_trace = System.get_env("SYMP_TEST_SSH_TRACE") + + on_exit(fn -> + restore_env("PATH", previous_path) + restore_env("SYMP_TEST_SSH_TRACE", previous_trace) + end) + + try do + trace_file = Path.join(test_root, "ssh.trace") + fake_ssh = Path.join(test_root, "ssh") + 
remote_workspace = "/remote/workspaces/MT-REMOTE" + + File.mkdir_p!(test_root) + System.put_env("SYMP_TEST_SSH_TRACE", trace_file) + System.put_env("PATH", test_root <> ":" <> (previous_path || "")) + + File.write!(fake_ssh, """ + #!/bin/sh + trace_file="${SYMP_TEST_SSH_TRACE:-/tmp/symphony-fake-ssh.trace}" + count=0 + printf 'ARGV:%s\\n' "$*" >> "$trace_file" + + while IFS= read -r line; do + count=$((count + 1)) + printf 'JSON:%s\\n' "$line" >> "$trace_file" + + case "$count" in + 1) + printf '%s\\n' '{"id":1,"result":{}}' + ;; + 2) + printf '%s\\n' '{"id":2,"result":{"thread":{"id":"thread-remote"}}}' + ;; + 3) + printf '%s\\n' '{"id":3,"result":{"turn":{"id":"turn-remote"}}}' + ;; + 4) + printf '%s\\n' '{"method":"turn/completed"}' + exit 0 + ;; + *) + exit 0 + ;; + esac + done + """) + + File.chmod!(fake_ssh, 0o755) + + write_workflow_file!(Workflow.workflow_file_path(), + workspace_root: "/remote/workspaces", + codex_command: "fake-remote-codex app-server" + ) + + issue = %Issue{ + id: "issue-remote", + identifier: "MT-REMOTE", + title: "Run remote app server", + description: "Validate ssh-backed codex startup", + state: "In Progress", + url: "https://example.org/issues/MT-REMOTE", + labels: ["backend"] + } + + assert {:ok, _result} = + AppServer.run( + remote_workspace, + "Run remote worker", + issue, + worker_host: "worker-01:2200" + ) + + trace = File.read!(trace_file) + lines = String.split(trace, "\n", trim: true) + + assert argv_line = Enum.find(lines, &String.starts_with?(&1, "ARGV:")) + assert argv_line =~ "-T -p 2200 worker-01 bash -lc" + assert argv_line =~ "cd " + assert argv_line =~ remote_workspace + assert argv_line =~ "exec " + assert argv_line =~ "fake-remote-codex app-server" + + expected_turn_policy = %{ + "type" => "workspaceWrite", + "writableRoots" => [remote_workspace], + "readOnlyAccess" => %{"type" => "fullAccess"}, + "networkAccess" => false, + "excludeTmpdirEnvVar" => false, + "excludeSlashTmp" => false + } + + assert 
Enum.any?(lines, fn line -> + if String.starts_with?(line, "JSON:") do + line + |> String.trim_leading("JSON:") + |> Jason.decode!() + |> then(fn payload -> + payload["method"] == "thread/start" && + get_in(payload, ["params", "cwd"]) == remote_workspace + end) + else + false + end + end) + + assert Enum.any?(lines, fn line -> + if String.starts_with?(line, "JSON:") do + line + |> String.trim_leading("JSON:") + |> Jason.decode!() + |> then(fn payload -> + payload["method"] == "turn/start" && + get_in(payload, ["params", "cwd"]) == remote_workspace && + get_in(payload, ["params", "sandboxPolicy"]) == expected_turn_policy + end) + else + false + end + end) + after + File.rm_rf(test_root) + end + end end diff --git a/elixir/test/symphony_elixir/dynamic_tool_test.exs b/elixir/test/symphony_elixir/dynamic_tool_test.exs index a5536e03..294471ed 100644 --- a/elixir/test/symphony_elixir/dynamic_tool_test.exs +++ b/elixir/test/symphony_elixir/dynamic_tool_test.exs @@ -27,19 +27,19 @@ defmodule SymphonyElixir.Codex.DynamicToolTest do assert response["success"] == false - assert [ - %{ - "type" => "inputText", - "text" => text - } - ] = response["contentItems"] - - assert Jason.decode!(text) == %{ + assert Jason.decode!(response["output"]) == %{ "error" => %{ "message" => ~s(Unsupported dynamic tool: "not_a_real_tool".), "supportedTools" => ["linear_graphql"] } } + + assert response["contentItems"] == [ + %{ + "type" => "inputText", + "text" => response["output"] + } + ] end test "linear_graphql returns successful GraphQL responses as tool text" do @@ -61,15 +61,8 @@ defmodule SymphonyElixir.Codex.DynamicToolTest do assert_received {:linear_client_called, "query Viewer { viewer { id } }", %{"includeTeams" => false}, []} assert response["success"] == true - - assert [ - %{ - "type" => "inputText", - "text" => text - } - ] = response["contentItems"] - - assert Jason.decode!(text) == %{"data" => %{"viewer" => %{"id" => "usr_123"}}} + assert Jason.decode!(response["output"]) == 
%{"data" => %{"viewer" => %{"id" => "usr_123"}}} + assert response["contentItems"] == [%{"type" => "inputText", "text" => response["output"]}] end test "linear_graphql accepts a raw GraphQL query string" do @@ -134,13 +127,7 @@ defmodule SymphonyElixir.Codex.DynamicToolTest do assert response["success"] == false - assert [ - %{ - "text" => text - } - ] = response["contentItems"] - - assert Jason.decode!(text) == %{ + assert Jason.decode!(response["output"]) == %{ "error" => %{ "message" => "`linear_graphql` requires a non-empty `query` string." } @@ -159,14 +146,7 @@ defmodule SymphonyElixir.Codex.DynamicToolTest do assert response["success"] == false - assert [ - %{ - "type" => "inputText", - "text" => text - } - ] = response["contentItems"] - - assert Jason.decode!(text) == %{ + assert Jason.decode!(response["output"]) == %{ "data" => nil, "errors" => [%{"message" => "Unknown field `nope`"}] } @@ -197,14 +177,7 @@ defmodule SymphonyElixir.Codex.DynamicToolTest do assert response["success"] == false - assert [ - %{ - "type" => "inputText", - "text" => text - } - ] = response["contentItems"] - - assert Jason.decode!(text) == %{ + assert Jason.decode!(response["output"]) == %{ "error" => %{ "message" => "`linear_graphql` requires a non-empty `query` string." } @@ -234,13 +207,7 @@ defmodule SymphonyElixir.Codex.DynamicToolTest do assert response["success"] == false - assert [ - %{ - "text" => text - } - ] = response["contentItems"] - - assert Jason.decode!(text) == %{ + assert Jason.decode!(response["output"]) == %{ "error" => %{ "message" => "`linear_graphql` expects either a GraphQL query string or an object with `query` and optional `variables`." 
} @@ -259,13 +226,7 @@ defmodule SymphonyElixir.Codex.DynamicToolTest do assert response["success"] == false - assert [ - %{ - "text" => text - } - ] = response["contentItems"] - - assert Jason.decode!(text) == %{ + assert Jason.decode!(response["output"]) == %{ "error" => %{ "message" => "`linear_graphql.variables` must be a JSON object when provided." } @@ -282,13 +243,7 @@ defmodule SymphonyElixir.Codex.DynamicToolTest do assert missing_token["success"] == false - assert [ - %{ - "text" => missing_token_text - } - ] = missing_token["contentItems"] - - assert Jason.decode!(missing_token_text) == %{ + assert Jason.decode!(missing_token["output"]) == %{ "error" => %{ "message" => "Symphony is missing Linear auth. Set `linear.api_key` in `WORKFLOW.md` or export `LINEAR_API_KEY`." } @@ -301,13 +256,7 @@ defmodule SymphonyElixir.Codex.DynamicToolTest do linear_client: fn _query, _variables, _opts -> {:error, {:linear_api_status, 503}} end ) - assert [ - %{ - "text" => status_error_text - } - ] = status_error["contentItems"] - - assert Jason.decode!(status_error_text) == %{ + assert Jason.decode!(status_error["output"]) == %{ "error" => %{ "message" => "Linear GraphQL request failed with HTTP 503.", "status" => 503 @@ -321,13 +270,7 @@ defmodule SymphonyElixir.Codex.DynamicToolTest do linear_client: fn _query, _variables, _opts -> {:error, {:linear_api_request, :timeout}} end ) - assert [ - %{ - "text" => request_error_text - } - ] = request_error["contentItems"] - - assert Jason.decode!(request_error_text) == %{ + assert Jason.decode!(request_error["output"]) == %{ "error" => %{ "message" => "Linear GraphQL request failed before receiving a successful response.", "reason" => ":timeout" @@ -345,13 +288,7 @@ defmodule SymphonyElixir.Codex.DynamicToolTest do assert response["success"] == false - assert [ - %{ - "text" => text - } - ] = response["contentItems"] - - assert Jason.decode!(text) == %{ + assert Jason.decode!(response["output"]) == %{ "error" => %{ "message" => 
"Linear GraphQL tool execution failed.", "reason" => ":boom" @@ -368,11 +305,6 @@ defmodule SymphonyElixir.Codex.DynamicToolTest do ) assert response["success"] == true - - assert [ - %{ - "text" => ":ok" - } - ] = response["contentItems"] + assert response["output"] == ":ok" end end diff --git a/elixir/test/symphony_elixir/extensions_test.exs b/elixir/test/symphony_elixir/extensions_test.exs index 27b2416b..d6309c96 100644 --- a/elixir/test/symphony_elixir/extensions_test.exs +++ b/elixir/test/symphony_elixir/extensions_test.exs @@ -348,6 +348,8 @@ defmodule SymphonyElixir.ExtensionsTest do "issue_id" => "issue-http", "issue_identifier" => "MT-HTTP", "state" => "In Progress", + "worker_host" => nil, + "workspace_path" => nil, "session_id" => "thread-http", "turn_count" => 7, "last_event" => "notification", @@ -363,7 +365,9 @@ defmodule SymphonyElixir.ExtensionsTest do "issue_identifier" => "MT-RETRY", "attempt" => 2, "due_at" => state_payload["retrying"] |> List.first() |> Map.fetch!("due_at"), - "error" => "boom" + "error" => "boom", + "worker_host" => nil, + "workspace_path" => nil } ], "codex_totals" => %{ @@ -382,9 +386,14 @@ defmodule SymphonyElixir.ExtensionsTest do "issue_identifier" => "MT-HTTP", "issue_id" => "issue-http", "status" => "running", - "workspace" => %{"path" => Path.join(Config.settings!().workspace.root, "MT-HTTP")}, + "workspace" => %{ + "path" => Path.join(Config.settings!().workspace.root, "MT-HTTP"), + "host" => nil + }, "attempts" => %{"restart_count" => 0, "current_retry_attempt" => 0}, "running" => %{ + "worker_host" => nil, + "workspace_path" => nil, "session_id" => "thread-http", "turn_count" => 7, "state" => "In Progress", diff --git a/elixir/test/symphony_elixir/live_e2e_test.exs b/elixir/test/symphony_elixir/live_e2e_test.exs index b775e66f..163d03e4 100644 --- a/elixir/test/symphony_elixir/live_e2e_test.exs +++ b/elixir/test/symphony_elixir/live_e2e_test.exs @@ -2,25 +2,20 @@ defmodule SymphonyElixir.LiveE2ETest do use 
SymphonyElixir.TestSupport require Logger + alias SymphonyElixir.SSH @moduletag :live_e2e @moduletag timeout: 300_000 @default_team_key "SYME2E" + @default_docker_auth_json Path.join(System.user_home!(), ".codex/auth.json") + @docker_worker_count 2 + @docker_support_dir Path.expand("../support/live_e2e_docker", __DIR__) + @docker_compose_file Path.join(@docker_support_dir, "docker-compose.yml") @result_file "LIVE_E2E_RESULT.txt" - @live_e2e_skip_reason (cond do - System.get_env("SYMPHONY_RUN_LIVE_E2E") != "1" -> - "set SYMPHONY_RUN_LIVE_E2E=1 to enable the real Linear/Codex end-to-end test" - - is_nil(System.find_executable("codex")) -> - "real Codex live test requires `codex` on PATH" - - System.get_env("LINEAR_API_KEY") in [nil, ""] -> - "real Linear live test requires LINEAR_API_KEY" - - true -> - nil - end) + @live_e2e_skip_reason if(System.get_env("SYMPHONY_RUN_LIVE_E2E") != "1", + do: "set SYMPHONY_RUN_LIVE_E2E=1 to enable the real Linear/Codex end-to-end test" + ) @team_query """ query SymphonyLiveE2ETeam($key: String!) 
{ @@ -126,82 +121,13 @@ defmodule SymphonyElixir.LiveE2ETest do """ @tag skip: @live_e2e_skip_reason - test "creates a real Linear project and issue, then runs a real Codex turn" do - test_root = - Path.join( - System.tmp_dir!(), - "symphony-live-e2e-#{System.unique_integer([:positive])}" - ) - - workflow_root = Path.join(test_root, "workflow") - workflow_file = Path.join(workflow_root, "WORKFLOW.md") - workspace_root = Path.join(test_root, "workspaces") - team_key = System.get_env("SYMPHONY_LIVE_LINEAR_TEAM_KEY") || @default_team_key - codex_command = System.get_env("SYMPHONY_LIVE_CODEX_COMMAND") || "codex app-server" - original_workflow_path = Workflow.workflow_file_path() - - File.mkdir_p!(workflow_root) - - try do - Workflow.set_workflow_file_path(workflow_file) - - write_workflow_file!(workflow_file, - tracker_api_token: "$LINEAR_API_KEY", - tracker_project_slug: "bootstrap", - workspace_root: workspace_root, - codex_command: codex_command, - codex_approval_policy: "never", - observability_enabled: false - ) - - team = fetch_team!(team_key) - active_state = active_state!(team) - completed_project_status = completed_project_status!() - terminal_states = terminal_state_names(team) - - project = - create_project!( - team["id"], - "Symphony Live E2E #{System.unique_integer([:positive])}" - ) - - issue = - create_issue!( - team["id"], - project["id"], - active_state["id"], - "Symphony live e2e issue for #{project["name"]}" - ) - - write_workflow_file!(workflow_file, - tracker_api_token: "$LINEAR_API_KEY", - tracker_project_slug: project["slugId"], - tracker_active_states: [active_state["name"]], - tracker_terminal_states: terminal_states, - workspace_root: workspace_root, - codex_command: codex_command, - codex_approval_policy: "never", - codex_turn_timeout_ms: 600_000, - codex_stall_timeout_ms: 600_000, - observability_enabled: false, - prompt: live_prompt(project["slugId"]) - ) - - assert :ok = AgentRunner.run(issue, nil, max_turns: 1) - - result_path = 
Path.join([workspace_root, issue.identifier, @result_file]) - assert File.exists?(result_path) - assert File.read!(result_path) == expected_result(issue.identifier, project["slugId"]) - - issue_snapshot = fetch_issue_details!(issue.id) - assert issue_completed?(issue_snapshot) - assert issue_has_comment?(issue_snapshot, expected_comment(issue.identifier, project["slugId"])) + test "creates a real Linear project and issue with a local worker" do + run_live_issue_flow!(:local) + end - assert :ok = complete_project(project["id"], completed_project_status["id"]) - after - Workflow.set_workflow_file_path(original_workflow_path) - File.rm_rf(test_root) - end + @tag skip: @live_e2e_skip_reason + test "creates a real Linear project and issue with an ssh worker" do + run_live_issue_flow!(:ssh) end defp fetch_team!(team_key) do @@ -234,6 +160,16 @@ defmodule SymphonyElixir.LiveE2ETest do end end + defp active_state_names(%{"states" => %{"nodes" => states}}) when is_list(states) do + states + |> Enum.reject(&(&1["type"] in ["completed", "canceled"])) + |> Enum.map(& &1["name"]) + |> case do + [] -> ["Todo", "In Progress", "In Review"] + names -> names + end + end + defp completed_project_status! do @project_statuses_query |> graphql_data!(%{}) @@ -387,10 +323,12 @@ defmodule SymphonyElixir.LiveE2ETest do project_slug=#{project_slug} Step 2: - Use the `linear_graphql` tool to query the current issue by `{{ issue.id }}` and read: + You must use the `linear_graphql` tool to query the current issue by `{{ issue.id }}` and read: - existing comments - team workflow states + A turn that only creates the file is incomplete. Do not stop after Step 1. 
+ If the exact comment body below is not already present, post exactly one comment on the current issue with this exact body: #{expected_comment("{{ issue.identifier }}", project_slug)} @@ -457,4 +395,406 @@ defmodule SymphonyElixir.LiveE2ETest do defp expected_comment(issue_identifier, project_slug) do "Symphony live e2e comment\nidentifier=#{issue_identifier}\nproject_slug=#{project_slug}" end + + defp receive_runtime_info!(issue_id) do + receive do + {:worker_runtime_info, ^issue_id, %{workspace_path: workspace_path} = runtime_info} + when is_binary(workspace_path) -> + runtime_info + + {:codex_worker_update, ^issue_id, _message} -> + receive_runtime_info!(issue_id) + after + 5_000 -> + flunk("timed out waiting for worker runtime info for #{inspect(issue_id)}") + end + end + + defp read_worker_result!(%{worker_host: nil, workspace_path: workspace_path}, result_file) + when is_binary(workspace_path) and is_binary(result_file) do + File.read!(Path.join(workspace_path, result_file)) + end + + defp read_worker_result!(%{worker_host: worker_host, workspace_path: workspace_path}, result_file) + when is_binary(worker_host) and is_binary(workspace_path) and is_binary(result_file) do + remote_result_path = Path.join(workspace_path, result_file) + + case SSH.run(worker_host, "cat #{shell_escape(remote_result_path)}", stderr_to_stdout: true) do + {:ok, {output, 0}} -> + output + + {:ok, {output, status}} -> + flunk("failed to read remote result from #{worker_host}:#{remote_result_path} (status #{status}): #{inspect(output)}") + + {:error, reason} -> + flunk("failed to read remote result from #{worker_host}:#{remote_result_path}: #{inspect(reason)}") + end + end + + defp shell_escape(value) when is_binary(value) do + "'" <> String.replace(value, "'", "'\"'\"'") <> "'" + end + + defp run_live_issue_flow!(backend) when backend in [:local, :ssh] do + run_id = "symphony-live-e2e-#{backend}-#{System.unique_integer([:positive])}" + test_root = Path.join(System.tmp_dir!(), run_id) 
+ workflow_root = Path.join(test_root, "workflow") + workflow_file = Path.join(workflow_root, "WORKFLOW.md") + worker_setup = live_worker_setup!(backend, run_id, test_root) + team_key = System.get_env("SYMPHONY_LIVE_LINEAR_TEAM_KEY") || @default_team_key + original_workflow_path = Workflow.workflow_file_path() + orchestrator_pid = Process.whereis(SymphonyElixir.Orchestrator) + + File.mkdir_p!(workflow_root) + + try do + if is_pid(orchestrator_pid) do + assert :ok = Supervisor.terminate_child(SymphonyElixir.Supervisor, SymphonyElixir.Orchestrator) + end + + Workflow.set_workflow_file_path(workflow_file) + + write_workflow_file!(workflow_file, + tracker_api_token: "$LINEAR_API_KEY", + tracker_project_slug: "bootstrap", + workspace_root: worker_setup.workspace_root, + worker_ssh_hosts: worker_setup.ssh_worker_hosts, + codex_command: worker_setup.codex_command, + codex_approval_policy: "never", + observability_enabled: false + ) + + team = fetch_team!(team_key) + active_state = active_state!(team) + completed_project_status = completed_project_status!() + terminal_states = terminal_state_names(team) + + project = + create_project!( + team["id"], + "Symphony Live E2E #{backend} #{System.unique_integer([:positive])}" + ) + + issue = + create_issue!( + team["id"], + project["id"], + active_state["id"], + "Symphony live e2e #{backend} issue for #{project["name"]}" + ) + + write_workflow_file!(workflow_file, + tracker_api_token: "$LINEAR_API_KEY", + tracker_project_slug: project["slugId"], + tracker_active_states: active_state_names(team), + tracker_terminal_states: terminal_states, + workspace_root: worker_setup.workspace_root, + worker_ssh_hosts: worker_setup.ssh_worker_hosts, + codex_command: worker_setup.codex_command, + codex_approval_policy: "never", + codex_turn_timeout_ms: 600_000, + codex_stall_timeout_ms: 600_000, + observability_enabled: false, + prompt: live_prompt(project["slugId"]) + ) + + assert :ok = AgentRunner.run(issue, self(), max_turns: 3) + + 
runtime_info = receive_runtime_info!(issue.id) + + assert read_worker_result!(runtime_info, @result_file) == + expected_result(issue.identifier, project["slugId"]) + + issue_snapshot = fetch_issue_details!(issue.id) + assert issue_completed?(issue_snapshot) + assert issue_has_comment?(issue_snapshot, expected_comment(issue.identifier, project["slugId"])) + + assert :ok = complete_project(project["id"], completed_project_status["id"]) + after + restart_orchestrator_if_needed() + cleanup_live_worker_setup(worker_setup) + Workflow.set_workflow_file_path(original_workflow_path) + File.rm_rf(test_root) + end + end + + defp live_worker_setup!(:local, _run_id, test_root) when is_binary(test_root) do + %{ + cleanup: fn -> :ok end, + codex_command: "codex app-server", + ssh_worker_hosts: [], + workspace_root: Path.join(test_root, "workspaces") + } + end + + defp live_worker_setup!(:ssh, run_id, test_root) when is_binary(run_id) and is_binary(test_root) do + case live_ssh_worker_hosts() do + [] -> + live_docker_worker_setup!(run_id, test_root) + + _hosts -> + live_ssh_worker_setup!(run_id) + end + end + + defp cleanup_live_worker_setup(%{cleanup: cleanup}) when is_function(cleanup, 0) do + cleanup.() + end + + defp cleanup_live_worker_setup(_worker_setup), do: :ok + + defp restart_orchestrator_if_needed do + if is_nil(Process.whereis(SymphonyElixir.Orchestrator)) do + case Supervisor.restart_child(SymphonyElixir.Supervisor, SymphonyElixir.Orchestrator) do + {:ok, _pid} -> :ok + {:error, {:already_started, _pid}} -> :ok + end + end + end + + defp live_ssh_worker_setup!(run_id) when is_binary(run_id) do + ssh_worker_hosts = live_ssh_worker_hosts() + remote_test_root = Path.join(shared_remote_home!(ssh_worker_hosts), ".#{run_id}") + + %{ + cleanup: fn -> cleanup_remote_test_root(remote_test_root, ssh_worker_hosts) end, + codex_command: "codex app-server", + ssh_worker_hosts: ssh_worker_hosts, + workspace_root: Path.join(remote_test_root, "workspaces") + } + end + + defp 
live_docker_worker_setup!(run_id, test_root) when is_binary(run_id) and is_binary(test_root) do + ssh_root = Path.join(test_root, "live-docker-ssh") + key_path = Path.join(ssh_root, "id_ed25519") + config_path = Path.join(ssh_root, "config") + auth_json_path = @default_docker_auth_json + worker_ports = reserve_tcp_ports(@docker_worker_count) + worker_hosts = Enum.map(worker_ports, &"localhost:#{&1}") + project_name = docker_project_name(run_id) + previous_ssh_config = System.get_env("SYMPHONY_SSH_CONFIG") + + base_cleanup = fn -> + restore_env("SYMPHONY_SSH_CONFIG", previous_ssh_config) + docker_compose_down(project_name, docker_compose_env(worker_ports, auth_json_path, key_path <> ".pub")) + end + + result = + try do + File.mkdir_p!(ssh_root) + generate_ssh_keypair!(key_path) + write_docker_ssh_config!(config_path, key_path) + System.put_env("SYMPHONY_SSH_CONFIG", config_path) + + docker_compose_up!(project_name, docker_compose_env(worker_ports, auth_json_path, key_path <> ".pub")) + wait_for_ssh_hosts!(worker_hosts) + remote_test_root = Path.join(shared_remote_home!(worker_hosts), ".#{run_id}") + + %{ + cleanup: fn -> + cleanup_remote_test_root(remote_test_root, worker_hosts) + base_cleanup.() + end, + codex_command: "codex app-server", + ssh_worker_hosts: worker_hosts, + workspace_root: Path.join(remote_test_root, "workspaces") + } + rescue + error -> + {:error, error, __STACKTRACE__} + catch + kind, reason -> + {:caught, kind, reason, __STACKTRACE__} + end + + case result do + %{ssh_worker_hosts: _hosts} = worker_setup -> + worker_setup + + {:error, error, stacktrace} -> + base_cleanup.() + reraise(error, stacktrace) + + {:caught, kind, reason, stacktrace} -> + base_cleanup.() + :erlang.raise(kind, reason, stacktrace) + end + end + + defp live_ssh_worker_hosts do + System.get_env("SYMPHONY_LIVE_SSH_WORKER_HOSTS", "") + |> String.split(",", trim: true) + |> Enum.map(&String.trim/1) + |> Enum.reject(&(&1 == "")) + end + + defp cleanup_remote_test_root(test_root, 
ssh_worker_hosts) + when is_binary(test_root) and is_list(ssh_worker_hosts) do + Enum.each(ssh_worker_hosts, fn worker_host -> + _ = SSH.run(worker_host, "rm -rf #{shell_escape(test_root)}", stderr_to_stdout: true) + end) + end + + defp shared_remote_home!([first_host | rest] = worker_hosts) when is_binary(first_host) and rest != [] do + homes = + worker_hosts + |> Enum.map(fn worker_host -> {worker_host, remote_home!(worker_host)} end) + + [{_host, home} | _remaining] = homes + + if Enum.all?(homes, fn {_host, other_home} -> other_home == home end) do + home + else + flunk("expected all live SSH workers to share one home directory, got: #{inspect(homes)}") + end + end + + defp shared_remote_home!([worker_host]) when is_binary(worker_host), do: remote_home!(worker_host) + defp shared_remote_home!(_worker_hosts), do: flunk("expected at least one live SSH worker host") + + defp remote_home!(worker_host) when is_binary(worker_host) do + case SSH.run(worker_host, "printf '%s\\n' \"$HOME\"", stderr_to_stdout: true) do + {:ok, {output, 0}} -> + output + |> String.trim() + |> case do + "" -> flunk("expected non-empty remote home for #{worker_host}") + home -> home + end + + {:ok, {output, status}} -> + flunk("failed to resolve remote home for #{worker_host} (status #{status}): #{inspect(output)}") + + {:error, reason} -> + flunk("failed to resolve remote home for #{worker_host}: #{inspect(reason)}") + end + end + + defp reserve_tcp_ports(count) when is_integer(count) and count > 0 do + reserve_tcp_ports(count, MapSet.new(), []) + end + + defp reserve_tcp_ports(0, _seen, ports), do: Enum.reverse(ports) + + defp reserve_tcp_ports(remaining, seen, ports) do + port = reserve_tcp_port!() + + if MapSet.member?(seen, port) do + reserve_tcp_ports(remaining, seen, ports) + else + reserve_tcp_ports(remaining - 1, MapSet.put(seen, port), [port | ports]) + end + end + + defp reserve_tcp_port! 
do + {:ok, socket} = :gen_tcp.listen(0, [:binary, {:active, false}, {:reuseaddr, true}]) + {:ok, port} = :inet.port(socket) + :ok = :gen_tcp.close(socket) + port + end + + defp generate_ssh_keypair!(key_path) when is_binary(key_path) do + case System.find_executable("ssh-keygen") do + nil -> + flunk("docker worker mode requires `ssh-keygen` on PATH") + + executable -> + key_dir = Path.dirname(key_path) + File.mkdir_p!(key_dir) + File.rm_rf(key_path) + File.rm_rf(key_path <> ".pub") + + case System.cmd(executable, ["-q", "-t", "ed25519", "-N", "", "-f", key_path], stderr_to_stdout: true) do + {_output, 0} -> :ok + {output, status} -> flunk("failed to generate live docker ssh key (status #{status}): #{inspect(output)}") + end + end + end + + defp write_docker_ssh_config!(config_path, key_path) + when is_binary(config_path) and is_binary(key_path) do + config_contents = """ + Host localhost 127.0.0.1 + User root + IdentityFile #{key_path} + IdentitiesOnly yes + StrictHostKeyChecking no + UserKnownHostsFile /dev/null + LogLevel ERROR + """ + + File.mkdir_p!(Path.dirname(config_path)) + File.write!(config_path, config_contents) + end + + defp docker_project_name(run_id) when is_binary(run_id) do + run_id + |> String.downcase() + |> String.replace(~r/[^a-z0-9_-]/, "-") + end + + defp docker_compose_env(worker_ports, auth_json_path, authorized_key_path) + when is_list(worker_ports) and is_binary(auth_json_path) and is_binary(authorized_key_path) do + [ + {"SYMPHONY_LIVE_DOCKER_AUTH_JSON", auth_json_path}, + {"SYMPHONY_LIVE_DOCKER_AUTHORIZED_KEY", authorized_key_path}, + {"SYMPHONY_LIVE_DOCKER_WORKER_1_PORT", Integer.to_string(Enum.at(worker_ports, 0))}, + {"SYMPHONY_LIVE_DOCKER_WORKER_2_PORT", Integer.to_string(Enum.at(worker_ports, 1))} + ] + end + + defp docker_compose_up!(project_name, env) when is_binary(project_name) and is_list(env) do + args = ["compose", "-f", @docker_compose_file, "-p", project_name, "up", "-d", "--build"] + + case System.cmd("docker", args, cd: 
@docker_support_dir, env: env, stderr_to_stdout: true) do + {_output, 0} -> + :ok + + {output, status} -> + flunk("failed to start live docker workers (status #{status}): #{inspect(output)}") + end + end + + defp docker_compose_down(project_name, env) when is_binary(project_name) and is_list(env) do + _ = + System.cmd( + "docker", + ["compose", "-f", @docker_compose_file, "-p", project_name, "down", "-v", "--remove-orphans"], + cd: @docker_support_dir, + env: env, + stderr_to_stdout: true + ) + + :ok + end + + defp wait_for_ssh_hosts!(worker_hosts) when is_list(worker_hosts) do + deadline = System.monotonic_time(:millisecond) + 60_000 + + Enum.each(worker_hosts, fn worker_host -> + wait_for_ssh_host!(worker_host, deadline) + end) + end + + defp wait_for_ssh_host!(worker_host, deadline_ms) when is_binary(worker_host) do + case SSH.run(worker_host, "printf ready", stderr_to_stdout: true) do + {:ok, {"ready", 0}} -> + :ok + + {:ok, {_output, _status}} -> + retry_or_flunk_ssh_host(worker_host, deadline_ms) + + {:error, _reason} -> + retry_or_flunk_ssh_host(worker_host, deadline_ms) + end + end + + defp retry_or_flunk_ssh_host(worker_host, deadline_ms) do + if System.monotonic_time(:millisecond) < deadline_ms do + Process.sleep(1_000) + wait_for_ssh_host!(worker_host, deadline_ms) + else + flunk("timed out waiting for SSH worker #{worker_host} to accept connections") + end + end end diff --git a/elixir/test/symphony_elixir/ssh_test.exs b/elixir/test/symphony_elixir/ssh_test.exs new file mode 100644 index 00000000..9edc94f3 --- /dev/null +++ b/elixir/test/symphony_elixir/ssh_test.exs @@ -0,0 +1,199 @@ +defmodule SymphonyElixir.SSHTest do + use ExUnit.Case, async: false + + alias SymphonyElixir.SSH + + test "run/3 keeps bracketed IPv6 host:port targets intact" do + test_root = Path.join(System.tmp_dir!(), "symphony-ssh-ipv6-test-#{System.unique_integer([:positive])}") + trace_file = Path.join(test_root, "ssh.trace") + previous_path = System.get_env("PATH") + + on_exit(fn 
-> + restore_env("PATH", previous_path) + File.rm_rf(test_root) + end) + + install_fake_ssh!(test_root, trace_file) + + assert {:ok, {"", 0}} = + SSH.run("root@[::1]:2200", "printf ok", stderr_to_stdout: true) + + trace = File.read!(trace_file) + assert trace =~ "-T -p 2200 root@[::1] bash -lc" + assert trace =~ "printf ok" + end + + test "run/3 leaves unbracketed IPv6-style targets unchanged" do + test_root = Path.join(System.tmp_dir!(), "symphony-ssh-ipv6-raw-test-#{System.unique_integer([:positive])}") + trace_file = Path.join(test_root, "ssh.trace") + previous_path = System.get_env("PATH") + + on_exit(fn -> + restore_env("PATH", previous_path) + File.rm_rf(test_root) + end) + + install_fake_ssh!(test_root, trace_file) + + assert {:ok, {"", 0}} = + SSH.run("::1:2200", "printf ok", stderr_to_stdout: true) + + trace = File.read!(trace_file) + assert trace =~ "-T ::1:2200 bash -lc" + refute trace =~ "-p 2200" + end + + test "run/3 passes host:port targets through ssh -p" do + test_root = Path.join(System.tmp_dir!(), "symphony-ssh-test-#{System.unique_integer([:positive])}") + trace_file = Path.join(test_root, "ssh.trace") + previous_path = System.get_env("PATH") + previous_ssh_config = System.get_env("SYMPHONY_SSH_CONFIG") + + on_exit(fn -> + restore_env("PATH", previous_path) + restore_env("SYMPHONY_SSH_CONFIG", previous_ssh_config) + File.rm_rf(test_root) + end) + + install_fake_ssh!(test_root, trace_file) + System.put_env("SYMPHONY_SSH_CONFIG", "/tmp/symphony-test-ssh-config") + + assert {:ok, {"", 0}} = + SSH.run("localhost:2222", "echo ready", stderr_to_stdout: true) + + trace = File.read!(trace_file) + assert trace =~ "-F /tmp/symphony-test-ssh-config" + assert trace =~ "-T -p 2222 localhost bash -lc" + assert trace =~ "echo ready" + end + + test "run/3 keeps the user prefix when parsing user@host:port targets" do + test_root = Path.join(System.tmp_dir!(), "symphony-ssh-user-test-#{System.unique_integer([:positive])}") + trace_file = Path.join(test_root, 
"ssh.trace") + previous_path = System.get_env("PATH") + + on_exit(fn -> + restore_env("PATH", previous_path) + File.rm_rf(test_root) + end) + + install_fake_ssh!(test_root, trace_file) + + assert {:ok, {"", 0}} = + SSH.run("root@127.0.0.1:2200", "printf ok", stderr_to_stdout: true) + + trace = File.read!(trace_file) + assert trace =~ "-T -p 2200 root@127.0.0.1 bash -lc" + assert trace =~ "printf ok" + end + + test "run/3 returns an error when ssh is unavailable" do + test_root = Path.join(System.tmp_dir!(), "symphony-ssh-missing-test-#{System.unique_integer([:positive])}") + previous_path = System.get_env("PATH") + + on_exit(fn -> + restore_env("PATH", previous_path) + File.rm_rf(test_root) + end) + + File.mkdir_p!(test_root) + System.put_env("PATH", test_root) + + assert {:error, :ssh_not_found} = SSH.run("localhost", "printf ok") + end + + test "start_port/3 supports binary output without line mode" do + test_root = Path.join(System.tmp_dir!(), "symphony-ssh-port-test-#{System.unique_integer([:positive])}") + trace_file = Path.join(test_root, "ssh.trace") + previous_path = System.get_env("PATH") + previous_ssh_config = System.get_env("SYMPHONY_SSH_CONFIG") + + on_exit(fn -> + restore_env("PATH", previous_path) + restore_env("SYMPHONY_SSH_CONFIG", previous_ssh_config) + File.rm_rf(test_root) + end) + + install_fake_ssh!(test_root, trace_file, """ + #!/bin/sh + printf 'ARGV:%s\\n' "$*" >> "#{trace_file}" + printf 'ready\\n' + exit 0 + """) + + System.delete_env("SYMPHONY_SSH_CONFIG") + + assert {:ok, port} = SSH.start_port("localhost", "printf ok") + assert is_port(port) + wait_for_trace!(trace_file) + + trace = File.read!(trace_file) + assert trace =~ "-T localhost bash -lc" + refute trace =~ " -F " + end + + test "start_port/3 supports line mode" do + test_root = Path.join(System.tmp_dir!(), "symphony-ssh-line-port-test-#{System.unique_integer([:positive])}") + trace_file = Path.join(test_root, "ssh.trace") + previous_path = System.get_env("PATH") + + on_exit(fn 
-> + restore_env("PATH", previous_path) + File.rm_rf(test_root) + end) + + install_fake_ssh!(test_root, trace_file, """ + #!/bin/sh + printf 'ARGV:%s\\n' "$*" >> "#{trace_file}" + printf 'ready\\n' + exit 0 + """) + + assert {:ok, port} = SSH.start_port("localhost:2222", "printf ok", line: 256) + assert is_port(port) + wait_for_trace!(trace_file) + + trace = File.read!(trace_file) + assert trace =~ "-T -p 2222 localhost bash -lc" + end + + test "remote_shell_command/1 escapes embedded single quotes" do + assert SSH.remote_shell_command("printf 'hello'") == + "bash -lc 'printf '\"'\"'hello'\"'\"''" + end + + defp install_fake_ssh!(test_root, trace_file, script \\ nil) do + fake_bin_dir = Path.join(test_root, "bin") + fake_ssh = Path.join(fake_bin_dir, "ssh") + + File.mkdir_p!(fake_bin_dir) + + File.write!( + fake_ssh, + script || + """ + #!/bin/sh + printf 'ARGV:%s\\n' "$*" >> "#{trace_file}" + exit 0 + """ + ) + + File.chmod!(fake_ssh, 0o755) + System.put_env("PATH", fake_bin_dir <> ":" <> (System.get_env("PATH") || "")) + end + + defp wait_for_trace!(trace_file, attempts \\ 20) + defp wait_for_trace!(trace_file, 0), do: flunk("timed out waiting for fake ssh trace at #{trace_file}") + + defp wait_for_trace!(trace_file, attempts) do + if File.exists?(trace_file) and File.read!(trace_file) != "" do + :ok + else + Process.sleep(25) + wait_for_trace!(trace_file, attempts - 1) + end + end + + defp restore_env(key, nil), do: System.delete_env(key) + defp restore_env(key, value), do: System.put_env(key, value) +end diff --git a/elixir/test/symphony_elixir/workspace_and_config_test.exs b/elixir/test/symphony_elixir/workspace_and_config_test.exs index 03a3c59e..e6b4e522 100644 --- a/elixir/test/symphony_elixir/workspace_and_config_test.exs +++ b/elixir/test/symphony_elixir/workspace_and_config_test.exs @@ -86,7 +86,7 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do assert File.read!(Path.join(second_workspace, "local-progress.txt")) == "in progress\n" assert 
File.read!(Path.join([second_workspace, "deps", "cache.txt"])) == "cached deps\n" assert File.read!(Path.join([second_workspace, "_build", "artifact.txt"])) == "compiled artifact\n" - refute File.exists?(Path.join([second_workspace, "tmp", "scratch.txt"])) + assert File.read!(Path.join([second_workspace, "tmp", "scratch.txt"])) == "remove me\n" after File.rm_rf(workspace_root) end @@ -1183,4 +1183,72 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do write_workflow_file!(Workflow.workflow_file_path(), prompt: workflow_prompt) assert Config.workflow_prompt() == workflow_prompt end + + test "remote workspace lifecycle uses ssh host aliases from worker config" do + test_root = + Path.join( + System.tmp_dir!(), + "symphony-elixir-remote-workspace-#{System.unique_integer([:positive])}" + ) + + previous_path = System.get_env("PATH") + previous_trace = System.get_env("SYMP_TEST_SSH_TRACE") + + on_exit(fn -> + restore_env("PATH", previous_path) + restore_env("SYMP_TEST_SSH_TRACE", previous_trace) + end) + + try do + trace_file = Path.join(test_root, "ssh.trace") + fake_ssh = Path.join(test_root, "ssh") + workspace_root = "/remote/workspaces" + workspace_path = Path.join(workspace_root, "MT-SSH-WS") + + File.mkdir_p!(test_root) + System.put_env("SYMP_TEST_SSH_TRACE", trace_file) + System.put_env("PATH", test_root <> ":" <> (previous_path || "")) + + File.write!(fake_ssh, """ + #!/bin/sh + trace_file="${SYMP_TEST_SSH_TRACE:-/tmp/symphony-fake-ssh.trace}" + printf 'ARGV:%s\\n' "$*" >> "$trace_file" + + case "$*" in + *"__SYMPHONY_WORKSPACE__"*) + printf '%s\\t%s\\t%s\\n' '__SYMPHONY_WORKSPACE__' '1' '#{workspace_path}' + ;; + esac + + exit 0 + """) + + File.chmod!(fake_ssh, 0o755) + + write_workflow_file!(Workflow.workflow_file_path(), + workspace_root: workspace_root, + worker_ssh_hosts: ["worker-01:2200"], + hook_before_run: "echo before-run", + hook_after_run: "echo after-run", + hook_before_remove: "echo before-remove" + ) + + assert Config.settings!().worker.ssh_hosts 
== ["worker-01:2200"] + assert {:ok, ^workspace_path} = Workspace.create_for_issue("MT-SSH-WS", "worker-01:2200") + assert :ok = Workspace.run_before_run_hook(workspace_path, "MT-SSH-WS", "worker-01:2200") + assert :ok = Workspace.run_after_run_hook(workspace_path, "MT-SSH-WS", "worker-01:2200") + assert :ok = Workspace.remove_issue_workspaces("MT-SSH-WS", "worker-01:2200") + + trace = File.read!(trace_file) + assert trace =~ "-p 2200 worker-01 bash -lc" + assert trace =~ "__SYMPHONY_WORKSPACE__" + assert trace =~ "echo before-run" + assert trace =~ "echo after-run" + assert trace =~ "echo before-remove" + assert trace =~ "rm -rf" + assert trace =~ workspace_path + after + File.rm_rf(test_root) + end + end end From 54dbf3a268602a0f5b840ae01713f7672d6a9c18 Mon Sep 17 00:00:00 2001 From: Alex Kotliarskyi Date: Tue, 10 Mar 2026 19:32:17 -0700 Subject: [PATCH 2/4] feat(elixir): add shared per-host ssh worker caps Summary: - add an optional worker.max_concurrent_agents_per_host setting to the Elixir workflow schema - update orchestrator host selection to skip SSH hosts that are at the shared per-host cap and wait when all SSH hosts are full - cover the new cap in config, scheduler, and spec documentation tests Rationale: - let operators bound concurrent ticket execution per SSH worker machine without introducing per-host bespoke config - keep scheduling behavior predictable by treating saturated SSH hosts as temporarily unavailable instead of falling back to local execution - document the feature lightly in the spec so the extension is visible without overspecifying implementation details Tests: - make all Co-authored-by: Codex --- SPEC.md | 10 ++++ elixir/lib/symphony_elixir/config/schema.ex | 4 +- elixir/lib/symphony_elixir/orchestrator.ex | 57 ++++++++++++++++--- elixir/test/support/test_support.exs | 16 ++++-- elixir/test/symphony_elixir/core_test.exs | 47 +++++++++++++++ .../workspace_and_config_test.exs | 9 +++ 6 files changed, 130 insertions(+), 13 deletions(-) 
diff --git a/SPEC.md b/SPEC.md index e861f3b9..ebdac170 100644 --- a/SPEC.md +++ b/SPEC.md @@ -559,6 +559,10 @@ This section is intentionally redundant so a coding agent can implement the conf - `tracker.terminal_states`: list of strings, default `["Closed", "Cancelled", "Canceled", "Duplicate", "Done"]` - `polling.interval_ms`: integer, default `30000` - `workspace.root`: path, default `/symphony_workspaces` +- `worker.ssh_hosts` (extension): list of SSH host strings, optional; when omitted, work runs + locally +- `worker.max_concurrent_agents_per_host` (extension): positive integer, optional; shared per-host + cap applied across configured SSH hosts - `hooks.after_create`: shell script or null - `hooks.before_run`: shell script or null - `hooks.after_run`: shell script or null @@ -729,6 +733,12 @@ Per-state limit: The runtime counts issues by their current tracked state in the `running` map. +Optional SSH host limit: + +- When `worker.max_concurrent_agents_per_host` is set, each configured SSH host may run at most + that many concurrent agents at once. +- Hosts at that cap are skipped for new dispatch until capacity frees up. 
+ ### 8.4 Retry and Backoff Retry entry creation: diff --git a/elixir/lib/symphony_elixir/config/schema.ex b/elixir/lib/symphony_elixir/config/schema.ex index b9809926..4a4a0854 100644 --- a/elixir/lib/symphony_elixir/config/schema.ex +++ b/elixir/lib/symphony_elixir/config/schema.ex @@ -108,12 +108,14 @@ defmodule SymphonyElixir.Config.Schema do @primary_key false embedded_schema do field(:ssh_hosts, {:array, :string}, default: []) + field(:max_concurrent_agents_per_host, :integer) end @spec changeset(%__MODULE__{}, map()) :: Ecto.Changeset.t() def changeset(schema, attrs) do schema - |> cast(attrs, [:ssh_hosts], empty_values: []) + |> cast(attrs, [:ssh_hosts, :max_concurrent_agents_per_host], empty_values: []) + |> validate_number(:max_concurrent_agents_per_host, greater_than: 0) end end diff --git a/elixir/lib/symphony_elixir/orchestrator.ex b/elixir/lib/symphony_elixir/orchestrator.ex index 0dedc38c..3cd81482 100644 --- a/elixir/lib/symphony_elixir/orchestrator.ex +++ b/elixir/lib/symphony_elixir/orchestrator.ex @@ -327,6 +327,12 @@ defmodule SymphonyElixir.Orchestrator do sort_issues_for_dispatch(issues) end + @doc false + @spec select_worker_host_for_test(term(), String.t() | nil) :: String.t() | nil | :no_worker_capacity + def select_worker_host_for_test(%State{} = state, preferred_worker_host) do + select_worker_host(state, preferred_worker_host) + end + defp reconcile_running_issue_states([], state, _active_states, _terminal_states), do: state defp reconcile_running_issue_states([issue | rest], state, active_states, terminal_states) do @@ -556,7 +562,8 @@ defmodule SymphonyElixir.Orchestrator do !MapSet.member?(claimed, issue.id) and !Map.has_key?(running, issue.id) and available_slots(state) > 0 and - state_slots_available?(issue, running) + state_slots_available?(issue, running) and + worker_slots_available?(state) end defp should_dispatch_issue?(_issue, _state, _active_states, _terminal_states), do: false @@ -672,8 +679,18 @@ defmodule 
SymphonyElixir.Orchestrator do defp do_dispatch_issue(%State{} = state, issue, attempt, preferred_worker_host) do recipient = self() - worker_host = select_worker_host(state, preferred_worker_host) + case select_worker_host(state, preferred_worker_host) do + :no_worker_capacity -> + Logger.debug("No SSH worker slots available for #{issue_context(issue)} preferred_worker_host=#{inspect(preferred_worker_host)}") + state + + worker_host -> + spawn_issue_on_worker_host(state, issue, attempt, recipient, worker_host) + end + end + + defp spawn_issue_on_worker_host(%State{} = state, issue, attempt, recipient, worker_host) do case Task.Supervisor.start_child(SymphonyElixir.TaskSupervisor, fn -> AgentRunner.run(issue, recipient, attempt: attempt, worker_host: worker_host) end) do @@ -885,7 +902,8 @@ defmodule SymphonyElixir.Orchestrator do defp handle_active_retry(state, issue, attempt, metadata) do if retry_candidate_issue?(issue, terminal_state_set()) and - dispatch_slots_available?(issue, state) do + dispatch_slots_available?(issue, state) and + worker_slots_available?(state, metadata[:worker_host]) do {:noreply, dispatch_issue(state, issue, attempt, metadata[:worker_host])} else Logger.debug("No available slots for retrying #{issue_context(issue)}; retrying again") @@ -958,10 +976,17 @@ defmodule SymphonyElixir.Orchestrator do nil hosts -> - if preferred_worker_host_available?(preferred_worker_host, hosts) do - preferred_worker_host - else - least_loaded_worker_host(state, hosts) + available_hosts = Enum.filter(hosts, &worker_host_slots_available?(state, &1)) + + cond do + available_hosts == [] -> + :no_worker_capacity + + preferred_worker_host_available?(preferred_worker_host, available_hosts) -> + preferred_worker_host + + true -> + least_loaded_worker_host(state, available_hosts) end end end @@ -989,6 +1014,24 @@ defmodule SymphonyElixir.Orchestrator do end) end + defp worker_slots_available?(%State{} = state) do + select_worker_host(state, nil) != 
:no_worker_capacity + end + + defp worker_slots_available?(%State{} = state, preferred_worker_host) do + select_worker_host(state, preferred_worker_host) != :no_worker_capacity + end + + defp worker_host_slots_available?(%State{} = state, worker_host) when is_binary(worker_host) do + case Config.settings!().worker.max_concurrent_agents_per_host do + limit when is_integer(limit) and limit > 0 -> + running_worker_host_count(state.running, worker_host) < limit + + _ -> + true + end + end + defp find_issue_by_id(issues, issue_id) when is_binary(issue_id) do Enum.find(issues, fn %Issue{id: ^issue_id} -> diff --git a/elixir/test/support/test_support.exs b/elixir/test/support/test_support.exs index 81162f08..484c1cae 100644 --- a/elixir/test/support/test_support.exs +++ b/elixir/test/support/test_support.exs @@ -102,6 +102,7 @@ defmodule SymphonyElixir.TestSupport do poll_interval_ms: 30_000, workspace_root: Path.join(System.tmp_dir!(), "symphony_workspaces"), worker_ssh_hosts: [], + worker_max_concurrent_agents_per_host: nil, max_concurrent_agents: 10, max_turns: 20, max_retry_backoff_ms: 300_000, @@ -138,6 +139,7 @@ defmodule SymphonyElixir.TestSupport do poll_interval_ms = Keyword.get(config, :poll_interval_ms) workspace_root = Keyword.get(config, :workspace_root) worker_ssh_hosts = Keyword.get(config, :worker_ssh_hosts) + worker_max_concurrent_agents_per_host = Keyword.get(config, :worker_max_concurrent_agents_per_host) max_concurrent_agents = Keyword.get(config, :max_concurrent_agents) max_turns = Keyword.get(config, :max_turns) max_retry_backoff_ms = Keyword.get(config, :max_retry_backoff_ms) @@ -176,7 +178,7 @@ defmodule SymphonyElixir.TestSupport do " interval_ms: #{yaml_value(poll_interval_ms)}", "workspace:", " root: #{yaml_value(workspace_root)}", - worker_yaml(worker_ssh_hosts), + worker_yaml(worker_ssh_hosts, worker_max_concurrent_agents_per_host), "agent:", " max_concurrent_agents: #{yaml_value(max_concurrent_agents)}", " max_turns: 
#{yaml_value(max_turns)}", @@ -238,14 +240,18 @@ defmodule SymphonyElixir.TestSupport do |> Enum.join("\n") end - defp worker_yaml([]), do: nil - defp worker_yaml(nil), do: nil + defp worker_yaml(ssh_hosts, max_concurrent_agents_per_host) + when ssh_hosts in [nil, []] and is_nil(max_concurrent_agents_per_host), + do: nil - defp worker_yaml(ssh_hosts) do + defp worker_yaml(ssh_hosts, max_concurrent_agents_per_host) do [ "worker:", - " ssh_hosts: #{yaml_value(ssh_hosts)}" + ssh_hosts not in [nil, []] && " ssh_hosts: #{yaml_value(ssh_hosts)}", + !is_nil(max_concurrent_agents_per_host) && + " max_concurrent_agents_per_host: #{yaml_value(max_concurrent_agents_per_host)}" ] + |> Enum.reject(&(&1 in [nil, false])) |> Enum.join("\n") end diff --git a/elixir/test/symphony_elixir/core_test.exs b/elixir/test/symphony_elixir/core_test.exs index fa96b7b0..2e332393 100644 --- a/elixir/test/symphony_elixir/core_test.exs +++ b/elixir/test/symphony_elixir/core_test.exs @@ -703,6 +703,53 @@ defmodule SymphonyElixir.CoreTest do assert {:noreply, ^coalesced_state} = Orchestrator.handle_info({:tick, stale_tick_token}, coalesced_state) end + test "select_worker_host_for_test skips full ssh hosts under the shared per-host cap" do + write_workflow_file!(Workflow.workflow_file_path(), + worker_ssh_hosts: ["worker-a", "worker-b"], + worker_max_concurrent_agents_per_host: 1 + ) + + state = %Orchestrator.State{ + running: %{ + "issue-1" => %{worker_host: "worker-a"} + } + } + + assert Orchestrator.select_worker_host_for_test(state, nil) == "worker-b" + end + + test "select_worker_host_for_test returns no_worker_capacity when every ssh host is full" do + write_workflow_file!(Workflow.workflow_file_path(), + worker_ssh_hosts: ["worker-a", "worker-b"], + worker_max_concurrent_agents_per_host: 1 + ) + + state = %Orchestrator.State{ + running: %{ + "issue-1" => %{worker_host: "worker-a"}, + "issue-2" => %{worker_host: "worker-b"} + } + } + + assert Orchestrator.select_worker_host_for_test(state, 
nil) == :no_worker_capacity + end + + test "select_worker_host_for_test keeps the preferred ssh host when it still has capacity" do + write_workflow_file!(Workflow.workflow_file_path(), + worker_ssh_hosts: ["worker-a", "worker-b"], + worker_max_concurrent_agents_per_host: 2 + ) + + state = %Orchestrator.State{ + running: %{ + "issue-1" => %{worker_host: "worker-a"}, + "issue-2" => %{worker_host: "worker-b"} + } + } + + assert Orchestrator.select_worker_host_for_test(state, "worker-a") == "worker-a" + end + defp assert_due_in_range(due_at_ms, min_remaining_ms, max_remaining_ms) do remaining_ms = due_at_ms - System.monotonic_time(:millisecond) diff --git a/elixir/test/symphony_elixir/workspace_and_config_test.exs b/elixir/test/symphony_elixir/workspace_and_config_test.exs index e6b4e522..b83cffdf 100644 --- a/elixir/test/symphony_elixir/workspace_and_config_test.exs +++ b/elixir/test/symphony_elixir/workspace_and_config_test.exs @@ -742,6 +742,7 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do assert config.tracker.api_key == nil assert config.tracker.project_slug == nil assert config.workspace.root == Path.join(System.tmp_dir!(), "symphony_workspaces") + assert config.worker.max_concurrent_agents_per_host == nil assert config.agent.max_concurrent_agents == 10 assert config.codex.command == "codex app-server" @@ -813,6 +814,10 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do assert {:error, {:invalid_workflow_config, message}} = Config.validate!() assert message =~ "agent.max_concurrent_agents" + write_workflow_file!(Workflow.workflow_file_path(), worker_max_concurrent_agents_per_host: 0) + assert {:error, {:invalid_workflow_config, message}} = Config.validate!() + assert message =~ "worker.max_concurrent_agents_per_host" + write_workflow_file!(Workflow.workflow_file_path(), codex_turn_timeout_ms: "bad") assert {:error, {:invalid_workflow_config, message}} = Config.validate!() assert message =~ "codex.turn_timeout_ms" @@ -955,6 +960,10 @@ defmodule 
SymphonyElixir.WorkspaceAndConfigTest do assert Config.max_concurrent_agents_for_state("In Review") == 2 assert Config.max_concurrent_agents_for_state("Closed") == 10 assert Config.max_concurrent_agents_for_state(:not_a_string) == 10 + + write_workflow_file!(Workflow.workflow_file_path(), worker_max_concurrent_agents_per_host: 2) + assert :ok = Config.validate!() + assert Config.settings!().worker.max_concurrent_agents_per_host == 2 end test "schema helpers cover custom type and state limit validation" do From 4261753f426ab9d9068e9a64067c49ac0e4fbbe5 Mon Sep 17 00:00:00 2001 From: Alex Kotliarskyi Date: Wed, 11 Mar 2026 12:35:10 -0700 Subject: [PATCH 3/4] docs(spec): add ssh worker appendix Summary: - add an appendix describing the optional SSH worker extension - clarify the execution model, scheduling notes, and operator concerns - call out failover, workspace locality, and host-capacity behavior Rationale: - document the SSH extension in one place without over-specifying the implementation - make the main operational tradeoffs explicit for readers and operators Tests: - not run (doc-only change) Co-authored-by: Codex --- SPEC.md | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/SPEC.md b/SPEC.md index ebdac170..f9e2b63a 100644 --- a/SPEC.md +++ b/SPEC.md @@ -2118,3 +2118,58 @@ Use the same validation profiles as Section 17: - Verify hook execution and workflow path resolution on the target host OS/shell environment. - If the optional HTTP server is shipped, verify the configured port behavior and loopback/default bind expectations on the target environment. + +## Appendix A. SSH Worker Extension (Optional) + +This appendix describes a common extension profile in which Symphony keeps one central +orchestrator but executes worker runs on one or more remote hosts over SSH. + +### A.1 Execution Model + +- The orchestrator remains the single source of truth for polling, claims, retries, and + reconciliation. 
+- `worker.ssh_hosts` provides the candidate SSH destinations for remote execution. +- Each worker run is assigned to one host at a time, and that host becomes part of the run's + effective execution identity along with the issue workspace. +- `workspace.root` is interpreted on the remote host, not on the orchestrator host. +- The coding-agent app-server is launched over SSH stdio instead of as a local subprocess, so the + orchestrator still owns the session lifecycle even though commands execute remotely. +- Continuation turns inside one worker lifetime should stay on the same host and workspace. +- A remote host should satisfy the same basic contract as a local worker environment: reachable + shell, writable workspace root, coding-agent executable, and any required auth or repository + prerequisites. + +### A.2 Scheduling Notes + +- SSH hosts may be treated as a pool for dispatch. +- Implementations may prefer the previously used host on retries when that host is still + available. +- `worker.max_concurrent_agents_per_host` is an optional shared per-host cap across configured SSH + hosts. +- When all SSH hosts are at capacity, dispatch should wait rather than silently falling back to a + different execution mode. +- Implementations may fail over to another host when the original host is unavailable before work + has meaningfully started. +- Once a run has already produced side effects, a transparent rerun on another host should be + treated as a new attempt, not as invisible failover. + +### A.3 Problems to Consider + +- Remote environment drift: + - Each host needs the expected shell environment, coding-agent executable, auth, and repository + prerequisites. +- Workspace locality: + - Workspaces are usually host-local, so moving an issue to a different host is typically a cold + restart unless shared storage exists. 
+- Path and command safety: + - Remote path resolution, shell quoting, and workspace-boundary checks matter more once execution + crosses a machine boundary. +- Startup and failover semantics: + - Implementations should distinguish host-connectivity/startup failures from in-workspace agent + failures so the same ticket is not accidentally re-executed on multiple hosts. +- Host health and saturation: + - A dead or overloaded host should reduce available capacity, not cause duplicate execution or an + accidental fallback to local work. +- Cleanup and observability: + - Operators need to know which host owns a run, where its workspace lives, and whether cleanup + happened on the right machine. From 1fdb494b57430e616c3893716304e6a0659b0ed4 Mon Sep 17 00:00:00 2001 From: Alex Kotliarskyi Date: Wed, 11 Mar 2026 14:45:41 -0700 Subject: [PATCH 4/4] refactor(elixir): keep workspace roots raw for ssh workers Summary: - keep workspace.root as the configured string instead of expanding it in config parsing - expand local workspace roots only at local sandbox/path use sites and keep ssh roots raw for worker-side ~ resolution - update the ssh workspace tests and live e2e harness to exercise remote roots that begin with ~ - stabilize the orchestrator polling snapshot test under coverage timing Rationale: - preserve the correct ownership boundary for remote path interpretation so ssh workers resolve ~ using their own HOME - avoid carrying duplicate config state such as raw_root when the consumers can handle expansion at the actual boundary - keep make all green while changing the path semantics by tightening the affected tests Tests: - make -C elixir all - cd elixir && env -u SYMPHONY_LIVE_SSH_WORKER_HOSTS LINEAR_API_KEY="$(tr -d '\r\n' < ~/.linear_api_key)" SYMPHONY_RUN_LIVE_E2E=1 mix test test/symphony_elixir/live_e2e_test.exs:128 Co-authored-by: Codex --- elixir/lib/symphony_elixir/config/schema.ex | 43 ++++++++++------ elixir/lib/symphony_elixir/workspace.ex | 39 
+++++++++++---- elixir/test/symphony_elixir/live_e2e_test.exs | 6 ++- .../orchestrator_status_test.exs | 2 + .../workspace_and_config_test.exs | 49 +++++++++++++++++-- 5 files changed, 109 insertions(+), 30 deletions(-) diff --git a/elixir/lib/symphony_elixir/config/schema.ex b/elixir/lib/symphony_elixir/config/schema.ex index 4a4a0854..17ead4ad 100644 --- a/elixir/lib/symphony_elixir/config/schema.ex +++ b/elixir/lib/symphony_elixir/config/schema.ex @@ -296,7 +296,10 @@ defmodule SymphonyElixir.Config.Schema do policy _ -> - default_turn_sandbox_policy(workspace || settings.workspace.root) + workspace + |> default_workspace_root(settings.workspace.root) + |> expand_local_workspace_root() + |> default_turn_sandbox_policy() end end @@ -308,7 +311,9 @@ defmodule SymphonyElixir.Config.Schema do {:ok, policy} _ -> - default_runtime_turn_sandbox_policy(workspace || settings.workspace.root, opts) + workspace + |> default_workspace_root(settings.workspace.root) + |> default_runtime_turn_sandbox_policy(opts) end end @@ -420,13 +425,13 @@ defmodule SymphonyElixir.Config.Schema do defp resolve_path_value(value, default) when is_binary(value) do case normalize_path_token(value) do :missing -> - Path.expand(default) + default "" -> - Path.expand(default) + default path -> - Path.expand(path) + path end end @@ -475,16 +480,9 @@ defmodule SymphonyElixir.Config.Schema do defp normalize_secret_value(_value), do: nil defp default_turn_sandbox_policy(workspace) do - writable_root = - if is_binary(workspace) and workspace != "" do - Path.expand(workspace) - else - Path.expand(Path.join(System.tmp_dir!(), "symphony_workspaces")) - end - %{ "type" => "workspaceWrite", - "writableRoots" => [writable_root], + "writableRoots" => [workspace], "readOnlyAccess" => %{"type" => "fullAccess"}, "networkAccess" => false, "excludeTmpdirEnvVar" => false, @@ -496,7 +494,8 @@ defmodule SymphonyElixir.Config.Schema do if Keyword.get(opts, :remote, false) do {:ok, 
default_turn_sandbox_policy(workspace_root)} else - with {:ok, canonical_workspace_root} <- PathSafety.canonicalize(workspace_root) do + with expanded_workspace_root <- expand_local_workspace_root(workspace_root), + {:ok, canonical_workspace_root} <- PathSafety.canonicalize(expanded_workspace_root) do {:ok, default_turn_sandbox_policy(canonical_workspace_root)} end end @@ -506,6 +505,22 @@ defmodule SymphonyElixir.Config.Schema do {:error, {:unsafe_turn_sandbox_policy, {:invalid_workspace_root, workspace_root}}} end + defp default_workspace_root(workspace, _fallback) when is_binary(workspace) and workspace != "", + do: workspace + + defp default_workspace_root(nil, fallback), do: fallback + defp default_workspace_root("", fallback), do: fallback + defp default_workspace_root(workspace, _fallback), do: workspace + + defp expand_local_workspace_root(workspace_root) + when is_binary(workspace_root) and workspace_root != "" do + Path.expand(workspace_root) + end + + defp expand_local_workspace_root(_workspace_root) do + Path.expand(Path.join(System.tmp_dir!(), "symphony_workspaces")) + end + defp format_errors(changeset) do changeset |> traverse_errors(&translate_error/1) diff --git a/elixir/lib/symphony_elixir/workspace.ex b/elixir/lib/symphony_elixir/workspace.ex index 38e8ecac..14e29da4 100644 --- a/elixir/lib/symphony_elixir/workspace.ex +++ b/elixir/lib/symphony_elixir/workspace.ex @@ -49,17 +49,18 @@ defmodule SymphonyElixir.Workspace do script = [ "set -eu", - "if [ -d #{shell_escape(workspace)} ]; then", + remote_shell_assign("workspace", workspace), + "if [ -d \"$workspace\" ]; then", " created=0", - "elif [ -e #{shell_escape(workspace)} ]; then", - " rm -rf #{shell_escape(workspace)}", - " mkdir -p #{shell_escape(workspace)}", + "elif [ -e \"$workspace\" ]; then", + " rm -rf \"$workspace\"", + " mkdir -p \"$workspace\"", " created=1", "else", - " mkdir -p #{shell_escape(workspace)}", + " mkdir -p \"$workspace\"", " created=1", "fi", - "cd 
#{shell_escape(workspace)}", + "cd \"$workspace\"", "printf '%s\\t%s\\t%s\\n' '#{@remote_workspace_marker}' \"$created\" \"$(pwd -P)\"" ] |> Enum.reject(&(&1 == "")) @@ -107,7 +108,14 @@ defmodule SymphonyElixir.Workspace do def remove(workspace, worker_host) when is_binary(worker_host) do maybe_run_before_remove_hook(workspace, worker_host) - case run_remote_command(worker_host, "rm -rf #{shell_escape(workspace)}", Config.settings!().hooks.timeout_ms) do + script = + [ + remote_shell_assign("workspace", workspace), + "rm -rf \"$workspace\"" + ] + |> Enum.join("\n") + + case run_remote_command(worker_host, script, Config.settings!().hooks.timeout_ms) do {:ok, {_output, 0}} -> {:ok, []} @@ -252,8 +260,9 @@ defmodule SymphonyElixir.Workspace do command -> script = [ - "if [ -d #{shell_escape(workspace)} ]; then", - " cd #{shell_escape(workspace)}", + remote_shell_assign("workspace", workspace), + "if [ -d \"$workspace\" ]; then", + " cd \"$workspace\"", " #{command}", "fi" ] @@ -388,6 +397,18 @@ defmodule SymphonyElixir.Workspace do end end + defp remote_shell_assign(variable_name, raw_path) + when is_binary(variable_name) and is_binary(raw_path) do + [ + "#{variable_name}=#{shell_escape(raw_path)}", + "case \"$#{variable_name}\" in", + " '~') #{variable_name}=\"$HOME\" ;;", + " '~/'*) " <> variable_name <> "=\"$HOME/${" <> variable_name <> "#~/}\" ;;", + "esac" + ] + |> Enum.join("\n") + end + defp parse_remote_workspace_output(output) do lines = String.split(IO.iodata_to_binary(output), "\n", trim: true) diff --git a/elixir/test/symphony_elixir/live_e2e_test.exs b/elixir/test/symphony_elixir/live_e2e_test.exs index 163d03e4..9bfeced8 100644 --- a/elixir/test/symphony_elixir/live_e2e_test.exs +++ b/elixir/test/symphony_elixir/live_e2e_test.exs @@ -555,12 +555,13 @@ defmodule SymphonyElixir.LiveE2ETest do defp live_ssh_worker_setup!(run_id) when is_binary(run_id) do ssh_worker_hosts = live_ssh_worker_hosts() remote_test_root = 
Path.join(shared_remote_home!(ssh_worker_hosts), ".#{run_id}") + remote_workspace_root = "~/.#{run_id}/workspaces" %{ cleanup: fn -> cleanup_remote_test_root(remote_test_root, ssh_worker_hosts) end, codex_command: "codex app-server", ssh_worker_hosts: ssh_worker_hosts, - workspace_root: Path.join(remote_test_root, "workspaces") + workspace_root: remote_workspace_root } end @@ -589,6 +590,7 @@ defmodule SymphonyElixir.LiveE2ETest do docker_compose_up!(project_name, docker_compose_env(worker_ports, auth_json_path, key_path <> ".pub")) wait_for_ssh_hosts!(worker_hosts) remote_test_root = Path.join(shared_remote_home!(worker_hosts), ".#{run_id}") + remote_workspace_root = "~/.#{run_id}/workspaces" %{ cleanup: fn -> @@ -597,7 +599,7 @@ defmodule SymphonyElixir.LiveE2ETest do end, codex_command: "codex app-server", ssh_worker_hosts: worker_hosts, - workspace_root: Path.join(remote_test_root, "workspaces") + workspace_root: remote_workspace_root } rescue error -> diff --git a/elixir/test/symphony_elixir/orchestrator_status_test.exs b/elixir/test/symphony_elixir/orchestrator_status_test.exs index 14c3e1bb..4326b80c 100644 --- a/elixir/test/symphony_elixir/orchestrator_status_test.exs +++ b/elixir/test/symphony_elixir/orchestrator_status_test.exs @@ -767,6 +767,8 @@ defmodule SymphonyElixir.OrchestratorStatusTest do %{ state | poll_interval_ms: 30_000, + tick_timer_ref: nil, + tick_token: make_ref(), next_poll_due_at_ms: now_ms + 4_000, poll_check_in_progress: false } diff --git a/elixir/test/symphony_elixir/workspace_and_config_test.exs b/elixir/test/symphony_elixir/workspace_and_config_test.exs index b83cffdf..59ff0850 100644 --- a/elixir/test/symphony_elixir/workspace_and_config_test.exs +++ b/elixir/test/symphony_elixir/workspace_and_config_test.exs @@ -937,7 +937,7 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do config = Config.settings!() assert config.tracker.api_key == "env:#{api_key_env_var}" - assert config.workspace.root == 
Path.expand("env:#{workspace_env_var}") + assert config.workspace.root == "env:#{workspace_env_var}" end test "config supports per-state max concurrent agent overrides" do @@ -1030,7 +1030,7 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do }) assert settings.tracker.api_key == nil - assert settings.workspace.root == Path.expand(Path.join(System.tmp_dir!(), "symphony_workspaces")) + assert settings.workspace.root == Path.join(System.tmp_dir!(), "symphony_workspaces") assert settings.codex.approval_policy == %{ "reject" => %{"sandbox_approval" => true} @@ -1043,7 +1043,7 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do }) assert settings.tracker.api_key == "fallback-linear-token" - assert settings.workspace.root == Path.expand(Path.join(System.tmp_dir!(), "symphony_workspaces")) + assert settings.workspace.root == Path.join(System.tmp_dir!(), "symphony_workspaces") end test "schema resolves sandbox policies from explicit and default workspaces" do @@ -1082,6 +1082,37 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do } end + test "schema keeps workspace roots raw while sandbox helpers expand only for local use" do + assert {:ok, settings} = + Schema.parse(%{ + workspace: %{root: "~/.symphony-workspaces"}, + codex: %{} + }) + + assert settings.workspace.root == "~/.symphony-workspaces" + + assert Schema.resolve_turn_sandbox_policy(settings) == %{ + "type" => "workspaceWrite", + "writableRoots" => [Path.expand("~/.symphony-workspaces")], + "readOnlyAccess" => %{"type" => "fullAccess"}, + "networkAccess" => false, + "excludeTmpdirEnvVar" => false, + "excludeSlashTmp" => false + } + + assert {:ok, remote_policy} = + Schema.resolve_runtime_turn_sandbox_policy(settings, nil, remote: true) + + assert remote_policy == %{ + "type" => "workspaceWrite", + "writableRoots" => ["~/.symphony-workspaces"], + "readOnlyAccess" => %{"type" => "fullAccess"}, + "networkAccess" => false, + "excludeTmpdirEnvVar" => false, + "excludeSlashTmp" => false + } + end + test "runtime 
sandbox policy resolution passes explicit policies through unchanged" do test_root = Path.join( @@ -1163,6 +1194,11 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do assert default_policy["type"] == "workspaceWrite" assert default_policy["writableRoots"] == [canonical_workspace_root] + assert {:ok, blank_workspace_policy} = + Schema.resolve_runtime_turn_sandbox_policy(settings, "") + + assert blank_workspace_policy == default_policy + read_only_settings = %{ settings | codex: %{settings.codex | turn_sandbox_policy: %{"type" => "readOnly", "networkAccess" => true}} @@ -1211,8 +1247,8 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do try do trace_file = Path.join(test_root, "ssh.trace") fake_ssh = Path.join(test_root, "ssh") - workspace_root = "/remote/workspaces" - workspace_path = Path.join(workspace_root, "MT-SSH-WS") + workspace_root = "~/.symphony-remote-workspaces" + workspace_path = "/remote/home/.symphony-remote-workspaces/MT-SSH-WS" File.mkdir_p!(test_root) System.put_env("SYMP_TEST_SSH_TRACE", trace_file) @@ -1243,6 +1279,7 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do ) assert Config.settings!().worker.ssh_hosts == ["worker-01:2200"] + assert Config.settings!().workspace.root == workspace_root assert {:ok, ^workspace_path} = Workspace.create_for_issue("MT-SSH-WS", "worker-01:2200") assert :ok = Workspace.run_before_run_hook(workspace_path, "MT-SSH-WS", "worker-01:2200") assert :ok = Workspace.run_after_run_hook(workspace_path, "MT-SSH-WS", "worker-01:2200") @@ -1251,6 +1288,8 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do trace = File.read!(trace_file) assert trace =~ "-p 2200 worker-01 bash -lc" assert trace =~ "__SYMPHONY_WORKSPACE__" + assert trace =~ "~/.symphony-remote-workspaces/MT-SSH-WS" + assert trace =~ "${workspace#~/}" assert trace =~ "echo before-run" assert trace =~ "echo after-run" assert trace =~ "echo before-remove"