Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config/clients/go/config.overrides.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@
"api_client.mustache": {
"destinationFilename": "api_client.go",
"templateType": "SupportingFiles"
},
"streaming.mustache": {
"destinationFilename": "streaming.go",
"templateType": "SupportingFiles"
},
"streaming_test.mustache": {
"destinationFilename": "streaming_test.go",
"templateType": "SupportingFiles"
}
}
}
242 changes: 242 additions & 0 deletions config/clients/go/template/streaming.mustache
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
{{>partial_header}}
package {{packageName}}

import (
"bufio"
"context"
"encoding/json"
"errors"
"io"
"net/http"
"net/url"
"strings"
)

// StreamResult represents a generic streaming result wrapper with either a result or an error
type StreamResult[T any] struct {
Result *T `json:"result,omitempty" yaml:"result,omitempty"`
Error *Status `json:"error,omitempty" yaml:"error,omitempty"`
}

// StreamingChannel represents a generic channel for streaming responses
type StreamingChannel[T any] struct {
Results chan T
Errors chan error
cancel context.CancelFunc
}

// Close cancels the streaming context and cleans up resources
func (s *StreamingChannel[T]) Close() {
if s.cancel != nil {
s.cancel()
}
}

// ProcessStreamingResponse processes an HTTP response as a streaming NDJSON response
// and returns a StreamingChannel with results and errors
//
// Parameters:
// - ctx: The context for cancellation
// - httpResponse: The HTTP response to process
// - bufferSize: The buffer size for the channels (default 10 if <= 0)
//
// Returns:
// - *StreamingChannel[T]: A channel containing streaming results and errors
// - error: An error if the response is invalid
func ProcessStreamingResponse[T any](ctx context.Context, httpResponse *http.Response, bufferSize int) (*StreamingChannel[T], error) {
streamCtx, cancel := context.WithCancel(ctx)

// Use default buffer size of 10 if not specified or invalid
if bufferSize <= 0 {
bufferSize = 10
}

channel := &StreamingChannel[T]{
Results: make(chan T, bufferSize),
Errors: make(chan error, 1),
cancel: cancel,
}

if httpResponse == nil || httpResponse.Body == nil {
cancel()
return nil, errors.New("response or response body is nil")
}

go func() {
defer close(channel.Results)
defer close(channel.Errors)
defer cancel()
defer func() { _ = httpResponse.Body.Close() }()

scanner := bufio.NewScanner(httpResponse.Body)
// Allow large NDJSON entries (up to 10MB). Tune as needed.
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 10*1024*1024)

for scanner.Scan() {
select {
case <-streamCtx.Done():
channel.Errors <- streamCtx.Err()
return
default:
line := scanner.Bytes()
if len(line) == 0 {
continue
}

var streamResult StreamResult[T]
if err := json.Unmarshal(line, &streamResult); err != nil {
channel.Errors <- err
return
}

if streamResult.Error != nil {
msg := "stream error"
if streamResult.Error.Message != nil {
msg = *streamResult.Error.Message
}
channel.Errors <- errors.New(msg)
return
}

if streamResult.Result != nil {
select {
case <-streamCtx.Done():
channel.Errors <- streamCtx.Err()
return
case channel.Results <- *streamResult.Result:
}
}
}
}

if err := scanner.Err(); err != nil {
// Prefer context error if we were canceled to avoid surfacing net/http "use of closed network connection".
if streamCtx.Err() != nil {
channel.Errors <- streamCtx.Err()
return
}
channel.Errors <- err
}
}()

return channel, nil
}

// StreamedListObjectsChannel maintains backward compatibility with the old channel structure
type StreamedListObjectsChannel struct {
Objects chan StreamedListObjectsResponse
Errors chan error
cancel context.CancelFunc
}

// Close cancels the streaming context and cleans up resources
func (s *StreamedListObjectsChannel) Close() {
if s.cancel != nil {
s.cancel()
}
}

// ProcessStreamedListObjectsResponse processes a StreamedListObjects response
// This is a backward compatibility wrapper around ProcessStreamingResponse
func ProcessStreamedListObjectsResponse(ctx context.Context, httpResponse *http.Response, bufferSize int) (*StreamedListObjectsChannel, error) {
channel, err := ProcessStreamingResponse[StreamedListObjectsResponse](ctx, httpResponse, bufferSize)
if err != nil {
return nil, err
}

// Create a new channel with the old field name for backward compatibility
compatChannel := &StreamedListObjectsChannel{
Objects: channel.Results,
Errors: channel.Errors,
cancel: channel.cancel,
}

return compatChannel, nil
}

// ExecuteStreamedListObjects executes a StreamedListObjects request
func ExecuteStreamedListObjects(client *APIClient, ctx context.Context, storeId string, body ListObjectsRequest, options RequestOptions) (*StreamedListObjectsChannel, error) {
return ExecuteStreamedListObjectsWithBufferSize(client, ctx, storeId, body, options, 0)
}

// ExecuteStreamedListObjectsWithBufferSize executes a StreamedListObjects request with a custom buffer size
func ExecuteStreamedListObjectsWithBufferSize(client *APIClient, ctx context.Context, storeId string, body ListObjectsRequest, options RequestOptions, bufferSize int) (*StreamedListObjectsChannel, error) {
channel, err := executeStreamingRequest[ListObjectsRequest, StreamedListObjectsResponse](
client,
ctx,
"/stores/{store_id}/streamed-list-objects",
storeId,
body,
options,
bufferSize,
"StreamedListObjects",
)
if err != nil {
return nil, err
}

// Convert to backward-compatible channel structure
return &StreamedListObjectsChannel{
Objects: channel.Results,
Errors: channel.Errors,
cancel: channel.cancel,
}, nil
}

// executeStreamingRequest is a generic function to execute streaming requests
func executeStreamingRequest[TReq any, TRes any](
client *APIClient,
ctx context.Context,
pathTemplate string,
storeId string,
body TReq,
options RequestOptions,
bufferSize int,
operationName string,
) (*StreamingChannel[TRes], error) {
if storeId == "" {
return nil, reportError("storeId is required and must be specified")
}

path := pathTemplate
path = strings.ReplaceAll(path, "{"+"store_id"+"}", url.PathEscape(parameterToString(storeId, "")))

localVarHeaderParams := make(map[string]string)
localVarQueryParams := url.Values{}

localVarHTTPContentType := "application/json"
localVarHeaderParams["Content-Type"] = localVarHTTPContentType
localVarHeaderParams["Accept"] = "application/x-ndjson"

for header, val := range options.Headers {
localVarHeaderParams[header] = val
}

req, err := client.prepareRequest(ctx, path, http.MethodPost, body, localVarHeaderParams, localVarQueryParams)
if err != nil {
return nil, err
}

httpResponse, err := client.callAPI(req)
if err != nil {
return nil, err
}
if httpResponse == nil {
return nil, errors.New("nil HTTP response from API client")
}

if httpResponse.StatusCode >= http.StatusMultipleChoices {
responseBody, readErr := io.ReadAll(httpResponse.Body)
_ = httpResponse.Body.Close()
if readErr != nil {
return nil, readErr
}
err = client.handleAPIError(httpResponse, responseBody, body, operationName, storeId)
return nil, err
}

return ProcessStreamingResponse[TRes](ctx, httpResponse, bufferSize)
}


Loading
Loading