diff --git a/server.go b/server.go index 8a9d05c..18e6a38 100644 --- a/server.go +++ b/server.go @@ -199,7 +199,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S return c.Apply(&context{ server: s, currentTerm: s.currentTerm, - currentIndex: s.log.internalCurrentIndex(), + currentIndex: e.Index(), commitIndex: s.log.commitIndex, }) case deprecatedCommandApply: diff --git a/server_test.go b/server_test.go index 9b53c9a..5f0772c 100644 --- a/server_test.go +++ b/server_test.go @@ -271,6 +271,45 @@ func TestServerAppendEntries(t *testing.T) { } } +// Ensure we can append entries to a server. +func TestServerProvidesCurrentContextWhenApplyingCommand(t *testing.T) { + s := newTestServer("1", &testTransporter{}) + + s.SetHeartbeatInterval(time.Second * 10) + s.Start() + defer s.Stop() + + // Append single entry. + appliedContexts = make([]Context, 0) + + e, _ := newLogEntry(nil, nil, 1, 1, &testApplyCommand{Val: "foo", I: 10}) + entries := []*LogEntry{e} + s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 1, "ldr", entries)) + if len(appliedContexts) != 1 { + t.Fatalf("Called Apply %v times (expected 1)", len(appliedContexts)) + } + if appliedContexts[0].CommitIndex() != 1 || appliedContexts[0].CurrentIndex() != 1 { + t.Fatalf("Invalid context index info [Current=%v, Commit=%v]", appliedContexts[0].CurrentIndex(), appliedContexts[0].CommitIndex()) + } + + // Append multiple entries + commit the last one. + appliedContexts = make([]Context, 0) + + e1, _ := newLogEntry(nil, nil, 2, 1, &testApplyCommand{Val: "bar", I: 20}) + e2, _ := newLogEntry(nil, nil, 3, 1, &testApplyCommand{Val: "baz", I: 30}) + entries = []*LogEntry{e1, e2} + s.AppendEntries(newAppendEntriesRequest(1, 1, 1, 3, "ldr", entries)) + if len(appliedContexts) != 2 { + t.Fatalf("Called Apply %v times (expected 2)", len(appliedContexts)) + } + if appliedContexts[0].CommitIndex() != 2 || appliedContexts[0].CurrentIndex() != 2 { + t.Fatalf("Invalid context index info [Current=%v, Commit=%v]", appliedContexts[0].CurrentIndex(), appliedContexts[0].CommitIndex()) + } + if appliedContexts[1].CommitIndex() != 3 || appliedContexts[1].CurrentIndex() != 3 { + t.Fatalf("Invalid context index info [Current=%v, Commit=%v]", appliedContexts[1].CurrentIndex(), appliedContexts[1].CommitIndex()) + } +} + //Ensure that entries with stale terms are rejected. func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) { s := newTestServer("1", &testTransporter{}) diff --git a/test.go b/test.go index a1be3c4..10d1d62 100644 --- a/test.go +++ b/test.go @@ -19,6 +19,7 @@ const ( func init() { RegisterCommand(&testCommand1{}) RegisterCommand(&testCommand2{}) + RegisterCommand(&testApplyCommand{}) } //------------------------------------------------------------------------------ @@ -195,3 +196,28 @@ func (c *testCommand2) CommandName() string { func (c *testCommand2) Apply(server Server) (interface{}, error) { return nil, nil } + +//----------------------------------------------- +// ApplyCommand - testing passed content to Apply +//----------------------------------------------- + +var appliedContexts []Context + +type testApplyCommand struct { + Val string `json:"val"` + I int `json:"i"` +} + +func (c *testApplyCommand) CommandName() string { + return "cmd_apply" +} + +func (c *testApplyCommand) Apply(context Context) (interface{}, error) { + if appliedContexts == nil { + appliedContexts = make([]Context, 0) + } + + appliedContexts = append(appliedContexts, context) + + return nil, nil +}