Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,651 changes: 879 additions & 772 deletions internal/index.html

Large diffs are not rendered by default.

59 changes: 56 additions & 3 deletions internal/proc/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type container struct {
log *log.Logger
spec types.Spec
types.Task
containerID string
}

func (c *container) Run(ctx context.Context, stdout, stderr io.Writer) error {
Expand Down Expand Up @@ -66,9 +67,7 @@ func (c *container) Run(ctx context.Context, stdout, stderr io.Writer) error {
return fmt.Errorf("error getting spec environ: %w", err)
}

if err != nil {
return fmt.Errorf("failed to get container ID: %w", err)
} else if id != "" {
if id != "" {
log.Printf("container already exists, skipping build/pull\n")
} else if _, err := os.Stat(dockerfile); err == nil {
log.Printf("creating tar image from %q", dockerfile)
Expand Down Expand Up @@ -175,6 +174,8 @@ func (c *container) Run(ctx context.Context, stdout, stderr io.Writer) error {
if err != nil {
return fmt.Errorf("failed to get container ID: %w", err)
}

c.containerID = id
if err = cli.ContainerStart(ctx, id, dockertypes.ContainerStartOptions{}); err != nil {
return fmt.Errorf("failed to start container: %w", err)
}
Expand All @@ -198,6 +199,7 @@ func (c *container) Run(ctx context.Context, stdout, stderr io.Writer) error {
// ignore errors, might be content cancelled, we still need to wait for the container to exit
log.Printf("failed to log container: %v", err)
}

waitC, errC := cli.ContainerWait(context.Background(), id, dockercontainer.WaitConditionNotRunning)
select {
case wait := <-waitC:
Expand Down Expand Up @@ -303,4 +305,55 @@ func ignoreNotExist(err error) error {

}

// GetMetrics returns a point-in-time CPU (millicores) and memory (bytes)
// sample for the task's container via the Docker stats API. Before the
// container has been created (no ID recorded yet) it returns zero metrics
// rather than an error.
func (c *container) GetMetrics(ctx context.Context) (*types.Metrics, error) {
	if c.containerID == "" {
		return &types.Metrics{}, nil
	}

	cli, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		return nil, fmt.Errorf("failed to create docker client: %w", err)
	}
	defer cli.Close()

	stats, err := cli.ContainerStats(ctx, c.containerID, false) // false = single stat, not stream
	if err != nil {
		return nil, fmt.Errorf("failed to get container stats: %w", err)
	}
	defer stats.Body.Close()

	data, err := io.ReadAll(stats.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read stats: %w", err)
	}

	var dockerStats dockertypes.StatsJSON
	if err := json.Unmarshal(data, &dockerStats); err != nil {
		return nil, fmt.Errorf("failed to parse stats: %w", err)
	}

	// Memory: report usage minus page cache where the kernel exposes it.
	// cgroup v1 reports cache under the "cache" key; cgroup v2 has no
	// "cache" key and reports it as "inactive_file". Guard against uint64
	// underflow in case the reported cache exceeds usage.
	memoryBytes := dockerStats.MemoryStats.Usage
	if cache, ok := dockerStats.MemoryStats.Stats["cache"]; ok && cache <= memoryBytes {
		memoryBytes -= cache
	} else if inactive, ok := dockerStats.MemoryStats.Stats["inactive_file"]; ok && inactive <= memoryBytes {
		memoryBytes -= inactive
	}

	// CPU: (container CPU time delta / system CPU time delta) * number of
	// CPUs yields cores in use; *1000 converts cores to millicores.
	// PercpuUsage is not populated on cgroup v2 hosts, so prefer the
	// OnlineCPUs field and fall back to len(PercpuUsage) for older daemons.
	var cpuMillicores uint64
	if dockerStats.PreCPUStats.CPUUsage.TotalUsage != 0 {
		cpuDelta := dockerStats.CPUStats.CPUUsage.TotalUsage - dockerStats.PreCPUStats.CPUUsage.TotalUsage
		systemDelta := dockerStats.CPUStats.SystemUsage - dockerStats.PreCPUStats.SystemUsage

		numCPUs := float64(dockerStats.CPUStats.OnlineCPUs)
		if numCPUs == 0 {
			numCPUs = float64(len(dockerStats.CPUStats.PercpuUsage))
		}

		if systemDelta > 0 {
			cores := (float64(cpuDelta) / float64(systemDelta)) * numCPUs
			cpuMillicores = uint64(cores * 1000) // convert cores to millicores
		}
	}

	return &types.Metrics{
		CPU: cpuMillicores,
		Mem: memoryBytes,
	}, nil
}

var _ Interface = &container{}
52 changes: 51 additions & 1 deletion internal/proc/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"os"
"os/exec"
"strconv"
"strings"
"syscall"
"time"
Expand All @@ -18,6 +19,7 @@ type host struct {
log *log.Logger
spec types.Spec
types.Task
pid int
}

func (h *host) Run(ctx context.Context, stdout, stderr io.Writer) error {
Expand Down Expand Up @@ -48,6 +50,7 @@ func (h *host) Run(ctx context.Context, stdout, stderr io.Writer) error {
// capture pgid straight away because it's not available after the process exits,
// the process may exit and leave children behind.
pid := cmd.Process.Pid
h.pid = pid
pgid, err := syscall.Getpgid(pid)
if err != nil {
return fmt.Errorf("failed get pgid: %w", err)
Expand Down Expand Up @@ -86,4 +89,51 @@ func ignoreProcessFinishedErr(err error) error {
return nil
}

var _ Interface = &host{}
func (h *host) GetMetrics(ctx context.Context) (*types.Metrics, error) {

// The `ps` command is used to get process information.
// -o %cpu,%mem specifies the desired output format: CPU and memory percentage.
// -p filters the output for the given PID.
cmd := exec.CommandContext(ctx, "ps", "-o", "%cpu,rss", "-p", strconv.Itoa(h.pid))

// Execute the command and capture its output.
output, err := cmd.Output()
if err != nil {
// This error typically occurs if the PID does not exist.
return nil, fmt.Errorf("failed to get process metrics for pid %d: %w", h.pid, err)
}

// The output from `ps` includes a header line, so we need to parse the second line.
// Example output:
// %CPU %MEM
// 0.1 0.2
lines := strings.Split(strings.TrimSpace(string(output)), "\n")
if len(lines) < 2 {
return nil, fmt.Errorf("unexpected ps output for pid %d: %s", h.pid, output)
}

// The second line contains the data. We split it by whitespace.
fields := strings.Fields(lines[1])
if len(fields) < 2 {
return nil, fmt.Errorf("unexpected ps output format for pid %d: %s", h.pid, lines[1])
}

// Parse the CPU usage from the first field.
cpuPercentage, err := strconv.ParseFloat(fields[0], 64)
if err != nil {
return nil, fmt.Errorf("failed to parse CPU usage '%s': %w", fields[0], err)
}

cpuMillicores := cpuPercentage * 10 // Convert percentage to millicores (1% = 10 millicores)
Copy link

Copilot AI Sep 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CPU percentage to millicores conversion is incorrect. 1% CPU usage should equal 10 millicores, but this assumes a single-core system. On multi-core systems, CPU percentage can exceed 100%. The conversion should account for the number of CPU cores or use a different approach to get absolute CPU usage.

Copilot uses AI. Check for mistakes.

rssMemoryKB, err := strconv.ParseInt(fields[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse RSS memory '%s': %w", fields[1], err)
}

// Convert RSS memoryBytes from KB to bytes.
memoryBytes := uint64(rssMemoryKB * 1024)

return &types.Metrics{CPU: uint64(cpuMillicores), Mem: memoryBytes}, nil

}
124 changes: 124 additions & 0 deletions internal/proc/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ import (
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -39,6 +43,7 @@ type k8s struct {
log *log.Logger
spec types.Spec
name string
pods []string // namespace/name
types.Task
}

Expand Down Expand Up @@ -268,6 +273,12 @@ func (k *k8s) Run(ctx context.Context, stdout io.Writer, stderr io.Writer) error
processPod := func(obj any) {
pod := obj.(*corev1.Pod)

podKey := pod.Namespace + "/" + pod.Name

if !slices.Contains(k.pods, podKey) {
k.pods = append(k.pods, podKey)
}

running := make(map[string]bool)

for _, s := range append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...) {
Expand Down Expand Up @@ -422,3 +433,116 @@ func sortUnstructureds(uns []*unstructured.Unstructured) {
return slices.Index(order, uns[i].GetKind()) > slices.Index(order, uns[j].GetKind())
})
}

// GetMetrics aggregates CPU (millicores) and memory (bytes) across every
// pod this task has observed, as reported by `kubectl top`.
func (k *k8s) GetMetrics(ctx context.Context) (*types.Metrics, error) {
	sum := &types.Metrics{}
	for _, podKey := range k.pods {
		// Keys are recorded as "namespace/name"; skip a malformed key
		// rather than panicking on a missing separator.
		namespace, podName, ok := strings.Cut(podKey, "/")
		if !ok {
			continue
		}
		metrics, err := k.getMetrics(ctx, namespace, podName)
		if err != nil {
			return nil, err
		}
		sum.CPU += metrics.CPU
		sum.Mem += metrics.Mem
	}
	return sum, nil
}

// getMetrics fetches a single pod's resource usage by shelling out to
// `kubectl top pod` and parsing its tabular output.
func (k *k8s) getMetrics(ctx context.Context, namespace, podName string) (*types.Metrics, error) {
	top := exec.CommandContext(ctx, "kubectl", "top", "pod", "-n", namespace, podName, "--no-headers")
	out, err := top.CombinedOutput()
	if err != nil {
		return nil, fmt.Errorf("kubectl top failed %q: %w", string(out), err)
	}
	return k.parseKubectlTopOutput(string(out))
}

// parseKubectlTopOutput extracts CPU and memory from one line of
// `kubectl top pod` output (columns: NAME CPU(cores) MEMORY(bytes)),
// e.g. "pod-name 250m 128Mi".
func (k *k8s) parseKubectlTopOutput(output string) (*types.Metrics, error) {
	columns := strings.Fields(strings.TrimSpace(output))
	if len(columns) < 3 {
		return nil, fmt.Errorf("unexpected kubectl top output format")
	}

	cpuMillicores, err := k.parseCPUValue(columns[1])
	if err != nil {
		return nil, fmt.Errorf("failed to parse CPU: %w", err)
	}

	memoryBytes, err := k.parseMemoryValue(columns[2])
	if err != nil {
		return nil, fmt.Errorf("failed to parse memory: %w", err)
	}

	return &types.Metrics{CPU: cpuMillicores, Mem: memoryBytes}, nil
}

// parseCPUValue converts a kubectl CPU quantity to millicores. It accepts
// either a millicore form ("250m") or a whole/fractional core count ("1.5").
func (k *k8s) parseCPUValue(cpuStr string) (uint64, error) {
	if milliStr, found := strings.CutSuffix(cpuStr, "m"); found {
		milli, err := strconv.ParseFloat(milliStr, 64)
		if err != nil {
			return 0, err
		}
		return uint64(milli), nil
	}

	cores, err := strconv.ParseFloat(cpuStr, 64)
	if err != nil {
		return 0, err
	}
	// 1 core == 1000 millicores.
	return uint64(cores * 1000), nil
}

// parseMemoryValue converts a kubectl memory quantity (e.g. "128Mi",
// "1.5G", "512") to bytes. Decimal (K/M/G/T) and binary (Ki/Mi/Gi/Ti)
// suffixes are supported; a bare number is taken as bytes.
func (k *k8s) parseMemoryValue(memoryStr string) (uint64, error) {
	re := regexp.MustCompile(`^(\d+(?:\.\d+)?)([KMGT]i?)?$`)
	matches := re.FindStringSubmatch(memoryStr)
	if len(matches) < 2 {
		return 0, fmt.Errorf("invalid memory format: %s", memoryStr)
	}

	value, err := strconv.ParseFloat(matches[1], 64)
	if err != nil {
		return 0, err
	}

	unit := ""
	if len(matches) > 2 {
		unit = matches[2]
	}

	// Decimal suffixes are powers of 1000; binary ("i") suffixes are
	// powers of 1024. An absent suffix means plain bytes.
	multipliers := map[string]uint64{
		"K":  1000,
		"Ki": 1024,
		"M":  1000 * 1000,
		"Mi": 1024 * 1024,
		"G":  1000 * 1000 * 1000,
		"Gi": 1024 * 1024 * 1024,
		"T":  1000 * 1000 * 1000 * 1000,
		"Ti": 1024 * 1024 * 1024 * 1024,
	}
	multiplier := uint64(1)
	if m, ok := multipliers[unit]; ok {
		multiplier = m
	}

	return uint64(value * float64(multiplier)), nil
}
6 changes: 6 additions & 0 deletions internal/proc/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@ package proc
import (
"context"
"io"

"github.com/kitproj/kit/internal/types"
)

// noop is a process implementation that does nothing; its zero value is
// ready to use.
type noop struct{}

// Run returns immediately without doing any work.
func (n noop) Run(ctx context.Context, stdout, stderr io.Writer) error {
	return nil
}

// GetMetrics reports zero CPU and memory, since a noop runs no process.
func (n noop) GetMetrics(ctx context.Context) (*types.Metrics, error) {
	return &types.Metrics{}, nil
}
2 changes: 2 additions & 0 deletions internal/proc/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
// Interface abstracts a runnable task process.
type Interface interface {
	// Run runs the process.
	Run(ctx context.Context, stdout, stderr io.Writer) error
	// GetMetrics returns current resource metrics for this process.
	GetMetrics(ctx context.Context) (*types.Metrics, error)
}

func New(name string, t types.Task, log *log.Logger, spec types.Spec) Interface {
Expand Down
23 changes: 23 additions & 0 deletions internal/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,29 @@ func RunSubgraph(ctx context.Context, cancel context.CancelFunc, port int, openB
out = io.MultiWriter(out, buf)
}

// Poll the process for resource metrics every 20s for as long as the task
// context is alive, publishing each sample as a status event.
go func() {
	ticker := time.NewTicker(20 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// Task cancelled or finished: stop polling.
			return
		case <-ticker.C:
			// Only sample while the task is actually executing.
			// NOTE(review): node.Phase and node.Metrics are accessed here
			// while other goroutines may update the node — confirm the
			// TaskNode mutex (or equivalent) guards these accesses.
			if node.Phase != "running" && node.Phase != "stalled" {
				continue
			}
			metrics, err := p.GetMetrics(ctx)
			if err != nil {
				// Metrics are best-effort; log and retry next tick.
				logger.Printf("failed to get metrics: %v", err)
				continue
			}
			node.Metrics = metrics
			statusEvents <- node
		}
	}
}()

err = p.Run(ctx, out, out)
// if the task was cancelled, we don't want to restart it, this is normal exit
if errors.Is(ctx.Err(), context.Canceled) {
Expand Down
2 changes: 2 additions & 0 deletions internal/task_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type TaskNode struct {
Phase string `json:"phase"`
// the message for the task phase, e.g. "exit code 1'
Message string `json:"message,omitempty"`
// metrics for resource usage tracking
Metrics *types.Metrics `json:"metrics,omitempty"`
// cancel function
cancel func()
// a mutex
Expand Down
6 changes: 6 additions & 0 deletions internal/types/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package types

// Metrics is a point-in-time resource usage sample for a task's process.
type Metrics struct {
	CPU uint64 `json:"cpu"` // CPU usage in millicores
	Mem uint64 `json:"mem"` // Memory usage in bytes
}