
feat(consumoor): add staging docker-compose infra and test scripts#766

Open
samcm wants to merge 2 commits into master from feat/consumoor-staging-infra

Conversation

@samcm samcm commented Feb 24, 2026

Summary

  • Add local development infrastructure for testing the consumoor pipeline against a staging ClickHouse instance
  • Includes docker-compose staging config, ClickHouse init script, correctness test script, topic survey utility, and example consumoor YAML config

Extracted from the consumoor routes branch to keep infrastructure changes separate from code restructuring.

…t scripts

Add local development infrastructure for testing the consumoor pipeline
against a staging ClickHouse instance:

- consumoor-clickhouse-init.sh: init script for local ClickHouse container
- docker-compose.staging.yml: staging compose configuration
- staging-correctness-test.sh: correctness test comparing staging vs local
- staging-topic-survey.sh: topic survey utility for staging Kafka
- xatu-consumoor.yaml: example consumoor configuration
@samcm samcm requested a review from Savid as a code owner February 24, 2026 03:19
samcm added a commit that referenced this pull request Feb 24, 2026
Staging docker-compose infrastructure moved to #766 to keep this
branch focused on the package restructuring.

- Add xatu-consumoor service and clickhouse-consumoor-init container
- Create proto-* Kafka topics for protobuf-encoded consumoor pipeline
- Update example_consumoor.yaml with per-topic stream comments
- Gitignore local staging config override
samcm added a commit that referenced this pull request Feb 24, 2026
…766)

Restore docker-compose.yml, example_consumoor.yaml, and .gitignore to
master state. Consumoor docker-compose services and proto Kafka topics
are now in the staging infra PR.
samcm added a commit that referenced this pull request Feb 25, 2026
…v infra (#757)

* feat(consumoor): add table route implementations, tests, and local dev infra

Add FlattenTo() implementations for all 76 table routes across 6
domains (beacon, canonical, execution, libp2p, mev, node), wire up
the route registry with side-effect imports, and include tests and
local development infrastructure.

Each route file follows a consistent pattern:
- init() registers via flattener.MustRegister(NewStaticRoute(...))
- FlattenTo() maps DecoratedEvent proto fields to typed columnar batch
  columns using appendRuntime(), appendMetadata(), appendPayload(),
  and appendAdditionalData() helpers
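The registration-plus-flatten pattern above can be sketched as follows. This is a simplified stand-in, not the real flattener API: `Route`, `MustRegister`, the registry map, and the inlined column appends are all illustrative substitutes for the actual `flattener.MustRegister(NewStaticRoute(...))` and `appendPayload()` helpers.

```go
package main

import "fmt"

// Route is a simplified stand-in for a flattener route: it maps one
// event name to a target table via a flatten function.
type Route struct {
	EventName string
	Table     string
	Flatten   func(payload map[string]any, cols map[string][]any) error
}

var registry = map[string]Route{}

// MustRegister mirrors the init()-time registration pattern: panic on
// duplicate event names so misconfiguration fails at startup.
func MustRegister(r Route) {
	if _, dup := registry[r.EventName]; dup {
		panic("duplicate route for " + r.EventName)
	}
	registry[r.EventName] = r
}

func init() {
	MustRegister(Route{
		EventName: "BEACON_API_ETH_V2_BEACON_BLOCK",
		Table:     "beacon_api_eth_v2_beacon_block",
		Flatten: func(payload map[string]any, cols map[string][]any) error {
			// appendPayload-style helper inlined: one value per column,
			// appended in lockstep to keep columns aligned.
			cols["slot"] = append(cols["slot"], payload["slot"])
			cols["block_root"] = append(cols["block_root"], payload["block_root"])
			return nil
		},
	})
}

func main() {
	cols := map[string][]any{}
	r := registry["BEACON_API_ETH_V2_BEACON_BLOCK"]
	_ = r.Flatten(map[string]any{"slot": uint64(42), "block_root": "0xabc"}, cols)
	fmt.Println(r.Table, cols["slot"][0], cols["block_root"][0])
}
```

The blank-import aggregator mentioned later (`route/all/all.go`) works because each route file's `init()` runs on import, populating this registry as a side effect.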

Tests:
- Config validation (ch-go settings, table config merging, canonical
  defaults, delivery modes)
- Metadata extraction from DecoratedEvent protos
- Route registration (every event name maps to exactly one route)
- Correctness tests (FlattenTo produces expected column values)
- SeaHash, convert helpers, benchmarks
- Writer error classification and retry logic
- Benthos stream configuration and batch output

Local dev infrastructure:
- ClickHouse init script for consumoor tables
- Consumoor YAML config for docker-compose
- Smoke test correctness script
- docker-compose.yml: consumoor service, proto Kafka topics,
  ClickHouse init container

* feat(consumoor): add snapshot/fuzz tests, fix Slot double-append bug, staging test infra

Add comprehensive test suite for all 77 flattener routes:

- TestSnapshotCorrectness: synthetic proto events with exact column value
  assertions for every route, completeness enforcement requiring non-empty
  checks for each fixture
- FuzzFlattenColumnAlignment: random proto bytes verify no panics and
  column alignment across all routes
- FuzzFlattenNilSafety: partially-constructed events verify nil-safety
- Extended TestConditionalRoutingPredicates to cover all 5 predicate routes
- Extended TestValidatorsFanout to cover all 3 fan-out routes

Fix canonical_beacon_block Slot column double-append: both appendPayload
and appendAdditionalData were appending to b.Slot when additional data
had Slot.Number set, breaking column alignment.

Refactor staging correctness test infrastructure: replace smoke-test script
with staging-correctness-test.sh and staging-topic-survey.sh, add docker
compose staging overlay.

* feat(consumoor): per-topic pipeline isolation with scoped flush

One Benthos stream per Kafka topic, discovered automatically at startup
via kadm. Each topic gets its own consumer group (base-{topic}) and
isolated offset tracking. All streams share a single ChGoWriter for
efficient ClickHouse connection reuse.

Key changes:
- Add FlushTables(ctx, tables) to Writer interface for scoped flushes
- Switch output plugin from FlushAll to FlushTables so each stream only
  flushes tables it wrote to
- Add ownsWriter lifecycle gating so multiple streams share one writer
- Create DiscoverTopics using kadm for automatic topic discovery
- Restructure shutdown: streams exit fully before writer stops
- Escape literal topic names with regexp.QuoteMeta for per-topic configs
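The last two points can be sketched in a few lines. The function names here are hypothetical helpers, not the actual consumoor code; only the `regexp.QuoteMeta` call and the `base-{topic}` group naming come from the description above.

```go
package main

import (
	"fmt"
	"regexp"
)

// perTopicPattern builds the regex a single-topic stream subscribes to.
// Literal topic names are escaped with regexp.QuoteMeta so names
// containing regex metacharacters (e.g. dots) match only themselves.
func perTopicPattern(topic string) string {
	return "^" + regexp.QuoteMeta(topic) + "$"
}

// consumerGroup derives the isolated per-topic group name (base-{topic}),
// giving each stream its own offset tracking.
func consumerGroup(base, topic string) string {
	return base + "-" + topic
}

func main() {
	p := perTopicPattern("proto.beacon.block")
	fmt.Println(p) // ^proto\.beacon\.block$
	// The escaped pattern matches the exact topic but not lookalikes.
	fmt.Println(regexp.MustCompile(p).MatchString("proto.beacon.block")) // true
	fmt.Println(regexp.MustCompile(p).MatchString("protoXbeaconXblock")) // false
	fmt.Println(consumerGroup("xatu-consumoor", "proto.beacon.block"))
}
```

Without the `QuoteMeta` escaping, a topic name like `proto.beacon.block` would be treated as a regex where `.` matches any character, silently consuming unintended topics.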

* ci: increase sentry smoke test docker-compose timeout to 7 min

The 5-minute timeout is too tight for the sentry stack (ClickHouse
cluster + ZooKeeper + Kafka + Postgres + migrations). clickhouse-02
intermittently fails on startup, and recovery pushes past the limit.

* feat(consumoor): align config with Kafka conventions, add missing validation

Rename offsetDefault values to standard Kafka terminology (oldest/newest
→ earliest/latest), derive heartbeatIntervalMs automatically from
sessionTimeoutMs/10, add sessionTimeoutMs>0 and bufferSize>=batchSize
validation, remove phantom batchBytes config, lower fetchWaitMaxMs
default to 250ms, and refactor staging test scripts to remove sudo
host-mutation.
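A minimal sketch of the validation rules listed above, assuming a hypothetical `KafkaTuning` struct (field names mirror the YAML keys but the struct and method names are illustrative, not the real config types):

```go
package main

import (
	"errors"
	"fmt"
)

// KafkaTuning is a simplified stand-in for the consumer config.
type KafkaTuning struct {
	OffsetDefault    string // "earliest" or "latest"
	SessionTimeoutMs int
	BufferSize       int
	BatchSize        int
}

// Validate applies the rules from the commit: standard Kafka offset
// terminology, sessionTimeoutMs > 0, and bufferSize >= batchSize.
func (c KafkaTuning) Validate() error {
	if c.OffsetDefault != "earliest" && c.OffsetDefault != "latest" {
		return errors.New("offsetDefault must be earliest or latest")
	}
	if c.SessionTimeoutMs <= 0 {
		return errors.New("sessionTimeoutMs must be > 0")
	}
	if c.BufferSize < c.BatchSize {
		return errors.New("bufferSize must be >= batchSize")
	}
	return nil
}

// HeartbeatIntervalMs is derived as sessionTimeoutMs/10 rather than
// configured, the common Kafka guideline of heartbeating well inside
// the session timeout.
func (c KafkaTuning) HeartbeatIntervalMs() int { return c.SessionTimeoutMs / 10 }

func main() {
	c := KafkaTuning{OffsetDefault: "earliest", SessionTimeoutMs: 30000, BufferSize: 2048, BatchSize: 512}
	fmt.Println(c.Validate() == nil, c.HeartbeatIntervalMs()) // true 3000
	bad := KafkaTuning{OffsetDefault: "oldest", SessionTimeoutMs: 30000, BufferSize: 2048, BatchSize: 512}
	fmt.Println(bad.Validate() != nil) // true: "oldest" is rejected
}
```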

* refactor(consumoor): flatten package hierarchy — sinks/clickhouse/transform/flattener → route

Restructure the consumoor package tree to reduce nesting depth from 8
levels to 4 and rename packages to match their actual responsibilities:

- sinks/clickhouse/ → clickhouse/ (drop unnecessary sinks/ layer)
- sinks/clickhouse/transform/ → router/ (it routes, not transforms)
- sinks/clickhouse/transform/flattener/ → route/ (route definitions)
- flattener/tables/{domain}/ → route/{domain}/ (drop tables/ layer)
- flattener/tables/registry.go → route/all/all.go (blank-import aggregator)

All changes are mechanical: move files, update package declarations,
update import paths and qualifiers. No logic changes.

* refactor(consumoor): remove staging infra (extracted to separate PR)

Staging docker-compose infrastructure moved to #766 to keep this
branch focused on the package restructuring.

* refactor(consumoor): revert docker-compose/config infra (extracted to #766)

Restore docker-compose.yml, example_consumoor.yaml, and .gitignore to
master state. Consumoor docker-compose services and proto Kafka topics
are now in the staging infra PR.

* fix(consumoor): prevent scaffold column misalignment in FlattenTo

Return "not implemented" error before any columns are appended,
removing the appendRuntime/appendPayload stubs that caused partial
column writes and panics in fuzz/alignment tests.

* fix(consumoor): use valid offset value in example config (#768)

* feat(consumoor): add Grafana dashboard (#774)

Add a comprehensive Grafana dashboard for consumoor metrics covering
Kafka consumption, message routing, ClickHouse writes, buffer usage,
error tracking, and ch-go connection pool stats.

* fix(consumoor): validate SASL mechanism at config time (#769)

SASLConfig.Validate() accepted any mechanism string, but only PLAIN,
SCRAM-SHA-256, SCRAM-SHA-512, and OAUTHBEARER are supported. An
unsupported value (e.g. AWS_MSK_IAM) silently passed validation and
only failed at connection time. Move the check into Validate() so
misconfiguration is caught at startup.

Define mechanism constants and a lookup set in config.go to eliminate
string duplication between validation, the franz-go factory, and the
Benthos SASL builder.

* feat(consumoor): add Prometheus alerting rules (#773)

Add alerting rules covering buffer capacity, write errors, pipeline
stalls, flatten errors, DLQ failures, Kafka lag, pool exhaustion,
and decode errors.

* fix(consumoor): collect all table errors in FlushTables to prevent data loss (#770)

FlushTables and FlushAll previously returned only the first error when
multiple tables failed simultaneously. This caused writeBatchMode to
mark only the first failed table's messages in the BatchError, silently
committing the second table's failures and losing data.

Now both methods drain all results and return errors.Join(...) so every
failed table is represented. failedIndexesForWriteError unwraps joined
errors to collect message indexes across all failed tables, and
isPermanentWriteError checks all sub-errors for permanence.

* docs(consumoor): add operational runbook (#779)

Adds a comprehensive runbook for operating consumoor in production,
covering architecture, key metrics, common failure scenarios, startup
troubleshooting, graceful restart procedures, scaling, and config tuning.

* test(consumoor): add unit tests for DSN parsing, retry logic, and topic discovery (#782)

Cover critical untested code paths: parseChGoOptions with all DSN schemes
and TLS combos, doWithRetry with backoff/cancellation/non-retryable errors,
getOrCreateTableWriter concurrent safety, and matchTopics regex matching
with deduplication.

* feat(consumoor): add /healthz and /readyz probe endpoints (#775)

Add liveness and readiness HTTP endpoints to the existing metrics
server so Kubernetes can properly probe pod health. The readiness
check pings the ClickHouse connection pool and returns 503 when
connectivity is lost.

* feat(consumoor): make hardcoded timeouts configurable (#778)

Add configurable fields for previously hardcoded timeout values:

- ClickHouse dial/read timeouts (ChGoConfig.DialTimeout, ReadTimeout)
- Organic retry backoff delays (Config.OrganicRetryInitDelay, OrganicRetryMaxDelay)
- Benthos stream shutdown timeout (KafkaConfig.ShutdownTimeout)

All fields default to their previous hardcoded values and include
validation. The organic retry backoff uses exponential delay with
ticker reset so the table writer backs off intelligently on transient
flush failures.

* feat(consumoor): add write-path throughput benchmarks (#781)

Add benchmarks measuring Go pipeline overhead for the consumoor write
path (channel dispatch, batch accumulation, FlattenTo, flush) using a
noop ClickHouse sink so results reflect pure CPU/allocation cost.

Introduces a poolDoFn test hook on ChGoWriter so benchmarks can bypass
the real connection pool without modifying the production call path.

* feat(consumoor): add periodic topic hot-discovery via metadata refresh (#783)

New topics matching regex patterns are now picked up without restart.
The TopicRefreshInterval config (default 60s) controls both the Benthos
metadata_max_age for actual consumption and a lightweight watcher that
logs topic changes and updates the xatu_consumoor_active_topics gauge.

* feat(consumoor): validate ClickHouse tables exist on startup (#771)

* feat(consumoor): validate ClickHouse tables exist on startup

Query system.tables after the connection pool is established and compare
against all registered route table names (with TableSuffix applied).
Missing tables are logged as warnings by default. A new config option
failOnMissingTables (default false) promotes warnings to a fatal startup
error.

This prevents silent data loss where INSERTs to missing tables fail with
ErrUnknownTable, get classified as permanent errors, and the batch is
quietly dropped.

* fix(consumoor): default failOnMissingTables to true

Silent data loss from missing tables is worse than a noisy startup
crash. Flip the default so operators discover migration gaps immediately
rather than hours later when data is already lost.

* feat(consumoor): add buffer backpressure visibility (#777)

Add aggregate buffer metric and per-table warning logs to give operators
early visibility into memory pressure before OOM conditions.

- Add xatu_consumoor_buffer_usage_total gauge tracking sum of all table
  buffer usages for single-number alerting
- Add bufferWarningThreshold config (0-1, default 0.8) that triggers
  rate-limited warnings (once/min per table) when exceeded
- Track aggregate gauge on both enqueue (Write) and dequeue (drain) paths

* feat(consumoor): add Kafka consumer lag Prometheus metric (#772)

Operators had no direct visibility into how far behind the pipeline was
from Kafka partition head. The existing buffer_usage gauge only reflects
internal buffering, not actual Kafka lag.

Add xatu_consumoor_kafka_consumer_lag gauge (labels: topic, partition,
consumer_group) powered by a periodic poller using franz-go kadm.Client.Lag.
The poll interval is configurable via kafka.lagPollInterval (default 30s,
0 disables).

* feat(consumoor): add custom CA and client cert TLS support for ClickHouse and Kafka (#776)

Replace bare `tls.Config{MinVersion: tls.VersionTLS12}` with a shared
TLSConfig struct supporting custom CA files, client certificates, and
InsecureSkipVerify. Existing `tls: {enabled: true}` and DSN-triggered
TLS (`clickhouses://`) remain backward compatible.

* fix(consumoor): fix post-merge integration issues

- Remove unused crypto/tls import from lag_monitor.go and use TLS
  struct config instead of bool
- Add missing DialTimeout/ReadTimeout to buffer warning test fixtures
- Add missing OrganicRetryInitDelay/OrganicRetryMaxDelay and
  ShutdownTimeout to config_test.go fixtures via shared helpers

* fix(consumoor): fix CI lint and go mod tidy failures

Extract repeated "clickhouses" string to a constant to satisfy goconst,
and promote franz-go/pkg/kmsg to a direct dependency per go mod tidy.

* fix(consumoor): address code review findings for data safety

Three fixes from the Codex-driven review:

1. No-DLQ decode nack: when no DLQ (rejected_topic) is configured,
   decode errors and permanent write failures now fail the batch instead
   of silently acking. Route rejections remain acked since they are
   intentional. This prevents silent data loss when the DLQ is absent.

2. Shutdown drain timeout: table writers now use a configurable
   drainTimeout (default 30s) for the final flush during shutdown,
   preventing indefinite hangs when ClickHouse is unresponsive.

3. Parse error propagation: ParseUInt128, ParseUInt256, and ScaleDecimal
   now return (value, error) instead of silently returning zero on
   failure. All 54 call sites across 11 route files updated to surface
   these errors through FlattenTo, which rejects the event rather than
   writing corrupt zero values to ClickHouse.
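The `(value, error)` shape can be sketched like this; `parseUInt256` here is a simplified illustration using `math/big`, not the real implementation:

```go
package main

import (
	"fmt"
	"math/big"
)

// parseUInt256 returns an error on bad input instead of silently
// returning zero, so FlattenTo can reject the event.
func parseUInt256(s string) (*big.Int, error) {
	v, ok := new(big.Int).SetString(s, 10)
	if !ok || v.Sign() < 0 || v.BitLen() > 256 {
		return nil, fmt.Errorf("invalid uint256 %q", s)
	}
	return v, nil
}

func main() {
	v, err := parseUInt256("340282366920938463463374607431768211456") // 2^128
	fmt.Println(v.String(), err == nil)
	_, err = parseUInt256("not-a-number")
	fmt.Println(err != nil) // true: rejected instead of a corrupt zero value
}
```

The important contract change is at the call sites: a failed parse now propagates up through `FlattenTo` as an event rejection rather than writing `0` into a column that will never look wrong again.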

* feat(consumoor): add ErrInvalidEvent sentinel to drop nil-payload events (#786)

* fix(consumoor): write NULL on parse failure for nullable columns

Parse errors in nullable ClickHouse columns should write NULL rather
than rejecting the entire event. Non-nullable columns still return
errors on parse failure to prevent corrupt zero values.

* fix(consumoor): fix test failures after ErrInvalidEvent rebase

Tolerate ErrInvalidEvent in TestCompletenessMinimalFlatten and
TestColumnAlignment for nil-payload minimal events, and update
TestFlattenDoesNotEmitLegacyUniqueColumn to use V2 head event type.
Re-add fmt imports removed during nullable fix but reintroduced by
the rebased ErrInvalidEvent commit.

* chore: update CODEOWNERS to claude (#784)

* chore: update CODEOWNERS to mattevans

* chore: update CODEOWNERS to claude

* ci: add path filters to skip irrelevant workflow runs (#785)

Add path filters to golangci-lint, govulncheck, test, and test-build
workflows so they only trigger when relevant files change. Previously
these ran on every PR, even for doc-only or config-only changes.

* refactor(consumoor): group batch processing by event type

Replace dual writeBatchMode/writeMessageMode with a single WriteBatch
that groups messages by xatu.Event_Name and processes each group
independently. This simplifies error attribution since all messages in
a group share the same target tables.

- Rewrite output.go with eventGroup/groupMessage types (~450→~280 lines)
- Replace FlushAll with FlushTables to flush only relevant tables
- Remove WriteErrorClassifier interface and DefaultErrorClassifier
- Remove DeliveryMode config (batch/message distinction no longer needed)

* fix(consumoor): resolve lint issues for CI

- Flatten nested if in table_writer FlattenTo loop (gocritic nestingReduce)
- Add blank lines before return statements (nlreturn)
- Add blank lines before variable assignments following if blocks (wsl_v5)
- Remove unused appendZeroPayload methods from MEV routes (unused)

* feat(consumoor): reject events with nil proto wrappers on non-nullable columns

Add validate() methods to all route FlattenTo implementations that check
non-nullable proto wrapper fields for nil before any column appends.
Events with missing required fields now return ErrInvalidEvent (already
handled by table_writer with skip + metric increment) instead of
silently writing zero values to ClickHouse.

Also fixes testWriter.FlushTables in benthos_test.go to match errors
by table name, preventing flaky test failures from non-deterministic
map iteration order in group-based batch processing.
