From 8a394c2dbf6753362c7709a3c3f36c15e055cacf Mon Sep 17 00:00:00 2001 From: SoulPancake Date: Thu, 20 Nov 2025 14:57:33 +0530 Subject: [PATCH 1/2] feat: streaming endpoint utils, using templates for streaming --- config/clients/go/config.overrides.json | 8 + config/clients/go/template/streaming.mustache | 239 ++++++++++ .../go/template/streaming_test.mustache | 425 ++++++++++++++++++ 3 files changed, 672 insertions(+) create mode 100644 config/clients/go/template/streaming.mustache create mode 100644 config/clients/go/template/streaming_test.mustache diff --git a/config/clients/go/config.overrides.json b/config/clients/go/config.overrides.json index fcf54bce..1dff2ac6 100644 --- a/config/clients/go/config.overrides.json +++ b/config/clients/go/config.overrides.json @@ -20,6 +20,14 @@ "api_client.mustache": { "destinationFilename": "api_client.go", "templateType": "SupportingFiles" + }, + "streaming.mustache": { + "destinationFilename": "streaming.go", + "templateType": "SupportingFiles" + }, + "streaming_test.mustache": { + "destinationFilename": "streaming_test.go", + "templateType": "SupportingFiles" } } } diff --git a/config/clients/go/template/streaming.mustache b/config/clients/go/template/streaming.mustache new file mode 100644 index 00000000..cb7725cf --- /dev/null +++ b/config/clients/go/template/streaming.mustache @@ -0,0 +1,239 @@ +{{>partial_header}} +package {{packageName}} + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "io" + "net/http" + "net/url" + "strings" +) + +// StreamResult represents a generic streaming result wrapper with either a result or an error +type StreamResult[T any] struct { + Result *T `json:"result,omitempty" yaml:"result,omitempty"` + Error *Status `json:"error,omitempty" yaml:"error,omitempty"` +} + +// StreamingChannel represents a generic channel for streaming responses +type StreamingChannel[T any] struct { + Results chan T + Errors chan error + cancel context.CancelFunc +} + +// Close cancels the streaming context and cleans up resources +func (s *StreamingChannel[T]) Close() { + if s.cancel != nil { + s.cancel() + } +} + +// ProcessStreamingResponse processes an HTTP response as a streaming NDJSON response +// and returns a StreamingChannel with results and errors +// +// Parameters: +// - ctx: The context for cancellation +// - httpResponse: The HTTP response to process +// - bufferSize: The buffer size for the channels (default 10 if <= 0) +// +// Returns: +// - *StreamingChannel[T]: A channel containing streaming results and errors +// - error: An error if the response is invalid +func ProcessStreamingResponse[T any](ctx context.Context, httpResponse *http.Response, bufferSize int) (*StreamingChannel[T], error) { + streamCtx, cancel := context.WithCancel(ctx) + + // Use default buffer size of 10 if not specified or invalid + if bufferSize <= 0 { + bufferSize = 10 + } + + channel := &StreamingChannel[T]{ + Results: make(chan T, bufferSize), + Errors: make(chan error, 1), + cancel: cancel, + } + + if httpResponse == nil || httpResponse.Body == nil { + cancel() + return nil, errors.New("response or response body is nil") + } + + go func() { + defer close(channel.Results) + defer close(channel.Errors) + defer cancel() + defer func() { _ = httpResponse.Body.Close() }() + + scanner := bufio.NewScanner(httpResponse.Body) + // Allow large NDJSON entries (up to 10MB). Tune as needed. + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, 10*1024*1024) + + for scanner.Scan() { + select { + case <-streamCtx.Done(): + channel.Errors <- streamCtx.Err() + return + default: + line := scanner.Bytes() + if len(line) == 0 { + continue + } + + var streamResult StreamResult[T] + if err := json.Unmarshal(line, &streamResult); err != nil { + channel.Errors <- err + return + } + + if streamResult.Error != nil { + msg := "stream error" + if streamResult.Error.Message != nil { + msg = *streamResult.Error.Message + } + channel.Errors <- errors.New(msg) + return + } + + if streamResult.Result != nil { + select { + case <-streamCtx.Done(): + channel.Errors <- streamCtx.Err() + return + case channel.Results <- *streamResult.Result: + } + } + } + } + + if err := scanner.Err(); err != nil { + // Prefer context error if we were canceled to avoid surfacing net/http "use of closed network connection". + if streamCtx.Err() != nil { + channel.Errors <- streamCtx.Err() + return + } + channel.Errors <- err + } + }() + + return channel, nil +} + +// StreamedListObjectsChannel maintains backward compatibility with the old channel structure +type StreamedListObjectsChannel struct { + Objects chan StreamedListObjectsResponse + Errors chan error + cancel context.CancelFunc +} + +// Close cancels the streaming context and cleans up resources +func (s *StreamedListObjectsChannel) Close() { + if s.cancel != nil { + s.cancel() + } +} + +// ProcessStreamedListObjectsResponse processes a StreamedListObjects response +// This is a backward compatibility wrapper around ProcessStreamingResponse +func ProcessStreamedListObjectsResponse(ctx context.Context, httpResponse *http.Response, bufferSize int) (*StreamedListObjectsChannel, error) { + channel, err := ProcessStreamingResponse[StreamedListObjectsResponse](ctx, httpResponse, bufferSize) + if err != nil { + return nil, err + } + + // Create a new channel with the old field name for backward compatibility + compatChannel := &StreamedListObjectsChannel{ + Objects: channel.Results, + Errors: channel.Errors, + cancel: channel.cancel, + } + + return compatChannel, nil +} + +// ExecuteStreamedListObjects executes a StreamedListObjects request +func ExecuteStreamedListObjects(client *APIClient, ctx context.Context, storeId string, body ListObjectsRequest, options RequestOptions) (*StreamedListObjectsChannel, error) { + return ExecuteStreamedListObjectsWithBufferSize(client, ctx, storeId, body, options, 0) +} + +// ExecuteStreamedListObjectsWithBufferSize executes a StreamedListObjects request with a custom buffer size +func ExecuteStreamedListObjectsWithBufferSize(client *APIClient, ctx context.Context, storeId string, body ListObjectsRequest, options RequestOptions, bufferSize int) (*StreamedListObjectsChannel, error) { + channel, err := executeStreamingRequest[ListObjectsRequest, StreamedListObjectsResponse]( + client, + ctx, + "/stores/{store_id}/streamed-list-objects", + storeId, + body, + options, + bufferSize, + "StreamedListObjects", + ) + if err != nil { + return nil, err + } + + // Convert to backward-compatible channel structure + return &StreamedListObjectsChannel{ + Objects: channel.Results, + Errors: channel.Errors, + cancel: channel.cancel, + }, nil +} + +// executeStreamingRequest is a generic function to execute streaming requests +func executeStreamingRequest[TReq any, TRes any]( + client *APIClient, + ctx context.Context, + pathTemplate string, + storeId string, + body TReq, + options RequestOptions, + bufferSize int, + operationName string, +) (*StreamingChannel[TRes], error) { + if storeId == "" { + return nil, reportError("storeId is required and must be specified") + } + + path := pathTemplate + path = strings.ReplaceAll(path, "{"+"store_id"+"}", url.PathEscape(parameterToString(storeId, ""))) + + localVarHeaderParams := make(map[string]string) + localVarQueryParams := url.Values{} + + localVarHTTPContentType := "application/json" + localVarHeaderParams["Content-Type"] = localVarHTTPContentType + localVarHeaderParams["Accept"] = "application/x-ndjson" + + for header, val := range options.Headers { + localVarHeaderParams[header] = val + } + + req, err := client.prepareRequest(ctx, path, http.MethodPost, body, localVarHeaderParams, localVarQueryParams) + if err != nil { + return nil, err + } + + httpResponse, err := client.callAPI(req) + if err != nil || httpResponse == nil { + return nil, err + } + + if httpResponse.StatusCode >= http.StatusMultipleChoices { + responseBody, readErr := io.ReadAll(httpResponse.Body) + _ = httpResponse.Body.Close() + if readErr != nil { + return nil, readErr + } + err = client.handleAPIError(httpResponse, responseBody, body, operationName, storeId) + return nil, err + } + + return ProcessStreamingResponse[TRes](ctx, httpResponse, bufferSize) +} + + diff --git a/config/clients/go/template/streaming_test.mustache b/config/clients/go/template/streaming_test.mustache new file mode 100644 index 00000000..ee488bcb --- /dev/null +++ b/config/clients/go/template/streaming_test.mustache @@ -0,0 +1,425 @@ +{{>partial_header}} +package {{packageName}} + +import ( + "context" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "testing" + "time" +) + +func TestStreamingChannel_Close(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + channel := &StreamingChannel[StreamedListObjectsResponse]{ + Results: make(chan StreamedListObjectsResponse), + Errors: make(chan error), + cancel: cancel, + } + + channel.Close() + + select { + case <-ctx.Done(): + case <-time.After(100 * time.Millisecond): + t.Error("Context was not cancelled") + } +} + +func TestStreamedListObjectsChannel_Close(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + channel := &StreamedListObjectsChannel{ + Objects: make(chan StreamedListObjectsResponse), + Errors: make(chan error), + cancel: cancel, + } + + channel.Close() + + select { + case <-ctx.Done(): + case <-time.After(100 * time.Millisecond): + t.Error("Context was not cancelled") + } +} + +func TestStreamedListObjectsWithChannel_Success(t *testing.T) { + objects := []string{"document:1", "document:2", "document:3"} + expectedResults := []string{} + for _, obj := range objects { + expectedResults = append(expectedResults, `{"result":{"object":"`+obj+`"}}`) + } + responseBody := strings.Join(expectedResults, "\n") + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/stores/test-store/streamed-list-objects" { + t.Errorf("Expected path /stores/test-store/streamed-list-objects, got %s", r.URL.Path) + } + if r.Method != http.MethodPost { + t.Errorf("Expected POST method, got %s", r.Method) + } + + w.Header().Set("Content-Type", "application/x-ndjson") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(responseBody)) + })) + defer server.Close() + + config, err := NewConfiguration(Configuration{ + ApiUrl: server.URL, + }) + if err != nil { + t.Fatalf("Failed to create configuration: %v", err) + } + + client := NewAPIClient(config) + ctx := context.Background() + + request := ListObjectsRequest{ + Type: "document", + Relation: "viewer", + User: "user:anne", + } + + channel, err := ExecuteStreamedListObjects(client, ctx, "test-store", request, RequestOptions{}) + + if err != nil { + t.Fatalf("ExecuteStreamedListObjects failed: %v", err) + } + + defer channel.Close() + + receivedObjects := []string{} + for obj := range channel.Objects { + receivedObjects = append(receivedObjects, obj.Object) + } + + if err := <-channel.Errors; err != nil { + t.Fatalf("Received error from channel: %v", err) + } + + if len(receivedObjects) != len(objects) { + t.Fatalf("Expected %d objects, got %d", len(objects), len(receivedObjects)) + } + + for i, expected := range objects { + if receivedObjects[i] != expected { + t.Errorf("Expected object %s at index %d, got %s", expected, i, receivedObjects[i]) + } + } +} + +func TestStreamedListObjectsWithChannel_EmptyLines(t *testing.T) { + responseBody := `{"result":{"object":"document:1"}} + +{"result":{"object":"document:2"}} + +{"result":{"object":"document:3"}}` + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/x-ndjson") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(responseBody)) + })) + defer server.Close() + + config, err := NewConfiguration(Configuration{ + ApiUrl: server.URL, + }) + if err != nil { + t.Fatalf("Failed to create configuration: %v", err) + } + + client := NewAPIClient(config) + ctx := context.Background() + + request := ListObjectsRequest{ + Type: "document", + Relation: "viewer", + User: "user:anne", + } + + channel, err := ExecuteStreamedListObjects(client, ctx, "test-store", request, RequestOptions{}) + + if err != nil { + t.Fatalf("ExecuteStreamedListObjects failed: %v", err) + } + + defer channel.Close() + + receivedObjects := []string{} + for obj := range channel.Objects { + receivedObjects = append(receivedObjects, obj.Object) + } + + if len(receivedObjects) != 3 { + t.Fatalf("Expected 3 objects, got %d", len(receivedObjects)) + } +} + +func TestStreamedListObjectsWithChannel_ErrorInStream(t *testing.T) { + responseBody := `{"result":{"object":"document:1"}} +{"error":{"code":500,"message":"Internal error"}}` + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/x-ndjson") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(responseBody)) + })) + defer server.Close() + + config, err := NewConfiguration(Configuration{ + ApiUrl: server.URL, + }) + if err != nil { + t.Fatalf("Failed to create configuration: %v", err) + } + + client := NewAPIClient(config) + ctx := context.Background() + + request := ListObjectsRequest{ + Type: "document", + Relation: "viewer", + User: "user:anne", + } + + channel, err := ExecuteStreamedListObjects(client, ctx, "test-store", request, RequestOptions{}) + + if err != nil { + t.Fatalf("ExecuteStreamedListObjects failed: %v", err) + } + + defer channel.Close() + + receivedObjects := []string{} + for obj := range channel.Objects { + receivedObjects = append(receivedObjects, obj.Object) + } + + err = <-channel.Errors + if err == nil { + t.Fatal("Expected error from channel, got nil") + } + + if !strings.Contains(err.Error(), "Internal error") { + t.Errorf("Expected error message to contain 'Internal error', got %s", err.Error()) + } + + if len(receivedObjects) != 1 { + t.Fatalf("Expected 1 object before error, got %d", len(receivedObjects)) + } +} + +func TestStreamedListObjectsWithChannel_InvalidJSON(t *testing.T) { + responseBody := `{"result":{"object":"document:1"}} +invalid json` + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/x-ndjson") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(responseBody)) + })) + defer server.Close() + + config, err := NewConfiguration(Configuration{ + ApiUrl: server.URL, + }) + if err != nil { + t.Fatalf("Failed to create configuration: %v", err) + } + + client := NewAPIClient(config) + ctx := context.Background() + + request := ListObjectsRequest{ + Type: "document", + Relation: "viewer", + User: "user:anne", + } + + channel, err := ExecuteStreamedListObjects(client, ctx, "test-store", request, RequestOptions{}) + + if err != nil { + t.Fatalf("ExecuteStreamedListObjects failed: %v", err) + } + + defer channel.Close() + + receivedObjects := []string{} + for obj := range channel.Objects { + receivedObjects = append(receivedObjects, obj.Object) + } + + err = <-channel.Errors + if err == nil { + t.Fatal("Expected error from channel, got nil") + } + + if len(receivedObjects) != 1 { + t.Fatalf("Expected 1 object before error, got %d", len(receivedObjects)) + } +} + +func TestStreamedListObjectsWithChannel_ContextCancellation(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/x-ndjson") + w.WriteHeader(http.StatusOK) + // Send first result + _, _ = w.Write([]byte(`{"result":{"object":"document:1"}}` + "\n")) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + // Wait indefinitely to simulate a long stream + <-r.Context().Done() + })) + defer server.Close() + + config, err := NewConfiguration(Configuration{ + ApiUrl: server.URL, + }) + if err != nil { + t.Fatalf("Failed to create configuration: %v", err) + } + + client := NewAPIClient(config) + ctx, cancel := context.WithCancel(context.Background()) + + request := ListObjectsRequest{ + Type: "document", + Relation: "viewer", + User: "user:anne", + } + + channel, err := ExecuteStreamedListObjects(client, ctx, "test-store", request, RequestOptions{}) + + if err != nil { + t.Fatalf("ExecuteStreamedListObjects failed: %v", err) + } + + defer channel.Close() + + // Read first result + obj := <-channel.Objects + if obj.Object != "document:1" { + t.Errorf("Expected document:1, got %s", obj.Object) + } + + // Cancel context + cancel() + + // Check that we get a cancellation error + err = <-channel.Errors + if err == nil { + t.Fatal("Expected cancellation error, got nil") + } + + if !strings.Contains(err.Error(), "context canceled") && !strings.Contains(err.Error(), "operation was canceled") { + t.Errorf("Expected context cancellation error, got: %v", err) + } +} + +func TestStreamedListObjectsWithChannel_CustomBufferSize(t *testing.T) { + numObjects := 100 + expectedResults := []string{} + for i := 0; i < numObjects; i++ { + expectedResults = append(expectedResults, `{"result":{"object":"document:`+strconv.Itoa(i)+`"}}`) + } + responseBody := strings.Join(expectedResults, "\n") + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/x-ndjson") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(responseBody)) + })) + defer server.Close() + + config, err := NewConfiguration(Configuration{ + ApiUrl: server.URL, + }) + if err != nil { + t.Fatalf("Failed to create configuration: %v", err) + } + + client := NewAPIClient(config) + ctx := context.Background() + + request := ListObjectsRequest{ + Type: "document", + Relation: "viewer", + User: "user:anne", + } + + // Use custom buffer size of 50 + channel, err := ExecuteStreamedListObjectsWithBufferSize(client, ctx, "test-store", request, RequestOptions{}, 50) + + if err != nil { + t.Fatalf("ExecuteStreamedListObjectsWithBufferSize failed: %v", err) + } + + defer channel.Close() + + receivedObjects := []string{} + for obj := range channel.Objects { + receivedObjects = append(receivedObjects, obj.Object) + } + + if err := <-channel.Errors; err != nil { + t.Fatalf("Received error from channel: %v", err) + } + + if len(receivedObjects) != numObjects { + t.Fatalf("Expected %d objects, got %d", numObjects, len(receivedObjects)) + } +} + +func TestProcessStreamingResponse_Generic(t *testing.T) { + // Test the generic ProcessStreamingResponse function + responseBody := `{"result":{"object":"document:1"}} +{"result":{"object":"document:2"}}` + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/x-ndjson") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(responseBody)) + })) + defer server.Close() + + resp, err := http.Get(server.URL) + if err != nil { + t.Fatalf("Failed to make request: %v", err) + } + + ctx := context.Background() + channel, err := ProcessStreamingResponse[StreamedListObjectsResponse](ctx, resp, 10) + + if err != nil { + t.Fatalf("ProcessStreamingResponse failed: %v", err) + } + + defer channel.Close() + + receivedObjects := []string{} + for obj := range channel.Results { + receivedObjects = append(receivedObjects, obj.Object) + } + + if err := <-channel.Errors; err != nil { + t.Fatalf("Received error from channel: %v", err) + } + + if len(receivedObjects) != 2 { + t.Fatalf("Expected 2 objects, got %d", len(receivedObjects)) + } + + if receivedObjects[0] != "document:1" { + t.Errorf("Expected document:1, got %s", receivedObjects[0]) + } + if receivedObjects[1] != "document:2" { + t.Errorf("Expected document:2, got %s", receivedObjects[1]) + } +} + + From b8d5a963ebd899e0bae00de5bd015f36a7196b91 Mon Sep 17 00:00:00 2001 From: SoulPancake Date: Thu, 20 Nov 2025 15:44:42 +0530 Subject: [PATCH 2/2] feat: address copilot/coderabbit comments --- config/clients/go/template/streaming.mustache | 9 ++++++--- config/clients/go/template/streaming_test.mustache | 2 -- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/config/clients/go/template/streaming.mustache b/config/clients/go/template/streaming.mustache index cb7725cf..610e914f 100644 --- a/config/clients/go/template/streaming.mustache +++ b/config/clients/go/template/streaming.mustache @@ -219,9 +219,12 @@ func executeStreamingRequest[TReq any, TRes any]( } httpResponse, err := client.callAPI(req) - if err != nil || httpResponse == nil { - return nil, err - } + if err != nil { + return nil, err + } + if httpResponse == nil { + return nil, errors.New("nil HTTP response from API client") + } if httpResponse.StatusCode >= http.StatusMultipleChoices { responseBody, readErr := io.ReadAll(httpResponse.Body) diff --git a/config/clients/go/template/streaming_test.mustache b/config/clients/go/template/streaming_test.mustache index ee488bcb..38938e7f 100644 --- a/config/clients/go/template/streaming_test.mustache +++ b/config/clients/go/template/streaming_test.mustache @@ -421,5 +421,3 @@ func TestProcessStreamingResponse_Generic(t *testing.T) { t.Errorf("Expected document:2, got %s", receivedObjects[1]) } } - -