
Commit baf371e
Author: Varun Deep Saini
Add WAL for direct deployment state recovery
Signed-off-by: Varun Deep Saini <varun.23bcs10048@ms.sst.scaler.com>
1 parent: f4f794c

70 files changed: +2040 −19 lines
Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
bundle:
  name: wal-corrupted-test

resources:
  jobs:
    valid_job:
      name: "valid-job"
      tasks:
        - task_key: "task-a"
          spark_python_task:
            python_file: ./test.py
          new_cluster:
            spark_version: 15.4.x-scala2.12
            node_type_id: i3.xlarge
            num_workers: 0
    another_valid:
      name: "another-valid"
      tasks:
        - task_key: "task-b"
          spark_python_task:
            python_file: ./test.py
          new_cluster:
            spark_version: 15.4.x-scala2.12
            node_type_id: i3.xlarge
            num_workers: 0

acceptance/bundle/deploy/wal/corrupted-wal-entry/out.test.toml

Lines changed: 5 additions & 0 deletions
Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
=== Creating state file with serial 5 ===
=== Creating WAL with corrupted entry ===
=== WAL content ===
{"lineage":"test-lineage-123","serial": [SERIAL]}
{"k":"resources.jobs.valid_job","v":{"__id__": "[ID]","state":{"name":"valid-job"}}}
not valid json - this line should be skipped
{"k":"resources.jobs.another_valid","v":{"__id__": "[ID]","state":{"name":"another-valid"}}}
=== Deploy (should recover valid entries, skip corrupted) ===

>>> [CLI] bundle deploy
Warning: Single node cluster is not correctly configured
  at resources.jobs.another_valid.tasks[0].new_cluster
  in databricks.yml:23:13

num_workers should be 0 only for single-node clusters. To create a
valid single node cluster please ensure that the following properties
are correctly set in the cluster specification:

  spark_conf:
    spark.databricks.cluster.profile: singleNode
    spark.master: local[*]

  custom_tags:
    ResourceClass: SingleNode


Warning: Single node cluster is not correctly configured
  at resources.jobs.valid_job.tasks[0].new_cluster
  in databricks.yml:13:13

num_workers should be 0 only for single-node clusters. To create a
valid single node cluster please ensure that the following properties
are correctly set in the cluster specification:

  spark_conf:
    spark.databricks.cluster.profile: singleNode
    spark.master: local[*]

  custom_tags:
    ResourceClass: SingleNode


Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-corrupted-test/default/files...
Deploying resources...
Updating deployment state...
Deployment complete!
=== Final state (should have recovered entries) ===
{
  "serial": [SERIAL],
  "state_keys": [
    "resources.jobs.another_valid",
    "resources.jobs.valid_job"
  ]
}
=== WAL after successful deploy ===
WAL deleted (expected)
Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
echo "=== Creating state file with serial 5 ==="
mkdir -p .databricks/bundle/default
cat > .databricks/bundle/default/resources.json << 'EOF'
{
  "state_version": 1,
  "cli_version": "0.0.0",
  "lineage": "test-lineage-123",
  "serial": 5,
  "state": {}
}
EOF

echo "=== Creating WAL with corrupted entry ==="
cat > .databricks/bundle/default/resources.json.wal << 'EOF'
{"lineage":"test-lineage-123","serial":6}
{"k":"resources.jobs.valid_job","v":{"__id__":"1111","state":{"name":"valid-job"}}}
not valid json - this line should be skipped
{"k":"resources.jobs.another_valid","v":{"__id__":"2222","state":{"name":"another-valid"}}}
EOF

echo "=== WAL content ==="
cat .databricks/bundle/default/resources.json.wal

echo "=== Deploy (should recover valid entries, skip corrupted) ==="
trace $CLI bundle deploy 2>&1 | python3 sort_warnings.py

echo "=== Final state (should have recovered entries) ==="
cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys | sort)}'

echo "=== WAL after successful deploy ==="
if [ -f ".databricks/bundle/default/resources.json.wal" ]; then
  echo "WAL exists (unexpected)"
else
  echo "WAL deleted (expected)"
fi
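The replay rule this test exercises — first WAL line is a header with lineage and serial, each later line is a key/value entry, and any line that fails to parse is skipped — can be sketched in Python. This is an illustrative model only, not the CLI's actual implementation; `replay_wal` is a hypothetical name:

```python
import json


def replay_wal(base_state, wal_text):
    """Replay WAL entries onto a copy of the state mapping (illustrative).

    The first non-empty line is a header carrying lineage and the new serial;
    every later line is a {"k": ..., "v": ...} entry. Lines that fail to parse
    as JSON are skipped, so one torn write cannot poison the rest of the log.
    """
    state = dict(base_state)
    lines = [ln for ln in wal_text.splitlines() if ln.strip()]
    header = json.loads(lines[0])
    for line in lines[1:]:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # corrupted entry: skip it, keep replaying the rest
        state[entry["k"]] = entry["v"]
    return header["serial"], state


# The same WAL the script writes above:
wal = "\n".join([
    '{"lineage":"test-lineage-123","serial":6}',
    '{"k":"resources.jobs.valid_job","v":{"__id__":"1111","state":{"name":"valid-job"}}}',
    "not valid json - this line should be skipped",
    '{"k":"resources.jobs.another_valid","v":{"__id__":"2222","state":{"name":"another-valid"}}}',
])
serial, state = replay_wal({}, wal)
print(serial, sorted(state))
```

Skipping per line (rather than aborting on the first bad entry) is what lets the deploy above recover both valid jobs.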
Lines changed: 87 additions & 0 deletions
@@ -0,0 +1,87 @@
#!/usr/bin/env python3
"""Sort warning blocks in CLI output to make test output deterministic.

Warning blocks look like:

Warning: Single node cluster is not correctly configured
  at resources.jobs.XXX.tasks[0].new_cluster
  in databricks.yml:NN:NN

num_workers should be 0 only for single-node clusters...
  spark_conf:
    ...
  custom_tags:
    ...

This script groups consecutive warning blocks, sorts them by job name, and outputs.
"""

import re
import sys


def main():
    content = sys.stdin.read()
    lines = content.split("\n")

    result = []
    i = 0

    while i < len(lines):
        line = lines[i]

        # Check if this is the start of a warning block
        if line.startswith("Warning:"):
            # Collect all consecutive warning blocks
            warnings = []
            while i < len(lines) and (
                lines[i].startswith("Warning:")
                or (
                    warnings
                    and not lines[i].startswith("Uploading")
                    and not lines[i].startswith("Deploying")
                    and not lines[i].startswith(">>>")
                    and not lines[i].startswith("===")
                )
            ):
                # Collect one complete warning block
                block = []
                if lines[i].startswith("Warning:"):
                    block.append(lines[i])
                    i += 1
                    # Collect until next Warning or end marker
                    while i < len(lines):
                        if lines[i].startswith("Warning:"):
                            break
                        if lines[i].startswith("Uploading") or lines[i].startswith("Deploying"):
                            break
                        if lines[i].startswith(">>>") or lines[i].startswith("==="):
                            break
                        block.append(lines[i])
                        i += 1
                    warnings.append(block)
                else:
                    i += 1

            # Sort warnings by the job name in "at resources.jobs.XXX"
            def get_sort_key(block):
                for line in block:
                    match = re.search(r"at resources\.jobs\.(\w+)", line)
                    if match:
                        return match.group(1)
                return ""

            warnings.sort(key=get_sort_key)

            # Output sorted warnings
            for block in warnings:
                for line in block:
                    result.append(line)
        else:
            result.append(line)
            i += 1

    print("\n".join(result), end="")


if __name__ == "__main__":
    main()
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
print("test")
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
# WAL with corrupted entry - valid entries should be recovered, corrupted skipped.

[[Server]]
Pattern = "POST /api/2.2/jobs/reset"
Response.Body = '{}'

[[Server]]
Pattern = "GET /api/2.2/jobs/get?job_id=1111"
Response.Body = '{"job_id": 1111, "settings": {"name": "valid-job"}}'

[[Server]]
Pattern = "GET /api/2.2/jobs/get?job_id=2222"
Response.Body = '{"job_id": 2222, "settings": {"name": "another-valid"}}'
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
bundle:
  name: wal-crash-test

resources:
  jobs:
    job_a:
      name: "test-job-a"
      tasks:
        - task_key: "task-a"
          spark_python_task:
            python_file: ./test.py
          new_cluster:
            spark_version: 15.4.x-scala2.12
            node_type_id: i3.xlarge
            num_workers: 0

acceptance/bundle/deploy/wal/crash-after-create/out.test.toml

Lines changed: 5 additions & 0 deletions
Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
=== Creating state directory ===
=== Creating WAL file (simulating crash after job create) ===
=== WAL content before deploy ===
{"lineage":"test-lineage-123","serial": [SERIAL]}
{"k":"resources.jobs.job_a","v":{"__id__": "[ID]","state":{"name":"test-job-a"}}}
=== Deploy (should recover from WAL) ===

>>> [CLI] bundle deploy
Warning: Single node cluster is not correctly configured
  at resources.jobs.job_a.tasks[0].new_cluster
  in databricks.yml:13:13

num_workers should be 0 only for single-node clusters. To create a
valid single node cluster please ensure that the following properties
are correctly set in the cluster specification:

  spark_conf:
    spark.databricks.cluster.profile: singleNode
    spark.master: local[*]

  custom_tags:
    ResourceClass: SingleNode


Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-crash-test/default/files...
Deploying resources...
Updating deployment state...
Deployment complete!
=== State file after recovery ===
{
  "lineage": "test-lineage-123",
  "serial": [SERIAL],
  "state_keys": [
    "resources.jobs.job_a"
  ]
}
=== WAL file after successful deploy ===
WAL file deleted (expected)
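The lifecycle both tests assume — append a durable WAL entry after each resource create, then fold the log into the state file and delete it once the deploy succeeds — can be sketched as an illustrative Python model. The CLI itself is written in Go; `StateWAL`, `log_create`, and `checkpoint` are hypothetical names for this sketch:

```python
import json
import os
import tempfile


class StateWAL:
    """Illustrative WAL lifecycle: append one entry per created resource,
    then fold the log into the state file and delete it on success."""

    def __init__(self, state_path):
        self.state_path = state_path
        self.wal_path = state_path + ".wal"

    def log_create(self, lineage, serial, key, value):
        # Append-only write; fsync so the entry survives a crash that
        # happens right after the backend create call returns.
        new = not os.path.exists(self.wal_path)
        with open(self.wal_path, "a") as f:
            if new:  # header line carries lineage and the new serial
                f.write(json.dumps({"lineage": lineage, "serial": serial}) + "\n")
            f.write(json.dumps({"k": key, "v": value}) + "\n")
            f.flush()
            os.fsync(f.fileno())

    def checkpoint(self):
        # Fold WAL entries into the state file, then remove the WAL --
        # matching "WAL file deleted (expected)" in the output above.
        with open(self.state_path) as f:
            doc = json.load(f)
        with open(self.wal_path) as f:
            lines = [ln for ln in f if ln.strip()]
        doc["serial"] = json.loads(lines[0])["serial"]
        for ln in lines[1:]:
            try:
                entry = json.loads(ln)
            except json.JSONDecodeError:
                continue  # tolerate a torn or corrupted entry
            doc["state"][entry["k"]] = entry["v"]
        with open(self.state_path, "w") as f:
            json.dump(doc, f)
        os.remove(self.wal_path)


with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "resources.json")
    with open(path, "w") as f:
        json.dump({"lineage": "test-lineage-123", "serial": 5, "state": {}}, f)
    wal = StateWAL(path)
    wal.log_create("test-lineage-123", 6, "resources.jobs.job_a",
                   {"__id__": "1111", "state": {"name": "test-job-a"}})
    wal.checkpoint()
    with open(path) as f:
        doc = json.load(f)
    print(doc["serial"], sorted(doc["state"]), os.path.exists(wal.wal_path))
```

Because the entry is fsynced before the deploy proceeds, a crash between the create call and the state-file rewrite leaves enough on disk for the next deploy to recover, which is exactly the scenario the crash-after-create test stages by hand.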
