Skip to content

Commit 8f914d0

Browse files
authored
Acceptance tests for pipeline run support in handlers and testserver files (#3178)
## Changes Added pipeline run support for acceptance testing: - Created `libs/testserver/pipelines_test.go` with test coverage - Implemented pipeline operations in `libs/testserver/pipelines.go` - Enhanced `libs/testserver/fake_workspace.go` for pipeline API support - Updated `acceptance/internal/handlers.go` for pipeline requests ## Why Pipeline functionality is needed for acceptance tests to validate pipeline API calls without requiring actual Databricks workspace connections. This provides test infrastructure for simulating pipeline operations. ## Tests Test coverage validates: - Pipeline start update functionality with various parameters - Error handling for invalid requests and non-existent pipelines - Unique update ID generation - Complex parameter combinations and edge cases The test infrastructure ensures pipeline-related acceptance tests can run. --- **Follows up on:** #3132
1 parent 51e5c28 commit 8f914d0

File tree

4 files changed

+187
-17
lines changed

4 files changed

+187
-17
lines changed

acceptance/internal/handlers.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,22 @@ func addDefaultHandlers(server *testserver.Server) {
324324
return testserver.MapDelete(req.Workspace, req.Workspace.Pipelines, req.Vars["pipeline_id"])
325325
})
326326

327+
server.Handle("POST", "/api/2.0/pipelines/{pipeline_id}/updates", func(req testserver.Request) any {
328+
return req.Workspace.PipelineStartUpdate(req.Vars["pipeline_id"])
329+
})
330+
331+
server.Handle("GET", "/api/2.0/pipelines/{pipeline_id}/events", func(req testserver.Request) any {
332+
return req.Workspace.PipelineEvents(req.Vars["pipeline_id"])
333+
})
334+
335+
server.Handle("GET", "/api/2.0/pipelines/{pipeline_id}/updates/{update_id}", func(req testserver.Request) any {
336+
return req.Workspace.PipelineGetUpdate(req.Vars["pipeline_id"], req.Vars["update_id"])
337+
})
338+
339+
server.Handle("POST", "/api/2.0/pipelines/{pipeline_id}/stop", func(req testserver.Request) any {
340+
return req.Workspace.PipelineStop(req.Vars["pipeline_id"])
341+
})
342+
327343
// Quality monitors:
328344

329345
server.Handle("GET", "/api/2.1/unity-catalog/tables/{table_name}/monitor", func(req testserver.Request) any {

libs/testserver/fake_workspace.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,13 @@ type FakeWorkspace struct {
4747
Jobs map[int64]jobs.Job
4848
JobRuns map[int64]jobs.Run
4949

50-
Pipelines map[string]pipelines.GetPipelineResponse
51-
Monitors map[string]catalog.MonitorInfo
52-
Apps map[string]apps.App
53-
Schemas map[string]catalog.SchemaInfo
54-
Volumes map[string]catalog.VolumeInfo
55-
Dashboards map[string]dashboards.Dashboard
50+
Pipelines map[string]pipelines.GetPipelineResponse
51+
PipelineUpdates map[string]bool
52+
Monitors map[string]catalog.MonitorInfo
53+
Apps map[string]apps.App
54+
Schemas map[string]catalog.SchemaInfo
55+
Volumes map[string]catalog.VolumeInfo
56+
Dashboards map[string]dashboards.Dashboard
5657
}
5758

5859
func (w *FakeWorkspace) LockUnlock() func() {
@@ -117,17 +118,18 @@ func NewFakeWorkspace(url string) *FakeWorkspace {
117118
directories: map[string]bool{
118119
"/Workspace": true,
119120
},
120-
files: make(map[string]FileEntry),
121-
Jobs: map[int64]jobs.Job{},
122-
JobRuns: map[int64]jobs.Run{},
123-
nextJobId: TestJobID,
124-
nextJobRunId: TestRunID,
125-
Pipelines: map[string]pipelines.GetPipelineResponse{},
126-
Monitors: map[string]catalog.MonitorInfo{},
127-
Apps: map[string]apps.App{},
128-
Schemas: map[string]catalog.SchemaInfo{},
129-
Volumes: map[string]catalog.VolumeInfo{},
130-
Dashboards: map[string]dashboards.Dashboard{},
121+
files: make(map[string]FileEntry),
122+
Jobs: map[int64]jobs.Job{},
123+
JobRuns: map[int64]jobs.Run{},
124+
nextJobId: TestJobID,
125+
nextJobRunId: TestRunID,
126+
Pipelines: map[string]pipelines.GetPipelineResponse{},
127+
PipelineUpdates: map[string]bool{},
128+
Monitors: map[string]catalog.MonitorInfo{},
129+
Apps: map[string]apps.App{},
130+
Schemas: map[string]catalog.SchemaInfo{},
131+
Volumes: map[string]catalog.VolumeInfo{},
132+
Dashboards: map[string]dashboards.Dashboard{},
131133
}
132134
}
133135

libs/testserver/pipelines.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,91 @@ func (s *FakeWorkspace) PipelineUpdate(req Request, pipelineId string) Response
9494
Body: pipelines.EditPipelineResponse{},
9595
}
9696
}
97+
98+
func (s *FakeWorkspace) PipelineStartUpdate(pipelineId string) Response {
99+
defer s.LockUnlock()()
100+
101+
_, exists := s.Pipelines[pipelineId]
102+
if !exists {
103+
return Response{
104+
StatusCode: 404,
105+
Body: map[string]string{"message": fmt.Sprintf("The specified pipeline %s was not found.", pipelineId)},
106+
}
107+
}
108+
109+
updateId := uuid.New().String()
110+
s.PipelineUpdates[updateId] = true
111+
112+
return Response{
113+
Body: pipelines.StartUpdateResponse{
114+
UpdateId: updateId,
115+
},
116+
}
117+
}
118+
119+
func (s *FakeWorkspace) PipelineEvents(pipelineId string) Response {
120+
defer s.LockUnlock()()
121+
122+
_, exists := s.Pipelines[pipelineId]
123+
if !exists {
124+
return Response{
125+
StatusCode: 404,
126+
Body: map[string]string{"message": fmt.Sprintf("The specified pipeline %s was not found.", pipelineId)},
127+
}
128+
}
129+
130+
return Response{
131+
Body: map[string]any{
132+
"events": []pipelines.PipelineEvent{},
133+
},
134+
}
135+
}
136+
137+
func (s *FakeWorkspace) PipelineGetUpdate(pipelineId, updateId string) Response {
138+
defer s.LockUnlock()()
139+
140+
_, exists := s.Pipelines[pipelineId]
141+
if !exists {
142+
return Response{
143+
StatusCode: 404,
144+
Body: map[string]string{"message": fmt.Sprintf("The specified pipeline %s was not found.", pipelineId)},
145+
}
146+
}
147+
148+
// Check if the update exists
149+
_, updateExists := s.PipelineUpdates[updateId]
150+
if !updateExists {
151+
return Response{
152+
StatusCode: 404,
153+
Body: map[string]string{"message": fmt.Sprintf("The specified update %s was not found.", updateId)},
154+
}
155+
}
156+
157+
return Response{
158+
Body: pipelines.GetUpdateResponse{
159+
Update: &pipelines.UpdateInfo{
160+
UpdateId: updateId,
161+
State: pipelines.UpdateInfoStateCompleted,
162+
},
163+
},
164+
}
165+
}
166+
167+
func (s *FakeWorkspace) PipelineStop(pipelineId string) Response {
168+
defer s.LockUnlock()()
169+
170+
_, exists := s.Pipelines[pipelineId]
171+
if !exists {
172+
return Response{
173+
StatusCode: 404,
174+
Body: map[string]string{"message": fmt.Sprintf("The specified pipeline %s was not found.", pipelineId)},
175+
}
176+
}
177+
178+
return Response{
179+
Body: pipelines.GetPipelineResponse{
180+
PipelineId: pipelineId,
181+
State: pipelines.PipelineStateIdle,
182+
},
183+
}
184+
}

libs/testserver/pipelines_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package testserver
2+
3+
import (
4+
"testing"
5+
6+
"github.com/databricks/databricks-sdk-go/service/pipelines"
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func createTestPipeline(t *testing.T, workspace *FakeWorkspace) string {
12+
createReq := Request{
13+
Body: []byte(`{
14+
"name": "Test Pipeline",
15+
"storage": "dbfs:/pipelines/test-pipeline"
16+
}`),
17+
}
18+
19+
createResponse := workspace.PipelineCreate(createReq)
20+
// StatusCode 0 gets converted to 200 by normalizeResponse in the server
21+
require.Equal(t, 0, createResponse.StatusCode)
22+
23+
createPipelineResponse, ok := createResponse.Body.(pipelines.CreatePipelineResponse)
24+
require.True(t, ok)
25+
return createPipelineResponse.PipelineId
26+
}
27+
28+
func TestPipelineStartUpdate_HandlesNonExistentPipeline(t *testing.T) {
29+
workspace := NewFakeWorkspace("http://test")
30+
31+
response := workspace.PipelineStartUpdate("non-existent-pipeline")
32+
assert.Equal(t, 404, response.StatusCode)
33+
assert.Contains(t, response.Body.(map[string]string)["message"], "The specified pipeline non-existent-pipeline was not found")
34+
}
35+
36+
func TestPipelineGetUpdate_HandlesNonExistent(t *testing.T) {
37+
workspace := NewFakeWorkspace("http://test")
38+
39+
response := workspace.PipelineGetUpdate("non-existent-pipeline", "some-update-id")
40+
assert.Equal(t, 404, response.StatusCode)
41+
42+
pipelineId := createTestPipeline(t, workspace)
43+
44+
response = workspace.PipelineGetUpdate(pipelineId, "non-existent-update")
45+
assert.Equal(t, 404, response.StatusCode)
46+
assert.Contains(t, response.Body.(map[string]string)["message"], "The specified update non-existent-update was not found")
47+
}
48+
49+
func TestPipelineStop_AfterUpdate(t *testing.T) {
50+
workspace := NewFakeWorkspace("http://test")
51+
52+
pipelineId := createTestPipeline(t, workspace)
53+
54+
startResponse := workspace.PipelineStartUpdate(pipelineId)
55+
assert.Equal(t, 0, startResponse.StatusCode)
56+
57+
stopResponse := workspace.PipelineStop(pipelineId)
58+
assert.Equal(t, 0, stopResponse.StatusCode)
59+
60+
stopBody, ok := stopResponse.Body.(pipelines.GetPipelineResponse)
61+
require.True(t, ok)
62+
assert.Equal(t, pipelineId, stopBody.PipelineId)
63+
assert.Equal(t, pipelines.PipelineStateIdle, stopBody.State)
64+
}

0 commit comments

Comments
 (0)