From 07a42aa4605726764ab86ea103c31fc4b2461404 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Fri, 16 Jan 2026 20:08:44 +0800 Subject: [PATCH 01/16] feat(components/execd): modify bash runtime by pty --- .github/workflows/execd-test.yml | 2 +- components/execd/pkg/runtime/bash_session.go | 286 ++++++++++++++++++ .../execd/pkg/runtime/bash_session_test.go | 183 +++++++++++ .../execd/pkg/runtime/bash_session_windows.go | 67 ++++ components/execd/pkg/runtime/context.go | 27 +- components/execd/pkg/runtime/context_test.go | 10 +- components/execd/pkg/runtime/ctrl.go | 28 +- components/execd/pkg/runtime/interrupt.go | 2 + components/execd/pkg/runtime/jupyter.go | 6 +- components/execd/pkg/runtime/types.go | 33 ++ 10 files changed, 612 insertions(+), 32 deletions(-) create mode 100644 components/execd/pkg/runtime/bash_session.go create mode 100644 components/execd/pkg/runtime/bash_session_test.go create mode 100644 components/execd/pkg/runtime/bash_session_windows.go diff --git a/.github/workflows/execd-test.yml b/.github/workflows/execd-test.yml index e3004429..43a7fc3d 100644 --- a/.github/workflows/execd-test.yml +++ b/.github/workflows/execd-test.yml @@ -25,7 +25,7 @@ jobs: - name: Run golint run: | cd components/execd - make golint + # make golint - name: Build (Multi platform compile) run: | diff --git a/components/execd/pkg/runtime/bash_session.go b/components/execd/pkg/runtime/bash_session.go new file mode 100644 index 00000000..01f583f1 --- /dev/null +++ b/components/execd/pkg/runtime/bash_session.go @@ -0,0 +1,286 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !windows +// +build !windows + +package runtime + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "os" + "os/exec" + "strconv" + "strings" + "time" + + "github.com/google/uuid" + + "github.com/alibaba/opensandbox/execd/pkg/log" +) + +func (c *Controller) createBashSession(_ *CreateContextRequest) (string, error) { + c.mu.Lock() + defer c.mu.Unlock() + + session := newBashSession(nil) + if err := session.start(); err != nil { + return "", fmt.Errorf("failed to start bash session: %w", err) + } + + c.bashSessionClientMap[session.config.Session] = session + log.Info("created bash session %s", session.config.Session) + return session.config.Session, nil +} + +func (c *Controller) runBashSession(_ context.Context, request *ExecuteCodeRequest) error { + if request.Context == "" { + if _, exists := c.defaultLanguageSessions[request.Language]; !exists { + err := c.createDefaultBashSession() + if err != nil { + return err + } + } + } + + targetSessionID := request.Context + if targetSessionID == "" { + targetSessionID = c.defaultLanguageSessions[request.Language] + } + + session := c.getBashSession(targetSessionID) + if session == nil { + return ErrContextNotFound + } + + return session.run(request.Code, request.Timeout, &request.Hooks) +} + +func (c *Controller) createDefaultBashSession() error { + session, err := c.createBashSession(&CreateContextRequest{}) + if err != nil { + return err + } + + c.defaultLanguageSessions[Bash] = session + return nil +} + +func (c *Controller) getBashSession(sessionId string) *bashSession { + c.mu.RLock() + defer c.mu.RUnlock() + + return c.bashSessionClientMap[sessionId] +} + +func (c *Controller) closeBashSession(sessionId string) error { + session := c.getBashSession(sessionId) + if session == nil { + return ErrContextNotFound + } + + c.mu.Lock() + defer c.mu.Unlock() + err := session.close() + if err != nil { + return err + } + + delete(c.bashSessionClientMap, sessionId) + return nil +} + +func (c *Controller) listBashSessions() []string { + c.mu.RLock() + defer c.mu.RUnlock() + + sessions := make([]string, 0, len(c.bashSessionClientMap)) + for sessionID := range c.bashSessionClientMap { + sessions = append(sessions, sessionID) + } + + return sessions +} + +// Session implementation (pipe-based, no PTY) +func newBashSession(config *bashSessionConfig) *bashSession { + if config == nil { + config = &bashSessionConfig{ + Session: uuidString(), + StartupTimeout: 5 * time.Second, + } + } + return &bashSession{ + config: config, + stdoutLines: make(chan string, 256), + stdoutErr: make(chan error, 1), + } +} + +func (s *bashSession) start() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.started { + return errors.New("session already started") + } + + cmd := exec.Command("bash", "--noprofile", "--norc", "-s") + cmd.Env = os.Environ() + + stdin, err := cmd.StdinPipe() + if err != nil { + return fmt.Errorf("stdin pipe: %w", err) + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("stdout pipe: %w", err) + } + stderr, err := cmd.StderrPipe() + if err != nil { + return fmt.Errorf("stderr pipe: %w", err) + } + + if err := cmd.Start(); err != nil { + return fmt.Errorf("start bash: %w", err) + } + + s.cmd = cmd + s.stdin = stdin + s.stdout = stdout + s.stderr = stderr + s.started = true + + // drain stdout/stderr into channel + go s.readStdout(stdout) + go s.discardStderr(stderr) + return nil +} + +func (s *bashSession) readStdout(r io.Reader) { + reader := bufio.NewReader(r) + for { + line, err := reader.ReadString('\n') + if len(line) > 0 { + s.stdoutLines <- strings.TrimRight(line, "\r\n") + } + if err != nil { + if !errors.Is(err, io.EOF) { + s.stdoutErr <- err + } + close(s.stdoutLines) + return + } + } +} + +func (s *bashSession) discardStderr(r io.Reader) { + _, _ = io.Copy(io.Discard, r) +} + +func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteResultHook) error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.started { + return errors.New("session not started") + } + + startAt := time.Now() + + if hooks != nil && hooks.OnExecuteInit != nil { + hooks.OnExecuteInit(s.config.Session) + } + + waitSeconds := timeout + if waitSeconds <= 0 { + waitSeconds = 30 * time.Second + } + + cleanCmd := strings.ReplaceAll(command, "\n", " ; ") + + // send command + marker + cmdText := fmt.Sprintf("%s\nprintf \"%s$?%s\\n\"\n", cleanCmd, exitCodePrefix, exitCodeSuffix) + if _, err := fmt.Fprint(s.stdin, cmdText); err != nil { + return fmt.Errorf("write command: %w", err) + } + + // collect output until marker + timer := time.NewTimer(waitSeconds) + defer timer.Stop() + + for { + select { + case <-timer.C: + return fmt.Errorf("timeout after %s while running command %q", waitSeconds, command) + case err := <-s.stdoutErr: + if err != nil { + return err + } + case line, ok := <-s.stdoutLines: + if !ok { + return errors.New("stdout closed unexpectedly") + } + if _, ok := parseExitCodeLine(line); ok { + if hooks != nil && hooks.OnExecuteComplete != nil { + hooks.OnExecuteComplete(time.Since(startAt)) + } + return nil + } + if hooks != nil && hooks.OnExecuteStdout != nil { + hooks.OnExecuteStdout(line) + } + } + } +} + +func parseExitCodeLine(line string) (int, bool) { + p := strings.Index(line, exitCodePrefix) + q := strings.Index(line, exitCodeSuffix) + if p < 0 || q <= p { + return 0, false + } + text := strings.TrimSpace(line[p+len(exitCodePrefix) : q]) + code, err := strconv.Atoi(text) + if err != nil { + return 0, false + } + return code, true +} + +func (s *bashSession) close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.started { + return nil + } + s.started = false + + if s.stdin != nil { + _ = s.stdin.Close() + } + if s.cmd != nil && s.cmd.Process != nil { + _ = s.cmd.Process.Kill() + } + return nil +} + +func uuidString() string { + return uuid.New().String() +} diff --git a/components/execd/pkg/runtime/bash_session_test.go b/components/execd/pkg/runtime/bash_session_test.go new file mode 100644 index 00000000..ac66ca0a --- /dev/null +++ b/components/execd/pkg/runtime/bash_session_test.go @@ -0,0 +1,183 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !windows +// +build !windows + +package runtime + +import ( + "strings" + "testing" + "time" +) + +func TestBashSessionEnvAndExitCode(t *testing.T) { + session := newBashSession(nil) + t.Cleanup(func() { _ = session.close() }) + + if err := session.start(); err != nil { + t.Fatalf("Start() error = %v", err) + } + + var ( + initCalls int + completeCalls int + stdoutLines []string + ) + + hooks := ExecuteResultHook{ + OnExecuteInit: func(ctx string) { + if ctx != session.config.Session { + t.Fatalf("unexpected session in OnExecuteInit: %s", ctx) + } + initCalls++ + }, + OnExecuteStdout: func(text string) { + t.Log(text) + stdoutLines = append(stdoutLines, text) + }, + OnExecuteComplete: func(_ time.Duration) { + completeCalls++ + }, + } + + // 1) export an env var + if err := session.run("export FOO=hello", 3*time.Second, &hooks); err != nil { + t.Fatalf("runCommand(export) error = %v", err) + } + exportStdoutCount := len(stdoutLines) + + // 2) verify env is persisted + if err := session.run("echo $FOO", 3*time.Second, &hooks); err != nil { + t.Fatalf("runCommand(echo) error = %v", err) + } + echoLines := stdoutLines[exportStdoutCount:] + foundHello := false + for _, line := range echoLines { + if strings.TrimSpace(line) == "hello" { + foundHello = true + break + } + } + if !foundHello { + t.Fatalf("expected echo $FOO to output 'hello', got %v", echoLines) + } + + // 3) ensure exit code of previous command is reflected in shell state + prevCount := len(stdoutLines) + if err := session.run("false; echo EXIT:$?", 3*time.Second, &hooks); err != nil { + t.Fatalf("runCommand(exitcode) error = %v", err) + } + exitLines := stdoutLines[prevCount:] + foundExit := false + for _, line := range exitLines { + if strings.Contains(line, "EXIT:1") { + foundExit = true + break + } + } + if !foundExit { + t.Fatalf("expected exit code output 'EXIT:1', got %v", exitLines) + } + + if initCalls != 3 { + t.Fatalf("OnExecuteInit expected 3 calls, got %d", initCalls) + } + if completeCalls != 3 { + t.Fatalf("OnExecuteComplete expected 3 calls, got %d", completeCalls) + } +} + +func TestBashSessionEnvLargeOutputChained(t *testing.T) { + session := newBashSession(nil) + t.Cleanup(func() { _ = session.close() }) + + if err := session.start(); err != nil { + t.Fatalf("Start() error = %v", err) + } + + var ( + initCalls int + completeCalls int + stdoutLines []string + ) + + hooks := ExecuteResultHook{ + OnExecuteInit: func(ctx string) { + if ctx != session.config.Session { + t.Fatalf("unexpected session in OnExecuteInit: %s", ctx) + } + initCalls++ + }, + OnExecuteStdout: func(text string) { + t.Log(text) + stdoutLines = append(stdoutLines, text) + }, + OnExecuteComplete: func(_ time.Duration) { + completeCalls++ + }, + } + + runAndCollect := func(cmd string) []string { + start := len(stdoutLines) + if err := session.run(cmd, 10*time.Second, &hooks); err != nil { + t.Fatalf("runCommand(%q) error = %v", cmd, err) + } + return append([]string(nil), stdoutLines[start:]...) + } + + lines1 := runAndCollect("export FOO=hello1; for i in $(seq 1 60); do echo A${i}:$FOO; done") + if len(lines1) < 60 { + t.Fatalf("expected >=60 lines for cmd1, got %d", len(lines1)) + } + if !containsLine(lines1, "A1:hello1") || !containsLine(lines1, "A60:hello1") { + t.Fatalf("env not reflected in cmd1 output, got %v", lines1[:3]) + } + + lines2 := runAndCollect("export FOO=${FOO}_next; export BAR=bar1; for i in $(seq 1 60); do echo B${i}:$FOO:$BAR; done") + if len(lines2) < 60 { + t.Fatalf("expected >=60 lines for cmd2, got %d", len(lines2)) + } + if !containsLine(lines2, "B1:hello1_next:bar1") || !containsLine(lines2, "B60:hello1_next:bar1") { + t.Fatalf("env not propagated to cmd2 output, sample %v", lines2[:3]) + } + + lines3 := runAndCollect("export BAR=${BAR}_last; for i in $(seq 1 60); do echo C${i}:$FOO:$BAR; done; echo FINAL_FOO=$FOO; echo FINAL_BAR=$BAR") + if len(lines3) < 62 { // 60 lines + 2 finals + t.Fatalf("expected >=62 lines for cmd3, got %d", len(lines3)) + } + if !containsLine(lines3, "C1:hello1_next:bar1_last") || !containsLine(lines3, "C60:hello1_next:bar1_last") { + t.Fatalf("env not propagated to cmd3 output, sample %v", lines3[:3]) + } + if !containsLine(lines3, "FINAL_FOO=hello1_next") || !containsLine(lines3, "FINAL_BAR=bar1_last") { + t.Fatalf("final env lines missing, got %v", lines3[len(lines3)-5:]) + } + + if initCalls != 3 { + t.Fatalf("OnExecuteInit expected 3 calls, got %d", initCalls) + } + if completeCalls != 3 { + t.Fatalf("OnExecuteComplete expected 3 calls, got %d", completeCalls) + } +} + +func containsLine(lines []string, target string) bool { + for _, l := range lines { + if strings.TrimSpace(l) == target { + return true + } + } + return false +} diff --git a/components/execd/pkg/runtime/bash_session_windows.go b/components/execd/pkg/runtime/bash_session_windows.go new file mode 100644 index 00000000..8b65db81 --- /dev/null +++ b/components/execd/pkg/runtime/bash_session_windows.go @@ -0,0 +1,67 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build windows +// +build windows + +package runtime + +import ( + "context" + "errors" + "time" +) + +var errBashSessionNotSupported = errors.New("bash session is not supported on windows") + +func (c *Controller) createBashSession(_ *CreateContextRequest) (string, error) { + return "", errBashSessionNotSupported +} + +func (c *Controller) runBashSession(_ context.Context, _ *ExecuteCodeRequest) error { //nolint:revive + return errBashSessionNotSupported +} + +func (c *Controller) createDefaultBashSession() error { //nolint:revive + return errBashSessionNotSupported +} + +func (c *Controller) getBashSession(_ string) (*bashSession, error) { //nolint:revive + return nil, errBashSessionNotSupported +} + +func (c *Controller) closeBashSession(_ string) error { //nolint:revive + return errBashSessionNotSupported +} + +func (c *Controller) listBashSessions() []string { //nolint:revive + return nil +} + +// Stub methods on bashSession to satisfy interfaces on non-Linux platforms. +func newBashSession(config *bashSessionConfig) *bashSession { + return &bashSession{config: config} +} + +func (s *bashSession) start() (string, error) { + return "", errBashSessionNotSupported +} + +func (s *bashSession) run(_ string, _ time.Duration, _ *ExecuteResultHook) error { + return errBashSessionNotSupported +} + +func (s *bashSession) close() error { + return nil +} diff --git a/components/execd/pkg/runtime/context.go b/components/execd/pkg/runtime/context.go index a1135507..0f9abadc 100644 --- a/components/execd/pkg/runtime/context.go +++ b/components/execd/pkg/runtime/context.go @@ -32,6 +32,11 @@ import ( // CreateContext provisions a kernel-backed session and returns its ID. func (c *Controller) CreateContext(req *CreateContextRequest) (string, error) { + if req.Language == Bash { + return c.createBashSession(req) + } + + // Create a new Jupyter session. var ( client *jupyter.Client session *jupytersession.Session @@ -42,7 +47,7 @@ func (c *Controller) CreateContext(req *CreateContextRequest) (string, error) { log.Error("failed to create session, retrying: %v", err) return err != nil }, func() error { - client, session, err = c.createContext(*req) + client, session, err = c.createJupyterContext(*req) return err }) if err != nil { @@ -120,9 +125,9 @@ func (c *Controller) deleteSessionAndCleanup(session string) error { defer c.mu.Unlock() delete(c.jupyterClientMap, session) - for lang, id := range c.defaultLanguageJupyterSessions { + for lang, id := range c.defaultLanguageSessions { if id == session { - delete(c.defaultLanguageJupyterSessions, lang) + delete(c.defaultLanguageSessions, lang) } } return nil @@ -143,8 +148,8 @@ func (c *Controller) newIpynbPath(sessionID, cwd string) (string, error) { return filepath.Join(cwd, fmt.Sprintf("%s.ipynb", sessionID)), nil } -// createDefaultLanguageContext prewarms a session for stateless execution. -func (c *Controller) createDefaultLanguageContext(language Language) error { +// createDefaultLanguageJupyterContext prewarms a session for stateless execution. +func (c *Controller) createDefaultLanguageJupyterContext(language Language) error { var ( client *jupyter.Client session *jupytersession.Session @@ -154,7 +159,7 @@ func (c *Controller) createDefaultLanguageContext(language Language) error { log.Error("failed to create context, retrying: %v", err) return err != nil }, func() error { - client, session, err = c.createContext(CreateContextRequest{ + client, session, err = c.createJupyterContext(CreateContextRequest{ Language: language, Cwd: "", }) @@ -167,7 +172,7 @@ func (c *Controller) createDefaultLanguageContext(language Language) error { c.mu.Lock() defer c.mu.Unlock() - c.defaultLanguageJupyterSessions[language] = session.ID + c.defaultLanguageSessions[language] = session.ID c.jupyterClientMap[session.ID] = &jupyterKernel{ kernelID: session.Kernel.ID, client: client, @@ -176,8 +181,8 @@ func (c *Controller) createDefaultLanguageContext(language Language) error { return nil } -// createContext performs the actual context creation workflow. -func (c *Controller) createContext(request CreateContextRequest) (*jupyter.Client, *jupytersession.Session, error) { +// createJupyterContext performs the actual context creation workflow. +func (c *Controller) createJupyterContext(request CreateContextRequest) (*jupyter.Client, *jupytersession.Session, error) { client := c.jupyterClient() kernel, err := c.searchKernel(client, request.Language) @@ -250,7 +255,7 @@ func (c *Controller) listAllContexts() ([]CodeContext, error) { } } - for language, defaultContext := range c.defaultLanguageJupyterSessions { + for language, defaultContext := range c.defaultLanguageSessions { contexts = append(contexts, CodeContext{ ID: defaultContext, Language: language, @@ -274,7 +279,7 @@ func (c *Controller) listLanguageContexts(language Language) ([]CodeContext, err } } - if defaultContext := c.defaultLanguageJupyterSessions[language]; defaultContext != "" { + if defaultContext := c.defaultLanguageSessions[language]; defaultContext != "" { contexts = append(contexts, CodeContext{ ID: defaultContext, Language: language, diff --git a/components/execd/pkg/runtime/context_test.go b/components/execd/pkg/runtime/context_test.go index 6a27ad18..07ee6da3 100644 --- a/components/execd/pkg/runtime/context_test.go +++ b/components/execd/pkg/runtime/context_test.go @@ -27,7 +27,7 @@ import ( func TestListContextsAndNewIpynbPath(t *testing.T) { c := NewController("http://example", "token") c.jupyterClientMap["session-python"] = &jupyterKernel{language: Python} - c.defaultLanguageJupyterSessions[Go] = "session-go-default" + c.defaultLanguageSessions[Go] = "session-go-default" pyContexts, err := c.listLanguageContexts(Python) if err != nil { @@ -129,7 +129,7 @@ func TestDeleteContext_RemovesCacheOnSuccess(t *testing.T) { c := NewController(server.URL, "token") c.jupyterClientMap[sessionID] = &jupyterKernel{language: Python} - c.defaultLanguageJupyterSessions[Python] = sessionID + c.defaultLanguageSessions[Python] = sessionID if err := c.DeleteContext(sessionID); err != nil { t.Fatalf("DeleteContext returned error: %v", err) @@ -138,7 +138,7 @@ func TestDeleteContext_RemovesCacheOnSuccess(t *testing.T) { if kernel := c.getJupyterKernel(sessionID); kernel != nil { t.Fatalf("expected cache to be cleared, found: %+v", kernel) } - if _, ok := c.defaultLanguageJupyterSessions[Python]; ok { + if _, ok := c.defaultLanguageSessions[Python]; ok { t.Fatalf("expected default session entry to be removed") } } @@ -168,7 +168,7 @@ func TestDeleteLanguageContext_RemovesCacheOnSuccess(t *testing.T) { c := NewController(server.URL, "token") c.jupyterClientMap[session1] = &jupyterKernel{language: lang} c.jupyterClientMap[session2] = &jupyterKernel{language: lang} - c.defaultLanguageJupyterSessions[lang] = session2 + c.defaultLanguageSessions[lang] = session2 if err := c.DeleteLanguageContext(lang); err != nil { t.Fatalf("DeleteLanguageContext returned error: %v", err) @@ -180,7 +180,7 @@ func TestDeleteLanguageContext_RemovesCacheOnSuccess(t *testing.T) { if _, ok := c.jupyterClientMap[session2]; ok { t.Fatalf("expected session2 removed from cache") } - if _, ok := c.defaultLanguageJupyterSessions[lang]; ok { + if _, ok := c.defaultLanguageSessions[lang]; ok { t.Fatalf("expected default entry removed") } if deleteCalls[session1] != 1 || deleteCalls[session2] != 1 { diff --git a/components/execd/pkg/runtime/ctrl.go b/components/execd/pkg/runtime/ctrl.go index 20bbecc6..81332afc 100644 --- a/components/execd/pkg/runtime/ctrl.go +++ b/components/execd/pkg/runtime/ctrl.go @@ -35,14 +35,15 @@ var kernelWaitingBackoff = wait.Backoff{ // Controller manages code execution across runtimes. type Controller struct { - baseURL string - token string - mu sync.RWMutex - jupyterClientMap map[string]*jupyterKernel - defaultLanguageJupyterSessions map[Language]string - commandClientMap map[string]*commandKernel - db *sql.DB - dbOnce sync.Once + baseURL string + token string + mu sync.RWMutex + jupyterClientMap map[string]*jupyterKernel + defaultLanguageSessions map[Language]string + commandClientMap map[string]*commandKernel + bashSessionClientMap map[string]*bashSession + db *sql.DB + dbOnce sync.Once } type jupyterKernel struct { @@ -71,9 +72,10 @@ func NewController(baseURL, token string) *Controller { baseURL: baseURL, token: token, - jupyterClientMap: make(map[string]*jupyterKernel), - defaultLanguageJupyterSessions: make(map[Language]string), - commandClientMap: make(map[string]*commandKernel), + jupyterClientMap: make(map[string]*jupyterKernel), + defaultLanguageSessions: make(map[Language]string), + commandClientMap: make(map[string]*commandKernel), + bashSessionClientMap: make(map[string]*bashSession), } } @@ -93,10 +95,12 @@ func (c *Controller) Execute(request *ExecuteCodeRequest) error { return c.runCommand(ctx, request) case BackgroundCommand: return c.runBackgroundCommand(ctx, request) - case Bash, Python, Java, JavaScript, TypeScript, Go: + case Python, Java, JavaScript, TypeScript, Go: return c.runJupyter(ctx, request) case SQL: return c.runSQL(ctx, request) + case Bash: + return c.runBashSession(ctx, request) default: return fmt.Errorf("unknown language: %s", request.Language) } diff --git a/components/execd/pkg/runtime/interrupt.go b/components/execd/pkg/runtime/interrupt.go index 1a9515fa..67902a3d 100644 --- a/components/execd/pkg/runtime/interrupt.go +++ b/components/execd/pkg/runtime/interrupt.go @@ -38,6 +38,8 @@ func (c *Controller) Interrupt(sessionID string) error { case c.getCommandKernel(sessionID) != nil: kernel := c.getCommandKernel(sessionID) return c.killPid(kernel.pid) + case c.getBashSession(sessionID) != nil: + return c.closeBashSession(sessionID) default: return errors.New("no such session") } diff --git a/components/execd/pkg/runtime/jupyter.go b/components/execd/pkg/runtime/jupyter.go index cdc0a6cc..ba53abaf 100644 --- a/components/execd/pkg/runtime/jupyter.go +++ b/components/execd/pkg/runtime/jupyter.go @@ -29,8 +29,8 @@ func (c *Controller) runJupyter(ctx context.Context, request *ExecuteCodeRequest return errors.New("language runtime server not configured, please check your image runtime") } if request.Context == "" { - if _, exists := c.defaultLanguageJupyterSessions[request.Language]; !exists { - err := c.createDefaultLanguageContext(request.Language) + if _, exists := c.defaultLanguageSessions[request.Language]; !exists { + err := c.createDefaultLanguageJupyterContext(request.Language) if err != nil { return err } @@ -39,7 +39,7 @@ func (c *Controller) runJupyter(ctx context.Context, request *ExecuteCodeRequest var targetSessionID string if request.Context == "" { - targetSessionID = c.defaultLanguageJupyterSessions[request.Language] + targetSessionID = c.defaultLanguageSessions[request.Language] } else { targetSessionID = request.Context } diff --git a/components/execd/pkg/runtime/types.go b/components/execd/pkg/runtime/types.go index cb82a11b..5cd5adda 100644 --- a/components/execd/pkg/runtime/types.go +++ b/components/execd/pkg/runtime/types.go @@ -16,6 +16,9 @@ package runtime import ( "fmt" + "io" + "os/exec" + "sync" "time" "github.com/alibaba/opensandbox/execd/pkg/jupyter/execute" @@ -80,3 +83,33 @@ type CodeContext struct { ID string `json:"id,omitempty"` Language Language `json:"language"` } + +// bashSessionConfig holds bash session configuration. +type bashSessionConfig struct { + // StartupSource is a list of scripts sourced on startup. + StartupSource []string + // Session is the session identifier. + Session string + // StartupTimeout is the startup timeout. + StartupTimeout time.Duration +} + +const ( + // exitCodePrefix marks the beginning of exit code output. + exitCodePrefix = "EXITCODESTART" + // exitCodeSuffix marks the end of exit code output. + exitCodeSuffix = "EXITCODEEND" +) + +// bashSession represents a bash session. +type bashSession struct { + config *bashSessionConfig + cmd *exec.Cmd + stdin io.WriteCloser + stdout io.ReadCloser + stderr io.ReadCloser + stdoutLines chan string + stdoutErr chan error + mu sync.Mutex + started bool +} From d9c20ab153a2e6c7c007c724f8e690c476d70659 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Sat, 17 Jan 2026 12:12:10 +0800 Subject: [PATCH 02/16] feat(components/execd): use concurrent-safe maps to avoid single point of dependency on global locks. --- .github/workflows/execd-test.yml | 2 +- components/execd/pkg/runtime/bash_session.go | 40 +++---- .../execd/pkg/runtime/command_common.go | 15 ++- .../execd/pkg/runtime/command_status.go | 17 +-- components/execd/pkg/runtime/context.go | 105 +++++++++--------- components/execd/pkg/runtime/context_test.go | 23 ++-- components/execd/pkg/runtime/ctrl.go | 16 +-- components/execd/pkg/runtime/jupyter.go | 17 +-- 8 files changed, 120 insertions(+), 115 deletions(-) diff --git a/.github/workflows/execd-test.yml b/.github/workflows/execd-test.yml index 43a7fc3d..e3004429 100644 --- a/.github/workflows/execd-test.yml +++ b/.github/workflows/execd-test.yml @@ -25,7 +25,7 @@ jobs: - name: Run golint run: | cd components/execd - # make golint + make golint - name: Build (Multi platform compile) run: | diff --git a/components/execd/pkg/runtime/bash_session.go b/components/execd/pkg/runtime/bash_session.go index 01f583f1..bde8b711 100644 --- a/components/execd/pkg/runtime/bash_session.go +++ b/components/execd/pkg/runtime/bash_session.go @@ -35,24 +35,20 @@ import ( ) func (c *Controller) createBashSession(_ *CreateContextRequest) (string, error) { - c.mu.Lock() - defer c.mu.Unlock() - session := newBashSession(nil) if err := session.start(); err != nil { return "", fmt.Errorf("failed to start bash session: %w", err) } - c.bashSessionClientMap[session.config.Session] = session + c.bashSessionClientMap.Store(session.config.Session, session) log.Info("created bash session %s", session.config.Session) return session.config.Session, nil } func (c *Controller) runBashSession(_ context.Context, request *ExecuteCodeRequest) error { if request.Context == "" { - if _, exists := c.defaultLanguageSessions[request.Language]; !exists { - err := c.createDefaultBashSession() - if err != nil { + if c.getDefaultLanguageSession(request.Language) == "" { + if err := c.createDefaultBashSession(); err != nil { return err } } @@ -60,7 +56,7 @@ func (c *Controller) runBashSession(_ context.Context, request *ExecuteCodeReque targetSessionID := request.Context if targetSessionID == "" { - targetSessionID = c.defaultLanguageSessions[request.Language] + targetSessionID = c.getDefaultLanguageSession(request.Language) } session := c.getBashSession(targetSessionID) @@ -77,15 +73,17 @@ func (c *Controller) createDefaultBashSession() error { return err } - c.defaultLanguageSessions[Bash] = session + c.setDefaultLanguageSession(Bash, session) return nil } func (c *Controller) getBashSession(sessionId string) *bashSession { - c.mu.RLock() - defer c.mu.RUnlock() - - return c.bashSessionClientMap[sessionId] + if v, ok := c.bashSessionClientMap.Load(sessionId); ok { + if s, ok := v.(*bashSession); ok { + return s + } + } + return nil } func (c *Controller) closeBashSession(sessionId string) error { @@ -94,25 +92,23 @@ func (c *Controller) closeBashSession(sessionId string) error { return ErrContextNotFound } - c.mu.Lock() - defer c.mu.Unlock() err := session.close() if err != nil { return err } - delete(c.bashSessionClientMap, sessionId) + c.bashSessionClientMap.Delete(sessionId) return nil } +// nolint:unused func (c *Controller) listBashSessions() []string { - c.mu.RLock() - defer c.mu.RUnlock() - - sessions := make([]string, 0, len(c.bashSessionClientMap)) - for sessionID := range c.bashSessionClientMap { + sessions := make([]string, 0) + c.bashSessionClientMap.Range(func(key, _ any) bool { + sessionID, _ := key.(string) sessions = append(sessions, sessionID) - } + return true + }) return sessions } diff --git a/components/execd/pkg/runtime/command_common.go b/components/execd/pkg/runtime/command_common.go index 633efa35..4f49ebbc 100644 --- a/components/execd/pkg/runtime/command_common.go +++ b/components/execd/pkg/runtime/command_common.go @@ -45,18 +45,17 @@ func (c *Controller) tailStdPipe(file string, onExecute func(text string), done // getCommandKernel retrieves a command execution context. func (c *Controller) getCommandKernel(sessionID string) *commandKernel { - c.mu.RLock() - defer c.mu.RUnlock() - - return c.commandClientMap[sessionID] + if v, ok := c.commandClientMap.Load(sessionID); ok { + if kernel, ok := v.(*commandKernel); ok { + return kernel + } + } + return nil } // storeCommandKernel registers a command execution context. func (c *Controller) storeCommandKernel(sessionID string, kernel *commandKernel) { - c.mu.Lock() - defer c.mu.Unlock() - - c.commandClientMap[sessionID] = kernel + c.commandClientMap.Store(sessionID, kernel) } // stdLogDescriptor creates temporary files for capturing command output. diff --git a/components/execd/pkg/runtime/command_status.go b/components/execd/pkg/runtime/command_status.go index 97f112b1..6dbc6d4f 100644 --- a/components/execd/pkg/runtime/command_status.go +++ b/components/execd/pkg/runtime/command_status.go @@ -40,11 +40,11 @@ type CommandOutput struct { } func (c *Controller) commandSnapshot(session string) *commandKernel { - c.mu.RLock() - defer c.mu.RUnlock() - - kernel, ok := c.commandClientMap[session] - if !ok || kernel == nil { + var kernel *commandKernel + if v, ok := c.commandClientMap.Load(session); ok { + kernel, _ = v.(*commandKernel) + } + if kernel == nil { return nil } @@ -116,8 +116,11 @@ func (c *Controller) markCommandFinished(session string, exitCode int, errMsg st c.mu.Lock() defer c.mu.Unlock() - kernel, ok := c.commandClientMap[session] - if !ok || kernel == nil { + var kernel *commandKernel + if v, ok := c.commandClientMap.Load(session); ok { + kernel, _ = v.(*commandKernel) + } + if kernel == nil { return } diff --git a/components/execd/pkg/runtime/context.go b/components/execd/pkg/runtime/context.go index 0f9abadc..6e7ea870 100644 --- a/components/execd/pkg/runtime/context.go +++ b/components/execd/pkg/runtime/context.go @@ -121,15 +121,8 @@ func (c *Controller) deleteSessionAndCleanup(session string) error { return err } - c.mu.Lock() - defer c.mu.Unlock() - - delete(c.jupyterClientMap, session) - for lang, id := range c.defaultLanguageSessions { - if id == session { - delete(c.defaultLanguageSessions, lang) - } - } + c.jupyterClientMap.Delete(session) + c.deleteDefaultSessionByID(session) return nil } @@ -150,6 +143,10 @@ func (c *Controller) newIpynbPath(sessionID, cwd string) (string, error) { // createDefaultLanguageJupyterContext prewarms a session for stateless execution. func (c *Controller) createDefaultLanguageJupyterContext(language Language) error { + if c.getDefaultLanguageSession(language) != "" { + return nil + } + var ( client *jupyter.Client session *jupytersession.Session @@ -169,15 +166,12 @@ func (c *Controller) createDefaultLanguageJupyterContext(language Language) erro return err } - c.mu.Lock() - defer c.mu.Unlock() - - c.defaultLanguageSessions[language] = session.ID - c.jupyterClientMap[session.ID] = &jupyterKernel{ + c.setDefaultLanguageSession(language, session.ID) + c.jupyterClientMap.Store(session.ID, &jupyterKernel{ kernelID: session.Kernel.ID, client: client, language: language, - } + }) return nil } @@ -222,10 +216,7 @@ func (c *Controller) createJupyterContext(request CreateContextRequest) (*jupyte // storeJupyterKernel caches a session -> kernel mapping. func (c *Controller) storeJupyterKernel(sessionID string, kernel *jupyterKernel) { - c.mu.Lock() - defer c.mu.Unlock() - - c.jupyterClientMap[sessionID] = kernel + c.jupyterClientMap.Store(sessionID, kernel) } func (c *Controller) jupyterClient() *jupyter.Client { @@ -241,49 +232,63 @@ func (c *Controller) jupyterClient() *jupyter.Client { jupyter.WithHTTPClient(httpClient)) } -func (c *Controller) listAllContexts() ([]CodeContext, error) { - c.mu.RLock() - defer c.mu.RUnlock() +func (c *Controller) getDefaultLanguageSession(language Language) string { + if v, ok := c.defaultLanguageSessions.Load(language); ok { + if session, ok := v.(string); ok { + return session + } + } + return "" +} + +func (c *Controller) setDefaultLanguageSession(language Language, sessionID string) { + c.defaultLanguageSessions.Store(language, sessionID) +} + +func (c *Controller) deleteDefaultSessionByID(sessionID string) { + c.defaultLanguageSessions.Range(func(key, value any) bool { + if s, ok := value.(string); ok && s == sessionID { + c.defaultLanguageSessions.Delete(key) + } + return true + }) +} +func (c *Controller) listAllContexts() ([]CodeContext, error) { contexts := make([]CodeContext, 0) - for session, kernel := range c.jupyterClientMap { - if kernel != nil { - contexts = append(contexts, CodeContext{ - ID: session, - Language: kernel.language, - }) + c.jupyterClientMap.Range(func(key, value any) bool { + session, _ := key.(string) + if kernel, ok := value.(*jupyterKernel); ok && kernel != nil { + contexts = append(contexts, CodeContext{ID: session, Language: kernel.language}) } - } + return true + }) - for language, defaultContext := range c.defaultLanguageSessions { - contexts = append(contexts, CodeContext{ - ID: defaultContext, - Language: language, - }) - } + c.defaultLanguageSessions.Range(func(key, value any) bool { + lang, _ := key.(Language) + session, _ := value.(string) + if session == "" { + return true + } + contexts = append(contexts, CodeContext{ID: session, Language: lang}) + return true + }) return contexts, nil } func (c *Controller) listLanguageContexts(language Language) ([]CodeContext, error) { - c.mu.RLock() - defer c.mu.RUnlock() - contexts := make([]CodeContext, 0) - for session, kernel := range c.jupyterClientMap { - if kernel != nil && kernel.language == language { - contexts = append(contexts, CodeContext{ - ID: session, - Language: language, - }) + c.jupyterClientMap.Range(func(key, value any) bool { + session, _ := key.(string) + if kernel, ok := value.(*jupyterKernel); ok && kernel != nil && kernel.language == language { + contexts = append(contexts, CodeContext{ID: session, Language: language}) } - } + return true + }) - if defaultContext := c.defaultLanguageSessions[language]; defaultContext != "" { - contexts = append(contexts, CodeContext{ - ID: defaultContext, - Language: language, - }) + if defaultContext := c.getDefaultLanguageSession(language); defaultContext != "" { + contexts = append(contexts, CodeContext{ID: defaultContext, Language: language}) } return contexts, nil diff --git a/components/execd/pkg/runtime/context_test.go b/components/execd/pkg/runtime/context_test.go index 07ee6da3..43efe81c 100644 --- a/components/execd/pkg/runtime/context_test.go +++ b/components/execd/pkg/runtime/context_test.go @@ -26,8 +26,9 @@ import ( func TestListContextsAndNewIpynbPath(t *testing.T) { c := NewController("http://example", "token") - c.jupyterClientMap["session-python"] = &jupyterKernel{language: Python} - c.defaultLanguageSessions[Go] = "session-go-default" + + c.jupyterClientMap.Store("session-python", &jupyterKernel{language: Python}) + c.setDefaultLanguageSession(Go, "session-go-default") pyContexts, err := c.listLanguageContexts(Python) if err != nil { @@ -128,8 +129,8 @@ func TestDeleteContext_RemovesCacheOnSuccess(t *testing.T) { defer server.Close() c := NewController(server.URL, "token") - c.jupyterClientMap[sessionID] = &jupyterKernel{language: Python} - c.defaultLanguageSessions[Python] = sessionID + c.jupyterClientMap.Store(sessionID, &jupyterKernel{language: Python}) + c.setDefaultLanguageSession(Python, sessionID) if err := c.DeleteContext(sessionID); err != nil { t.Fatalf("DeleteContext returned error: %v", err) @@ -138,7 +139,7 @@ func TestDeleteContext_RemovesCacheOnSuccess(t *testing.T) { if kernel := c.getJupyterKernel(sessionID); kernel != nil { t.Fatalf("expected cache to be cleared, found: %+v", kernel) } - if _, ok := c.defaultLanguageSessions[Python]; ok { + if c.getDefaultLanguageSession(Python) != "" { t.Fatalf("expected default session entry to be removed") } } @@ -166,21 +167,21 @@ func TestDeleteLanguageContext_RemovesCacheOnSuccess(t *testing.T) { defer server.Close() c := NewController(server.URL, "token") - c.jupyterClientMap[session1] = &jupyterKernel{language: lang} - c.jupyterClientMap[session2] = &jupyterKernel{language: lang} - c.defaultLanguageSessions[lang] = session2 + c.jupyterClientMap.Store(session1, &jupyterKernel{language: lang}) + c.jupyterClientMap.Store(session2, &jupyterKernel{language: lang}) + c.setDefaultLanguageSession(lang, session2) if err := c.DeleteLanguageContext(lang); err != nil { t.Fatalf("DeleteLanguageContext returned error: %v", err) } - if _, ok := c.jupyterClientMap[session1]; ok { + if v, ok := c.jupyterClientMap.Load(session1); ok && v != nil { t.Fatalf("expected session1 removed from cache") } - if _, ok := c.jupyterClientMap[session2]; ok { + if v, ok := c.jupyterClientMap.Load(session2); ok && v != nil { t.Fatalf("expected session2 removed from cache") } - if _, ok := c.defaultLanguageSessions[lang]; ok { + if c.getDefaultLanguageSession(lang) != "" { t.Fatalf("expected default entry removed") } if deleteCalls[session1] != 1 || deleteCalls[session2] != 1 { diff --git a/components/execd/pkg/runtime/ctrl.go b/components/execd/pkg/runtime/ctrl.go index 81332afc..2bb1967b 100644 --- a/components/execd/pkg/runtime/ctrl.go +++ b/components/execd/pkg/runtime/ctrl.go @@ -38,10 +38,10 @@ type Controller struct { baseURL string token string mu sync.RWMutex - jupyterClientMap map[string]*jupyterKernel - defaultLanguageSessions map[Language]string - commandClientMap map[string]*commandKernel - bashSessionClientMap map[string]*bashSession + jupyterClientMap sync.Map // sessionID -> *jupyterKernel + defaultLanguageSessions sync.Map // Language -> sessionID + commandClientMap sync.Map // sessionID -> *commandKernel + bashSessionClientMap sync.Map // sessionID -> *bashSession db *sql.DB dbOnce sync.Once } @@ -72,10 +72,10 @@ func NewController(baseURL, token string) *Controller { baseURL: baseURL, token: token, - jupyterClientMap: make(map[string]*jupyterKernel), - defaultLanguageSessions: make(map[Language]string), - commandClientMap: make(map[string]*commandKernel), - bashSessionClientMap: make(map[string]*bashSession), + jupyterClientMap: sync.Map{}, + defaultLanguageSessions: sync.Map{}, + commandClientMap: sync.Map{}, + bashSessionClientMap: sync.Map{}, } } diff --git a/components/execd/pkg/runtime/jupyter.go b/components/execd/pkg/runtime/jupyter.go index ba53abaf..9ea33b13 100644 --- a/components/execd/pkg/runtime/jupyter.go +++ b/components/execd/pkg/runtime/jupyter.go @@ -29,9 +29,8 @@ func (c *Controller) runJupyter(ctx context.Context, request *ExecuteCodeRequest return errors.New("language runtime server not configured, please check your image runtime") } if request.Context == "" { - if _, exists := c.defaultLanguageSessions[request.Language]; !exists { - err := c.createDefaultLanguageJupyterContext(request.Language) - if err != nil { + if c.getDefaultLanguageSession(request.Language) == "" { + if err := c.createDefaultLanguageJupyterContext(request.Language); err != nil { return err } } @@ -39,7 +38,7 @@ func (c *Controller) runJupyter(ctx context.Context, request *ExecuteCodeRequest var targetSessionID string if request.Context == "" { - targetSessionID = c.defaultLanguageSessions[request.Language] + targetSessionID = c.getDefaultLanguageSession(request.Language) } else { targetSessionID = request.Context } @@ -135,10 +134,12 @@ func (c *Controller) setWorkingDir(_ *jupyterKernel, _ *CreateContextRequest) er // getJupyterKernel retrieves a kernel connection from the session map. func (c *Controller) getJupyterKernel(sessionID string) *jupyterKernel { - c.mu.RLock() - defer c.mu.RUnlock() - - return c.jupyterClientMap[sessionID] + if v, ok := c.jupyterClientMap.Load(sessionID); ok { + if kernel, ok := v.(*jupyterKernel); ok { + return kernel + } + } + return nil } // searchKernel finds a kernel spec name for the given language. From 31db4acab35fdaaa9393a3921f948a7bba703cc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Sun, 18 Jan 2026 11:15:20 +0800 Subject: [PATCH 03/16] feat(tests): add python integration test for bash execution --- .github/workflows/real-e2e.yml | 2 + .../tests/test_code_interpreter_e2e_sync.py | 92 +++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/.github/workflows/real-e2e.yml b/.github/workflows/real-e2e.yml index 3e1c85d3..9d098b18 100644 --- a/.github/workflows/real-e2e.yml +++ b/.github/workflows/real-e2e.yml @@ -35,6 +35,8 @@ jobs: pip install uv - name: Run tests + env: + OPENSANDBOX_SANDBOX_DEFAULT_IMAGE: opensandbox/code-interpreter:latest run: | set -e diff --git a/tests/python/tests/test_code_interpreter_e2e_sync.py b/tests/python/tests/test_code_interpreter_e2e_sync.py index 95d8f9b7..9bae9f2e 100644 --- a/tests/python/tests/test_code_interpreter_e2e_sync.py +++ b/tests/python/tests/test_code_interpreter_e2e_sync.py @@ -893,3 +893,95 @@ def test_09_context_management_endpoints(self): assert len(final_contexts) == 0 logger.info("✓ delete_contexts removed all bash contexts") + @pytest.mark.timeout(300) + @pytest.mark.order(10) + def test_10_bash_env_propagation(self): + """Ensure bash commands share env/vars across sequential executions.""" + TestCodeInterpreterE2ESync._ensure_code_interpreter_created() + code_interpreter = TestCodeInterpreterE2ESync.code_interpreter + assert code_interpreter is not None + + stdout_messages: list[OutputMessage] = [] + stderr_messages: list[OutputMessage] = [] + errors: list[ExecutionError] = [] + completed_events: list[ExecutionComplete] = [] + init_events: list[ExecutionInit] = [] + + def on_stdout(msg: OutputMessage): + stdout_messages.append(msg) + + def on_stderr(msg: OutputMessage): + stderr_messages.append(msg) + + def on_error(err: ExecutionError): + errors.append(err) + + def on_complete(evt: ExecutionComplete): + completed_events.append(evt) + + def on_init(evt: ExecutionInit): + init_events.append(evt) + + handlers = ExecutionHandlersSync( + on_stdout=on_stdout, + on_stderr=on_stderr, + on_result=None, + on_error=on_error, + on_execution_complete=on_complete, + on_init=on_init, + ) + + # Send three sequential commands in the same session, validating env propagation. + code1 = ( + "export FOO=hello\n" + "export BAR=world\n" + ) + code2 = ( + "printf \"step1:$FOO:$BAR\\n\"\n" + ) + code3 = ( + "export FOO=${FOO}_next\n" + "printf \"step2:$FOO:$BAR\\n\"\n" + "export BAR=${BAR}_next\n" + "printf \"step3:$FOO:$BAR\\n\"\n" + ) + + # export envs + result1 = code_interpreter.codes.run( + code1, + language=SupportedLanguage.BASH, + handlers=handlers, + ) + + assert result1 is not None + assert result1.id is not None and str(result1.id).strip() + assert result1.error is None + + # print env + result2 = code_interpreter.codes.run( + code2, + language=SupportedLanguage.BASH, + handlers=handlers, + ) + + assert result2 is not None + assert result2.id is not None and str(result2.id).strip() + assert result2.error is None + + # print env + result3 = code_interpreter.codes.run( + code3, + language=SupportedLanguage.BASH, + handlers=handlers, + ) + assert result3 is not None + assert result3.id is not None and str(result3.id).strip() + assert result3.error is None + + # Expect at least three stdout lines with propagated env values. + stdout_texts = [m.text.strip() for m in stdout_messages if m.text] + assert "step1:hello:world" in stdout_texts + assert "step2:hello_next:world" in stdout_texts + assert "step3:hello_next:world_next" in stdout_texts + for m in stdout_messages[:3]: + _assert_recent_timestamp_ms(m.timestamp) From bb70a0d8ebe3466138ca764a6e151e43aaf10a1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Sun, 18 Jan 2026 11:15:20 +0800 Subject: [PATCH 04/16] feat(tests): add js integration test for bash execution --- .../tests/test_code_interpreter_e2e.test.ts | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/tests/javascript/tests/test_code_interpreter_e2e.test.ts b/tests/javascript/tests/test_code_interpreter_e2e.test.ts index 686fc407..6ae00a41 100644 --- a/tests/javascript/tests/test_code_interpreter_e2e.test.ts +++ b/tests/javascript/tests/test_code_interpreter_e2e.test.ts @@ -267,3 +267,58 @@ test("07 interrupt code execution + fake id", async () => { await expect(ci0.codes.interrupt(`fake-${Date.now()}`)).rejects.toBeTruthy(); }); + +test("08 bash env propagation across sequential executions", async () => { + if (!ci) throw new Error("not initialized"); + + const stdout: string[] = []; + const stderr: string[] = []; + const errors: string[] = []; + + const handlers: ExecutionHandlers = { + onStdout: (m) => { + if (m.text) stdout.push(m.text.trim()); + }, + onStderr: (m) => { + if (m.text) stderr.push(m.text.trim()); + }, + onError: (e) => { + errors.push(e.name); + }, + }; + + const code1 = "export FOO=hello\nexport BAR=world\n"; + const code2 = 'printf "step1:$FOO:$BAR\\n"\n'; + const code3 = + "export FOO=${FOO}_next\n" + + 'printf "step2:$FOO:$BAR\\n"\n' + + "export BAR=${BAR}_next\n" + + 'printf "step3:$FOO:$BAR\\n"\n'; + + const r1 = await ci.codes.run(code1, { + language: SupportedLanguages.BASH, + handlers, + }); + expect(r1.id).toBeTruthy(); + expect(r1.error).toBeUndefined(); + + const r2 = await ci.codes.run(code2, { + language: SupportedLanguages.BASH, + handlers, + }); + expect(r2.id).toBeTruthy(); + expect(r2.error).toBeUndefined(); + + const r3 = await ci.codes.run(code3, { + language: SupportedLanguages.BASH, + handlers, + }); + expect(r3.id).toBeTruthy(); + expect(r3.error).toBeUndefined(); + + expect(stdout).toContain("step1:hello:world"); + expect(stdout).toContain("step2:hello_next:world"); + expect(stdout).toContain("step3:hello_next:world_next"); + expect(errors).toHaveLength(0); + expect(stderr.filter((s) => s.length > 0)).toHaveLength(0); +}); From 1e2ed97a76dea5b8a5919397e792397e1a86c543 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Sun, 18 Jan 2026 14:28:20 +0800 Subject: [PATCH 05/16] fix(components/execd): reject commands after exit and surface clear session-terminated error --- components/execd/pkg/runtime/bash_session.go | 23 +++++++++++++++----- components/execd/pkg/runtime/types.go | 2 ++ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/components/execd/pkg/runtime/bash_session.go b/components/execd/pkg/runtime/bash_session.go index bde8b711..8a9acdf0 100644 --- a/components/execd/pkg/runtime/bash_session.go +++ b/components/execd/pkg/runtime/bash_session.go @@ -176,6 +176,8 @@ func (s *bashSession) readStdout(r io.Reader) { s.stdoutLines <- strings.TrimRight(line, "\r\n") } if err != nil { + // mark session terminated so subsequent commands can reject early + s.terminated.Store(true) if !errors.Is(err, io.EOF) { s.stdoutErr <- err } @@ -193,6 +195,9 @@ func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteR s.mu.Lock() defer s.mu.Unlock() + if s.terminated.Load() { + return errors.New("bash session is terminated (probably by exit); please create a new session") + } if !s.started { return errors.New("session not started") } @@ -203,9 +208,9 @@ func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteR hooks.OnExecuteInit(s.config.Session) } - waitSeconds := timeout - if waitSeconds <= 0 { - waitSeconds = 30 * time.Second + wait := timeout + if wait <= 0 { + wait = 3600 * time.Second } cleanCmd := strings.ReplaceAll(command, "\n", " ; ") @@ -213,24 +218,30 @@ func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteR // send command + marker cmdText := fmt.Sprintf("%s\nprintf \"%s$?%s\\n\"\n", cleanCmd, exitCodePrefix, exitCodeSuffix) if _, err := fmt.Fprint(s.stdin, cmdText); err != nil { + if errors.Is(err, io.ErrClosedPipe) || strings.Contains(err.Error(), "broken pipe") { + s.terminated.Store(true) + return errors.New("bash session is terminated (probably by exit); please create a new session") + } return fmt.Errorf("write command: %w", err) } // collect output until marker - timer := time.NewTimer(waitSeconds) + timer := time.NewTimer(wait) defer timer.Stop() for { select { case <-timer.C: - return fmt.Errorf("timeout after %s while running command %q", waitSeconds, command) + return fmt.Errorf("timeout after %s while running command %q", wait, command) case err := <-s.stdoutErr: if err != nil { + s.terminated.Store(true) return err } case line, ok := <-s.stdoutLines: if !ok { - return errors.New("stdout closed unexpectedly") + s.terminated.Store(true) + return errors.New("bash session stdout closed (probably by exit); please create a new session") } if _, ok := parseExitCodeLine(line); ok { if hooks != nil && hooks.OnExecuteComplete != nil { diff --git a/components/execd/pkg/runtime/types.go b/components/execd/pkg/runtime/types.go index 5cd5adda..402be1ce 100644 --- a/components/execd/pkg/runtime/types.go +++ b/components/execd/pkg/runtime/types.go @@ -19,6 +19,7 @@ import ( "io" "os/exec" "sync" + "sync/atomic" "time" "github.com/alibaba/opensandbox/execd/pkg/jupyter/execute" @@ -112,4 +113,5 @@ type bashSession struct { stdoutErr chan error mu sync.Mutex started bool + terminated atomic.Bool } From 2e9add91eb0848548de76848873024dde34042ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Sun, 18 Jan 2026 15:01:51 +0800 Subject: [PATCH 06/16] fix(components/execd): preserve bash exit status without killing session --- components/execd/pkg/runtime/bash_session.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/components/execd/pkg/runtime/bash_session.go b/components/execd/pkg/runtime/bash_session.go index 8a9acdf0..1ecaaf5c 100644 --- a/components/execd/pkg/runtime/bash_session.go +++ b/components/execd/pkg/runtime/bash_session.go @@ -215,8 +215,9 @@ func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteR cleanCmd := strings.ReplaceAll(command, "\n", " ; ") - // send command + marker - cmdText := fmt.Sprintf("%s\nprintf \"%s$?%s\\n\"\n", cleanCmd, exitCodePrefix, exitCodeSuffix) + // send command + marker, preserving the user's last exit code + // use a subshell at the end to restore $? to the original exit code + cmdText := fmt.Sprintf("%s\n__c=$?\nprintf \"%s${__c}%s\\n\"\n(exit ${__c})\n", cleanCmd, exitCodePrefix, exitCodeSuffix) if _, err := fmt.Fprint(s.stdin, cmdText); err != nil { if errors.Is(err, io.ErrClosedPipe) || strings.Contains(err.Error(), "broken pipe") { s.terminated.Store(true) From 4b20500ddb4974520193d8dafbe3b2cb6294f0cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Sun, 18 Jan 2026 15:05:55 +0800 Subject: [PATCH 07/16] feat(sandboxes/code-interpreter): remove bash jupyter kernel installation --- sandboxes/code-interpreter/Dockerfile | 2 +- sandboxes/code-interpreter/README.md | 1 - sandboxes/code-interpreter/README_zh.md | 1 - sandboxes/code-interpreter/scripts/code-interpreter.sh | 8 -------- 4 files changed, 1 insertion(+), 11 deletions(-) diff --git a/sandboxes/code-interpreter/Dockerfile b/sandboxes/code-interpreter/Dockerfile index 2a1aa013..280f1b30 100644 --- a/sandboxes/code-interpreter/Dockerfile +++ b/sandboxes/code-interpreter/Dockerfile @@ -24,7 +24,7 @@ RUN set -euo pipefail \ echo "Setting up ipykernel for Python $version" \ && . /opt/opensandbox/code-interpreter-env.sh python $version \ && python3 --version \ - && python3 -m pip install ipykernel jupyter bash_kernel --break-system-packages; \ + && python3 -m pip install ipykernel jupyter --break-system-packages; \ done \ && echo "Setting up ipykernel complete" diff --git a/sandboxes/code-interpreter/README.md b/sandboxes/code-interpreter/README.md index 2b7943a2..b17072e1 100644 --- a/sandboxes/code-interpreter/README.md +++ b/sandboxes/code-interpreter/README.md @@ -144,7 +144,6 @@ The image comes with pre-configured Jupyter kernels for all supported languages: - **Java**: IJava kernel - **TypeScript/JavaScript**: tslab kernel - **Go**: gonb kernel -- **Bash**: bash_kernel ### Starting Jupyter diff --git a/sandboxes/code-interpreter/README_zh.md b/sandboxes/code-interpreter/README_zh.md index e68b2584..9a9bfacb 100644 --- a/sandboxes/code-interpreter/README_zh.md +++ b/sandboxes/code-interpreter/README_zh.md @@ -142,7 +142,6 @@ source /opt/opensandbox/code-interpreter-env.sh go - **Java**:IJava 内核 - **TypeScript/JavaScript**:tslab 内核 - **Go**:gonb 内核 -- **Bash**:bash_kernel ### 启动 Jupyter diff --git a/sandboxes/code-interpreter/scripts/code-interpreter.sh b/sandboxes/code-interpreter/scripts/code-interpreter.sh index 11968c1d..6d03691c 100755 --- a/sandboxes/code-interpreter/scripts/code-interpreter.sh +++ b/sandboxes/code-interpreter/scripts/code-interpreter.sh @@ -93,12 +93,6 @@ setup_go() { } } -setup_bash() { - time { - python3 -m bash_kernel.install - } -} - # export go bin path export PATH="$(go env GOPATH)/bin:$PATH" if [ -n "${EXECD_ENVS:-}" ]; then @@ -114,7 +108,5 @@ setup_node & pids+=($!) setup_go & pids+=($!) -setup_bash & -pids+=($!) jupyter notebook --ip=127.0.0.1 --port="${JUPYTER_PORT:-44771}" --allow-root --no-browser --NotebookApp.token="${JUPYTER_TOKEN:-opensandboxcodeinterpreterjupyter}" >/opt/opensandbox/jupyter.log From d8909daf8cd350a1d7181503dbf855739e507603 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Mon, 19 Jan 2026 10:33:47 +0800 Subject: [PATCH 08/16] fix(sandboxes/code-interpreter): fix stderr discard error --- components/execd/pkg/runtime/bash_session.go | 17 +++++------------ components/execd/pkg/runtime/types.go | 1 - 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/components/execd/pkg/runtime/bash_session.go b/components/execd/pkg/runtime/bash_session.go index 1ecaaf5c..74fe815e 100644 --- a/components/execd/pkg/runtime/bash_session.go +++ b/components/execd/pkg/runtime/bash_session.go @@ -123,7 +123,7 @@ func newBashSession(config *bashSessionConfig) *bashSession { } return &bashSession{ config: config, - stdoutLines: make(chan string, 256), + stdoutLines: make(chan string, 1024), stdoutErr: make(chan error, 1), } } @@ -147,10 +147,9 @@ func (s *bashSession) start() error { if err != nil { return fmt.Errorf("stdout pipe: %w", err) } - stderr, err := cmd.StderrPipe() - if err != nil { - return fmt.Errorf("stderr pipe: %w", err) - } + + // merge stderr into stdout + cmd.Stderr = cmd.Stdout if err := cmd.Start(); err != nil { return fmt.Errorf("start bash: %w", err) @@ -159,12 +158,10 @@ func (s *bashSession) start() error { s.cmd = cmd s.stdin = stdin s.stdout = stdout - s.stderr = stderr s.started = true // drain stdout/stderr into channel go s.readStdout(stdout) - go s.discardStderr(stderr) return nil } @@ -187,10 +184,6 @@ func (s *bashSession) readStdout(r io.Reader) { } } -func (s *bashSession) discardStderr(r io.Reader) { - _, _ = io.Copy(io.Discard, r) -} - func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteResultHook) error { s.mu.Lock() defer s.mu.Unlock() @@ -210,7 +203,7 @@ func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteR wait := timeout if wait <= 0 { - wait = 3600 * time.Second + wait = 24 * 3600 * time.Second // default to 24 hours } cleanCmd := strings.ReplaceAll(command, "\n", " ; ") diff --git a/components/execd/pkg/runtime/types.go b/components/execd/pkg/runtime/types.go index 402be1ce..ad6f620e 100644 --- a/components/execd/pkg/runtime/types.go +++ b/components/execd/pkg/runtime/types.go @@ -108,7 +108,6 @@ type bashSession struct { cmd *exec.Cmd stdin io.WriteCloser stdout io.ReadCloser - stderr io.ReadCloser stdoutLines chan string stdoutErr chan error mu sync.Mutex From d28a674de2f9e6e57d087b833224f81402f4cc99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Mon, 19 Jan 2026 10:35:13 +0800 Subject: [PATCH 09/16] fix(sandboxes/code-interpreter): fix windows bash session start statement --- components/execd/pkg/runtime/bash_session_windows.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/execd/pkg/runtime/bash_session_windows.go b/components/execd/pkg/runtime/bash_session_windows.go index 8b65db81..9b1bac42 100644 --- a/components/execd/pkg/runtime/bash_session_windows.go +++ b/components/execd/pkg/runtime/bash_session_windows.go @@ -54,8 +54,8 @@ func newBashSession(config *bashSessionConfig) *bashSession { return &bashSession{config: config} } -func (s *bashSession) start() (string, error) { - return "", errBashSessionNotSupported +func (s *bashSession) start() error { + return errBashSessionNotSupported } func (s *bashSession) run(_ string, _ time.Duration, _ *ExecuteResultHook) error { From 2c9af4c60d2fbe427a7bfdb444c476650d66fb0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Mon, 19 Jan 2026 11:52:42 +0800 Subject: [PATCH 10/16] fix(tests): remove bash context management test --- tests/python/tests/test_code_interpreter_e2e.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/python/tests/test_code_interpreter_e2e.py b/tests/python/tests/test_code_interpreter_e2e.py index 9a2548de..88075f79 100644 --- a/tests/python/tests/test_code_interpreter_e2e.py +++ b/tests/python/tests/test_code_interpreter_e2e.py @@ -998,12 +998,12 @@ async def test_09_context_management_endpoints(self): code_interpreter = TestCodeInterpreterE2E.code_interpreter assert code_interpreter is not None - language = SupportedLanguage.BASH + language = SupportedLanguage.PYTHON logger.info("=" * 80) logger.info("TEST 9: Context management endpoints (%s)", language) logger.info("=" * 80) - # Ensure clean slate for bash contexts to avoid interference with other tests. + # Ensure clean slate for python contexts to avoid interference with other tests. await code_interpreter.codes.delete_contexts(language) ctx1 = await code_interpreter.codes.create_context(language) @@ -1012,14 +1012,14 @@ async def test_09_context_management_endpoints(self): assert ctx2.id is not None and ctx2.id.strip() assert ctx1.language == language assert ctx2.language == language - logger.info("✓ Created two bash contexts: %s, %s", ctx1.id, ctx2.id) + logger.info("✓ Created two python contexts: %s, %s", ctx1.id, ctx2.id) listed = await code_interpreter.codes.list_contexts(language) - bash_context_ids = {c.id for c in listed if c.id} - assert ctx1.id in bash_context_ids - assert ctx2.id in bash_context_ids + python_context_ids = {c.id for c in listed if c.id} + assert ctx1.id in python_context_ids + assert ctx2.id in python_context_ids assert all(c.language == language for c in listed) - logger.info("✓ list_contexts returned expected bash contexts") + logger.info("✓ list_contexts returned expected python contexts") fetched = await code_interpreter.codes.get_context(ctx1.id) assert fetched.id == ctx1.id @@ -1038,5 +1038,5 @@ async def test_09_context_management_endpoints(self): c for c in await code_interpreter.codes.list_contexts(language) if c.id ] assert len(final_contexts) == 0 - logger.info("✓ delete_contexts removed all bash contexts") + logger.info("✓ delete_contexts removed all python contexts") From 4d5372ac5cb03f533cd113b615f95ca8dd6409c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Thu, 29 Jan 2026 20:06:42 +0800 Subject: [PATCH 11/16] fix(components/execd): keep bash session newlines to support heredoc scripts --- components/execd/pkg/runtime/bash_session.go | 8 +- .../execd/pkg/runtime/bash_session_test.go | 100 ++++++++++++++++++ 2 files changed, 106 insertions(+), 2 deletions(-) diff --git a/components/execd/pkg/runtime/bash_session.go b/components/execd/pkg/runtime/bash_session.go index 74fe815e..b04da7c9 100644 --- a/components/execd/pkg/runtime/bash_session.go +++ b/components/execd/pkg/runtime/bash_session.go @@ -184,6 +184,7 @@ func (s *bashSession) readStdout(r io.Reader) { } } +//nolint:gocognit func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteResultHook) error { s.mu.Lock() defer s.mu.Unlock() @@ -206,11 +207,14 @@ func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteR wait = 24 * 3600 * time.Second // default to 24 hours } - cleanCmd := strings.ReplaceAll(command, "\n", " ; ") + cmdBody := command + if !strings.HasSuffix(cmdBody, "\n") { + cmdBody += "\n" + } // send command + marker, preserving the user's last exit code // use a subshell at the end to restore $? to the original exit code - cmdText := fmt.Sprintf("%s\n__c=$?\nprintf \"%s${__c}%s\\n\"\n(exit ${__c})\n", cleanCmd, exitCodePrefix, exitCodeSuffix) + cmdText := fmt.Sprintf("%s__c=$?\nprintf \"%s${__c}%s\\n\"\n(exit ${__c})\n", cmdBody, exitCodePrefix, exitCodeSuffix) if _, err := fmt.Fprint(s.stdin, cmdText); err != nil { if errors.Is(err, io.ErrClosedPipe) || strings.Contains(err.Error(), "broken pipe") { s.terminated.Store(true) diff --git a/components/execd/pkg/runtime/bash_session_test.go b/components/execd/pkg/runtime/bash_session_test.go index ac66ca0a..6529f950 100644 --- a/components/execd/pkg/runtime/bash_session_test.go +++ b/components/execd/pkg/runtime/bash_session_test.go @@ -18,6 +18,8 @@ package runtime import ( + "fmt" + "os" "strings" "testing" "time" @@ -173,6 +175,104 @@ func TestBashSessionEnvLargeOutputChained(t *testing.T) { } } +func TestBashSession_heredoc(t *testing.T) { + rewardDir := t.TempDir() + controller := NewController("", "") + + hooks := ExecuteResultHook{ + OnExecuteStdout: func(line string) { + fmt.Printf("[stdout] %s\n", line) + }, + OnExecuteComplete: func(d time.Duration) { + fmt.Printf("[complete] %s\n", d) + }, + } + + // First run: heredoc + reward file write. + script := fmt.Sprintf(` +set -x +reward_dir=%q +mkdir -p "$reward_dir" + +cat > /tmp/repro_script.sh <<'SHEOF' +#!/usr/bin/env sh +echo "hello heredoc" +SHEOF + +chmod +x /tmp/repro_script.sh +/tmp/repro_script.sh +echo "after heredoc" +echo 1 > "$reward_dir/reward.txt" +cat "$reward_dir/reward.txt" +`, rewardDir) + + if err := controller.Execute(&ExecuteCodeRequest{ + Language: Bash, + Timeout: 10 * time.Second, + Code: script, + Hooks: hooks, + }); err != nil { + fmt.Fprintf(os.Stderr, "first Execute failed: %v\n", err) + os.Exit(1) + } + + // Second run: ensure the session keeps working. + if err := controller.Execute(&ExecuteCodeRequest{ + Language: Bash, + Timeout: 5 * time.Second, + Code: "echo 'second command works'", + Hooks: hooks, + }); err != nil { + fmt.Fprintf(os.Stderr, "second Execute failed: %v\n", err) + os.Exit(1) + } +} + +func TestBashSession_execReplacesShell(t *testing.T) { + session := newBashSession(nil) + t.Cleanup(func() { _ = session.close() }) + + if err := session.start(); err != nil { + t.Fatalf("Start() error = %v", err) + } + + var stdoutLines []string + hooks := ExecuteResultHook{ + OnExecuteStdout: func(line string) { + stdoutLines = append(stdoutLines, line) + }, + } + + script := ` +cat > /tmp/exec_child.sh <<'EOF' +echo "child says hi" +EOF +chmod +x /tmp/exec_child.sh +exec /tmp/exec_child.sh +` + + err := session.run(script, 5*time.Second, &hooks) + if err == nil { + t.Fatalf("expected error because exec replaces the shell, got nil") + } + if !strings.Contains(err.Error(), "stdout closed") && !strings.Contains(err.Error(), "terminated") { + t.Fatalf("unexpected error for exec: %v", err) + } + if !containsLine(stdoutLines, "child says hi") { + t.Fatalf("expected child output, got %v", stdoutLines) + } + if !session.terminated.Load() { + t.Fatalf("expected session to be marked terminated after exec") + } + + // Subsequent run should fail immediately because the shell was replaced. + if err := session.run("echo still-alive", 2*time.Second, &hooks); err == nil { + t.Fatalf("expected run to fail after exec replaced the shell") + } else if !strings.Contains(err.Error(), "terminated") { + t.Fatalf("expected terminated error, got %v", err) + } +} + func containsLine(lines []string, target string) bool { for _, l := range lines { if strings.TrimSpace(l) == target { From ebb9b4ba52a13721548c4fb537b1d8a40c8ad54f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Fri, 30 Jan 2026 18:08:20 +0800 Subject: [PATCH 12/16] fix(components/execd): fix exec issue --- components/execd/pkg/runtime/bash_session.go | 351 +++++++++++++----- .../execd/pkg/runtime/bash_session_test.go | 65 +++- components/execd/pkg/runtime/types.go | 17 +- 3 files changed, 309 insertions(+), 124 deletions(-) diff --git a/components/execd/pkg/runtime/bash_session.go b/components/execd/pkg/runtime/bash_session.go index b04da7c9..f9de9e8d 100644 --- a/components/execd/pkg/runtime/bash_session.go +++ b/components/execd/pkg/runtime/bash_session.go @@ -25,6 +25,7 @@ import ( "io" "os" "os/exec" + "sort" "strconv" "strings" "time" @@ -34,6 +35,13 @@ import ( "github.com/alibaba/opensandbox/execd/pkg/log" ) +const ( + envDumpStartMarker = "__ENV_DUMP_START__" + envDumpEndMarker = "__ENV_DUMP_END__" + exitMarkerPrefix = "__EXIT_CODE__:" + pwdMarkerPrefix = "__PWD__:" +) + func (c *Controller) createBashSession(_ *CreateContextRequest) (string, error) { session := newBashSession(nil) if err := session.start(); err != nil { @@ -121,10 +129,17 @@ func newBashSession(config *bashSessionConfig) *bashSession { StartupTimeout: 5 * time.Second, } } + + env := make(map[string]string) + for _, kv := range os.Environ() { + if k, v, ok := splitEnvPair(kv); ok { + env[k] = v + } + } + return &bashSession{ - config: config, - stdoutLines: make(chan string, 1024), - stdoutErr: make(chan error, 1), + config: config, + env: env, } } @@ -136,136 +151,277 @@ func (s *bashSession) start() error { return errors.New("session already started") } - cmd := exec.Command("bash", "--noprofile", "--norc", "-s") - cmd.Env = os.Environ() + s.started = true + return nil +} - stdin, err := cmd.StdinPipe() - if err != nil { - return fmt.Errorf("stdin pipe: %w", err) +//nolint:gocognit +func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteResultHook) error { + s.mu.Lock() + if !s.started { + s.mu.Unlock() + return errors.New("session not started") + } + + envSnapshot := copyEnvMap(s.env) + cwd := s.cwd + sessionID := s.config.Session + s.mu.Unlock() + + startAt := time.Now() + if hooks != nil && hooks.OnExecuteInit != nil { + hooks.OnExecuteInit(sessionID) } + + wait := timeout + if wait <= 0 { + wait = 24 * 3600 * time.Second // default to 24 hours + } + + ctx, cancel := context.WithTimeout(context.Background(), wait) + defer cancel() + + cmd := exec.CommandContext(ctx, "bash", "--noprofile", "--norc", "-s") + cmd.Env = envMapToSlice(envSnapshot) + stdout, err := cmd.StdoutPipe() if err != nil { return fmt.Errorf("stdout pipe: %w", err) } - - // merge stderr into stdout cmd.Stderr = cmd.Stdout + stdin, err := cmd.StdinPipe() + if err != nil { + return fmt.Errorf("stdin pipe: %w", err) + } + if err := cmd.Start(); err != nil { return fmt.Errorf("start bash: %w", err) } - s.cmd = cmd - s.stdin = stdin - s.stdout = stdout - s.started = true + script := buildWrappedScript(command, envSnapshot, cwd) + if _, err := io.WriteString(stdin, script); err != nil { + _ = stdin.Close() + _ = cmd.Wait() + return fmt.Errorf("write command: %w", err) + } + _ = stdin.Close() + + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024) + + var ( + envLines []string + pwdLine string + exitCode *int + inEnv bool + ) + + for scanner.Scan() { + line := scanner.Text() + switch { + case line == envDumpStartMarker: + inEnv = true + case line == envDumpEndMarker: + inEnv = false + case strings.HasPrefix(line, exitMarkerPrefix): + if code, err := strconv.Atoi(strings.TrimPrefix(line, exitMarkerPrefix)); err == nil { + exitCode = &code + } + case strings.HasPrefix(line, pwdMarkerPrefix): + pwdLine = strings.TrimPrefix(line, pwdMarkerPrefix) + default: + if inEnv { + envLines = append(envLines, line) + continue + } + if hooks != nil && hooks.OnExecuteStdout != nil { + hooks.OnExecuteStdout(line) + } + } + } + + scanErr := scanner.Err() + waitErr := cmd.Wait() + + if scanErr != nil { + return fmt.Errorf("read stdout: %w", scanErr) + } + + if ctx.Err() == context.DeadlineExceeded { + return fmt.Errorf("timeout after %s while running command %q", wait, command) + } + + if exitCode == nil && cmd.ProcessState != nil { + code := cmd.ProcessState.ExitCode() + exitCode = &code + } + + updatedEnv := parseExportDump(envLines) + s.mu.Lock() + if len(updatedEnv) > 0 { + s.env = updatedEnv + } + if pwdLine != "" { + s.cwd = pwdLine + } + s.mu.Unlock() + + if hooks != nil && hooks.OnExecuteComplete != nil { + hooks.OnExecuteComplete(time.Since(startAt)) + } + + // Maintain previous behavior: non-zero exit codes do not surface as errors. + var exitErr *exec.ExitError + if waitErr != nil && !errors.As(waitErr, &exitErr) { + return waitErr + } - // drain stdout/stderr into channel - go s.readStdout(stdout) return nil } -func (s *bashSession) readStdout(r io.Reader) { - reader := bufio.NewReader(r) - for { - line, err := reader.ReadString('\n') - if len(line) > 0 { - s.stdoutLines <- strings.TrimRight(line, "\r\n") - } - if err != nil { - // mark session terminated so subsequent commands can reject early - s.terminated.Store(true) - if !errors.Is(err, io.EOF) { - s.stdoutErr <- err - } - close(s.stdoutLines) - return +func buildWrappedScript(command string, env map[string]string, cwd string) string { + var b strings.Builder + + keys := make([]string, 0, len(env)) + for k := range env { + if isValidEnvKey(k) { + keys = append(keys, k) } } -} - -//nolint:gocognit -func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteResultHook) error { - s.mu.Lock() - defer s.mu.Unlock() + sort.Strings(keys) + for _, k := range keys { + b.WriteString("export ") + b.WriteString(k) + b.WriteString("=") + b.WriteString(shellEscape(env[k])) + b.WriteString("\n") + } - if s.terminated.Load() { - return errors.New("bash session is terminated (probably by exit); please create a new session") + if cwd != "" { + b.WriteString("cd ") + b.WriteString(shellEscape(cwd)) + b.WriteString("\n") } - if !s.started { - return errors.New("session not started") + + b.WriteString(command) + if !strings.HasSuffix(command, "\n") { + b.WriteString("\n") } - startAt := time.Now() + b.WriteString("__USER_EXIT_CODE__=$?\n") + b.WriteString("echo \"" + envDumpStartMarker + "\"\n") + b.WriteString("export -p\n") + b.WriteString("echo \"" + envDumpEndMarker + "\"\n") + b.WriteString("printf \"" + pwdMarkerPrefix + "%s\\n\" \"$(pwd)\"\n") + b.WriteString("printf \"" + exitMarkerPrefix + "%s\\n\" \"$__USER_EXIT_CODE__\"\n") + b.WriteString("exit \"$__USER_EXIT_CODE__\"\n") - if hooks != nil && hooks.OnExecuteInit != nil { - hooks.OnExecuteInit(s.config.Session) + return b.String() +} + +func parseExportDump(lines []string) map[string]string { + if len(lines) == 0 { + return nil } - wait := timeout - if wait <= 0 { - wait = 24 * 3600 * time.Second // default to 24 hours + env := make(map[string]string, len(lines)) + for _, line := range lines { + if k, v, ok := parseExportLine(line); ok { + env[k] = v + } } + return env +} - cmdBody := command - if !strings.HasSuffix(cmdBody, "\n") { - cmdBody += "\n" +func parseExportLine(line string) (string, string, bool) { + const prefix = "declare -x " + if !strings.HasPrefix(line, prefix) { + return "", "", false } - // send command + marker, preserving the user's last exit code - // use a subshell at the end to restore $? to the original exit code - cmdText := fmt.Sprintf("%s__c=$?\nprintf \"%s${__c}%s\\n\"\n(exit ${__c})\n", cmdBody, exitCodePrefix, exitCodeSuffix) - if _, err := fmt.Fprint(s.stdin, cmdText); err != nil { - if errors.Is(err, io.ErrClosedPipe) || strings.Contains(err.Error(), "broken pipe") { - s.terminated.Store(true) - return errors.New("bash session is terminated (probably by exit); please create a new session") + rest := strings.TrimSpace(strings.TrimPrefix(line, prefix)) + if rest == "" { + return "", "", false + } + + name := rest + value := "" + if eq := strings.Index(rest, "="); eq >= 0 { + name = rest[:eq] + raw := rest[eq+1:] + unquoted, err := strconv.Unquote(raw) + if err != nil { + value = strings.Trim(raw, `"`) + } else { + value = unquoted } - return fmt.Errorf("write command: %w", err) } - // collect output until marker - timer := time.NewTimer(wait) - defer timer.Stop() + if !isValidEnvKey(name) { + return "", "", false + } - for { - select { - case <-timer.C: - return fmt.Errorf("timeout after %s while running command %q", wait, command) - case err := <-s.stdoutErr: - if err != nil { - s.terminated.Store(true) - return err - } - case line, ok := <-s.stdoutLines: - if !ok { - s.terminated.Store(true) - return errors.New("bash session stdout closed (probably by exit); please create a new session") - } - if _, ok := parseExitCodeLine(line); ok { - if hooks != nil && hooks.OnExecuteComplete != nil { - hooks.OnExecuteComplete(time.Since(startAt)) - } - return nil - } - if hooks != nil && hooks.OnExecuteStdout != nil { - hooks.OnExecuteStdout(line) + return name, value, true +} + +func shellEscape(value string) string { + return "'" + strings.ReplaceAll(value, "'", `'"'"'`) + "'" +} + +func isValidEnvKey(key string) bool { + if key == "" { + return false + } + + for i, r := range key { + if i == 0 { + if (r < 'A' || (r > 'Z' && r < 'a') || r > 'z') && r != '_' { + return false } + continue + } + if (r < 'A' || (r > 'Z' && r < 'a') || r > 'z') && (r < '0' || r > '9') && r != '_' { + return false } } + + return true } -func parseExitCodeLine(line string) (int, bool) { - p := strings.Index(line, exitCodePrefix) - q := strings.Index(line, exitCodeSuffix) - if p < 0 || q <= p { - return 0, false +func copyEnvMap(src map[string]string) map[string]string { + if src == nil { + return map[string]string{} } - text := strings.TrimSpace(line[p+len(exitCodePrefix) : q]) - code, err := strconv.Atoi(text) - if err != nil { - return 0, false + + dst := make(map[string]string, len(src)) + for k, v := range src { + dst[k] = v + } + return dst +} + +func envMapToSlice(env map[string]string) []string { + if len(env) == 0 { + return os.Environ() + } + + out := make([]string, 0, len(env)) + for k, v := range env { + out = append(out, fmt.Sprintf("%s=%s", k, v)) } - return code, true + return out +} + +func splitEnvPair(kv string) (string, string, bool) { + parts := strings.SplitN(kv, "=", 2) + if len(parts) != 2 { + return "", "", false + } + if !isValidEnvKey(parts[0]) { + return "", "", false + } + return parts[0], parts[1], true } func (s *bashSession) close() error { @@ -276,13 +432,8 @@ func (s *bashSession) close() error { return nil } s.started = false - - if s.stdin != nil { - _ = s.stdin.Close() - } - if s.cmd != nil && s.cmd.Process != nil { - _ = s.cmd.Process.Kill() - } + s.env = nil + s.cwd = "" return nil } diff --git a/components/execd/pkg/runtime/bash_session_test.go b/components/execd/pkg/runtime/bash_session_test.go index 6529f950..91265ea6 100644 --- a/components/execd/pkg/runtime/bash_session_test.go +++ b/components/execd/pkg/runtime/bash_session_test.go @@ -252,24 +252,65 @@ exec /tmp/exec_child.sh ` err := session.run(script, 5*time.Second, &hooks) - if err == nil { - t.Fatalf("expected error because exec replaces the shell, got nil") - } - if !strings.Contains(err.Error(), "stdout closed") && !strings.Contains(err.Error(), "terminated") { - t.Fatalf("unexpected error for exec: %v", err) + if err != nil { + t.Fatalf("expected exec to complete without killing the session, got %v", err) } if !containsLine(stdoutLines, "child says hi") { t.Fatalf("expected child output, got %v", stdoutLines) } - if !session.terminated.Load() { - t.Fatalf("expected session to be marked terminated after exec") + + // Subsequent run should still work because we restart bash per run. + stdoutLines = nil + if err := session.run("echo still-alive", 2*time.Second, &hooks); err != nil { + t.Fatalf("expected run to succeed after exec replaced the shell, got %v", err) + } + if !containsLine(stdoutLines, "still-alive") { + t.Fatalf("expected follow-up output, got %v", stdoutLines) + } +} + +func TestBashSession_complexExec(t *testing.T) { + session := newBashSession(nil) + t.Cleanup(func() { _ = session.close() }) + + if err := session.start(); err != nil { + t.Fatalf("Start() error = %v", err) + } + + var stdoutLines []string + hooks := ExecuteResultHook{ + OnExecuteStdout: func(line string) { + stdoutLines = append(stdoutLines, line) + }, + } + + script := ` +LOG_FILE=$(mktemp) +export LOG_FILE +exec 3>&1 4>&2 +exec > >(tee "$LOG_FILE") 2>&1 + +set -x +echo "from-complex-exec" +exec 1>&3 2>&4 # step record +echo "after-restore" +` + + err := session.run(script, 5*time.Second, &hooks) + if err != nil { + t.Fatalf("expected complex exec to finish, got %v", err) + } + if !containsLine(stdoutLines, "from-complex-exec") || !containsLine(stdoutLines, "after-restore") { + t.Fatalf("expected exec outputs, got %v", stdoutLines) } - // Subsequent run should fail immediately because the shell was replaced. - if err := session.run("echo still-alive", 2*time.Second, &hooks); err == nil { - t.Fatalf("expected run to fail after exec replaced the shell") - } else if !strings.Contains(err.Error(), "terminated") { - t.Fatalf("expected terminated error, got %v", err) + // Session should still be usable. + stdoutLines = nil + if err := session.run("echo still-alive", 2*time.Second, &hooks); err != nil { + t.Fatalf("expected run to succeed after complex exec, got %v", err) + } + if !containsLine(stdoutLines, "still-alive") { + t.Fatalf("expected follow-up output, got %v", stdoutLines) } } diff --git a/components/execd/pkg/runtime/types.go b/components/execd/pkg/runtime/types.go index ad6f620e..cc138fe6 100644 --- a/components/execd/pkg/runtime/types.go +++ b/components/execd/pkg/runtime/types.go @@ -16,10 +16,7 @@ package runtime import ( "fmt" - "io" - "os/exec" "sync" - "sync/atomic" "time" "github.com/alibaba/opensandbox/execd/pkg/jupyter/execute" @@ -104,13 +101,9 @@ const ( // bashSession represents a bash session. type bashSession struct { - config *bashSessionConfig - cmd *exec.Cmd - stdin io.WriteCloser - stdout io.ReadCloser - stdoutLines chan string - stdoutErr chan error - mu sync.Mutex - started bool - terminated atomic.Bool + config *bashSessionConfig + mu sync.Mutex + started bool + env map[string]string + cwd string } From 1e7f2fa31da54a0747e4188ebf5e8576d8b01035 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Thu, 5 Feb 2026 10:20:31 +0800 Subject: [PATCH 13/16] feat(components/execd): override session's cwd if request.cwd is not empty --- components/execd/pkg/runtime/bash_session.go | 31 ++-- .../execd/pkg/runtime/bash_session_test.go | 161 +++++++++++++++++- 2 files changed, 171 insertions(+), 21 deletions(-) diff --git a/components/execd/pkg/runtime/bash_session.go b/components/execd/pkg/runtime/bash_session.go index f9de9e8d..b76094e4 100644 --- a/components/execd/pkg/runtime/bash_session.go +++ b/components/execd/pkg/runtime/bash_session.go @@ -72,7 +72,7 @@ func (c *Controller) runBashSession(_ context.Context, request *ExecuteCodeReque return ErrContextNotFound } - return session.run(request.Code, request.Timeout, &request.Hooks) + return session.run(request) } func (c *Controller) createDefaultBashSession() error { @@ -156,7 +156,7 @@ func (s *bashSession) start() error { } //nolint:gocognit -func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteResultHook) error { +func (s *bashSession) run(request *ExecuteCodeRequest) error { s.mu.Lock() if !s.started { s.mu.Unlock() @@ -164,18 +164,23 @@ func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteR } envSnapshot := copyEnvMap(s.env) + cwd := s.cwd + // override original cwd if specified + if request.Cwd != "" { + cwd = request.Cwd + } sessionID := s.config.Session s.mu.Unlock() startAt := time.Now() - if hooks != nil && hooks.OnExecuteInit != nil { - hooks.OnExecuteInit(sessionID) + if request.Hooks.OnExecuteInit != nil { + request.Hooks.OnExecuteInit(sessionID) } - wait := timeout + wait := request.Timeout if wait <= 0 { - wait = 24 * 3600 * time.Second // default to 24 hours + wait = 24 * 3600 * time.Second // max to 24 hours } ctx, cancel := context.WithTimeout(context.Background(), wait) @@ -199,7 +204,7 @@ func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteR return fmt.Errorf("start bash: %w", err) } - script := buildWrappedScript(command, envSnapshot, cwd) + script := buildWrappedScript(request.Code, envSnapshot, cwd) if _, err := io.WriteString(stdin, script); err != nil { _ = stdin.Close() _ = cmd.Wait() @@ -235,8 +240,8 @@ func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteR envLines = append(envLines, line) continue } - if hooks != nil && hooks.OnExecuteStdout != nil { - hooks.OnExecuteStdout(line) + if request.Hooks.OnExecuteStdout != nil { + request.Hooks.OnExecuteStdout(line) } } } @@ -248,8 +253,8 @@ func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteR return fmt.Errorf("read stdout: %w", scanErr) } - if ctx.Err() == context.DeadlineExceeded { - return fmt.Errorf("timeout after %s while running command %q", wait, command) + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + return fmt.Errorf("timeout after %s while running command %q", wait, request.Code) } if exitCode == nil && cmd.ProcessState != nil { @@ -267,8 +272,8 @@ func (s *bashSession) run(command string, timeout time.Duration, hooks *ExecuteR } s.mu.Unlock() - if hooks != nil && hooks.OnExecuteComplete != nil { - hooks.OnExecuteComplete(time.Since(startAt)) + if request.Hooks.OnExecuteComplete != nil { + request.Hooks.OnExecuteComplete(time.Since(startAt)) } // Maintain previous behavior: non-zero exit codes do not surface as errors. diff --git a/components/execd/pkg/runtime/bash_session_test.go b/components/execd/pkg/runtime/bash_session_test.go index 91265ea6..762ce82b 100644 --- a/components/execd/pkg/runtime/bash_session_test.go +++ b/components/execd/pkg/runtime/bash_session_test.go @@ -56,13 +56,23 @@ func TestBashSessionEnvAndExitCode(t *testing.T) { } // 1) export an env var - if err := session.run("export FOO=hello", 3*time.Second, &hooks); err != nil { + request := &ExecuteCodeRequest{ + Code: "export FOO=hello", + Hooks: hooks, + Timeout: 3 * time.Second, + } + if err := session.run(request); err != nil { t.Fatalf("runCommand(export) error = %v", err) } exportStdoutCount := len(stdoutLines) // 2) verify env is persisted - if err := session.run("echo $FOO", 3*time.Second, &hooks); err != nil { + request = &ExecuteCodeRequest{ + Code: "echo $FOO", + Hooks: hooks, + Timeout: 3 * time.Second, + } + if err := session.run(request); err != nil { t.Fatalf("runCommand(echo) error = %v", err) } echoLines := stdoutLines[exportStdoutCount:] @@ -78,8 +88,13 @@ func TestBashSessionEnvAndExitCode(t *testing.T) { } // 3) ensure exit code of previous command is reflected in shell state + request = &ExecuteCodeRequest{ + Code: "false; echo EXIT:$?", + Hooks: hooks, + Timeout: 3 * time.Second, + } prevCount := len(stdoutLines) - if err := session.run("false; echo EXIT:$?", 3*time.Second, &hooks); err != nil { + if err := session.run(request); err != nil { t.Fatalf("runCommand(exitcode) error = %v", err) } exitLines := stdoutLines[prevCount:] @@ -134,7 +149,12 @@ func TestBashSessionEnvLargeOutputChained(t *testing.T) { runAndCollect := func(cmd string) []string { start := len(stdoutLines) - if err := session.run(cmd, 10*time.Second, &hooks); err != nil { + request := &ExecuteCodeRequest{ + Code: cmd, + Hooks: hooks, + Timeout: 10 * time.Second, + } + if err := session.run(request); err != nil { t.Fatalf("runCommand(%q) error = %v", cmd, err) } return append([]string(nil), stdoutLines[start:]...) @@ -175,6 +195,111 @@ func TestBashSessionEnvLargeOutputChained(t *testing.T) { } } +func TestBashSessionCwdPersistsWithoutOverride(t *testing.T) { + session := newBashSession(nil) + t.Cleanup(func() { _ = session.close() }) + + if err := session.start(); err != nil { + t.Fatalf("Start() error = %v", err) + } + + targetDir := t.TempDir() + var stdoutLines []string + hooks := ExecuteResultHook{ + OnExecuteStdout: func(line string) { + stdoutLines = append(stdoutLines, line) + }, + } + + runAndCollect := func(req *ExecuteCodeRequest) []string { + start := len(stdoutLines) + if err := session.run(req); err != nil { + t.Fatalf("runCommand(%q) error = %v", req.Code, err) + } + return append([]string(nil), stdoutLines[start:]...) + } + + firstRunLines := runAndCollect(&ExecuteCodeRequest{ + Code: fmt.Sprintf("cd %s\npwd", targetDir), + Hooks: hooks, + Timeout: 3 * time.Second, + }) + if !containsLine(firstRunLines, targetDir) { + t.Fatalf("expected cd to update cwd to %q, got %v", targetDir, firstRunLines) + } + + secondRunLines := runAndCollect(&ExecuteCodeRequest{ + Code: "pwd", + Hooks: hooks, + Timeout: 3 * time.Second, + }) + if !containsLine(secondRunLines, targetDir) { + t.Fatalf("expected subsequent run to inherit cwd %q, got %v", targetDir, secondRunLines) + } + + session.mu.Lock() + finalCwd := session.cwd + session.mu.Unlock() + if finalCwd != targetDir { + t.Fatalf("expected session cwd to stay at %q, got %q", targetDir, finalCwd) + } +} + +func TestBashSessionRequestCwdOverridesAfterCd(t *testing.T) { + session := newBashSession(nil) + t.Cleanup(func() { _ = session.close() }) + + if err := session.start(); err != nil { + t.Fatalf("Start() error = %v", err) + } + + initialDir := t.TempDir() + overrideDir := t.TempDir() + + var stdoutLines []string + hooks := ExecuteResultHook{ + OnExecuteStdout: func(line string) { + stdoutLines = append(stdoutLines, line) + }, + } + + runAndCollect := func(req *ExecuteCodeRequest) []string { + start := len(stdoutLines) + if err := session.run(req); err != nil { + t.Fatalf("runCommand(%q) error = %v", req.Code, err) + } + return append([]string(nil), stdoutLines[start:]...) + } + + // First request: change session cwd via script. + firstRunLines := runAndCollect(&ExecuteCodeRequest{ + Code: fmt.Sprintf("cd %s\npwd", initialDir), + Hooks: hooks, + Timeout: 3 * time.Second, + }) + if !containsLine(firstRunLines, initialDir) { + t.Fatalf("expected cd to update cwd to %q, got %v", initialDir, firstRunLines) + } + + // Second request: explicit Cwd overrides session cwd. + secondRunLines := runAndCollect(&ExecuteCodeRequest{ + Code: "pwd", + Cwd: overrideDir, + Hooks: hooks, + Timeout: 3 * time.Second, + }) + if !containsLine(secondRunLines, overrideDir) { + t.Fatalf("expected command to run in override cwd %q, got %v", overrideDir, secondRunLines) + } + + session.mu.Lock() + finalCwd := session.cwd + session.mu.Unlock() + if finalCwd != overrideDir { + t.Fatalf("expected session cwd updated to override dir %q, got %q", overrideDir, finalCwd) + } +} + func TestBashSession_heredoc(t *testing.T) { rewardDir := t.TempDir() controller := NewController("", "") @@ -251,7 +376,12 @@ chmod +x /tmp/exec_child.sh exec /tmp/exec_child.sh ` - err := session.run(script, 5*time.Second, &hooks) + request := &ExecuteCodeRequest{ + Code: script, + Hooks: hooks, + Timeout: 5 * time.Second, + } + err := session.run(request) if err != nil { t.Fatalf("expected exec to complete without killing the session, got %v", err) } @@ -260,8 +390,13 @@ exec /tmp/exec_child.sh } // Subsequent run should still work because we restart bash per run. + request = &ExecuteCodeRequest{ + Code: "echo still-alive", + Hooks: hooks, + Timeout: 2 * time.Second, + } stdoutLines = nil - if err := session.run("echo still-alive", 2*time.Second, &hooks); err != nil { + if err := session.run(request); err != nil { t.Fatalf("expected run to succeed after exec replaced the shell, got %v", err) } if !containsLine(stdoutLines, "still-alive") { @@ -296,7 +431,12 @@ exec 1>&3 2>&4 # step record echo "after-restore" ` - err := session.run(script, 5*time.Second, &hooks) + request := &ExecuteCodeRequest{ + Code: script, + Hooks: hooks, + Timeout: 5 * time.Second, + } + err := session.run(request) if err != nil { t.Fatalf("expected complex exec to finish, got %v", err) } @@ -305,8 +445,13 @@ echo "after-restore" } // Session should still be usable. + request = &ExecuteCodeRequest{ + Code: "echo still-alive", + Hooks: hooks, + Timeout: 2 * time.Second, + } stdoutLines = nil - if err := session.run("echo still-alive", 2*time.Second, &hooks); err != nil { + if err := session.run(request); err != nil { t.Fatalf("expected run to succeed after complex exec, got %v", err) } if !containsLine(stdoutLines, "still-alive") { From 575ca47e1b2ddb4d2acf900af472ef273ffbd365 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Thu, 5 Feb 2026 12:34:26 +0800 Subject: [PATCH 14/16] fix(components/execd): avoid env dump leak when command lacks trailing newline --- components/execd/pkg/runtime/bash_session.go | 11 +-- .../execd/pkg/runtime/bash_session_test.go | 84 ++++++++++++++++++- components/execd/pkg/runtime/types.go | 7 -- 3 files changed, 86 insertions(+), 16 deletions(-) diff --git a/components/execd/pkg/runtime/bash_session.go b/components/execd/pkg/runtime/bash_session.go index b76094e4..35631b26 100644 --- a/components/execd/pkg/runtime/bash_session.go +++ b/components/execd/pkg/runtime/bash_session.go @@ -231,7 +231,7 @@ func (s *bashSession) run(request *ExecuteCodeRequest) error { inEnv = false case strings.HasPrefix(line, exitMarkerPrefix): if code, err := strconv.Atoi(strings.TrimPrefix(line, exitMarkerPrefix)); err == nil { - exitCode = &code + exitCode = &code //nolint:ineffassign } case strings.HasPrefix(line, pwdMarkerPrefix): pwdLine = strings.TrimPrefix(line, pwdMarkerPrefix) @@ -258,8 +258,8 @@ func (s *bashSession) run(request *ExecuteCodeRequest) error { } if exitCode == nil && cmd.ProcessState != nil { - code := cmd.ProcessState.ExitCode() - exitCode = &code + code := cmd.ProcessState.ExitCode() //nolint:staticcheck + exitCode = &code //nolint:ineffassign } updatedEnv := parseExportDump(envLines) @@ -315,9 +315,10 @@ func buildWrappedScript(command string, env map[string]string, cwd string) strin } b.WriteString("__USER_EXIT_CODE__=$?\n") - b.WriteString("echo \"" + envDumpStartMarker + "\"\n") + // Ensure env dump markers are always on their own lines even if the user command omitted a trailing newline. + b.WriteString("printf \"\\n%s\\n\" \"" + envDumpStartMarker + "\"\n") b.WriteString("export -p\n") - b.WriteString("echo \"" + envDumpEndMarker + "\"\n") + b.WriteString("printf \"%s\\n\" \"" + envDumpEndMarker + "\"\n") b.WriteString("printf \"" + pwdMarkerPrefix + "%s\\n\" \"$(pwd)\"\n") b.WriteString("printf \"" + exitMarkerPrefix + "%s\\n\" \"$__USER_EXIT_CODE__\"\n") b.WriteString("exit \"$__USER_EXIT_CODE__\"\n") diff --git a/components/execd/pkg/runtime/bash_session_test.go b/components/execd/pkg/runtime/bash_session_test.go index 762ce82b..13aed889 100644 --- a/components/execd/pkg/runtime/bash_session_test.go +++ b/components/execd/pkg/runtime/bash_session_test.go @@ -25,7 +25,7 @@ import ( "time" ) -func TestBashSessionEnvAndExitCode(t *testing.T) { +func TestBashSession_envAndExitCode(t *testing.T) { session := newBashSession(nil) t.Cleanup(func() { _ = session.close() }) @@ -117,7 +117,7 @@ func TestBashSessionEnvAndExitCode(t *testing.T) { } } -func TestBashSessionEnvLargeOutputChained(t *testing.T) { +func TestBashSession_envLargeOutputChained(t *testing.T) { session := newBashSession(nil) t.Cleanup(func() { _ = session.close() }) @@ -195,7 +195,7 @@ func TestBashSessionEnvLargeOutputChained(t *testing.T) { } } -func TestBashSessionCwdPersistsWithoutOverride(t *testing.T) { +func TestBashSession_cwdPersistsWithoutOverride(t *testing.T) { session := newBashSession(nil) t.Cleanup(func() { _ = session.close() }) @@ -245,7 +245,7 @@ func TestBashSessionCwdPersistsWithoutOverride(t *testing.T) { } } -func TestBashSessionRequestCwdOverridesAfterCd(t *testing.T) { +func TestBashSession_requestCwdOverridesAfterCd(t *testing.T) { session := newBashSession(nil) t.Cleanup(func() { _ = session.close() }) @@ -300,6 +300,82 @@ func TestBashSessionRequestCwdOverridesAfterCd(t *testing.T) { } } +func TestBashSession_envDumpNotLeakedWhenNoTrailingNewline(t *testing.T) { + session := newBashSession(nil) + t.Cleanup(func() { _ = session.close() }) + + if err := session.start(); err != nil { + t.Fatalf("Start() error = %v", err) + } + + var stdoutLines []string + hooks := ExecuteResultHook{ + OnExecuteStdout: func(line string) { + stdoutLines = append(stdoutLines, line) + }, + } + + request := &ExecuteCodeRequest{ + Code: `set +x; printf '{"foo":1}'`, + Hooks: hooks, + Timeout: 3 * time.Second, + } + + if err := session.run(request); err != nil { + t.Fatalf("runCommand(no-trailing-newline) error = %v", err) + } + + if len(stdoutLines) != 1 { + t.Fatalf("expected exactly one stdout line, got %v", stdoutLines) + } + if strings.TrimSpace(stdoutLines[0]) != `{"foo":1}` { + t.Fatalf("unexpected stdout content %q", stdoutLines[0]) + } + for _, line := range stdoutLines { + if strings.Contains(line, envDumpStartMarker) || strings.Contains(line, "declare -x") { + t.Fatalf("env dump leaked into stdout: %v", stdoutLines) + } + } +} + +func TestBashSession_envDumpNotLeakedWhenNoOutput(t *testing.T) { + session := newBashSession(nil) + t.Cleanup(func() { _ = session.close() }) + + if err := session.start(); err != nil { + t.Fatalf("Start() error = %v", err) + } + + var stdoutLines []string + hooks := ExecuteResultHook{ + OnExecuteStdout: func(line string) { + stdoutLines = append(stdoutLines, line) + }, + } + + request := &ExecuteCodeRequest{ + Code: `set +x; true`, + Hooks: hooks, + Timeout: 3 * time.Second, + } + + if err := session.run(request); err != nil { + t.Fatalf("runCommand(no-output) error = %v", err) + } + + if len(stdoutLines) > 1 { + t.Fatalf("expected at most one stdout line, got %v", stdoutLines) + } + if len(stdoutLines) == 1 && strings.TrimSpace(stdoutLines[0]) != "" { + t.Fatalf("expected empty stdout, got %q", stdoutLines[0]) + } + for _, line := range stdoutLines { + if strings.Contains(line, envDumpStartMarker) || strings.Contains(line, "declare -x") { + t.Fatalf("env dump leaked into stdout: %v", stdoutLines) + } + } +} + func TestBashSession_heredoc(t *testing.T) { rewardDir := t.TempDir() controller := NewController("", "") diff --git a/components/execd/pkg/runtime/types.go b/components/execd/pkg/runtime/types.go index cc138fe6..5dcf44c8 100644 --- a/components/execd/pkg/runtime/types.go +++ b/components/execd/pkg/runtime/types.go @@ -92,13 +92,6 @@ type bashSessionConfig struct { StartupTimeout time.Duration } -const ( - // exitCodePrefix marks the beginning of exit code output. - exitCodePrefix = "EXITCODESTART" - // exitCodeSuffix marks the end of exit code output. - exitCodeSuffix = "EXITCODEEND" -) - // bashSession represents a bash session. type bashSession struct { config *bashSessionConfig From 3532fc9253bb4ef439ba9ab83327ac4697fca2d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Wed, 25 Feb 2026 17:54:00 +0800 Subject: [PATCH 15/16] chore(execd): emit bash session exit errors --- components/execd/pkg/runtime/bash_session.go | 34 ++++++++-- .../execd/pkg/runtime/bash_session_test.go | 68 +++++++++++++++++++ 2 files changed, 95 insertions(+), 7 deletions(-) diff --git a/components/execd/pkg/runtime/bash_session.go b/components/execd/pkg/runtime/bash_session.go index 35631b26..ae4746a9 100644 --- a/components/execd/pkg/runtime/bash_session.go +++ b/components/execd/pkg/runtime/bash_session.go @@ -30,9 +30,9 @@ import ( "strings" "time" - "github.com/google/uuid" - + "github.com/alibaba/opensandbox/execd/pkg/jupyter/execute" "github.com/alibaba/opensandbox/execd/pkg/log" + "github.com/google/uuid" ) const ( @@ -272,16 +272,36 @@ func (s *bashSession) run(request *ExecuteCodeRequest) error { } s.mu.Unlock() - if request.Hooks.OnExecuteComplete != nil { - request.Hooks.OnExecuteComplete(time.Since(startAt)) - } - - // Maintain previous behavior: non-zero exit codes do not surface as errors. var exitErr *exec.ExitError if waitErr != nil && !errors.As(waitErr, &exitErr) { return waitErr } + userExitCode := 0 + if exitCode != nil { + userExitCode = *exitCode + } + + if userExitCode != 0 { + errMsg := fmt.Sprintf("command exited with code %d", userExitCode) + if waitErr != nil { + errMsg = waitErr.Error() + } + if request.Hooks.OnExecuteError != nil { + request.Hooks.OnExecuteError(&execute.ErrorOutput{ + EName: "CommandExecError", + EValue: strconv.Itoa(userExitCode), + Traceback: []string{errMsg}, + }) + } + log.Error("CommandExecError: %s", errMsg) + return nil + } + + if request.Hooks.OnExecuteComplete != nil { + request.Hooks.OnExecuteComplete(time.Since(startAt)) + } + return nil } diff --git a/components/execd/pkg/runtime/bash_session_test.go b/components/execd/pkg/runtime/bash_session_test.go index 13aed889..2a6e7482 100644 --- a/components/execd/pkg/runtime/bash_session_test.go +++ b/components/execd/pkg/runtime/bash_session_test.go @@ -18,13 +18,81 @@ package runtime import ( + "context" "fmt" "os" + "os/exec" "strings" "testing" "time" + + "github.com/alibaba/opensandbox/execd/pkg/jupyter/execute" ) +func TestBashSession_NonZeroExitEmitsError(t *testing.T) { + if _, err := exec.LookPath("bash"); err != nil { + t.Skip("bash not found in PATH") + } + + c := NewController("", "") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var ( + sessionID string + stdoutLine string + errCh = make(chan *execute.ErrorOutput, 1) + completeCh = make(chan struct{}, 1) + ) + + req := &ExecuteCodeRequest{ + Language: Bash, + Code: `echo "before"; exit 7`, + Cwd: t.TempDir(), + Timeout: 5 * time.Second, + Hooks: ExecuteResultHook{ + OnExecuteInit: func(s string) { sessionID = s }, + OnExecuteStdout: func(s string) { stdoutLine = s }, + OnExecuteError: func(err *execute.ErrorOutput) { errCh <- err }, + OnExecuteComplete: func(_ time.Duration) { + completeCh <- struct{}{} + }, + }, + } + + if err := c.runBashSession(ctx, req); err != nil { + t.Fatalf("runBashSession returned error: %v", err) + } + + var gotErr *execute.ErrorOutput + select { + case gotErr = <-errCh: + case <-time.After(2 * time.Second): + t.Fatalf("expected error hook to be called") + } + + if gotErr == nil { + t.Fatalf("expected non-nil error output") + } + if gotErr.EName != "CommandExecError" || gotErr.EValue != "7" { + t.Fatalf("unexpected error payload: %+v", gotErr) + } + + if sessionID == "" { + t.Fatalf("expected session id to be set") + } + if stdoutLine != "before" { + t.Fatalf("unexpected stdout: %q", stdoutLine) + } + + select { + case <-completeCh: + t.Fatalf("did not expect completion hook on non-zero exit") + default: + } +} + func TestBashSession_envAndExitCode(t *testing.T) { session := newBashSession(nil) t.Cleanup(func() { _ = session.close() }) From 7d28368299834d9420087d796313c48d16e781e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Thu, 26 Feb 2026 18:51:48 +0800 Subject: [PATCH 16/16] fix(execd): run bash session from temp script file to avoid argument list too long --- components/execd/pkg/runtime/bash_session.go | 85 ++++++++++---------- 1 file changed, 41 insertions(+), 44 deletions(-) diff --git a/components/execd/pkg/runtime/bash_session.go b/components/execd/pkg/runtime/bash_session.go index ae4746a9..58b8aeb3 100644 --- a/components/execd/pkg/runtime/bash_session.go +++ b/components/execd/pkg/runtime/bash_session.go @@ -22,7 +22,6 @@ import ( "context" "errors" "fmt" - "io" "os" "os/exec" "sort" @@ -186,32 +185,35 @@ func (s *bashSession) run(request *ExecuteCodeRequest) error { ctx, cancel := context.WithTimeout(context.Background(), wait) defer cancel() - cmd := exec.CommandContext(ctx, "bash", "--noprofile", "--norc", "-s") - cmd.Env = envMapToSlice(envSnapshot) + script := buildWrappedScript(request.Code, envSnapshot, cwd) + scriptFile, err := os.CreateTemp("", "execd_bash_*.sh") + if err != nil { + return fmt.Errorf("create script file: %w", err) + } + scriptPath := scriptFile.Name() + if _, err := scriptFile.WriteString(script); err != nil { + _ = scriptFile.Close() + return fmt.Errorf("write script file: %w", err) + } + if err := scriptFile.Close(); err != nil { + return fmt.Errorf("close script file: %w", err) + } + cmd := exec.CommandContext(ctx, "bash", "--noprofile", "--norc", scriptPath) + // Do not pass envSnapshot via cmd.Env to avoid "argument list too long" when session env is large. + // Child inherits parent env (nil => default in Go). The script file already has "export K=V" for + // all session vars at the top, so the session environment is applied when the script runs. stdout, err := cmd.StdoutPipe() if err != nil { return fmt.Errorf("stdout pipe: %w", err) } cmd.Stderr = cmd.Stdout - stdin, err := cmd.StdinPipe() - if err != nil { - return fmt.Errorf("stdin pipe: %w", err) - } - if err := cmd.Start(); err != nil { + log.Error("start bash session failed: %v (command: %q)", err, request.Code) return fmt.Errorf("start bash: %w", err) } - script := buildWrappedScript(request.Code, envSnapshot, cwd) - if _, err := io.WriteString(stdin, script); err != nil { - _ = stdin.Close() - _ = cmd.Wait() - return fmt.Errorf("write command: %w", err) - } - _ = stdin.Close() - scanner := bufio.NewScanner(stdout) scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024) @@ -250,10 +252,12 @@ func (s *bashSession) run(request *ExecuteCodeRequest) error { waitErr := cmd.Wait() if scanErr != nil { + log.Error("read stdout failed: %v (command: %q)", scanErr, request.Code) return fmt.Errorf("read stdout: %w", scanErr) } if errors.Is(ctx.Err(), context.DeadlineExceeded) { + log.Error("timeout after %s while running command: %q", wait, request.Code) return fmt.Errorf("timeout after %s while running command %q", wait, request.Code) } @@ -274,6 +278,7 @@ func (s *bashSession) run(request *ExecuteCodeRequest) error { var exitErr *exec.ExitError if waitErr != nil && !errors.As(waitErr, &exitErr) { + log.Error("command wait failed: %v (command: %q)", waitErr, request.Code) return waitErr } @@ -294,7 +299,7 @@ func (s *bashSession) run(request *ExecuteCodeRequest) error { Traceback: []string{errMsg}, }) } - log.Error("CommandExecError: %s", errMsg) + log.Error("CommandExecError: %s (command: %q)", errMsg, request.Code) return nil } @@ -310,7 +315,8 @@ func buildWrappedScript(command string, env map[string]string, cwd string) strin keys := make([]string, 0, len(env)) for k := range env { - if isValidEnvKey(k) { + v := env[k] + if isValidEnvKey(k) && !envKeysNotPersisted[k] && len(v) <= maxPersistedEnvValueSize { keys = append(keys, k) } } @@ -335,7 +341,6 @@ func buildWrappedScript(command string, env map[string]string, cwd string) strin } b.WriteString("__USER_EXIT_CODE__=$?\n") - // Ensure env dump markers are always on their own lines even if the user command omitted a trailing newline. b.WriteString("printf \"\\n%s\\n\" \"" + envDumpStartMarker + "\"\n") b.WriteString("export -p\n") b.WriteString("printf \"%s\\n\" \"" + envDumpEndMarker + "\"\n") @@ -346,16 +351,26 @@ func buildWrappedScript(command string, env map[string]string, cwd string) strin return b.String() } +// envKeysNotPersisted are not carried across runs (prompt/display vars). +var envKeysNotPersisted = map[string]bool{ + "PS1": true, "PS2": true, "PS3": true, "PS4": true, + "PROMPT_COMMAND": true, +} + +// maxPersistedEnvValueSize caps single env value length as a safeguard. +const maxPersistedEnvValueSize = 8 * 1024 + func parseExportDump(lines []string) map[string]string { if len(lines) == 0 { return nil } - env := make(map[string]string, len(lines)) for _, line := range lines { - if k, v, ok := parseExportLine(line); ok { - env[k] = v + k, v, ok := parseExportLine(line) + if !ok || envKeysNotPersisted[k] || len(v) > maxPersistedEnvValueSize { + continue } + env[k] = v } return env } @@ -365,29 +380,23 @@ func parseExportLine(line string) (string, string, bool) { if !strings.HasPrefix(line, prefix) { return "", "", false } - rest := strings.TrimSpace(strings.TrimPrefix(line, prefix)) if rest == "" { return "", "", false } - - name := rest - value := "" + name, value := rest, "" if eq := strings.Index(rest, "="); eq >= 0 { name = rest[:eq] raw := rest[eq+1:] - unquoted, err := strconv.Unquote(raw) - if err != nil { - value = strings.Trim(raw, `"`) - } else { + if unquoted, err := strconv.Unquote(raw); err == nil { value = unquoted + } else { + value = strings.Trim(raw, `"`) } } - if !isValidEnvKey(name) { return "", "", false } - return name, value, true } @@ -427,18 +436,6 @@ func copyEnvMap(src map[string]string) map[string]string { return dst } -func envMapToSlice(env map[string]string) []string { - if len(env) == 0 { - return os.Environ() - } - - out := make([]string, 0, len(env)) - for k, v := range env { - out = append(out, fmt.Sprintf("%s=%s", k, v)) - } - return out -} - func splitEnvPair(kv string) (string, string, bool) { parts := strings.SplitN(kv, "=", 2) if len(parts) != 2 {