diff --git a/cmd/client/main.go b/cmd/client/main.go
index 63b3a268..b09c713f 100644
--- a/cmd/client/main.go
+++ b/cmd/client/main.go
@@ -19,6 +19,7 @@ import (
 	"context"
 	"crypto/tls"
 	"crypto/x509"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -47,6 +48,7 @@ var (
 	tlsCert     = kingpin.Flag("tls.cert", " Client certificate file").String()
 	tlsKey      = kingpin.Flag("tls.key", " Private key file").String()
 	metricsAddr = kingpin.Flag("metrics-addr", "Serve Prometheus metrics at this address").Default(":9369").String()
+	labels      = kingpin.Flag("label", "Labels to register with client FQDN for service discovery in key=value format. Can be specified multiple times.").StringMap()
 
 	retryInitialWait = kingpin.Flag("proxy.retry.initial-wait", "Amount of time to wait after proxy failure").Default("1s").Duration()
 	retryMaxWait     = kingpin.Flag("proxy.retry.max-wait", "Maximum amount of time to wait between proxy poll retries").Default("5s").Duration()
@@ -190,7 +192,19 @@ func (c *Coordinator) doPoll(client *http.Client) error {
 		return fmt.Errorf("error parsing url poll: %w", err)
 	}
 	url := base.ResolveReference(u)
-	resp, err := client.Post(url.String(), "", strings.NewReader(*myFqdn))
+
+	pollReq := util.PollRequest{
+		FQDN:   *myFqdn,
+		Labels: *labels,
+	}
+
+	body, err := json.Marshal(pollReq)
+	if err != nil {
+		c.logger.Error("Error marshaling poll request:", "err", err)
+		return fmt.Errorf("error marshaling poll request: %w", err)
+	}
+
+	resp, err := client.Post(url.String(), "application/json", bytes.NewReader(body))
 	if err != nil {
 		c.logger.Error("Error polling:", "err", err)
 		return fmt.Errorf("error polling: %w", err)
diff --git a/cmd/proxy/coordinator.go b/cmd/proxy/coordinator.go
index a9ccdb11..9e509d66 100644
--- a/cmd/proxy/coordinator.go
+++ b/cmd/proxy/coordinator.go
@@ -32,6 +32,11 @@ var (
 	registrationTimeout = kingpin.Flag("registration.timeout", "After how long a registration expires.").Default("5m").Duration()
 )
 
+type ClientInfo struct {
+	LastSeen time.Time
+	Labels   map[string]string
+}
+
 // Coordinator metrics.
 var (
 	knownClients = promauto.NewGauge(
@@ -51,8 +56,8 @@ type Coordinator struct {
 	waiting map[string]chan *http.Request
 	// Responses from clients.
 	responses map[string]chan *http.Response
-	// Clients we know about and when they last contacted us.
-	known map[string]time.Time
+	// Clients we know about with their labels and last contact time.
+	known map[string]ClientInfo
 
 	logger *slog.Logger
 }
@@ -62,7 +67,7 @@ func NewCoordinator(logger *slog.Logger) (*Coordinator, error) {
 	c := &Coordinator{
 		waiting:   map[string]chan *http.Request{},
 		responses: map[string]chan *http.Response{},
-		known:     map[string]time.Time{},
+		known:     map[string]ClientInfo{},
 		logger:    logger,
 	}
 
@@ -131,10 +136,10 @@ func (c *Coordinator) DoScrape(ctx context.Context, r *http.Request) (*http.Resp
 }
 
 // WaitForScrapeInstruction registers a client waiting for a scrape result
-func (c *Coordinator) WaitForScrapeInstruction(fqdn string) (*http.Request, error) {
-	c.logger.Info("WaitForScrapeInstruction", "fqdn", fqdn)
+func (c *Coordinator) WaitForScrapeInstruction(fqdn string, labels map[string]string) (*http.Request, error) {
+	c.logger.Info("WaitForScrapeInstruction", "fqdn", fqdn, "labels", labels)
 
-	c.addKnownClient(fqdn)
+	c.addKnownClient(fqdn, labels)
 	// TODO: What if the client times out?
 	ch := c.getRequestChannel(fqdn)
 
@@ -179,24 +184,31 @@ func (c *Coordinator) ScrapeResult(r *http.Response) error {
 	}
 }
 
-func (c *Coordinator) addKnownClient(fqdn string) {
+func (c *Coordinator) addKnownClient(fqdn string, labels map[string]string) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	c.known[fqdn] = time.Now()
+	if labels == nil {
+		labels = make(map[string]string)
+	}
+
+	c.known[fqdn] = ClientInfo{
+		LastSeen: time.Now(),
+		Labels:   labels,
+	}
 	knownClients.Set(float64(len(c.known)))
 }
 
-// KnownClients returns a list of alive clients
-func (c *Coordinator) KnownClients() []string {
+// KnownClients returns a map of alive clients with their info
+func (c *Coordinator) KnownClients() map[string]ClientInfo {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
 	limit := time.Now().Add(-*registrationTimeout)
-	known := make([]string, 0, len(c.known))
-	for k, t := range c.known {
-		if limit.Before(t) {
-			known = append(known, k)
+	known := make(map[string]ClientInfo)
+	for fqdn, info := range c.known {
+		if limit.Before(info.LastSeen) {
+			known[fqdn] = info
 		}
 	}
 	return known
@@ -210,9 +222,9 @@ func (c *Coordinator) gc() {
 	defer c.mu.Unlock()
 	limit := time.Now().Add(-*registrationTimeout)
 	deleted := 0
-	for k, ts := range c.known {
-		if ts.Before(limit) {
-			delete(c.known, k)
+	for fqdn, info := range c.known {
+		if info.LastSeen.Before(limit) {
+			delete(c.known, fqdn)
 			deleted++
 		}
 	}
diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go
index fb0d8315..f2a1bc62 100644
--- a/cmd/proxy/main.go
+++ b/cmd/proxy/main.go
@@ -141,8 +141,32 @@ func (h *httpHandler) handlePush(w http.ResponseWriter, r *http.Request) {
 
 // handlePoll handles clients registering and asking for scrapes.
 func (h *httpHandler) handlePoll(w http.ResponseWriter, r *http.Request) {
-	fqdn, _ := io.ReadAll(r.Body)
-	request, err := h.coordinator.WaitForScrapeInstruction(strings.TrimSpace(string(fqdn)))
+	var fqdn string
+	var labels map[string]string
+
+	contentType := r.Header.Get("Content-Type")
+	if strings.HasPrefix(contentType, "application/json") {
+		var pollReq util.PollRequest
+		body, err := io.ReadAll(r.Body)
+		if err != nil {
+			h.logger.Error("Error reading request body:", "err", err)
+			http.Error(w, fmt.Sprintf("Error reading request body: %s", err.Error()), http.StatusBadRequest)
+			return
+		}
+		if err := json.Unmarshal(body, &pollReq); err != nil {
+			h.logger.Error("Error unmarshaling JSON:", "err", err)
+			http.Error(w, fmt.Sprintf("Error unmarshaling JSON: %s", err.Error()), http.StatusBadRequest)
+			return
+		}
+		fqdn = pollReq.FQDN
+		labels = pollReq.Labels
+	} else {
+		body, _ := io.ReadAll(r.Body)
+		fqdn = strings.TrimSpace(string(body))
+		labels = make(map[string]string)
+	}
+
+	request, err := h.coordinator.WaitForScrapeInstruction(fqdn, labels)
 	if err != nil {
 		h.logger.Info("Error WaitForScrapeInstruction:", "err", err)
 		http.Error(w, fmt.Sprintf("Error WaitForScrapeInstruction: %s", err.Error()), http.StatusRequestTimeout)
@@ -150,15 +174,18 @@ func (h *httpHandler) handlePoll(w http.ResponseWriter, r *http.Request) {
 	}
 	//nolint:errcheck // https://github.com/prometheus-community/PushProx/issues/111
 	request.WriteProxy(w) // Send full request as the body of the response.
-	h.logger.Info("Responded to /poll", "url", request.URL.String(), "scrape_id", request.Header.Get("Id"))
+	h.logger.Info("Responded to /poll", "url", request.URL.String(), "scrape_id", request.Header.Get("Id"), "labels", labels)
 }
 
 // handleListClients handles requests to list available clients as a JSON array.
 func (h *httpHandler) handleListClients(w http.ResponseWriter, r *http.Request) {
 	known := h.coordinator.KnownClients()
 	targets := make([]*targetGroup, 0, len(known))
-	for _, k := range known {
-		targets = append(targets, &targetGroup{Targets: []string{k}})
+	for fqdn, info := range known {
+		targets = append(targets, &targetGroup{
+			Targets: []string{fqdn},
+			Labels:  info.Labels,
+		})
 	}
 	w.Header().Set("Content-Type", "application/json")
 	//nolint:errcheck // https://github.com/prometheus-community/PushProx/issues/111
diff --git a/end-to-end-test.sh b/end-to-end-test.sh
index 3ceb681b..7a4a9342 100755
--- a/end-to-end-test.sh
+++ b/end-to-end-test.sh
@@ -29,13 +29,40 @@ while ! curl -s -f -L http://localhost:8080/clients; do
   sleep 2
 done
 
-./pushprox-client --log.level=debug --proxy-url=http://localhost:8080 &
-echo $! > "${tmpdir}/client.pid"
-while [ "$(curl -s -L 'http://localhost:8080/clients' | jq 'length')" != '1' ] ; do
-  echo 'Waiting for client'
+./pushprox-client --log.level=debug --proxy-url=http://localhost:8080 &
+echo $! >"${tmpdir}/client.pid"
+./pushprox-client --log.level=debug --proxy-url=http://localhost:8080 --fqdn=client2 --label foo=bar --label exporter=node &
+echo $! >"${tmpdir}/client2.pid"
+while [ "$(curl -s -L 'http://localhost:8080/clients' | jq 'length')" != '2' ]; do
+  echo 'Waiting for clients'
   sleep 2
 done
 
+echo "Testing client labels..."
+clients_response=$(curl -s -L 'http://localhost:8080/clients')
+
+# Check that the first client has empty labels
+client1_labels=$(echo "$clients_response" | jq -r '.[] | select(.targets[] != "client2") | .labels')
+if [ "$client1_labels" != "{}" ]; then
+  echo "ERROR: Expected client1 to have empty labels {}, got $client1_labels"
+  exit 1
+fi
+
+# Check that client2 has the expected labels
+client2_labels=$(echo "$clients_response" | jq -r '.[] | select(.targets[] == "client2") | .labels')
+label_value_foo=$(echo "$client2_labels" | jq -r '.foo // "missing"')
+label_value_exporter=$(echo "$client2_labels" | jq -r '.exporter // "missing"')
+if [ "$label_value_foo" != "bar" ]; then
+  echo "ERROR: Expected label foo=bar, got foo=$label_value_foo"
+  exit 1
+fi
+if [ "$label_value_exporter" != "node" ]; then
+  echo "ERROR: Expected label exporter=node, got exporter=$label_value_exporter"
+  exit 1
+fi
+
+echo "✅ Labels test passed: client2 has correct labels (foo=bar, exporter=node), client1 has empty labels"
+
 prometheus --config.file=prometheus.yml --log.level=debug &
 echo $! > "${tmpdir}/prometheus.pid"
 while ! curl -s -f -L http://localhost:9090/-/ready; do
diff --git a/util/proxy.go b/util/proxy.go
index 67a03ac5..ac7a23be 100644
--- a/util/proxy.go
+++ b/util/proxy.go
@@ -19,6 +19,11 @@ import (
 	"time"
 )
 
+type PollRequest struct {
+	FQDN   string            `json:"fqdn"`
+	Labels map[string]string `json:"labels"`
+}
+
 func GetScrapeTimeout(maxScrapeTimeout, defaultScrapeTimeout *time.Duration, h http.Header) time.Duration {
 	timeout := *defaultScrapeTimeout
 	headerTimeout, err := GetHeaderTimeout(h)