diff --git a/router/core/engine_loader_hooks.go b/router/core/engine_loader_hooks.go index 455ee6ed8f..6efe7cbd0e 100644 --- a/router/core/engine_loader_hooks.go +++ b/router/core/engine_loader_hooks.go @@ -12,6 +12,7 @@ import ( rcontext "github.com/wundergraph/cosmo/router/internal/context" "github.com/wundergraph/cosmo/router/internal/requestlogger" "github.com/wundergraph/cosmo/router/internal/unique" + "github.com/wundergraph/cosmo/router/pkg/grpcconnector/grpcremote" "github.com/wundergraph/cosmo/router/pkg/metric" rotel "github.com/wundergraph/cosmo/router/pkg/otel" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" @@ -103,6 +104,12 @@ func (f *engineLoaderHooks) OnLoad(ctx context.Context, ds resolve.DataSourceInf return ctx } + // Inject HTTP headers into context for gRPC interceptor + // The gRPC interceptor will extract these headers and forward them as metadata + if reqContext.request != nil && reqContext.request.Header != nil { + ctx = grpcremote.WithHTTPHeaders(ctx, reqContext.request.Header) + } + ctx, _ = f.tracer.Start(ctx, "Engine - Fetch", trace.WithAttributes([]attribute.KeyValue{ rotel.WgSubgraphName.String(ds.Name), diff --git a/router/core/graph_server.go b/router/core/graph_server.go index 87aa96331c..9bcdf3aaf9 100644 --- a/router/core/graph_server.go +++ b/router/core/graph_server.go @@ -1576,9 +1576,25 @@ func (s *graphServer) setupConnector( pluginConfig := grpcConfig.GetPlugin() if pluginConfig == nil { + // Extract header forwarding configuration for this subgraph + var headersToForward []string + if s.headerRules != nil { + dataSourceRules := FetchURLRules(s.headerRules, configSubgraphs, sg.RoutingUrl) + forwardedHeaders, _, err := PropagatedHeaders(dataSourceRules) + if err != nil { + s.logger.Warn("error parsing header rules for gRPC subgraph, headers will not be forwarded", + zap.String("subgraph", sg.Name), + zap.Error(err), + ) + } else { + headersToForward = forwardedHeaders + } + } + remoteProvider, err := grpcremote.NewRemoteGRPCProvider(grpcremote.RemoteGRPCProviderConfig{ - Logger: s.logger, - Endpoint: sg.RoutingUrl, + Logger: s.logger, + Endpoint: sg.RoutingUrl, + HeadersToForward: headersToForward, }) if err != nil { diff --git a/router/pkg/grpcconnector/grpccommon/grpc_plugin_client.go b/router/pkg/grpcconnector/grpccommon/grpc_plugin_client.go index 9e1a4ea43f..f368678fb0 100644 --- a/router/pkg/grpcconnector/grpccommon/grpc_plugin_client.go +++ b/router/pkg/grpcconnector/grpccommon/grpc_plugin_client.go @@ -3,9 +3,10 @@ package grpccommon import ( "context" "errors" - "go.opentelemetry.io/otel/trace" "io" + "go.opentelemetry.io/otel/trace" + "sync" "sync/atomic" "time" diff --git a/router/pkg/grpcconnector/grpcremote/grpc_remote.go b/router/pkg/grpcconnector/grpcremote/grpc_remote.go index c6d64bf12a..140a342cf9 100644 --- a/router/pkg/grpcconnector/grpcremote/grpc_remote.go +++ b/router/pkg/grpcconnector/grpcremote/grpc_remote.go @@ -18,14 +18,17 @@ type RemoteGRPCProviderConfig struct { Logger *zap.Logger // Endpoint is the URL of the gRPC server to connect to. Endpoint string + // HeadersToForward is a list of HTTP header names to forward as gRPC metadata. + HeadersToForward []string } // RemoteGRPCProvider is a client provider that manages a gRPC client connection to a standalone gRPC server. // It is used to connect to a standalone gRPC server that is not part of the cosmo cluster. // The provider maintains a single client connection and provides thread-safe access to it. type RemoteGRPCProvider struct { - logger *zap.Logger - endpoint string + logger *zap.Logger + endpoint string + headersToForward []string cc grpc.ClientConnInterface mu sync.RWMutex @@ -46,8 +49,9 @@ func NewRemoteGRPCProvider(config RemoteGRPCProviderConfig) (*RemoteGRPCProvider } return &RemoteGRPCProvider{ - logger: config.Logger, - endpoint: config.Endpoint, + logger: config.Logger, + endpoint: config.Endpoint, + headersToForward: config.HeadersToForward, }, nil } @@ -64,7 +68,13 @@ func (g *RemoteGRPCProvider) GetClient() grpc.ClientConnInterface { // It parses the endpoint URL and creates a new insecure gRPC connection. func (g *RemoteGRPCProvider) Start(ctx context.Context) error { if g.cc == nil { - clientConn, err := grpc.NewClient(g.endpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) + // Create gRPC client with header forwarding interceptor + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithUnaryInterceptor(HeaderForwardingInterceptor(g.headersToForward)), + } + + clientConn, err := grpc.NewClient(g.endpoint, opts...) if err != nil { return fmt.Errorf("failed to create client connection: %w", err) } diff --git a/router/pkg/grpcconnector/grpcremote/header_interceptor.go b/router/pkg/grpcconnector/grpcremote/header_interceptor.go new file mode 100644 index 0000000000..79c100881f --- /dev/null +++ b/router/pkg/grpcconnector/grpcremote/header_interceptor.go @@ -0,0 +1,79 @@ +package grpcremote + +import ( + "context" + "net/http" + + "go.opentelemetry.io/otel" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +// metadataCarrier adapts metadata.MD to the TextMapCarrier interface for OTEL propagation +type metadataCarrier struct { + metadata.MD +} + +func (mc metadataCarrier) Get(key string) string { + values := mc.MD.Get(key) + if len(values) == 0 { + return "" + } + return values[0] +} + +func (mc metadataCarrier) Set(key string, value string) { + mc.MD.Set(key, value) +} + +func (mc metadataCarrier) Keys() []string { + keys := make([]string, 0, len(mc.MD)) + for k := range mc.MD { + keys = append(keys, k) + } + return keys +} + +// httpHeadersKey is the context key for storing HTTP headers to forward +type httpHeadersKey struct{} + +// HeaderForwardingInterceptor creates a gRPC unary client interceptor that: +// 1. Extracts headers stored in the context +// 2. Forwards configured headers as gRPC metadata +// 3. Injects OTEL trace context into gRPC metadata +func HeaderForwardingInterceptor(headersToForward []string) grpc.UnaryClientInterceptor { + return func( + ctx context.Context, + method string, + req, reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) error { + md := make(metadata.MD) + + // Inject OTEL trace context + otel.GetTextMapPropagator().Inject(ctx, metadataCarrier{md}) + + // Extract HTTP headers from context if available + if httpHeaders, ok := ctx.Value(httpHeadersKey{}).(http.Header); ok && httpHeaders != nil { + // Forward configured headers from HTTP headers to gRPC metadata + for _, headerName := range headersToForward { + if values := httpHeaders.Values(headerName); len(values) > 0 { + // gRPC metadata keys are lowercase + md.Append(headerName, values...) + } + } + } + + // Create outgoing context with metadata + ctx = metadata.NewOutgoingContext(ctx, md) + + return invoker(ctx, method, req, reply, cc, opts...) + } +} + +// WithHTTPHeaders stores HTTP headers in the context for later use by the interceptor +func WithHTTPHeaders(ctx context.Context, headers http.Header) context.Context { + return context.WithValue(ctx, httpHeadersKey{}, headers) +}