Skip to content

feat: add streaming support to api executor#292

Merged
SoulPancake merged 12 commits intomainfrom
feat/streamed-api-executor
Apr 2, 2026
Merged

feat: add streaming support to api executor#292
SoulPancake merged 12 commits intomainfrom
feat/streamed-api-executor

Conversation

@rhamzeh
Copy link
Copy Markdown
Member

@rhamzeh rhamzeh commented Mar 30, 2026

Description

What problem is being solved?

How is it being solved?

What changes are made to solve it?

References

Review Checklist

  • I have clicked on "allow edits by maintainers".
  • I have added documentation for new/changed functionality in this PR or in a PR to openfga.dev [Provide a link to any relevant PRs in the references section above]
  • The correct base branch is being used, if not main
  • I have added tests to validate that the change in functionality is working as expected

Summary by CodeRabbit

Release Notes

  • New Features

    • Added streaming support for consuming list objects responses via channel-based APIs
    • Introduced low-level streaming capability for advanced use cases
    • Improved nil parameter handling in request builders
  • Documentation

    • Added comprehensive streaming API documentation with practical examples
    • New example projects demonstrating streaming workflows

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 30, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: eb4716b4-04cc-4d0e-aa47-544c74808cdc

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review

Walkthrough

This PR introduces comprehensive streaming support to the OpenFGA Go SDK by adding generic streaming infrastructure to the API executor layer, consolidating streaming logic, and refactoring the streamed-list-objects API. It removes the legacy streaming.go implementation, adds new ExecuteStreaming method to APIExecutor, and updates the OpenFgaApi to use typed streaming channels. Tests and examples are significantly expanded.

Changes

Cohort / File(s) Summary
Core Streaming Infrastructure
api_executor.go
Added ExecuteStreaming method, generic StreamResult[T], StreamingChannel[T], and ProcessStreamingResponse[T] for NDJSON streaming. Updated request builders to treat nil parameters as no-ops. Added APIExecutorStreamingChannel with raw bytes streaming and DefaultStreamBufferSize constant.
API Streaming Integration
api_open_fga.go
Added StreamingRequestOptions with BufferSize field. Changed StreamedListObjectsExecute signature from returning decoded response to returning *StreamedListObjectsChannel. Implemented StreamedListObjectsChannel with typed Objects and Errors channels plus Close() method. Added convertToStreamedListObjectsChannel for unmarshalling raw streamed results.
Streaming Test Coverage
api_executor_test.go, api_open_fga_test.go
Added 1261 + 631 lines of tests covering channel lifecycle, buffer sizing, NDJSON parsing, error handling, retry semantics, context cancellation, and streaming for both raw and typed APIs. Introduced testStreamRetryTransport helper for simulating transport failures.
Legacy Streaming Removal & Refactor
streaming.go, streaming_test.go
Removed 251-line streaming.go file with old StreamResult, StreamingChannel, and ExecuteStreamedListObjects implementations. Renamed tests from TestStreamedListObjectsWithChannel_* to TestStreamedListObjectsWithAPI_* and updated to use new fluent API.
Client Integration
client/client.go
Updated StreamedListObjectsExecute to use fluent client.OpenFgaApi.StreamedListObjects(...).Options(StreamingRequestOptions{...}).Execute() pattern instead of separate function call with explicit buffer size.
Example: API Executor
example/api_executor/main.go, example/api_executor/README.md, example/api_executor/go.mod
Added new low-level APIExecutor example demonstrating usage of Execute, ExecuteWithDecode, and ExecuteStreaming methods. Includes operations on stores, models, tuples, checks, and streaming list objects. Documented prerequisites and execution instructions.
Example: Streamed List Objects Updates
example/streamed_list_objects/main.go, example/streamed_list_objects/README.md, example/streamed_list_objects/go.mod
Refactored to use fluent typed API with StreamedListObjects(...).Body(...).Options(...). Updated to inline request body construction. Changed dependency versions for indirect deps.
Example Configuration & Documentation
example/Makefile, example/README.md, example/example1/go.mod, example/opentelemetry/go.mod
Added Makefile targets for new examples. Updated example README to recommend typed StreamedListObjects method and reference low-level APIExecutor streaming. Updated indirect dependency versions across examples.
Main README & Core Utilities
README.md, utils.go
Added documentation for streaming endpoints with examples of both raw and typed APIs. Enhanced validateParameter to detect non-nil interfaces wrapping nil pointers as invalid.

Sequence Diagram

sequenceDiagram
    participant Client as Client Code
    participant APIExec as APIExecutor
    participant HTTPLayer as HTTP Layer
    participant StreamCh as Streaming Channel
    participant Consumer as Result Consumer

    Client->>APIExec: ExecuteStreaming(ctx, request, bufferSize)
    APIExec->>APIExec: Validate request & build path
    APIExec->>APIExec: Prepare headers (Accept: application/x-ndjson)
    APIExec->>HTTPLayer: Send HTTP request
    HTTPLayer-->>APIExec: Response stream (NDJSON)
    APIExec->>StreamCh: Create StreamingChannel
    APIExec->>StreamCh: Start processStreamingResponseRaw()
    StreamCh->>StreamCh: Scan lines, unmarshal JSON
    StreamCh->>StreamCh: Send results to Results chan
    APIExec-->>Client: Return *StreamingChannel
    Client->>Consumer: Iterate channel.Results
    Consumer->>Consumer: Process streamed objects
    Consumer->>Consumer: Check channel.Errors
    Consumer->>StreamCh: Close() when done
    StreamCh->>StreamCh: Cancel context & close channels
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • curfew-marathon
  • jimmyjames
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 5.56% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly summarizes the main change: adding streaming support to the API executor, which is the primary objective of this multi-file PR.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/streamed-api-executor

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@socket-security
Copy link
Copy Markdown

socket-security bot commented Mar 30, 2026

Warning

Review the following alerts detected in dependencies.

According to your organization's Security Policy, it is recommended to resolve "Warn" alerts. Learn more about Socket for GitHub.

Action Severity Alert  (click "▶" to expand/collapse)
Warn Critical
Critical CVE: gRPC-Go has an authorization bypass via missing leading slash in :path in golang google.golang.org/grpc

CVE: GHSA-p77j-4mvh-x3m3 gRPC-Go has an authorization bypass via missing leading slash in :path (CRITICAL)

Affected versions: < 1.79.3

Patched version: 1.79.3

From: ?golang/github.com/openfga/language/pkg/go@v0.2.0-beta.2golang/google.golang.org/grpc@v1.66.0

ℹ Read more on: This package | This alert | What is a critical CVE?

Next steps: Take a moment to review the security alert above. Review the linked package source code to understand the potential risk. Ensure the package is not malicious before proceeding. If you're unsure how to proceed, reach out to your security team or ask the Socket team for help at support@socket.dev.

Suggestion: Remove or replace dependencies that include known critical CVEs. Consumers can use dependency overrides or npm audit fix --force to remove vulnerable dependencies.

Mark the package as acceptable risk. To ignore this alert only in this pull request, reply with the comment @SocketSecurity ignore golang/google.golang.org/grpc@v1.66.0. You can also ignore all packages with @SocketSecurity ignore-all. To ignore an alert for all future pull requests, use Socket's Dashboard to change the triage state of this alert.

Warn Medium
Medium CVE: HTTP Proxy bypass using IPv6 Zone IDs in golang golang.org/x/net

CVE: GHSA-qxp5-gwg8-xv66 HTTP Proxy bypass using IPv6 Zone IDs in golang.org/x/net (MODERATE)

Affected versions: < 0.36.0

Patched version: 0.36.0

From: ?golang/github.com/openfga/language/pkg/go@v0.2.0-beta.2golang/golang.org/x/net@v0.29.0

ℹ Read more on: This package | This alert | What is a medium CVE?

Next steps: Take a moment to review the security alert above. Review the linked package source code to understand the potential risk. Ensure the package is not malicious before proceeding. If you're unsure how to proceed, reach out to your security team or ask the Socket team for help at support@socket.dev.

Suggestion: Remove or replace dependencies that include known medium severity CVEs. Consumers can use dependency overrides or npm audit fix --force to remove vulnerable dependencies.

Mark the package as acceptable risk. To ignore this alert only in this pull request, reply with the comment @SocketSecurity ignore golang/golang.org/x/net@v0.29.0. You can also ignore all packages with @SocketSecurity ignore-all. To ignore an alert for all future pull requests, use Socket's Dashboard to change the triage state of this alert.

Warn Medium
Medium CVE: golang golang.org/x/net vulnerable to Cross-site Scripting

CVE: GHSA-vvgc-356p-c3xw golang.org/x/net vulnerable to Cross-site Scripting (MODERATE)

Affected versions: < 0.38.0

Patched version: 0.38.0

From: ?golang/github.com/openfga/language/pkg/go@v0.2.0-beta.2golang/golang.org/x/net@v0.29.0

ℹ Read more on: This package | This alert | What is a medium CVE?

Next steps: Take a moment to review the security alert above. Review the linked package source code to understand the potential risk. Ensure the package is not malicious before proceeding. If you're unsure how to proceed, reach out to your security team or ask the Socket team for help at support@socket.dev.

Suggestion: Remove or replace dependencies that include known medium severity CVEs. Consumers can use dependency overrides or npm audit fix --force to remove vulnerable dependencies.

Mark the package as acceptable risk. To ignore this alert only in this pull request, reply with the comment @SocketSecurity ignore golang/golang.org/x/net@v0.29.0. You can also ignore all packages with @SocketSecurity ignore-all. To ignore an alert for all future pull requests, use Socket's Dashboard to change the triage state of this alert.

View full report

@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Mar 31, 2026

Codecov Report

❌ Patch coverage is 77.48092% with 59 lines in your changes missing coverage. Please review.
✅ Project coverage is 34.78%. Comparing base (6851b16) to head (cfae88c).

Files with missing lines Patch % Lines
api_executor.go 72.27% 47 Missing and 9 partials ⚠️
api_open_fga.go 94.23% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #292      +/-   ##
==========================================
+ Coverage   33.85%   34.78%   +0.93%     
==========================================
  Files         115      114       -1     
  Lines        9854     9980     +126     
==========================================
+ Hits         3336     3472     +136     
+ Misses       6248     6237      -11     
- Partials      270      271       +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@SoulPancake SoulPancake marked this pull request as ready for review April 1, 2026 06:44
@SoulPancake SoulPancake requested review from a team as code owners April 1, 2026 06:44
Copilot AI review requested due to automatic review settings April 1, 2026 06:44
@dosubot
Copy link
Copy Markdown

dosubot bot commented Apr 1, 2026

Documentation Updates

1 document(s) were updated by changes in this PR:

StreamedListObjects Feature Overview
View Changes
@@ -78,7 +78,10 @@
 As of v0.10.0 (released 2026-03-24), the internal kwargs (`_return_http_data_only`, `_preload_content`, `_request_auth`, `async_req`, and `_request_timeout`) have been removed from all `OpenFgaApi` and `SyncOpenFgaApi` endpoint methods. These were internal implementation details not intended for external use. All endpoint methods now return the deserialized response object directly. Users who need access to HTTP response metadata (status, headers) should use `execute_api_request` instead.
 
 ### Go SDK
-The Go SDK provides a public `StreamedListObjects` method on `OpenFgaClient`, which streams results using channels. This method allows you to retrieve all objects of a given type that a user has access to, without the 1000-object pagination limit.
+The Go SDK provides two approaches for streaming with `StreamedListObjects`:
+
+#### High-Level Client Method (Recommended)
+The `StreamedListObjects` method on `OpenFgaClient` provides typed `StreamedListObjectsResponse` objects directly, with automatic retry and error handling. This is the recommended approach for typical usage.
 
 **Example:**
 ```go
@@ -110,7 +113,63 @@
 // objects = ["document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a"]
 ```
 
-For more details, see the [Go SDK StreamedListObjects Example](https://github.com/openfga/go-sdk/blob/main/example/streamed_list_objects/main.go).
+**Breaking Changes**: The return type of `Execute()` changed from `StreamResultOfStreamedListObjectsResponse` to `*StreamedListObjectsChannel`. The `Options()` parameter type changed from `RequestOptions` to `StreamingRequestOptions`.
+
+#### Low-Level APIExecutor (Advanced)
+For advanced use cases requiring direct NDJSON streaming with raw JSON bytes, use the `ExecuteStreaming` method on `APIExecutor`. This provides lower-level control at the cost of manual JSON unmarshalling.
+
+**Example:**
+```go
+executor := fgaClient.GetAPIExecutor()
+
+request := openfga.NewAPIExecutorRequestBuilder("StreamedListObjects", http.MethodPost, "/stores/{store_id}/streamed-list-objects").
+    WithPathParameter("store_id", storeID).
+    WithBody(openfga.ListObjectsRequest{
+        AuthorizationModelId: openfga.PtrString(modelID),
+        Type:                 "document",
+        Relation:             "viewer",
+        User:                 "user:alice",
+    }).
+    Build()
+
+channel, err := executor.ExecuteStreaming(ctx, request, openfga.DefaultStreamBufferSize)
+if err != nil {
+    log.Fatal(err)
+}
+defer channel.Close()
+
+for {
+    select {
+    case result, ok := <-channel.Results:
+        if !ok {
+            // Stream completed
+            return
+        }
+        var response openfga.StreamedListObjectsResponse
+        if err := json.Unmarshal(result, &response); err != nil {
+            log.Fatal(err)
+        }
+        fmt.Printf("Object: %s\n", response.Object)
+    case err := <-channel.Errors:
+        if err != nil {
+            log.Fatal(err)
+        }
+    }
+}
+```
+
+The `ExecuteStreaming` method returns an `APIExecutorStreamingChannel` with:
+- `Results chan []byte`: Raw JSON bytes for each streamed result
+- `Errors chan error`: Any errors that occur during streaming
+- `Close()`: Method to cancel streaming and cleanup resources
+
+**Technical Details:**
+- `StreamResult[T]`: Generic type wrapping results/errors in streaming responses
+- `StreamStatusError`: Error type for server-sent errors in streams (includes `Status` with code, message, details)
+- `ProcessStreamingResponse[T]`: Helper function to process NDJSON streaming responses
+- `DefaultStreamBufferSize`: Default channel buffer size (value: 10)
+
+For more details, see the [high-level example](https://github.com/openfga/go-sdk/blob/main/example/streamed_list_objects/main.go) and the [low-level APIExecutor example](https://github.com/openfga/go-sdk/blob/main/example/api_executor/README.md).
 
 ### JS SDK
 The JS SDK supports StreamedListObjects in v0.9.3 and later. The method streams results using async iterators, allowing you to process each object as it arrives. See the [documentation](#streamed-list-objects) for usage details.
@@ -183,7 +242,8 @@
 ## References and Further Reading
 - [Java SDK StreamingApiExecutor Example](https://github.com/openfga/java-sdk/blob/main/examples/api-executor/README.md)
 - [Python SDK StreamedListObjects Example](https://github.com/openfga/python-sdk/blob/fd04bc4b8525e536208e2091dce16edf2f220250/example/streamed-list-objects/README.md)
-- [Go SDK StreamedListObjects Example](https://github.com/openfga/go-sdk/blob/e6dd3e6c12ecb6654531d5febe80404db4018c26/example/streamed_list_objects/README.md)
+- [Go SDK StreamedListObjects Example](https://github.com/openfga/go-sdk/blob/e6dd3e6c12ecb6654531d5febe80404db4018c26/example/streamed_list_objects/README.md) (high-level client method)
+- [Go SDK APIExecutor Example](https://github.com/openfga/go-sdk/blob/main/example/api_executor/README.md) (low-level streaming)
 - [.NET SDK ApiExecutor Example](https://github.com/openfga/dotnet-sdk/tree/main/example/ApiExecutorExample/)
 - [JS SDK Feature Request](https://github.com/openfga/js-sdk/issues/236)
 - [.NET SDK Feature Request](https://github.com/openfga/dotnet-sdk/issues/110)

How did I do? Any feedback?  Join Discord

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds first-class NDJSON streaming support to the low-level APIExecutor, and updates StreamedListObjects to use the executor-backed streaming implementation (returning channels instead of a fully-decoded response), along with docs/examples and expanded tests.

Changes:

  • Introduces APIExecutor.ExecuteStreaming + streaming channel types/utilities (raw bytes + typed helpers).
  • Reworks StreamedListObjects execution path to stream via APIExecutor, and converts raw NDJSON results into typed StreamedListObjectsResponse objects.
  • Updates README/examples and refactors tests to use the new streaming API.

Reviewed changes

Copilot reviewed 19 out of 19 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
utils.go Enhances required-parameter validation to catch typed nil pointers passed via interface{}.
streaming.go Removes the previous standalone streaming implementation (now moved/rewritten elsewhere).
api_executor.go Adds ExecuteStreaming, streaming channel types, and NDJSON processing logic with retries/telemetry integration.
api_executor_test.go Adds extensive unit test coverage for executor streaming, buffering, retry behavior, and conversion helpers.
api_open_fga.go Changes StreamedListObjects to return a streaming channel and implements conversion from raw streamed bytes to typed results.
api_open_fga_test.go Adds integration-style tests for StreamedListObjects streaming behavior (success/error/retry cases).
client/client.go Updates client streaming wrapper to use new StreamingRequestOptions and the updated StreamedListObjects execution path.
streaming_test.go Refactors streaming tests to use the typed OpenFgaApi.StreamedListObjects(...).Execute() flow; adds nil-response/body tests.
README.md Documents streaming usage via APIExecutor.ExecuteStreaming and the typed StreamedListObjects API.
example/api_executor/* Adds a new runnable example demonstrating raw APIExecutor usage including streaming.
example/streamed_list_objects/* Updates example to emphasize the typed/high-level streaming API.
example/README.md Updates example index to distinguish typed streaming vs low-level executor streaming examples.
example/Makefile Adds convenience targets for running the new/updated examples.
example/*/go.mod Updates indirect dependencies for examples to align with repo/module requirements.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (7)
example/opentelemetry/go.mod (1)

32-33: Consider regenerating dependencies via go mod tidy in the example module.

Lines 32–33 pin indirect versions atomic v1.7.0 and multierr v1.9.0, which are outdated compared to the latest stable versions (both v1.11.0). Since no security advisories exist for these versions and no OpenTelemetry constraints require them, allow the Go module solver to resolve to compatible newer versions by running go mod tidy.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/opentelemetry/go.mod` around lines 32 - 33, Regenerate the example
module's dependency graph so indirect pins for go.uber.org/atomic and
go.uber.org/multierr are resolved to current compatible versions: run `go mod
tidy` in the example module to let the Go module solver update go.mod/go.sum,
then commit the updated go.mod and go.sum; specifically ensure the indirect
entries for "go.uber.org/atomic" and "go.uber.org/multierr" are removed or
bumped to the resolved versions after the tidy.
example/api_executor/go.mod (1)

22-22: Consider removing the inline comment from the replace directive.

The comment // added this to point to local module is helpful during development but may be unnecessary noise in the committed file since replace directives for local development are self-explanatory.

🔧 Suggested change
-replace github.com/openfga/go-sdk => ../.. // added this to point to local module
+replace github.com/openfga/go-sdk => ../..
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/api_executor/go.mod` at line 22, The replace directive line "replace
github.com/openfga/go-sdk => ../.." contains an inline comment "// added this to
point to local module" that should be removed; edit the go.mod entry for the
replace directive to remove the trailing comment so the line reads only the
replace directive (keep the local path if still needed) and save the file to
eliminate unnecessary commit noise.
example/streamed_list_objects/go.mod (1)

40-40: Consider removing the inline comment for consistency.

Same as the api_executor example - the comment may be unnecessary.

🔧 Suggested change
-replace github.com/openfga/go-sdk => ../.. // added this to point to local module
+replace github.com/openfga/go-sdk => ../..
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/streamed_list_objects/go.mod` at line 40, Remove the inline comment
from the go.mod replace directive so it reads exactly "replace
github.com/openfga/go-sdk => ../.." for consistency with the api_executor
example; locate the replace line in example/streamed_list_objects/go.mod and
delete the trailing "// added this to point to local module" comment.
api_open_fga_test.go (1)

1993-1999: Consider using a timeout when checking for errors to prevent potential test hangs.

The select with a default case will not wait for any in-flight error processing. While this works for the happy path, a short timeout might be more robust.

💡 Alternative error check with timeout
 		// Check for errors
 		select {
 		case err := <-channel.Errors:
 			if err != nil {
 				t.Fatalf("unexpected stream error: %v", err)
 			}
-		default:
+		case <-time.After(100 * time.Millisecond):
+			// No error received within timeout, which is fine
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api_open_fga_test.go` around lines 1993 - 1999, The current non-blocking
select over channel.Errors can miss delayed errors; replace it with a short
timeout-based select that waits briefly for an error before proceeding. Locate
the select that reads from channel.Errors (the block that calls
t.Fatalf("unexpected stream error: %v", err)) and change it to a select that has
three branches: case err := <-channel.Errors to fail the test on a non-nil
error, case <-time.After(...) to proceed after a short timeout (e.g. 100–500ms),
and no default branch so the test actually waits for either an error or the
timeout.
example/api_executor/main.go (1)

309-338: Consider refactoring the streaming loop to avoid goto.

The goto streamDone pattern works but is unconventional in Go. A cleaner approach would be to extract the streaming logic into a helper function or use a labeled break.

💡 Alternative without goto
-	count := 0
-	for {
-		select {
-		case result, ok := <-channel.Results:
-			if !ok {
-				select {
-				case err := <-channel.Errors:
-					if err != nil {
-						handleError("StreamedListObjects stream", err)
-					}
-				default:
-				}
-				fmt.Printf("   ✓ Streamed %d objects\n\n", count)
-				goto streamDone
-			}
-			var obj openfga.StreamedListObjectsResponse
-			if err := json.Unmarshal(result, &obj); err != nil {
-				handleError("decode stream result", err)
-			}
-			count++
-			if count <= 3 || count%50 == 0 {
-				fmt.Printf("     Object: %s\n", obj.Object)
-			}
-		case err := <-channel.Errors:
-			if err != nil {
-				handleError("StreamedListObjects error", err)
-			}
-		}
-	}
-streamDone:
+	count := 0
+streamLoop:
+	for {
+		select {
+		case result, ok := <-channel.Results:
+			if !ok {
+				select {
+				case err := <-channel.Errors:
+					if err != nil {
+						handleError("StreamedListObjects stream", err)
+					}
+				default:
+				}
+				fmt.Printf("   ✓ Streamed %d objects\n\n", count)
+				break streamLoop
+			}
+			var obj openfga.StreamedListObjectsResponse
+			if err := json.Unmarshal(result, &obj); err != nil {
+				handleError("decode stream result", err)
+			}
+			count++
+			if count <= 3 || count%50 == 0 {
+				fmt.Printf("     Object: %s\n", obj.Object)
+			}
+		case err := <-channel.Errors:
+			if err != nil {
+				handleError("StreamedListObjects error", err)
+			}
+		}
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/api_executor/main.go` around lines 309 - 338, The loop uses a goto to
exit to streamDone; refactor it to avoid goto by moving the streaming logic into
a helper or using a labeled break: extract the body that reads from
channel.Results and channel.Errors into a helper function (e.g.,
streamObjects(channel) returning (count int, err error)) or wrap the for-select
in a labeled loop and use break <label> to exit; preserve behavior around
json.Unmarshal into openfga.StreamedListObjectsResponse, incrementing count,
printing the first 3 and every 50th object, and calling
handleError("StreamedListObjects stream"/"decode stream
result"/"StreamedListObjects error", err) on errors so semantics remain
identical (replace references to channel.Results, channel.Errors, count,
handleError, and StreamedListObjectsResponse accordingly).
example/api_executor/README.md (1)

100-115: Streaming example should handle channel closure before checking errors.

The streaming code pattern shows a select with both channel.Results and channel.Errors, but the return on Line 104 exits without checking the Errors channel. This could cause users to miss stream errors in edge cases. Consider documenting that errors should be checked after the Results channel closes, consistent with how other tests in this PR handle it.

📝 Consider updating the streaming pattern example
 for {
     select {
     case result, ok := <-channel.Results:
         if !ok {
-            return // Stream completed
+            // Stream completed, check for any errors
+            if err := <-channel.Errors; err != nil {
+                log.Fatal(err)
+            }
+            return
         }
         var response openfga.StreamedListObjectsResponse
         json.Unmarshal(result, &response)
         fmt.Println(response.Object)
     case err := <-channel.Errors:
         if err != nil {
             log.Fatal(err)
         }
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/api_executor/README.md` around lines 100 - 115, The streaming loop
returns immediately when channel.Results is closed, which can skip reading any
pending errors from channel.Errors; update the loop around channel.Results and
channel.Errors (the select using channel.Results and channel.Errors) so that
when channel.Results is closed you break out of the loop and then drain or check
channel.Errors (the same channel.Errors used in the select) for any final error
before returning/finishing, ensuring any non-nil error is handled (e.g., logged
or returned) after the Results stream ends.
example/streamed_list_objects/main.go (1)

210-218: Error handling could leak sensitive information in some edge cases.

The handleError function checks if err.Error() == "connection refused" using string comparison, which is fragile and may not match the actual error message on all platforms or Go versions. Network errors often contain additional context.

💡 Consider more robust error detection
 func handleError(err error) {
 	// Avoid logging sensitive data; only display generic info
-	if err.Error() == "connection refused" {
+	if strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "dial tcp") {
 		fmt.Fprintln(os.Stderr, "Is OpenFGA server running? Check FGA_API_URL environment variable or default http://localhost:8080")
 	} else {
 		fmt.Fprintf(os.Stderr, "An error occurred. [%T]\n", err)
 	}
 	os.Exit(1)
 }

Note: This would require adding "strings" to the imports.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/streamed_list_objects/main.go` around lines 210 - 218, The current
handleError uses fragile string equality on err.Error(); change it to robust
detection by using errors.Is(err, syscall.ECONNREFUSED) (and/or detecting
net.OpError wrapper) and fall back to a contains check like
strings.Contains(err.Error(), "connection refused") to cover platform
variations; update imports to include "errors", "syscall", and "strings" as
needed and keep the existing generic stderr message path for all other errors in
the handleError function.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@api_executor.go`:
- Around line 604-610: The current code in the stream handling block creates
errors.New(msg) and loses the original *Status; instead define and use a typed
error (e.g., StatusError struct implementing error and holding a *Status field)
and send that into channel.Errors so callers can inspect/classify the Status;
update the stream error handling in the block that checks streamResult.Error
(the code that sets msg from streamResult.Error.Message) to send
&StatusError{Status: streamResult.Error} (or equivalent constructor) rather than
errors.New(msg), and apply the same change to the other occurrence referenced
(lines around the second block handling streamResult.Error) so both places
preserve the full *Status.
- Around line 793-799: The retry backoff currently calls time.Sleep
unconditionally in ExecuteStreaming (where e.determineRetry(...) is used), which
blocks cancellation; replace each time.Sleep(waitDuration) in the retry paths
with a cancelable wait that selects on ctx.Done() and the backoff timer (e.g.,
using time.After or time.NewTimer) so that if ctx is canceled you return
immediately with ctx.Err(); update all occurrences around the calls to
e.determineRetry and the subsequent retry logic to use this select/timer pattern
to respect context cancellation.
- Around line 537-541: The Close() on StreamingChannel[T] only cancels the
context (s.cancel()) and does not interrupt a blocking scanner.Scan() in
ProcessStreamingResponse[T] / processStreamingResponseRaw, so fix by wiring the
cancellation to the HTTP response body: capture and store the httpResponse.Body
reference (from processStreamingResponseRaw / ProcessStreamingResponse[T]) in
StreamingChannel (e.g., a body io.ReadCloser field) and on Close() call
body.Close() in addition to s.cancel(), or alternatively pass the parent request
context into the HTTP client so cancellation is applied to the transport; also
make body.Close() safe for concurrent calls (check nil, guard with mutex if
needed) and ensure the goroutine also listens for streamCtx.Done() and closes
httpResponse.Body immediately when done to unblock scanner.Scan().

In `@api_open_fga.go`:
- Around line 784-788: The change broke backwards compatibility by altering
StreamedListObjectsExecute/Execute return types and removing RequestOptions from
Options; revert those public signatures and instead add a new streaming entry
point (e.g., StreamedListObjectsStream or StreamedListObjectsStreamExecute) that
returns the streaming channel while preserving access to *http.Response, and add
a BufferSize(int) builder on ApiStreamedListObjectsRequest to configure internal
buffer capacity; ensure ApiStreamedListObjectsRequest.Options still accepts
RequestOptions and leave the existing StreamedListObjectsExecute/Execute and
StreamedListObjectsChannel types untouched so existing callers keep compiling
and retain response access.
- Around line 2470-2495: The example loop for StreamedListObjects can hide
terminal errors by returning on Objects channel close without draining Errors;
update the usage example for OpenFgaApi.StreamedListObjects /
StreamedListObjectsChannel to mirror the executor pattern: after calling
Execute() and deferring channel.Close(), loop selecting on both channel.Objects
and channel.Errors and only exit once both channels are closed/drained (i.e.,
continue selecting after Objects returns !ok to read any final error from
Errors), ensuring you check and handle Errors even when Objects is closed;
reference StreamedListObjects, StreamedListObjectsChannel, Objects, Errors,
Execute(), and Close() when making the change.

---

Nitpick comments:
In `@api_open_fga_test.go`:
- Around line 1993-1999: The current non-blocking select over channel.Errors can
miss delayed errors; replace it with a short timeout-based select that waits
briefly for an error before proceeding. Locate the select that reads from
channel.Errors (the block that calls t.Fatalf("unexpected stream error: %v",
err)) and change it to a select that has three branches: case err :=
<-channel.Errors to fail the test on a non-nil error, case <-time.After(...) to
proceed after a short timeout (e.g. 100–500ms), and no default branch so the
test actually waits for either an error or the timeout.

In `@example/api_executor/go.mod`:
- Line 22: The replace directive line "replace github.com/openfga/go-sdk =>
../.." contains an inline comment "// added this to point to local module" that
should be removed; edit the go.mod entry for the replace directive to remove the
trailing comment so the line reads only the replace directive (keep the local
path if still needed) and save the file to eliminate unnecessary commit noise.

In `@example/api_executor/main.go`:
- Around line 309-338: The loop uses a goto to exit to streamDone; refactor it
to avoid goto by moving the streaming logic into a helper or using a labeled
break: extract the body that reads from channel.Results and channel.Errors into
a helper function (e.g., streamObjects(channel) returning (count int, err
error)) or wrap the for-select in a labeled loop and use break <label> to exit;
preserve behavior around json.Unmarshal into
openfga.StreamedListObjectsResponse, incrementing count, printing the first 3
and every 50th object, and calling handleError("StreamedListObjects
stream"/"decode stream result"/"StreamedListObjects error", err) on errors so
semantics remain identical (replace references to channel.Results,
channel.Errors, count, handleError, and StreamedListObjectsResponse
accordingly).

In `@example/api_executor/README.md`:
- Around line 100-115: The streaming loop returns immediately when
channel.Results is closed, which can skip reading any pending errors from
channel.Errors; update the loop around channel.Results and channel.Errors (the
select using channel.Results and channel.Errors) so that when channel.Results is
closed you break out of the loop and then drain or check channel.Errors (the
same channel.Errors used in the select) for any final error before
returning/finishing, ensuring any non-nil error is handled (e.g., logged or
returned) after the Results stream ends.

In `@example/opentelemetry/go.mod`:
- Around line 32-33: Regenerate the example module's dependency graph so
indirect pins for go.uber.org/atomic and go.uber.org/multierr are resolved to
current compatible versions: run `go mod tidy` in the example module to let the
Go module solver update go.mod/go.sum, then commit the updated go.mod and
go.sum; specifically ensure the indirect entries for "go.uber.org/atomic" and
"go.uber.org/multierr" are removed or bumped to the resolved versions after the
tidy.

In `@example/streamed_list_objects/go.mod`:
- Line 40: Remove the inline comment from the go.mod replace directive so it
reads exactly "replace github.com/openfga/go-sdk => ../.." for consistency with
the api_executor example; locate the replace line in
example/streamed_list_objects/go.mod and delete the trailing "// added this to
point to local module" comment.

In `@example/streamed_list_objects/main.go`:
- Around line 210-218: The current handleError uses fragile string equality on
err.Error(); change it to robust detection by using errors.Is(err,
syscall.ECONNREFUSED) (and/or detecting net.OpError wrapper) and fall back to a
contains check like strings.Contains(err.Error(), "connection refused") to cover
platform variations; update imports to include "errors", "syscall", and
"strings" as needed and keep the existing generic stderr message path for all
other errors in the handleError function.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9136d615-a21e-4212-93aa-edcd4eacd5c9

📥 Commits

Reviewing files that changed from the base of the PR and between 6851b16 and d706b8e.

📒 Files selected for processing (19)
  • README.md
  • api_executor.go
  • api_executor_test.go
  • api_open_fga.go
  • api_open_fga_test.go
  • client/client.go
  • example/Makefile
  • example/README.md
  • example/api_executor/README.md
  • example/api_executor/go.mod
  • example/api_executor/main.go
  • example/example1/go.mod
  • example/opentelemetry/go.mod
  • example/streamed_list_objects/README.md
  • example/streamed_list_objects/go.mod
  • example/streamed_list_objects/main.go
  • streaming.go
  • streaming_test.go
  • utils.go
💤 Files with no reviewable changes (1)
  • streaming.go

@SoulPancake SoulPancake linked an issue Apr 2, 2026 that may be closed by this pull request
1 task
emilic
emilic previously approved these changes Apr 2, 2026
@SoulPancake SoulPancake added this pull request to the merge queue Apr 2, 2026
Merged via the queue into main with commit e0ce647 Apr 2, 2026
18 checks passed
@SoulPancake SoulPancake deleted the feat/streamed-api-executor branch April 2, 2026 14:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add "streaming variant" of ApiExecutor

5 participants