Skip to content

Commit b2ac77e

Browse files
pietern and claude authored
Add synchronized spinner API with direct tea.Program integration (#4351)
## Problem The previous `cmdio.Spinner()` API returned immediately on `close(spinner)` without waiting for the spinner to terminate. Callers had no way to ensure the spinner fully stopped before proceeding. This potentially races with log statements or program termination (failing to restore cursor state). As of #4336 we perform synchronization on program termination, but this addresses the symptom not the cause. ```go spinner := cmdio.Spinner(ctx) spinner <- "Processing..." close(spinner) // Returns immediately - spinner still running! ``` ## Solution New `NewSpinner()` API with synchronous `Close()` that blocks until tea.Program terminates: ```go sp := cmdio.NewSpinner(ctx) sp.Update("Processing...") sp.Close() // Blocks until spinner fully stopped ``` **Changes:** - `sp.Update()` sends directly to tea.Program (no bridge goroutines) - `sp.Close()` blocks until cleanup completes (guaranteed termination) - Race-free concurrent `Close()` calls (context cancellation + explicit) - Old API wraps new implementation (100% backward compatible) Refactored 9 manual usages. All tests pass with race detector. --------- Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 4332aa0 commit b2ac77e

File tree

12 files changed

+243
-66
lines changed

12 files changed

+243
-66
lines changed

bundle/run/job.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -294,10 +294,10 @@ func (r *jobRunner) Restart(ctx context.Context, opts *Options) (output.RunOutpu
294294
return r.Run(ctx, opts)
295295
}
296296

297-
s := cmdio.Spinner(ctx)
298-
s <- "Cancelling all active job runs"
297+
sp := cmdio.NewSpinner(ctx)
298+
sp.Update("Cancelling all active job runs")
299299
err := r.Cancel(ctx)
300-
close(s)
300+
sp.Close()
301301
if err != nil {
302302
return nil, err
303303
}

bundle/run/pipeline.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,10 +179,10 @@ func (r *pipelineRunner) Cancel(ctx context.Context) error {
179179
}
180180

181181
func (r *pipelineRunner) Restart(ctx context.Context, opts *Options) (output.RunOutput, error) {
182-
s := cmdio.Spinner(ctx)
183-
s <- "Cancelling the active pipeline update"
182+
sp := cmdio.NewSpinner(ctx)
183+
sp.Update("Cancelling the active pipeline update")
184184
err := r.Cancel(ctx)
185-
close(s)
185+
sp.Close()
186186
if err != nil {
187187
return nil, err
188188
}

cmd/labs/project/entrypoint.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,14 @@ func (e *Entrypoint) preparePython(ctx context.Context, environment map[string]s
122122
}
123123

124124
func (e *Entrypoint) ensureRunningCluster(ctx context.Context, cfg *config.Config) error {
125-
feedback := cmdio.Spinner(ctx)
126-
defer close(feedback)
125+
sp := cmdio.NewSpinner(ctx)
126+
defer sp.Close()
127127
w, err := databricks.NewWorkspaceClient((*databricks.Config)(cfg))
128128
if err != nil {
129129
return fmt.Errorf("workspace client: %w", err)
130130
}
131131
// TODO: add in-progress callback to EnsureClusterIsRunning() in SDK
132-
feedback <- "Ensuring the cluster is running..."
132+
sp.Update("Ensuring the cluster is running...")
133133
err = w.Clusters.EnsureClusterIsRunning(ctx, cfg.ClusterID)
134134
if err != nil {
135135
return fmt.Errorf("ensure running: %w", err)

cmd/labs/project/installer.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -194,17 +194,17 @@ func (i *installer) login(ctx context.Context) (*databricks.WorkspaceClient, err
194194
}
195195

196196
func (i *installer) downloadLibrary(ctx context.Context) error {
197-
feedback := cmdio.Spinner(ctx)
198-
defer close(feedback)
199-
feedback <- "Cleaning up previous installation if necessary"
197+
sp := cmdio.NewSpinner(ctx)
198+
defer sp.Close()
199+
sp.Update("Cleaning up previous installation if necessary")
200200
err := i.cleanupLib(ctx)
201201
if err != nil {
202202
return fmt.Errorf("cleanup: %w", err)
203203
}
204204
libTarget := i.LibDir()
205205
// we may support wheels, jars, and golang binaries. but those are not zipballs
206206
if i.IsZipball() {
207-
feedback <- "Downloading and unpacking zipball for " + i.version
207+
sp.Update("Downloading and unpacking zipball for " + i.version)
208208
return i.downloadAndUnpackZipball(ctx, libTarget)
209209
}
210210
return errors.New("we only support zipballs for now")
@@ -224,9 +224,9 @@ func (i *installer) setupPythonVirtualEnvironment(ctx context.Context, w *databr
224224
if !i.HasPython() {
225225
return nil
226226
}
227-
feedback := cmdio.Spinner(ctx)
228-
defer close(feedback)
229-
feedback <- "Detecting all installed Python interpreters on the system"
227+
sp := cmdio.NewSpinner(ctx)
228+
defer sp.Close()
229+
sp.Update("Detecting all installed Python interpreters on the system")
230230
pythonInterpreters, err := DetectInterpreters(ctx)
231231
if err != nil {
232232
return fmt.Errorf("detect: %w", err)
@@ -238,13 +238,13 @@ func (i *installer) setupPythonVirtualEnvironment(ctx context.Context, w *databr
238238
log.Debugf(ctx, "Detected Python %s at: %s", py.Version, py.Path)
239239
venvPath := i.virtualEnvPath(ctx)
240240
log.Debugf(ctx, "Creating Python Virtual Environment at: %s", venvPath)
241-
feedback <- "Creating Virtual Environment with Python " + py.Version
241+
sp.Update("Creating Virtual Environment with Python " + py.Version)
242242
_, err = process.Background(ctx, []string{py.Path, "-m", "venv", venvPath})
243243
if err != nil {
244244
return fmt.Errorf("create venv: %w", err)
245245
}
246246
if i.Installer != nil && i.Installer.RequireDatabricksConnect {
247-
feedback <- "Determining Databricks Connect version"
247+
sp.Update("Determining Databricks Connect version")
248248
cluster, err := w.Clusters.Get(ctx, compute.GetClusterRequest{
249249
ClusterId: w.Config.ClusterID,
250250
})
@@ -255,14 +255,14 @@ func (i *installer) setupPythonVirtualEnvironment(ctx context.Context, w *databr
255255
if !ok {
256256
return fmt.Errorf("unsupported runtime: %s", cluster.SparkVersion)
257257
}
258-
feedback <- "Installing Databricks Connect v" + runtimeVersion
258+
sp.Update("Installing Databricks Connect v" + runtimeVersion)
259259
pipSpec := "databricks-connect==" + runtimeVersion
260260
err = i.installPythonDependencies(ctx, pipSpec)
261261
if err != nil {
262262
return fmt.Errorf("dbconnect: %w", err)
263263
}
264264
}
265-
feedback <- "Installing Python library dependencies"
265+
sp.Update("Installing Python library dependencies")
266266
if i.Installer.Extras != "" {
267267
// install main and optional dependencies
268268
return i.installPythonDependencies(ctx, fmt.Sprintf(".[%s]", i.Installer.Extras))

cmd/psql/psql.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ You can pass additional arguments to psql after a double-dash (--):
5353
}
5454

5555
if argsBeforeDash != 1 {
56-
promptSpinner := cmdio.Spinner(ctx)
57-
promptSpinner <- "No DATABASE_INSTANCE_NAME argument specified. Loading names for Database instances drop-down."
56+
sp := cmdio.NewSpinner(ctx)
57+
sp.Update("No DATABASE_INSTANCE_NAME argument specified. Loading names for Database instances drop-down.")
5858
instances, err := w.Database.ListDatabaseInstancesAll(ctx, database.ListDatabaseInstancesRequest{})
59-
close(promptSpinner)
59+
sp.Close()
6060
if err != nil {
6161
return fmt.Errorf("failed to load names for Database instances drop-down. Please manually specify required argument: DATABASE_INSTANCE_NAME. Original error: %w", err)
6262
}

cmd/selftest/tui/spinner.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ func newSpinner() *cobra.Command {
1414
Run: func(cmd *cobra.Command, args []string) {
1515
ctx := cmd.Context()
1616

17-
spinner := cmdio.Spinner(ctx)
17+
sp := cmdio.NewSpinner(ctx)
1818

1919
// Test various status messages
2020
messages := []struct {
@@ -29,11 +29,11 @@ func newSpinner() *cobra.Command {
2929
}
3030

3131
for _, msg := range messages {
32-
spinner <- msg.text
32+
sp.Update(msg.text)
3333
time.Sleep(msg.duration)
3434
}
3535

36-
close(spinner)
36+
sp.Close()
3737

3838
cmdio.LogString(ctx, "✓ Spinner test complete")
3939
},

experimental/ssh/internal/setup/setup.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,14 +166,14 @@ func updateSSHConfigFile(configPath, hostConfig, hostName string) error {
166166
}
167167

168168
func clusterSelectionPrompt(ctx context.Context, client *databricks.WorkspaceClient) (string, error) {
169-
spinnerChan := cmdio.Spinner(ctx)
170-
spinnerChan <- "Loading clusters."
169+
sp := cmdio.NewSpinner(ctx)
170+
sp.Update("Loading clusters.")
171171
clusters, err := client.Clusters.ClusterDetailsClusterNameToClusterIdMap(ctx, compute.ListClustersRequest{
172172
FilterBy: &compute.ListClustersFilterBy{
173173
ClusterSources: []compute.ClusterSource{compute.ClusterSourceApi, compute.ClusterSourceUi},
174174
},
175175
})
176-
close(spinnerChan)
176+
sp.Close()
177177
if err != nil {
178178
return "", fmt.Errorf("failed to load names for Clusters drop-down. Please manually specify cluster argument. Original error: %w", err)
179179
}

libs/cmdio/io.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,25 @@ func Spinner(ctx context.Context) chan string {
165165
return c.Spinner(ctx)
166166
}
167167

168+
// NewSpinner creates a new spinner for displaying progress indicators.
169+
// The returned spinner should be closed when done to release resources.
170+
//
171+
// Example:
172+
//
173+
// sp := cmdio.NewSpinner(ctx)
174+
// defer sp.Close()
175+
// for i := range 100 {
176+
// sp.Update(fmt.Sprintf("processing item %d", i))
177+
// time.Sleep(100 * time.Millisecond)
178+
// }
179+
//
180+
// The spinner automatically degrades in non-interactive terminals (no output).
181+
// Context cancellation will automatically close the spinner.
182+
func NewSpinner(ctx context.Context) *spinner {
183+
c := fromContext(ctx)
184+
return c.NewSpinner(ctx)
185+
}
186+
168187
type cmdIOType int
169188

170189
var cmdIOKey cmdIOType

libs/cmdio/spinner.go

Lines changed: 82 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ import (
55
"sync"
66
"time"
77

8-
"github.com/charmbracelet/bubbles/spinner"
8+
bubblespinner "github.com/charmbracelet/bubbles/spinner"
99
tea "github.com/charmbracelet/bubbletea"
1010
"github.com/charmbracelet/lipgloss"
1111
)
1212

1313
// spinnerModel is the Bubble Tea model for the spinner.
1414
type spinnerModel struct {
15-
spinner spinner.Model
15+
spinner bubblespinner.Model
1616
suffix string
1717
quitting bool
1818
}
@@ -25,9 +25,9 @@ type (
2525

2626
// newSpinnerModel creates a new spinner model.
2727
func newSpinnerModel() spinnerModel {
28-
s := spinner.New()
28+
s := bubblespinner.New()
2929
// Braille spinner frames with 200ms timing
30-
s.Spinner = spinner.Spinner{
30+
s.Spinner = bubblespinner.Spinner{
3131
Frames: []string{"⣾", "⣽", "⣻", "⢿", "⡿", "⣟", "⣯", "⣷"},
3232
FPS: time.Second / 5, // 200ms = 5 FPS
3333
}
@@ -54,7 +54,7 @@ func (m spinnerModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
5454
m.quitting = true
5555
return m, tea.Quit
5656

57-
case spinner.TickMsg:
57+
case bubblespinner.TickMsg:
5858
var cmd tea.Cmd
5959
m.spinner, cmd = m.spinner.Update(msg)
6060
return m, cmd
@@ -76,21 +76,55 @@ func (m spinnerModel) View() string {
7676
return m.spinner.View()
7777
}
7878

79-
// Spinner returns a channel for updating spinner status messages.
80-
// Send messages to update the suffix, close the channel to stop.
81-
// The spinner runs until the channel is closed or context is cancelled.
82-
func (c *cmdIO) Spinner(ctx context.Context) chan string {
83-
updates := make(chan string)
79+
// spinner provides a structured interface for displaying progress indicators.
80+
// Use NewSpinner to create an instance, Update to send status messages,
81+
// and Close to stop the spinner and clean up resources.
82+
//
83+
// The spinner automatically degrades in non-interactive terminals.
84+
// Context cancellation will automatically close the spinner.
85+
type spinner struct {
86+
p *tea.Program // nil in non-interactive mode
87+
c *cmdIO
88+
ctx context.Context
89+
once sync.Once
90+
done chan struct{} // Closed when tea.Program finishes
91+
}
92+
93+
// Update sends a status message to the spinner.
94+
// This operation sends directly to the tea.Program.
95+
func (sp *spinner) Update(msg string) {
96+
if sp.p != nil {
97+
sp.p.Send(suffixMsg(msg))
98+
}
99+
}
84100

101+
// Close stops the spinner and releases resources.
102+
// It waits for the spinner to fully terminate before returning.
103+
// It is safe to call Close multiple times and from multiple goroutines.
104+
func (sp *spinner) Close() {
105+
sp.once.Do(func() {
106+
if sp.p != nil {
107+
sp.p.Send(quitMsg{})
108+
}
109+
})
110+
// Always wait for termination, even if we weren't the first caller
111+
if sp.p != nil {
112+
<-sp.done
113+
}
114+
}
115+
116+
// NewSpinner creates a new spinner for displaying progress.
117+
// The spinner should be closed when done to clean up resources.
118+
//
119+
// Example:
120+
//
121+
// sp := cmdio.NewSpinner(ctx)
122+
// defer sp.Close()
123+
// sp.Update("processing files")
124+
func (c *cmdIO) NewSpinner(ctx context.Context) *spinner {
85125
// Don't show spinner if not interactive
86126
if !c.capabilities.SupportsInteractive() {
87-
// Return channel but don't start program - just drain messages
88-
go func() {
89-
for range updates {
90-
// Discard messages
91-
}
92-
}()
93-
return updates
127+
return &spinner{p: nil, c: c, ctx: ctx}
94128
}
95129

96130
// Create model and program
@@ -108,17 +142,40 @@ func (c *cmdIO) Spinner(ctx context.Context) chan string {
108142
// Acquire program slot (queues if another program is running)
109143
c.acquireTeaProgram(p)
110144

111-
// Track both goroutines to ensure clean shutdown
112-
var wg sync.WaitGroup
145+
done := make(chan struct{})
146+
sp := &spinner{
147+
p: p,
148+
c: c,
149+
ctx: ctx,
150+
done: done,
151+
}
113152

114153
// Start program in background
115-
wg.Go(func() {
154+
go func() {
116155
_, _ = p.Run()
117-
})
156+
c.releaseTeaProgram()
157+
close(done)
158+
}()
159+
160+
// Handle context cancellation
161+
go func() {
162+
<-ctx.Done()
163+
sp.Close()
164+
}()
165+
166+
return sp
167+
}
168+
169+
// Spinner returns a channel for updating spinner status messages.
170+
// Send messages to update the suffix, close the channel to stop.
171+
// The spinner runs until the channel is closed or context is cancelled.
172+
func (c *cmdIO) Spinner(ctx context.Context) chan string {
173+
updates := make(chan string)
174+
sp := c.NewSpinner(ctx)
118175

119-
// Bridge goroutine: channel -> tea messages
120-
wg.Go(func() {
121-
defer p.Send(quitMsg{})
176+
// Bridge goroutine: channel -> spinner.Update()
177+
go func() {
178+
defer sp.Close()
122179

123180
for {
124181
select {
@@ -129,15 +186,9 @@ func (c *cmdIO) Spinner(ctx context.Context) chan string {
129186
// Channel closed
130187
return
131188
}
132-
p.Send(suffixMsg(msg))
189+
sp.Update(msg)
133190
}
134191
}
135-
})
136-
137-
// Wait for both goroutines, then release
138-
go func() {
139-
wg.Wait()
140-
c.releaseTeaProgram()
141192
}()
142193

143194
return updates

0 commit comments

Comments (0)