Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions router/core/engine_loader_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
rcontext "github.com/wundergraph/cosmo/router/internal/context"
"github.com/wundergraph/cosmo/router/internal/requestlogger"
"github.com/wundergraph/cosmo/router/internal/unique"
"github.com/wundergraph/cosmo/router/pkg/grpcconnector/grpcremote"
"github.com/wundergraph/cosmo/router/pkg/metric"
rotel "github.com/wundergraph/cosmo/router/pkg/otel"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
Expand Down Expand Up @@ -103,6 +104,12 @@ func (f *engineLoaderHooks) OnLoad(ctx context.Context, ds resolve.DataSourceInf
return ctx
}

// Inject HTTP headers into context for gRPC interceptor
// The gRPC interceptor will extract these headers and forward them as metadata
if reqContext.request != nil && reqContext.request.Header != nil {
ctx = grpcremote.WithHTTPHeaders(ctx, reqContext.request.Header)
}

ctx, _ = f.tracer.Start(ctx, "Engine - Fetch",
trace.WithAttributes([]attribute.KeyValue{
rotel.WgSubgraphName.String(ds.Name),
Expand Down
20 changes: 18 additions & 2 deletions router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,9 +1576,25 @@ func (s *graphServer) setupConnector(

pluginConfig := grpcConfig.GetPlugin()
if pluginConfig == nil {
// Extract header forwarding configuration for this subgraph
var headersToForward []string
if s.headerRules != nil {
dataSourceRules := FetchURLRules(s.headerRules, configSubgraphs, sg.RoutingUrl)
forwardedHeaders, _, err := PropagatedHeaders(dataSourceRules)
if err != nil {
s.logger.Warn("error parsing header rules for gRPC subgraph, headers will not be forwarded",
zap.String("subgraph", sg.Name),
zap.Error(err),
)
} else {
headersToForward = forwardedHeaders
}
}

remoteProvider, err := grpcremote.NewRemoteGRPCProvider(grpcremote.RemoteGRPCProviderConfig{
Logger: s.logger,
Endpoint: sg.RoutingUrl,
Logger: s.logger,
Endpoint: sg.RoutingUrl,
HeadersToForward: headersToForward,
})
Comment on lines +1579 to 1598
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Find the PropagatedHeaders function definition and understand what it does
rg -n "func PropagatedHeaders" --type go -A 30

Repository: wundergraph/cosmo

Length of output: 2613


🏁 Script executed:

# Search for how headerNameRegexps is used throughout the codebase
rg -n "headerNameRegexps" --type go -B 2 -A 5

Repository: wundergraph/cosmo

Length of output: 1824


🏁 Script executed:

# Check if headerNameRegexps is used in any gRPC-related code
rg -n "gRPC|grpc" --type go -A 5 | grep -A 5 -B 5 "headerNameRegexps" || echo "No gRPC + headerNameRegexps found together"

Repository: wundergraph/cosmo

Length of output: 103


🏁 Script executed:

# Look at the header rules configuration in the schema
cat router/pkg/config/config.schema.json | grep -A 20 '"header' | head -50

Repository: wundergraph/cosmo

Length of output: 2285


🏁 Script executed:

# Find all calls to PropagatedHeaders
rg -n "PropagatedHeaders\(" --type go -B 2 -A 5

Repository: wundergraph/cosmo

Length of output: 2249


🏁 Script executed:

# Search for usage of headerNameRegexps in graphql_datasource or other files
rg -n "headerNameRegexps" --type go --type ts

Repository: wundergraph/cosmo

Length of output: 458


🏁 Script executed:

# Check how regex headers are handled in HTTP datasources
rg -n "RegularExpression" --type go -B 3 -A 3

Repository: wundergraph/cosmo

Length of output: 1945


🏁 Script executed:

# Check the RemoteGRPCProviderConfig struct definition
rg -n "type RemoteGRPCProviderConfig" --type go -A 15

Repository: wundergraph/cosmo

Length of output: 1673


🏁 Script executed:

# Look for grpcremote package to understand gRPC provider capabilities
fd -t f "grpcremote" --type go | head -5

Repository: wundergraph/cosmo

Length of output: 230


🏁 Script executed:

# Compare with HTTP datasource to see how it uses regex headers
rg -n "ForwardedClientHeaderRegularExpressions" --type go -B 5 -A 5

Repository: wundergraph/cosmo

Length of output: 1091


🏁 Script executed:

# Look for gRPC interceptor implementations that might handle headers
rg -n "interceptor|Interceptor" router/pkg/grpcconnector/ --type go -B 2 -A 5

Repository: wundergraph/cosmo

Length of output: 2895


🏁 Script executed:

# Check if headers are processed anywhere in the gRPC remote provider
rg -n "headersToForward|HeadersToForward" --type go -B 3 -A 10

Repository: wundergraph/cosmo

Length of output: 9457


Regex-based header propagation rules are not supported for gRPC subgraphs.

The PropagatedHeaders function returns both explicit header names and regex-based patterns, but only the explicit names are forwarded to gRPC subgraphs (the regex patterns are discarded). The HeaderForwardingInterceptor only supports exact header name matching and has no infrastructure for evaluating regex patterns. Consider adding a comment explaining this is a deliberate limitation of the gRPC implementation, as HTTP subscriptions support both header name and regex-based propagation rules.

🤖 Prompt for AI Agents
In `@router/core/graph_server.go` around lines 1579 - 1598, The PropagatedHeaders
result currently discards regex-based patterns when building headersToForward
for gRPC subgraphs; update the block around PropagatedHeaders/FetchURLRules
(used when s.headerRules != nil) and the grpcremote.NewRemoteGRPCProvider call
to include a brief comment that regex-based header propagation is intentionally
not supported for gRPC subgraphs because the HeaderForwardingInterceptor only
accepts exact header names (HeadersToForward) and lacks runtime regex
evaluation, while HTTP subscriptions support both explicit names and regex
rules—mention the deliberate limitation and reference PropagatedHeaders,
HeaderForwardingInterceptor, and HeadersToForward so future maintainers
understand why regex patterns are ignored.


if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion router/pkg/grpcconnector/grpccommon/grpc_plugin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package grpccommon
import (
"context"
"errors"
"go.opentelemetry.io/otel/trace"
"io"

"go.opentelemetry.io/otel/trace"

"sync"
"sync/atomic"
"time"
Expand Down
20 changes: 15 additions & 5 deletions router/pkg/grpcconnector/grpcremote/grpc_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ type RemoteGRPCProviderConfig struct {
Logger *zap.Logger
// Endpoint is the URL of the gRPC server to connect to.
Endpoint string
// HeadersToForward is a list of HTTP header names to forward as gRPC metadata.
HeadersToForward []string
}

// RemoteGRPCProvider is a client provider that manages a gRPC client connection to a standalone gRPC server.
// It is used to connect to a standalone gRPC server that is not part of the cosmo cluster.
// The provider maintains a single client connection and provides thread-safe access to it.
type RemoteGRPCProvider struct {
logger *zap.Logger
endpoint string
logger *zap.Logger
endpoint string
headersToForward []string

cc grpc.ClientConnInterface
mu sync.RWMutex
Expand All @@ -46,8 +49,9 @@ func NewRemoteGRPCProvider(config RemoteGRPCProviderConfig) (*RemoteGRPCProvider
}

return &RemoteGRPCProvider{
logger: config.Logger,
endpoint: config.Endpoint,
logger: config.Logger,
endpoint: config.Endpoint,
headersToForward: config.HeadersToForward,
}, nil
}

Expand All @@ -64,7 +68,13 @@ func (g *RemoteGRPCProvider) GetClient() grpc.ClientConnInterface {
// It parses the endpoint URL and creates a new insecure gRPC connection.
func (g *RemoteGRPCProvider) Start(ctx context.Context) error {
if g.cc == nil {
clientConn, err := grpc.NewClient(g.endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
// Create gRPC client with header forwarding interceptor
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(HeaderForwardingInterceptor(g.headersToForward)),
}

clientConn, err := grpc.NewClient(g.endpoint, opts...)
if err != nil {
return fmt.Errorf("failed to create client connection: %w", err)
}
Expand Down
79 changes: 79 additions & 0 deletions router/pkg/grpcconnector/grpcremote/header_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package grpcremote

import (
"context"
"net/http"

"go.opentelemetry.io/otel"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// metadataCarrier adapts metadata.MD to the TextMapCarrier interface for OTEL propagation
type metadataCarrier struct {
metadata.MD
}

func (mc metadataCarrier) Get(key string) string {
values := mc.MD.Get(key)
if len(values) == 0 {
return ""
}
return values[0]
}

func (mc metadataCarrier) Set(key string, value string) {
mc.MD.Set(key, value)
}

func (mc metadataCarrier) Keys() []string {
keys := make([]string, 0, len(mc.MD))
for k := range mc.MD {
keys = append(keys, k)
}
return keys
}

// httpHeadersKey is the context key for storing HTTP headers to forward
type httpHeadersKey struct{}

// HeaderForwardingInterceptor creates a gRPC unary client interceptor that:
// 1. Extracts headers stored in the context
// 2. Forwards configured headers as gRPC metadata
// 3. Injects OTEL trace context into gRPC metadata
func HeaderForwardingInterceptor(headersToForward []string) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
md := make(metadata.MD)

// Inject OTEL trace context
otel.GetTextMapPropagator().Inject(ctx, metadataCarrier{md})

// Extract HTTP headers from context if available
if httpHeaders, ok := ctx.Value(httpHeadersKey{}).(http.Header); ok && httpHeaders != nil {
// Forward configured headers from HTTP headers to gRPC metadata
for _, headerName := range headersToForward {
if values := httpHeaders.Values(headerName); len(values) > 0 {
// gRPC metadata keys are lowercase
md.Append(headerName, values...)
}
Comment on lines +61 to +65
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Header names should be lowercased before appending to gRPC metadata.

The comment on line 63 correctly notes that "gRPC metadata keys are lowercase", but headerName is used directly without conversion. HTTP header names are case-insensitive and may arrive in mixed case (e.g., X-Request-ID), which could cause mismatches when the gRPC server looks up headers using lowercase keys.

🔧 Proposed fix to lowercase header names
+import "strings"
+
 			// Forward configured headers from HTTP headers to gRPC metadata
 			for _, headerName := range headersToForward {
 				if values := httpHeaders.Values(headerName); len(values) > 0 {
 					// gRPC metadata keys are lowercase
-					md.Append(headerName, values...)
+					md.Append(strings.ToLower(headerName), values...)
 				}
 			}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for _, headerName := range headersToForward {
if values := httpHeaders.Values(headerName); len(values) > 0 {
// gRPC metadata keys are lowercase
md.Append(headerName, values...)
}
for _, headerName := range headersToForward {
if values := httpHeaders.Values(headerName); len(values) > 0 {
// gRPC metadata keys are lowercase
md.Append(strings.ToLower(headerName), values...)
}
🤖 Prompt for AI Agents
In @router/pkg/grpcconnector/grpcremote/header_interceptor.go around lines 61 -
65, The loop that appends HTTP headers to gRPC metadata uses headerName directly
(see headersToForward, httpHeaders.Values and md.Append) but gRPC metadata keys
must be lowercase; update the code to lowercase the header name (e.g., via
strings.ToLower) before calling md.Append so keys like "X-Request-ID" become
"x-request-id" when added to metadata.

}
}

// Create outgoing context with metadata
ctx = metadata.NewOutgoingContext(ctx, md)

return invoker(ctx, method, req, reply, cc, opts...)
}
}

// WithHTTPHeaders stores HTTP headers in the context for later use by the interceptor
func WithHTTPHeaders(ctx context.Context, headers http.Header) context.Context {
return context.WithValue(ctx, httpHeadersKey{}, headers)
}