1212import dev .openfga .sdk .util .RetryAfterHeaderParser ;
1313import dev .openfga .sdk .util .RetryStrategy ;
1414import java .io .IOException ;
15+ import java .io .InputStream ;
1516import java .io .PrintStream ;
1617import java .net .http .*;
1718import java .nio .ByteBuffer ;
@@ -90,18 +91,27 @@ private HttpClient getHttpClient() {
9091
9192 private CompletableFuture <ApiResponse <T >> attemptHttpRequest (
9293 HttpClient httpClient , int retryNumber , Throwable previousError ) {
93- return httpClient
94- .sendAsync (request , HttpResponse .BodyHandlers .ofString ())
95- .handle ((response , throwable ) -> {
96- if (throwable != null ) {
97- // Handle network errors (no HTTP response received)
98- return handleNetworkError (throwable , retryNumber );
99- }
100-
101- // Handle HTTP response (including error status codes)
102- return processHttpResponse (response , retryNumber , previousError );
103- })
104- .thenCompose (Function .identity ());
94+ if (clazz == StreamingResponseBody .class ) {
95+ return httpClient
96+ .sendAsync (request , HttpResponse .BodyHandlers .ofInputStream ())
97+ .handle ((response , throwable ) -> {
98+ if (throwable != null ) {
99+ return handleNetworkError (throwable , retryNumber );
100+ }
101+ return processHttpResponseStreaming (response , retryNumber , previousError );
102+ })
103+ .thenCompose (Function .identity ());
104+ } else {
105+ return httpClient
106+ .sendAsync (request , HttpResponse .BodyHandlers .ofString ())
107+ .handle ((response , throwable ) -> {
108+ if (throwable != null ) {
109+ return handleNetworkError (throwable , retryNumber );
110+ }
111+ return processHttpResponse (response , retryNumber , previousError );
112+ })
113+ .thenCompose (Function .identity ());
114+ }
105115 }
106116
107117 private CompletableFuture <ApiResponse <T >> handleNetworkError (Throwable throwable , int retryNumber ) {
@@ -211,6 +221,105 @@ private CompletableFuture<ApiResponse<T>> processHttpResponse(
211221 response .statusCode (), response .headers ().map (), response .body (), modeledResponse ));
212222 }
213223
224+ private CompletableFuture <ApiResponse <T >> processHttpResponseStreaming (
225+ HttpResponse <InputStream > response , int retryNumber , Throwable previousError ) {
226+ int statusCode = response .statusCode ();
227+
228+ if (!HttpStatusCode .isSuccessful (statusCode )) {
229+ try {
230+ String bodyStr = new String (response .body ().readAllBytes (), StandardCharsets .UTF_8 );
231+ HttpResponse <String > stringResponse = new HttpResponse <>() {
232+ @ Override
233+ public int statusCode () {
234+ return response .statusCode ();
235+ }
236+
237+ @ Override
238+ public HttpRequest request () {
239+ return response .request ();
240+ }
241+
242+ @ Override
243+ public java .util .Optional <HttpResponse <String >> previousResponse () {
244+ return java .util .Optional .empty ();
245+ }
246+
247+ @ Override
248+ public HttpHeaders headers () {
249+ return response .headers ();
250+ }
251+
252+ @ Override
253+ public String body () {
254+ return bodyStr ;
255+ }
256+
257+ @ Override
258+ public java .util .Optional <javax .net .ssl .SSLSession > sslSession () {
259+ return response .sslSession ();
260+ }
261+
262+ @ Override
263+ public java .net .URI uri () {
264+ return response .uri ();
265+ }
266+
267+ @ Override
268+ public HttpClient .Version version () {
269+ return response .version ();
270+ }
271+ };
272+
273+ Optional <FgaError > fgaError =
274+ FgaError .getError (name , request , configuration , stringResponse , previousError );
275+ if (fgaError .isPresent ()) {
276+ FgaError error = fgaError .get ();
277+ if (retryNumber < configuration .getMaxRetries ()) {
278+ Optional <Duration > retryAfterDelay = response .headers ()
279+ .firstValue (FgaConstants .RETRY_AFTER_HEADER_NAME )
280+ .flatMap (RetryAfterHeaderParser ::parseRetryAfter );
281+ if (RetryStrategy .shouldRetry (statusCode )) {
282+ return handleHttpErrorRetry (retryAfterDelay , retryNumber , error );
283+ }
284+ }
285+ return CompletableFuture .failedFuture (error );
286+ }
287+ } catch (IOException e ) {
288+ return CompletableFuture .failedFuture (new ApiException (e ));
289+ }
290+ }
291+
292+ addTelemetryAttributes (Attributes .fromHttpResponse (response , this .configuration .getCredentials ()));
293+
294+ if (retryNumber > 0 ) {
295+ addTelemetryAttribute (Attributes .HTTP_REQUEST_RESEND_COUNT , String .valueOf (retryNumber ));
296+ }
297+
298+ if (response .headers ()
299+ .firstValue (FgaConstants .QUERY_DURATION_HEADER_NAME )
300+ .isPresent ()) {
301+ String queryDuration = response .headers ()
302+ .firstValue (FgaConstants .QUERY_DURATION_HEADER_NAME )
303+ .orElse (null );
304+
305+ if (!isNullOrWhitespace (queryDuration )) {
306+ try {
307+ double queryDurationDouble = Double .parseDouble (queryDuration );
308+ telemetry .metrics ().queryDuration (queryDurationDouble , this .getTelemetryAttributes ());
309+ } catch (NumberFormatException e ) {
310+ }
311+ }
312+ }
313+
314+ Double requestDuration = (double ) (System .currentTimeMillis () - requestStarted );
315+ telemetry .metrics ().requestDuration (requestDuration , this .getTelemetryAttributes ());
316+
317+ @ SuppressWarnings ("unchecked" )
318+ T result = (T ) new StreamingResponseBody (response .body ());
319+ return CompletableFuture .completedFuture (
320+ new ApiResponse <>(response .statusCode (), response .headers ().map (), null , result ));
321+ }
322+
214323 private CompletableFuture <T > deserializeResponse (HttpResponse <String > response ) {
215324 if (clazz == Void .class && isNullOrWhitespace (response .body ())) {
216325 return CompletableFuture .completedFuture (null );
@@ -262,4 +371,4 @@ public void onComplete() {
262371 out .flush ();
263372 }
264373 }
265- }
374+ }
0 commit comments