Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
coder-logstream-kube
coder-logstream-kube-*
*.test
build/
3 changes: 3 additions & 0 deletions helm/templates/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
- apiGroups: [""]
resources: ["pods", "events"]
verbs: ["get", "watch", "list"]
- apiGroups: [""]
resources: ["secrets"]
verbs: ["get"]
- apiGroups: ["apps"]
resources: ["replicasets", "events"]
verbs: ["get", "watch", "list"]
Expand Down
206 changes: 206 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,3 +512,209 @@ func TestIntegration_LabelSelector(t *testing.T) {
require.NotContains(t, log, "test-pod-no-label", "should not receive logs for unlabeled pod")
}
}

func TestIntegration_PodWithSecretRef(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

client := getKubeClient(t)
namespace := createTestNamespace(t, ctx, client)

// Create a secret containing the agent token
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "agent-token-secret",
Namespace: namespace,
},
Data: map[string][]byte{
"token": []byte("secret-token-integration"),
},
}
_, err := client.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{})
require.NoError(t, err)

// Start fake Coder API server
api := newFakeAgentAPI(t)
defer api.server.Close()

agentURL, err := url.Parse(api.server.URL)
require.NoError(t, err)

// Create the pod event logger
reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{
client: client,
coderURL: agentURL,
namespaces: []string{namespace},
logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug),
logDebounce: 5 * time.Second,
})
require.NoError(t, err)
defer reporter.Close()

// Wait for informers to sync
time.Sleep(1 * time.Second)

// Create a pod with CODER_AGENT_TOKEN from secretKeyRef
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod-secret",
Namespace: namespace,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-container",
Image: "busybox:latest",
Command: []string{"sleep", "3600"},
Env: []corev1.EnvVar{
{
Name: "CODER_AGENT_TOKEN",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "agent-token-secret",
},
Key: "token",
},
},
},
},
},
},
NodeSelector: map[string]string{
"non-existent-label": "non-existent-value",
},
},
}

_, err = client.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
require.NoError(t, err)

// Wait for log source registration
waitForLogSource(t, ctx, api, 30*time.Second)

// Wait for the "Created pod" log
logs, found := waitForLogContaining(t, ctx, api, 30*time.Second, "Created pod")
require.True(t, found, "expected 'Created pod' log, got: %v", logs)

// Delete the pod and verify deletion event
err = client.CoreV1().Pods(namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
require.NoError(t, err)

// Wait for the "Deleted pod" log
logs, found = waitForLogContaining(t, ctx, api, 30*time.Second, "Deleted pod")
require.True(t, found, "expected 'Deleted pod' log, got: %v", logs)
}

func TestIntegration_ReplicaSetWithSecretRef(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

client := getKubeClient(t)
namespace := createTestNamespace(t, ctx, client)

// Create a secret containing the agent token
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "agent-token-secret",
Namespace: namespace,
},
Data: map[string][]byte{
"token": []byte("secret-token-rs-integration"),
},
}
_, err := client.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{})
require.NoError(t, err)

// Start fake Coder API server
api := newFakeAgentAPI(t)
defer api.server.Close()

agentURL, err := url.Parse(api.server.URL)
require.NoError(t, err)

// Create the pod event logger
reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{
client: client,
coderURL: agentURL,
namespaces: []string{namespace},
logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug),
logDebounce: 5 * time.Second,
})
require.NoError(t, err)
defer reporter.Close()

// Wait for informers to sync
time.Sleep(1 * time.Second)

// Create a ReplicaSet with CODER_AGENT_TOKEN from secretKeyRef
replicas := int32(1)
rs := &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: "test-rs-secret",
Namespace: namespace,
},
Spec: appsv1.ReplicaSetSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "test-rs-secret",
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "test-rs-secret",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-container",
Image: "busybox:latest",
Command: []string{"sleep", "3600"},
Env: []corev1.EnvVar{
{
Name: "CODER_AGENT_TOKEN",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "agent-token-secret",
},
Key: "token",
},
},
},
},
},
},
NodeSelector: map[string]string{
"non-existent-label": "non-existent-value",
},
},
},
},
}

_, err = client.AppsV1().ReplicaSets(namespace).Create(ctx, rs, metav1.CreateOptions{})
require.NoError(t, err)

// Wait for log source registration
waitForLogSource(t, ctx, api, 30*time.Second)

// Wait for the "Queued pod from ReplicaSet" log
logs, found := waitForLogContaining(t, ctx, api, 30*time.Second, "Queued pod from ReplicaSet")
require.True(t, found, "expected 'Queued pod from ReplicaSet' log, got: %v", logs)

// Delete the ReplicaSet
err = client.AppsV1().ReplicaSets(namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{})
require.NoError(t, err)

// Wait for the "Deleted ReplicaSet" log
logs, found = waitForLogContaining(t, ctx, api, 30*time.Second, "Deleted ReplicaSet")
require.True(t, found, "expected 'Deleted ReplicaSet' log, got: %v", logs)
}
68 changes: 64 additions & 4 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/google/uuid"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -117,6 +118,39 @@ type podEventLogger struct {
lq *logQueuer
}

// resolveEnvValue resolves the value of an environment variable, supporting both
// direct values and secretKeyRef references. Returns empty string if the value
// cannot be resolved (e.g., optional secret not found).
func (p *podEventLogger) resolveEnvValue(ctx context.Context, namespace string, env corev1.EnvVar) (string, error) {
// Direct value takes precedence (existing behavior)
if env.Value != "" {
return env.Value, nil
}

// Check for secretKeyRef
if env.ValueFrom != nil && env.ValueFrom.SecretKeyRef != nil {
ref := env.ValueFrom.SecretKeyRef
secret, err := p.client.CoreV1().Secrets(namespace).Get(ctx, ref.Name, v1.GetOptions{})
if err != nil {
// Handle optional secrets gracefully - only ignore NotFound errors
if ref.Optional != nil && *ref.Optional && k8serrors.IsNotFound(err) {
return "", nil
}
return "", fmt.Errorf("get secret %s: %w", ref.Name, err)
}
value, ok := secret.Data[ref.Key]
if !ok {
if ref.Optional != nil && *ref.Optional {
return "", nil
}
return "", fmt.Errorf("secret %s has no key %s", ref.Name, ref.Key)
}
return string(value), nil
}

return "", nil
}

// initNamespace starts the informer factory and registers event handlers for a given namespace.
// If provided namespace is empty, it will start the informer factory and register event handlers for all namespaces.
func (p *podEventLogger) initNamespace(namespace string) error {
Expand Down Expand Up @@ -157,15 +191,28 @@ func (p *podEventLogger) initNamespace(namespace string) error {
if env.Name != "CODER_AGENT_TOKEN" {
continue
}

token, err := p.resolveEnvValue(p.ctx, pod.Namespace, env)
if err != nil {
p.logger.Warn(p.ctx, "failed to resolve CODER_AGENT_TOKEN",
slog.F("pod", pod.Name),
slog.F("namespace", pod.Namespace),
slog.Error(err))
continue
}
if token == "" {
continue
}

registered = true
p.tc.setPodToken(pod.Name, env.Value)
p.tc.setPodToken(pod.Name, token)

// We don't want to add logs to workspaces that are already started!
if !pod.CreationTimestamp.After(startTime) {
continue
}

p.sendLog(pod.Name, env.Value, agentsdk.Log{
p.sendLog(pod.Name, token, agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf("🐳 %s: %s", newColor(color.Bold).Sprint("Created pod"), pod.Name),
Level: codersdk.LogLevelInfo,
Expand Down Expand Up @@ -218,10 +265,23 @@ func (p *podEventLogger) initNamespace(namespace string) error {
if env.Name != "CODER_AGENT_TOKEN" {
continue
}

token, err := p.resolveEnvValue(p.ctx, replicaSet.Namespace, env)
if err != nil {
p.logger.Warn(p.ctx, "failed to resolve CODER_AGENT_TOKEN",
slog.F("replicaset", replicaSet.Name),
slog.F("namespace", replicaSet.Namespace),
slog.Error(err))
continue
}
if token == "" {
continue
}

registered = true
p.tc.setReplicaSetToken(replicaSet.Name, env.Value)
p.tc.setReplicaSetToken(replicaSet.Name, token)

p.sendLog(replicaSet.Name, env.Value, agentsdk.Log{
p.sendLog(replicaSet.Name, token, agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf("🐳 %s: %s", newColor(color.Bold).Sprint("Queued pod from ReplicaSet"), replicaSet.Name),
Level: codersdk.LogLevelInfo,
Expand Down
Loading