Merged
4 changes: 2 additions & 2 deletions e2e/capture_test.go
@@ -51,7 +51,7 @@ func TestFlowCapture(t *testing.T) {
})
defer timer.Stop()

output, err := RunCommand(clog, "commands/oc-netobserv", "flows", "--log-level=trace")
output, err := RunCommandAndTerminate(clog, "commands/oc-netobserv", "flows", "--log-level=trace")
// TODO: find a way to avoid error here; this is probably related to SIGTERM instead of CTRL + C call
//assert.Nil(t, err)

@@ -124,7 +124,7 @@ func TestPacketCapture(t *testing.T) {
})
defer timer.Stop()

output, err := RunCommand(clog, "commands/oc-netobserv", "packets", "--log-level=trace", "--protocol=TCP", "--port=6443")
output, err := RunCommandAndTerminate(clog, "commands/oc-netobserv", "packets", "--log-level=trace", "--protocol=TCP", "--port=6443")
// TODO: find a way to avoid error here; this is probably related to SIGTERM instead of CTRL + C call
//assert.Nil(t, err)

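Both capture tests now go through RunCommandAndTerminate, since `flows` and `packets` stream until they are interrupted and RunCommand would wait for an exit that never comes on its own. Below is a minimal sketch of the resulting call-site pattern, assuming the tests live in package e2e; the test name and the "flow" output marker are hypothetical and only illustrate how the combined stderr+stdout string returned by the helper can be checked.

package e2e

import (
	"strings"
	"testing"

	"github.com/sirupsen/logrus"
)

// Hypothetical illustration of the new call-site pattern; the "flow" marker
// checked below is an assumption, not an assertion from the real tests.
func TestFlowCaptureOutputSketch(t *testing.T) {
	clog := logrus.WithField("component", "e2e-sketch")

	// RunCommandAndTerminate stops the capture after RunCommandTimeout, so a
	// streaming command such as `flows` cannot hang the suite; the returned
	// error is tolerated because termination currently surfaces as a
	// SIGTERM-related failure (see the TODO in the real tests).
	output, err := RunCommandAndTerminate(clog, "commands/oc-netobserv", "flows", "--log-level=trace")
	if err != nil {
		t.Logf("command terminated with error (tolerated): %v", err)
	}

	// stderr is prepended to stdout by the helper, so trace logs and captured
	// flows can be searched in one string.
	if !strings.Contains(output, "flow") {
		t.Errorf("expected capture output to mention flows, got:\n%s", output)
	}
}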
131 changes: 96 additions & 35 deletions e2e/common.go
@@ -5,6 +5,7 @@ import (
"io"
"os/exec"
"strings"
"sync"
"syscall"
"time"

@@ -13,7 +14,9 @@ import (
)

const (
StartCommandWait = 30 * time.Second
// Keep StartCommandWait at least 20 seconds
// to avoid pty to get closed early and ultimately killing process
StartCommandWait = 20 * time.Second
RunCommandTimeout = 60 * time.Second
)

@@ -32,39 +35,56 @@ func StartCommand(log *logrus.Entry, commandName string, arg ...string) (string,
outPipe, _ := cmd.StdoutPipe()
errPipe, _ := cmd.StderrPipe()

var sb strings.Builder
var sbOut strings.Builder
var sbErr strings.Builder

go func(_ io.ReadCloser) {
reader := bufio.NewReader(errPipe)
line, err := reader.ReadString('\n')
for err == nil {
sb.WriteString(line)
line, err = reader.ReadString('\n')
for {
line, err := reader.ReadString('\n')
// Write line even if there's an error, as long as we got data
if len(line) > 0 {
sbErr.WriteString(line)
}
if err != nil {
break
}
}
}(errPipe)

go func(_ io.ReadCloser) {
reader := bufio.NewReader(outPipe)
line, err := reader.ReadString('\n')
for err == nil {
sb.WriteString(line)
line, err = reader.ReadString('\n')
for {
line, err := reader.ReadString('\n')
// Write line even if there's an error, as long as we got data
if len(line) > 0 {
sbOut.WriteString(line)
}
if err != nil {
break
}
}
}(outPipe)

// start async
go func() {
log.Debug("Starting async ...")
_, err := pty.Start(cmd)
ptmx, err := pty.Start(cmd)
if err != nil {
log.Errorf("Start returned error: %v", err)
return
}
// Note: PTY is intentionally NOT closed here as command continues running
// Keep the PTY file descriptor alive to prevent SIGHUP
_ = ptmx // Keep reference to prevent premature PTY closure
}()

log.Debugf("Waiting %v ...", StartCommandWait)
time.Sleep(StartCommandWait)

log.Debug("Returning result while command still running")
return sb.String(), nil
// Combine stderr first (errors more visible), then stdout
return sbErr.String() + sbOut.String(), nil
}

// run command with tty support and wait for stop
@@ -77,44 +97,65 @@ func RunCommand(log *logrus.Entry, commandName string, arg ...string) (string, e
outPipe, _ := cmd.StdoutPipe()
errPipe, _ := cmd.StderrPipe()

var sb strings.Builder
var sbOut strings.Builder
var sbErr strings.Builder
var wg sync.WaitGroup

wg.Add(2)
go func(_ io.ReadCloser) {
defer wg.Done()
reader := bufio.NewReader(errPipe)
line, err := reader.ReadString('\n')
for err == nil {
sb.WriteString(line)
line, err = reader.ReadString('\n')
for {
line, err := reader.ReadString('\n')
// Write line even if there's an error, as long as we got data
if len(line) > 0 {
sbErr.WriteString(line)
}
if err != nil {
break
}
}
}(errPipe)

go func(_ io.ReadCloser) {
defer wg.Done()
reader := bufio.NewReader(outPipe)
line, err := reader.ReadString('\n')
for err == nil {
sb.WriteString(line)
line, err = reader.ReadString('\n')
for {
line, err := reader.ReadString('\n')
// Write line even if there's an error, as long as we got data
if len(line) > 0 {
sbOut.WriteString(line)
}
if err != nil {
break
}
}
}(outPipe)

log.Debug("Starting ...")
_, err := pty.Start(cmd)
ptmx, err := pty.Start(cmd)
if err != nil {
log.Errorf("Start returned error: %v", err)
return "", err
}
defer ptmx.Close() // Ensure PTY is closed after command finishes

log.Debug("Waiting ...")
err = cmd.Wait()
if err != nil {
log.Errorf("Wait returned error: %v", err)
}

log.Debug("Waiting for output goroutines to finish...")
wg.Wait()

// TODO: find why this returns -1. That may be related to pty implementation
/*if cmd.ProcessState.ExitCode() != 0 {
return sb.String(), fmt.Errorf("Cmd returned code %d", cmd.ProcessState.ExitCode())
return sbErr.String() + sbOut.String(), fmt.Errorf("Cmd returned code %d", cmd.ProcessState.ExitCode())
}*/

return sb.String(), nil
// Combine stderr first (errors more visible), then stdout
return sbErr.String() + sbOut.String(), nil
}

// run command with tty support and terminate it after timeout
@@ -137,22 +178,38 @@ func RunCommandAndTerminate(log *logrus.Entry, commandName string, arg ...string
})
defer timer.Stop()

var sb strings.Builder
var sbOut strings.Builder
var sbErr strings.Builder
var wg sync.WaitGroup

wg.Add(2)
go func(_ io.ReadCloser) {
defer wg.Done()
reader := bufio.NewReader(errPipe)
line, err := reader.ReadString('\n')
for err == nil {
sb.WriteString(line)
line, err = reader.ReadString('\n')
for {
line, err := reader.ReadString('\n')
// Write line even if there's an error, as long as we got data
if len(line) > 0 {
sbErr.WriteString(line)
}
if err != nil {
break
}
}
}(errPipe)

go func(_ io.ReadCloser) {
defer wg.Done()
reader := bufio.NewReader(outPipe)
line, err := reader.ReadString('\n')
for err == nil {
sb.WriteString(line)
line, err = reader.ReadString('\n')
for {
line, err := reader.ReadString('\n')
// Write line even if there's an error, as long as we got data
if len(line) > 0 {
sbOut.WriteString(line)
}
if err != nil {
break
}
}
}(outPipe)

Expand All @@ -178,10 +235,14 @@ func RunCommandAndTerminate(log *logrus.Entry, commandName string, arg ...string
log.Errorf("Wait returned error: %v", err)
}

log.Debug("Waiting for output goroutines to finish...")
wg.Wait()

// TODO: find why this returns -1. That may be related to pty implementation
/*if cmd.ProcessState.ExitCode() != 0 {
return sb.String(), fmt.Errorf("Cmd returned code %d", cmd.ProcessState.ExitCode())
return sbErr.String() + sbOut.String(), fmt.Errorf("Cmd returned code %d", cmd.ProcessState.ExitCode())
}*/

return sb.String(), nil
// Combine stderr first (errors more visible), then stdout
return sbErr.String() + sbOut.String(), nil
}
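The three helpers in common.go now share one drain pattern: each pipe gets its own bufio.Reader goroutine writing into a dedicated strings.Builder, a sync.WaitGroup guarantees both readers finish before the output is assembled, and the child process is started under a pseudo-terminal (the pty import is assumed to be github.com/creack/pty). A condensed, standalone sketch of that pattern, mirroring RunCommand rather than reproducing the repository code:

package ptyrun

import (
	"bufio"
	"io"
	"os/exec"
	"strings"
	"sync"

	"github.com/creack/pty" // assumed pty implementation
)

// runUnderPTY mirrors the RunCommand pattern: drain stdout and stderr into
// separate builders, start the command on a pseudo-terminal, wait for it,
// then return stderr followed by stdout.
func runUnderPTY(name string, arg ...string) (string, error) {
	cmd := exec.Command(name, arg...)
	outPipe, _ := cmd.StdoutPipe()
	errPipe, _ := cmd.StderrPipe()

	var sbOut, sbErr strings.Builder
	var wg sync.WaitGroup

	drain := func(r io.Reader, sb *strings.Builder) {
		defer wg.Done()
		reader := bufio.NewReader(r)
		for {
			line, err := reader.ReadString('\n')
			if len(line) > 0 {
				sb.WriteString(line) // keep partial lines even when an error follows
			}
			if err != nil {
				return
			}
		}
	}
	wg.Add(2)
	go drain(errPipe, &sbErr)
	go drain(outPipe, &sbOut)

	// pty.Start attaches a tty to the command; the pipes set above keep
	// receiving stdout/stderr, while the tty satisfies interactive CLIs.
	ptmx, err := pty.Start(cmd)
	if err != nil {
		return "", err
	}
	defer ptmx.Close()

	waitErr := cmd.Wait()
	wg.Wait() // both drains done; the builders are safe to read
	return sbErr.String() + sbOut.String(), waitErr
}

Putting stderr ahead of stdout in the returned string matches the comment in the helpers: errors surface first when a test prints the combined output.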
53 changes: 40 additions & 13 deletions e2e/integration-tests/cli.go
@@ -6,6 +6,7 @@ import (
"context"
"io/fs"
"os"
"path/filepath"
"time"

"k8s.io/apimachinery/pkg/api/errors"
@@ -19,15 +20,16 @@ import (
const (
PollInterval = 5 * time.Second
PollTimeout = 10 * time.Minute
outputDir = "./output/flow"
)

var (
clog = logrus.WithField("component", "cli")
)

func isNamespace(clientset *kubernetes.Clientset, cliNS string, exists bool) (bool, error) {
err := wait.PollUntilContextTimeout(context.Background(), PollInterval, PollTimeout, true, func(context.Context) (done bool, err error) {
namespace, err := getNamespace(clientset, cliNS)
err := wait.PollUntilContextTimeout(context.Background(), PollInterval, PollTimeout, true, func(ctx context.Context) (done bool, err error) {
namespace, err := getNamespace(ctx, clientset, cliNS)
if exists {
if err != nil {
return false, err
@@ -36,7 +38,7 @@ func isNamespace(clientset *kubernetes.Clientset, cliNS string, exists bool) (bo
} else if errors.IsNotFound(err) {
return true, nil
}
return false, err
return false, nil
})
if err != nil {
return false, err
Expand All @@ -45,8 +47,8 @@ func isNamespace(clientset *kubernetes.Clientset, cliNS string, exists bool) (bo
}

func isCollector(clientset *kubernetes.Clientset, cliNS string, ready bool) (bool, error) {
err := wait.PollUntilContextTimeout(context.Background(), PollInterval, PollTimeout, true, func(context.Context) (done bool, err error) {
collectorPod, err := getNamespacePods(clientset, cliNS, &metav1.ListOptions{FieldSelector: "status.phase=Running", LabelSelector: "run=collector"})
err := wait.PollUntilContextTimeout(context.Background(), PollInterval, PollTimeout, true, func(ctx context.Context) (done bool, err error) {
collectorPod, err := getNamespacePods(ctx, clientset, cliNS, &metav1.ListOptions{FieldSelector: "status.phase=Running", LabelSelector: "run=collector"})
if err != nil {
return false, err
}
@@ -62,12 +64,33 @@ func isCollector(clientset *kubernetes.Clientset, cliNS string, ready bool) (boo
}

func isDaemonsetReady(clientset *kubernetes.Clientset, daemonsetName string, cliNS string) (bool, error) {
err := wait.PollUntilContextTimeout(context.Background(), PollInterval, PollTimeout, true, func(context.Context) (done bool, err error) {
cliDaemonset, err := getDaemonSet(clientset, daemonsetName, cliNS)
err := wait.PollUntilContextTimeout(context.Background(), PollInterval, PollTimeout, true, func(ctx context.Context) (done bool, err error) {
cliDaemonset, err := getDaemonSet(ctx, clientset, daemonsetName, cliNS)
if err != nil {
if errors.IsNotFound(err) {
clog.Infof("daemonset not found %v", err)
return false, nil
}
return false, err
}
return cliDaemonset.Status.DesiredNumberScheduled == cliDaemonset.Status.NumberReady, nil

desired := cliDaemonset.Status.DesiredNumberScheduled
ready := cliDaemonset.Status.NumberReady
current := cliDaemonset.Status.CurrentNumberScheduled

clog.Debugf("daemonset %s status: DesiredNumberScheduled=%d, CurrentNumberScheduled=%d, NumberReady=%d",
daemonsetName, desired, current, ready)

// Ensure daemonset has scheduled pods before checking readiness
// This prevents race condition where both DesiredNumberScheduled and NumberReady are 0
if desired == 0 {
clog.Debugf("daemonset %s has not scheduled any pods yet (DesiredNumberScheduled=0)", daemonsetName)
return false, nil
}

// Check both that all desired pods are scheduled AND ready
// This ensures pods actually exist before we return true
return desired == current && current == ready, nil
})
if err != nil {
return false, err
@@ -86,13 +109,13 @@ func isCLIRuning(clientset *kubernetes.Clientset, cliNS string) (bool, error) {
if err != nil {
return false, err
}
clog.Debugf("Daemonset ready: %v", daemonsetReady)
clog.Infof("Daemonset ready: %v", daemonsetReady)

collectorReady, err := isCollector(clientset, cliNS, true)
if err != nil {
return false, err
}
clog.Debugf("Collector ready: %v", collectorReady)
clog.Infof("Collector ready: %v", collectorReady)

return namespaceCreated && daemonsetReady && collectorReady, nil
}
@@ -110,22 +133,26 @@ func isCLIDone(clientset *kubernetes.Clientset, cliNS string) (bool, error) {
func getFlowsJSONFile() (string, error) {
// var files []fs.DirEntry
var files []string
outputDir := "./output/flow/"
dirFS := os.DirFS(outputDir)

files, err := fs.Glob(dirFS, "*.json")
if err != nil {
return "", err
}
// this could be problematic if two tests are running in parallel with --copy=true
var mostRecentFile fs.FileInfo
for _, file := range files {
fileInfo, err := os.Stat(outputDir + file)
fileInfo, err := os.Stat(filepath.Join(outputDir, file))
if err != nil {
return "", nil
}
if mostRecentFile == nil || fileInfo.ModTime().After(mostRecentFile.ModTime()) {
mostRecentFile = fileInfo
}
}
return outputDir + mostRecentFile.Name(), nil
absPath, err := filepath.Abs(filepath.Join(outputDir, mostRecentFile.Name()))
if err != nil {
return "", err
}
return absPath, nil
}
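The cli.go changes follow two threads: the ctx handed to each PollUntilContextTimeout callback is now forwarded to the client-go calls, and the DaemonSet check no longer reports ready while DesiredNumberScheduled is still 0. A condensed sketch of that polling shape, assuming a standard client-go clientset; the package name, function name, and the hard-coded 5s/10m values simply stand in for PollInterval and PollTimeout above:

package clipoll

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// waitForDaemonSetReady polls until every desired pod of the DaemonSet is
// scheduled and ready. "Not found" and "nothing scheduled yet" are treated
// as retryable, mirroring the race-condition guard added to isDaemonsetReady.
func waitForDaemonSetReady(clientset *kubernetes.Clientset, ns, name string) error {
	return wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 10*time.Minute, true,
		func(ctx context.Context) (bool, error) {
			// The poller's ctx is forwarded so cancellation and timeout
			// propagate to the API call.
			ds, err := clientset.AppsV1().DaemonSets(ns).Get(ctx, name, metav1.GetOptions{})
			if err != nil {
				if errors.IsNotFound(err) {
					return false, nil // keep polling until the DaemonSet appears
				}
				return false, err
			}
			desired := ds.Status.DesiredNumberScheduled
			if desired == 0 {
				// Nothing scheduled yet: 0 == 0 must not count as ready.
				return false, nil
			}
			return desired == ds.Status.CurrentNumberScheduled &&
				ds.Status.CurrentNumberScheduled == ds.Status.NumberReady, nil
		})
}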