Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions pkg/unikontainers/ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ const (
maxRetries = 50
waitTime = 5 * time.Millisecond
FromReexec = true
// IPCAcceptTimeout is the maximum time to wait for a connection on the IPC socket.
// This prevents processes from hanging indefinitely if the counterpart never connects
// (e.g., due to containerd restart, node pressure, or orchestration failures).
IPCAcceptTimeout = 60 * time.Second
// IPCReadTimeout is the maximum time to wait for reading a message after connection.
IPCReadTimeout = 10 * time.Second
)

func getSockAddr(dir string, name string) string {
Expand Down Expand Up @@ -145,27 +151,48 @@ func createListener(socketAddress string, mustBeValid bool) (*net.UnixListener,
return listener, nil
}

// awaitMessage opens a new connection to socketAddress
// and waits for a given message
// AwaitMessage waits for a connection on the listener and reads an expected message.
// It uses timeouts to prevent indefinite blocking if the counterpart process
// never connects (e.g., due to orchestration failures, crashes, or restarts).
func AwaitMessage(listener *net.UnixListener, expectedMessage IPCMessage) error {
// Set accept deadline to prevent indefinite blocking.
// This is critical for preventing orphaned processes when urunc start
// never runs after urunc create, or when reexec fails silently.
if err := listener.SetDeadline(time.Now().Add(IPCAcceptTimeout)); err != nil {
return fmt.Errorf("failed to set listener deadline: %w", err)
}

conn, err := listener.AcceptUnix()
if err != nil {
return err
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
return fmt.Errorf("timeout waiting for IPC connection (waited %v): counterpart process may have failed or not started", IPCAcceptTimeout)
}
return fmt.Errorf("failed to accept connection: %w", err)
}
defer func() {
err = conn.Close()
if err != nil {
logrus.WithError(err).Error("failed to close connection")
if closeErr := conn.Close(); closeErr != nil {
logrus.WithError(closeErr).Error("failed to close connection")
}
}()

// Set read deadline to prevent hanging on slow or stuck writers
if err := conn.SetReadDeadline(time.Now().Add(IPCReadTimeout)); err != nil {
return fmt.Errorf("failed to set read deadline: %w", err)
}

buf := make([]byte, len(expectedMessage))
n, err := conn.Read(buf)
if err != nil {
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
return fmt.Errorf("timeout reading IPC message (waited %v): counterpart process may be stuck", IPCReadTimeout)
}
return fmt.Errorf("failed to read from socket: %w", err)
}
msg := string(buf[0:n])
if msg != string(expectedMessage) {
return fmt.Errorf("received unexpected message: %s", msg)
return fmt.Errorf("received unexpected message: %s (expected: %s)", msg, expectedMessage)
}
return nil
}
53 changes: 53 additions & 0 deletions pkg/unikontainers/ipc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,56 @@
err = AwaitMessage(listener, expectedMessage)
assert.NoError(t, err, "Expected no error in awaiting message")
}

func TestAwaitMessageTimeout(t *testing.T) {
socketAddress := "/tmp/test_await_message_timeout.sock"
expectedMessage := ReexecStarted

listener, err := createListener(socketAddress, true)
if err != nil {
t.Fatalf("Failed to create listener: %v", err)
}
defer listener.Close()

// Don't send any message - this should trigger a timeout
// Note: For testing, we need shorter timeouts than production.
// The actual timeout check is that it returns an error containing "timeout"
// rather than blocking forever.

// Set a shorter deadline for testing purposes
listener.SetDeadline(time.Now().Add(100 * time.Millisecond))

Check failure on line 196 in pkg/unikontainers/ipc_test.go

View workflow job for this annotation

GitHub Actions / ci-on-push / Lint code / lint (amd64, ubuntu-22.04)

Error return value of `listener.SetDeadline` is not checked (errcheck)

err = AwaitMessage(listener, expectedMessage)
assert.Error(t, err, "Expected timeout error when no connection arrives")
assert.Contains(t, err.Error(), "timeout", "Expected error message to mention timeout")
}

// TestAwaitMessageWrongMessage verifies that AwaitMessage rejects a message
// that differs from the expected one and reports it as unexpected.
func TestAwaitMessageWrongMessage(t *testing.T) {
	socketAddress := "/tmp/test_await_wrong_message.sock"
	expectedMessage := ReexecStarted
	wrongMessage := StartExecve

	listener, err := createListener(socketAddress, true)
	if err != nil {
		t.Fatalf("Failed to create listener: %v", err)
	}
	defer listener.Close()

	// senderDone is closed when the sending goroutine has finished, so the
	// test never returns while that goroutine may still call t.Errorf.
	senderDone := make(chan struct{})
	go func() {
		defer close(senderDone)

		conn, err := net.Dial("unix", socketAddress)
		if err != nil {
			t.Errorf("Failed to dial connection: %v", err)
			// conn is nil here; stop before touching it.
			return
		}
		defer conn.Close()

		// Send a message other than the one the receiver is waiting for.
		if _, err := conn.Write([]byte(wrongMessage)); err != nil {
			t.Errorf("Failed to send message: %v", err)
		}
	}()

	err = AwaitMessage(listener, expectedMessage)
	<-senderDone

	// Only inspect the message when an error was actually returned;
	// calling err.Error() on a nil error would panic instead of failing.
	if assert.Error(t, err, "Expected error for unexpected message") {
		assert.Contains(t, err.Error(), "unexpected message", "Expected error to mention unexpected message")
	}
}
6 changes: 3 additions & 3 deletions pkg/unikontainers/unikontainers.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,8 +753,8 @@ func (u *Unikontainer) executeHooksConcurrently(name string, hooks []specs.Hook,
uniklog.WithFields(logrus.Fields{
"id": u.State.ID,
"name": name,
"path": hooks[i].Path,
"args": hooks[i].Args,
"path": h.Path,
"args": h.Args,
"error": err,
}).Error("Executing hook failed")
errChan <- err
Expand Down Expand Up @@ -1121,7 +1121,7 @@ func (u *Unikontainer) SendMessage(message IPCMessage) error {

// isRunning returns true if the PID is alive or hedge.ListVMs returns our containerID
func (u *Unikontainer) isRunning() bool {
vmmType := hypervisors.VmmType(u.State.Annotations[annotType])
vmmType := hypervisors.VmmType(u.State.Annotations[annotHypervisor])
if vmmType != hypervisors.HedgeVmm {
return syscall.Kill(u.State.Pid, syscall.Signal(0)) == nil
}
Expand Down
Loading