Skip to content

Commit aff7424

Browse files
committed
feat: streamed list objects support
1 parent 3a3641a commit aff7424

6 files changed

Lines changed: 424 additions & 1 deletion

File tree

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package dev.openfga.sdk.api;
2+
3+
import static dev.openfga.sdk.util.Validation.assertParamExists;
4+
5+
import dev.openfga.sdk.api.client.ApiClient;
6+
import dev.openfga.sdk.api.client.ApiResponse;
7+
import dev.openfga.sdk.api.client.HttpRequestAttempt;
8+
import dev.openfga.sdk.api.client.StreamingResponseString;
9+
import dev.openfga.sdk.api.configuration.Configuration;
10+
import dev.openfga.sdk.api.model.ListObjectsRequest;
11+
import dev.openfga.sdk.errors.ApiException;
12+
import dev.openfga.sdk.errors.FgaInvalidParameterException;
13+
import dev.openfga.sdk.telemetry.Attribute;
14+
import dev.openfga.sdk.telemetry.Attributes;
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
import java.util.concurrent.CompletableFuture;
18+
19+
/**
20+
* API handler for streamed list objects operations.
21+
* This class is separate from the generated OpenFgaApi to avoid modifications to generated code.
22+
*/
23+
public class StreamedListObjectsApi {
24+
private final ApiClient apiClient;
25+
26+
public StreamedListObjectsApi(ApiClient apiClient) {
27+
this.apiClient = apiClient;
28+
}
29+
30+
/**
31+
* Stream all objects of the given type that the user has a relation with.
32+
* Returns raw NDJSON response for parsing by the client layer.
33+
*
34+
* @param storeId The store ID (required)
35+
* @param body The list objects request body (required)
36+
* @param configuration The configuration to use for this request
37+
* @return CompletableFuture with raw streaming response
38+
* @throws ApiException if fails to make API call
39+
* @throws FgaInvalidParameterException if required parameters are missing
40+
*/
41+
public CompletableFuture<ApiResponse<StreamingResponseString>> streamedListObjects(
42+
String storeId, ListObjectsRequest body, Configuration configuration)
43+
throws ApiException, FgaInvalidParameterException {
44+
45+
assertParamExists(storeId, "storeId", "streamedListObjects");
46+
assertParamExists(body, "body", "streamedListObjects");
47+
48+
String path = "/stores/" + storeId + "/streamed-list-objects";
49+
50+
try {
51+
// Build the HTTP request
52+
byte[] requestBody = apiClient.getObjectMapper().writeValueAsBytes(body);
53+
var bodyPublisher = java.net.http.HttpRequest.BodyPublishers.ofByteArray(requestBody);
54+
55+
var requestBuilder = java.net.http.HttpRequest.newBuilder()
56+
.uri(java.net.URI.create(configuration.getApiUrl() + path))
57+
.header("Content-Type", "application/json")
58+
.header("User-Agent", configuration.getUserAgent())
59+
.POST(bodyPublisher);
60+
61+
// Add authorization header if needed
62+
if (configuration.getCredentials() != null
63+
&& configuration.getCredentials().getApiToken() != null) {
64+
requestBuilder.header(
65+
"Authorization",
66+
"Bearer " + configuration.getCredentials().getApiToken());
67+
}
68+
69+
// Add default headers
70+
if (configuration.getDefaultHeaders() != null) {
71+
configuration.getDefaultHeaders().forEach(requestBuilder::header);
72+
}
73+
74+
var httpRequest = requestBuilder.build();
75+
76+
// Build telemetry attributes
77+
Map<String, Object> methodParameters = new HashMap<>();
78+
methodParameters.put("storeId", storeId);
79+
methodParameters.put("body", body);
80+
81+
Map<Attribute, String> telemetryAttributes = new HashMap<>();
82+
telemetryAttributes.put(Attributes.FGA_CLIENT_REQUEST_METHOD, "StreamedListObjects");
83+
84+
// Use HttpRequestAttempt with StreamingResponseString to get raw response
85+
return new HttpRequestAttempt<>(
86+
httpRequest, "streamedListObjects", StreamingResponseString.class, apiClient, configuration)
87+
.addTelemetryAttributes(telemetryAttributes)
88+
.attemptHttpRequest();
89+
} catch (Exception e) {
90+
return CompletableFuture.failedFuture(new ApiException(e));
91+
}
92+
}
93+
}

src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,12 @@ private CompletableFuture<T> deserializeResponse(HttpResponse<String> response)
215215
if (clazz == Void.class && isNullOrWhitespace(response.body())) {
216216
return CompletableFuture.completedFuture(null);
217217
}
218-
218+
// Special handling for streaming responses - don't deserialize, just wrap the raw string
219+
if (clazz == StreamingResponseString.class) {
220+
@SuppressWarnings("unchecked")
221+
T result = (T) new StreamingResponseString(response.body());
222+
return CompletableFuture.completedFuture(result);
223+
}
219224
try {
220225
T deserialized = apiClient.getObjectMapper().readValue(response.body(), clazz);
221226
return CompletableFuture.completedFuture(deserialized);

src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class OpenFgaClient {
2424
private final ApiClient apiClient;
2525
private ClientConfiguration configuration;
2626
private OpenFgaApi api;
27+
private StreamedListObjectsApi streamedListObjectsApi;
2728

2829
public OpenFgaClient(ClientConfiguration configuration) throws FgaInvalidParameterException {
2930
this(configuration, new ApiClient());
@@ -33,6 +34,7 @@ public OpenFgaClient(ClientConfiguration configuration, ApiClient apiClient) thr
3334
this.apiClient = apiClient;
3435
this.configuration = configuration;
3536
this.api = new OpenFgaApi(configuration, apiClient);
37+
this.streamedListObjectsApi = new StreamedListObjectsApi(apiClient);
3638
}
3739

3840
/* ***********
@@ -1104,6 +1106,75 @@ public CompletableFuture<ClientListObjectsResponse> listObjects(
11041106
return call(() -> api.listObjects(storeId, body, overrides)).thenApply(ClientListObjectsResponse::new);
11051107
}
11061108

1109+
/**
1110+
* StreamedListObjects - Stream all objects of the given type that the user has a relation to (evaluates)
1111+
*
1112+
* Returns a Stream of objects that can be iterated. The streaming API returns results as they
1113+
* are computed, rather than collecting all results before returning. This is useful for
1114+
* large result sets.
1115+
*
1116+
* @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace
1117+
*/
1118+
public CompletableFuture<Stream<StreamedListObjectsResponse>> streamedListObjects(ClientListObjectsRequest request)
1119+
throws FgaInvalidParameterException {
1120+
return streamedListObjects(request, null);
1121+
}
1122+
1123+
/**
1124+
* StreamedListObjects - Stream all objects of the given type that the user has a relation to (evaluates)
1125+
*
1126+
* Returns a Stream of objects that can be iterated. The streaming API returns results as they
1127+
* are computed, rather than collecting all results before returning. This is useful for
1128+
* large result sets.
1129+
*
1130+
* @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace
1131+
*/
1132+
public CompletableFuture<Stream<StreamedListObjectsResponse>> streamedListObjects(
1133+
ClientListObjectsRequest request, ClientListObjectsOptions options) throws FgaInvalidParameterException {
1134+
configuration.assertValid();
1135+
String storeId = configuration.getStoreIdChecked();
1136+
1137+
ListObjectsRequest body = new ListObjectsRequest();
1138+
1139+
if (request != null) {
1140+
body.user(request.getUser()).relation(request.getRelation()).type(request.getType());
1141+
if (request.getContextualTupleKeys() != null) {
1142+
var contextualTuples = request.getContextualTupleKeys();
1143+
var bodyContextualTuples = ClientTupleKey.asContextualTupleKeys(contextualTuples);
1144+
body.contextualTuples(bodyContextualTuples);
1145+
}
1146+
if (request.getContext() != null) {
1147+
body.context(request.getContext());
1148+
}
1149+
}
1150+
1151+
if (options != null) {
1152+
if (options.getConsistency() != null) {
1153+
body.consistency(options.getConsistency());
1154+
}
1155+
1156+
// Set authorizationModelId from options if available; otherwise, use the default from configuration
1157+
String authorizationModelId = !isNullOrWhitespace(options.getAuthorizationModelId())
1158+
? options.getAuthorizationModelId()
1159+
: configuration.getAuthorizationModelId();
1160+
body.authorizationModelId(authorizationModelId);
1161+
} else {
1162+
body.setAuthorizationModelId(configuration.getAuthorizationModelId());
1163+
}
1164+
1165+
Configuration config = configuration.override(new ConfigurationOverride().addHeaders(options));
1166+
1167+
return call(() -> streamedListObjectsApi.streamedListObjects(storeId, body, config))
1168+
.thenApply(response -> {
1169+
String ndjsonResponse = response.getData().getRawResponse();
1170+
StreamedResponseIterator iterator =
1171+
new StreamedResponseIterator(ndjsonResponse, apiClient.getObjectMapper());
1172+
// Convert Iterator to Stream
1173+
return java.util.stream.StreamSupport.stream(
1174+
((Iterable<StreamedListObjectsResponse>) () -> iterator).spliterator(), false);
1175+
});
1176+
}
1177+
11071178
/**
11081179
* ListRelations - List allowed relations a user has with an object (evaluates)
11091180
*/
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package dev.openfga.sdk.api.client;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import dev.openfga.sdk.api.model.Status;
5+
import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse;
6+
import dev.openfga.sdk.api.model.StreamedListObjectsResponse;
7+
import java.io.BufferedReader;
8+
import java.io.IOException;
9+
import java.io.StringReader;
10+
import java.util.Iterator;
11+
import java.util.NoSuchElementException;
12+
13+
/**
14+
* Iterator for parsing newline-delimited JSON streaming responses.
15+
* Each line in the response is a StreamResultOfStreamedListObjectsResponse.
16+
*
17+
* If an error is encountered in the stream (either from parsing or from an error
18+
* response), it will be thrown as a RuntimeException when hasNext() or next() is called.
19+
*/
20+
public class StreamedResponseIterator implements Iterator<StreamedListObjectsResponse> {
21+
private final BufferedReader reader;
22+
private final ObjectMapper objectMapper;
23+
private StreamedListObjectsResponse nextItem;
24+
private boolean hasNext;
25+
private RuntimeException pendingException;
26+
27+
public StreamedResponseIterator(String ndjsonResponse, ObjectMapper objectMapper) {
28+
this.reader = new BufferedReader(new StringReader(ndjsonResponse));
29+
this.objectMapper = objectMapper;
30+
this.hasNext = true;
31+
this.pendingException = null;
32+
advance();
33+
}
34+
35+
private void advance() {
36+
try {
37+
String line;
38+
while ((line = reader.readLine()) != null) {
39+
line = line.trim();
40+
if (line.isEmpty()) {
41+
continue;
42+
}
43+
44+
StreamResultOfStreamedListObjectsResponse streamResult =
45+
objectMapper.readValue(line, StreamResultOfStreamedListObjectsResponse.class);
46+
47+
if (streamResult.getResult() != null) {
48+
nextItem = streamResult.getResult();
49+
return;
50+
}
51+
52+
if (streamResult.getError() != null) {
53+
// Handle error in stream - convert to exception
54+
Status error = streamResult.getError();
55+
String errorMessage = String.format(
56+
"Error in streaming response: code=%d, message=%s",
57+
error.getCode(), error.getMessage() != null ? error.getMessage() : "Unknown error");
58+
pendingException = new RuntimeException(errorMessage);
59+
hasNext = false;
60+
nextItem = null;
61+
return;
62+
}
63+
}
64+
// No more lines
65+
hasNext = false;
66+
nextItem = null;
67+
} catch (IOException e) {
68+
pendingException = new RuntimeException("Failed to parse streaming response", e);
69+
hasNext = false;
70+
nextItem = null;
71+
}
72+
}
73+
74+
@Override
75+
public boolean hasNext() {
76+
// If there's a pending exception, throw it before returning false
77+
if (pendingException != null) {
78+
RuntimeException ex = pendingException;
79+
pendingException = null; // Clear it so we don't throw multiple times
80+
throw ex;
81+
}
82+
return hasNext && nextItem != null;
83+
}
84+
85+
@Override
86+
public StreamedListObjectsResponse next() {
87+
// Check for pending exception first
88+
if (pendingException != null) {
89+
RuntimeException ex = pendingException;
90+
pendingException = null;
91+
throw ex;
92+
}
93+
94+
if (!hasNext()) {
95+
throw new NoSuchElementException();
96+
}
97+
98+
StreamedListObjectsResponse current = nextItem;
99+
advance();
100+
101+
// Check again after advance in case an error occurred
102+
if (pendingException != null) {
103+
RuntimeException ex = pendingException;
104+
pendingException = null;
105+
throw ex;
106+
}
107+
108+
return current;
109+
}
110+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package dev.openfga.sdk.api.client;
2+
3+
/**
4+
* Marker class to indicate that the response should not be deserialized by HttpRequestAttempt.
5+
* Instead, the raw response string should be returned for manual parsing (e.g., NDJSON).
6+
*/
7+
public class StreamingResponseString {
8+
private final String rawResponse;
9+
10+
public StreamingResponseString(String rawResponse) {
11+
this.rawResponse = rawResponse;
12+
}
13+
14+
public String getRawResponse() {
15+
return rawResponse;
16+
}
17+
}

0 commit comments

Comments
 (0)