Feature: Enable Unbounded (Streaming) Mode for BufferedStorageBackend with GCS Datastore #377

@chowbao

Summary

Add support for unbounded mode across all export commands when using the BufferedStorageBackend with the GCS ledger metadata datastore. Currently, unbounded mode is documented but marked as "Currently Unsupported" in the README. This feature will allow stellar-etl to run as a long-lived streaming process that continuously exports new ledgers as they close on the Stellar network, without requiring an --end-ledger flag.

Motivation

Today, every export command (export_ledgers, export_transactions, export_operations, export_effects, export_assets, export_trades, export_diagnostic_events, export_ledger_entry_changes) requires both --start-ledger and --end-ledger to define a bounded range. This means:

  • No continuous streaming: Users must repeatedly launch new stellar-etl processes with updated ledger ranges, adding operational overhead and introducing gaps or latency.
  • Wasted infrastructure cycles: Each bounded invocation incurs startup/teardown costs (container spin-up, datastore connection, buffer warming).
  • Near-real-time pipelines are difficult to build: Downstream consumers (BigQuery, Kafka, custom pipelines) cannot receive data as soon as ledgers close without a wrapper orchestrator that polls for the latest ledger and re-invokes stellar-etl.

The upstream Go SDK (github.com/stellar/go/ingest/ledgerbackend) already supports UnboundedRange on BufferedStorageBackend. The PrepareRange method accepts ledgerbackend.UnboundedRange(startLedger), and GetLedger will block and wait for the next sequentially written ledger file in the datastore. The CDP ApplyLedgerMetadata helper also supports unbounded ranges. This means the infrastructure to support streaming already exists at the SDK level — stellar-etl just needs to wire it up.

Current Behavior

  • The README documents unbounded mode under export_ledger_entry_changes as "Unbounded (Currently Unsupported)".
  • When only --start-ledger is provided (no --end-ledger), the ETL either errors out or does not function as a streaming process for the datastore backend.
  • Unbounded mode was previously supported when using the captive-core backend (--captive-core flag), where Stellar-Core connects directly to the network. It has not been implemented for the GCS BufferedStorageBackend path.

Proposed Behavior

When a user provides only --start-ledger (and omits --end-ledger, or sets --end-ledger 0) without the --captive-core flag, stellar-etl should:

  1. Initialize the BufferedStorageBackend with the GCS datastore as it does today.
  2. Call PrepareRange with ledgerbackend.UnboundedRange(startLedger) instead of ledgerbackend.BoundedRange(startLedger, endLedger).
  3. Enter a continuous processing loop where GetLedger blocks and waits for the next ledger file to appear in the GCS datastore (written by [Galexie / Ledger Exporter](https://github.com/stellar/go/blob/master/exp/services/ledgerexporter/README.md)).
  4. Transform and export each ledger's data as it becomes available, using the same output semantics (batched file output, stdout, etc.).
  5. Continue indefinitely until the process is terminated (SIGINT/SIGTERM), at which point it should gracefully shut down, flush any pending batch, and close the backend.

This should apply to all export commands, not just export_ledger_entry_changes.

Scope of Changes

1. Backend Initialization (internal/input/)

  • In the function(s) that create the BufferedStorageBackend and call PrepareRange, detect whether the run is bounded or unbounded based on the presence/absence of --end-ledger.
  • If unbounded, use ledgerbackend.UnboundedRange(startLedger).
  • If bounded, continue using ledgerbackend.BoundedRange(startLedger, endLedger) as today.
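The detection rule above can be sketched as a small pure function. Note that rangeSpec and resolveRange are hypothetical names for this sketch; the real code would call ledgerbackend.UnboundedRange / ledgerbackend.BoundedRange directly:

```go
package main

import "fmt"

// rangeSpec is a stand-in for the SDK's ledger range types, used here
// only to keep the sketch self-contained.
type rangeSpec struct {
	from      uint32
	to        uint32
	unbounded bool
}

// resolveRange applies the proposed convention: --end-ledger omitted
// or set to 0 selects unbounded (streaming) mode; any other value
// selects the existing bounded behavior.
func resolveRange(startLedger, endLedger uint32) rangeSpec {
	if endLedger == 0 {
		return rangeSpec{from: startLedger, unbounded: true}
	}
	return rangeSpec{from: startLedger, to: endLedger}
}

func main() {
	fmt.Println(resolveRange(57000000, 0).unbounded)        // streaming mode
	fmt.Println(resolveRange(57000000, 57000100).unbounded) // bounded mode
}
```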

2. Export Command Logic (cmd/)

  • For each export_*.go command, update the ledger processing loop to support indefinite iteration when in unbounded mode.
  • The loop should call GetLedger(ctx, nextSequence) which will block until that ledger is available in the datastore.
  • Handle context cancellation and OS signals (SIGINT, SIGTERM) for graceful shutdown.
  • Continue to export in batches (controlled by --batch-size), flushing each completed batch to output before starting the next.

3. Retry and Resilience

  • Leverage the existing --retry-limit and --retry-wait flags for transient GCS read failures.
  • Consider adding a --max-wait or similar flag to control how long the process waits for a new ledger before logging a warning (optional, for observability — the default behavior should be to wait indefinitely as the SDK does).

4. Graceful Shutdown

  • Register OS signal handlers (SIGINT, SIGTERM).
  • On signal, cancel the context passed to GetLedger, flush any in-progress batch to output, and call backend.Close().
  • Exit with code 0 on clean shutdown.

5. README / Documentation

  • Update the "Unbounded (Currently Unsupported)" section to reflect that unbounded mode is now supported for the datastore backend.
  • Add usage examples for unbounded mode, e.g.:

    # Stream all ledger data starting from ledger 57000000
    stellar-etl export_ledgers --start-ledger 57000000 --output streamed_ledgers/

    # Stream ledger entry changes continuously
    stellar-etl export_ledger_entry_changes --start-ledger 57000000 \
      --output streamed_changes/ --batch-size 64

  • Document the graceful shutdown behavior.

6. Tests

  • Add unit tests for the unbounded range detection logic.
  • Add integration tests that simulate the datastore having ledgers written incrementally and verify the ETL processes them in order.
  • Test graceful shutdown behavior (context cancellation mid-stream).

CLI Interface

No new flags are strictly required. The existing convention is:

Flags Provided                                             Mode
--start-ledger + --end-ledger                              Bounded (existing behavior)
--start-ledger only (no --end-ledger, or --end-ledger 0)   Unbounded / streaming (new)

Optional new flag (nice-to-have):

  • --unbounded-idle-timeout (duration): If no new ledger appears in the datastore within this duration, log a warning. Default: no timeout (wait forever).

Reference Implementation

The upstream Go SDK CDP example demonstrates unbounded streaming with BufferedStorageBackend:

// From Stellar CDP consumer pipeline sample
latestNetworkLedger, err := historyArchive.GetLatestLedgerSequence()
if err != nil {
    return err
}
ledgerRange := ledgerbackend.UnboundedRange(latestNetworkLedger)

pubConfig := cdp.PublisherConfig{
    DataStoreConfig:       adapter.dataStoreConfig,
    BufferedStorageConfig: cdp.DefaultBufferedStorageBackendConfig(
        adapter.dataStoreConfig.Schema.LedgersPerFile,
    ),
}

cdp.ApplyLedgerMetadata(ledgerRange, pubConfig, ctx, callback)

The BufferedStorageBackend.GetLedger() call will block until the requested ledger sequence is available in the GCS bucket, making it naturally suitable for streaming.

Acceptance Criteria

  • All export commands (export_ledgers, export_transactions, export_operations, export_effects, export_assets, export_trades, export_diagnostic_events, export_ledger_entry_changes) support unbounded mode when using the datastore backend.
  • Providing only --start-ledger (without --end-ledger) starts the ETL in unbounded/streaming mode.
  • The process continuously exports new ledger data as it appears in the GCS datastore.
  • Batch output semantics are preserved (files are flushed per --batch-size ledgers).
  • Graceful shutdown on SIGINT/SIGTERM: in-progress batch is flushed, backend is closed, exit code 0.
  • Existing bounded mode behavior is unaffected (no regressions).
  • Unit and integration tests cover unbounded mode initialization, streaming, and shutdown.
  • README updated to remove "Currently Unsupported" label and document unbounded mode usage.

Labels

enhancement, feature
