diff --git a/airflow/container.go b/airflow/container.go index 4bb80fa95..f5e28c0b9 100644 --- a/airflow/container.go +++ b/airflow/container.go @@ -78,6 +78,10 @@ func ContainerHandlerInit(airflowHome, envFile, dockerfile, projectName string) return DockerComposeInit(airflowHome, envFile, dockerfile, projectName) } +func StandaloneHandlerInit(airflowHome, envFile, dockerfile, projectName string) (ContainerHandler, error) { + return StandaloneInit(airflowHome, envFile, dockerfile) +} + func RegistryHandlerInit(registry string) (RegistryHandler, error) { return DockerRegistryInit(registry) } diff --git a/airflow/local.go b/airflow/local.go new file mode 100644 index 000000000..74cf17442 --- /dev/null +++ b/airflow/local.go @@ -0,0 +1,825 @@ +package airflow + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "os/signal" + "path/filepath" + "regexp" + "strings" + "sync" + "syscall" + "time" + + airflowversions "github.com/astronomer/astro-cli/airflow_versions" + astrocore "github.com/astronomer/astro-cli/astro-client-core" + astroplatformcore "github.com/astronomer/astro-cli/astro-client-platform-core" + "github.com/astronomer/astro-cli/docker" + "github.com/astronomer/astro-cli/pkg/ansi" + "github.com/astronomer/astro-cli/pkg/fileutil" + "github.com/astronomer/astro-cli/pkg/spinner" + "github.com/astronomer/astro-cli/settings" + "github.com/pkg/errors" +) + +const ( + standaloneDir = ".astro/standalone" + standalonePIDFile = "airflow.pid" + standaloneLogFile = "airflow.log" + defaultStandalonePort = "8080" + standaloneIndexURL = "https://pip.astronomer.io/v2/" + defaultPythonVersion = "3.12" // default Python version for all Runtime 3.x images + constraintsBaseURL = "https://cdn.astronomer.io/runtime-constraints" + freezeBaseURL = "https://cdn.astronomer.io/runtime-freeze" + stopPollInterval = 500 * time.Millisecond + stopTimeout = 10 * time.Second + filePermissions = os.FileMode(0o644) + dirPermissions = os.FileMode(0o755) + standaloneAdminUser = "admin" + standaloneAdminPassword = "admin" + // standalonePasswordsFile lives inside standaloneDir (.astro/standalone/) so it stays + // out of the project root and is cleaned up automatically by Kill/reset. + standalonePasswordsFile = "simple_auth_manager_passwords.json.generated" //nolint:gosec +) + +var ( + errStandaloneNotSupported = errors.New("this command is not supported in standalone mode") + errUnsupportedAirflowVersion = errors.New("standalone mode requires Airflow 3 (runtime 3.x)") + errUVNotFound = errors.New("'uv' is required for standalone mode but was not found on PATH.\nInstall it with: curl -LsSf https://astral.sh/uv/install.sh | sh\nSee https://docs.astral.sh/uv/getting-started/installation/ for more options") + + // Function variables for testing + lookPath = exec.LookPath + standaloneParseFile = docker.ParseFile + standaloneGetImageTag = docker.GetImageTagFromParsedFile + runCommand = execCommand + startCommand = startCmd + osReadFile = os.ReadFile + osFindProcess = os.FindProcess +) + +// runtimePythonRe matches the optional -python-X.Y (and optional -base) suffix on a runtime tag. +var runtimePythonRe = regexp.MustCompile(`-python-(\d+\.\d+)(-base)?$`) + +// parseRuntimeTagPython extracts the base runtime tag and the Python version from a +// full image tag. Tags may look like: +// +// "3.1-12" → base="3.1-12", python="3.12" (default) +// "3.1-12-python-3.11" → base="3.1-12", python="3.11" +// "3.1-12-python-3.11-base" → base="3.1-12", python="3.11" +func parseRuntimeTagPython(tag string) (baseTag, pythonVersion string) { + loc := runtimePythonRe.FindStringSubmatchIndex(tag) + if loc == nil { + return strings.TrimSuffix(tag, "-base"), defaultPythonVersion + } + return tag[:loc[0]], tag[loc[2]:loc[3]] +} + +// Standalone implements ContainerHandler using `airflow standalone` instead of Docker Compose. +type Standalone struct { + airflowHome string + envFile string + dockerfile string + foreground bool // if true, run in foreground (stream output, block on Wait) +} + +// StandaloneInit creates a new Standalone handler. +func StandaloneInit(airflowHome, envFile, dockerfile string) (*Standalone, error) { + return &Standalone{ + airflowHome: airflowHome, + envFile: envFile, + dockerfile: dockerfile, + }, nil +} + +// SetForeground controls whether Start() runs the process in the foreground. +func (s *Standalone) SetForeground(fg bool) { + s.foreground = fg +} + +// pidFilePath returns the full path to the PID file. +func (s *Standalone) pidFilePath() string { + return filepath.Join(s.airflowHome, standaloneDir, standalonePIDFile) +} + +// logFilePath returns the full path to the log file. +func (s *Standalone) logFilePath() string { + return filepath.Join(s.airflowHome, standaloneDir, standaloneLogFile) +} + +// passwordsFilePath returns the full path to the SimpleAuthManager passwords file. +// Keeping it inside standaloneDir means it stays out of the project root and is +// cleaned up automatically by Kill/reset along with other standalone state. +func (s *Standalone) passwordsFilePath() string { + return filepath.Join(s.airflowHome, standaloneDir, standalonePasswordsFile) +} + +// Start runs airflow standalone locally without Docker. +// +//nolint:gocognit,gocyclo +func (s *Standalone) Start(imageName, settingsFile, composeFile, buildSecretString string, noCache, noBrowser bool, waitTime time.Duration, envConns map[string]astrocore.EnvironmentObjectConnection) error { + // 1. Parse Dockerfile to get runtime image + tag + cmds, err := standaloneParseFile(filepath.Join(s.airflowHome, "Dockerfile")) + if err != nil { + return fmt.Errorf("error parsing Dockerfile: %w", err) + } + _, tag := standaloneGetImageTag(cmds) + if tag == "" { + return errors.New("could not determine runtime version from Dockerfile") + } + + // Parse Python version from tag (e.g. "3.1-12-python-3.11" → base="3.1-12", python="3.11") + baseTag, pythonVersion := parseRuntimeTagPython(tag) + + // 2. Validate Airflow version (AF3 only) + if airflowversions.AirflowMajorVersionForRuntimeVersion(baseTag) != "3" { + return errUnsupportedAirflowVersion + } + + // 3. Check uv is on PATH + _, err = lookPath("uv") + if err != nil { + return errUVNotFound + } + + // 3b. In background mode, bail early if already running (before any install work) + if !s.foreground { + if pid, alive := s.readPID(); alive { + return fmt.Errorf("standalone Airflow is already running (PID %d). Run 'astro dev local stop' first", pid) + } + } + + // 4. Fetch constraints and freeze files from CDN (cached locally) + freezePath, airflowVersion, taskSDKVersion, err := s.getConstraints(baseTag, pythonVersion) + if err != nil { + return err + } + + sp := spinner.NewSpinner("Setting up standalone environment…") + sp.Start() + + // 5. Create venv + err = runCommand(s.airflowHome, "uv", "venv", "--python", pythonVersion) + if err != nil { + sp.Stop() + return fmt.Errorf("error creating virtual environment: %w", err) + } + + // 6. Install dependencies (2-step install) + // Step 1: Install airflow with full freeze constraints (reproduces runtime env exactly) + installArgs := []string{ + "pip", "install", + fmt.Sprintf("apache-airflow==%s", airflowVersion), + "-c", freezePath, + "--index-url", standaloneIndexURL, + } + err = runCommand(s.airflowHome, "uv", installArgs...) + if err != nil { + sp.Stop() + return fmt.Errorf("error installing dependencies: %w", err) + } + + // Step 2: Install user requirements with only airflow/sdk version locks + requirementsPath := filepath.Join(s.airflowHome, "requirements.txt") + if exists, _ := fileutil.Exists(requirementsPath, nil); exists { + userInstallArgs := []string{ + "pip", "install", + "-r", requirementsPath, + fmt.Sprintf("apache-airflow==%s", airflowVersion), + } + if taskSDKVersion != "" { + userInstallArgs = append(userInstallArgs, fmt.Sprintf("apache-airflow-task-sdk==%s", taskSDKVersion)) + } + userInstallArgs = append(userInstallArgs, "--index-url", standaloneIndexURL) + err = runCommand(s.airflowHome, "uv", userInstallArgs...) + if err != nil { + sp.Stop() + return fmt.Errorf("error installing user requirements: %w", err) + } + } + + spinner.StopWithCheckmark(sp, "Environment ready") + + // 7. Apply settings + err = s.applySettings(settingsFile, envConns) + if err != nil { + fmt.Printf("Warning: could not apply airflow settings: %s\n", err.Error()) + } + + // 8. Seed credentials file (admin:admin) if this is a fresh environment + if err = s.ensureCredentials(); err != nil { + fmt.Printf("Warning: could not seed credentials file: %s\n", err.Error()) + } + + // 9. Build environment + env := s.buildEnv() + + // 10. Start airflow standalone + fmt.Println("\nStarting Airflow in standalone mode…") + + venvBin := filepath.Join(s.airflowHome, ".venv", "bin") + airflowBin := filepath.Join(venvBin, "airflow") + + cmd := exec.Command(airflowBin, "standalone") //nolint:gosec + cmd.Dir = s.airflowHome + cmd.Env = env + // Start the subprocess in its own process group so we can kill the entire + // tree (scheduler, triggerer, api-server, etc.) when the user sends Ctrl+C. + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + if s.foreground { + return s.startForeground(cmd, waitTime) + } + return s.startBackground(cmd, waitTime) +} + +// startForeground runs the airflow process in the foreground, streaming output to the terminal. +func (s *Standalone) startForeground(cmd *exec.Cmd, waitTime time.Duration) error { + // Set up pipes for stdout/stderr so we can stream output + stdout, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("error creating stdout pipe: %w", err) + } + stderr, err := cmd.StderrPipe() + if err != nil { + return fmt.Errorf("error creating stderr pipe: %w", err) + } + + err = startCommand(cmd) + if err != nil { + return fmt.Errorf("error starting airflow standalone: %w", err) + } + + // Forward signals to the entire process group so child processes + // (scheduler, triggerer, api-server, etc.) are also terminated. + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigChan + if cmd.Process != nil { + // Send SIGTERM to the entire process group (-pid). + syscall.Kill(-cmd.Process.Pid, syscall.SIGTERM) //nolint:errcheck + } + }() + defer signal.Stop(sigChan) + + // Stream output in background goroutines + var wg sync.WaitGroup + wg.Add(2) //nolint:mnd + go func() { + defer wg.Done() + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + fmt.Println(scanner.Text()) + } + }() + go func() { + defer wg.Done() + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + fmt.Fprintln(os.Stderr, scanner.Text()) + } + }() + + // Run health check in background + healthURL, healthComp := s.healthEndpoint() + go func() { + err := checkWebserverHealth(healthURL, waitTime, healthComp) + if err != nil { + fmt.Fprintf(os.Stderr, "\n%s\n", err.Error()) + return + } + bullet := ansi.Cyan("\u27A4") + " " + uiURL := "http://localhost:" + defaultStandalonePort + fmt.Println("\n" + ansi.Green("\u2714") + " Airflow is ready!") + fmt.Printf("%sAirflow UI: %s\n", bullet, ansi.Bold(uiURL)) + if user, pass := s.readCredentials(); user != "" { + fmt.Printf("%sUsername: %s\n", bullet, ansi.Bold(user)) + fmt.Printf("%sPassword: %s\n", bullet, ansi.Bold(pass)) + } + fmt.Println() + }() + + // Wait for the process to complete + wg.Wait() + err = cmd.Wait() + if err != nil { + // If the process was killed by a signal (e.g. Ctrl+C), don't treat it as an error + var exitErr *exec.ExitError + if errors.As(err, &exitErr) { + if exitErr.ExitCode() == -1 { + fmt.Println("\nAirflow standalone stopped.") + return nil + } + } + return fmt.Errorf("airflow standalone exited with error: %w", err) + } + + fmt.Println("\nAirflow standalone stopped.") + return nil +} + +// startBackground runs the airflow process in the background, writes a PID file, +// runs the health check, and returns. +func (s *Standalone) startBackground(cmd *exec.Cmd, waitTime time.Duration) error { + // Check if already running + if pid, alive := s.readPID(); alive { + return fmt.Errorf("standalone Airflow is already running (PID %d). Run 'astro dev local stop' first", pid) + } + + // Open log file for writing + logPath := s.logFilePath() + logFile, err := os.Create(logPath) + if err != nil { + return fmt.Errorf("error creating log file: %w", err) + } + defer logFile.Close() + + cmd.Stdout = logFile + cmd.Stderr = logFile + + err = startCommand(cmd) + if err != nil { + return fmt.Errorf("error starting airflow standalone: %w", err) + } + + // Write PID file + err = os.WriteFile(s.pidFilePath(), []byte(fmt.Sprintf("%d", cmd.Process.Pid)), filePermissions) + if err != nil { + // Kill the process if we can't write the PID file + syscall.Kill(-cmd.Process.Pid, syscall.SIGTERM) //nolint:errcheck + return fmt.Errorf("error writing PID file: %w", err) + } + + // Run health check (blocking — wait for healthy or timeout) + healthURL, healthComp := s.healthEndpoint() + err = checkWebserverHealth(healthURL, waitTime, healthComp) + if err != nil { + return fmt.Errorf("airflow did not become healthy: %w", err) + } + + bullet := ansi.Cyan("\u27A4") + " " + uiURL := "http://localhost:" + defaultStandalonePort + fmt.Printf("\n%s Airflow is ready! (PID %d)\n", ansi.Green("\u2714"), cmd.Process.Pid) + fmt.Printf("%sAirflow UI: %s\n", bullet, ansi.Bold(uiURL)) + if user, pass := s.readCredentials(); user != "" { + fmt.Printf("%sUsername: %s\n", bullet, ansi.Bold(user)) + fmt.Printf("%sPassword: %s\n", bullet, ansi.Bold(pass)) + } + fmt.Printf("%sView logs: %s\n", bullet, ansi.Bold("astro dev local logs -f")) + fmt.Printf("%sStop: %s\n", bullet, ansi.Bold("astro dev local stop")) + + return nil +} + +// healthEndpoint returns the health check URL and component name. +func (s *Standalone) healthEndpoint() (url, component string) { + return "http://localhost:" + defaultStandalonePort + "/api/v2/monitor/health", "api-server" +} + +// ensureCredentials seeds the SimpleAuthManager passwords file with admin:admin +// if it doesn't already exist. Airflow's init() uses "a+" mode, so if the entry +// is already present it won't be overwritten — existing passwords survive a restart, +// but a fresh environment always gets the predictable admin/admin default. +func (s *Standalone) ensureCredentials() error { + path := s.passwordsFilePath() + if _, err := os.Stat(path); err == nil { + return nil // already exists, leave it alone + } + // Ensure .astro/standalone/ exists before writing into it + if err := os.MkdirAll(filepath.Dir(path), dirPermissions); err != nil { + return err + } + creds := map[string]string{standaloneAdminUser: standaloneAdminPassword} + data, err := json.Marshal(creds) + if err != nil { + return err + } + return os.WriteFile(path, data, filePermissions) +} + +// readCredentials reads the SimpleAuthManager password file and returns (username, password). +// Returns empty strings if the file doesn't exist or can't be parsed. +func (s *Standalone) readCredentials() (username, password string) { + data, err := osReadFile(s.passwordsFilePath()) + if err != nil { + return "", "" + } + var creds map[string]string + if err := json.Unmarshal(data, &creds); err != nil { + return "", "" + } + for u, p := range creds { + return u, p + } + return "", "" +} + +// getConstraints fetches pip constraints and freeze files from the CDN. +// The constraints file (small, 3 version pins) is used to extract version info. +// The freeze file (full package list) is used as pip constraints for the install. +// Both are cached in .astro/standalone/. +func (s *Standalone) getConstraints(tag, pythonVersion string) (freezePath, airflowVersion, taskSDKVersion string, err error) { + constraintsDir := filepath.Join(s.airflowHome, standaloneDir) + constraintsFile := filepath.Join(constraintsDir, fmt.Sprintf("constraints-%s-python-%s.txt", tag, pythonVersion)) + freezeFile := filepath.Join(constraintsDir, fmt.Sprintf("freeze-%s-python-%s.txt", tag, pythonVersion)) + + // Check cache — both files must exist + constraintsCached, _ := fileutil.Exists(constraintsFile, nil) + freezeCached, _ := fileutil.Exists(freezeFile, nil) + if constraintsCached && freezeCached { + airflowVersion, err = parseAirflowVersionFromConstraints(constraintsFile) + if err == nil && airflowVersion != "" { + taskSDKVersion, _ = parsePackageVersionFromConstraints(constraintsFile, "apache-airflow-task-sdk") + return freezeFile, airflowVersion, taskSDKVersion, nil + } + } + + // Create directory + err = os.MkdirAll(constraintsDir, dirPermissions) + if err != nil { + return "", "", "", fmt.Errorf("error creating standalone directory: %w", err) + } + + // Fetch constraints file (small — version pins only, used for parsing) + constraintsURL := fmt.Sprintf("%s/runtime-%s-python-%s.txt", constraintsBaseURL, tag, pythonVersion) + constraintsContent, fetchErr := fetchConstraintsURL(constraintsURL) + if fetchErr != nil { + return "", "", "", fmt.Errorf("error fetching constraints from %s: %w", constraintsURL, fetchErr) + } + if err = os.WriteFile(constraintsFile, []byte(constraintsContent), filePermissions); err != nil { + return "", "", "", fmt.Errorf("error caching constraints file: %w", err) + } + + // Fetch freeze file (full package list, used as pip -c constraints) + freezeURL := fmt.Sprintf("%s/runtime-%s-python-%s.txt", freezeBaseURL, tag, pythonVersion) + freezeContent, fetchErr := fetchConstraintsURL(freezeURL) + if fetchErr != nil { + return "", "", "", fmt.Errorf("error fetching freeze file from %s: %w", freezeURL, fetchErr) + } + if err = os.WriteFile(freezeFile, []byte(freezeContent), filePermissions); err != nil { + return "", "", "", fmt.Errorf("error caching freeze file: %w", err) + } + + airflowVersion, err = parseAirflowVersionFromConstraints(constraintsFile) + if err != nil { + return "", "", "", err + } + + taskSDKVersion, _ = parsePackageVersionFromConstraints(constraintsFile, "apache-airflow-task-sdk") + + return freezeFile, airflowVersion, taskSDKVersion, nil +} + +// fetchConstraintsURL fetches constraints from a URL and returns the body as a string. +var fetchConstraintsURL = func(url string) (string, error) { + resp, err := http.Get(url) //nolint:gosec,noctx + if err != nil { + return "", err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("failed to fetch constraints: HTTP %d", resp.StatusCode) + } + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", err + } + return string(body), nil +} + +// parsePackageVersionFromConstraints reads a constraints file and extracts the version for a given package. +func parsePackageVersionFromConstraints(constraintsFile, packageName string) (string, error) { + data, err := os.ReadFile(constraintsFile) + if err != nil { + return "", fmt.Errorf("error reading constraints file: %w", err) + } + + prefix := packageName + "==" + for _, line := range strings.Split(string(data), "\n") { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, prefix) { + return strings.TrimPrefix(line, prefix), nil + } + } + return "", fmt.Errorf("could not find %s version in constraints file", packageName) +} + +// parseAirflowVersionFromConstraints reads a constraints file and extracts the apache-airflow version. +func parseAirflowVersionFromConstraints(constraintsFile string) (string, error) { + return parsePackageVersionFromConstraints(constraintsFile, "apache-airflow") +} + +// buildEnv constructs the environment variables for the standalone process. +func (s *Standalone) buildEnv() []string { + venvBin := filepath.Join(s.airflowHome, ".venv", "bin") + + // Point AIRFLOW_HOME at .astro/standalone/ so all Airflow-generated files + // (airflow.cfg, airflow.db, logs/) land there rather than in the project root. + // DAGS_FOLDER is pinned back to the project root so DAGs are still discovered. + standaloneHome := filepath.Join(s.airflowHome, standaloneDir) + + // Build our override map — these take precedence over the inherited env. + overrides := map[string]string{ + "PATH": fmt.Sprintf("%s:%s", venvBin, os.Getenv("PATH")), + "AIRFLOW_HOME": standaloneHome, + "ASTRONOMER_ENVIRONMENT": "local", + "AIRFLOW__CORE__LOAD_EXAMPLES": "False", + "AIRFLOW__CORE__DAGS_FOLDER": filepath.Join(s.airflowHome, "dags"), + "AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_USERS": standaloneAdminUser + ":admin", + "AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_PASSWORDS_FILE": s.passwordsFilePath(), + } + + // Load .env file if it exists — these also override inherited env. + envFilePath := s.envFile + if envFilePath == "" { + envFilePath = filepath.Join(s.airflowHome, ".env") + } + if envVars, err := loadEnvFile(envFilePath); err == nil { + for _, kv := range envVars { + if idx := strings.IndexByte(kv, '='); idx >= 0 { + overrides[kv[:idx]] = kv[idx+1:] + } + } + } + + // Start with inherited env, filtering out keys we override. + env := make([]string, 0, len(os.Environ())+len(overrides)) + for _, kv := range os.Environ() { + if idx := strings.IndexByte(kv, '='); idx >= 0 { + if _, overridden := overrides[kv[:idx]]; overridden { + continue + } + } + env = append(env, kv) + } + + // Append our overrides. + for k, v := range overrides { + env = append(env, fmt.Sprintf("%s=%s", k, v)) + } + + return env +} + +// loadEnvFile reads a .env file and returns key=value pairs. +func loadEnvFile(path string) ([]string, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + var envVars []string + for _, line := range strings.Split(string(data), "\n") { + line = strings.TrimSpace(line) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + if strings.Contains(line, "=") { + envVars = append(envVars, line) + } + } + return envVars, nil +} + +// applySettings imports airflow_settings.yaml using airflow CLI commands run via the venv. +func (s *Standalone) applySettings(settingsFile string, envConns map[string]astrocore.EnvironmentObjectConnection) error { + settingsExists, err := fileutil.Exists(settingsFile, nil) + if err != nil || !settingsExists { + if len(envConns) == 0 { + return nil + } + } + + // Temporarily swap the execAirflowCommand to use venv instead of docker + origExec := settings.SetExecAirflowCommand(s.standaloneExecAirflowCommand) + defer settings.SetExecAirflowCommand(origExec) + + return settings.ConfigSettings("standalone", settingsFile, envConns, 3, true, true, true) //nolint:mnd +} + +// standaloneExecAirflowCommand runs an airflow command via the local venv. +func (s *Standalone) standaloneExecAirflowCommand(_, command string) (string, error) { + env := s.buildEnv() + venvBash := filepath.Join(s.airflowHome, ".venv", "bin", "bash") + + cmd := exec.Command(venvBash, "-c", command) //nolint:gosec + cmd.Dir = s.airflowHome + cmd.Env = env + + out, err := cmd.CombinedOutput() + if err != nil { + return string(out), fmt.Errorf("error running airflow command: %w", err) + } + return string(out), nil +} + +// readPID reads the PID file and checks if the process is alive. +// Returns the PID and true if the process is running, or 0 and false otherwise. +func (s *Standalone) readPID() (int, bool) { + data, err := osReadFile(s.pidFilePath()) + if err != nil { + return 0, false + } + + pid := 0 + if _, err := fmt.Sscanf(strings.TrimSpace(string(data)), "%d", &pid); err != nil || pid <= 0 { + return 0, false + } + + // Check if process is alive + proc, err := osFindProcess(pid) + if err != nil { + return pid, false + } + // On Unix, FindProcess always succeeds; use signal 0 to probe. + if err := proc.Signal(syscall.Signal(0)); err != nil { + return pid, false + } + return pid, true +} + +// Stop terminates the standalone Airflow process. +func (s *Standalone) Stop(_ bool) error { + pid, alive := s.readPID() + if pid == 0 { + fmt.Println("No standalone Airflow process found.") + return nil + } + + if !alive { + // Stale PID file — clean up + os.Remove(s.pidFilePath()) + fmt.Println("No standalone Airflow process found (cleaned up stale PID file).") + return nil + } + + // Send SIGTERM to the process group + fmt.Printf("Stopping Airflow standalone (PID %d)…\n", pid) + syscall.Kill(-pid, syscall.SIGTERM) //nolint:errcheck + + // Poll for process exit + deadline := time.Now().Add(stopTimeout) + for time.Now().Before(deadline) { + time.Sleep(stopPollInterval) + if _, stillAlive := s.readPID(); !stillAlive { + break + } + } + + // If still alive, send SIGKILL + if _, stillAlive := s.readPID(); stillAlive { + syscall.Kill(-pid, syscall.SIGKILL) //nolint:errcheck + time.Sleep(stopPollInterval) + } + + os.Remove(s.pidFilePath()) + fmt.Println("Airflow standalone stopped.") + return nil +} + +// Kill stops a running process (if any) and cleans up standalone state files. +func (s *Standalone) Kill() error { + // Stop the running process first + s.Stop(false) //nolint:errcheck + + sp := spinner.NewSpinner("Cleaning up standalone environment…") + sp.Start() + defer sp.Stop() + + // Remove venv and the entire standaloneDir (.astro/standalone/). + // Since AIRFLOW_HOME points at standaloneDir, all Airflow-generated files + // (airflow.cfg, airflow.db, logs/, passwords file, constraint caches) live + // there and are cleaned up in one shot. + pathsToRemove := []string{ + filepath.Join(s.airflowHome, ".venv"), + filepath.Join(s.airflowHome, standaloneDir), + } + + for _, p := range pathsToRemove { + if exists, _ := fileutil.Exists(p, nil); exists { + os.RemoveAll(p) + } + } + + spinner.StopWithCheckmark(sp, "Standalone environment cleaned up") + return nil +} + +// Stub methods — not supported in standalone mode. + +// PS reports the status of the standalone Airflow process. +func (s *Standalone) PS() error { + pid, alive := s.readPID() + if alive { + fmt.Printf("Airflow standalone is running (PID %d)\n", pid) + } else { + fmt.Println("Airflow standalone is not running.") + } + return nil +} + +// Logs streams the standalone Airflow log file. +func (s *Standalone) Logs(follow bool, _ ...string) error { + logPath := s.logFilePath() + if _, err := os.Stat(logPath); os.IsNotExist(err) { + return fmt.Errorf("no log file found at %s — has standalone been started?", logPath) + } + + if !follow { + data, err := osReadFile(logPath) + if err != nil { + return fmt.Errorf("error reading log file: %w", err) + } + fmt.Print(string(data)) + return nil + } + + // Follow mode: read existing content then poll for new data + f, err := os.Open(logPath) + if err != nil { + return fmt.Errorf("error opening log file: %w", err) + } + defer f.Close() + + reader := bufio.NewReader(f) + + // Set up signal handling so Ctrl+C exits cleanly + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + defer signal.Stop(sigChan) + + for { + line, err := reader.ReadString('\n') + if line != "" { + fmt.Print(line) + } + if err != nil { + // At EOF, poll for new data + select { + case <-sigChan: + return nil + case <-time.After(stopPollInterval): + continue + } + } + } +} + +func (s *Standalone) Build(_, _ string, _ bool) error { + return errStandaloneNotSupported +} + +func (s *Standalone) Run(_ []string, _ string) error { + return errStandaloneNotSupported +} + +func (s *Standalone) Bash(_ string) error { + return errStandaloneNotSupported +} + +func (s *Standalone) RunDAG(_, _, _, _ string, _, _ bool) error { + return errStandaloneNotSupported +} + +func (s *Standalone) ImportSettings(_, _ string, _, _, _ bool) error { + return errStandaloneNotSupported +} + +func (s *Standalone) ExportSettings(_, _ string, _, _, _, _ bool) error { + return errStandaloneNotSupported +} + +func (s *Standalone) ComposeExport(_, _ string) error { + return errStandaloneNotSupported +} + +func (s *Standalone) Pytest(_, _, _, _, _ string) (string, error) { + return "", errStandaloneNotSupported +} + +func (s *Standalone) Parse(_, _, _ string) error { + return errStandaloneNotSupported +} + +func (s *Standalone) UpgradeTest(_, _, _, _ string, _, _, _, _, _ bool, _ string, _ astroplatformcore.ClientWithResponsesInterface) error { + return errStandaloneNotSupported +} + +// execCommand runs a command in the given directory. +func execCommand(dir, name string, args ...string) error { + cmd := exec.Command(name, args...) //nolint:gosec + cmd.Dir = dir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + return cmd.Run() +} + +// startCmd starts a command without waiting for it to finish. +func startCmd(cmd *exec.Cmd) error { + return cmd.Start() +} diff --git a/airflow/local_test.go b/airflow/local_test.go new file mode 100644 index 000000000..4cf96ad21 --- /dev/null +++ b/airflow/local_test.go @@ -0,0 +1,956 @@ +package airflow + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "syscall" + "testing" + "time" + + astrocore "github.com/astronomer/astro-cli/astro-client-core" + "github.com/astronomer/astro-cli/docker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func (s *Suite) TestStandaloneInit() { + handler, err := StandaloneInit("/tmp/test", ".env", "Dockerfile") + s.NoError(err) + s.NotNil(handler) + s.Equal("/tmp/test", handler.airflowHome) + s.Equal(".env", handler.envFile) + s.Equal("Dockerfile", handler.dockerfile) +} + +func (s *Suite) TestStandaloneHandlerInit() { + handler, err := StandaloneHandlerInit("/tmp/test", ".env", "Dockerfile", "project") + s.NoError(err) + s.NotNil(handler) +} + +func (s *Suite) TestStandaloneStart_Airflow2Rejected() { + // Airflow 2 runtime versions (old format like 12.0.0) should be rejected + origParseFile := standaloneParseFile + defer func() { standaloneParseFile = origParseFile }() + + standaloneParseFile = func(filename string) ([]docker.Command, error) { + return []docker.Command{ + {Cmd: "from", Value: []string{"quay.io/astronomer/astro-runtime:12.0.0"}}, + }, nil + } + + handler, err := StandaloneInit("/tmp/test", ".env", "Dockerfile") + s.NoError(err) + + err = handler.Start("", "airflow_settings.yaml", "", "", false, false, 1*time.Minute, nil) + s.Error(err) + s.Equal(errUnsupportedAirflowVersion, err) +} + +func (s *Suite) TestStandaloneStart_UnsupportedVersion() { + origParseFile := standaloneParseFile + origGetImageTag := standaloneGetImageTag + defer func() { + standaloneParseFile = origParseFile + standaloneGetImageTag = origGetImageTag + }() + + standaloneParseFile = func(filename string) ([]docker.Command, error) { + return []docker.Command{ + {Cmd: "from", Value: []string{"some-image:unknown-tag"}}, + }, nil + } + standaloneGetImageTag = func(cmds []docker.Command) (string, string) { + return "some-image", "unknown-tag" + } + + handler, err := StandaloneInit("/tmp/test", ".env", "Dockerfile") + s.NoError(err) + + err = handler.Start("", "airflow_settings.yaml", "", "", false, false, 1*time.Minute, nil) + s.Error(err) + s.Equal(errUnsupportedAirflowVersion, err) +} + +func (s *Suite) TestStandaloneStart_MissingUV() { + // Mock parseFile to return an Airflow 3 runtime image + origParseFile := standaloneParseFile + origLookPath := lookPath + defer func() { + standaloneParseFile = origParseFile + lookPath = origLookPath + }() + + standaloneParseFile = func(filename string) ([]docker.Command, error) { + return []docker.Command{ + {Cmd: "from", Value: []string{"astrocrpublic.azurecr.io/runtime:3.1-12"}}, + }, nil + } + + lookPath = func(file string) (string, error) { + return "", &exec.Error{Name: file, Err: exec.ErrNotFound} + } + + handler, err := StandaloneInit("/tmp/test", ".env", "Dockerfile") + s.NoError(err) + + err = handler.Start("", "airflow_settings.yaml", "", "", false, false, 1*time.Minute, nil) + s.Error(err) + s.Equal(errUVNotFound, err) +} + +func (s *Suite) TestStandaloneStart_DockerfileParseError() { + origParseFile := standaloneParseFile + defer func() { standaloneParseFile = origParseFile }() + + standaloneParseFile = func(filename string) ([]docker.Command, error) { + return nil, docker.IOError{Msg: "file not found"} + } + + handler, err := StandaloneInit("/tmp/test", ".env", "Dockerfile") + s.NoError(err) + + err = handler.Start("", "airflow_settings.yaml", "", "", false, false, 1*time.Minute, nil) + s.Error(err) + s.Contains(err.Error(), "error parsing Dockerfile") +} + +func (s *Suite) TestStandaloneStart_EmptyTag() { + origParseFile := standaloneParseFile + defer func() { standaloneParseFile = origParseFile }() + + standaloneParseFile = func(filename string) ([]docker.Command, error) { + return []docker.Command{ + {Cmd: "run", Value: []string{"echo hello"}}, + }, nil + } + + handler, err := StandaloneInit("/tmp/test", ".env", "Dockerfile") + s.NoError(err) + + err = handler.Start("", "airflow_settings.yaml", "", "", false, false, 1*time.Minute, nil) + s.Error(err) + s.Contains(err.Error(), "could not determine runtime version") +} + +func (s *Suite) TestStandaloneStubMethods() { + handler, err := StandaloneInit("/tmp/test", ".env", "Dockerfile") + s.NoError(err) + + s.Equal(errStandaloneNotSupported, handler.Run(nil, "")) + s.Equal(errStandaloneNotSupported, handler.Bash("")) + s.Equal(errStandaloneNotSupported, handler.RunDAG("", "", "", "", false, false)) + s.Equal(errStandaloneNotSupported, handler.ImportSettings("", "", false, false, false)) + s.Equal(errStandaloneNotSupported, handler.ExportSettings("", "", false, false, false, false)) + s.Equal(errStandaloneNotSupported, handler.ComposeExport("", "")) + + _, pytestErr := handler.Pytest("", "", "", "", "") + s.Equal(errStandaloneNotSupported, pytestErr) + + s.Equal(errStandaloneNotSupported, handler.Parse("", "", "")) + s.Equal(errStandaloneNotSupported, handler.UpgradeTest("", "", "", "", false, false, false, false, false, "", nil)) +} + +func (s *Suite) TestStandaloneStop_NoPIDFile() { + tmpDir, err := os.MkdirTemp("", "standalone-stop-test") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + // No PID file exists — should handle gracefully + err = handler.Stop(false) + s.NoError(err) +} + +func (s *Suite) TestStandaloneStop_StalePID() { + tmpDir, err := os.MkdirTemp("", "standalone-stop-test") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + // Create standalone dir and a PID file with a non-existent PID + standaloneStateDir := filepath.Join(tmpDir, ".astro", "standalone") + err = os.MkdirAll(standaloneStateDir, 0o755) + s.NoError(err) + err = os.WriteFile(filepath.Join(standaloneStateDir, "airflow.pid"), []byte("999999999"), 0o644) + s.NoError(err) + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + err = handler.Stop(false) + s.NoError(err) + + // PID file should be cleaned up + _, err = os.Stat(filepath.Join(standaloneStateDir, "airflow.pid")) + s.True(os.IsNotExist(err)) +} + +func (s *Suite) TestStandaloneKill() { + // Create a temp directory with some files to clean up + tmpDir, err := os.MkdirTemp("", "standalone-kill-test") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + // AIRFLOW_HOME is .astro/standalone/, so airflow.cfg, airflow.db, logs/ all + // live inside standaloneStateDir and are cleaned up when it is removed. + venvDir := filepath.Join(tmpDir, ".venv") + standaloneStateDir := filepath.Join(tmpDir, ".astro", "standalone") + // Simulate Airflow-generated files inside standaloneDir + dbFile := filepath.Join(standaloneStateDir, "airflow.db") + logsDir := filepath.Join(standaloneStateDir, "logs") + + err = os.MkdirAll(venvDir, 0o755) + s.NoError(err) + err = os.MkdirAll(standaloneStateDir, 0o755) + s.NoError(err) + err = os.WriteFile(dbFile, []byte("test"), 0o644) + s.NoError(err) + err = os.MkdirAll(logsDir, 0o755) + s.NoError(err) + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + err = handler.Kill() + s.NoError(err) + + // venv and entire standaloneDir (including db and logs inside it) removed + _, err = os.Stat(venvDir) + s.True(os.IsNotExist(err)) + _, err = os.Stat(standaloneStateDir) + s.True(os.IsNotExist(err)) +} + +func TestParseRuntimeTagPython(t *testing.T) { + tests := []struct { + tag string + wantBase string + wantPython string + }{ + {"3.1-12", "3.1-12", "3.12"}, + {"3.1-12-python-3.11", "3.1-12", "3.11"}, + {"3.1-12-python-3.11-base", "3.1-12", "3.11"}, + {"3.1-12-base", "3.1-12", "3.12"}, + {"3.2-1-python-3.13", "3.2-1", "3.13"}, + } + for _, tt := range tests { + t.Run(tt.tag, func(t *testing.T) { + base, python := parseRuntimeTagPython(tt.tag) + assert.Equal(t, tt.wantBase, base) + assert.Equal(t, tt.wantPython, python) + }) + } +} + +func TestParseAirflowVersionFromConstraints(t *testing.T) { + // Create a temp file with constraints + tmpDir, err := os.MkdirTemp("", "constraints-test") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + constraintsFile := filepath.Join(tmpDir, "constraints.txt") + content := `something-else==1.0.0 +apache-airflow==3.0.0 +another-package==2.0.0` + err = os.WriteFile(constraintsFile, []byte(content), 0o644) + require.NoError(t, err) + + version, err := parseAirflowVersionFromConstraints(constraintsFile) + assert.NoError(t, err) + assert.Equal(t, "3.0.0", version) +} + +func TestParseAirflowVersionFromConstraints_NotFound(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "constraints-test") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + constraintsFile := filepath.Join(tmpDir, "constraints.txt") + content := `something-else==1.0.0 +another-package==2.0.0` + err = os.WriteFile(constraintsFile, []byte(content), 0o644) + require.NoError(t, err) + + _, err = parseAirflowVersionFromConstraints(constraintsFile) + assert.Error(t, err) + assert.Contains(t, err.Error(), "could not find apache-airflow version") +} + +func TestLoadEnvFile(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "envfile-test") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + envFilePath := filepath.Join(tmpDir, ".env") + content := `# Comment +FOO=bar +BAZ=qux + +# Another comment +EMPTY=` + err = os.WriteFile(envFilePath, []byte(content), 0o644) + require.NoError(t, err) + + envVars, err := loadEnvFile(envFilePath) + assert.NoError(t, err) + assert.Contains(t, envVars, "FOO=bar") + assert.Contains(t, envVars, "BAZ=qux") + assert.Contains(t, envVars, "EMPTY=") + assert.Len(t, envVars, 3) +} + +func TestLoadEnvFile_NotFound(t *testing.T) { + _, err := loadEnvFile("/nonexistent/.env") + assert.Error(t, err) +} + +func TestLoadEnvFile_ValueWithEquals(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "envfile-test") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + envFilePath := filepath.Join(tmpDir, ".env") + content := `DB_URL=postgres://user:pass@host:5432/db?sslmode=require` + err = os.WriteFile(envFilePath, []byte(content), 0o644) + require.NoError(t, err) + + envVars, err := loadEnvFile(envFilePath) + assert.NoError(t, err) + assert.Len(t, envVars, 1) + assert.Equal(t, "DB_URL=postgres://user:pass@host:5432/db?sslmode=require", envVars[0]) +} + +func (s *Suite) TestStandaloneBuildEnv() { + handler, err := StandaloneInit("/tmp/test-project", "", "Dockerfile") + s.NoError(err) + + env := handler.buildEnv() + + // Check that key env vars are present + envMap := make(map[string]string) + for _, e := range env { + parts := splitEnvVar(e) + if parts != nil { + envMap[parts[0]] = parts[1] + } + } + + s.Equal("/tmp/test-project/.astro/standalone", envMap["AIRFLOW_HOME"]) + s.Equal("local", envMap["ASTRONOMER_ENVIRONMENT"]) + s.Equal("False", envMap["AIRFLOW__CORE__LOAD_EXAMPLES"]) + s.Equal("/tmp/test-project/dags", envMap["AIRFLOW__CORE__DAGS_FOLDER"]) + s.Contains(envMap["PATH"], "/tmp/test-project/.venv/bin") +} + +func (s *Suite) TestStandaloneBuildEnv_NoDuplicateKeys() { + handler, err := StandaloneInit("/tmp/test-project", "", "Dockerfile") + s.NoError(err) + + env := handler.buildEnv() + + // Count occurrences of each key — should all be exactly 1 + keyCounts := make(map[string]int) + for _, e := range env { + parts := splitEnvVar(e) + if parts != nil { + keyCounts[parts[0]]++ + } + } + + for key, count := range keyCounts { + s.Equalf(1, count, "env var %q appears %d times, expected exactly 1", key, count) + } +} + +func (s *Suite) TestStandaloneBuildEnv_WithEnvFile() { + tmpDir, err := os.MkdirTemp("", "standalone-buildenv-test") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + // Write a .env file with an override and a custom var + envContent := "ASTRONOMER_ENVIRONMENT=custom\nMY_CUSTOM_VAR=hello\n" + err = os.WriteFile(filepath.Join(tmpDir, ".env"), []byte(envContent), 0o644) + s.NoError(err) + + handler, err := StandaloneInit(tmpDir, "", "Dockerfile") + s.NoError(err) + + env := handler.buildEnv() + + envMap := make(map[string]string) + for _, e := range env { + parts := splitEnvVar(e) + if parts != nil { + envMap[parts[0]] = parts[1] + } + } + + // .env should override our defaults + s.Equal("custom", envMap["ASTRONOMER_ENVIRONMENT"]) + s.Equal("hello", envMap["MY_CUSTOM_VAR"]) + // Other defaults should still be present + s.Equal(filepath.Join(tmpDir, ".astro", "standalone"), envMap["AIRFLOW_HOME"]) +} + +func splitEnvVar(s string) []string { + idx := indexOf(s, '=') + if idx < 0 { + return nil + } + return []string{s[:idx], s[idx+1:]} +} + +func indexOf(s string, c byte) int { + for i := range len(s) { + if s[i] == c { + return i + } + } + return -1 +} + +func (s *Suite) TestStandaloneGetConstraints_Cached() { + tmpDir, err := os.MkdirTemp("", "standalone-constraints-test") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + // Pre-create both cached files + constraintsDir := filepath.Join(tmpDir, ".astro", "standalone") + err = os.MkdirAll(constraintsDir, 0o755) + s.NoError(err) + + constraintsFile := filepath.Join(constraintsDir, "constraints-3.1-12-python-3.12.txt") + content := "apache-airflow==3.0.1\napache-airflow-task-sdk==1.0.0\nother-package==1.0.0\n" + err = os.WriteFile(constraintsFile, []byte(content), 0o644) + s.NoError(err) + + freezeFile := filepath.Join(constraintsDir, "freeze-3.1-12-python-3.12.txt") + err = os.WriteFile(freezeFile, []byte("apache-airflow==3.0.1\nsome-dep==1.2.3\n"), 0o644) + s.NoError(err) + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + path, version, taskSDKVersion, err := handler.getConstraints("3.1-12", defaultPythonVersion) + s.NoError(err) + s.Equal(freezeFile, path) + s.Equal("3.0.1", version) + s.Equal("1.0.0", taskSDKVersion) +} + +func (s *Suite) TestStandaloneGetConstraints_FetchesFromURL() { + tmpDir, err := os.MkdirTemp("", "standalone-constraints-test") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + // Mock fetchConstraintsURL — called twice (constraints then freeze) + origFetch := fetchConstraintsURL + defer func() { fetchConstraintsURL = origFetch }() + + fetchConstraintsURL = func(url string) (string, error) { + s.Contains(url, "runtime-3.1-13-python-3.12.txt") + if strings.Contains(url, "runtime-constraints") { + return "apache-airflow==3.0.2\napache-airflow-task-sdk==1.0.0\n", nil + } + // freeze URL + return "apache-airflow==3.0.2\nsome-dep==1.2.3\n", nil + } + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + path, version, taskSDKVersion, err := handler.getConstraints("3.1-13", defaultPythonVersion) + s.NoError(err) + s.Contains(path, "freeze-3.1-13-python-3.12.txt") + s.Equal("3.0.2", version) + s.Equal("1.0.0", taskSDKVersion) + + // Verify freeze file was cached + _, err = os.Stat(path) + s.NoError(err) +} + +func (s *Suite) TestStandaloneGetConstraints_FetchFails() { + tmpDir, err := os.MkdirTemp("", "standalone-constraints-test") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + origFetch := fetchConstraintsURL + defer func() { fetchConstraintsURL = origFetch }() + + fetchConstraintsURL = func(url string) (string, error) { + return "", fmt.Errorf("network error") + } + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + _, _, _, err = handler.getConstraints("3.1-99", defaultPythonVersion) //nolint:dogsled + s.Error(err) + s.Contains(err.Error(), "error fetching constraints") + s.Contains(err.Error(), "network error") +} + +func (s *Suite) TestStandaloneStart_VenvCreationFails() { + origParseFile := standaloneParseFile + origLookPath := lookPath + origRunCommand := runCommand + defer func() { + standaloneParseFile = origParseFile + lookPath = origLookPath + runCommand = origRunCommand + }() + + standaloneParseFile = func(filename string) ([]docker.Command, error) { + return []docker.Command{ + {Cmd: "from", Value: []string{"astrocrpublic.azurecr.io/runtime:3.1-12"}}, + }, nil + } + + lookPath = func(file string) (string, error) { + return "/usr/local/bin/uv", nil + } + + callCount := 0 + runCommand = func(dir, name string, args ...string) error { + callCount++ + if callCount == 1 { + // First call is "uv venv" — fail it + return fmt.Errorf("uv venv failed: python 3.12 not found") + } + return nil + } + + tmpDir, err := os.MkdirTemp("", "standalone-venv-fail") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + // Pre-create cached constraints + freeze to skip URL fetch + constraintsDir := filepath.Join(tmpDir, ".astro", "standalone") + err = os.MkdirAll(constraintsDir, 0o755) + s.NoError(err) + err = os.WriteFile(filepath.Join(constraintsDir, "constraints-3.1-12-python-3.12.txt"), []byte("apache-airflow==3.0.1\napache-airflow-task-sdk==1.0.0\n"), 0o644) + s.NoError(err) + err = os.WriteFile(filepath.Join(constraintsDir, "freeze-3.1-12-python-3.12.txt"), []byte("apache-airflow==3.0.1\n"), 0o644) + s.NoError(err) + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + err = handler.Start("", "airflow_settings.yaml", "", "", false, false, 1*time.Minute, nil) + s.Error(err) + s.Contains(err.Error(), "error creating virtual environment") +} + +func (s *Suite) TestStandaloneStart_InstallFails() { + origParseFile := standaloneParseFile + origLookPath := lookPath + origRunCommand := runCommand + defer func() { + standaloneParseFile = origParseFile + lookPath = origLookPath + runCommand = origRunCommand + }() + + standaloneParseFile = func(filename string) ([]docker.Command, error) { + return []docker.Command{ + {Cmd: "from", Value: []string{"astrocrpublic.azurecr.io/runtime:3.1-12"}}, + }, nil + } + + lookPath = func(file string) (string, error) { + return "/usr/local/bin/uv", nil + } + + callCount := 0 + runCommand = func(dir, name string, args ...string) error { + callCount++ + if callCount == 2 { + // Second call is "uv pip install" — fail it + return fmt.Errorf("pip install failed: network error") + } + return nil + } + + tmpDir, err := os.MkdirTemp("", "standalone-install-fail") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + constraintsDir := filepath.Join(tmpDir, ".astro", "standalone") + err = os.MkdirAll(constraintsDir, 0o755) + s.NoError(err) + err = os.WriteFile(filepath.Join(constraintsDir, "constraints-3.1-12-python-3.12.txt"), []byte("apache-airflow==3.0.1\napache-airflow-task-sdk==1.0.0\n"), 0o644) + s.NoError(err) + err = os.WriteFile(filepath.Join(constraintsDir, "freeze-3.1-12-python-3.12.txt"), []byte("apache-airflow==3.0.1\n"), 0o644) + s.NoError(err) + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + err = handler.Start("", "airflow_settings.yaml", "", "", false, false, 1*time.Minute, nil) + s.Error(err) + s.Contains(err.Error(), "error installing dependencies") +} + +func (s *Suite) TestStandaloneImplementsContainerHandler() { + handler, err := StandaloneInit("/tmp/test", ".env", "Dockerfile") + s.NoError(err) + + // Verify that Standalone implements ContainerHandler + var _ ContainerHandler = handler +} + +func (s *Suite) TestStandaloneStart_HappyPath() { + tmpDir, err := os.MkdirTemp("", "standalone-happy-test") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + // Pre-create cached constraints + freeze + constraintsDir := filepath.Join(tmpDir, ".astro", "standalone") + err = os.MkdirAll(constraintsDir, 0o755) + s.NoError(err) + err = os.WriteFile(filepath.Join(constraintsDir, "constraints-3.1-12-python-3.12.txt"), []byte("apache-airflow==3.0.1\napache-airflow-task-sdk==1.0.0\n"), 0o644) + s.NoError(err) + err = os.WriteFile(filepath.Join(constraintsDir, "freeze-3.1-12-python-3.12.txt"), []byte("apache-airflow==3.0.1\n"), 0o644) + s.NoError(err) + + // Create a fake airflow binary that exits immediately + venvBin := filepath.Join(tmpDir, ".venv", "bin") + err = os.MkdirAll(venvBin, 0o755) + s.NoError(err) + airflowScript := filepath.Join(venvBin, "airflow") + err = os.WriteFile(airflowScript, []byte("#!/bin/sh\necho 'standalone started'\nexit 0\n"), 0o755) + s.NoError(err) + + // Mock all function variables + origParseFile := standaloneParseFile + origLookPath := lookPath + origRunCommand := runCommand + origCheckHealth := checkWebserverHealth + defer func() { + standaloneParseFile = origParseFile + lookPath = origLookPath + runCommand = origRunCommand + checkWebserverHealth = origCheckHealth + }() + + standaloneParseFile = func(filename string) ([]docker.Command, error) { + return []docker.Command{ + {Cmd: "from", Value: []string{"astrocrpublic.azurecr.io/runtime:3.1-12"}}, + }, nil + } + + lookPath = func(file string) (string, error) { + return "/usr/local/bin/uv", nil + } + + runCommand = func(dir, name string, args ...string) error { + return nil + } + + checkWebserverHealth = func(url string, timeout time.Duration, component string) error { + return nil + } + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + handler.SetForeground(true) // Use foreground mode for this test + + err = handler.Start("", "airflow_settings.yaml", "", "", false, false, 1*time.Minute, map[string]astrocore.EnvironmentObjectConnection(nil)) + s.NoError(err) +} + +func (s *Suite) TestStandaloneStart_Background() { + tmpDir, err := os.MkdirTemp("", "standalone-bg-test") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + // Pre-create cached constraints + freeze and standalone dir + constraintsDir := filepath.Join(tmpDir, ".astro", "standalone") + err = os.MkdirAll(constraintsDir, 0o755) + s.NoError(err) + err = os.WriteFile(filepath.Join(constraintsDir, "constraints-3.1-12-python-3.12.txt"), []byte("apache-airflow==3.0.1\napache-airflow-task-sdk==1.0.0\n"), 0o644) + s.NoError(err) + err = os.WriteFile(filepath.Join(constraintsDir, "freeze-3.1-12-python-3.12.txt"), []byte("apache-airflow==3.0.1\n"), 0o644) + s.NoError(err) + + // Create a fake airflow binary that sleeps briefly then exits + venvBin := filepath.Join(tmpDir, ".venv", "bin") + err = os.MkdirAll(venvBin, 0o755) + s.NoError(err) + err = os.WriteFile(filepath.Join(venvBin, "airflow"), []byte("#!/bin/sh\necho 'standalone running'\nsleep 30\n"), 0o755) + s.NoError(err) + + origParseFile := standaloneParseFile + origLookPath := lookPath + origRunCommand := runCommand + origCheckHealth := checkWebserverHealth + defer func() { + standaloneParseFile = origParseFile + lookPath = origLookPath + runCommand = origRunCommand + checkWebserverHealth = origCheckHealth + }() + + standaloneParseFile = func(filename string) ([]docker.Command, error) { + return []docker.Command{ + {Cmd: "from", Value: []string{"astrocrpublic.azurecr.io/runtime:3.1-12"}}, + }, nil + } + lookPath = func(file string) (string, error) { return "/usr/local/bin/uv", nil } + runCommand = func(dir, name string, args ...string) error { return nil } + checkWebserverHealth = func(url string, timeout time.Duration, component string) error { return nil } + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + // Default is background mode (foreground = false) + + err = handler.Start("", "airflow_settings.yaml", "", "", false, false, 1*time.Minute, nil) + s.NoError(err) + + // Verify PID file was written + pidFilePath := filepath.Join(constraintsDir, "airflow.pid") + _, err = os.Stat(pidFilePath) + s.NoError(err) + + // Verify log file was created + logFilePath := filepath.Join(constraintsDir, "airflow.log") + _, err = os.Stat(logFilePath) + s.NoError(err) + + // Clean up the process + handler.Stop(false) //nolint:errcheck +} + +func (s *Suite) TestStandaloneStart_AlreadyRunning() { + tmpDir, err := os.MkdirTemp("", "standalone-already-running") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + // Pre-create standalone dir, constraints + freeze, venv + constraintsDir := filepath.Join(tmpDir, ".astro", "standalone") + err = os.MkdirAll(constraintsDir, 0o755) + s.NoError(err) + err = os.WriteFile(filepath.Join(constraintsDir, "constraints-3.1-12-python-3.12.txt"), []byte("apache-airflow==3.0.1\napache-airflow-task-sdk==1.0.0\n"), 0o644) + s.NoError(err) + err = os.WriteFile(filepath.Join(constraintsDir, "freeze-3.1-12-python-3.12.txt"), []byte("apache-airflow==3.0.1\n"), 0o644) + s.NoError(err) + + venvBin := filepath.Join(tmpDir, ".venv", "bin") + err = os.MkdirAll(venvBin, 0o755) + s.NoError(err) + err = os.WriteFile(filepath.Join(venvBin, "airflow"), []byte("#!/bin/sh\nsleep 30\n"), 0o755) + s.NoError(err) + + // Write a PID file with our own PID (guaranteed to be alive) + err = os.WriteFile(filepath.Join(constraintsDir, "airflow.pid"), []byte(fmt.Sprintf("%d", os.Getpid())), 0o644) + s.NoError(err) + + origParseFile := standaloneParseFile + origLookPath := lookPath + origRunCommand := runCommand + defer func() { + standaloneParseFile = origParseFile + lookPath = origLookPath + runCommand = origRunCommand + }() + + standaloneParseFile = func(filename string) ([]docker.Command, error) { + return []docker.Command{ + {Cmd: "from", Value: []string{"astrocrpublic.azurecr.io/runtime:3.1-12"}}, + }, nil + } + lookPath = func(file string) (string, error) { return "/usr/local/bin/uv", nil } + runCommand = func(dir, name string, args ...string) error { return nil } + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + err = handler.Start("", "airflow_settings.yaml", "", "", false, false, 1*time.Minute, nil) + s.Error(err) + s.Contains(err.Error(), "already running") +} + +func (s *Suite) TestStandaloneStop_Running() { + tmpDir, err := os.MkdirTemp("", "standalone-stop-running") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + // Start a real background process that we can stop + standaloneStateDir := filepath.Join(tmpDir, ".astro", "standalone") + err = os.MkdirAll(standaloneStateDir, 0o755) + s.NoError(err) + + // Start a sleep process + cmd := exec.Command("sleep", "60") + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + err = cmd.Start() + s.NoError(err) + + // Write its PID + err = os.WriteFile(filepath.Join(standaloneStateDir, "airflow.pid"), []byte(fmt.Sprintf("%d", cmd.Process.Pid)), 0o644) + s.NoError(err) + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + err = handler.Stop(false) + s.NoError(err) + + // PID file should be removed + _, err = os.Stat(filepath.Join(standaloneStateDir, "airflow.pid")) + s.True(os.IsNotExist(err)) +} + +func (s *Suite) TestStandaloneLogs() { + tmpDir, err := os.MkdirTemp("", "standalone-logs-test") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + // Create a log file with some content + standaloneStateDir := filepath.Join(tmpDir, ".astro", "standalone") + err = os.MkdirAll(standaloneStateDir, 0o755) + s.NoError(err) + err = os.WriteFile(filepath.Join(standaloneStateDir, "airflow.log"), []byte("log line 1\nlog line 2\n"), 0o644) + s.NoError(err) + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + // Non-follow mode should return immediately + err = handler.Logs(false) + s.NoError(err) +} + +func (s *Suite) TestStandaloneLogs_NoFile() { + tmpDir, err := os.MkdirTemp("", "standalone-logs-nofile") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + err = handler.Logs(false) + s.Error(err) + s.Contains(err.Error(), "no log file found") +} + +func (s *Suite) TestStandalonePS_NotRunning() { + tmpDir, err := os.MkdirTemp("", "standalone-ps-test") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + // Should not error even when not running + err = handler.PS() + s.NoError(err) +} + +func (s *Suite) TestStandalonePS_Running() { + tmpDir, err := os.MkdirTemp("", "standalone-ps-running") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + standaloneStateDir := filepath.Join(tmpDir, ".astro", "standalone") + err = os.MkdirAll(standaloneStateDir, 0o755) + s.NoError(err) + + // Write our own PID (guaranteed alive) + err = os.WriteFile(filepath.Join(standaloneStateDir, "airflow.pid"), []byte(fmt.Sprintf("%d", os.Getpid())), 0o644) + s.NoError(err) + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + err = handler.PS() + s.NoError(err) +} + +func (s *Suite) TestStandaloneSetForeground() { + handler, err := StandaloneInit("/tmp/test", ".env", "Dockerfile") + s.NoError(err) + + s.False(handler.foreground) + handler.SetForeground(true) + s.True(handler.foreground) +} + +func (s *Suite) TestStandaloneEnsureCredentials() { + tmpDir, err := os.MkdirTemp("", "standalone-ensure-creds") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + // File lives in .astro/standalone/, not the project root + credsPath := handler.passwordsFilePath() + s.Contains(credsPath, ".astro/standalone/") + + // File doesn't exist — ensureCredentials should create it with admin:admin + err = handler.ensureCredentials() + s.NoError(err) + + data, err := os.ReadFile(credsPath) + s.NoError(err) + s.Contains(string(data), `"admin"`) + + // Called again — should be a no-op (not overwrite existing file) + err = os.WriteFile(credsPath, []byte(`{"admin":"custompassword"}`), 0o644) + s.NoError(err) + err = handler.ensureCredentials() + s.NoError(err) + data, err = os.ReadFile(credsPath) + s.NoError(err) + s.Contains(string(data), "custompassword") // unchanged +} + +func (s *Suite) TestStandaloneReadCredentials() { + tmpDir, err := os.MkdirTemp("", "standalone-creds-test") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + + // No file — should return empty strings gracefully + user, pass := handler.readCredentials() + s.Equal("", user) + s.Equal("", pass) + + // Write a valid credentials file into .astro/standalone/ + credsPath := handler.passwordsFilePath() + err = os.MkdirAll(filepath.Dir(credsPath), 0o755) + s.NoError(err) + err = os.WriteFile(credsPath, []byte(`{"admin":"supersecret"}`), 0o644) + s.NoError(err) + + user, pass = handler.readCredentials() + s.Equal("admin", user) + s.Equal("supersecret", pass) +} + +func (s *Suite) TestStandaloneReadCredentials_InvalidJSON() { + tmpDir, err := os.MkdirTemp("", "standalone-creds-invalid") + s.NoError(err) + defer os.RemoveAll(tmpDir) + + handler, err := StandaloneInit(tmpDir, ".env", "Dockerfile") + s.NoError(err) + credsPath := handler.passwordsFilePath() + err = os.MkdirAll(filepath.Dir(credsPath), 0o755) + s.NoError(err) + err = os.WriteFile(credsPath, []byte(`not valid json`), 0o644) + s.NoError(err) + + user, pass := handler.readCredentials() + s.Equal("", user) + s.Equal("", pass) +} diff --git a/cmd/airflow.go b/cmd/airflow.go index 02ec38717..6d57ad826 100644 --- a/cmd/airflow.go +++ b/cmd/airflow.go @@ -122,6 +122,7 @@ astro dev init --remote-execution-enabled --remote-image-repository quay.io/acme // this is used to monkey patch the function in order to write unit test cases containerHandlerInit = airflow.ContainerHandlerInit + localHandlerInit = airflow.StandaloneHandlerInit getDefaultImageTag = airflowversions.GetDefaultImageTag projectNameUnique = airflow.ProjectNameUnique @@ -133,6 +134,7 @@ astro dev init --remote-execution-enabled --remote-image-repository quay.io/acme TemplateList = airflow.FetchTemplateList defaultWaitTime = 1 * time.Minute directoryPermissions uint32 = 0o755 + localForeground bool ) func newDevRootCmd(platformCoreClient astroplatformcore.CoreClient, astroCoreClient astrocore.CoreClient) *cobra.Command { @@ -165,6 +167,7 @@ func newDevRootCmd(platformCoreClient astroplatformcore.CoreClient, astroCoreCli newAirflowBashCmd(), newAirflowObjectRootCmd(), newAirflowUpgradeTestCmd(platformCoreClient), + newAirflowLocalCmd(astroCoreClient), ) return cmd } @@ -345,6 +348,66 @@ func newAirflowKillCmd() *cobra.Command { return cmd } +func newAirflowLocalCmd(astroCoreClient astrocore.CoreClient) *cobra.Command { + cmd := &cobra.Command{ + Use: "local", + Short: "Run Airflow locally without Docker", + Long: "Run Airflow locally without Docker using 'airflow standalone'. Requires 'uv' to be installed. By default the process is backgrounded; use --foreground to stream output in the terminal.", + // Override PersistentPreRunE so we don't require a container runtime. + PersistentPreRunE: SetupLogging, + PreRunE: EnsureLocalRuntime, + RunE: func(cmd *cobra.Command, args []string) error { + return airflowLocal(cmd, astroCoreClient) + }, + } + cmd.Flags().StringVarP(&envFile, "env", "e", ".env", "Location of file containing environment variables") + cmd.Flags().StringVarP(&settingsFile, "settings-file", "s", "airflow_settings.yaml", "Settings file from which to import airflow objects") + cmd.Flags().DurationVar(&waitTime, "wait", defaultWaitTime, "Duration to wait for the API server to become healthy") + cmd.Flags().StringVarP(&workspaceID, "workspace-id", "w", "", "ID of the Workspace to retrieve environment connections from") + cmd.Flags().StringVarP(&deploymentID, "deployment-id", "d", "", "ID of the Deployment to retrieve environment connections from") + cmd.Flags().BoolVarP(&localForeground, "foreground", "f", false, "Run in the foreground instead of backgrounding the process") + + cmd.AddCommand( + newAirflowLocalResetCmd(), + newAirflowLocalStopCmd(), + newAirflowLocalLogsCmd(), + ) + + return cmd +} + +func newAirflowLocalResetCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "reset", + Short: "Reset the local environment", + Long: "Reset the local environment by removing all generated files (.venv, cached constraints, airflow.db, logs). The next run of 'astro dev local' will start fresh.", + PreRunE: EnsureLocalRuntime, + RunE: airflowLocalReset, + } + return cmd +} + +func newAirflowLocalStopCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "stop", + Short: "Stop the local Airflow process", + PreRunE: EnsureLocalRuntime, + RunE: airflowLocalStop, + } + return cmd +} + +func newAirflowLocalLogsCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "logs", + Short: "View local Airflow logs", + PreRunE: EnsureLocalRuntime, + RunE: airflowLocalLogs, + } + cmd.Flags().BoolVarP(&followLogs, "follow", "f", false, "Follow log output") + return cmd +} + func newAirflowRestartCmd(astroCoreClient astrocore.CoreClient) *cobra.Command { cmd := &cobra.Command{ Use: "restart", @@ -756,6 +819,68 @@ func airflowStart(cmd *cobra.Command, args []string, astroCoreClient astrocore.C return containerHandler.Start(customImageName, settingsFile, composeFile, buildSecretString, noCache, noBrowser, waitTime, envConns) } +// airflowLocal starts Airflow locally without Docker. +func airflowLocal(cmd *cobra.Command, astroCoreClient astrocore.CoreClient) error { + cmd.SilenceUsage = true + + var envConns map[string]astrocore.EnvironmentObjectConnection + if workspaceID != "" || deploymentID != "" { + var err error + envConns, err = environment.ListConnections(workspaceID, deploymentID, astroCoreClient) + if err != nil { + return err + } + } + + containerHandler, err := localHandlerInit(config.WorkingPath, envFile, dockerfile, "") + if err != nil { + return err + } + + // Set foreground mode if the flag was provided + if sa, ok := containerHandler.(*airflow.Standalone); ok { + sa.SetForeground(localForeground) + } + + return containerHandler.Start("", settingsFile, "", "", false, false, waitTime, envConns) +} + +// airflowLocalReset removes local environment files. +func airflowLocalReset(cmd *cobra.Command, _ []string) error { + cmd.SilenceUsage = true + + containerHandler, err := localHandlerInit(config.WorkingPath, envFile, dockerfile, "") + if err != nil { + return err + } + + return containerHandler.Kill() +} + +// airflowLocalStop stops the local Airflow process. +func airflowLocalStop(cmd *cobra.Command, _ []string) error { + cmd.SilenceUsage = true + + containerHandler, err := localHandlerInit(config.WorkingPath, envFile, dockerfile, "") + if err != nil { + return err + } + + return containerHandler.Stop(false) +} + +// airflowLocalLogs streams the local Airflow log file. +func airflowLocalLogs(cmd *cobra.Command, _ []string) error { + cmd.SilenceUsage = true + + containerHandler, err := localHandlerInit(config.WorkingPath, envFile, dockerfile, "") + if err != nil { + return err + } + + return containerHandler.Logs(followLogs) +} + // airflowRun func airflowRun(cmd *cobra.Command, args []string) error { // Silence Usage as we have now validated command input diff --git a/cmd/airflow_hooks.go b/cmd/airflow_hooks.go index 9649e440e..5d80dedc7 100644 --- a/cmd/airflow_hooks.go +++ b/cmd/airflow_hooks.go @@ -47,6 +47,12 @@ func EnsureRuntime(cmd *cobra.Command, args []string) error { return containerRuntime.Initialize() } +// EnsureLocalRuntime is a pre-run hook for local mode. +// It ensures the project directory exists but skips Docker runtime initialization. +func EnsureLocalRuntime(cmd *cobra.Command, args []string) error { + return utils.EnsureProjectDir(cmd, args) +} + // SetRuntimeIfExists is a pre-run hook that ensures the project directory exists // and sets the container runtime if its running, otherwise we bail with an error message. func SetRuntimeIfExists(cmd *cobra.Command, args []string) error { diff --git a/cmd/airflow_test.go b/cmd/airflow_test.go index 69e412875..b7dc59f42 100644 --- a/cmd/airflow_test.go +++ b/cmd/airflow_test.go @@ -1693,6 +1693,145 @@ func (s *AirflowSuite) TestAirflowObjectExport() { }) } +func (s *AirflowSuite) TestAirflowLocal() { + s.Run("success", func() { + cmd := newAirflowLocalCmd(nil) + + mockContainerHandler := new(mocks.ContainerHandler) + localHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) { + mockContainerHandler.On("Start", "", "airflow_settings.yaml", "", "", false, false, defaultWaitTime, map[string]astrocore.EnvironmentObjectConnection(nil)).Return(nil).Once() + return mockContainerHandler, nil + } + + err := airflowLocal(cmd, nil) + s.NoError(err) + mockContainerHandler.AssertExpectations(s.T()) + }) + + s.Run("handler init error", func() { + cmd := newAirflowLocalCmd(nil) + + localHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) { + return nil, errMock + } + + err := airflowLocal(cmd, nil) + s.ErrorIs(err, errMock) + }) + + s.Run("start error", func() { + cmd := newAirflowLocalCmd(nil) + + mockContainerHandler := new(mocks.ContainerHandler) + localHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) { + mockContainerHandler.On("Start", "", "airflow_settings.yaml", "", "", false, false, defaultWaitTime, map[string]astrocore.EnvironmentObjectConnection(nil)).Return(errMock).Once() + return mockContainerHandler, nil + } + + err := airflowLocal(cmd, nil) + s.ErrorIs(err, errMock) + mockContainerHandler.AssertExpectations(s.T()) + }) + + s.Run("command exists", func() { + cmd := newAirflowLocalCmd(nil) + s.Equal("local", cmd.Use) + // Verify subcommands exist + resetCmd, _, err := cmd.Find([]string{"reset"}) + s.NoError(err) + s.Equal("reset", resetCmd.Use) + + stopCmd, _, err := cmd.Find([]string{"stop"}) + s.NoError(err) + s.Equal("stop", stopCmd.Use) + + logsCmd, _, err := cmd.Find([]string{"logs"}) + s.NoError(err) + s.Equal("logs", logsCmd.Use) + }) +} + +func (s *AirflowSuite) TestAirflowLocalReset() { + s.Run("success", func() { + cmd := newAirflowLocalCmd(nil) + + mockContainerHandler := new(mocks.ContainerHandler) + localHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) { + mockContainerHandler.On("Kill").Return(nil).Once() + return mockContainerHandler, nil + } + + err := airflowLocalReset(cmd, nil) + s.NoError(err) + mockContainerHandler.AssertExpectations(s.T()) + }) + + s.Run("handler init error", func() { + cmd := newAirflowLocalCmd(nil) + + localHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) { + return nil, errMock + } + + err := airflowLocalReset(cmd, nil) + s.ErrorIs(err, errMock) + }) +} + +func (s *AirflowSuite) TestAirflowLocalStop() { + s.Run("success", func() { + cmd := newAirflowLocalCmd(nil) + + mockContainerHandler := new(mocks.ContainerHandler) + localHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) { + mockContainerHandler.On("Stop", false).Return(nil).Once() + return mockContainerHandler, nil + } + + err := airflowLocalStop(cmd, nil) + s.NoError(err) + mockContainerHandler.AssertExpectations(s.T()) + }) + + s.Run("handler init error", func() { + cmd := newAirflowLocalCmd(nil) + + localHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) { + return nil, errMock + } + + err := airflowLocalStop(cmd, nil) + s.ErrorIs(err, errMock) + }) +} + +func (s *AirflowSuite) TestAirflowLocalLogs() { + s.Run("success", func() { + cmd := newAirflowLocalCmd(nil) + + mockContainerHandler := new(mocks.ContainerHandler) + localHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) { + mockContainerHandler.On("Logs", false).Return(nil).Once() + return mockContainerHandler, nil + } + + err := airflowLocalLogs(cmd, nil) + s.NoError(err) + mockContainerHandler.AssertExpectations(s.T()) + }) + + s.Run("handler init error", func() { + cmd := newAirflowLocalCmd(nil) + + localHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) { + return nil, errMock + } + + err := airflowLocalLogs(cmd, nil) + s.ErrorIs(err, errMock) + }) +} + func (s *AirflowSuite) TestAirflowBuild() { s.Run("success", func() { cmd := newAirflowBuildCmd() diff --git a/settings/settings.go b/settings/settings.go index a18668ff9..0f8bf0d7c 100644 --- a/settings/settings.go +++ b/settings/settings.go @@ -721,6 +721,14 @@ func jsonString(conn *Connection) string { return string(extraBytes) } +// SetExecAirflowCommand replaces the function used to execute airflow CLI commands. +// It returns the previous function so callers can restore it. +func SetExecAirflowCommand(fn func(id, command string) (string, error)) func(id, command string) (string, error) { + prev := execAirflowCommand + execAirflowCommand = fn + return prev +} + func WriteAirflowSettingstoYAML(settingsFile string) error { err := InitSettings(settingsFile) if err != nil {