Skip to content

Commit 7f67cb2

Browse files
author
Varun Deep Saini
committed
Add append log for direct deployment state to recover interrupted deployments
1 parent daef3df commit 7f67cb2

File tree

2 files changed

+234
-16
lines changed

2 files changed

+234
-16
lines changed

bundle/direct/dstate/state.go

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ type Database struct {
2222
Lineage string `json:"lineage"`
2323
Serial int `json:"serial"`
2424
State map[string]ResourceEntry `json:"state"`
25+
WAL []LogEntry `json:"wal,omitempty"`
26+
}
27+
28+
// LogEntry is a single record of the append log kept in Database.WAL.
// SaveState and DeleteState append one entry per call, and replayWAL applies
// the entries in order to Database.State.
type LogEntry struct {
	// Op names the operation; the code in this file writes and replays
	// "save" and "delete".
	Op string `json:"op"`
	// Key is the resource key the operation applies to.
	Key string `json:"key"`
	// ID is the resource ID recorded by a "save"; omitted from JSON when empty.
	ID string `json:"id,omitempty"`
	// State is the resource state recorded by a "save"; omitted from JSON when nil.
	State any `json:"state,omitempty"`
}
2634

2735
type ResourceEntry struct {
@@ -34,15 +42,16 @@ func (db *DeploymentState) SaveState(key, newID string, state any) error {
3442
db.mu.Lock()
3543
defer db.mu.Unlock()
3644

37-
if db.Data.State == nil {
38-
db.Data.State = make(map[string]ResourceEntry)
45+
db.Data.WAL = append(db.Data.WAL, LogEntry{Op: "save", Key: key, ID: newID, State: state})
46+
if err := db.writeState(); err != nil {
47+
db.Data.WAL = db.Data.WAL[:len(db.Data.WAL)-1] // Rollback on failure
48+
return err
3949
}
4050

41-
db.Data.State[key] = ResourceEntry{
42-
ID: newID,
43-
State: state,
51+
if db.Data.State == nil {
52+
db.Data.State = make(map[string]ResourceEntry)
4453
}
45-
54+
db.Data.State[key] = ResourceEntry{ID: newID, State: state}
4655
return nil
4756
}
4857

@@ -51,12 +60,13 @@ func (db *DeploymentState) DeleteState(key string) error {
5160
db.mu.Lock()
5261
defer db.mu.Unlock()
5362

54-
if db.Data.State == nil {
55-
return nil
63+
db.Data.WAL = append(db.Data.WAL, LogEntry{Op: "delete", Key: key})
64+
if err := db.writeState(); err != nil {
65+
db.Data.WAL = db.Data.WAL[:len(db.Data.WAL)-1] // Rollback on failure
66+
return err
5667
}
5768

5869
delete(db.Data.State, key)
59-
6070
return nil
6171
}
6272

@@ -104,14 +114,21 @@ func (db *DeploymentState) Open(path string) error {
104114
}
105115

106116
db.Path = path
117+
db.replayWAL()
107118
return nil
108119
}
109120

110121
func (db *DeploymentState) Finalize() error {
111122
db.mu.Lock()
112123
defer db.mu.Unlock()
113124

114-
return db.unlockedSave()
125+
oldWAL := db.Data.WAL
126+
db.Data.WAL = nil
127+
if err := db.writeState(); err != nil {
128+
db.Data.WAL = oldWAL // Restore on failure
129+
return err
130+
}
131+
return nil
115132
}
116133

117134
func (db *DeploymentState) AssertOpened() {
@@ -148,17 +165,25 @@ func (db *DeploymentState) ExportState(ctx context.Context) resourcestate.Export
148165
return result
149166
}
150167

151-
func (db *DeploymentState) unlockedSave() error {
168+
func (db *DeploymentState) writeState() error {
152169
db.AssertOpened()
153170
data, err := json.MarshalIndent(db.Data, "", " ")
154171
if err != nil {
155172
return err
156173
}
174+
return os.WriteFile(db.Path, data, 0o600)
175+
}
157176

158-
err = os.WriteFile(db.Path, data, 0o600)
159-
if err != nil {
160-
return fmt.Errorf("failed to save resources state to %#v: %w", db.Path, err)
177+
func (db *DeploymentState) replayWAL() {
178+
for _, entry := range db.Data.WAL {
179+
switch entry.Op {
180+
case "save":
181+
if db.Data.State == nil {
182+
db.Data.State = make(map[string]ResourceEntry)
183+
}
184+
db.Data.State[entry.Key] = ResourceEntry{ID: entry.ID, State: entry.State}
185+
case "delete":
186+
delete(db.Data.State, entry.Key)
187+
}
161188
}
162-
163-
return nil
164189
}

bundle/direct/dstate/state_test.go

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
package dstate
2+
3+
import (
4+
"encoding/json"
5+
"os"
6+
"path/filepath"
7+
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestWALSaveAndReplay(t *testing.T) {
14+
tmpDir := t.TempDir()
15+
statePath := filepath.Join(tmpDir, "state.json")
16+
17+
db := &DeploymentState{}
18+
err := db.Open(statePath)
19+
require.NoError(t, err)
20+
21+
err = db.SaveState("resources.jobs.job1", "123", map[string]any{"name": "job1"})
22+
require.NoError(t, err)
23+
24+
err = db.SaveState("resources.jobs.job2", "456", map[string]any{"name": "job2"})
25+
require.NoError(t, err)
26+
27+
// Verify WAL exists in file
28+
data, err := os.ReadFile(statePath)
29+
require.NoError(t, err)
30+
var loaded Database
31+
require.NoError(t, json.Unmarshal(data, &loaded))
32+
assert.Len(t, loaded.WAL, 2)
33+
34+
// Simulate crash: open a new state (WAL replays on open)
35+
db2 := &DeploymentState{}
36+
err = db2.Open(statePath)
37+
require.NoError(t, err)
38+
39+
entry1, ok := db2.GetResourceEntry("resources.jobs.job1")
40+
assert.True(t, ok)
41+
assert.Equal(t, "123", entry1.ID)
42+
43+
entry2, ok := db2.GetResourceEntry("resources.jobs.job2")
44+
assert.True(t, ok)
45+
assert.Equal(t, "456", entry2.ID)
46+
}
47+
48+
func TestWALDeleteAndReplay(t *testing.T) {
49+
tmpDir := t.TempDir()
50+
statePath := filepath.Join(tmpDir, "state.json")
51+
52+
db := &DeploymentState{}
53+
err := db.Open(statePath)
54+
require.NoError(t, err)
55+
56+
err = db.SaveState("resources.jobs.job1", "123", map[string]any{"name": "job1"})
57+
require.NoError(t, err)
58+
59+
err = db.SaveState("resources.jobs.job2", "456", map[string]any{"name": "job2"})
60+
require.NoError(t, err)
61+
62+
err = db.DeleteState("resources.jobs.job1")
63+
require.NoError(t, err)
64+
65+
// Simulate crash and reopen
66+
db2 := &DeploymentState{}
67+
err = db2.Open(statePath)
68+
require.NoError(t, err)
69+
70+
_, ok := db2.GetResourceEntry("resources.jobs.job1")
71+
assert.False(t, ok, "job1 should be deleted")
72+
73+
entry2, ok := db2.GetResourceEntry("resources.jobs.job2")
74+
assert.True(t, ok)
75+
assert.Equal(t, "456", entry2.ID)
76+
}
77+
78+
func TestWALFinalizeClearsWAL(t *testing.T) {
79+
tmpDir := t.TempDir()
80+
statePath := filepath.Join(tmpDir, "state.json")
81+
82+
db := &DeploymentState{}
83+
err := db.Open(statePath)
84+
require.NoError(t, err)
85+
86+
err = db.SaveState("resources.jobs.job1", "123", map[string]any{"name": "job1"})
87+
require.NoError(t, err)
88+
89+
// Verify WAL has entries before finalize
90+
data, err := os.ReadFile(statePath)
91+
require.NoError(t, err)
92+
var before Database
93+
require.NoError(t, json.Unmarshal(data, &before))
94+
assert.NotEmpty(t, before.WAL)
95+
96+
err = db.Finalize()
97+
require.NoError(t, err)
98+
99+
// Verify WAL is cleared after finalize
100+
data, err = os.ReadFile(statePath)
101+
require.NoError(t, err)
102+
var after Database
103+
require.NoError(t, json.Unmarshal(data, &after))
104+
assert.Empty(t, after.WAL)
105+
106+
// State still has the entry
107+
db2 := &DeploymentState{}
108+
err = db2.Open(statePath)
109+
require.NoError(t, err)
110+
111+
entry, ok := db2.GetResourceEntry("resources.jobs.job1")
112+
assert.True(t, ok)
113+
assert.Equal(t, "123", entry.ID)
114+
}
115+
116+
func TestWALWithExistingState(t *testing.T) {
117+
tmpDir := t.TempDir()
118+
statePath := filepath.Join(tmpDir, "state.json")
119+
120+
// Create state with one entry and finalize
121+
db := &DeploymentState{}
122+
err := db.Open(statePath)
123+
require.NoError(t, err)
124+
125+
err = db.SaveState("resources.jobs.job1", "123", map[string]any{"name": "job1"})
126+
require.NoError(t, err)
127+
128+
err = db.Finalize()
129+
require.NoError(t, err)
130+
131+
// Open again and add more entries without finalizing
132+
db2 := &DeploymentState{}
133+
err = db2.Open(statePath)
134+
require.NoError(t, err)
135+
136+
err = db2.SaveState("resources.jobs.job2", "456", map[string]any{"name": "job2"})
137+
require.NoError(t, err)
138+
139+
// Simulate crash and reopen
140+
db3 := &DeploymentState{}
141+
err = db3.Open(statePath)
142+
require.NoError(t, err)
143+
144+
entry1, ok := db3.GetResourceEntry("resources.jobs.job1")
145+
assert.True(t, ok)
146+
assert.Equal(t, "123", entry1.ID)
147+
148+
entry2, ok := db3.GetResourceEntry("resources.jobs.job2")
149+
assert.True(t, ok)
150+
assert.Equal(t, "456", entry2.ID)
151+
}
152+
153+
func TestWALUpdateExistingEntry(t *testing.T) {
154+
tmpDir := t.TempDir()
155+
statePath := filepath.Join(tmpDir, "state.json")
156+
157+
db := &DeploymentState{}
158+
err := db.Open(statePath)
159+
require.NoError(t, err)
160+
161+
err = db.SaveState("resources.jobs.job1", "123", map[string]any{"name": "job1"})
162+
require.NoError(t, err)
163+
164+
err = db.SaveState("resources.jobs.job1", "789", map[string]any{"name": "job1-updated"})
165+
require.NoError(t, err)
166+
167+
// Simulate crash and reopen
168+
db2 := &DeploymentState{}
169+
err = db2.Open(statePath)
170+
require.NoError(t, err)
171+
172+
entry, ok := db2.GetResourceEntry("resources.jobs.job1")
173+
assert.True(t, ok)
174+
assert.Equal(t, "789", entry.ID)
175+
}
176+
177+
func TestNoStateFile(t *testing.T) {
178+
tmpDir := t.TempDir()
179+
statePath := filepath.Join(tmpDir, "state.json")
180+
181+
db := &DeploymentState{}
182+
err := db.Open(statePath)
183+
require.NoError(t, err)
184+
185+
err = db.Finalize()
186+
require.NoError(t, err)
187+
188+
db2 := &DeploymentState{}
189+
err = db2.Open(statePath)
190+
require.NoError(t, err)
191+
192+
assert.NotEmpty(t, db2.Data.Lineage)
193+
}

0 commit comments

Comments
 (0)