diff --git a/.openapi-generator/FILES b/.openapi-generator/FILES index ddc3f045..9290b751 100644 --- a/.openapi-generator/FILES +++ b/.openapi-generator/FILES @@ -67,6 +67,8 @@ docs/RelationshipCondition.md docs/SourceInfo.md docs/Status.md docs/Store.md +docs/StreamResultOfStreamedListObjectsResponse.md +docs/StreamedListObjectsResponse.md docs/Tuple.md docs/TupleChange.md docs/TupleKey.md @@ -155,6 +157,8 @@ src/main/java/dev/openfga/sdk/api/model/RelationshipCondition.java src/main/java/dev/openfga/sdk/api/model/SourceInfo.java src/main/java/dev/openfga/sdk/api/model/Status.java src/main/java/dev/openfga/sdk/api/model/Store.java +src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java +src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java src/main/java/dev/openfga/sdk/api/model/Tuple.java src/main/java/dev/openfga/sdk/api/model/TupleChange.java src/main/java/dev/openfga/sdk/api/model/TupleKey.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 6171468c..bc766c57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ ### Fixed - fix: preserve response headers in transaction write operations (#254) +- feat: Add `streamedListObjects` API endpoint with consumer callback support (#252) ## v0.9.2 diff --git a/README.md b/README.md index fc6ee292..6fc457af 100644 --- a/README.md +++ b/README.md @@ -1185,6 +1185,7 @@ try { | [**readAuthorizationModel**](docs/OpenFgaApi.md#readauthorizationmodel) | **GET** /stores/{store_id}/authorization-models/{id} | Return a particular version of an authorization model | | [**readAuthorizationModels**](docs/OpenFgaApi.md#readauthorizationmodels) | **GET** /stores/{store_id}/authorization-models | Return all the authorization models for a particular store | | [**readChanges**](docs/OpenFgaApi.md#readchanges) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | +| [**streamedListObjects**](docs/OpenFgaApi.md#streamedlistobjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | | [**write**](docs/OpenFgaApi.md#write) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**writeAssertions**](docs/OpenFgaApi.md#writeassertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID | | [**writeAuthorizationModel**](docs/OpenFgaApi.md#writeauthorizationmodel) | **POST** /stores/{store_id}/authorization-models | Create a new authorization model | @@ -1310,6 +1311,10 @@ try { - [Store](https://github.com/openfga/java-sdk/blob/main/docs/Store.md) +- [StreamResultOfStreamedListObjectsResponse](https://github.com/openfga/java-sdk/blob/main/docs/StreamResultOfStreamedListObjectsResponse.md) + +- [StreamedListObjectsResponse](https://github.com/openfga/java-sdk/blob/main/docs/StreamedListObjectsResponse.md) + - [Tuple](https://github.com/openfga/java-sdk/blob/main/docs/Tuple.md) - [TupleChange](https://github.com/openfga/java-sdk/blob/main/docs/TupleChange.md) diff --git a/docs/OpenFgaApi.md b/docs/OpenFgaApi.md index 80986b1c..09d0f1fb 100644 --- a/docs/OpenFgaApi.md +++ b/docs/OpenFgaApi.md @@ -32,6 +32,8 @@ All URIs are relative to *http://localhost* | [**readAuthorizationModelsWithHttpInfo**](OpenFgaApi.md#readAuthorizationModelsWithHttpInfo) | **GET** /stores/{store_id}/authorization-models | Return all the authorization models for a particular store | | [**readChanges**](OpenFgaApi.md#readChanges) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | | [**readChangesWithHttpInfo**](OpenFgaApi.md#readChangesWithHttpInfo) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | +| [**streamedListObjects**](OpenFgaApi.md#streamedListObjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | +| [**streamedListObjectsWithHttpInfo**](OpenFgaApi.md#streamedListObjectsWithHttpInfo) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | | [**write**](OpenFgaApi.md#write) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**writeWithHttpInfo**](OpenFgaApi.md#writeWithHttpInfo) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**writeAssertions**](OpenFgaApi.md#writeAssertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID | @@ -2301,6 +2303,167 @@ No authorization required | **500** | Request failed due to internal server error. | - | +## streamedListObjects + +> CompletableFuture streamedListObjects(storeId, body) + +Stream all objects of the given type that the user has a relation with + +The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + +### Example + +```java +// Import classes: +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.client.ApiException; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.client.models.*; +import dev.openfga.sdk.api.OpenFgaApi; +import java.util.concurrent.CompletableFuture; + +public class Example { + public static void main(String[] args) { + ApiClient defaultClient = Configuration.getDefaultApiClient(); + defaultClient.setBasePath("http://localhost"); + + OpenFgaApi apiInstance = new OpenFgaApi(defaultClient); + String storeId = "storeId_example"; // String | + ListObjectsRequest body = new ListObjectsRequest(); // ListObjectsRequest | + try { + CompletableFuture result = apiInstance.streamedListObjects(storeId, body); + System.out.println(result.get()); + } catch (ApiException e) { + System.err.println("Exception when calling OpenFgaApi#streamedListObjects"); + System.err.println("Status code: " + e.getCode()); + System.err.println("Reason: " + e.getResponseBody()); + System.err.println("Response headers: " + e.getResponseHeaders()); + e.printStackTrace(); + } + } +} +``` + +### Parameters + + +| Name | Type | Description | Notes | +|------------- | ------------- | ------------- | -------------| +| **storeId** | **String**| | | +| **body** | [**ListObjectsRequest**](ListObjectsRequest.md)| | | + +### Return type + +CompletableFuture<[**StreamResultOfStreamedListObjectsResponse**](StreamResultOfStreamedListObjectsResponse.md)> + + +### Authorization + +No authorization required + +### HTTP request headers + +- **Content-Type**: application/json +- **Accept**: application/json + +### HTTP response details +| Status code | Description | Response headers | +|-------------|-------------|------------------| +| **200** | A successful response.(streaming responses) | - | +| **400** | Request failed due to invalid input. | - | +| **401** | Not authenticated. | - | +| **403** | Forbidden. | - | +| **404** | Request failed due to incorrect path. | - | +| **409** | Request was aborted due a transaction conflict. | - | +| **422** | Request timed out due to excessive request throttling. | - | +| **500** | Request failed due to internal server error. | - | + +## streamedListObjectsWithHttpInfo + +> CompletableFuture> streamedListObjects streamedListObjectsWithHttpInfo(storeId, body) + +Stream all objects of the given type that the user has a relation with + +The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + +### Example + +```java +// Import classes: +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.client.ApiException; +import dev.openfga.sdk.api.client.ApiResponse; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.client.models.*; +import dev.openfga.sdk.api.OpenFgaApi; +import java.util.concurrent.CompletableFuture; + +public class Example { + public static void main(String[] args) { + ApiClient defaultClient = Configuration.getDefaultApiClient(); + defaultClient.setBasePath("http://localhost"); + + OpenFgaApi apiInstance = new OpenFgaApi(defaultClient); + String storeId = "storeId_example"; // String | + ListObjectsRequest body = new ListObjectsRequest(); // ListObjectsRequest | + try { + CompletableFuture> response = apiInstance.streamedListObjectsWithHttpInfo(storeId, body); + System.out.println("Status code: " + response.get().getStatusCode()); + System.out.println("Response headers: " + response.get().getHeaders()); + System.out.println("Response body: " + response.get().getData()); + } catch (InterruptedException | ExecutionException e) { + ApiException apiException = (ApiException)e.getCause(); + System.err.println("Exception when calling OpenFgaApi#streamedListObjects"); + System.err.println("Status code: " + apiException.getCode()); + System.err.println("Response headers: " + apiException.getResponseHeaders()); + System.err.println("Reason: " + apiException.getResponseBody()); + e.printStackTrace(); + } catch (ApiException e) { + System.err.println("Exception when calling OpenFgaApi#streamedListObjects"); + System.err.println("Status code: " + e.getCode()); + System.err.println("Response headers: " + e.getResponseHeaders()); + System.err.println("Reason: " + e.getResponseBody()); + e.printStackTrace(); + } + } +} +``` + +### Parameters + + +| Name | Type | Description | Notes | +|------------- | ------------- | ------------- | -------------| +| **storeId** | **String**| | | +| **body** | [**ListObjectsRequest**](ListObjectsRequest.md)| | | + +### Return type + +CompletableFuture> + + +### Authorization + +No authorization required + +### HTTP request headers + +- **Content-Type**: application/json +- **Accept**: application/json + +### HTTP response details +| Status code | Description | Response headers | +|-------------|-------------|------------------| +| **200** | A successful response.(streaming responses) | - | +| **400** | Request failed due to invalid input. | - | +| **401** | Not authenticated. | - | +| **403** | Forbidden. | - | +| **404** | Request failed due to incorrect path. | - | +| **409** | Request was aborted due a transaction conflict. | - | +| **422** | Request timed out due to excessive request throttling. | - | +| **500** | Request failed due to internal server error. | - | + + ## write > CompletableFuture write(storeId, body) diff --git a/docs/StreamResultOfStreamedListObjectsResponse.md b/docs/StreamResultOfStreamedListObjectsResponse.md new file mode 100644 index 00000000..af23d053 --- /dev/null +++ b/docs/StreamResultOfStreamedListObjectsResponse.md @@ -0,0 +1,14 @@ + + +# StreamResultOfStreamedListObjectsResponse + + +## Properties + +| Name | Type | Description | Notes | +|------------ | ------------- | ------------- | -------------| +|**result** | [**StreamedListObjectsResponse**](StreamedListObjectsResponse.md) | | [optional] | +|**error** | [**Status**](Status.md) | | [optional] | + + + diff --git a/docs/StreamedListObjectsResponse.md b/docs/StreamedListObjectsResponse.md new file mode 100644 index 00000000..04b00157 --- /dev/null +++ b/docs/StreamedListObjectsResponse.md @@ -0,0 +1,14 @@ + + +# StreamedListObjectsResponse + +The response for a StreamedListObjects RPC. + +## Properties + +| Name | Type | Description | Notes | +|------------ | ------------- | ------------- | -------------| +|**_object** | **String** | | | + + + diff --git a/examples/streamed-list-objects/Makefile b/examples/streamed-list-objects/Makefile new file mode 100644 index 00000000..5651d28d --- /dev/null +++ b/examples/streamed-list-objects/Makefile @@ -0,0 +1,16 @@ +.PHONY: build run run-openfga +all: build + +project_name=. +openfga_version=latest +language=java + +build: + ./gradlew -P language=$(language) build + +run: + ./gradlew -P language=$(language) run + +run-openfga: + docker pull docker.io/openfga/openfga:${openfga_version} && \ + docker run -p 8080:8080 docker.io/openfga/openfga:${openfga_version} run \ No newline at end of file diff --git a/examples/streamed-list-objects/README.md b/examples/streamed-list-objects/README.md new file mode 100644 index 00000000..63bbf5d9 --- /dev/null +++ b/examples/streamed-list-objects/README.md @@ -0,0 +1,132 @@ +# Streamed List Objects Example + +Demonstrates using `StreamedListObjects` to retrieve objects via the streaming API in the Java SDK. + +## What is StreamedListObjects? + +The Streamed ListObjects API is very similar to the ListObjects API, with two key differences: + +1. **Streaming Results**: Instead of collecting all objects before returning a response, it streams them to the client as they are collected. +2. **No Pagination Limit**: Returns all results without the 1000-object limit of the standard ListObjects API. + +This makes it ideal for scenarios where you need to retrieve large numbers of objects, especially when querying computed relations. + +## Prerequisites + +- Java 11 or higher +- OpenFGA server running on `http://localhost:8080` (or set `FGA_API_URL`) + +## Running + +```bash +# From the SDK root directory, build the SDK first +./gradlew build + +# Then run the example +cd examples/streamed-list-objects +./gradlew run +``` + +Or using the Makefile: + +```bash +make build +make run +``` + +## What it does + +- Creates a temporary store +- Writes an authorization model with **computed relations** +- Adds 2000 tuples (1000 owners + 1000 viewers) +- Queries the **computed `can_read` relation** via `StreamedListObjects` +- Shows all 2000 results (demonstrating computed relations) +- Shows progress (first 3 objects and every 500th) +- Cleans up the store + +## Authorization Model + +The example demonstrates OpenFGA's **computed relations**: + +``` +type user + +type document + relations + define owner: [user] + define viewer: [user] + define can_read: owner or viewer +``` + +**Why this matters:** +- We write tuples to `owner` and `viewer` (base permissions) +- We query `can_read` (computed from owner OR viewer) + +**Example flow:** +1. Write: `user:anne owner document:1-1000` +2. Write: `user:anne viewer document:1001-2000` +3. Query: `StreamedListObjects(user:anne, relation:can_read, type:document)` +4. Result: All 2000 documents (because `can_read = owner OR viewer`) + +## Key Features Demonstrated + +### CompletableFuture-based Streaming Pattern + +The `streamedListObjects` method uses Java's `CompletableFuture` with a consumer callback to handle streaming data: + +```java +fga.streamedListObjects(request, response -> { + System.out.println("Received: " + response.getObject()); +}).get(); // Wait for completion +``` + +### Early Break and Cleanup + +The streaming implementation properly handles early termination through cancellation: + +```java +AtomicBoolean shouldStop = new AtomicBoolean(false); +CompletableFuture future = fga.streamedListObjects(request, response -> { + System.out.println(response.getObject()); + if (someCondition) { + shouldStop.set(true); + } +}); + +// Cancel if needed +if (shouldStop.get()) { + future.cancel(true); +} +``` + +### Exception Handling + +The example demonstrates proper error handling: + +```java +try { + fga.streamedListObjects(request, response -> { + System.out.println(response.getObject()); + }).get(); +} catch (ExecutionException ex) { + if (ex.getCause() instanceof FgaInvalidParameterException) { + System.err.println("Validation error"); + } +} catch (CancellationException ex) { + System.err.println("Operation cancelled"); +} +``` + +## Benefits Over ListObjects + +- **No Pagination**: Retrieve all objects in a single streaming request +- **Lower Memory**: Objects are processed as they arrive, not held in memory +- **Early Termination**: Can stop streaming at any point without wasting resources +- **Better for Large Results**: Ideal when expecting hundreds or thousands of objects + +## Performance Considerations + +- Streaming starts immediately - no need to wait for all results +- HTTP connection remains open during streaming +- Properly handles cleanup if consumer stops early +- Supports all the same options as `ListObjects` (consistency, contextual tuples, etc.) diff --git a/examples/streamed-list-objects/build.gradle b/examples/streamed-list-objects/build.gradle new file mode 100644 index 00000000..d3039b04 --- /dev/null +++ b/examples/streamed-list-objects/build.gradle @@ -0,0 +1,66 @@ +plugins { + id 'application' + id 'com.diffplug.spotless' version '8.0.0' +} + +application { + mainClass = 'dev.openfga.sdk.example.StreamedListObjectsExample' +} + +repositories { + mavenCentral() +} + +ext { + jacksonVersion = "2.18.2" +} + +dependencies { + // Use local build of SDK + implementation files('../../build/libs/openfga-sdk-0.9.3.jar') + + // OpenFGA Language SDK for DSL transformation + implementation("dev.openfga:openfga-language:v0.2.0-beta.1") + + // Serialization + implementation("com.fasterxml.jackson.core:jackson-core:$jacksonVersion") + implementation("com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion") + implementation("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion") + implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jacksonVersion") + implementation("org.openapitools:jackson-databind-nullable:0.2.7") + + // OpenTelemetry (required by SDK) + implementation platform("io.opentelemetry:opentelemetry-bom:1.54.1") + implementation "io.opentelemetry:opentelemetry-api" + + // JSR305 (required by SDK) + implementation "com.google.code.findbugs:jsr305:3.0.2" +} + +// Use spotless plugin to automatically format code, remove unused import, etc +// To apply changes directly to the file, run `gradlew spotlessApply` +// Ref: https://github.com/diffplug/spotless/tree/main/plugin-gradle +spotless { + // comment out below to run spotless as part of the `check` task + enforceCheck false + format 'misc', { + // define the files (e.g. '*.gradle', '*.md') to apply `misc` to + target '.gitignore' + // define the steps to apply to those files + trimTrailingWhitespace() + indentWithSpaces() // Takes an integer argument if you don't like 4 + endWithNewline() + } + java { + palantirJavaFormat() + removeUnusedImports() + importOrder() + } +} + +// Use spotless plugin to automatically format code, remove unused import, etc +// To apply changes directly to the file, run `gradlew spotlessApply` +// Ref: https://github.com/diffplug/spotless/tree/main/plugin-gradle +tasks.register('fmt') { + dependsOn 'spotlessApply' +} \ No newline at end of file diff --git a/examples/streamed-list-objects/gradle.properties b/examples/streamed-list-objects/gradle.properties new file mode 100644 index 00000000..5f544a8e --- /dev/null +++ b/examples/streamed-list-objects/gradle.properties @@ -0,0 +1 @@ +language=java \ No newline at end of file diff --git a/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties b/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 00000000..a80b22ce --- /dev/null +++ b/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,7 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.6-bin.zip +networkTimeout=10000 +validateDistributionUrl=true +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/examples/streamed-list-objects/gradlew b/examples/streamed-list-objects/gradlew new file mode 100755 index 00000000..005bcde0 --- /dev/null +++ b/examples/streamed-list-objects/gradlew @@ -0,0 +1,234 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +APP_NAME="Gradle" +APP_BASE_NAME=${0##*/} + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='-Dfile.encoding=UTF-8 "-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/examples/streamed-list-objects/gradlew.bat b/examples/streamed-list-objects/gradlew.bat new file mode 100644 index 00000000..6a68175e --- /dev/null +++ b/examples/streamed-list-objects/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS=-Dfile.encoding=UTF-8 "-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/examples/streamed-list-objects/settings.gradle b/examples/streamed-list-objects/settings.gradle new file mode 100644 index 00000000..764f4506 --- /dev/null +++ b/examples/streamed-list-objects/settings.gradle @@ -0,0 +1 @@ +rootProject.name = 'streamed-list-objects-example' \ No newline at end of file diff --git a/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/StreamedListObjectsExample.java b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/StreamedListObjectsExample.java new file mode 100644 index 00000000..6b07ed62 --- /dev/null +++ b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/StreamedListObjectsExample.java @@ -0,0 +1,194 @@ +package dev.openfga.sdk.example; + +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.openfga.language.DslToJsonTransformer; +import dev.openfga.sdk.api.client.OpenFgaClient; +import dev.openfga.sdk.api.client.model.ClientListObjectsRequest; +import dev.openfga.sdk.api.client.model.ClientStreamedListObjectsOptions; +import dev.openfga.sdk.api.client.model.ClientTupleKey; +import dev.openfga.sdk.api.client.model.ClientWriteRequest; +import dev.openfga.sdk.api.configuration.ClientConfiguration; +import dev.openfga.sdk.api.configuration.Credentials; +import dev.openfga.sdk.api.model.AuthorizationModel; +import dev.openfga.sdk.api.model.ConsistencyPreference; +import dev.openfga.sdk.api.model.CreateStoreRequest; +import dev.openfga.sdk.api.model.WriteAuthorizationModelRequest; +import dev.openfga.sdk.errors.FgaInvalidParameterException; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +public class StreamedListObjectsExample { + // Configuration constants + private static final String DEFAULT_API_URL = "http://localhost:8080"; + private static final String ENV_API_URL = "FGA_API_URL"; + private static final String STORE_NAME = "streamed-list-objects"; + + // Data constants + private static final String USER_TYPE = "user"; + private static final String DOCUMENT_TYPE = "document"; + private static final String USER_ANNE = "user:anne"; + private static final String RELATION_OWNER = "owner"; + private static final String RELATION_VIEWER = "viewer"; + private static final String RELATION_CAN_READ = "can_read"; + + // Batch configuration + private static final int WRITE_BATCH_SIZE = 100; // OpenFGA limit + private static final int TOTAL_OWNER_DOCUMENTS = 1000; + private static final int TOTAL_VIEWER_DOCUMENTS = 1000; + private static final int VIEWER_DOCUMENT_OFFSET = 1000; + + // Display configuration + private static final int DISPLAY_FIRST_N = 3; + private static final int DISPLAY_EVERY_N = 500; + + public static void main(String[] args) { + try { + new StreamedListObjectsExample().run(); + } catch (Exception ex) { + // Avoid logging sensitive data; only display generic info + if (ex instanceof FgaInvalidParameterException) { + System.err.println("Validation error in configuration. Please check your configuration for errors."); + } else if (ex.getMessage() != null && ex.getMessage().contains("Connection refused") + || (ex.getCause() != null + && ex.getCause().getMessage() != null + && ex.getCause().getMessage().contains("Connection refused"))) { + System.err.println("Is OpenFGA server running? Check " + ENV_API_URL + + " environment variable or default " + DEFAULT_API_URL); + } else { + System.err.println("An error occurred. [" + ex.getClass().getSimpleName() + "]"); + } + System.exit(1); + } + } + + public void run() throws Exception { + String apiUrl = System.getenv(ENV_API_URL); + if (apiUrl == null || apiUrl.isEmpty()) { + apiUrl = DEFAULT_API_URL; + } + + var configuration = new ClientConfiguration().apiUrl(apiUrl).credentials(new Credentials()); + + var client = new OpenFgaClient(configuration); + + System.out.println("Creating temporary store"); + var store = client.createStore(new CreateStoreRequest().name(STORE_NAME)) + .get(); + + var clientWithStore = new OpenFgaClient( + new ClientConfiguration().apiUrl(apiUrl).storeId(store.getId()).credentials(new Credentials())); + + System.out.println("Writing authorization model"); + var authModel = clientWithStore + .writeAuthorizationModel(createAuthorizationModel()) + .get(); + + var fga = new OpenFgaClient(new ClientConfiguration() + .apiUrl(apiUrl) + .storeId(store.getId()) + .authorizationModelId(authModel.getAuthorizationModelId()) + .credentials(new Credentials())); + + System.out.println("Writing tuples (" + TOTAL_OWNER_DOCUMENTS + " as owner, " + TOTAL_VIEWER_DOCUMENTS + + " as viewer)"); + + int totalWritten = 0; + + // Write documents where anne is the owner + int ownerBatches = TOTAL_OWNER_DOCUMENTS / WRITE_BATCH_SIZE; + for (int batch = 0; batch < ownerBatches; batch++) { + var tuples = new ArrayList(); + for (int i = 1; i <= WRITE_BATCH_SIZE; i++) { + tuples.add(new ClientTupleKey() + .user(USER_ANNE) + .relation(RELATION_OWNER) + ._object(DOCUMENT_TYPE + ":" + (batch * WRITE_BATCH_SIZE + i))); + } + fga.write(new ClientWriteRequest().writes(tuples)).get(); + totalWritten += tuples.size(); + } + + // Write documents where anne is a viewer + int viewerBatches = TOTAL_VIEWER_DOCUMENTS / WRITE_BATCH_SIZE; + for (int batch = 0; batch < viewerBatches; batch++) { + var tuples = new ArrayList(); + for (int i = 1; i <= WRITE_BATCH_SIZE; i++) { + tuples.add(new ClientTupleKey() + .user(USER_ANNE) + .relation(RELATION_VIEWER) + ._object(DOCUMENT_TYPE + ":" + (VIEWER_DOCUMENT_OFFSET + batch * WRITE_BATCH_SIZE + i))); + } + fga.write(new ClientWriteRequest().writes(tuples)).get(); + totalWritten += tuples.size(); + } + + System.out.println("Wrote " + totalWritten + " tuples"); + + System.out.println("Streaming objects via computed '" + RELATION_CAN_READ + "' relation..."); + var count = new AtomicInteger(0); + + var request = new ClientListObjectsRequest() + .user(USER_ANNE) + .relation(RELATION_CAN_READ) // Computed: owner OR viewer + .type(DOCUMENT_TYPE); + + var options = new ClientStreamedListObjectsOptions().consistency(ConsistencyPreference.HIGHER_CONSISTENCY); + + fga.streamedListObjects(request, options, response -> { + int currentCount = count.incrementAndGet(); + if (currentCount <= DISPLAY_FIRST_N || currentCount % DISPLAY_EVERY_N == 0) { + System.out.println("- " + response.getObject()); + } + }) + .get(); + + System.out.println("✓ Streamed " + count.get() + " objects"); + + System.out.println("Cleaning up..."); + fga.deleteStore().get(); + System.out.println("Done"); + } + + private WriteAuthorizationModelRequest createAuthorizationModel() { + // Define the authorization model using OpenFGA DSL + // This is much cleaner and more readable than building the model with Java objects + var dslModel = String.format( + """ + model + schema 1.1 + + type %s + + type %s + relations + define %s: [%s] + define %s: [%s] + define %s: %s or %s + """, + USER_TYPE, + DOCUMENT_TYPE, + RELATION_OWNER, + USER_TYPE, + RELATION_VIEWER, + USER_TYPE, + RELATION_CAN_READ, + RELATION_OWNER, + RELATION_VIEWER); + + try { + // Transform DSL to JSON and parse into AuthorizationModel + var jsonModel = new DslToJsonTransformer().transform(dslModel); + var mapper = new ObjectMapper(); + mapper.findAndRegisterModules(); + + var authModel = mapper.readValue(jsonModel, AuthorizationModel.class); + + return new WriteAuthorizationModelRequest() + .typeDefinitions(authModel.getTypeDefinitions()) + .schemaVersion(authModel.getSchemaVersion()) + .conditions(authModel.getConditions()); + } catch (Exception e) { + throw new RuntimeException("Failed to transform DSL model to JSON", e); + } + } +} diff --git a/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java b/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java index d0589ca0..0ea565a2 100644 --- a/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java +++ b/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java @@ -38,6 +38,7 @@ import dev.openfga.sdk.api.model.ReadChangesResponse; import dev.openfga.sdk.api.model.ReadRequest; import dev.openfga.sdk.api.model.ReadResponse; +import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse; import dev.openfga.sdk.api.model.WriteAssertionsRequest; import dev.openfga.sdk.api.model.WriteAuthorizationModelRequest; import dev.openfga.sdk.api.model.WriteAuthorizationModelResponse; @@ -906,6 +907,68 @@ private CompletableFuture> readChanges( } } + /** + * Stream all objects of the given type that the user has a relation with + * The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @param storeId (required) + * @param body (required) + * @return CompletableFuture<ApiResponse<StreamResultOfStreamedListObjectsResponse>> + * @throws ApiException if fails to make API call + */ + public CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body) throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, this.configuration); + } + + /** + * Stream all objects of the given type that the user has a relation with + * The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @param storeId (required) + * @param body (required) + * @param configurationOverride Override the {@link Configuration} this OpenFgaApi was constructed with + * @return CompletableFuture<ApiResponse<StreamResultOfStreamedListObjectsResponse>> + * @throws ApiException if fails to make API call + */ + public CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body, ConfigurationOverride configurationOverride) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, this.configuration.override(configurationOverride)); + } + + private CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body, Configuration configuration) + throws ApiException, FgaInvalidParameterException { + + assertParamExists(storeId, "storeId", "streamedListObjects"); + + assertParamExists(body, "body", "streamedListObjects"); + + String path = "/stores/{store_id}/streamed-list-objects" + .replace("{store_id}", StringUtil.urlEncode(storeId.toString())); + + Map methodParameters = new HashMap<>(); + methodParameters.put("storeId", storeId); + methodParameters.put("body", body); + + Map telemetryAttributes = buildTelemetryAttributes(methodParameters); + + telemetryAttributes.put(Attributes.FGA_CLIENT_REQUEST_METHOD, "StreamedListObjects"); + + try { + HttpRequest request = buildHttpRequest("POST", path, body, configuration); + return new HttpRequestAttempt<>( + request, + "streamedListObjects", + StreamResultOfStreamedListObjectsResponse.class, + apiClient, + configuration) + .addTelemetryAttributes(telemetryAttributes) + .attemptHttpRequest(); + } catch (ApiException e) { + return CompletableFuture.failedFuture(e); + } + } + /** * Add or delete tuples from the store * The Write API will transactionally update the tuples for a certain store. Tuples and type definitions allow OpenFGA to determine whether a relationship exists between an object and an user. In the body, `writes` adds new tuples and `deletes` removes existing tuples. When deleting a tuple, any `condition` specified with it is ignored. The API is not idempotent by default: if, later on, you try to add the same tuple key (even if the `condition` is different), or if you try to delete a non-existing tuple, it will throw an error. To allow writes when an identical tuple already exists in the database, set `\"on_duplicate\": \"ignore\"` on the `writes` object. To allow deletes when a tuple was already removed from the database, set `\"on_missing\": \"ignore\"` on the `deletes` object. If a Write request contains both idempotent (ignore) and non-idempotent (error) operations, the most restrictive action (error) will take precedence. If a condition fails for a sub-request with an error flag, the entire transaction will be rolled back. This gives developers explicit control over the atomicity of the requests. The API will not allow you to write tuples such as `document:2021-budget#viewer@document:2021-budget#viewer`, because they are implicit. An `authorization_model_id` may be specified in the body. If it is, it will be used to assert that each written tuple (not deleted) is valid for the model specified. If it is not specified, the latest authorization model ID will be used. ## Example ### Adding relationships To add `user:anne` as a `writer` for `document:2021-budget`, call write API with the following ```json { \"writes\": { \"tuple_keys\": [ { \"user\": \"user:anne\", \"relation\": \"writer\", \"object\": \"document:2021-budget\" } ], \"on_duplicate\": \"ignore\" }, \"authorization_model_id\": \"01G50QVV17PECNVAHX1GG4Y5NC\" } ``` ### Removing relationships To remove `user:bob` as a `reader` for `document:2021-budget`, call write API with the following ```json { \"deletes\": { \"tuple_keys\": [ { \"user\": \"user:bob\", \"relation\": \"reader\", \"object\": \"document:2021-budget\" } ], \"on_missing\": \"ignore\" } } ``` diff --git a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java new file mode 100644 index 00000000..bf4dfd5a --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java @@ -0,0 +1,251 @@ +package dev.openfga.sdk.api; + +import static dev.openfga.sdk.util.StringUtil.isNullOrWhitespace; +import static dev.openfga.sdk.util.Validation.assertParamExists; + +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.configuration.ConfigurationOverride; +import dev.openfga.sdk.api.model.ListObjectsRequest; +import dev.openfga.sdk.api.model.Status; +import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse; +import dev.openfga.sdk.api.model.StreamedListObjectsResponse; +import dev.openfga.sdk.errors.ApiException; +import dev.openfga.sdk.errors.FgaInvalidParameterException; +import dev.openfga.sdk.util.StringUtil; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Stream; + +/** + * API layer for handling streaming responses from the streamedListObjects endpoint. + * This class provides true asynchronous streaming with consumer callbacks using CompletableFuture + * and Java 11's HttpClient async streaming capabilities. + */ +public class StreamedListObjectsApi { + private final Configuration configuration; + private final ApiClient apiClient; + private final ObjectMapper objectMapper; + + public StreamedListObjectsApi(Configuration configuration, ApiClient apiClient) { + this.configuration = configuration; + this.apiClient = apiClient; + this.objectMapper = apiClient.getObjectMapper(); + } + + /** + * Stream all objects of the given type that the user has a relation with. + * Each streamed response is delivered to the consumer callback asynchronously as it arrives. + * Returns a CompletableFuture that completes when streaming is finished. + * + * @param storeId The store ID + * @param body The list objects request + * @param consumer Callback to handle each StreamedListObjectsResponse (invoked asynchronously) + * @return CompletableFuture that completes when streaming finishes + * @throws ApiException if the API call fails immediately + * @throws FgaInvalidParameterException if required parameters are missing + */ + public CompletableFuture streamedListObjects( + String storeId, ListObjectsRequest body, Consumer consumer) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, consumer, null, this.configuration); + } + + /** + * Stream all objects of the given type that the user has a relation with. + * Each streamed response is delivered to the consumer callback asynchronously as it arrives. + * Returns a CompletableFuture that completes when streaming is finished. + * + * @param storeId The store ID + * @param body The list objects request + * @param consumer Callback to handle each StreamedListObjectsResponse (invoked asynchronously) + * @param configurationOverride Configuration overrides (e.g., additional headers) + * @return CompletableFuture that completes when streaming finishes + * @throws ApiException if the API call fails immediately + * @throws FgaInvalidParameterException if required parameters are missing + */ + public CompletableFuture streamedListObjects( + String storeId, + ListObjectsRequest body, + Consumer consumer, + ConfigurationOverride configurationOverride) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, consumer, null, this.configuration.override(configurationOverride)); + } + + /** + * Stream all objects of the given type that the user has a relation with. + * Each streamed response is delivered to the consumer callback asynchronously as it arrives. + * Returns a CompletableFuture that completes when streaming is finished. + * + * @param storeId The store ID + * @param body The list objects request + * @param consumer Callback to handle each StreamedListObjectsResponse (invoked asynchronously) + * @param errorConsumer Optional callback to handle errors during streaming + * @return CompletableFuture that completes when streaming finishes or exceptionally on error + * @throws ApiException if the API call fails immediately + * @throws FgaInvalidParameterException if required parameters are missing + */ + public CompletableFuture streamedListObjects( + String storeId, + ListObjectsRequest body, + Consumer consumer, + Consumer errorConsumer) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, consumer, errorConsumer, this.configuration); + } + + /** + * Stream all objects of the given type that the user has a relation with. + * Each streamed response is delivered to the consumer callback asynchronously as it arrives. + * Returns a CompletableFuture that completes when streaming is finished. + * + * @param storeId The store ID + * @param body The list objects request + * @param consumer Callback to handle each StreamedListObjectsResponse (invoked asynchronously) + * @param errorConsumer Optional callback to handle errors during streaming + * @param configurationOverride Configuration overrides (e.g., additional headers) + * @return CompletableFuture that completes when streaming finishes or exceptionally on error + * @throws ApiException if the API call fails immediately + * @throws FgaInvalidParameterException if required parameters are missing + */ + public CompletableFuture streamedListObjects( + String storeId, + ListObjectsRequest body, + Consumer consumer, + Consumer errorConsumer, + ConfigurationOverride configurationOverride) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects( + storeId, body, consumer, errorConsumer, this.configuration.override(configurationOverride)); + } + + /** + * Internal implementation that accepts a final Configuration to use for the request. + */ + private CompletableFuture streamedListObjects( + String storeId, + ListObjectsRequest body, + Consumer consumer, + Consumer errorConsumer, + Configuration configuration) + throws ApiException, FgaInvalidParameterException { + + assertParamExists(storeId, "storeId", "streamedListObjects"); + assertParamExists(body, "body", "streamedListObjects"); + + String path = "/stores/{store_id}/streamed-list-objects" + .replace("{store_id}", StringUtil.urlEncode(storeId.toString())); + + try { + HttpRequest request = buildHttpRequest("POST", path, body, configuration); + + // Use async HTTP client with streaming body handler + // ofLines() provides line-by-line streaming which is perfect for NDJSON + return apiClient + .getHttpClient() + .sendAsync(request, HttpResponse.BodyHandlers.ofLines()) + .thenCompose(response -> { + // Check response status + int statusCode = response.statusCode(); + if (statusCode < 200 || statusCode >= 300) { + ApiException apiException = + new ApiException(statusCode, "API error: " + statusCode, response.headers(), null); + return CompletableFuture.failedFuture(apiException); + } + + // Process the stream - this runs on HttpClient's executor thread + try (Stream lines = response.body()) { + lines.forEach(line -> { + if (!isNullOrWhitespace(line)) { + processLine(line, consumer, errorConsumer); + } + }); + return CompletableFuture.completedFuture((Void) null); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + }) + .handle((result, throwable) -> { + if (throwable != null) { + // Unwrap CompletionException to get the original exception + Throwable actualException = throwable; + if (throwable instanceof java.util.concurrent.CompletionException + && throwable.getCause() != null) { + actualException = throwable.getCause(); + } + + if (errorConsumer != null) { + errorConsumer.accept(actualException); + } + // Re-throw to keep the CompletableFuture in failed state + if (actualException instanceof RuntimeException) { + throw (RuntimeException) actualException; + } + throw new RuntimeException(actualException); + } + return result; + }); + + } catch (Exception e) { + if (errorConsumer != null) { + errorConsumer.accept(e); + } + return CompletableFuture.failedFuture(e); + } + } + + /** + * Process a single line from the NDJSON stream + */ + private void processLine( + String line, Consumer consumer, Consumer errorConsumer) { + try { + // Parse the JSON line to extract the object + StreamResultOfStreamedListObjectsResponse streamResult = + objectMapper.readValue(line, StreamResultOfStreamedListObjectsResponse.class); + + if (streamResult.getError() != null) { + // Handle error in stream + if (errorConsumer != null) { + Status error = streamResult.getError(); + String errorMessage = error.getMessage() != null + ? "Stream error: " + error.getMessage() + : "Stream error: " + (error.getCode() != null ? "code " + error.getCode() : "unknown"); + errorConsumer.accept(new ApiException(errorMessage)); + } + } else if (streamResult.getResult() != null) { + // Deliver the response object to the consumer + StreamedListObjectsResponse result = streamResult.getResult(); + if (result != null) { + consumer.accept(result); + } + } + } catch (Exception e) { + if (errorConsumer != null) { + errorConsumer.accept(e); + } + } + } + + private HttpRequest buildHttpRequest(String method, String path, Object body, Configuration configuration) + throws ApiException, FgaInvalidParameterException { + try { + byte[] bodyBytes = objectMapper.writeValueAsBytes(body); + HttpRequest.Builder requestBuilder = ApiClient.requestBuilder(method, path, bodyBytes, configuration); + + // Apply request interceptors if any + var interceptor = apiClient.getRequestInterceptor(); + if (interceptor != null) { + interceptor.accept(requestBuilder); + } + + return requestBuilder.build(); + } catch (Exception e) { + throw new ApiException(e); + } + } +} diff --git a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java index fb6c3eb5..4d4df9c3 100644 --- a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java +++ b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java @@ -1104,6 +1104,167 @@ public CompletableFuture listObjects( return call(() -> api.listObjects(storeId, body, overrides)).thenApply(ClientListObjectsResponse::new); } + /** + * StreamedListObjects - Stream all objects of a particular type that the user has a relation to. + * This method provides true asynchronous streaming with consumer callbacks. + * Objects are delivered to the consumer as they are received from the server asynchronously. + * Returns a CompletableFuture that completes when streaming is finished. + * + *

Example usage:

+ *
{@code
+     * ClientListObjectsRequest request = new ClientListObjectsRequest()
+     *     .user("user:anne")
+     *     .relation("viewer")
+     *     .type("document");
+     *
+     * client.streamedListObjects(request,
+     *     response -> System.out.println("Found object: " + response.getObject())
+     * ).thenRun(() -> System.out.println("Streaming complete"))
+     *  .exceptionally(error -> {
+     *      System.err.println("Error: " + error.getMessage());
+     *      return null;
+     *  });
+     * }
+ * + * @param request The list objects request containing type, relation, and user + * @param consumer Callback to handle each StreamedListObjectsResponse as it arrives + * @return CompletableFuture that completes when streaming finishes + * @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace, or consumer is null + */ + public CompletableFuture streamedListObjects( + ClientListObjectsRequest request, Consumer consumer) + throws FgaInvalidParameterException { + if (consumer == null) { + throw new FgaInvalidParameterException("consumer", "streamedListObjects"); + } + return streamedListObjects(request, null, consumer, null); + } + + /** + * StreamedListObjects - Stream all objects of a particular type that the user has a relation to. + * This method provides true asynchronous streaming with consumer callbacks. + * Objects are delivered to the consumer as they are received from the server asynchronously. + * Returns a CompletableFuture that completes when streaming is finished. + * + *

Example usage with options:

+ *
{@code
+     * ClientListObjectsRequest request = new ClientListObjectsRequest()
+     *     .user("user:anne")
+     *     .relation("viewer")
+     *     .type("document");
+     *
+     * ClientStreamedListObjectsOptions options = new ClientStreamedListObjectsOptions()
+     *     .authorizationModelId("01HVMMBCMGZNT3SED4Z17ECXCA");
+     *
+     * client.streamedListObjects(request, options,
+     *     response -> System.out.println("Found object: " + response.getObject())
+     * ).thenRun(() -> System.out.println("Streaming complete"))
+     *  .exceptionally(error -> {
+     *      System.err.println("Error: " + error.getMessage());
+     *      return null;
+     *  });
+     * }
+ * + * @param request The list objects request containing type, relation, and user + * @param options Options for the streaming request + * @param consumer Callback to handle each StreamedListObjectsResponse as it arrives + * @return CompletableFuture that completes when streaming finishes + * @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace, or consumer is null + */ + public CompletableFuture streamedListObjects( + ClientListObjectsRequest request, + ClientStreamedListObjectsOptions options, + Consumer consumer) + throws FgaInvalidParameterException { + if (consumer == null) { + throw new FgaInvalidParameterException("consumer", "streamedListObjects"); + } + return streamedListObjects(request, options, consumer, null); + } + + /** + * StreamedListObjects - Stream all objects of a particular type that the user has a relation to. + * This method provides true asynchronous streaming with consumer callbacks. + * Objects are delivered to the consumer as they are received from the server asynchronously. + * Returns a CompletableFuture that completes when streaming is finished. + * + *

Example usage with error handling:

+ *
{@code
+     * ClientListObjectsRequest request = new ClientListObjectsRequest()
+     *     .user("user:anne")
+     *     .relation("viewer")
+     *     .type("document");
+     *
+     * ClientStreamedListObjectsOptions options = new ClientStreamedListObjectsOptions()
+     *     .authorizationModelId("01HVMMBCMGZNT3SED4Z17ECXCA");
+     *
+     * client.streamedListObjects(request, options,
+     *     response -> System.out.println("Found object: " + response.getObject()),
+     *     error -> System.err.println("Streaming error: " + error.getMessage())
+     * ).thenRun(() -> System.out.println("Streaming complete"));
+     * }
+ * + * @param request The list objects request containing type, relation, and user + * @param options Options for the streaming request + * @param consumer Callback to handle each StreamedListObjectsResponse as it arrives + * @param errorConsumer Optional callback to handle errors during streaming + * @return CompletableFuture that completes when streaming finishes or exceptionally on error + * @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace, or consumer is null + */ + public CompletableFuture streamedListObjects( + ClientListObjectsRequest request, + ClientStreamedListObjectsOptions options, + Consumer consumer, + Consumer errorConsumer) + throws FgaInvalidParameterException { + if (consumer == null) { + throw new FgaInvalidParameterException("consumer", "streamedListObjects"); + } + configuration.assertValid(); + String storeId = configuration.getStoreIdChecked(); + + ListObjectsRequest body = new ListObjectsRequest(); + + if (request != null) { + body.user(request.getUser()).relation(request.getRelation()).type(request.getType()); + if (request.getContextualTupleKeys() != null) { + var contextualTuples = request.getContextualTupleKeys(); + var bodyContextualTuples = ClientTupleKey.asContextualTupleKeys(contextualTuples); + body.contextualTuples(bodyContextualTuples); + } + if (request.getContext() != null) { + body.context(request.getContext()); + } + } + + if (options != null) { + if (options.getConsistency() != null) { + body.consistency(options.getConsistency()); + } + + // Set authorizationModelId from options if available; otherwise, use the default from configuration + String authorizationModelId = !isNullOrWhitespace(options.getAuthorizationModelId()) + ? options.getAuthorizationModelId() + : configuration.getAuthorizationModelId(); + body.authorizationModelId(authorizationModelId); + } else { + body.setAuthorizationModelId(configuration.getAuthorizationModelId()); + } + + var overrides = new ConfigurationOverride().addHeaders(options); + + // Create streaming API instance and execute streaming request asynchronously + StreamedListObjectsApi streamingApi = new StreamedListObjectsApi(configuration, apiClient); + try { + return streamingApi.streamedListObjects(storeId, body, consumer, errorConsumer, overrides); + } catch (ApiException e) { + if (errorConsumer != null) { + errorConsumer.accept(e); + } + return CompletableFuture.failedFuture(e); + } + } + /** * ListRelations - List allowed relations a user has with an object (evaluates) */ diff --git a/src/main/java/dev/openfga/sdk/api/client/model/ClientStreamedListObjectsOptions.java b/src/main/java/dev/openfga/sdk/api/client/model/ClientStreamedListObjectsOptions.java new file mode 100644 index 00000000..bf4c8c61 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/client/model/ClientStreamedListObjectsOptions.java @@ -0,0 +1,56 @@ +package dev.openfga.sdk.api.client.model; + +import dev.openfga.sdk.api.configuration.AdditionalHeadersSupplier; +import dev.openfga.sdk.api.model.ConsistencyPreference; +import java.util.Map; + +/** + * Options for the streamedListObjects API call. + * + *

This class allows you to configure the streaming request with: + *

    + *
  • Authorization model ID - Override the default model ID for this request
  • + *
  • Consistency preference - Specify the desired consistency level
  • + *
  • Additional headers - Include custom HTTP headers in the request
  • + *
+ * + *

Example usage: + *

+ * ClientStreamedListObjectsOptions options = new ClientStreamedListObjectsOptions()
+ *     .authorizationModelId("custom-model-id")
+ *     .consistency(ConsistencyPreference.HIGHER_CONSISTENCY);
+ * 
+ */ +public class ClientStreamedListObjectsOptions implements AdditionalHeadersSupplier { + private String authorizationModelId; + private ConsistencyPreference consistency; + private Map additionalHeaders; + + public ClientStreamedListObjectsOptions authorizationModelId(String authorizationModelId) { + this.authorizationModelId = authorizationModelId; + return this; + } + + public String getAuthorizationModelId() { + return authorizationModelId; + } + + public ClientStreamedListObjectsOptions consistency(ConsistencyPreference consistency) { + this.consistency = consistency; + return this; + } + + public ConsistencyPreference getConsistency() { + return consistency; + } + + public ClientStreamedListObjectsOptions additionalHeaders(Map additionalHeaders) { + this.additionalHeaders = additionalHeaders; + return this; + } + + @Override + public Map getAdditionalHeaders() { + return additionalHeaders; + } +} diff --git a/src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java b/src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java new file mode 100644 index 00000000..20331735 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java @@ -0,0 +1,168 @@ +/* + * OpenFGA + * A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar. + * + * The version of the OpenAPI document: 1.x + * Contact: community@openfga.dev + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + +package dev.openfga.sdk.api.model; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import java.util.Objects; +import java.util.StringJoiner; + +/** + * StreamResultOfStreamedListObjectsResponse + */ +@JsonPropertyOrder({ + StreamResultOfStreamedListObjectsResponse.JSON_PROPERTY_RESULT, + StreamResultOfStreamedListObjectsResponse.JSON_PROPERTY_ERROR +}) +public class StreamResultOfStreamedListObjectsResponse { + public static final String JSON_PROPERTY_RESULT = "result"; + private StreamedListObjectsResponse result; + + public static final String JSON_PROPERTY_ERROR = "error"; + private Status error; + + public StreamResultOfStreamedListObjectsResponse() {} + + public StreamResultOfStreamedListObjectsResponse result(StreamedListObjectsResponse result) { + this.result = result; + return this; + } + + /** + * Get result + * @return result + **/ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_RESULT) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public StreamedListObjectsResponse getResult() { + return result; + } + + @JsonProperty(JSON_PROPERTY_RESULT) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setResult(StreamedListObjectsResponse result) { + this.result = result; + } + + public StreamResultOfStreamedListObjectsResponse error(Status error) { + this.error = error; + return this; + } + + /** + * Get error + * @return error + **/ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_ERROR) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public Status getError() { + return error; + } + + @JsonProperty(JSON_PROPERTY_ERROR) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setError(Status error) { + this.error = error; + } + + /** + * Return true if this Stream_result_of_StreamedListObjectsResponse object is equal to o. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StreamResultOfStreamedListObjectsResponse streamResultOfStreamedListObjectsResponse = + (StreamResultOfStreamedListObjectsResponse) o; + return Objects.equals(this.result, streamResultOfStreamedListObjectsResponse.result) + && Objects.equals(this.error, streamResultOfStreamedListObjectsResponse.error); + } + + @Override + public int hashCode() { + return Objects.hash(result, error); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class StreamResultOfStreamedListObjectsResponse {\n"); + sb.append(" result: ").append(toIndentedString(result)).append("\n"); + sb.append(" error: ").append(toIndentedString(error)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + + /** + * Convert the instance into URL query string. + * + * @return URL query string + */ + public String toUrlQueryString() { + return toUrlQueryString(null); + } + + /** + * Convert the instance into URL query string. + * + * @param prefix prefix of the query string + * @return URL query string + */ + public String toUrlQueryString(String prefix) { + String suffix = ""; + String containerSuffix = ""; + String containerPrefix = ""; + if (prefix == null) { + // style=form, explode=true, e.g. /pet?name=cat&type=manx + prefix = ""; + } else { + // deepObject style e.g. /pet?id[name]=cat&id[type]=manx + prefix = prefix + "["; + suffix = "]"; + containerSuffix = "]"; + containerPrefix = "["; + } + + StringJoiner joiner = new StringJoiner("&"); + + // add `result` to the URL query string + if (getResult() != null) { + joiner.add(getResult().toUrlQueryString(prefix + "result" + suffix)); + } + + // add `error` to the URL query string + if (getError() != null) { + joiner.add(getError().toUrlQueryString(prefix + "error" + suffix)); + } + + return joiner.toString(); + } +} diff --git a/src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java b/src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java new file mode 100644 index 00000000..43c02ab5 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java @@ -0,0 +1,139 @@ +/* + * OpenFGA + * A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar. + * + * The version of the OpenAPI document: 1.x + * Contact: community@openfga.dev + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + +package dev.openfga.sdk.api.model; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.StringJoiner; + +/** + * The response for a StreamedListObjects RPC. + */ +@JsonPropertyOrder({StreamedListObjectsResponse.JSON_PROPERTY_OBJECT}) +public class StreamedListObjectsResponse { + public static final String JSON_PROPERTY_OBJECT = "object"; + private String _object; + + public StreamedListObjectsResponse() {} + + public StreamedListObjectsResponse _object(String _object) { + this._object = _object; + return this; + } + + /** + * Get _object + * @return _object + **/ + @javax.annotation.Nonnull + @JsonProperty(JSON_PROPERTY_OBJECT) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + public String getObject() { + return _object; + } + + @JsonProperty(JSON_PROPERTY_OBJECT) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + public void setObject(String _object) { + this._object = _object; + } + + /** + * Return true if this StreamedListObjectsResponse object is equal to o. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StreamedListObjectsResponse streamedListObjectsResponse = (StreamedListObjectsResponse) o; + return Objects.equals(this._object, streamedListObjectsResponse._object); + } + + @Override + public int hashCode() { + return Objects.hash(_object); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class StreamedListObjectsResponse {\n"); + sb.append(" _object: ").append(toIndentedString(_object)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + + /** + * Convert the instance into URL query string. + * + * @return URL query string + */ + public String toUrlQueryString() { + return toUrlQueryString(null); + } + + /** + * Convert the instance into URL query string. + * + * @param prefix prefix of the query string + * @return URL query string + */ + public String toUrlQueryString(String prefix) { + String suffix = ""; + String containerSuffix = ""; + String containerPrefix = ""; + if (prefix == null) { + // style=form, explode=true, e.g. /pet?name=cat&type=manx + prefix = ""; + } else { + // deepObject style e.g. /pet?id[name]=cat&id[type]=manx + prefix = prefix + "["; + suffix = "]"; + containerSuffix = "]"; + containerPrefix = "["; + } + + StringJoiner joiner = new StringJoiner("&"); + + // add `object` to the URL query string + if (getObject() != null) { + joiner.add(String.format( + "%sobject%s=%s", + prefix, + suffix, + URLEncoder.encode(String.valueOf(getObject()), StandardCharsets.UTF_8) + .replaceAll("\\+", "%20"))); + } + + return joiner.toString(); + } +} diff --git a/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java b/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java index 8ed6255d..1501ba89 100644 --- a/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java +++ b/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java @@ -11,6 +11,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -372,6 +373,98 @@ private String writeAuthModel(String storeId) throws Exception { return response.getAuthorizationModelId(); } + @Test + public void streamedListObjects() throws Exception { + // Given - Create a single store for all streaming tests + String storeId = createStore(thisTestName()); + fga.setStoreId(storeId); + String authorizationModelId = writeAuthModel(storeId); + fga.setAuthorizationModelId(authorizationModelId); + + // Write tuples for different test scenarios + // Tuples for basic streaming test (user:test) + for (int i = 0; i < 50; i++) { + ClientWriteRequest writeRequest = new ClientWriteRequest() + .writes(List.of(new ClientTupleKey() + .user("user:test") + .relation("reader") + ._object("document:test-" + i))); + fga.write(writeRequest).get(); + } + + // Tuples for error handling test (user:error-test) + for (int i = 0; i < 10; i++) { + ClientWriteRequest writeRequest = new ClientWriteRequest() + .writes(List.of(new ClientTupleKey() + .user("user:error-test") + .relation("reader") + ._object("document:error-test-" + i))); + fga.write(writeRequest).get(); + } + + // Tuples for chaining operations test (user:chain-test) + for (int i = 0; i < 20; i++) { + ClientWriteRequest writeRequest = new ClientWriteRequest() + .writes(List.of(new ClientTupleKey() + .user("user:chain-test") + .relation("reader") + ._object("document:chain-" + i))); + fga.write(writeRequest).get(); + } + + // Test 1: Basic streaming - verify async execution and all objects received + List streamedObjects = new java.util.ArrayList<>(); + ClientListObjectsRequest request1 = new ClientListObjectsRequest() + .type("document") + .relation("reader") + .user("user:test"); + + CompletableFuture streamingFuture1 = + fga.streamedListObjects(request1, response -> streamedObjects.add(response.getObject())); + streamingFuture1.get(); // Wait for completion + + assertEquals(50, streamedObjects.size()); + for (int i = 0; i < 50; i++) { + assertTrue(streamedObjects.contains("document:test-" + i)); + } + + // Test 2: Error handling - verify error consumer works + List errorTestObjects = new java.util.ArrayList<>(); + List errors = new java.util.ArrayList<>(); + ClientListObjectsRequest request2 = new ClientListObjectsRequest() + .type("document") + .relation("reader") + .user("user:error-test"); + + CompletableFuture streamingFuture2 = fga.streamedListObjects( + request2, null, response -> errorTestObjects.add(response.getObject()), errors::add); + streamingFuture2.get(); + + assertEquals(10, errorTestObjects.size()); + assertEquals(0, errors.size()); // Should have no errors in normal operation + + // Test 3: Chaining operations - verify CompletableFuture chaining works + List chainTestObjects = new java.util.ArrayList<>(); + ClientListObjectsRequest request3 = new ClientListObjectsRequest() + .type("document") + .relation("reader") + .user("user:chain-test"); + + java.util.concurrent.atomic.AtomicBoolean chainedOperationExecuted = + new java.util.concurrent.atomic.AtomicBoolean(false); + + CompletableFuture chainedFuture = fga.streamedListObjects( + request3, response -> chainTestObjects.add(response.getObject())) + .thenRun(() -> { + chainedOperationExecuted.set(true); + }); + + chainedFuture.get(); // Wait for all chained operations + + assertEquals(20, chainTestObjects.size()); + assertTrue(chainedOperationExecuted.get()); + } + /** Get the name of the test that invokes this function. Returned in the form: "$class.$fn" */ private String thisTestName() { // Tracing the stack gives an array of: diff --git a/src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java b/src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java new file mode 100644 index 00000000..a509f68a --- /dev/null +++ b/src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java @@ -0,0 +1,426 @@ +package dev.openfga.sdk.api.client; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.openfga.sdk.api.client.model.ClientListObjectsRequest; +import dev.openfga.sdk.api.client.model.ClientStreamedListObjectsOptions; +import dev.openfga.sdk.api.configuration.ClientConfiguration; +import dev.openfga.sdk.api.configuration.Credentials; +import dev.openfga.sdk.api.model.ConsistencyPreference; +import dev.openfga.sdk.api.model.StreamedListObjectsResponse; +import dev.openfga.sdk.constants.FgaConstants; +import dev.openfga.sdk.errors.FgaInvalidParameterException; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** Tests for streaming list objects functionality with CompletableFuture. */ +public class StreamedListObjectsTest { + private static final String DEFAULT_STORE_ID = "01YCP46JKYM8FJCQ37NMBYHE5X"; + private static final String DEFAULT_AUTH_MODEL_ID = "01G5JAVJ41T49E9TT3SKVS7X1J"; + private static final String DEFAULT_USER = "user:81684243-9356-4421-8fbf-a4f8d36aa31b"; + private static final String DEFAULT_RELATION = "owner"; + private static final String DEFAULT_TYPE = "document"; + + private OpenFgaClient fga; + private ClientConfiguration clientConfiguration; + private HttpClient mockHttpClient; + private ApiClient mockApiClient; + + @BeforeEach + public void beforeEachTest() throws Exception { + mockHttpClient = mock(HttpClient.class); + var mockHttpClientBuilder = mock(HttpClient.Builder.class); + when(mockHttpClientBuilder.executor(any())).thenReturn(mockHttpClientBuilder); + when(mockHttpClientBuilder.build()).thenReturn(mockHttpClient); + + clientConfiguration = new ClientConfiguration() + .storeId(DEFAULT_STORE_ID) + .authorizationModelId(DEFAULT_AUTH_MODEL_ID) + .apiUrl(FgaConstants.TEST_API_URL) + .credentials(new Credentials()) + .readTimeout(Duration.ofMillis(250)); + + mockApiClient = mock(ApiClient.class); + when(mockApiClient.getHttpClient()).thenReturn(mockHttpClient); + when(mockApiClient.getObjectMapper()).thenReturn(new ObjectMapper()); + when(mockApiClient.getHttpClientBuilder()).thenReturn(mockHttpClientBuilder); + + fga = new OpenFgaClient(clientConfiguration, mockApiClient); + } + + @Test + public void streamedListObjects_success() throws Exception { + // Given + String line1 = "{\"result\":{\"object\":\"document:1\"}}"; + String line2 = "{\"result\":{\"object\":\"document:2\"}}"; + String line3 = "{\"result\":{\"object\":\"document:3\"}}"; + Stream streamResponse = Stream.of(line1, line2, line3); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + CompletableFuture future = fga.streamedListObjects(request, receivedObjects::add); + future.get(); // Wait for completion + + // Then + assertEquals(3, receivedObjects.size()); + assertEquals("document:1", receivedObjects.get(0).getObject()); + assertEquals("document:2", receivedObjects.get(1).getObject()); + assertEquals("document:3", receivedObjects.get(2).getObject()); + verify(mockHttpClient, times(1)).sendAsync(any(), any()); + } + + @Test + public void streamedListObjects_withOptions() throws Exception { + // Given + String line1 = "{\"result\":{\"object\":\"document:1\"}}"; + Stream streamResponse = Stream.of(line1); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + ClientStreamedListObjectsOptions options = new ClientStreamedListObjectsOptions() + .authorizationModelId("custom-model-id") + .consistency(ConsistencyPreference.HIGHER_CONSISTENCY); + + // When + CompletableFuture future = fga.streamedListObjects(request, options, receivedObjects::add); + future.get(); // Wait for completion + + // Then + assertEquals(1, receivedObjects.size()); + assertEquals("document:1", receivedObjects.get(0).getObject()); + } + + @Test + public void streamedListObjects_emptyStream() throws Exception { + // Given + Stream streamResponse = Stream.empty(); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + CompletableFuture future = fga.streamedListObjects(request, receivedObjects::add); + future.get(); // Wait for completion + + // Then + assertEquals(0, receivedObjects.size()); + } + + @Test + public void streamedListObjects_storeIdRequired() { + // Given + clientConfiguration.storeId(null); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When/Then + var exception = assertThrows(FgaInvalidParameterException.class, () -> { + fga.streamedListObjects(request, response -> {}); + }); + + assertEquals( + "Required parameter storeId was invalid when calling ClientConfiguration.", exception.getMessage()); + } + + @Test + public void streamedListObjects_errorHandling() throws Exception { + // Given + String line1 = "{\"result\":{\"object\":\"document:1\"}}"; + String line2 = "{\"error\":{\"message\":\"Something went wrong\"}}"; + String line3 = "{\"result\":{\"object\":\"document:2\"}}"; + Stream streamResponse = Stream.of(line1, line2, line3); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + List receivedErrors = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + CompletableFuture future = + fga.streamedListObjects(request, null, receivedObjects::add, receivedErrors::add); + future.get(); // Wait for completion + + // Then + assertEquals(2, receivedObjects.size()); + assertEquals("document:1", receivedObjects.get(0).getObject()); + assertEquals("document:2", receivedObjects.get(1).getObject()); + assertEquals(1, receivedErrors.size()); + } + + @Test + public void streamedListObjects_errorHandlingWithNullMessage() throws Exception { + // Given - error with null message and no code + String line1 = "{\"result\":{\"object\":\"document:1\"}}"; + String line2 = "{\"error\":{}}"; // Empty error object (null message, null code) + String line3 = "{\"error\":{\"code\":123}}"; // Error with code but null message + Stream streamResponse = Stream.of(line1, line2, line3); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + List receivedErrors = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + CompletableFuture future = + fga.streamedListObjects(request, null, receivedObjects::add, receivedErrors::add); + future.get(); // Wait for completion + + // Then - should handle null messages gracefully without NPE + assertEquals(1, receivedObjects.size()); + assertEquals(2, receivedErrors.size()); + + // Verify first error has a fallback message + assertTrue(receivedErrors.get(0) instanceof dev.openfga.sdk.errors.ApiException); + String firstErrorMsg = ((dev.openfga.sdk.errors.ApiException) receivedErrors.get(0)).getMessage(); + assertTrue(firstErrorMsg.contains("Stream error")); + assertTrue(firstErrorMsg.contains("unknown")); + + // Verify second error has code in message + assertTrue(receivedErrors.get(1) instanceof dev.openfga.sdk.errors.ApiException); + String secondErrorMsg = ((dev.openfga.sdk.errors.ApiException) receivedErrors.get(1)).getMessage(); + assertTrue(secondErrorMsg.contains("Stream error")); + assertTrue(secondErrorMsg.contains("code 123")); + } + + @Test + public void streamedListObjects_httpError() throws Exception { + // Given + Stream streamResponse = Stream.empty(); + HttpResponse> mockResponse = createMockStreamResponse(400, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + List receivedErrors = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + CompletableFuture future = + fga.streamedListObjects(request, null, receivedObjects::add, receivedErrors::add); + + try { + future.get(); // Wait for completion - should fail + fail("Expected exception"); + } catch (Exception e) { + // Expected + } + + // Then + assertEquals(0, receivedObjects.size()); + assertEquals(1, receivedErrors.size()); + } + + @Test + public void streamedListObjects_consumerInvocationCount() throws Exception { + // Given + int expectedCount = 100; + List lines = new ArrayList<>(); + for (int i = 0; i < expectedCount; i++) { + lines.add(String.format("{\"result\":{\"object\":\"document:%d\"}}", i)); + } + Stream streamResponse = lines.stream(); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + AtomicInteger callCount = new AtomicInteger(0); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + CompletableFuture future = fga.streamedListObjects(request, response -> callCount.incrementAndGet()); + future.get(); // Wait for completion + + // Then + assertEquals(expectedCount, callCount.get()); + } + + @Test + public void streamedListObjects_chainingWithOtherOperations() throws Exception { + // Given + String line1 = "{\"result\":{\"object\":\"document:1\"}}"; + Stream streamResponse = Stream.of(line1); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When - Chain with other async operations + AtomicInteger completionFlag = new AtomicInteger(0); + CompletableFuture future = fga.streamedListObjects(request, receivedObjects::add) + .thenRun(() -> completionFlag.set(1)) + .thenRun(() -> completionFlag.set(2)); + + future.get(); // Wait for all chained operations + + // Then + assertEquals(1, receivedObjects.size()); + assertEquals(2, completionFlag.get()); + } + + @Test + public void streamedListObjects_additionalHeadersPassedThrough() throws Exception { + // Given + String line1 = "{\"result\":{\"object\":\"document:1\"}}"; + Stream streamResponse = Stream.of(line1); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // Create options with additional headers + Map additionalHeaders = Map.of( + "X-Custom-Header", "custom-value", + "X-Request-ID", "test-request-123"); + + ClientStreamedListObjectsOptions options = + new ClientStreamedListObjectsOptions().additionalHeaders(additionalHeaders); + + // When + CompletableFuture future = fga.streamedListObjects(request, options, receivedObjects::add); + future.get(); // Wait for completion + + // Then + assertEquals(1, receivedObjects.size()); + assertEquals("document:1", receivedObjects.get(0).getObject()); + + // Verify that the HTTP client was called (which means headers were applied) + verify(mockHttpClient, times(1)).sendAsync(any(), any()); + } + + @Test + public void streamedListObjects_preservesApiExceptionType() throws Exception { + // Given - HTTP 400 error should create ApiException + Stream streamResponse = Stream.empty(); + HttpResponse> mockResponse = createMockStreamResponse(400, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedErrors = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + CompletableFuture future = fga.streamedListObjects(request, null, obj -> {}, receivedErrors::add); + + try { + future.get(); + fail("Expected exception"); + } catch (Exception e) { + // Expected to fail + } + + // Then - verify the error consumer received the original ApiException, not wrapped + assertEquals(1, receivedErrors.size()); + Throwable error = receivedErrors.get(0); + assertTrue( + error instanceof dev.openfga.sdk.errors.ApiException, + "Expected ApiException but got " + error.getClass().getName()); + dev.openfga.sdk.errors.ApiException apiException = (dev.openfga.sdk.errors.ApiException) error; + assertEquals(400, apiException.getStatusCode()); + } + + private HttpResponse> createMockStreamResponse(int statusCode, Stream body) { + HttpResponse> mockResponse = mock(HttpResponse.class); + when(mockResponse.statusCode()).thenReturn(statusCode); + when(mockResponse.body()).thenReturn(body); + + // Create mock headers + HttpHeaders mockHeaders = mock(HttpHeaders.class); + when(mockHeaders.map()).thenReturn(Map.of("content-type", List.of("application/json"))); + when(mockResponse.headers()).thenReturn(mockHeaders); + + return mockResponse; + } +}