Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions hypercache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,22 @@ func (hyperCache *HyperCache[T]) DistRingHashSpots() []string { //nolint:ireturn
return nil
}

// DistHeartbeatMetrics reports the heartbeat-related counters of a distributed backend.
//
// When the configured backend is a *backend.DistMemory, the result is a map[string]any
// holding the "heartbeatSuccess", "heartbeatFailure", "nodesRemoved" and
// "readPrimaryPromote" values read from the value returned by the backend's Metrics
// method. For any other backend the method returns nil.
func (hyperCache *HyperCache[T]) DistHeartbeatMetrics() any { //nolint:ireturn
	distBackend, isDist := any(hyperCache.backend).(*backend.DistMemory)
	if !isDist {
		// Not a distributed in-memory backend: nothing to report.
		return nil
	}

	stats := distBackend.Metrics()

	return map[string]any{
		"heartbeatSuccess":   stats.HeartbeatSuccess,
		"heartbeatFailure":   stats.HeartbeatFailure,
		"nodesRemoved":       stats.NodesRemoved,
		"readPrimaryPromote": stats.ReadPrimaryPromote,
	}
}

// ManagementHTTPAddress returns the bound address of the optional management HTTP server.
// Empty string when the server is disabled or failed to start.
func (hyperCache *HyperCache[T]) ManagementHTTPAddress() string {
Expand Down
28 changes: 28 additions & 0 deletions internal/telemetry/attrs/keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Package attrs provides reusable OpenTelemetry attribute key constants shared by the
// hypercache middlewares. Keeping the keys in one place avoids duplicated string
// literals and gives metrics, traces, and logs consistent attribute names.
package attrs

const (
// AttrKeyLength is the attribute key under which the middlewares record the length
// of a single cache key. The recorded value is len(key) on a Go string, i.e. the
// key size in bytes.
AttrKeyLength = "key.len"
// AttrKeysCount is the attribute key for the number of keys passed to a multi-key
// operation (recorded as len(keys) by the metrics middleware's GetMultiple).
AttrKeysCount = "keys.count"
// AttrResultCount is the attribute key for the number of results returned by a
// multi-key operation (recorded as len(res) by the metrics middleware's GetMultiple).
AttrResultCount = "result.count"
// AttrFailedCount is the attribute key for the number of entries in the failure
// set returned by a multi-key operation (recorded as len(failed) by the metrics
// middleware's GetMultiple).
AttrFailedCount = "failed.count"
// AttrExpirationMS is the attribute key for the expiration duration supplied to a
// write operation, expressed in milliseconds (recorded as expiration.Milliseconds()
// by the tracing middleware's Set and GetOrSet).
AttrExpirationMS = "expiration.ms"
)
8 changes: 8 additions & 0 deletions management_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type membershipIntrospect interface {
vnodes int,
)
DistRingHashSpots() []string
DistHeartbeatMetrics() any
}

// Start launches listener (idempotent). Caller provides cache for handler wiring.
Expand Down Expand Up @@ -257,6 +258,13 @@ func (s *ManagementHTTPServer) registerCluster(useAuth func(fiber.Handler) fiber
return fiberCtx.JSON(fiber.Map{"count": len(spots), "vnodes": spots})
}

return fiberCtx.Status(fiber.StatusNotFound).JSON(fiber.Map{"error": "distributed backend unsupported"})
}))
s.app.Get("/cluster/heartbeat", useAuth(func(fiberCtx fiber.Ctx) error { // heartbeat metrics
	// DistHeartbeatMetrics returns nil when the backend is not distributed. Treat that
	// the same as a cache lacking the introspection interface, so clients receive the
	// 404 below instead of a 200 response whose body is the JSON literal `null`.
	if mi, ok := hc.(membershipIntrospect); ok {
		if metrics := mi.DistHeartbeatMetrics(); metrics != nil {
			return fiberCtx.JSON(metrics)
		}
	}

	return fiberCtx.Status(fiber.StatusNotFound).JSON(fiber.Map{"error": "distributed backend unsupported"})
}))
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/backend/dist_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ func (s *distHTTPServer) registerSet(ctx context.Context, dm *DistMemory) { //no
return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": unmarshalErr.Error()})
}

it := &cache.Item{
Key: req.Key,
Value: req.Value,
Expiration: time.Duration(req.Expiration) * time.Millisecond,
Version: req.Version,
Origin: req.Origin,
it := &cache.Item{ // LastUpdated set to now for replicated writes
Key: req.Key,
Value: req.Value,
Expiration: time.Duration(req.Expiration) * time.Millisecond,
Version: req.Version,
Origin: req.Origin,
LastUpdated: time.Now(),
}

dm.applySet(ctx, it, req.Replicate)
Expand Down
11 changes: 6 additions & 5 deletions pkg/backend/dist_http_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,12 @@ func decodeGetBody(r io.Reader) (*cache.Item, bool, error) { //nolint:ireturn
}
// reconstruct cache.Item (we ignore expiration formatting difference vs ms)
return &cache.Item{ // multi-line for readability
Key: mirror.Key,
Value: mirror.Value,
Expiration: time.Duration(mirror.Expiration) * time.Millisecond,
Version: mirror.Version,
Origin: mirror.Origin,
Key: mirror.Key,
Value: mirror.Value,
Expiration: time.Duration(mirror.Expiration) * time.Millisecond,
Version: mirror.Version,
Origin: mirror.Origin,
LastUpdated: time.Now(),
}, true, nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/backend/dist_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,10 @@ func (dm *DistMemory) Set(ctx context.Context, item *cache.Item) error { //nolin
}
}

// primary path: assign version
// primary path: assign version & timestamp
item.Version = atomic.AddUint64(&dm.versionCounter, 1)
item.Origin = string(dm.localNode.ID)
item.LastUpdated = time.Now()
dm.applySet(ctx, item, false)

acks := 1 + dm.replicateTo(ctx, item, owners[1:])
Expand Down
1 change: 1 addition & 0 deletions pkg/cache/v2/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Item struct {
Key string // key of the item
Value any // value of the item
LastAccess time.Time // last access time
LastUpdated time.Time // last write/version assignment time (distributed usage)
Size int64 // size in bytes
Expiration time.Duration // expiration duration
AccessCount uint32 // number of times the item has been accessed
Expand Down
23 changes: 11 additions & 12 deletions pkg/middleware/otel_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (
"go.opentelemetry.io/otel/metric"

"github.com/hyp3rd/hypercache"
"github.com/hyp3rd/hypercache/internal/telemetry/attrs"
"github.com/hyp3rd/hypercache/pkg/backend"
"github.com/hyp3rd/hypercache/pkg/cache"
"github.com/hyp3rd/hypercache/pkg/stats"
)

const attrKeyLen = "key.len" // reused attribute key name

// OTelMetricsMiddleware emits OpenTelemetry metrics for service methods.
type OTelMetricsMiddleware struct {
next hypercache.Service
Expand Down Expand Up @@ -45,7 +44,7 @@ func NewOTelMetricsMiddleware(next hypercache.Service, meter metric.Meter) (hype
func (mw *OTelMetricsMiddleware) Get(ctx context.Context, key string) (any, bool) {
start := time.Now()
v, ok := mw.next.Get(ctx, key)
mw.rec(ctx, "Get", start, attribute.Int(attrKeyLen, len(key)), attribute.Bool("hit", ok))
mw.rec(ctx, "Get", start, attribute.Int(attrs.AttrKeyLength, len(key)), attribute.Bool("hit", ok))

return v, ok
}
Expand All @@ -54,7 +53,7 @@ func (mw *OTelMetricsMiddleware) Get(ctx context.Context, key string) (any, bool
func (mw *OTelMetricsMiddleware) Set(ctx context.Context, key string, value any, expiration time.Duration) error {
start := time.Now()
err := mw.next.Set(ctx, key, value, expiration)
mw.rec(ctx, "Set", start, attribute.Int(attrKeyLen, len(key)))
mw.rec(ctx, "Set", start, attribute.Int(attrs.AttrKeyLength, len(key)))

return err
}
Expand All @@ -63,7 +62,7 @@ func (mw *OTelMetricsMiddleware) Set(ctx context.Context, key string, value any,
func (mw *OTelMetricsMiddleware) GetOrSet(ctx context.Context, key string, value any, expiration time.Duration) (any, error) {
start := time.Now()
v, err := mw.next.GetOrSet(ctx, key, value, expiration)
mw.rec(ctx, "GetOrSet", start, attribute.Int(attrKeyLen, len(key)))
mw.rec(ctx, "GetOrSet", start, attribute.Int(attrs.AttrKeyLength, len(key)))

return v, err
}
Expand All @@ -72,7 +71,7 @@ func (mw *OTelMetricsMiddleware) GetOrSet(ctx context.Context, key string, value
func (mw *OTelMetricsMiddleware) GetWithInfo(ctx context.Context, key string) (*cache.Item, bool) {
start := time.Now()
it, ok := mw.next.GetWithInfo(ctx, key)
mw.rec(ctx, "GetWithInfo", start, attribute.Int(attrKeyLen, len(key)), attribute.Bool("hit", ok))
mw.rec(ctx, "GetWithInfo", start, attribute.Int(attrs.AttrKeyLength, len(key)), attribute.Bool("hit", ok))

return it, ok
}
Expand All @@ -85,9 +84,9 @@ func (mw *OTelMetricsMiddleware) GetMultiple(ctx context.Context, keys ...string
ctx,
"GetMultiple",
start,
attribute.Int("keys.count", len(keys)),
attribute.Int("result.count", len(res)),
attribute.Int("failed.count", len(failed)),
attribute.Int(attrs.AttrKeysCount, len(keys)),
attribute.Int(attrs.AttrResultCount, len(res)),
attribute.Int(attrs.AttrFailedCount, len(failed)),
)

return res, failed
Expand Down Expand Up @@ -145,10 +144,10 @@ func (mw *OTelMetricsMiddleware) Stop(ctx context.Context) error { return mw.nex
func (mw *OTelMetricsMiddleware) GetStats() stats.Stats { return mw.next.GetStats() }

// rec records call count and duration with attributes.
func (mw *OTelMetricsMiddleware) rec(ctx context.Context, method string, start time.Time, attrs ...attribute.KeyValue) {
func (mw *OTelMetricsMiddleware) rec(ctx context.Context, method string, start time.Time, attributes ...attribute.KeyValue) {
base := []attribute.KeyValue{attribute.String("method", method)}
if len(attrs) > 0 {
base = append(base, attrs...)
if len(attributes) > 0 {
base = append(base, attributes...)
}

mw.calls.Add(ctx, 1, metric.WithAttributes(base...))
Expand Down
27 changes: 16 additions & 11 deletions pkg/middleware/otel_tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (
"go.opentelemetry.io/otel/trace"

"github.com/hyp3rd/hypercache"
"github.com/hyp3rd/hypercache/internal/telemetry/attrs"
"github.com/hyp3rd/hypercache/pkg/backend"
"github.com/hyp3rd/hypercache/pkg/cache"
"github.com/hyp3rd/hypercache/pkg/stats"
)

const attrKeyLength = "key.len"

// OTelTracingMiddleware wraps hypercache.Service methods with OpenTelemetry spans.
type OTelTracingMiddleware struct {
next hypercache.Service
Expand All @@ -28,8 +27,8 @@ type OTelTracingMiddleware struct {
type OTelTracingOption func(*OTelTracingMiddleware)

// WithCommonAttributes sets attributes applied to all spans.
func WithCommonAttributes(attrs ...attribute.KeyValue) OTelTracingOption {
return func(m *OTelTracingMiddleware) { m.commonAttrs = append(m.commonAttrs, attrs...) }
func WithCommonAttributes(attributes ...attribute.KeyValue) OTelTracingOption {
return func(m *OTelTracingMiddleware) { m.commonAttrs = append(m.commonAttrs, attributes...) }
}

// NewOTelTracingMiddleware creates a tracing middleware.
Expand All @@ -44,7 +43,7 @@ func NewOTelTracingMiddleware(next hypercache.Service, tracer trace.Tracer, opts

// Get implements Service.Get with tracing.
func (mw OTelTracingMiddleware) Get(ctx context.Context, key string) (any, bool) {
ctx, span := mw.startSpan(ctx, "hypercache.Get", attribute.Int(attrKeyLength, len(key)))
ctx, span := mw.startSpan(ctx, "hypercache.Get", attribute.Int(attrs.AttrKeyLength, len(key)))
defer span.End()

v, ok := mw.next.Get(ctx, key)
Expand All @@ -55,7 +54,10 @@ func (mw OTelTracingMiddleware) Get(ctx context.Context, key string) (any, bool)

// Set implements Service.Set with tracing.
func (mw OTelTracingMiddleware) Set(ctx context.Context, key string, value any, expiration time.Duration) error {
ctx, span := mw.startSpan(ctx, "hypercache.Set", attribute.Int(attrKeyLength, len(key)), attribute.Int64("expiration.ms", expiration.Milliseconds()))
ctx, span := mw.startSpan(
ctx, "hypercache.Set",
attribute.Int(attrs.AttrKeyLength, len(key)),
attribute.Int64(attrs.AttrExpirationMS, expiration.Milliseconds()))
defer span.End()

err := mw.next.Set(ctx, key, value, expiration)
Expand All @@ -68,7 +70,10 @@ func (mw OTelTracingMiddleware) Set(ctx context.Context, key string, value any,

// GetOrSet implements Service.GetOrSet with tracing.
func (mw OTelTracingMiddleware) GetOrSet(ctx context.Context, key string, value any, expiration time.Duration) (any, error) {
ctx, span := mw.startSpan(ctx, "hypercache.GetOrSet", attribute.Int(attrKeyLength, len(key)), attribute.Int64("expiration.ms", expiration.Milliseconds()))
ctx, span := mw.startSpan(
ctx, "hypercache.GetOrSet",
attribute.Int(attrs.AttrKeyLength, len(key)),
attribute.Int64(attrs.AttrExpirationMS, expiration.Milliseconds()))
defer span.End()

v, err := mw.next.GetOrSet(ctx, key, value, expiration)
Expand All @@ -81,7 +86,7 @@ func (mw OTelTracingMiddleware) GetOrSet(ctx context.Context, key string, value

// GetWithInfo implements Service.GetWithInfo with tracing.
func (mw OTelTracingMiddleware) GetWithInfo(ctx context.Context, key string) (*cache.Item, bool) {
ctx, span := mw.startSpan(ctx, "hypercache.GetWithInfo", attribute.Int(attrKeyLength, len(key)))
ctx, span := mw.startSpan(ctx, "hypercache.GetWithInfo", attribute.Int(attrs.AttrKeyLength, len(key)))
defer span.End()

it, ok := mw.next.GetWithInfo(ctx, key)
Expand Down Expand Up @@ -173,14 +178,14 @@ func (mw OTelTracingMiddleware) Stop(ctx context.Context) error {
func (mw OTelTracingMiddleware) GetStats() stats.Stats { return mw.next.GetStats() }

// startSpan starts a span with common and provided attributes.
func (mw OTelTracingMiddleware) startSpan(ctx context.Context, name string, attrs ...attribute.KeyValue) (context.Context, trace.Span) {
func (mw OTelTracingMiddleware) startSpan(ctx context.Context, name string, attributes ...attribute.KeyValue) (context.Context, trace.Span) {
ctx, span := mw.tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindInternal))
if len(mw.commonAttrs) > 0 {
span.SetAttributes(mw.commonAttrs...)
}

if len(attrs) > 0 {
span.SetAttributes(attrs...)
if len(attributes) > 0 {
span.SetAttributes(attributes...)
}

return ctx, span
Expand Down
Loading