Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
ef7e645
feat: cache plans
SkArchon Jan 9, 2026
419cabf
fix: cleanup
SkArchon Jan 9, 2026
e65cdf2
fix: updates
SkArchon Jan 13, 2026
2f89643
fix: updates
SkArchon Jan 13, 2026
cf6e9df
fix: updates
SkArchon Jan 13, 2026
0d2e2db
fix: updates
SkArchon Jan 13, 2026
52aa474
fix: updates
SkArchon Jan 13, 2026
8cd21fe
fix: updates
SkArchon Jan 13, 2026
822a4a4
fix: config updates
SkArchon Jan 13, 2026
3159713
fix: config reloading
SkArchon Jan 14, 2026
d576e03
Merge branch 'main' into milinda/eng-8701-implement-cache-warmer-from…
SkArchon Jan 14, 2026
2bdf426
fix: update router-tests
SkArchon Jan 14, 2026
0e92ca1
fix: cleanup
SkArchon Jan 14, 2026
78fc67d
fix: use assert
SkArchon Jan 14, 2026
f4850f6
fix: review comments
SkArchon Jan 14, 2026
7ec23f1
fix: linting
SkArchon Jan 14, 2026
42bbcbd
fix: linting
SkArchon Jan 14, 2026
480967d
fix: go modules
SkArchon Jan 18, 2026
d542eec
fix: updates
SkArchon Jan 18, 2026
3564f0d
fix: updates
SkArchon Jan 19, 2026
eb69a26
fix: make in memory switcher default when cache warmer is not enabled
SkArchon Jan 19, 2026
98608c7
fix: review comments
SkArchon Jan 19, 2026
a7adf3d
fix: updates
SkArchon Jan 20, 2026
05026d3
Merge remote-tracking branch 'origin/main' into milinda/eng-8701-impl…
SkArchon Jan 20, 2026
6d23b7e
fix: go mod
SkArchon Jan 20, 2026
ceabb75
fix: update enabled
SkArchon Jan 20, 2026
ae920ef
fix: update enabled
SkArchon Jan 20, 2026
3cfca20
fix: review comments
SkArchon Jan 20, 2026
d4ad399
fix: review comments
SkArchon Jan 20, 2026
5d79a9e
fix: feature flags didnt pass in cosmo flag status
SkArchon Jan 20, 2026
7f099f5
fix: nil pointer
SkArchon Jan 20, 2026
f382e46
fix: review comments
SkArchon Jan 20, 2026
9a34515
fix: use the new ristretto version
SkArchon Jan 22, 2026
42c8c81
fix: formatting
SkArchon Jan 22, 2026
ef33a6c
fix: review comments
SkArchon Jan 22, 2026
e0734f5
fix: review comments
SkArchon Jan 22, 2026
11158f6
fix: review comments
SkArchon Jan 22, 2026
54f24c1
fix: tests
SkArchon Jan 22, 2026
40548ad
Merge branch 'main' into milinda/eng-8701-implement-cache-warmer-from…
SkArchon Jan 22, 2026
fd68357
fix: review comments
SkArchon Jan 22, 2026
4fa6007
fix: review comments
SkArchon Jan 26, 2026
d26cb4e
fix: review comments
SkArchon Jan 26, 2026
e371592
fix: review comments
SkArchon Jan 26, 2026
335c0f1
fix: review comments
SkArchon Jan 26, 2026
2fd4793
fix: review comments
SkArchon Jan 26, 2026
9308bdd
fix: review comments
SkArchon Jan 26, 2026
f96e1f1
fix: review comments
SkArchon Jan 26, 2026
ab75a5a
fix: review comments
SkArchon Jan 26, 2026
172df66
fix: review comments
SkArchon Jan 26, 2026
44875f2
Merge branch 'main' into milinda/eng-8701-implement-cache-warmer-from…
SkArchon Jan 26, 2026
c3d2336
fix: update comments
SkArchon Jan 26, 2026
0f98b1d
fix: review comments
SkArchon Jan 29, 2026
e511bef
fix: review comments
SkArchon Jan 29, 2026
91b9bfc
fix: revert cdn
SkArchon Jan 30, 2026
c10b5bf
fix: review comments
SkArchon Jan 30, 2026
4ef640c
fix: review comments
SkArchon Jan 31, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
407 changes: 395 additions & 12 deletions router-tests/cache_warmup_test.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion router-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.7 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto/v2 v2.1.0 // indirect
github.com/dgraph-io/ristretto/v2 v2.4.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/docker/cli v28.2.2+incompatible // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
Expand Down
8 changes: 4 additions & 4 deletions router-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto/v2 v2.1.0 h1:59LjpOJLNDULHh8MC4UaegN52lC4JnO2dITsie/Pa8I=
github.com/dgraph-io/ristretto/v2 v2.1.0/go.mod h1:uejeqfYXpUomfse0+lO+13ATz4TypQYLJZzBSAemuB4=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgraph-io/ristretto/v2 v2.4.0 h1:I/w09yLjhdcVD2QV192UJcq8dPBaAJb9pOuMyNy0XlU=
github.com/dgraph-io/ristretto/v2 v2.4.0/go.mod h1:0KsrXtXvnv0EqnzyowllbVJB8yBonswa2lTCK2gGo9E=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/trifles v0.0.0-20230903005119-f50d829f2e54 h1:SG7nF6SRlWhcT7cNTs5R6Hk4V2lcmLz2NsG2VnInyNo=
Expand Down
3 changes: 2 additions & 1 deletion router/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ func Main() {
}

rs, err := core.NewRouterSupervisor(&core.RouterSupervisorOpts{
BaseLogger: baseLogger,
BaseLogger: baseLogger,
SwitchoverConfig: core.NewSwitchoverConfig(baseLogger),
ConfigFactory: func() (*config.Config, error) {
result, err := config.LoadConfig(*configPathFlag)
if err != nil {
Expand Down
28 changes: 28 additions & 0 deletions router/core/cache_warmup.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type CacheWarmupProcessor interface {
type CacheWarmupConfig struct {
Log *zap.Logger
Source CacheWarmupSource
FallbackSource CacheWarmupSource
Workers int
ItemsPerSecond int
Timeout time.Duration
Expand All @@ -45,6 +46,7 @@ func WarmupCaches(ctx context.Context, cfg *CacheWarmupConfig) (err error) {
w := &cacheWarmup{
log: cfg.Log.With(zap.String("component", "cache_warmup")),
source: cfg.Source,
fallbackSource: cfg.FallbackSource,
workers: cfg.Workers,
itemsPerSecond: cfg.ItemsPerSecond,
timeout: cfg.Timeout,
Expand Down Expand Up @@ -92,6 +94,7 @@ func WarmupCaches(ctx context.Context, cfg *CacheWarmupConfig) (err error) {
type cacheWarmup struct {
log *zap.Logger
source CacheWarmupSource
fallbackSource CacheWarmupSource
workers int
itemsPerSecond int
timeout time.Duration
Expand All @@ -105,6 +108,12 @@ func (w *cacheWarmup) run(ctx context.Context) (int, error) {
defer cancel()

items, err := w.source.LoadItems(ctx, w.log)

// Try fallback if no items were loaded OR there was an error loading from main source
if len(items) == 0 || err != nil {
items, err = w.loadFromFallbackSource(ctx, err)
}

if err != nil {
return 0, err
}
Expand Down Expand Up @@ -197,6 +206,25 @@ func (w *cacheWarmup) run(ctx context.Context) (int, error) {
return len(items), nil
}

// loadFromFallbackSource tries to obtain the warmup operations from the
// configured fallback source.
//
// mainErr is the error (possibly nil) produced by the primary source. It is
// returned unchanged, together with nil items, when no fallback source is
// configured or when the fallback source itself fails; in the latter case the
// fallback error is only logged. When the fallback succeeds its items are
// returned with a nil error, and a non-nil mainErr is logged so the primary
// failure is not lost.
func (w *cacheWarmup) loadFromFallbackSource(ctx context.Context, mainErr error) ([]*nodev1.Operation, error) {
	fallback := w.fallbackSource
	if fallback == nil {
		return nil, mainErr
	}

	items, fallbackErr := fallback.LoadItems(ctx, w.log)
	switch {
	case fallbackErr != nil:
		// Both sources failed: surface the fallback failure in the logs only
		// and hand the primary source's error back to the caller.
		w.log.Error("Failed to load cache warmup config from fallback source", zap.Error(fallbackErr))
		return nil, mainErr
	case mainErr != nil:
		// The fallback was used because the primary source errored (rather
		// than returning no items), so record the primary error.
		w.log.Error("Falling back to PlanSource due to error loading cache warmup config from CDN", zap.Error(mainErr))
	}

	return items, nil
}

type CacheWarmupPlanningProcessorOptions struct {
OperationProcessor *OperationProcessor
OperationPlanner *OperationPlanner
Expand Down
32 changes: 32 additions & 0 deletions router/core/cache_warmup_plans.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package core

import (
"context"

nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"
"go.uber.org/zap"
)

// Compile-time assertion that *PlanSource implements CacheWarmupSource.
var _ CacheWarmupSource = (*PlanSource)(nil)

// PlanSource is a very basic cache warmup source that relies on the caller of this type to pass in the
// queries to be used for cache warming directly
type PlanSource struct {
// queries holds the operations handed to NewPlanSource; LoadItems returns them as-is.
queries []*nodev1.Operation
}

// NewPlanSource wraps the operations supplied by the caller in a PlanSource.
// A nil slice is replaced by an empty, non-nil slice so the source always
// holds a non-nil list.
func NewPlanSource(switchoverCacheWarmerQueries []*nodev1.Operation) *PlanSource {
	source := &PlanSource{queries: switchoverCacheWarmerQueries}
	if source.queries == nil {
		source.queries = []*nodev1.Operation{}
	}
	return source
}

// LoadItems returns the operations this source was constructed with. The
// context and logger arguments are ignored and the returned error is always
// nil. Calling it on a nil *PlanSource yields a nil slice.
func (c *PlanSource) LoadItems(_ context.Context, _ *zap.Logger) ([]*nodev1.Operation, error) {
	var items []*nodev1.Operation
	if c != nil {
		items = c.queries
	}
	return items, nil
}
67 changes: 50 additions & 17 deletions router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,20 @@ type BuildGraphMuxOptions struct {
EngineConfig *nodev1.EngineConfiguration
ConfigSubgraphs []*nodev1.Subgraph
RoutingUrlGroupings map[string]map[string]bool
SwitchoverConfig *SwitchoverConfig
}

// IsBaseGraph reports whether these options describe the base graph, i.e.
// whether no feature flag name has been set on them.
func (b BuildGraphMuxOptions) IsBaseGraph() bool {
	return len(b.FeatureFlagName) == 0
}

// buildMultiGraphHandlerOptions contains the configuration options for building a multi-graph handler.
type buildMultiGraphHandlerOptions struct {
// baseMux is the base graph's mux. Its ServeHTTP is returned directly when there are no
// feature flag configs, and it is also invoked at the end of the combined handler.
baseMux *chi.Mux
// featureFlagConfigs maps a feature flag name to its execution config; one graph mux is
// built per entry.
featureFlagConfigs map[string]*nodev1.FeatureFlagRouterExecutionConfig
// switchoverConfig is forwarded as SwitchoverConfig to every feature flag's buildGraphMux call.
switchoverConfig *SwitchoverConfig
}

// newGraphServer creates a new server instance.
func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterConfig, proxy ProxyFunc) (*graphServer, error) {
/* Older versions of composition will not populate a compatibility version.
Expand Down Expand Up @@ -273,6 +281,7 @@ func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterC
EngineConfig: routerConfig.GetEngineConfig(),
ConfigSubgraphs: routerConfig.GetSubgraphs(),
RoutingUrlGroupings: routingUrlGroupings,
SwitchoverConfig: r.switchoverConfig,
})
if err != nil {
return nil, fmt.Errorf("failed to build base mux: %w", err)
Expand All @@ -283,7 +292,11 @@ func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterC
s.logger.Info("Feature flags enabled", zap.Strings("flags", maps.Keys(featureFlagConfigMap)))
}

multiGraphHandler, err := s.buildMultiGraphHandler(ctx, gm.mux, featureFlagConfigMap)
multiGraphHandler, err := s.buildMultiGraphHandler(ctx, buildMultiGraphHandlerOptions{
baseMux: gm.mux,
featureFlagConfigs: featureFlagConfigMap,
switchoverConfig: r.switchoverConfig,
})
if err != nil {
return nil, fmt.Errorf("failed to build feature flag handler: %w", err)
}
Expand Down Expand Up @@ -431,22 +444,22 @@ func getRoutingUrlGroupingForCircuitBreakers(

func (s *graphServer) buildMultiGraphHandler(
ctx context.Context,
baseMux *chi.Mux,
featureFlagConfigs map[string]*nodev1.FeatureFlagRouterExecutionConfig,
opts buildMultiGraphHandlerOptions,
) (http.HandlerFunc, error) {
if len(featureFlagConfigs) == 0 {
return baseMux.ServeHTTP, nil
if len(opts.featureFlagConfigs) == 0 {
return opts.baseMux.ServeHTTP, nil
}

featureFlagToMux := make(map[string]*chi.Mux, len(featureFlagConfigs))
featureFlagToMux := make(map[string]*chi.Mux, len(opts.featureFlagConfigs))

// Build all the muxes for the feature flags in serial to avoid any race conditions
for featureFlagName, executionConfig := range featureFlagConfigs {
for featureFlagName, executionConfig := range opts.featureFlagConfigs {
gm, err := s.buildGraphMux(ctx, BuildGraphMuxOptions{
FeatureFlagName: featureFlagName,
RouterConfigVersion: executionConfig.GetVersion(),
EngineConfig: executionConfig.GetEngineConfig(),
ConfigSubgraphs: executionConfig.Subgraphs,
SwitchoverConfig: opts.switchoverConfig,
})
if err != nil {
return nil, fmt.Errorf("failed to build mux for feature flag '%s': %w", featureFlagName, err)
Expand All @@ -473,7 +486,7 @@ func (s *graphServer) buildMultiGraphHandler(
return
}

baseMux.ServeHTTP(w, r)
opts.baseMux.ServeHTTP(w, r)
}, nil
}

Expand Down Expand Up @@ -519,12 +532,12 @@ type graphMux struct {
validationCache *ristretto.Cache[uint64, bool]
operationHashCache *ristretto.Cache[uint64, string]

accessLogsFileLogger *logging.BufferedLogger
metricStore rmetric.Store
prometheusCacheMetrics *rmetric.CacheMetrics
otelCacheMetrics *rmetric.CacheMetrics
streamMetricStore rmetric.StreamMetricStore
prometheusMetricsExporter *graphqlmetrics.PrometheusMetricsExporter
accessLogsFileLogger *logging.BufferedLogger
metricStore rmetric.Store
prometheusCacheMetrics *rmetric.CacheMetrics
otelCacheMetrics *rmetric.CacheMetrics
streamMetricStore rmetric.StreamMetricStore
prometheusMetricsExporter *graphqlmetrics.PrometheusMetricsExporter
}

// buildOperationCaches creates the caches for the graph mux.
Expand Down Expand Up @@ -1296,7 +1309,8 @@ func (s *graphServer) buildGraphMux(
DisableExposingVariablesContentOnValidationError: s.engineExecutionConfiguration.DisableExposingVariablesContentOnValidationError,
ComplexityLimits: s.securityConfiguration.ComplexityLimits,
})
operationPlanner := NewOperationPlanner(executor, gm.planCache)

operationPlanner := NewOperationPlanner(executor, gm.planCache, opts.SwitchoverConfig.inMemoryPlanCacheFallback.IsEnabled())

// We support the MCP only on the base graph. Feature flags are not supported yet.
if opts.IsBaseGraph() && s.mcpServer != nil {
Expand Down Expand Up @@ -1346,16 +1360,35 @@ func (s *graphServer) buildGraphMux(
)
}

if s.Config.cacheWarmup.Source.Filesystem != nil {
switch {
case s.cacheWarmup.Source.Filesystem != nil:
warmupConfig.Source = NewFileSystemSource(&FileSystemSourceConfig{
RootPath: s.Config.cacheWarmup.Source.Filesystem.Path,
})
} else {
// Enable in-memory switchover fallback when:
// - Router has cache warmer with inMemoryFallback enabled, AND
// - Either:
// - Using static execution config (not Cosmo): s.selfRegister == nil
// - OR CDN cache warmer is explicitly disabled
case s.cacheWarmup.InMemoryFallback && (s.selfRegister == nil || !s.Config.cacheWarmup.Source.CdnSource.Enabled):
// We first utilize the existing plan cache (if it was already set, i.e., not on the first start) to create a list of queries
// and then reset the plan cache to the new plan cache for this start afterwards.
warmupConfig.Source = NewPlanSource(opts.SwitchoverConfig.inMemoryPlanCacheFallback.getPlanCacheForFF(opts.FeatureFlagName))
opts.SwitchoverConfig.inMemoryPlanCacheFallback.setPlanCacheForFF(opts.FeatureFlagName, gm.planCache)
case s.Config.cacheWarmup.Source.CdnSource.Enabled:
// We use the in-memory cache as a fallback if enabled
// This is useful for when an issue occurs with the CDN when retrieving the required manifest
if s.cacheWarmup.InMemoryFallback {
warmupConfig.FallbackSource = NewPlanSource(opts.SwitchoverConfig.inMemoryPlanCacheFallback.getPlanCacheForFF(opts.FeatureFlagName))
opts.SwitchoverConfig.inMemoryPlanCacheFallback.setPlanCacheForFF(opts.FeatureFlagName, gm.planCache)
}
cdnSource, err := NewCDNSource(s.Config.cdnConfig.URL, s.graphApiToken, s.logger)
if err != nil {
return nil, fmt.Errorf("failed to create cdn source: %w", err)
}
warmupConfig.Source = cdnSource
default:
return nil, fmt.Errorf("unexpected cache warmer source provided")
}

err = WarmupCaches(ctx, warmupConfig)
Expand Down
35 changes: 24 additions & 11 deletions router/core/operation_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,42 @@ type planWithMetaData struct {
operationDocument, schemaDocument *ast.Document
typeFieldUsageInfo []*graphqlschemausage.TypeFieldUsageInfo
argumentUsageInfo []*graphqlmetricsv1.ArgumentUsageInfo
content string
}

type OperationPlanner struct {
sf singleflight.Group
planCache ExecutionPlanCache[uint64, *planWithMetaData]
executor *Executor
trackUsageInfo bool
sf singleflight.Group
planCache ExecutionPlanCache[uint64, *planWithMetaData]
executor *Executor
trackUsageInfo bool
operationContent bool
}

// operationPlannerOpts carries the per-call options accepted by preparePlan.
type operationPlannerOpts struct {
// operationContent, when true, makes preparePlan copy the operation's content
// (ctx.Content()) into the content field of the plan it builds. plan passes false
// when the cache is skipped and the planner's own operationContent setting otherwise.
operationContent bool
}

// ExecutionPlanCache is the cache contract the OperationPlanner relies on for
// storing prepared plans. K is the key type and V the cached value type.
type ExecutionPlanCache[K, V any] interface {
	// Get looks up key and reports whether a value was found for it.
	Get(key K) (V, bool)
	// Set stores value under key with the given cost. How the cost is
	// interpreted depends on the cache implementation.
	Set(key K, value V, cost int64) bool
	// IterValues calls cb for the values held by the cache. The iteration
	// order is non-deterministic. cb's result is named stop; presumably
	// returning true ends the iteration early (confirm per implementation).
	IterValues(cb func(v V) (stop bool))
	// Close shuts the cache down and frees its resources.
	Close()
}

func NewOperationPlanner(executor *Executor, planCache ExecutionPlanCache[uint64, *planWithMetaData]) *OperationPlanner {
func NewOperationPlanner(executor *Executor, planCache ExecutionPlanCache[uint64, *planWithMetaData], storeContent bool) *OperationPlanner {
return &OperationPlanner{
planCache: planCache,
executor: executor,
trackUsageInfo: executor.TrackUsageInfo,
planCache: planCache,
executor: executor,
trackUsageInfo: executor.TrackUsageInfo,
operationContent: storeContent,
}
}

func (p *OperationPlanner) preparePlan(ctx *operationContext) (*planWithMetaData, error) {
func (p *OperationPlanner) preparePlan(ctx *operationContext, opts operationPlannerOpts) (*planWithMetaData, error) {
doc, report := astparser.ParseGraphqlDocumentString(ctx.content)
if report.HasErrors() {
return nil, &reportError{report: &report}
Expand Down Expand Up @@ -81,6 +90,10 @@ func (p *OperationPlanner) preparePlan(ctx *operationContext) (*planWithMetaData
schemaDocument: p.executor.RouterSchema,
}

if opts.operationContent {
out.content = ctx.Content()
}

if p.trackUsageInfo {
out.typeFieldUsageInfo = graphqlschemausage.GetTypeFieldUsageInfo(preparedPlan)
out.argumentUsageInfo, err = graphqlschemausage.GetArgumentUsageInfo(&doc, p.executor.RouterSchema, ctx.variables, preparedPlan, ctx.remapVariables)
Expand All @@ -106,7 +119,7 @@ func (p *OperationPlanner) plan(opContext *operationContext, options PlanOptions
skipCache := options.TraceOptions.Enable || options.ExecutionOptions.IncludeQueryPlanInResponse

if skipCache {
prepared, err := p.preparePlan(opContext)
prepared, err := p.preparePlan(opContext, operationPlannerOpts{operationContent: false})
if err != nil {
return err
}
Expand Down Expand Up @@ -134,7 +147,7 @@ func (p *OperationPlanner) plan(opContext *operationContext, options PlanOptions
// this ensures that we only prepare the plan once for this operation ID
operationIDStr := strconv.FormatUint(operationID, 10)
sharedPreparedPlan, err, _ := p.sf.Do(operationIDStr, func() (interface{}, error) {
prepared, err := p.preparePlan(opContext)
prepared, err := p.preparePlan(opContext, operationPlannerOpts{operationContent: p.operationContent})
if err != nil {
return nil, err
}
Expand Down
Loading
Loading