Skip to content

Commit 8d3a846

Browse files
Add a readonly function to walk a dynamic tree (#3084)
## Changes

This PR adds a read-only version of `dyn.Walk`, which can be used to walk the bundle configuration tree without rebuilding it. Benchmarks show it is 4x faster than the normal walk method.

## Why

Many walk use cases are read-only, so such a method is useful. It will be used to perform required-field and enum validation in DABs.

## Tests

New unit tests.
1 parent ed02bc2 commit 8d3a846

File tree

5 files changed

+573
-0
lines changed

5 files changed

+573
-0
lines changed
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
package bundletest
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"strconv"
7+
"testing"
8+
9+
"github.com/databricks/cli/bundle"
10+
"github.com/databricks/cli/bundle/config"
11+
"github.com/databricks/cli/bundle/config/resources"
12+
"github.com/databricks/cli/libs/diag"
13+
"github.com/databricks/cli/libs/dyn"
14+
"github.com/databricks/databricks-sdk-go/service/jobs"
15+
"github.com/stretchr/testify/require"
16+
)
17+
18+
// jobExample is a JSON payload describing a single multi-task job.
// It is unmarshaled into jobs.JobSettings by BundleV (below) to build
// large synthetic bundle configurations for walk benchmarks and tests.
const jobExample = `
{
  "budget_policy_id": "550e8400-e29b-41d4-a716-446655440000",
  "continuous": {
    "pause_status": "UNPAUSED"
  },
  "deployment": {
    "kind": "BUNDLE",
    "metadata_file_path": "string"
  },
  "description": "This job contain multiple tasks that are required to produce the weekly shark sightings report.",
  "edit_mode": "UI_LOCKED",
  "email_notifications": {
    "no_alert_for_skipped_runs": false,
    "on_duration_warning_threshold_exceeded": [
      "user.name@databricks.com"
    ],
    "on_failure": [
      "user.name@databricks.com"
    ],
    "on_start": [
      "user.name@databricks.com"
    ],
    "on_streaming_backlog_exceeded": [
      "user.name@databricks.com"
    ],
    "on_success": [
      "user.name@databricks.com"
    ]
  },
  "environments": [
    {
      "environment_key": "string",
      "spec": {
        "client": "1",
        "dependencies": [
          "string"
        ]
      }
    }
  ],
  "format": "SINGLE_TASK",
  "git_source": {
    "git_branch": "main",
    "git_provider": "gitHub",
    "git_url": "https://github.com/databricks/databricks-cli"
  },
  "health": {
    "rules": [
      {
        "metric": "RUN_DURATION_SECONDS",
        "op": "GREATER_THAN",
        "value": 10
      }
    ]
  },
  "job_clusters": [
    {
      "job_cluster_key": "auto_scaling_cluster",
      "new_cluster": {
        "autoscale": {
          "max_workers": 16,
          "min_workers": 2
        },
        "node_type_id": null,
        "spark_conf": {
          "spark.speculation": "true"
        },
        "spark_version": "7.3.x-scala2.12"
      }
    }
  ],
  "max_concurrent_runs": 10,
  "name": "A multitask job",
  "notification_settings": {
    "no_alert_for_canceled_runs": false,
    "no_alert_for_skipped_runs": false
  },
  "parameters": [
    {
      "default": "users",
      "name": "table"
    }
  ],
  "performance_target": "PERFORMANCE_OPTIMIZED",
  "queue": {
    "enabled": true
  },
  "run_as": {
    "service_principal_name": "692bc6d0-ffa3-11ed-be56-0242ac120002",
    "user_name": "user@databricks.com"
  },
  "schedule": {
    "pause_status": "UNPAUSED",
    "quartz_cron_expression": "20 30 * * * ?",
    "timezone_id": "Europe/London"
  },
  "tags": {
    "cost-center": "engineering",
    "team": "jobs"
  },
  "tasks": [
    {
      "depends_on": [],
      "description": "Extracts session data from events",
      "existing_cluster_id": "0923-164208-meows279",
      "libraries": [
        {
          "jar": "dbfs:/mnt/databricks/Sessionize.jar"
        }
      ],
      "max_retries": 3,
      "min_retry_interval_millis": 2000,
      "retry_on_timeout": false,
      "spark_jar_task": {
        "main_class_name": "com.databricks.Sessionize",
        "parameters": [
          "--data",
          "dbfs:/path/to/data.json"
        ]
      },
      "task_key": "Sessionize",
      "timeout_seconds": 86400
    },
    {
      "depends_on": [],
      "description": "Ingests order data",
      "job_cluster_key": "auto_scaling_cluster",
      "libraries": [
        {
          "jar": "dbfs:/mnt/databricks/OrderIngest.jar"
        }
      ],
      "max_retries": 3,
      "min_retry_interval_millis": 2000,
      "retry_on_timeout": false,
      "spark_jar_task": {
        "main_class_name": "com.databricks.OrdersIngest",
        "parameters": [
          "--data",
          "dbfs:/path/to/order-data.json"
        ]
      },
      "task_key": "Orders_Ingest",
      "timeout_seconds": 86400
    },
    {
      "depends_on": [
        {
          "task_key": "Orders_Ingest"
        },
        {
          "task_key": "Sessionize"
        }
      ],
      "description": "Matches orders with user sessions",
      "max_retries": 3,
      "min_retry_interval_millis": 2000,
      "new_cluster": {
        "autoscale": {
          "max_workers": 16,
          "min_workers": 2
        },
        "node_type_id": null,
        "spark_conf": {
          "spark.speculation": "true"
        },
        "spark_version": "7.3.x-scala2.12"
      },
      "notebook_task": {
        "base_parameters": {
          "age": "35",
          "name": "John Doe"
        },
        "notebook_path": "/Users/user.name@databricks.com/Match"
      },
      "retry_on_timeout": false,
      "run_if": "ALL_SUCCESS",
      "timeout_seconds": 86400
    }
  ],
  "timeout_seconds": 86400,
  "trigger": {
    "file_arrival": {
      "min_time_between_triggers_seconds": 0,
      "url": "string",
      "wait_after_last_change_seconds": 0
    },
    "pause_status": "UNPAUSED",
    "periodic": {
      "interval": 0,
      "unit": "HOURS"
    }
  }
}`
213+
214+
func BundleV(b *testing.B, numJobs int) dyn.Value {
215+
allJobs := map[string]*resources.Job{}
216+
for i := range numJobs {
217+
job := jobs.JobSettings{}
218+
err := json.Unmarshal([]byte(jobExample), &job)
219+
require.NoError(b, err)
220+
221+
allJobs[strconv.Itoa(i)] = &resources.Job{
222+
JobSettings: job,
223+
}
224+
}
225+
226+
myBundle := bundle.Bundle{
227+
Config: config.Root{
228+
Resources: config.Resources{
229+
Jobs: allJobs,
230+
},
231+
},
232+
}
233+
234+
// Apply noop mutator to initialize the bundle value.
235+
bundle.ApplyFunc(context.Background(), &myBundle, func(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
236+
return nil
237+
})
238+
239+
return myBundle.Config.Value()
240+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package bundletest
2+
3+
import (
4+
"testing"
5+
6+
"github.com/databricks/cli/libs/dyn"
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
// This took 40ms to run on 18th June 2025.
11+
func BenchmarkWalkReadOnly(b *testing.B) {
12+
input := BundleV(b, 10000)
13+
14+
for b.Loop() {
15+
err := dyn.WalkReadOnly(input, func(p dyn.Path, v dyn.Value) error {
16+
return nil
17+
})
18+
assert.NoError(b, err)
19+
}
20+
}
21+
22+
// This took 160ms to run on 18th June 2025.
23+
func BenchmarkWalk(b *testing.B) {
24+
input := BundleV(b, 10000)
25+
26+
for b.Loop() {
27+
_, err := dyn.Walk(input, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
28+
return v, nil
29+
})
30+
assert.NoError(b, err)
31+
}
32+
}

libs/dyn/mapping.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,21 @@ func NewMapping() Mapping {
2828
}
2929
}
3030

31+
// NewMappingFromPairs computes a [Mapping] from a list of [Pair]s. The index
32+
// map does not need to be provided since that will be computed from the
33+
// key-value pairs provided.
34+
func NewMappingFromPairs(pairs []Pair) Mapping {
35+
index := make(map[string]int)
36+
for i, p := range pairs {
37+
index[p.Key.MustString()] = i
38+
}
39+
40+
return Mapping{
41+
pairs: pairs,
42+
index: index,
43+
}
44+
}
45+
3146
// newMappingWithSize creates a new Mapping preallocated to the specified size.
3247
func newMappingWithSize(size int) Mapping {
3348
return Mapping{

libs/dyn/walk_read_only.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package dyn
2+
3+
// WalkReadOnly walks the configuration tree in readonly mode and calls the given function on each node.
// The callback may return ErrSkip to skip traversal of a subtree.
// If the callback returns another error, the walk is aborted, and the error is returned.
//
// Unlike [Walk], no replacement tree is built (only an error is returned),
// which makes this variant cheaper when the caller does not modify values.
// NOTE(review): the Path passed to the callback shares backing storage across
// calls — callers that retain it should copy it first.
func WalkReadOnly(v Value, fn func(p Path, v Value) error) error {
	return walkReadOnly(v, EmptyPath, fn)
}
9+
10+
// Unexported counterpart to WalkReadOnly.
11+
// It carries the path leading up to the current node,
12+
// such that it can be passed to the callback function.
13+
func walkReadOnly(v Value, p Path, fn func(p Path, v Value) error) error {
14+
if err := fn(p, v); err != nil {
15+
if err == ErrSkip {
16+
return nil
17+
}
18+
return err
19+
}
20+
21+
switch v.Kind() {
22+
case KindMap:
23+
m := v.MustMap()
24+
for _, pair := range m.Pairs() {
25+
pk := pair.Key
26+
pv := pair.Value
27+
if err := walkReadOnly(pv, append(p, Key(pk.MustString())), fn); err != nil {
28+
return err
29+
}
30+
}
31+
case KindSequence:
32+
s := v.MustSequence()
33+
for i := range s {
34+
if err := walkReadOnly(s[i], append(p, Index(i)), fn); err != nil {
35+
return err
36+
}
37+
}
38+
}
39+
40+
return nil
41+
}

0 commit comments

Comments
 (0)