From b4df8d51599a8bdd1a7510934f97217fc32b6c7b Mon Sep 17 00:00:00 2001 From: Dmitry Voytik Date: Tue, 7 Nov 2017 22:03:41 +0100 Subject: [PATCH] proxy: implement (re)store of proxy's state Introduce the high availability feature of cc-proxy by implementing store/restore of proxy's state to/from disk. This feature depends on the shim's ability to reconnect to cc-proxy if the connection is lost. Fixes #4. Signed-off-by: Dmitry Voytik --- Makefile | 3 +- proxy.go | 42 ++++- state.go | 415 ++++++++++++++++++++++++++++++++++++++++++++++++++ state_test.go | 338 ++++++++++++++++++++++++++++++++++++++++ vm.go | 58 ++++++- 5 files changed, 846 insertions(+), 10 deletions(-) create mode 100644 state.go create mode 100644 state_test.go diff --git a/Makefile b/Makefile index 490dd62..cabfdc0 100644 --- a/Makefile +++ b/Makefile @@ -16,6 +16,7 @@ LOCALSTATEDIR := /var SOURCES := $(shell find . 2>&1 | grep -E '.*\.(c|h|go)$$') PROXY_SOCKET := $(LOCALSTATEDIR)/run/clear-containers/proxy.sock +STORE_STATE_DIR := $(LOCALSTATEDIR)/run/clear-containers/proxy/ DESCRIBE := $(shell git describe 2> /dev/null || true) DESCRIBE_DIRTY := $(if $(shell git status --porcelain --untracked-files=no 2> /dev/null),${DESCRIBE}-dirty,${DESCRIBE}) @@ -53,7 +54,7 @@ all: cc-proxy $(UNIT_FILES) cc-proxy: $(SOURCES) Makefile $(QUIET_GOBUILD)go build -i -o $@ -ldflags \ - "-X main.DefaultSocketPath=$(PROXY_SOCKET) -X main.Version=$(VERSION)" + "-X main.DefaultSocketPath=$(PROXY_SOCKET) -X main.Version=$(VERSION) -X main.storeStateDir=$(STORE_STATE_DIR)" # # Tests diff --git a/proxy.go b/proxy.go index edf2ca3..c070260 100644 --- a/proxy.go +++ b/proxy.go @@ -255,6 +255,15 @@ func registerVM(data []byte, userData interface{}, response *handlerResponse) { client.vm = vm + if err := storeVMState(vm); err != nil { + logContID(vm.containerID).Errorf( + "couldn't store a VM state: %v", err) + } + + if err := storeProxyState(proxy); err != nil { + proxyLog.Errorf("couldn't store proxy's state: %v", err) + } + if 
proxyKSM != nil { proxyKSM.kick() } @@ -300,6 +309,15 @@ func attachVM(data []byte, userData interface{}, response *handlerResponse) { client.log.Infof("AttachVM(containerId=%s)", payload.ContainerID) client.vm = vm + + if err := storeVMState(vm); err != nil { + logContID(vm.containerID).Errorf( + "couldn't store a VM state: %v", err) + } + + if err := storeProxyState(proxy); err != nil { + proxyLog.Errorf("couldn't store proxy's state: %v", err) + } } // "UnregisterVM" @@ -330,9 +348,10 @@ func unregisterVM(data []byte, userData interface{}, response *handlerResponse) client.log.Info("UnregisterVM()") - proxy.Lock() - delete(proxy.vms, vm.containerID) - proxy.Unlock() + if err := delVMAndState(proxy, vm); err != nil { + logContID(payload.ContainerID).Warnf("Error deleting state: %v", + err) + } client.vm = nil } @@ -627,12 +646,19 @@ func (proxy *proxy) init() error { // Force a coredump + full stacktrace on internal error debug.SetTraceback("crash") - // flags - proxy.enableVMConsole = logrus.GetLevel() == logrus.DebugLevel + stateIsRestored, err := restoreAllState(proxy) + if err != nil { + proxyLog.Errorf("Restoring failed: %v", err) + } + + if !stateIsRestored { + // flags + proxy.enableVMConsole = logrus.GetLevel() == logrus.DebugLevel - // Open the proxy socket - if proxy.socketPath, err = getSocketPath(); err != nil { - return fmt.Errorf("couldn't get a rigth socket path: %v", err) + // Open the proxy socket + if proxy.socketPath, err = getSocketPath(); err != nil { + return fmt.Errorf("couldn't get a right socket path: %v", err) + } } fds := listenFds() diff --git a/state.go b/state.go new file mode 100644 index 0000000..2ed2502 --- /dev/null +++ b/state.go @@ -0,0 +1,415 @@ +// Copyright (c) 2017 Huawei Technologies Duesseldorf GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "os" + "path/filepath" + + "github.com/clearcontainers/proxy/api" + "github.com/sirupsen/logrus" +) + +const proxyStateFileName = "state.json" +const proxyStateDirPerm = 0750 +const proxyStateFilesPerm = 0640 + +const stateFileFormatVersion = 1 + +// storeStateDir is populated at link time with the value of: +// $(LOCALSTATEDIR)/run/clear-containers/proxy/ +var storeStateDir = "/var/run/clear-containers/proxy" + +var proxyStateFilePath = filepath.Join(storeStateDir, proxyStateFileName) + +// proxyState is used to (re)store proxy state on disk. +// XXX stateFileFormatVersion must be updated in case of any changes in this +// struct. +type proxyState struct { + Version uint `json:"version"` + SocketPath string `json:"socket_path"` + EnableVMConsole bool `json:"enable_vm_console"` + ContainerIDs []string `json:"container_ids"` +} + +// vmState is used to (re)store vm struct on disk +// XXX stateFileFormatVersion must be updated in case of any changes in this +// struct. 
+type vmState struct { + RegisterVM api.RegisterVM `json:"registerVM"` + IoSessions []ioSessionState `json:"io_sessions"` + //nextIoBase is restored when ioSessions are restored +} + +type ioSessionState struct { + Token Token `json:"token"` + ContainerID string `json:"container_id"` + NStreams int `json:"n_streams"` + IoBase uint64 `json:"io_base"` +} + +func logContID(containerID string) *logrus.Entry { + return proxyLog.WithField("container", containerID) +} + +// On success returns nil, otherwise an error string message. +func restoreIoSessions(proxy *proxy, vm *vm, ioSessions []ioSessionState) error { + if vm == nil { + return fmt.Errorf("vm parameter is nil") + } + + for _, ioSes := range ioSessions { + if ioSes.Token == "" { + continue + } + + token, err := vm.AllocateIoSessionAs(ioSes.Token, ioSes.ContainerID, + ioSes.NStreams, ioSes.IoBase) + if err != nil { + return err + } + + proxy.Lock() + proxy.tokenToVM[token] = &tokenInfo{ + state: tokenStateAllocated, + vm: vm, + } + proxy.Unlock() + + session := vm.findSessionByToken(token) + if session == nil { + vm.Lock() + for token := range vm.tokenToSession { + _ = vm.freeTokenUnlocked(token) + delete(vm.tokenToSession, token) + } + vm.Unlock() + return fmt.Errorf("unknown token %s", token) + } + + // Signal that the process is already started + close(session.processStarted) + } + + return nil +} + +// Returns (false, nil) if it's a clean start (i.e. no state was found). +// Returns (false, error) if the restoring failed. +// Returns (true, nil) if the restoring succeeded. 
+func restoreAllState(proxy *proxy) (bool, error) { + if _, err := os.Stat(storeStateDir); os.IsNotExist(err) { + err := os.MkdirAll(storeStateDir, proxyStateDirPerm) + if err != nil { + return false, fmt.Errorf( + "couldn't create directory %s: %v", + storeStateDir, err) + } + return false, nil + } + + proxyLog.Info("Restoring proxy's state from: ", proxyStateFilePath) + + fdata, err := ioutil.ReadFile(proxyStateFilePath) + if err != nil { + return false, fmt.Errorf("couldn't read state file %s: %v", + proxyStateFilePath, err) + } + + var proxyState proxyState + + err = json.Unmarshal(fdata, &proxyState) + if err != nil { + return false, fmt.Errorf("couldn't unmarshal %s: %v", + proxyStateFilePath, err) + } + + proxyLog.Debugf("proxyState: %+v", proxyState) + + if len(proxyState.ContainerIDs) == 0 { + return false, fmt.Errorf("containerIDs list is empty") + } + + if proxyState.Version > stateFileFormatVersion { + return false, fmt.Errorf("stored state format version (%d) is"+ + " higher than supported (%d). Aborting", + proxyState.Version, stateFileFormatVersion) + } + + proxy.socketPath = proxyState.SocketPath + proxy.enableVMConsole = proxyState.EnableVMConsole + + for _, containerID := range proxyState.ContainerIDs { + go func(contID string) { + // ignore failures here but log them inside + _ = restoreVMState(proxy, contID) + }(containerID) + } + + return true, nil +} + +// On success returns nil, otherwise an error string message. 
+func storeProxyState(proxy *proxy) error { + proxy.Lock() + defer proxy.Unlock() + + // if there are 0 VMs then remove state from disk + if (len(proxy.vms)) == 0 { + if _, err := os.Stat(proxyStateFilePath); os.IsNotExist(err) { + return nil + } + + if err := os.Remove(proxyStateFilePath); err != nil { + return fmt.Errorf("couldn't remove file %s: %v", + proxyStateFilePath, err) + } + + return nil + } + + proxyState := &proxyState{ + Version: stateFileFormatVersion, + SocketPath: proxy.socketPath, + EnableVMConsole: proxy.enableVMConsole, + } + + for cID := range proxy.vms { + proxyState.ContainerIDs = append(proxyState.ContainerIDs, cID) + } + + data, err := json.MarshalIndent(proxyState, "", "\t") + if err != nil { + return fmt.Errorf("couldn't marshal proxy state %+v: %v", + proxyState, err) + } + + err = ioutil.WriteFile(proxyStateFilePath, data, proxyStateFilesPerm) + if err != nil { + return fmt.Errorf("couldn't store proxy state to file %s: %v", + proxyStateFilePath, err) + } + + return nil +} + +func vmStateFilePath(id string) string { + return filepath.Join(storeStateDir, "vm_"+id+".json") +} + +// On success returns nil, otherwise an error string message. 
+func storeVMState(vm *vm) error { + vm.Lock() + ioSessions := make([]ioSessionState, 0, len(vm.tokenToSession)) + for _, ioS := range vm.tokenToSession { + ioSessions = append(ioSessions, ioSessionState{ + ioS.token, + ioS.containerID, + ioS.nStreams, + ioS.ioBase, + }) + } + vm.Unlock() + + stVM := vmState{ + RegisterVM: api.RegisterVM{ + ContainerID: vm.containerID, + CtlSerial: vm.hyperHandler.GetCtlSockPath(), + IoSerial: vm.hyperHandler.GetIoSockPath(), + Console: vm.console.socketPath, + }, + IoSessions: ioSessions, + } + + o, err := json.MarshalIndent(&stVM, "", "\t") + if err != nil { + return fmt.Errorf("couldn't marshal VM state: %v", err) + } + + stFile := vmStateFilePath(vm.containerID) + + if err := ioutil.WriteFile(stFile, o, proxyStateFilesPerm); err != nil { + return fmt.Errorf("couldn't store VM state to %s: %v", + stFile, err) + } + + return nil +} + +// On success returns nil, otherwise an error string message. +func delVMAndState(proxy *proxy, vm *vm) error { + if proxy == nil { + return errors.New("proxy parameter is nil") + } + + if vm == nil { + return errors.New("vm parameter is nil") + } + + logContID(vm.containerID).Infof("Removing on-disk state") + + proxy.Lock() + delete(proxy.vms, vm.containerID) + proxy.Unlock() + + if err := storeProxyState(proxy); err != nil { + logContID(vm.containerID).Warnf("Couldn't store proxy's state:"+ + " %v", err) + // don't fail + } + + storeFile := vmStateFilePath(vm.containerID) + if err := os.Remove(storeFile); err != nil { + return fmt.Errorf("couldn't remove file %s: %v", storeFile, err) + } + + return nil +} + +func readVMState(containerID string) (*vmState, error) { + if containerID == "" { + return nil, fmt.Errorf("containerID parameter is empty") + } + + vmStateFilePath := vmStateFilePath(containerID) + fdata, err := ioutil.ReadFile(vmStateFilePath) + if err != nil { + return nil, fmt.Errorf("couldn't read %s: %v", vmStateFilePath, err) + } + + var vmState vmState + err = json.Unmarshal(fdata, 
&vmState) + if err != nil { + return nil, fmt.Errorf("couldn't unmarshal %s: %v", + vmStateFilePath, err) + } + + return &vmState, nil +} + +func restoreIoSessionsWaitForShim(proxy *proxy, vm *vm, vmState *vmState) error { + if err := restoreIoSessions(proxy, vm, vmState.IoSessions); err != nil { + return fmt.Errorf("failed to restore io sessions %+v: %v", + vmState.IoSessions, err) + } + + for _, ioSes := range vmState.IoSessions { + token := ioSes.Token + + if token == "" { + return fmt.Errorf("empty token in recovering state") + } + + session := vm.findSessionByToken(token) + if session == nil { + _ = delVMAndState(proxy, vm) // errors are irrelevant here + return fmt.Errorf("couldn't find a session for token: %s", + token) + } + + if err := session.WaitForShim(); err != nil { + _ = delVMAndState(proxy, vm) // errors are irrelevant here + return fmt.Errorf("failed to re-connect with shim "+ + "(token = %s): %v", token, err) + } + } + + return nil +} + +func restoreVMState(proxy *proxy, containerID string) bool { + if proxy == nil { + logContID(containerID).Errorf("proxy parameter is nil") + return false + } + + if containerID == "" { + logContID(containerID).Errorf("containerID is empty") + return false + } + + vmState, err := readVMState(containerID) + if err != nil { + logContID(containerID).Error(err) + return false + } + logContID(containerID).Debugf("restoring vm state: %+v", vmState) + + regVM := vmState.RegisterVM + if regVM.ContainerID == "" || regVM.CtlSerial == "" || + regVM.IoSerial == "" { + logContID(containerID).Errorf("wrong VM parameters: %+v", regVM) + return false + } + + if regVM.ContainerID != containerID { + logContID(containerID).Errorf("inconsistent container ID: %s", + regVM.ContainerID) + return false + } + + proxy.Lock() + if _, ok := proxy.vms[regVM.ContainerID]; ok { + proxy.Unlock() + logContID(containerID).Errorf("container already registered") + return false + } + vm := newVM(regVM.ContainerID, regVM.CtlSerial, regVM.IoSerial) + 
proxy.vms[regVM.ContainerID] = vm + proxy.Unlock() + + proxyLog.WithFields(logrus.Fields{ + "container": regVM.ContainerID, + "control-channel": regVM.CtlSerial, + "io-channel": regVM.IoSerial, + "console": regVM.Console, + }).Info("restoring state") + + if regVM.Console != "" && proxy.enableVMConsole { + vm.setConsole(regVM.Console) + } + + if err := restoreIoSessionsWaitForShim(proxy, vm, vmState); err != nil { + logContID(containerID).Errorf("error restoring tokens: %v", err) + if err := delVMAndState(proxy, vm); err != nil { + logContID(containerID).Errorf("failed to delete vm's "+ + "state: %v", err) + } + return false + } + + if err := vm.Reconnect(true); err != nil { + logContID(containerID).Errorf("failed to connect: %v", err) + if err := delVMAndState(proxy, vm); err != nil { + logContID(containerID).Errorf("failed to delete vm's "+ + "state: %v", err) + } + return false + } + + // We start one goroutine per-VM to monitor the qemu process + proxy.wg.Add(1) + go func() { + <-vm.OnVMLost() + vm.Close() + proxy.wg.Done() + }() + + return true +} diff --git a/state_test.go b/state_test.go new file mode 100644 index 0000000..a960721 --- /dev/null +++ b/state_test.go @@ -0,0 +1,338 @@ +// Copyright (c) 2017 Huawei Technologies Duesseldorf GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/clearcontainers/proxy/api" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" +) + +func lastLogEq(a *assert.Assertions, lh *test.Hook, msg string) { + entry := lh.LastEntry() + if a.NotNil(entry) { + a.Equal(entry.Message, msg) + } +} + +func TestState_restoreIoSessions(t *testing.T) { + a := assert.New(t) + + rig := newTestRig(t) + rig.Start() + proxy := rig.proxy + ctlSocketPath, ioSocketPath := rig.Hyperstart.GetSocketPaths() + tVM := newVM(testContainerID, ctlSocketPath, ioSocketPath) + a.NotNil(tVM) + + // Test 1: vm == nil + e := restoreIoSessions(proxy, nil, []ioSessionState{}) + a.EqualError(e, "vm parameter is nil") + + // Test 2: ignores empty list of tokens + a.Nil(restoreIoSessions(proxy, tVM, []ioSessionState{})) + a.Equal(len(proxy.tokenToVM), 0) + + // Test 3: registers token1 + a.Nil(restoreIoSessions(proxy, tVM, []ioSessionState{{"token1", + testContainerID, 2, 2}})) + a.Equal(proxy.tokenToVM["token1"], &tokenInfo{ + state: tokenStateAllocated, + vm: tVM, + }) + + // Test 1: vm == nil + e = restoreIoSessionsWaitForShim(proxy, nil, &vmState{}) + a.EqualError(e, "failed to restore io sessions []: vm parameter is nil") + + // Test 2: ignores empty list of tokens + a.Nil(restoreIoSessionsWaitForShim(rig.proxy, &vm{}, + &vmState{api.RegisterVM{}, []ioSessionState{}})) + + // Test 3: + e = restoreIoSessionsWaitForShim(rig.proxy, &vm{}, + &vmState{api.RegisterVM{}, []ioSessionState{{Token: ""}}}) + a.EqualError(e, "empty token in recovering state") + + // Test 4: success + proxy.Lock() + proxy.vms[testContainerID] = tVM + proxy.Unlock() + go tVM.AssociateShim(Token("token2"), 1, nil) + a.Nil(restoreIoSessionsWaitForShim(proxy, + tVM, + &vmState{ + api.RegisterVM{ + ContainerID: testContainerID, + CtlSerial: "", + IoSerial: "", + Console: "", + NumIOStreams: 1}, + []ioSessionState{{"token2", testContainerID, 2, 2}}})) +} + 
+func TestState_restoreAllState(t *testing.T) { + a := assert.New(t) + rig := newTestRig(t) + rig.Start() + + // clean up a possible state + a.Nil(os.RemoveAll(storeStateDir)) + + // Test 1: nothing to restore + restored, err := restoreAllState(rig.proxy) + a.False(restored) + a.Nil(err) + + a.Nil(os.MkdirAll(storeStateDir, 0750)) + + // Test 2: fails to restore from an inaccessible file (permission denied) + a.Nil(ioutil.WriteFile(proxyStateFilePath, []byte{' '}, 0000)) + + restored, err = restoreAllState(rig.proxy) + a.False(restored) + a.EqualError(err, "couldn't unmarshal "+proxyStateFilePath+ + ": unexpected end of JSON input") + + a.Nil(os.Remove(proxyStateFilePath)) + + // Test 3: fails to restore from an empty file + a.Nil(ioutil.WriteFile(proxyStateFilePath, []byte(""), 0600)) + + restored, err = restoreAllState(rig.proxy) + a.False(restored) + a.EqualError(err, "couldn't unmarshal "+proxyStateFilePath+ + ": unexpected end of JSON input") + + // Test 4: fails to restore from garbage + a.Nil(ioutil.WriteFile(proxyStateFilePath, []byte("Hello, World!"), 0600)) + + restored, err = restoreAllState(rig.proxy) + a.False(restored) + a.EqualError(err, "couldn't unmarshal "+proxyStateFilePath+ + ": invalid character 'H' looking for beginning of value") + + // Test 5: fails to restore when ContainerIDs list is empty + const s = `{ "container_ids": [ ] }` + a.Nil(ioutil.WriteFile(proxyStateFilePath, []byte(s), 0600)) + + restored, err = restoreAllState(rig.proxy) + a.False(restored) + a.EqualError(err, "containerIDs list is empty") + + // Test 6: fails to restore when stored Version is higher + sVer := fmt.Sprintf(`{ "version": %d, "container_ids": [ "09876543210" ] }`, + stateFileFormatVersion+1) + a.Nil(ioutil.WriteFile(proxyStateFilePath, []byte(sVer), 0600)) + + restored, err = restoreAllState(rig.proxy) + a.False(restored) + a.EqualError(err, fmt.Sprintf("stored state format version (%d) is "+ + "higher than supported (%d). 
Aborting", stateFileFormatVersion+1, + stateFileFormatVersion)) + + a.Nil(os.Remove(proxyStateFilePath)) + + // Test 7: success + rig.RegisterVM() + rig.Stop() + rig = newTestRig(t) + rig.Start() + + restored, err = restoreAllState(rig.proxy) + a.True(restored) + a.Nil(err) +} + +func TestState_storeProxyState(t *testing.T) { + a := assert.New(t) + rig := newTestRig(t) + rig.Start() + proxy := rig.proxy + a.NotNil(proxy) + + // Test 1: success - 0 vm to store, no proxy's state file + a.Nil(os.RemoveAll(storeStateDir)) + a.Nil(storeProxyState(proxy)) + + // Test 2: fails to store a state to a file + a.Nil(os.MkdirAll(storeStateDir, 0600)) + rig.RegisterVM() + a.Nil(os.RemoveAll(storeStateDir)) + a.EqualError(storeProxyState(proxy), fmt.Sprintf("couldn't store proxy"+ + " state to file %s: open %s: no such file or directory", + proxyStateFilePath, proxyStateFilePath)) +} + +func TestState_storeVMState(t *testing.T) { + a := assert.New(t) + rig := newTestRig(t) + rig.Start() + proxy := rig.proxy + a.NotNil(proxy) + + a.Equal(vmStateFilePath(testContainerID), filepath.Join(storeStateDir, + "vm_"+testContainerID+".json")) + + // clean up a possible state + a.Nil(os.RemoveAll(storeStateDir)) + a.Nil(os.MkdirAll(storeStateDir, 0750)) + + _ = rig.RegisterVM() + vm := proxy.vms[testContainerID] + a.NotNil(vm) + + // Test 1: success to store vm's state + a.Nil(storeVMState(vm)) + + // Test 2: fails to write file + a.Nil(os.RemoveAll(storeStateDir)) + a.EqualError(storeVMState(vm), + fmt.Sprintf("couldn't store VM state to %s: open %s: no such"+ + " file or directory", vmStateFilePath(testContainerID), + vmStateFilePath(testContainerID))) +} + +func TestState_delVMAndState(t *testing.T) { + a := assert.New(t) + rig := newTestRig(t) + rig.Start() + proxy := rig.proxy + a.NotNil(proxy) + + // clean up a possible state + a.Nil(os.RemoveAll(storeStateDir)) + a.Nil(os.MkdirAll(storeStateDir, proxyStateDirPerm)) + + // Test 1: check proxy parameter + a.EqualError(delVMAndState(nil, 
nil), "proxy parameter is nil") + // Test 2: check vm parameter + a.EqualError(delVMAndState(proxy, nil), "vm parameter is nil") + + // Test 3: check failure to delete a vm state file + _ = rig.RegisterVM() + vm := proxy.vms[testContainerID] + a.Nil(storeVMState(vm)) + a.Nil(os.RemoveAll(storeStateDir)) + a.EqualError(delVMAndState(proxy, vm), + fmt.Sprintf("couldn't remove file %s: remove %s: no such"+ + " file or directory", vmStateFilePath(testContainerID), + vmStateFilePath(testContainerID))) + + // Test 4: check successful execution + a.Nil(os.MkdirAll(storeStateDir, proxyStateDirPerm)) + a.Nil(storeVMState(vm)) + a.Nil(delVMAndState(proxy, vm)) +} + +func TestState_readVMState(t *testing.T) { + a := assert.New(t) + rig := newTestRig(t) + rig.Start() + proxy := rig.proxy + a.NotNil(proxy) + + // clean up a possible state + a.Nil(os.RemoveAll(storeStateDir)) + a.Nil(os.MkdirAll(storeStateDir, proxyStateDirPerm)) + + // Test 1: containerID parameter must be non-empty + vmState, err := readVMState("") + a.Nil(vmState) + a.EqualError(err, "containerID parameter is empty") + + testContPath := vmStateFilePath(testContainerID) + + // Test 2: read from an inaccessible file + vmState, err = readVMState(testContainerID) + a.Nil(vmState) + a.EqualError(err, fmt.Sprintf("couldn't read %s: open %s: no such file"+ + " or directory", testContPath, testContPath)) + + // Test 3: read garbage + a.Nil(ioutil.WriteFile(testContPath, []byte("Garbage"), 0600)) + vmState, err = readVMState(testContainerID) + a.Nil(vmState) + a.EqualError(err, fmt.Sprintf("couldn't unmarshal %s: invalid "+ + "character 'G' looking for beginning of value", testContPath)) + + // Test 4: success + _ = rig.RegisterVM() + vmState, err = readVMState(testContainerID) + a.NotNil(vmState) + a.Nil(err) +} + +func TestState_restoreVMState(t *testing.T) { + a := assert.New(t) + rig := newTestRig(t) + rig.Start() + proxy := rig.proxy + a.NotNil(proxy) + lh := test.NewGlobal() + + testContPath := 
vmStateFilePath(testContainerID) + + // clean up a possible state + a.Nil(os.RemoveAll(storeStateDir)) + a.Nil(os.MkdirAll(storeStateDir, proxyStateDirPerm)) + + // Test 1: proxy == nil + a.False(restoreVMState(nil, testContainerID)) + lastLogEq(a, lh, "proxy parameter is nil") + + // Test 2: ContainerID is empty + a.False(restoreVMState(proxy, "")) + lastLogEq(a, lh, "containerID is empty") + + // Test 3: readVMState() returns an error + a.False(restoreVMState(proxy, testContainerID)) + lastLogEq(a, lh, fmt.Sprintf("couldn't read %s: open %s: no such file"+ + " or directory", testContPath, testContPath)) + + // Test 4: wrong vm parameters + const t4 = `{ "registerVM": { "containerId": "" } } ` + a.Nil(ioutil.WriteFile(testContPath, []byte(t4), 0600)) + a.False(restoreVMState(proxy, testContainerID)) + lastLogEq(a, lh, fmt.Sprintf("wrong VM parameters: {ContainerID: "+ + "CtlSerial: IoSerial: Console: NumIOStreams:0}")) + + // Test 5: inconsistent container ID + const t5 = `{ "registerVM": { "containerId": "0", "ctlSerial": "path", + "ioSerial": "path" } }` + a.Nil(ioutil.WriteFile(testContPath, []byte(t5), 0600)) + a.False(restoreVMState(proxy, testContainerID)) + lastLogEq(a, lh, fmt.Sprintf("inconsistent container ID: 0")) + + // Test 6: restoreTokens() returns an error + t6 := fmt.Sprintf(`{ "registerVM": { "containerID": "%s", + "ctlSerial": "path", "ioSerial": "path" }, + "io_sessions": [ { "token": "" } ] }`, testContainerID) + a.Nil(ioutil.WriteFile(testContPath, []byte(t6), 0600)) + a.False(restoreVMState(proxy, testContainerID)) + lastLogEq(a, lh, fmt.Sprintf("error restoring tokens: empty token in "+ + "recovering state")) + + // Test 7: container is already registered + _ = rig.RegisterVM() + a.False(restoreVMState(proxy, testContainerID)) + lastLogEq(a, lh, fmt.Sprintf("container already registered")) +} diff --git a/vm.go b/vm.go index 86fe7f1..749eec2 100644 --- a/vm.go +++ b/vm.go @@ -284,6 +284,10 @@ func (vm *vm) consoleToLog() { } func (vm *vm) 
Connect() error { + return vm.Reconnect(false) +} + +func (vm *vm) Reconnect(reconnect bool) error { if vm.console.socketPath != "" { var err error @@ -300,7 +304,12 @@ func (vm *vm) Connect() error { return err } - if err := vm.hyperHandler.WaitForReady(); err != nil { + if reconnect { + if !vm.hyperHandler.IsStarted() { + vm.hyperHandler.CloseSockets() + return errors.New("failed to reconnect to the agent") + } + } else if err := vm.hyperHandler.WaitForReady(); err != nil { vm.hyperHandler.CloseSockets() return err } @@ -362,6 +371,12 @@ func newcontainerHandler(vm *vm, hyper *api.Hyper, session *ioSession) error { session.containerID = cmdIn.ID + // update stored ioSessions + if err := storeVMState(vm); err != nil { + proxyLog.WithField("vm", vm.containerID).Errorf( + "couldn't store a VM state: %v", err) + } + if err := relocateProcess(cmdIn.Process, session); err != nil { return err } @@ -635,6 +650,47 @@ func (vm *vm) AllocateToken() (Token, error) { return token, nil } +// This function is used to restore tokens and io sessions +func (vm *vm) AllocateIoSessionAs(token Token, contID string, nStreams int, + ioBase uint64) (Token, error) { + + if token == "" || contID == "" || nStreams == 0 { + return "", fmt.Errorf("Can't allocate a session: " + + "got an invalid parameter") + } + + vm.Lock() + defer vm.Unlock() + + if ioBase > vm.nextIoBase { + return "", fmt.Errorf("Can't allocate a session: " + + "inconsisten ioBase parameter") + } + + vm.nextIoBase += uint64(nStreams) + + session := &ioSession{ + vm: vm, + token: token, + containerID: contID, + nStreams: nStreams, + ioBase: ioBase, + shimConnected: make(chan interface{}), + processStarted: make(chan interface{}), + } + + // This mapping is to get the session from the seq number in a + // hyperstart I/O packet. 
+ for i := 0; i < nStreams; i++ { + vm.ioSessions[ioBase+uint64(i)] = session + } + + // This mapping is to get the session from the I/O token + vm.tokenToSession[token] = session + + return token, nil +} + // AssociateShim associates a shim given by the triplet (token, clientID, // clientConn) to a vm (POD). After associating the shim, a hyper command can // be issued to start the process inside the VM and data can flow between shim