Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f6d76f5
feat: provide map of field arguments in operation context
dkorittki Dec 2, 2025
ec45e9e
chore: use a walker
dkorittki Dec 10, 2025
c9cc1e6
fix: only map arguments when needed
dkorittki Dec 11, 2025
9bf3562
chore: clean up
dkorittki Dec 11, 2025
8477e1a
chore: rename FieldArguments() to Arguments()
dkorittki Dec 11, 2025
c5d995f
chore: add unit tests
dkorittki Dec 11, 2025
4800a8c
chore: improve godoc
dkorittki Dec 11, 2025
130bd22
chore: add router tests
dkorittki Dec 11, 2025
9f0c25f
chore: simplify Get method
dkorittki Dec 11, 2025
08ca36e
Merge branch 'main' into dominik/eng-8582-support-access-to-field-arg…
dkorittki Dec 12, 2025
b28d9a7
chore: remove garbage, add comments
dkorittki Dec 12, 2025
a63ff06
fix: log a warning when value cant resolve
dkorittki Dec 12, 2025
d6fcf63
chore: add test verifying direct value resolving
dkorittki Dec 12, 2025
cdca088
chore: nil checks
dkorittki Dec 12, 2025
b9cbf97
fix: support aliased fields
dkorittki Dec 12, 2025
a14b7c4
fix: check report for errors
dkorittki Dec 12, 2025
33d8dfc
chore: fix typo in comments
dkorittki Dec 12, 2025
1c1490e
Merge branch 'main' into dominik/eng-8582-support-access-to-field-arg…
dkorittki Dec 15, 2025
0766e76
chore: use custom type for field args
dkorittki Jan 2, 2026
5dbc44a
feat: use engines field arg mapping
dkorittki Jan 26, 2026
9395881
fix: avoid map creation during request processing
dkorittki Jan 27, 2026
9f0e470
feat: populate option to disable field arg mapping
dkorittki Feb 2, 2026
b102472
chore: use corresponding graph-go-tools version
dkorittki Feb 2, 2026
86ce4f7
Merge branch 'main'
dkorittki Feb 2, 2026
72b7375
fix: prefix with mutation
dkorittki Feb 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion router-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/wundergraph/cosmo/demo/pkg/subgraphs/projects v0.0.0-20250715110703-10f2e5f9c79e
github.com/wundergraph/cosmo/router v0.0.0-20251125205644-175f80c4e6d9
github.com/wundergraph/cosmo/router-plugin v0.0.0-20250808194725-de123ba1c65e
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.245
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.246.0.20260202150435-e2c713b42e65
go.opentelemetry.io/otel v1.36.0
go.opentelemetry.io/otel/sdk v1.36.0
go.opentelemetry.io/otel/sdk/metric v1.36.0
Expand Down
4 changes: 2 additions & 2 deletions router-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,8 @@ github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/
github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTBjW+SZK4mhxTTBVpxcqeBgWF1Rfmltbfk=
github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.245 h1:MYewlXgIhI9jusocPUeyo346J3M5cqzc6ddru1qp+S8=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.245/go.mod h1:mX25ASEQiKamxaFSK6NZihh0oDCigIuzro30up4mFH4=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.246.0.20260202150435-e2c713b42e65 h1:o5wqeMnGK2GdyTXlcLKZzPRrVtpzT55Iua5zyMT4ErU=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.246.0.20260202150435-e2c713b42e65/go.mod h1:mX25ASEQiKamxaFSK6NZihh0oDCigIuzro30up4mFH4=
github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 h1:FnBeRrxr7OU4VvAzt5X7s6266i6cSVkkFPS0TuXWbIg=
github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4=
Expand Down
72 changes: 72 additions & 0 deletions router-tests/modules/start_subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,4 +711,76 @@ func TestStartSubscriptionHook(t *testing.T) {
assert.Equal(t, int32(1), customModule.HookCallCount.Load())
})
})

t.Run("Test StartSubscription hook can access field arguments", func(t *testing.T) {
t.Parallel()

// This test verifies that the subscription start hook can access GraphQL field arguments
// via ctx.Operation().Arguments().

var capturedEmployeeID int

customModule := &start_subscription.StartSubscriptionModule{
HookCallCount: &atomic.Int32{},
Callback: func(ctx core.SubscriptionOnStartHandlerContext) error {
args := ctx.Operation().Arguments()
if args != nil {
employeeIDArg := args.Get("subscription.employeeUpdatedMyKafka.employeeID")
if employeeIDArg != nil {
capturedEmployeeID = employeeIDArg.GetInt()
}
}
return nil
},
}

cfg := config.Config{
Graph: config.Graph{},
Modules: map[string]interface{}{
"startSubscriptionModule": customModule,
},
}

testenv.Run(t, &testenv.Config{
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
RouterOptions: []core.Option{
core.WithModulesConfig(cfg.Modules),
core.WithCustomModules(&start_subscription.StartSubscriptionModule{}),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
var subscriptionOne struct {
employeeUpdatedMyKafka struct {
ID float64 `graphql:"id"`
} `graphql:"employeeUpdatedMyKafka(employeeID: $employeeID)"`
}

surl := xEnv.GraphQLWebSocketSubscriptionURL()
client := graphql.NewSubscriptionClient(surl)

vars := map[string]interface{}{
"employeeID": 7,
}
subscriptionOneID, err := client.Subscribe(&subscriptionOne, vars, func(dataValue []byte, errValue error) error {
return nil
})
require.NoError(t, err)
require.NotEmpty(t, subscriptionOneID)

clientRunCh := make(chan error)
go func() {
clientRunCh <- client.Run()
}()

xEnv.WaitForSubscriptionCount(1, time.Second*10)

require.NoError(t, client.Close())
testenv.AwaitChannelWithT(t, time.Second*10, clientRunCh, func(t *testing.T, err error) {
require.NoError(t, err)
}, "unable to close client before timeout")

assert.Equal(t, int32(1), customModule.HookCallCount.Load())
assert.Equal(t, 7, capturedEmployeeID, "expected to capture employeeID argument value")
})
})
}
48 changes: 48 additions & 0 deletions router-tests/modules/stream_publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,4 +358,52 @@ func TestPublishHook(t *testing.T) {
require.Equal(t, []byte("3"), header.Value)
})
})

t.Run("Test Publish hook can access field arguments", func(t *testing.T) {
t.Parallel()

// This test verifies that the publish hook can access GraphQL field arguments
// via ctx.Operation().Arguments().

var capturedEmployeeID int

customModule := stream_publish.PublishModule{
HookCallCount: &atomic.Int32{},
Callback: func(ctx core.StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
args := ctx.Operation().Arguments()
if args != nil {
employeeIDArg := args.Get("mutation.updateEmployeeMyKafka.employeeID")
if employeeIDArg != nil {
capturedEmployeeID = employeeIDArg.GetInt()
}
}
return events, nil
},
}

cfg := config.Config{
Graph: config.Graph{},
Modules: map[string]interface{}{
"publishModule": customModule,
},
}

testenv.Run(t, &testenv.Config{
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
RouterOptions: []core.Option{
core.WithModulesConfig(cfg.Modules),
core.WithCustomModules(&stream_publish.PublishModule{}),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
events.KafkaEnsureTopicExists(t, xEnv, time.Second, "employeeUpdated")
resOne := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `mutation { updateEmployeeMyKafka(employeeID: 5, update: {name: "test"}) { success } }`,
})
require.JSONEq(t, `{"data":{"updateEmployeeMyKafka":{"success":true}}}`, resOne.Body)

assert.Equal(t, int32(1), customModule.HookCallCount.Load())
assert.Equal(t, 5, capturedEmployeeID, "expected to capture employeeID argument value")
})
})
}
87 changes: 87 additions & 0 deletions router-tests/modules/stream_receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,4 +963,91 @@ func TestReceiveHook(t *testing.T) {
assert.Equal(t, int32(3), customModule.HookCallCount.Load())
})
})

t.Run("Test Receive hook can access field arguments", func(t *testing.T) {
t.Parallel()

// This test verifies that the receive hook can access GraphQL field arguments
// via ctx.Operation().Arguments().

var capturedEmployeeID int

customModule := stream_receive.StreamReceiveModule{
HookCallCount: &atomic.Int32{},
Callback: func(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
args := ctx.Operation().Arguments()
if args != nil {
employeeIDArg := args.Get("subscription.employeeUpdatedMyKafka.employeeID")
if employeeIDArg != nil {
capturedEmployeeID = employeeIDArg.GetInt()
}
}
return events, nil
},
}

cfg := config.Config{
Graph: config.Graph{},
Modules: map[string]interface{}{
"streamReceiveModule": customModule,
},
}

testenv.Run(t, &testenv.Config{
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
RouterOptions: []core.Option{
core.WithModulesConfig(cfg.Modules),
core.WithCustomModules(&stream_receive.StreamReceiveModule{}),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
topics := []string{"employeeUpdated"}
events.KafkaEnsureTopicExists(t, xEnv, time.Second, topics...)

var subscriptionOne struct {
employeeUpdatedMyKafka struct {
ID float64 `graphql:"id"`
} `graphql:"employeeUpdatedMyKafka(employeeID: 3)"`
}

surl := xEnv.GraphQLWebSocketSubscriptionURL()
client := graphql.NewSubscriptionClient(surl)

type kafkaSubscriptionArgs struct {
dataValue []byte
errValue error
}
subscriptionArgsCh := make(chan kafkaSubscriptionArgs)
subscriptionOneID, err := client.Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error {
subscriptionArgsCh <- kafkaSubscriptionArgs{
dataValue: dataValue,
errValue: errValue,
}
return nil
})
require.NoError(t, err)
require.NotEmpty(t, subscriptionOneID)

clientRunCh := make(chan error)
go func() {
clientRunCh <- client.Run()
}()

xEnv.WaitForSubscriptionCount(1, Timeout)

events.ProduceKafkaMessage(t, xEnv, Timeout, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`)

testenv.AwaitChannelWithT(t, Timeout, subscriptionArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) {
require.NoError(t, args.errValue)
})

require.NoError(t, client.Close())
testenv.AwaitChannelWithT(t, Timeout, clientRunCh, func(t *testing.T, err error) {
require.NoError(t, err)
}, "unable to close client before timeout")

assert.Equal(t, int32(1), customModule.HookCallCount.Load())
assert.Equal(t, 3, capturedEmployeeID, "expected to capture employeeID argument value")
})
})
}
106 changes: 106 additions & 0 deletions router/core/arguments.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package core

import (
"github.com/wundergraph/astjson"
"github.com/wundergraph/graphql-go-tools/v2/pkg/astnormalization"
)

// Arguments allow access to GraphQL field arguments used by clients.
type Arguments struct {
// mapping maps "fieldPath.argumentName" to "variableName".
// For example: {"user.posts.limit": "a", "user.id": "userId"}
mapping astnormalization.FieldArgumentMapping

// variables contains the JSON-parsed variables from the request.
variables *astjson.Value
}

// NewArguments creates an Arguments instance.
func NewArguments(
mapping astnormalization.FieldArgumentMapping,
variables *astjson.Value,
) Arguments {
return Arguments{
mapping: mapping,
variables: variables,
}
}

// Get will return the value of the field argument at path.
//
// To access a specific field argument you need to provide
// the path in it's GraphQL operation via dot notation,
// prefixed by the root levels type.
//
// Get("rootfield_operation_type.rootfield_name.other.fields.argument_name")
//
// To access the storeId field argument of the operation
//
// subscription {
// orderUpdated(storeId: 1) {
// id
// status
// }
// }
//
// you need to call Get("subscription.orderUpdated.storeId") .
// You can also access deeper nested fields.
// For example you can access the categoryId field of the operation
//
// subscription {
// orderUpdated(storeId: 1) {
// lineItems(categoryId: 2) {
// id
// name
// }
// }
// }
//
// by calling Get("subscription.orderUpdated.lineItems.categoryId") .
//
// If you use aliases in operation you need to provide the alias name
// instead of the field name.
//
// query {
// a: user(id: "1") { name }
// b: user(id: "2") { name }
// }
//
// You need to call Get("query.a.id") or Get("query.b.id") respectively.
//
// If you want to access field arguments of fragments, you need to
// access it on one of the fields where the fragment is resolved.
//
// fragment GoldTrophies on RaceDrivers {
// trophies(color:"gold") {
// title
// }
// }
//
// subscription {
// driversFinish {
// name
// ... GoldTrophies
// }
// }
//
// If you want to access the "color" field argument, you need to
// call Get("subscription.driversFinish.trophies.color") .
// The same concept applies to inline fragments.
//
// If fa is nil, or f or a cannot be found, nil is returned.
func (fa *Arguments) Get(path string) *astjson.Value {
if fa == nil || len(fa.mapping) == 0 || fa.variables == nil {
return nil
}

// Look up variable name from field argument map
varName, ok := fa.mapping[path]
if !ok {
return nil
}

// Use the name to get the actual value from
// the operation contexts variables.
return fa.variables.Get(varName)
}
Loading
Loading