Skip to content

Commit f2471e4

Browse files
Integrate deployment metadata service for server-side locking and state
Add client integration with the deployment metadata service API for server-side deployment locking and resource state tracking. Gated behind DATABRICKS_BUNDLE_DEPLOYMENT_SERVICE=true environment variable. Co-authored-by: Isaac
1 parent 651ab26 commit f2471e4

File tree

9 files changed

+880
-0
lines changed

9 files changed

+880
-0
lines changed
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package service
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
8+
"errors"
9+
10+
"github.com/databricks/databricks-sdk-go"
11+
"github.com/databricks/databricks-sdk-go/apierr"
12+
"github.com/databricks/databricks-sdk-go/client"
13+
)
14+
15+
const basePath = "/api/2.0/bundle"
16+
17+
// Client wraps the Databricks API client for the deployment metadata service.
18+
type Client struct {
19+
api *client.DatabricksClient
20+
}
21+
22+
// NewClient creates a new deployment metadata service client from a workspace client.
23+
func NewClient(w *databricks.WorkspaceClient) (*Client, error) {
24+
apiClient, err := client.New(w.Config)
25+
if err != nil {
26+
return nil, fmt.Errorf("failed to create deployment metadata API client: %w", err)
27+
}
28+
return &Client{api: apiClient}, nil
29+
}
30+
31+
// CreateDeployment creates a new deployment.
32+
func (c *Client) CreateDeployment(ctx context.Context, deploymentID string, deployment *Deployment) (*Deployment, error) {
33+
resp := &Deployment{}
34+
path := fmt.Sprintf("%s/deployments", basePath)
35+
err := c.api.Do(ctx, http.MethodPost, path, nil, nil, CreateDeploymentRequest{
36+
DeploymentID: deploymentID,
37+
Deployment: deployment,
38+
}, resp)
39+
if err != nil {
40+
return nil, mapError("create deployment", err)
41+
}
42+
return resp, nil
43+
}
44+
45+
// GetDeployment retrieves a deployment by ID.
46+
func (c *Client) GetDeployment(ctx context.Context, deploymentID string) (*Deployment, error) {
47+
resp := &Deployment{}
48+
path := fmt.Sprintf("%s/deployments/%s", basePath, deploymentID)
49+
err := c.api.Do(ctx, http.MethodGet, path, nil, nil, nil, resp)
50+
if err != nil {
51+
return nil, mapError("get deployment", err)
52+
}
53+
return resp, nil
54+
}
55+
56+
// DeleteDeployment soft-deletes a deployment.
57+
func (c *Client) DeleteDeployment(ctx context.Context, deploymentID string) error {
58+
path := fmt.Sprintf("%s/deployments/%s", basePath, deploymentID)
59+
err := c.api.Do(ctx, http.MethodDelete, path, nil, nil, nil, nil)
60+
if err != nil {
61+
return mapError("delete deployment", err)
62+
}
63+
return nil
64+
}
65+
66+
// CreateVersion creates a new version (acquires the deployment lock).
67+
func (c *Client) CreateVersion(ctx context.Context, deploymentID string, versionID string, version *Version) (*Version, error) {
68+
resp := &Version{}
69+
path := fmt.Sprintf("%s/deployments/%s/versions", basePath, deploymentID)
70+
err := c.api.Do(ctx, http.MethodPost, path, nil, nil, CreateVersionRequest{
71+
Parent: fmt.Sprintf("deployments/%s", deploymentID),
72+
Version: version,
73+
VersionID: versionID,
74+
}, resp)
75+
if err != nil {
76+
return nil, mapError("create version", err)
77+
}
78+
return resp, nil
79+
}
80+
81+
// GetVersion retrieves a version.
82+
func (c *Client) GetVersion(ctx context.Context, deploymentID, versionID string) (*Version, error) {
83+
resp := &Version{}
84+
path := fmt.Sprintf("%s/deployments/%s/versions/%s", basePath, deploymentID, versionID)
85+
err := c.api.Do(ctx, http.MethodGet, path, nil, nil, nil, resp)
86+
if err != nil {
87+
return nil, mapError("get version", err)
88+
}
89+
return resp, nil
90+
}
91+
92+
// Heartbeat renews the lock lease for an in-progress version.
93+
func (c *Client) Heartbeat(ctx context.Context, deploymentID, versionID string) (*HeartbeatResponse, error) {
94+
resp := &HeartbeatResponse{}
95+
path := fmt.Sprintf("%s/deployments/%s/versions/%s/heartbeat", basePath, deploymentID, versionID)
96+
err := c.api.Do(ctx, http.MethodPost, path, nil, nil, struct{}{}, resp)
97+
if err != nil {
98+
return nil, mapError("heartbeat", err)
99+
}
100+
return resp, nil
101+
}
102+
103+
// CompleteVersion marks a version as completed (releases the deployment lock).
104+
func (c *Client) CompleteVersion(ctx context.Context, deploymentID, versionID string, reason VersionComplete, force bool) (*Version, error) {
105+
resp := &Version{}
106+
path := fmt.Sprintf("%s/deployments/%s/versions/%s/complete", basePath, deploymentID, versionID)
107+
err := c.api.Do(ctx, http.MethodPost, path, nil, nil, CompleteVersionRequest{
108+
Name: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID),
109+
CompletionReason: reason,
110+
Force: force,
111+
}, resp)
112+
if err != nil {
113+
return nil, mapError("complete version", err)
114+
}
115+
return resp, nil
116+
}
117+
118+
// CreateOperation records a resource operation for a version.
119+
func (c *Client) CreateOperation(ctx context.Context, deploymentID, versionID, resourceKey string, operation *Operation) (*Operation, error) {
120+
resp := &Operation{}
121+
path := fmt.Sprintf("%s/deployments/%s/versions/%s/operations", basePath, deploymentID, versionID)
122+
err := c.api.Do(ctx, http.MethodPost, path, nil, nil, CreateOperationRequest{
123+
Parent: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID),
124+
ResourceKey: resourceKey,
125+
Operation: operation,
126+
}, resp)
127+
if err != nil {
128+
return nil, mapError("create operation", err)
129+
}
130+
return resp, nil
131+
}
132+
133+
// ListResources lists all resources for a deployment.
134+
func (c *Client) ListResources(ctx context.Context, deploymentID string) ([]Resource, error) {
135+
var allResources []Resource
136+
pageToken := ""
137+
138+
for {
139+
resp := &ListResourcesResponse{}
140+
path := fmt.Sprintf("%s/deployments/%s/resources", basePath, deploymentID)
141+
142+
q := map[string]any{
143+
"parent": fmt.Sprintf("deployments/%s", deploymentID),
144+
"page_size": 1000,
145+
}
146+
if pageToken != "" {
147+
q["page_token"] = pageToken
148+
}
149+
150+
err := c.api.Do(ctx, http.MethodGet, path, nil, q, nil, resp)
151+
if err != nil {
152+
return nil, mapError("list resources", err)
153+
}
154+
155+
allResources = append(allResources, resp.Resources...)
156+
if resp.NextPageToken == "" {
157+
break
158+
}
159+
pageToken = resp.NextPageToken
160+
}
161+
162+
return allResources, nil
163+
}
164+
165+
// mapError translates API errors into user-friendly messages.
166+
func mapError(operation string, err error) error {
167+
var apiErr *apierr.APIError
168+
if !errors.As(err, &apiErr) {
169+
return fmt.Errorf("%s: %w", operation, err)
170+
}
171+
172+
switch apiErr.StatusCode {
173+
case http.StatusConflict:
174+
return fmt.Errorf("%s: deployment is locked by another active deployment. "+
175+
"Use --force-lock to override", operation)
176+
case http.StatusNotFound:
177+
return fmt.Errorf("%s: resource not found: %w", operation, err)
178+
case http.StatusBadRequest:
179+
return fmt.Errorf("%s: bad request: %s", operation, apiErr.Message)
180+
default:
181+
return fmt.Errorf("%s: %w", operation, err)
182+
}
183+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package service
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/databricks/cli/libs/log"
8+
)
9+
10+
const DefaultHeartbeatInterval = 2 * time.Minute
11+
12+
// StartHeartbeat starts a background goroutine that sends heartbeats to keep
13+
// the deployment lock alive. Returns a cancel function to stop the heartbeat.
14+
func StartHeartbeat(ctx context.Context, client *Client, deploymentID, versionID string, interval time.Duration) context.CancelFunc {
15+
ctx, cancel := context.WithCancel(ctx)
16+
17+
go func() {
18+
ticker := time.NewTicker(interval)
19+
defer ticker.Stop()
20+
21+
for {
22+
select {
23+
case <-ctx.Done():
24+
return
25+
case <-ticker.C:
26+
_, err := client.Heartbeat(ctx, deploymentID, versionID)
27+
if err != nil {
28+
log.Warnf(ctx, "Failed to send deployment heartbeat: %v", err)
29+
} else {
30+
log.Debugf(ctx, "Deployment heartbeat sent for deployment=%s version=%s", deploymentID, versionID)
31+
}
32+
}
33+
}
34+
}()
35+
36+
return cancel
37+
}

0 commit comments

Comments
 (0)