Skip to content

Commit 6db1626

Browse files
authored
feat: streaming APIExecutor (#296)
* feat: streaming raw requests feat: docs e.g. and changelog fix: changelog feat: cleanup feat: address comments feat: provide typereference overload example feat: nullable consumer feat: cleanup fix: async contract leak feat: refactor * feat: address coderabbit comments * feat: update changelog * feat: address comments
1 parent cc51890 commit 6db1626

13 files changed

Lines changed: 1132 additions & 175 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## [Unreleased](https://github.com/openfga/java-sdk/compare/v0.9.6...HEAD)
44

5+
### Added
6+
- Introduced `StreamingApiExecutor` for executing HTTP requests to streaming endpoints not yet wrapped by the SDK (#296)
7+
58
## v0.9.6
69

710
### [0.9.6](https://github.com/openfga/java-sdk/compare/v0.9.5...v0.9.6) (2026-02-18)

README.md

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1245,11 +1245,33 @@ System.out.println("Status Code: " + response.getStatusCode());
12451245
System.out.println("Headers: " + response.getHeaders());
12461246
```
12471247

1248+
#### Calling a streaming endpoint
1249+
1250+
For streaming endpoints, use `streamingApiExecutor` instead. Pass the response class directly — the SDK handles the rest. It delivers each response object to a consumer callback as it arrives, and returns a `CompletableFuture<Void>` that completes when the stream is exhausted.
1251+
1252+
```java
1253+
ApiExecutorRequestBuilder request = ApiExecutorRequestBuilder.builder(HttpMethod.POST, "/stores/{store_id}/streamed-list-objects")
1254+
.body(new ListObjectsRequest().user("user:anne").relation("viewer").type("document"))
1255+
.build();
1256+
1257+
fgaClient.streamingApiExecutor(StreamedListObjectsResponse.class)
1258+
.stream(
1259+
request,
1260+
response -> System.out.println("Object: " + response.getObject()), // called per object
1261+
error -> System.err.println("Stream error: " + error.getMessage()) // optional
1262+
)
1263+
.thenRun(() -> System.out.println("Streaming complete"))
1264+
.exceptionally(err -> {
1265+
System.err.println("Fatal error: " + err.getMessage());
1266+
return null;
1267+
});
1268+
```
1269+
12481270
For a complete working example, see [examples/api-executor](examples/api-executor).
12491271

12501272
#### Documentation
12511273

1252-
See [docs/ApiExecutor.md](docs/ApiExecutor.md) for complete API reference and examples.
1274+
See [docs/ApiExecutor.md](docs/ApiExecutor.md) for complete API reference and examples for both `ApiExecutor` and `StreamingApiExecutor`.
12531275

12541276
### API Endpoints
12551277

docs/ApiExecutor.md

Lines changed: 95 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,48 @@
11
# API Executor
22

3-
Direct HTTP access to OpenFGA endpoints.
3+
Direct HTTP access to OpenFGA endpoints — for both standard and streaming responses.
44

55
## Quick Start
66

7+
### Standard (non-streaming) endpoint
8+
79
```java
810
OpenFgaClient client = new OpenFgaClient(config);
911

10-
// Build request
1112
ApiExecutorRequestBuilder request = ApiExecutorRequestBuilder.builder(HttpMethod.POST, "/stores/{store_id}/check")
1213
.pathParam("store_id", storeId)
1314
.body(Map.of("tuple_key", Map.of("user", "user:jon", "relation", "reader", "object", "doc:1")))
1415
.build();
1516

16-
// Execute - typed response
17+
// Typed response
1718
ApiResponse<CheckResponse> response = client.apiExecutor().send(request, CheckResponse.class).get();
1819

19-
// Execute - raw JSON
20+
// Raw JSON
2021
ApiResponse<String> rawResponse = client.apiExecutor().send(request).get();
2122
```
2223

24+
### Streaming endpoint
25+
26+
```java
27+
ApiExecutorRequestBuilder request = ApiExecutorRequestBuilder.builder(HttpMethod.POST, "/stores/{store_id}/streamed-list-objects")
28+
.body(listObjectsRequest)
29+
.build();
30+
31+
client.streamingApiExecutor(StreamedListObjectsResponse.class)
32+
.stream(
33+
request,
34+
response -> System.out.println("Object: " + response.getObject()),
35+
error -> System.err.println("Error: " + error.getMessage())
36+
)
37+
.thenRun(() -> System.out.println("Done"));
38+
```
39+
2340
## API Reference
2441

2542
### ApiExecutorRequestBuilder
2643

44+
Shared by both `ApiExecutor` and `StreamingApiExecutor`.
45+
2746
**Factory:**
2847
```java
2948
ApiExecutorRequestBuilder.builder(HttpMethod method, String path)
@@ -38,30 +57,47 @@ ApiExecutorRequestBuilder.builder(HttpMethod method, String path)
3857
.build() // Complete the builder (required)
3958
```
4059

41-
**Example:**
42-
```java
43-
ApiExecutorRequestBuilder request = ApiExecutorRequestBuilder.builder(HttpMethod.POST, "/stores/{store_id}/write")
44-
.pathParam("store_id", "01ABC")
45-
.queryParam("dry_run", "true")
46-
.header("X-Request-ID", "uuid")
47-
.body(requestObject)
48-
.build();
49-
```
50-
5160
### ApiExecutor
5261

5362
**Access:**
5463
```java
55-
ApiExecutor apiExecutor = client.apiExecutor();
64+
ApiExecutor executor = client.apiExecutor();
5665
```
5766

5867
**Methods:**
5968
```java
6069
CompletableFuture<ApiResponse<String>> send(ApiExecutorRequestBuilder request)
61-
CompletableFuture<ApiResponse<T>> send(ApiExecutorRequestBuilder request, Class<T> responseType)
70+
CompletableFuture<ApiResponse<T>> send(ApiExecutorRequestBuilder request, Class<T> responseType)
71+
```
72+
73+
### StreamingApiExecutor\<T\>
74+
75+
For streaming endpoints. Each response object is delivered to a consumer callback as it arrives.
76+
77+
**Access — preferred (concrete response types):**
78+
```java
79+
StreamingApiExecutor<MyResponse> executor = client.streamingApiExecutor(MyResponse.class);
6280
```
6381

64-
### ApiResponse<T>
82+
**Access — escape hatch (when T is itself generic):**
83+
```java
84+
TypeReference<StreamResult<MyResponse>> typeRef = new TypeReference<StreamResult<MyResponse>>() {};
85+
StreamingApiExecutor<MyResponse> executor = client.streamingApiExecutor(typeRef);
86+
```
87+
88+
**Methods:**
89+
```java
90+
CompletableFuture<Void> stream(ApiExecutorRequestBuilder request, Consumer<T> consumer)
91+
CompletableFuture<Void> stream(ApiExecutorRequestBuilder request, Consumer<T> consumer, Consumer<Throwable> errorConsumer)
92+
```
93+
94+
- The `consumer` is invoked once per successfully parsed response object.
95+
- The optional `errorConsumer` is invoked for errors within the stream or on HTTP error.
96+
- The returned `CompletableFuture<Void>` completes when the stream is exhausted or fails exceptionally on unrecoverable error.
97+
98+
### ApiResponse\<T\>
99+
100+
Returned by `ApiExecutor.send(...)`.
65101

66102
```java
67103
int getStatusCode() // HTTP status
@@ -72,7 +108,7 @@ T getData() // Deserialized data
72108

73109
## Examples
74110

75-
### GET Request
111+
### GET Request (ApiExecutor)
76112
```java
77113
ApiExecutorRequestBuilder request = ApiExecutorRequestBuilder.builder(HttpMethod.GET, "/stores/{store_id}/feature")
78114
.pathParam("store_id", storeId)
@@ -82,7 +118,7 @@ client.apiExecutor().send(request, FeatureResponse.class)
82118
.thenAccept(r -> System.out.println("Status: " + r.getStatusCode()));
83119
```
84120

85-
### POST with Body
121+
### POST with Body (ApiExecutor)
86122
```java
87123
ApiExecutorRequestBuilder request = ApiExecutorRequestBuilder.builder(HttpMethod.POST, "/stores/{store_id}/bulk-delete")
88124
.pathParam("store_id", storeId)
@@ -93,15 +129,45 @@ ApiExecutorRequestBuilder request = ApiExecutorRequestBuilder.builder(HttpMethod
93129
client.apiExecutor().send(request, BulkDeleteResponse.class).get();
94130
```
95131

96-
### Raw JSON Response
132+
### Raw JSON Response (ApiExecutor)
97133
```java
98134
ApiResponse<String> response = client.apiExecutor().send(request).get();
99-
String json = response.getRawResponse(); // Raw JSON
135+
String json = response.getRawResponse();
136+
```
137+
138+
### Streaming endpoint (StreamingApiExecutor)
139+
```java
140+
ApiExecutorRequestBuilder request = ApiExecutorRequestBuilder.builder(HttpMethod.POST, "/stores/{store_id}/streamed-list-objects")
141+
.body(new ListObjectsRequest().user("user:anne").relation("viewer").type("document"))
142+
.build();
143+
144+
List<String> objects = new ArrayList<>();
145+
client.streamingApiExecutor(StreamedListObjectsResponse.class)
146+
.stream(request, response -> objects.add(response.getObject()))
147+
.thenRun(() -> System.out.println("Received " + objects.size() + " objects"));
148+
```
149+
150+
### Streaming endpoint with TypeReference (escape hatch for generic response types)
151+
152+
Use `TypeReference` only when the response type `T` is itself generic. For all concrete
153+
types — which covers the vast majority of endpoints — use `streamingApiExecutor(MyResponse.class)` instead.
154+
155+
```java
156+
// Hypothetical endpoint whose response wraps a generic Page<Item>
157+
TypeReference<StreamResult<Page<Item>>> typeRef = new TypeReference<StreamResult<Page<Item>>>() {};
158+
159+
ApiExecutorRequestBuilder request = ApiExecutorRequestBuilder.builder(HttpMethod.POST, "/stores/{store_id}/streamed-paged-items")
160+
.body(requestBody)
161+
.build();
162+
163+
client.streamingApiExecutor(typeRef)
164+
.stream(request, page -> page.getItems().forEach(System.out::println))
165+
.thenRun(() -> System.out.println("Done"));
100166
```
101167

102168
### Query Parameters
103169
```java
104-
ApiExecutorRequestBuilder.builder("GET", "/stores/{store_id}/items")
170+
ApiExecutorRequestBuilder.builder(HttpMethod.GET, "/stores/{store_id}/items")
105171
.pathParam("store_id", storeId)
106172
.queryParam("page", "1")
107173
.queryParam("limit", "50")
@@ -111,14 +177,14 @@ ApiExecutorRequestBuilder.builder("GET", "/stores/{store_id}/items")
111177

112178
### Custom Headers
113179
```java
114-
ApiExecutorRequestBuilder.builder("POST", "/stores/{store_id}/action")
180+
ApiExecutorRequestBuilder.builder(HttpMethod.POST, "/stores/{store_id}/action")
115181
.header("X-Request-ID", UUID.randomUUID().toString())
116182
.header("X-Idempotency-Key", "key-123")
117183
.body(data)
118184
.build();
119185
```
120186

121-
### Error Handling
187+
### Error Handling (ApiExecutor)
122188
```java
123189
client.apiExecutor().send(request, ResponseType.class)
124190
.exceptionally(e -> {
@@ -132,7 +198,7 @@ client.apiExecutor().send(request, ResponseType.class)
132198

133199
### Map as Request Body
134200
```java
135-
ApiExecutorRequestBuilder.builder("POST", "/stores/{store_id}/settings")
201+
ApiExecutorRequestBuilder.builder(HttpMethod.POST, "/stores/{store_id}/settings")
136202
.pathParam("store_id", storeId)
137203
.body(Map.of(
138204
"setting", "value",
@@ -148,17 +214,18 @@ ApiExecutorRequestBuilder.builder("POST", "/stores/{store_id}/settings")
148214
- Path/query parameters are URL-encoded automatically
149215
- Authentication tokens injected from client config
150216
- `{store_id}` auto-replaced if not provided via `.pathParam()`
217+
- For `StreamingApiExecutor`, pass the response class directly (`MyResponse.class`). The SDK builds the required Jackson type internally. Use the `TypeReference` overload only when `T` is itself a generic type.
151218

152219
## Migration to Typed Methods
153220

154-
When SDK adds typed methods for an endpoint, you can migrate from API Executor:
221+
When the SDK adds typed methods for an endpoint, you can migrate from API Executor:
155222

156223
```java
157224
// API Executor
158-
ApiExecutorRequestBuilder request = ApiExecutorRequestBuilder.builder("POST", "/stores/{store_id}/check")
225+
ApiExecutorRequestBuilder request = ApiExecutorRequestBuilder.builder(HttpMethod.POST, "/stores/{store_id}/check")
159226
.body(req)
160227
.build();
161-
228+
162229
client.apiExecutor().send(request, CheckResponse.class).get();
163230

164231
// Typed SDK (when available)

examples/README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ A simple example that creates a store, runs a set of calls against it including
1313
#### Streaming Examples
1414
- `streamed-list-objects/` - Demonstrates using the StreamedListObjects API to retrieve large result sets without pagination limits
1515

16-
#### API Executor Examples
17-
- `api-executor/` - Demonstrates direct HTTP access to OpenFGA endpoints not yet wrapped by the SDK, maintaining SDK configuration (authentication, retries, error handling)
16+
#### API Executor Examples (`api-executor/`)
17+
Demonstrates direct HTTP access to OpenFGA endpoints not yet wrapped by the SDK. Standard requests go through the SDK's full configuration (authentication, retries, error handling, telemetry). Streaming requests use direct HTTP streaming — authentication is applied but retries and telemetry are not.
1818

19+
- **`ApiExecutorExample.java`** — standard (non-streaming) endpoints: typed responses, raw JSON, query parameters, custom headers, error handling
20+
- **`StreamingApiExecutorExample.java`** — streaming endpoints: calls `streamed-list-objects` via `client.streamingApiExecutor(StreamedListObjectsResponse.class).stream(request, consumer)`, demonstrating per-object callbacks and error handling
21+
22+
Run with `make run` (standard) or `make run-streaming` (streaming) from the `api-executor/` directory.

examples/api-executor/Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
.PHONY: build run run-openfga
1+
.PHONY: build run run-streaming run-openfga
22
all: build
33

44
project_name=.
@@ -11,6 +11,9 @@ build:
1111
run:
1212
../../gradlew -P language=$(language) run
1313

14+
run-streaming:
15+
../../gradlew -P language=$(language) runStreaming
16+
1417
run-openfga:
1518
docker pull docker.io/openfga/openfga:${openfga_version} && \
1619
docker run -p 8080:8080 docker.io/openfga/openfga:${openfga_version} run

0 commit comments

Comments
 (0)