-
Notifications
You must be signed in to change notification settings - Fork 208
feat(router): add connectrpc handler for graphql-to-grpc bridge #2379
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
asoorm
wants to merge
5
commits into
main
Choose a base branch
from
ahmet/eng-8277-connect-rpc-handler-final
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
2635b33
feat(router): add ConnectRPC server for gRPC/Connect protocol transco…
asoorm f6f702c
refactor(router-tests): replace golden tests with inline assertions i…
asoorm c5550ac
refactor(router-tests): remove duplicate proto/graphql services
asoorm 4d64865
test(router-tests): remove redundant and flaky connectrpc tests
asoorm 4559131
test(router): remove trivial and implementation-detail connectrpc tests
asoorm File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,305 @@ | ||
| package integration | ||
|
|
||
| import ( | ||
| "context" | ||
| "crypto/tls" | ||
| "encoding/json" | ||
| "net" | ||
| "net/http" | ||
| "testing" | ||
|
|
||
| "connectrpc.com/connect" | ||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| employeev1 "github.com/wundergraph/cosmo/router-tests/testdata/connectrpc/client/employee.v1" | ||
| "github.com/wundergraph/cosmo/router-tests/testdata/connectrpc/client/employee.v1/employeev1connect" | ||
| "golang.org/x/net/http2" | ||
| ) | ||
|
|
||
| // TestConnectRPC_ClientProtocols tests all three RPC protocols (Connect, gRPC, gRPC-Web) | ||
| // using generated client code to ensure proper multi-protocol support | ||
| func TestConnectRPC_ClientProtocols(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| // Use shared helper for employee GraphQL handler | ||
| ts := NewTestConnectRPCServer(t, ConnectRPCServerOptions{ | ||
| GraphQLHandler: EmployeeGraphQLHandler(), | ||
| }) | ||
| defer ts.Close() | ||
|
|
||
| err := ts.Start() | ||
asoorm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| require.NoError(t, err) | ||
|
|
||
| baseURL := "http://" + ts.Addr().String() | ||
|
|
||
| expectedEmployee := `{ | ||
| "id": 1, | ||
| "tag": "employee-1", | ||
| "details": { | ||
| "forename": "John", | ||
| "surname": "Doe", | ||
| "pets": [{"name": "Fluffy"}], | ||
| "location": {"key": {"name": "San Francisco"}} | ||
| } | ||
| }` | ||
|
|
||
| t.Run("Connect protocol", func(t *testing.T) { | ||
| client := employeev1connect.NewEmployeeServiceClient( | ||
| http.DefaultClient, | ||
| baseURL, | ||
| // Connect protocol is the default | ||
| ) | ||
|
|
||
| req := connect.NewRequest(&employeev1.GetEmployeeByIdRequest{ | ||
| EmployeeId: 1, | ||
| }) | ||
|
|
||
| resp, err := client.GetEmployeeById(context.Background(), req) | ||
| require.NoError(t, err) | ||
| require.NotNil(t, resp.Msg.Employee) | ||
|
|
||
| employeeJSON, err := json.Marshal(resp.Msg.Employee) | ||
| require.NoError(t, err) | ||
| require.JSONEq(t, expectedEmployee, string(employeeJSON)) | ||
| }) | ||
|
|
||
| t.Run("gRPC protocol", func(t *testing.T) { | ||
| // Create HTTP client with h2c support for gRPC over HTTP/1.1 | ||
| // This mimics what grpcurl does with -plaintext flag | ||
| h2cClient := &http.Client{ | ||
| Transport: &http2.Transport{ | ||
| // Allow HTTP/2 without TLS (h2c) | ||
| AllowHTTP: true, | ||
| // Use a custom dialer that doesn't require TLS | ||
| DialTLSContext: func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error) { | ||
| var d net.Dialer | ||
| return d.DialContext(ctx, network, addr) | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| client := employeev1connect.NewEmployeeServiceClient( | ||
| h2cClient, | ||
| baseURL, | ||
| connect.WithGRPC(), | ||
| ) | ||
|
|
||
| req := connect.NewRequest(&employeev1.GetEmployeeByIdRequest{ | ||
| EmployeeId: 1, | ||
| }) | ||
|
|
||
| resp, err := client.GetEmployeeById(context.Background(), req) | ||
| require.NoError(t, err) | ||
| require.NotNil(t, resp.Msg.Employee) | ||
|
|
||
| employeeJSON, err := json.Marshal(resp.Msg.Employee) | ||
| require.NoError(t, err) | ||
| require.JSONEq(t, expectedEmployee, string(employeeJSON)) | ||
| }) | ||
|
|
||
| t.Run("gRPC-Web protocol", func(t *testing.T) { | ||
| client := employeev1connect.NewEmployeeServiceClient( | ||
| http.DefaultClient, | ||
| baseURL, | ||
| connect.WithGRPCWeb(), | ||
| ) | ||
|
|
||
| req := connect.NewRequest(&employeev1.GetEmployeeByIdRequest{ | ||
| EmployeeId: 1, | ||
| }) | ||
|
|
||
| resp, err := client.GetEmployeeById(context.Background(), req) | ||
| require.NoError(t, err) | ||
| require.NotNil(t, resp.Msg.Employee) | ||
|
|
||
| employeeJSON, err := json.Marshal(resp.Msg.Employee) | ||
| require.NoError(t, err) | ||
| require.JSONEq(t, expectedEmployee, string(employeeJSON)) | ||
| }) | ||
| } | ||
|
|
||
| // TestConnectRPC_ClientErrorHandling tests error scenarios with generated client | ||
| func TestConnectRPC_ClientErrorHandling(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| t.Run("GraphQL error with no data returns error", func(t *testing.T) { | ||
| ts := NewTestConnectRPCServer(t, ConnectRPCServerOptions{ | ||
| GraphQLHandler: ErrorGraphQLHandler("Employee not found"), | ||
| }) | ||
|
|
||
| err := ts.Start() | ||
| require.NoError(t, err) | ||
|
|
||
| client := employeev1connect.NewEmployeeServiceClient( | ||
| http.DefaultClient, | ||
| "http://"+ts.Addr().String(), | ||
| ) | ||
|
|
||
| req := connect.NewRequest(&employeev1.GetEmployeeByIdRequest{ | ||
| EmployeeId: 999, | ||
| }) | ||
|
|
||
| _, err = client.GetEmployeeById(context.Background(), req) | ||
| require.Error(t, err) | ||
|
|
||
| var connectErr *connect.Error | ||
| require.ErrorAs(t, err, &connectErr) | ||
| // GraphQL errors use CodeUnknown (not CodeInternal which implies server bugs) | ||
| assert.Equal(t, connect.CodeUnknown, connectErr.Code()) | ||
| assert.Contains(t, connectErr.Message(), "Employee not found") | ||
| }) | ||
|
|
||
| t.Run("GraphQL error with partial data returns error", func(t *testing.T) { | ||
| // Custom handler for partial data with errors | ||
| // Per GraphQL spec, errors at top level indicate a failure even with partial data | ||
| handler := func(w http.ResponseWriter, r *http.Request) { | ||
| w.Header().Set("Content-Type", "application/json") | ||
| w.WriteHeader(http.StatusOK) | ||
| _, _ = w.Write([]byte(`{ | ||
| "data": { | ||
| "employee": { | ||
| "id": 1, | ||
| "tag": "employee-1", | ||
| "details": { | ||
| "forename": "John", | ||
| "surname": "Doe" | ||
| } | ||
| } | ||
| }, | ||
| "errors": [{"message": "Could not fetch pets"}] | ||
| }`)) | ||
| } | ||
|
|
||
| ts := NewTestConnectRPCServer(t, ConnectRPCServerOptions{ | ||
| GraphQLHandler: handler, | ||
| }) | ||
|
|
||
| err := ts.Start() | ||
| require.NoError(t, err) | ||
|
|
||
| client := employeev1connect.NewEmployeeServiceClient( | ||
| http.DefaultClient, | ||
| "http://"+ts.Addr().String(), | ||
| ) | ||
|
|
||
| req := connect.NewRequest(&employeev1.GetEmployeeByIdRequest{ | ||
| EmployeeId: 1, | ||
| }) | ||
|
|
||
| _, err = client.GetEmployeeById(context.Background(), req) | ||
| // Per GraphQL spec, errors at top level should result in an error | ||
| require.Error(t, err) | ||
|
|
||
| var connectErr *connect.Error | ||
| require.ErrorAs(t, err, &connectErr) | ||
| assert.Equal(t, connect.CodeUnknown, connectErr.Code()) | ||
| assert.Contains(t, connectErr.Message(), "GraphQL partial success with errors") | ||
| }) | ||
|
|
||
| t.Run("HTTP 404 maps to CodeNotFound", func(t *testing.T) { | ||
| ts := NewTestConnectRPCServer(t, ConnectRPCServerOptions{ | ||
| GraphQLHandler: HTTPErrorHandler(http.StatusNotFound, "Not Found"), | ||
| }) | ||
|
|
||
| err := ts.Start() | ||
| require.NoError(t, err) | ||
|
|
||
| client := employeev1connect.NewEmployeeServiceClient( | ||
| http.DefaultClient, | ||
| "http://"+ts.Addr().String(), | ||
| ) | ||
|
|
||
| req := connect.NewRequest(&employeev1.GetEmployeeByIdRequest{ | ||
| EmployeeId: 1, | ||
| }) | ||
|
|
||
| _, err = client.GetEmployeeById(context.Background(), req) | ||
| require.Error(t, err) | ||
|
|
||
| var connectErr *connect.Error | ||
| require.ErrorAs(t, err, &connectErr) | ||
| assert.Equal(t, connect.CodeNotFound, connectErr.Code()) | ||
| }) | ||
|
|
||
| t.Run("HTTP 500 maps to CodeInternal", func(t *testing.T) { | ||
| ts := NewTestConnectRPCServer(t, ConnectRPCServerOptions{ | ||
| GraphQLHandler: HTTPErrorHandler(http.StatusInternalServerError, "Internal Server Error"), | ||
| }) | ||
|
|
||
| err := ts.Start() | ||
| require.NoError(t, err) | ||
|
|
||
| client := employeev1connect.NewEmployeeServiceClient( | ||
| http.DefaultClient, | ||
| "http://"+ts.Addr().String(), | ||
| ) | ||
|
|
||
| req := connect.NewRequest(&employeev1.GetEmployeeByIdRequest{ | ||
| EmployeeId: 1, | ||
| }) | ||
|
|
||
| _, err = client.GetEmployeeById(context.Background(), req) | ||
| require.Error(t, err) | ||
|
|
||
| var connectErr *connect.Error | ||
| require.ErrorAs(t, err, &connectErr) | ||
| assert.Equal(t, connect.CodeInternal, connectErr.Code()) | ||
| }) | ||
|
|
||
| t.Run("multiple GraphQL errors with extension codes", func(t *testing.T) { | ||
| // Simulate a GraphQL response with multiple errors containing extension codes | ||
| handler := func(w http.ResponseWriter, r *http.Request) { | ||
| w.Header().Set("Content-Type", "application/json") | ||
| w.WriteHeader(http.StatusOK) | ||
| _, _ = w.Write([]byte(`{ | ||
| "data": null, | ||
| "errors": [ | ||
| { | ||
| "message": "You are not authorized to access this resource", | ||
| "path": ["employee"], | ||
| "extensions": { | ||
| "code": "UNAUTHORIZED", | ||
| "statusCode": 401 | ||
| } | ||
| }, | ||
| { | ||
| "message": "Rate limit exceeded", | ||
| "path": ["employee"], | ||
| "extensions": { | ||
| "code": "RATE_LIMITED", | ||
| "retryAfter": 60 | ||
| } | ||
| } | ||
| ] | ||
| }`)) | ||
| } | ||
|
|
||
| ts := NewTestConnectRPCServer(t, ConnectRPCServerOptions{ | ||
| GraphQLHandler: handler, | ||
| }) | ||
|
|
||
| err := ts.Start() | ||
| require.NoError(t, err) | ||
|
|
||
| client := employeev1connect.NewEmployeeServiceClient( | ||
| http.DefaultClient, | ||
| "http://"+ts.Addr().String(), | ||
| ) | ||
|
|
||
| req := connect.NewRequest(&employeev1.GetEmployeeByIdRequest{ | ||
| EmployeeId: 1, | ||
| }) | ||
|
|
||
| _, err = client.GetEmployeeById(context.Background(), req) | ||
| require.Error(t, err) | ||
|
|
||
| var connectErr *connect.Error | ||
| require.ErrorAs(t, err, &connectErr) | ||
| assert.Equal(t, connect.CodeUnknown, connectErr.Code()) | ||
|
|
||
| // The error message contains the first GraphQL error and indicates additional errors | ||
| // Format: "GraphQL operation failed: <first error message> (and N more errors)" | ||
| assert.Contains(t, connectErr.Message(), "You are not authorized to access this resource") | ||
| assert.Contains(t, connectErr.Message(), "and 1 more errors") | ||
| }) | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: wundergraph/cosmo
Length of output: 113
🏁 Script executed:
Repository: wundergraph/cosmo
Length of output: 2586
🏁 Script executed:
Repository: wundergraph/cosmo
Length of output: 166
Remove the explicit
defer ts.Close()as cleanup is already registered.NewTestConnectRPCServeralready registers cleanup viat.Cleanup(func() { ts.Close() }), making the explicitdefer ts.Close()redundant. WhileClose()is idempotent and handles multiple calls safely, the defer duplicates the cleanup unnecessarily.Suggested change
📝 Committable suggestion
🤖 Prompt for AI Agents