Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -47,6 +48,7 @@ var (
tlsCert = kingpin.Flag("tls.cert", "<cert> Client certificate file").String()
tlsKey = kingpin.Flag("tls.key", "<key> Private key file").String()
metricsAddr = kingpin.Flag("metrics-addr", "Serve Prometheus metrics at this address").Default(":9369").String()
labels = kingpin.Flag("label", "Labels to register with client FQDN for service discovery in key=value format. Can be specified multiple times.").StringMap()

retryInitialWait = kingpin.Flag("proxy.retry.initial-wait", "Amount of time to wait after proxy failure").Default("1s").Duration()
retryMaxWait = kingpin.Flag("proxy.retry.max-wait", "Maximum amount of time to wait between proxy poll retries").Default("5s").Duration()
Expand Down Expand Up @@ -190,7 +192,19 @@ func (c *Coordinator) doPoll(client *http.Client) error {
return fmt.Errorf("error parsing url poll: %w", err)
}
url := base.ResolveReference(u)
resp, err := client.Post(url.String(), "", strings.NewReader(*myFqdn))

pollReq := util.PollRequest{
FQDN: *myFqdn,
Labels: *labels,
}

body, err := json.Marshal(pollReq)
if err != nil {
c.logger.Error("Error marshaling poll request:", "err", err)
return fmt.Errorf("error marshaling poll request: %w", err)
}

resp, err := client.Post(url.String(), "application/json", bytes.NewReader(body))
if err != nil {
c.logger.Error("Error polling:", "err", err)
return fmt.Errorf("error polling: %w", err)
Expand Down
46 changes: 29 additions & 17 deletions cmd/proxy/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ var (
registrationTimeout = kingpin.Flag("registration.timeout", "After how long a registration expires.").Default("5m").Duration()
)

// ClientInfo describes a client known to the Coordinator: when it last
// polled the proxy and the service-discovery labels it registered with.
type ClientInfo struct {
	LastSeen time.Time         // time of the client's most recent poll
	Labels   map[string]string // labels supplied by the client, exposed via /clients
}

// Coordinator metrics.
var (
knownClients = promauto.NewGauge(
Expand All @@ -51,8 +56,8 @@ type Coordinator struct {
waiting map[string]chan *http.Request
// Responses from clients.
responses map[string]chan *http.Response
// Clients we know about and when they last contacted us.
known map[string]time.Time
// Clients we know about with their labels and last contact time.
known map[string]ClientInfo

logger *slog.Logger
}
Expand All @@ -62,7 +67,7 @@ func NewCoordinator(logger *slog.Logger) (*Coordinator, error) {
c := &Coordinator{
waiting: map[string]chan *http.Request{},
responses: map[string]chan *http.Response{},
known: map[string]time.Time{},
known: map[string]ClientInfo{},
logger: logger,
}

Expand Down Expand Up @@ -131,10 +136,10 @@ func (c *Coordinator) DoScrape(ctx context.Context, r *http.Request) (*http.Resp
}

// WaitForScrapeInstruction registers a client waiting for a scrape result
func (c *Coordinator) WaitForScrapeInstruction(fqdn string) (*http.Request, error) {
c.logger.Info("WaitForScrapeInstruction", "fqdn", fqdn)
func (c *Coordinator) WaitForScrapeInstruction(fqdn string, labels map[string]string) (*http.Request, error) {
c.logger.Info("WaitForScrapeInstruction", "fqdn", fqdn, "labels", labels)

c.addKnownClient(fqdn)
c.addKnownClient(fqdn, labels)
// TODO: What if the client times out?
ch := c.getRequestChannel(fqdn)

Expand Down Expand Up @@ -179,24 +184,31 @@ func (c *Coordinator) ScrapeResult(r *http.Response) error {
}
}

func (c *Coordinator) addKnownClient(fqdn string) {
func (c *Coordinator) addKnownClient(fqdn string, labels map[string]string) {
c.mu.Lock()
defer c.mu.Unlock()

c.known[fqdn] = time.Now()
if labels == nil {
labels = make(map[string]string)
}

c.known[fqdn] = ClientInfo{
LastSeen: time.Now(),
Labels: labels,
}
knownClients.Set(float64(len(c.known)))
}

// KnownClients returns a list of alive clients
func (c *Coordinator) KnownClients() []string {
// KnownClients returns a map of alive clients with their info
func (c *Coordinator) KnownClients() map[string]ClientInfo {
c.mu.Lock()
defer c.mu.Unlock()

limit := time.Now().Add(-*registrationTimeout)
known := make([]string, 0, len(c.known))
for k, t := range c.known {
if limit.Before(t) {
known = append(known, k)
known := make(map[string]ClientInfo)
for fqdn, info := range c.known {
if limit.Before(info.LastSeen) {
known[fqdn] = info
}
}
return known
Expand All @@ -210,9 +222,9 @@ func (c *Coordinator) gc() {
defer c.mu.Unlock()
limit := time.Now().Add(-*registrationTimeout)
deleted := 0
for k, ts := range c.known {
if ts.Before(limit) {
delete(c.known, k)
for fqdn, info := range c.known {
if info.LastSeen.Before(limit) {
delete(c.known, fqdn)
deleted++
}
}
Expand Down
37 changes: 32 additions & 5 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,24 +141,51 @@ func (h *httpHandler) handlePush(w http.ResponseWriter, r *http.Request) {

// handlePoll handles clients registering and asking for scrapes.
//
// Two request encodings are accepted for backward compatibility:
//   - "application/json": a util.PollRequest carrying the FQDN and labels.
//   - anything else (legacy clients): the bare FQDN as the request body.
func (h *httpHandler) handlePoll(w http.ResponseWriter, r *http.Request) {
	var fqdn string
	var labels map[string]string

	// Match the media type by prefix rather than exact equality so that
	// parameters such as "application/json; charset=utf-8" still select
	// the JSON path instead of silently falling back to the legacy one.
	if strings.HasPrefix(r.Header.Get("Content-Type"), "application/json") {
		var pollReq util.PollRequest
		body, err := io.ReadAll(r.Body)
		if err != nil {
			h.logger.Error("Error reading request body:", "err", err)
			http.Error(w, fmt.Sprintf("Error reading request body: %s", err.Error()), http.StatusBadRequest)
			return
		}
		if err := json.Unmarshal(body, &pollReq); err != nil {
			h.logger.Error("Error unmarshaling JSON:", "err", err)
			http.Error(w, fmt.Sprintf("Error unmarshaling JSON: %s", err.Error()), http.StatusBadRequest)
			return
		}
		// Normalize like the legacy path so a JSON client with stray
		// whitespace registers under the same name.
		fqdn = strings.TrimSpace(pollReq.FQDN)
		labels = pollReq.Labels
	} else {
		// Legacy plain-text registration: body is the FQDN itself.
		//nolint:errcheck // best-effort read, an empty FQDN is rejected downstream
		body, _ := io.ReadAll(r.Body)
		fqdn = strings.TrimSpace(string(body))
		labels = make(map[string]string)
	}

	request, err := h.coordinator.WaitForScrapeInstruction(fqdn, labels)
	if err != nil {
		h.logger.Info("Error WaitForScrapeInstruction:", "err", err)
		http.Error(w, fmt.Sprintf("Error WaitForScrapeInstruction: %s", err.Error()), http.StatusRequestTimeout)
		return
	}
	//nolint:errcheck // https://github.com/prometheus-community/PushProx/issues/111
	request.WriteProxy(w) // Send full request as the body of the response.
	h.logger.Info("Responded to /poll", "url", request.URL.String(), "scrape_id", request.Header.Get("Id"), "labels", labels)
}

// handleListClients handles requests to list available clients as a JSON array.
func (h *httpHandler) handleListClients(w http.ResponseWriter, r *http.Request) {
known := h.coordinator.KnownClients()
targets := make([]*targetGroup, 0, len(known))
for _, k := range known {
targets = append(targets, &targetGroup{Targets: []string{k}})
for fqdn, info := range known {
targets = append(targets, &targetGroup{
Targets: []string{fqdn},
Labels: info.Labels,
})
}
w.Header().Set("Content-Type", "application/json")
//nolint:errcheck // https://github.com/prometheus-community/PushProx/issues/111
Expand Down
35 changes: 31 additions & 4 deletions end-to-end-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,40 @@ while ! curl -s -f -L http://localhost:8080/clients; do
sleep 2
done

./pushprox-client --log.level=debug --proxy-url=http://localhost:8080 &
echo $! > "${tmpdir}/client.pid"
while [ "$(curl -s -L 'http://localhost:8080/clients' | jq 'length')" != '1' ] ; do
echo 'Waiting for client'
./pushprox-client --log.level=debug --proxy-url=http://localhost:8080 &
echo $! >"${tmpdir}/client.pid"
./pushprox-client --log.level=debug --proxy-url=http://localhost:8080 --fqdn=client2 --label foo=bar --label exporter=node &
echo $! >"${tmpdir}/client2.pid"
while [ "$(curl -s -L 'http://localhost:8080/clients' | jq 'length')" != '2' ]; do
echo 'Waiting for clients'
sleep 2
done

echo "Testing client labels..."
clients_response=$(curl -s -L 'http://localhost:8080/clients')

# Check that the first client has empty labels
client1_labels=$(echo "$clients_response" | jq -r '.[] | select(.targets[] != "client2") | .labels')
if [ "$client1_labels" != "{}" ]; then
echo "ERROR: Expected client1 to have empty labels {}, got $client1_labels"
exit 1
fi

# Check that client2 has the expected labels
client2_labels=$(echo "$clients_response" | jq -r '.[] | select(.targets[] == "client2") | .labels')
label_value_foo=$(echo "$client2_labels" | jq -r '.foo // "missing"')
label_value_exporter=$(echo "$client2_labels" | jq -r '.exporter // "missing"')
if [ "$label_value_foo" != "bar" ]; then
echo "ERROR: Expected label foo=bar, got foo=$label_value_foo"
exit 1
fi
if [ "$label_value_exporter" != "node" ]; then
echo "ERROR: Expected label exporter=node, got exporter=$label_value_exporter"
exit 1
fi

echo "✅ Labels test passed: client2 has correct labels (foo=bar, exporter=node), client1 has empty labels"

prometheus --config.file=prometheus.yml --log.level=debug &
echo $! > "${tmpdir}/prometheus.pid"
while ! curl -s -f -L http://localhost:9090/-/ready; do
Expand Down
5 changes: 5 additions & 0 deletions util/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ import (
"time"
)

// PollRequest is the JSON body a client sends to the proxy's /poll
// endpoint to register itself: its FQDN plus optional service-discovery
// labels in key=value form.
type PollRequest struct {
	FQDN   string            `json:"fqdn"`
	Labels map[string]string `json:"labels"`
}

func GetScrapeTimeout(maxScrapeTimeout, defaultScrapeTimeout *time.Duration, h http.Header) time.Duration {
timeout := *defaultScrapeTimeout
headerTimeout, err := GetHeaderTimeout(h)
Expand Down