From 1195f66daf9516a5fe66f38b55a47c2e40d4eea8 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 24 Feb 2026 13:19:35 +1000 Subject: [PATCH 1/2] feat(consumoor): add staging docker-compose infra and correctness test scripts Add local development infrastructure for testing the consumoor pipeline against a staging ClickHouse instance: - consumoor-clickhouse-init.sh: init script for local ClickHouse container - docker-compose.staging.yml: staging compose configuration - staging-correctness-test.sh: correctness test comparing staging vs local - staging-topic-survey.sh: topic survey utility for staging Kafka - xatu-consumoor.yaml: example consumoor configuration --- .../consumoor-clickhouse-init.sh | 39 ++ .../docker-compose/docker-compose.staging.yml | 47 ++ .../staging-correctness-test.sh | 400 ++++++++++++++++ .../docker-compose/staging-topic-survey.sh | 447 ++++++++++++++++++ .../local/docker-compose/xatu-consumoor.yaml | 24 + 5 files changed, 957 insertions(+) create mode 100644 deploy/local/docker-compose/consumoor-clickhouse-init.sh create mode 100644 deploy/local/docker-compose/docker-compose.staging.yml create mode 100755 deploy/local/docker-compose/staging-correctness-test.sh create mode 100755 deploy/local/docker-compose/staging-topic-survey.sh create mode 100644 deploy/local/docker-compose/xatu-consumoor.yaml diff --git a/deploy/local/docker-compose/consumoor-clickhouse-init.sh b/deploy/local/docker-compose/consumoor-clickhouse-init.sh new file mode 100644 index 000000000..08976f8c1 --- /dev/null +++ b/deploy/local/docker-compose/consumoor-clickhouse-init.sh @@ -0,0 +1,39 @@ +#!/bin/bash +# Initialize the consumoor ClickHouse database by copying all table schemas from default. +set -e + +CH_HOST="${CH_HOST:-xatu-clickhouse-01}" + +ch() { + clickhouse-client --host="$CH_HOST" "$@" +} + +echo "Creating consumoor database..." 
+ch --query="CREATE DATABASE IF NOT EXISTS consumoor ON CLUSTER '{cluster}'" + +echo "Getting local tables from default database..." +tables=$(ch --query="SELECT name FROM system.tables WHERE database = 'default' AND name LIKE '%_local' AND engine LIKE 'Replicated%' ORDER BY name") + +for table in $tables; do + echo "Copying: default.$table -> consumoor.$table" + + # Get the CREATE TABLE DDL and modify it for consumoor database: + # 1. Replace database qualifier: default. -> consumoor. + # 2. Add ON CLUSTER after table name + # 3. Append /consumoor to the ZK path (first quoted arg in MergeTree()) + # to ensure unique paths regardless of path pattern variant + ch --format=TSVRaw --query="SHOW CREATE TABLE default.\`$table\`" \ + | sed '1s/^CREATE TABLE default\./CREATE TABLE IF NOT EXISTS consumoor./' \ + | sed "1s/\$/ ON CLUSTER '{cluster}'/" \ + | sed "s|MergeTree('/\([^']*\)'|MergeTree('/\1/consumoor'|" \ + | ch --multiquery + + # Create corresponding distributed table + dist_table="${table%_local}" + echo "Creating distributed: consumoor.$dist_table" + ch --query="CREATE TABLE IF NOT EXISTS consumoor.\`$dist_table\` ON CLUSTER '{cluster}' AS consumoor.\`$table\` ENGINE = Distributed('{cluster}', 'consumoor', '$table', rand())" +done + +echo "" +table_count=$(echo "$tables" | wc -w | tr -d ' ') +echo "Consumoor database initialization complete! ($table_count tables copied)" diff --git a/deploy/local/docker-compose/docker-compose.staging.yml b/deploy/local/docker-compose/docker-compose.staging.yml new file mode 100644 index 000000000..3076d71fc --- /dev/null +++ b/deploy/local/docker-compose/docker-compose.staging.yml @@ -0,0 +1,47 @@ +# Staging overlay: disables local Kafka, Vector, consumoor, and non-essential services. +# Consumoor runs as a native binary on the host (not in Docker) because Kafka +# brokers advertise internal K8s DNS names that aren't resolvable from Docker. 
+# +# Usage: +# docker compose -f docker-compose.yml -f deploy/local/docker-compose/docker-compose.staging.yml up -d +# +# See deploy/local/docker-compose/staging-correctness-test.sh for the full workflow. + +services: + # --- Disabled services (not needed for staging correctness test) --- + xatu-kafka: + profiles: ["disabled"] + xatu-kafka-storage-format: + profiles: ["disabled"] + xatu-init-kafka: + profiles: ["disabled"] + xatu-server: + profiles: ["disabled"] + xatu-postgres: + profiles: ["disabled"] + xatu-postgres-migrator: + profiles: ["disabled"] + xatu-vector-http-kafka: + profiles: ["disabled"] + xatu-vector-kafka-clickhouse: + profiles: ["disabled"] + xatu-vector-kafka-clickhouse-libp2p: + profiles: ["disabled"] + xatu-sentry-logs: + profiles: ["disabled"] + xatu-cannon: + profiles: ["disabled"] + xatu-nginx: + profiles: ["disabled"] + xatu-grafana: + profiles: ["disabled"] + xatu-prometheus: + profiles: ["disabled"] + tempo: + profiles: ["disabled"] + tempo-init: + profiles: ["disabled"] + + # --- Consumoor runs as native binary on the host, not in Docker --- + xatu-consumoor: + profiles: ["disabled"] diff --git a/deploy/local/docker-compose/staging-correctness-test.sh b/deploy/local/docker-compose/staging-correctness-test.sh new file mode 100755 index 000000000..404967da6 --- /dev/null +++ b/deploy/local/docker-compose/staging-correctness-test.sh @@ -0,0 +1,400 @@ +#!/usr/bin/env bash +# Staging correctness test for consumoor. +# +# Builds consumoor as a native binary and connects it to a staging/production +# Kafka cluster via per-broker port-forwards. Each broker pod is forwarded to +# a unique loopback IP (127.0.0.{2,3,4,...}:9092). Broker hostnames are mapped +# to these loopback IPs via /etc/hosts entries managed by this script (entries +# are tagged with "# xatu-staging-test" and cleaned up on exit). 
+# +# After consuming for a configurable duration, the script compares consumoor's +# ClickHouse output against the reference ClickHouse (staging Vector output). +# +# Prerequisites: +# - kubectl access to the target cluster +# - Docker running locally (for ClickHouse) +# - Go toolchain for building consumoor and running the comparison test +# - Loopback aliases for Kafka brokers (see one-time setup below) +# - sudo access (for /etc/hosts management) +# +# Usage: +# ./staging-correctness-test.sh [flags] +# +# Flags: +# --consume-duration SEC How long to consume from Kafka (default: 300 = 5m) +# --topics PATTERN Kafka topic regex pattern (default: all proto topics) +# --skip-build Skip building the consumoor binary +# --skip-portforward Skip kubectl port-forward setup +# --compare-only Skip consuming, just run the comparison test +# --help Show this help message +# +# Environment variables: +# KUBE_CONTEXT kubectl context (default: platform-analytics-hel1-staging) +# KUBE_NAMESPACE kubectl namespace (default: xatu) +# CLICKHOUSE_SERVICE ClickHouse service name (default: svc/chendpoint-xatu-clickhouse) +# STAGING_CLICKHOUSE_USER ClickHouse user for staging (default: empty) +# STAGING_CLICKHOUSE_PASSWORD ClickHouse password for staging (default: empty) +# MAX_COMPARE_ROWS Max rows to compare per table (default: 10000) +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/../../.." 
&& pwd)" + +# --- Defaults --- +CONSUME_DURATION="${CONSUME_DURATION:-300}" +TOPICS="${TOPICS:-^xatu-protobuf-.+}" +SKIP_BUILD=false +SKIP_PORTFORWARD=false +COMPARE_ONLY=false +KUBE_CONTEXT="${KUBE_CONTEXT:-platform-analytics-hel1-staging}" +KUBE_NAMESPACE="${KUBE_NAMESPACE:-xatu}" +CLICKHOUSE_SERVICE="${CLICKHOUSE_SERVICE:-svc/chendpoint-xatu-clickhouse}" +STAGING_CLICKHOUSE_USER="${STAGING_CLICKHOUSE_USER:-}" +STAGING_CLICKHOUSE_PASSWORD="${STAGING_CLICKHOUSE_PASSWORD:-}" +MAX_COMPARE_ROWS="${MAX_COMPARE_ROWS:-10000}" +RUN_ID="$(date +%s)" + +# Broker discovery settings. +BROKER_POD_LABEL="${BROKER_POD_LABEL:-strimzi.io/name=xatu-internal-kafka-kafka}" +BROKER_SVC_SUFFIX="${BROKER_SVC_SUFFIX:-.xatu-internal-kafka-kafka-brokers.xatu.svc}" +BROKER_PORT=9092 + +# Paths. +XATU_BINARY="/tmp/xatu-staging-test-$$" +CONFIG_FILE="$SCRIPT_DIR/xatu-consumoor-staging.yaml" + +# Random port in 21000s for staging ClickHouse. +CH_LOCAL_PORT="${CH_LOCAL_PORT:-$((21000 + RANDOM % 1000))}" + +# --- Parse flags --- +while [[ $# -gt 0 ]]; do + case "$1" in + --consume-duration) + CONSUME_DURATION="$2" + shift 2 + ;; + --topics) + TOPICS="$2" + shift 2 + ;; + --skip-build) + SKIP_BUILD=true + shift + ;; + --skip-portforward) + SKIP_PORTFORWARD=true + shift + ;; + --compare-only) + COMPARE_ONLY=true + shift + ;; + --help) + head -37 "$0" | tail -32 + echo "" + echo "Examples:" + echo " # Run against staging with default settings (all topics, 5m consume):" + echo " ./staging-correctness-test.sh" + echo "" + echo " # Quick test with only head events for 2 minutes:" + echo ' ./staging-correctness-test.sh --topics "^xatu-protobuf-beacon-api-eth-v1-events-head" --consume-duration 120' + echo "" + echo " # Run comparison only (consumoor already consumed):" + echo " ./staging-correctness-test.sh --compare-only" + echo "" + echo " # Skip rebuilding binary for fast iteration:" + echo " ./staging-correctness-test.sh --skip-build --consume-duration 60" + echo "" + echo " # Run against 
production:" + echo " KUBE_CONTEXT=platform-analytics-hel1-production ./staging-correctness-test.sh" + exit 0 + ;; + *) + echo "Unknown flag: $1" + exit 1 + ;; + esac +done + +# --- Sudo pre-check --- +echo "This script needs sudo to manage /etc/hosts entries." +sudo -v || { echo "ERROR: sudo access required"; exit 1; } + +# --- PID / resource tracking for cleanup --- +PF_PIDS=() +CONSUMOOR_PID="" + +cleanup() { + echo "" + echo "=== Cleaning up ===" + + # Kill consumoor if running. + if [[ -n "$CONSUMOOR_PID" ]] && kill -0 "$CONSUMOOR_PID" 2>/dev/null; then + echo "Stopping consumoor (PID $CONSUMOOR_PID)..." + kill "$CONSUMOOR_PID" 2>/dev/null || true + wait "$CONSUMOOR_PID" 2>/dev/null || true + fi + + # Docker compose down. + if [[ "$COMPARE_ONLY" != "true" ]]; then + echo "Stopping docker compose..." + cd "$REPO_ROOT" + docker compose -f docker-compose.yml \ + -f deploy/local/docker-compose/docker-compose.staging.yml \ + down --timeout 10 2>/dev/null || true + fi + + # Kill port-forwards. + for pid in "${PF_PIDS[@]}"; do + if kill -0 "$pid" 2>/dev/null; then + echo "Killing port-forward (PID $pid)" + kill "$pid" 2>/dev/null || true + fi + done + + # Clean /etc/hosts entries. + sudo sed -i '' '/# xatu-staging-test$/d' /etc/hosts 2>/dev/null || true + sudo dscacheutil -flushcache 2>/dev/null || true + sudo killall -HUP mDNSResponder 2>/dev/null || true + + # Remove generated files. + rm -f "$CONFIG_FILE" + rm -f "$XATU_BINARY" + + echo "Cleanup complete." +} + +trap cleanup EXIT + +# --- Build binary --- +if [[ "$COMPARE_ONLY" != "true" && "$SKIP_BUILD" != "true" ]]; then + echo "=== Building consumoor binary ===" + cd "$REPO_ROOT" + go build -o "$XATU_BINARY" . + echo "Binary built: $XATU_BINARY" +elif [[ "$SKIP_BUILD" == "true" && ! -f "$XATU_BINARY" ]]; then + # If --skip-build but no binary exists, check for a previous build. 
+ EXISTING=$(ls /tmp/xatu-staging-test-* 2>/dev/null | head -1 || true) + if [[ -n "$EXISTING" ]]; then + XATU_BINARY="$EXISTING" + echo "Reusing existing binary: $XATU_BINARY" + else + echo "ERROR: --skip-build specified but no binary found. Run without --skip-build first." + exit 1 + fi +fi + +# --- Discover broker pods and set up port-forwards --- +if [[ "$SKIP_PORTFORWARD" != "true" ]]; then + echo "=== Discovering Kafka broker pods (context: $KUBE_CONTEXT, namespace: $KUBE_NAMESPACE) ===" + + BROKER_PODS=$(kubectl --context "$KUBE_CONTEXT" -n "$KUBE_NAMESPACE" \ + get pods -l "$BROKER_POD_LABEL" -o jsonpath='{.items[*].metadata.name}') + + if [[ -z "$BROKER_PODS" ]]; then + echo "ERROR: No broker pods found with label $BROKER_POD_LABEL" + exit 1 + fi + + # Convert to array. + read -ra BROKER_POD_ARRAY <<< "$BROKER_PODS" + NUM_BROKERS=${#BROKER_POD_ARRAY[@]} + echo "Found $NUM_BROKERS broker pods: ${BROKER_POD_ARRAY[*]}" + + # Pre-flight: verify loopback aliases and /etc/hosts are set up. + MISSING_SETUP=false + for i in "${!BROKER_POD_ARRAY[@]}"; do + loopback_ip="127.0.0.$((i + 2))" + if ! ifconfig lo0 | grep -q "inet $loopback_ip "; then + echo "ERROR: Missing loopback alias for $loopback_ip" + MISSING_SETUP=true + fi + done + + if [[ "$MISSING_SETUP" == "true" ]]; then + echo "" + echo "Loopback aliases are missing (they don't survive reboot). Run this once:" + echo "" + for i in "${!BROKER_POD_ARRAY[@]}"; do + echo " sudo ifconfig lo0 alias 127.0.0.$((i + 2))" + done + exit 1 + fi + + echo "" + echo "=== Starting port-forwards ===" + + # Port-forward each broker to its loopback IP. + SEED_BROKER="" + for i in "${!BROKER_POD_ARRAY[@]}"; do + pod="${BROKER_POD_ARRAY[$i]}" + loopback_ip="127.0.0.$((i + 2))" + + echo " $pod → $loopback_ip:$BROKER_PORT" + + kubectl --context "$KUBE_CONTEXT" -n "$KUBE_NAMESPACE" \ + port-forward --address "$loopback_ip" "pod/$pod" "${BROKER_PORT}:${BROKER_PORT}" & + PF_PIDS+=($!) 
+ + if [[ -z "$SEED_BROKER" ]]; then + SEED_BROKER="${loopback_ip}:${BROKER_PORT}" + fi + done + + # Port-forward staging ClickHouse. + echo " Staging ClickHouse → localhost:$CH_LOCAL_PORT" + kubectl --context "$KUBE_CONTEXT" -n "$KUBE_NAMESPACE" \ + port-forward "$CLICKHOUSE_SERVICE" "${CH_LOCAL_PORT}:8123" & + PF_PIDS+=($!) + + # Add /etc/hosts entries for broker hostnames. + sudo sed -i '' '/# xatu-staging-test$/d' /etc/hosts + for i in "${!BROKER_POD_ARRAY[@]}"; do + pod="${BROKER_POD_ARRAY[$i]}" + loopback_ip="127.0.0.$((i + 2))" + printf '%s %s%s # xatu-staging-test\n' "$loopback_ip" "$pod" "$BROKER_SVC_SUFFIX" | sudo tee -a /etc/hosts >/dev/null + done + sudo dscacheutil -flushcache 2>/dev/null || true + sudo killall -HUP mDNSResponder 2>/dev/null || true + echo " /etc/hosts entries added for ${NUM_BROKERS} brokers" + + echo "" + echo "Waiting for port-forwards to establish..." + for i in $(seq 1 15); do + if curl -sf "http://localhost:${CH_LOCAL_PORT}/?query=SELECT+1" >/dev/null 2>&1; then + echo "Port-forwards established." + break + fi + if [[ "$i" -eq 15 ]]; then + echo "ERROR: Staging ClickHouse not reachable at localhost:${CH_LOCAL_PORT} after 15s" + exit 1 + fi + sleep 1 + done +else + echo "=== Skipping port-forward setup ===" + SEED_BROKER="${SEED_BROKER:-127.0.0.2:${BROKER_PORT}}" +fi + +echo "" +echo "=== Staging Correctness Test ===" +echo " Seed broker: ${SEED_BROKER}" +echo " Staging CH port-forward: localhost:${CH_LOCAL_PORT}" +echo " Topics: ${TOPICS}" +echo " Consume duration: ${CONSUME_DURATION}s" +echo "" + +# --- Start docker compose (ClickHouse + init only; consumoor is disabled) --- +if [[ "$COMPARE_ONLY" != "true" ]]; then + echo "=== Generating consumoor staging config ===" + echo " Topics: $TOPICS" + echo " Consumer group: correctness-test-consumoor-${RUN_ID}" + + cat > "$CONFIG_FILE" </dev/null 2>&1; then + echo "Local ClickHouse is healthy." 
+ break + fi + + if [[ "$i" -eq 30 ]]; then + echo "ERROR: Local ClickHouse not healthy after 30 attempts." + exit 1 + fi + + sleep 2 + done + + # Wait for consumoor-init to finish (creates consumoor database). + echo "Waiting for consumoor-init..." + for i in $(seq 1 30); do + init_status=$(docker inspect -f '{{.State.Status}}' xatu-clickhouse-consumoor-init 2>/dev/null || echo "unknown") + if [[ "$init_status" == "exited" ]]; then + echo "consumoor-init completed." + break + fi + if [[ "$i" -eq 30 ]]; then + echo "WARNING: consumoor-init may not have completed. Continuing anyway." + fi + sleep 2 + done + + # --- Run consumoor as native binary --- + CONSUMOOR_LOG="/tmp/xatu-consumoor-staging-$$.log" + echo "=== Starting consumoor (native binary, logs: $CONSUMOOR_LOG) ===" + "$XATU_BINARY" consumoor --config "$CONFIG_FILE" >"$CONSUMOOR_LOG" 2>&1 & + CONSUMOOR_PID=$! + + # Give it a moment to start and check it's still alive. + sleep 3 + if ! kill -0 "$CONSUMOOR_PID" 2>/dev/null; then + echo "ERROR: consumoor exited immediately. Check output above for errors." + exit 1 + fi + echo "consumoor started (PID $CONSUMOOR_PID)." + + # --- Consume --- + echo "=== Consuming from staging Kafka for ${CONSUME_DURATION}s ===" + echo " (started at $(date))" + sleep "$CONSUME_DURATION" + echo "Consume duration elapsed." + + # Stop consumoor. + echo "Stopping consumoor..." 
+ kill "$CONSUMOOR_PID" 2>/dev/null || true + wait "$CONSUMOOR_PID" 2>/dev/null || true + CONSUMOOR_PID="" +fi + +# --- Run comparison test --- +echo "=== Running comparison test ===" +cd "$REPO_ROOT" + +CONSUMOOR_STAGING_TEST=true \ +CLICKHOUSE_URL="http://localhost:8123" \ +STAGING_CLICKHOUSE_URL="http://localhost:${CH_LOCAL_PORT}" \ +STAGING_CLICKHOUSE_USER="${STAGING_CLICKHOUSE_USER}" \ +STAGING_CLICKHOUSE_PASSWORD="${STAGING_CLICKHOUSE_PASSWORD}" \ +MAX_COMPARE_ROWS="${MAX_COMPARE_ROWS}" \ +go test ./pkg/consumoor/sinks/clickhouse/transform/flattener/ \ + -run TestStagingCorrectness -v -timeout 300s + +echo "=== Done ===" diff --git a/deploy/local/docker-compose/staging-topic-survey.sh b/deploy/local/docker-compose/staging-topic-survey.sh new file mode 100755 index 000000000..88e96b815 --- /dev/null +++ b/deploy/local/docker-compose/staging-topic-survey.sh @@ -0,0 +1,447 @@ +#!/usr/bin/env bash +# Survey which topics/tables consumoor handles from staging Kafka. +# +# Builds consumoor as a native binary and connects it to staging Kafka via +# per-broker port-forwards (same approach as staging-correctness-test.sh). +# Broker hostnames are mapped to loopback IPs via /etc/hosts entries managed +# by this script (tagged with "# xatu-staging-test", cleaned up on exit). +# After consuming briefly, reports which ClickHouse tables received data and +# compares row counts against the staging reference. 
+# +# Usage: +# ./staging-topic-survey.sh [flags] +# +# Flags: +# --consume-seconds SEC How long to consume (default: 10) +# --topics PATTERN Kafka topic regex (default: ^xatu-protobuf-.+) +# --skip-build Skip building the consumoor binary +# --skip-portforward Skip kubectl port-forward setup +# --compare-only Skip consuming, just survey existing data +# --help Show this help +# +# Environment: +# KUBE_CONTEXT (default: platform-analytics-hel1-staging) +# KUBE_NAMESPACE (default: xatu) +# CLICKHOUSE_SERVICE (default: svc/chendpoint-xatu-clickhouse) +# STAGING_CLICKHOUSE_USER (default: empty) +# STAGING_CLICKHOUSE_PASSWORD (default: empty) +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/../../.." && pwd)" + +# --- Defaults --- +CONSUME_SECONDS="${CONSUME_SECONDS:-10}" +TOPICS="${TOPICS:-^xatu-protobuf-.+}" +SKIP_BUILD=false +SKIP_PORTFORWARD=false +COMPARE_ONLY=false +KUBE_CONTEXT="${KUBE_CONTEXT:-platform-analytics-hel1-staging}" +KUBE_NAMESPACE="${KUBE_NAMESPACE:-xatu}" +CLICKHOUSE_SERVICE="${CLICKHOUSE_SERVICE:-svc/chendpoint-xatu-clickhouse}" +STAGING_CLICKHOUSE_USER="${STAGING_CLICKHOUSE_USER:-}" +STAGING_CLICKHOUSE_PASSWORD="${STAGING_CLICKHOUSE_PASSWORD:-}" + +# Broker discovery settings. +BROKER_POD_LABEL="${BROKER_POD_LABEL:-strimzi.io/name=xatu-internal-kafka-kafka}" +BROKER_SVC_SUFFIX="${BROKER_SVC_SUFFIX:-.xatu-internal-kafka-kafka-brokers.xatu.svc}" +BROKER_PORT=9092 + +# Paths. +XATU_BINARY="/tmp/xatu-staging-test-$$" +CONFIG_FILE="$SCRIPT_DIR/xatu-consumoor-staging.yaml" + +# Random port in 21000s for staging ClickHouse. 
+CH_STAGING_PORT=$((21000 + RANDOM % 1000)) + +RUN_ID="$(date +%s)" + +# --- Parse flags --- +while [[ $# -gt 0 ]]; do + case "$1" in + --consume-seconds) CONSUME_SECONDS="$2"; shift 2 ;; + --topics) TOPICS="$2"; shift 2 ;; + --skip-build) SKIP_BUILD=true; shift ;; + --skip-portforward) SKIP_PORTFORWARD=true; shift ;; + --compare-only) COMPARE_ONLY=true; shift ;; + --help) + sed -n '2,/^set -/p' "$0" | head -n -1 + exit 0 + ;; + *) echo "Unknown flag: $1"; exit 1 ;; + esac +done + +# --- Sudo pre-check --- +echo "This script needs sudo to manage /etc/hosts entries." +sudo -v || { echo "ERROR: sudo access required"; exit 1; } + +# --- Cleanup --- +PF_PIDS=() +CONSUMOOR_PID="" + +cleanup() { + echo "" + echo "=== Cleaning up ===" + + # Kill consumoor if running. + if [[ -n "$CONSUMOOR_PID" ]] && kill -0 "$CONSUMOOR_PID" 2>/dev/null; then + echo "Stopping consumoor (PID $CONSUMOOR_PID)..." + kill "$CONSUMOOR_PID" 2>/dev/null || true + wait "$CONSUMOOR_PID" 2>/dev/null || true + fi + + if [[ "$COMPARE_ONLY" != "true" ]]; then + cd "$REPO_ROOT" + docker compose -f docker-compose.yml \ + -f deploy/local/docker-compose/docker-compose.staging.yml \ + down --timeout 5 2>/dev/null || true + fi + + for pid in "${PF_PIDS[@]}"; do + kill "$pid" 2>/dev/null || true + done + + # Clean /etc/hosts entries. + sudo sed -i '' '/# xatu-staging-test$/d' /etc/hosts 2>/dev/null || true + sudo dscacheutil -flushcache 2>/dev/null || true + sudo killall -HUP mDNSResponder 2>/dev/null || true + + rm -f "$CONFIG_FILE" + rm -f "$XATU_BINARY" + echo "Done." 
+} +trap cleanup EXIT + +# --- Helpers --- +ch_local() { + curl -sf "http://localhost:8123/" --data-binary "$1" +} + +ch_staging() { + if [[ -n "$STAGING_CLICKHOUSE_USER" ]]; then + curl -sf "http://localhost:${CH_STAGING_PORT}/" \ + --user "${STAGING_CLICKHOUSE_USER}:${STAGING_CLICKHOUSE_PASSWORD}" \ + --data-binary "$1" + else + curl -sf "http://localhost:${CH_STAGING_PORT}/" --data-binary "$1" + fi +} + +# Infer the proto topic name(s) from a ClickHouse table name. +# e.g. beacon_api_eth_v1_events_head → xatu-protobuf-beacon-api-eth-v1-events-head +infer_topic() { + local base="$1" + echo "xatu-protobuf-$(echo "$base" | tr '_' '-')" +} + +# --- Build binary --- +if [[ "$COMPARE_ONLY" != "true" && "$SKIP_BUILD" != "true" ]]; then + echo "=== Building consumoor binary ===" + cd "$REPO_ROOT" + go build -o "$XATU_BINARY" . + echo "Binary built: $XATU_BINARY" +elif [[ "$SKIP_BUILD" == "true" && ! -f "$XATU_BINARY" ]]; then + EXISTING=$(ls /tmp/xatu-staging-test-* 2>/dev/null | head -1 || true) + if [[ -n "$EXISTING" ]]; then + XATU_BINARY="$EXISTING" + echo "Reusing existing binary: $XATU_BINARY" + else + echo "ERROR: --skip-build specified but no binary found. Run without --skip-build first." + exit 1 + fi +fi + +# --- Discover broker pods and set up port-forwards --- +if [[ "$SKIP_PORTFORWARD" != "true" ]]; then + echo "=== Discovering Kafka broker pods (context: $KUBE_CONTEXT, namespace: $KUBE_NAMESPACE) ===" + + BROKER_PODS=$(kubectl --context "$KUBE_CONTEXT" -n "$KUBE_NAMESPACE" \ + get pods -l "$BROKER_POD_LABEL" -o jsonpath='{.items[*].metadata.name}') + + if [[ -z "$BROKER_PODS" ]]; then + echo "ERROR: No broker pods found with label $BROKER_POD_LABEL" + exit 1 + fi + + read -ra BROKER_POD_ARRAY <<< "$BROKER_PODS" + NUM_BROKERS=${#BROKER_POD_ARRAY[@]} + echo "Found $NUM_BROKERS broker pods: ${BROKER_POD_ARRAY[*]}" + + # Pre-flight: verify loopback aliases are set up. 
+ MISSING_SETUP=false + for i in "${!BROKER_POD_ARRAY[@]}"; do + loopback_ip="127.0.0.$((i + 2))" + if ! ifconfig lo0 | grep -q "inet $loopback_ip "; then + echo "ERROR: Missing loopback alias for $loopback_ip" + MISSING_SETUP=true + fi + done + + if [[ "$MISSING_SETUP" == "true" ]]; then + echo "" + echo "Loopback aliases are missing (they don't survive reboot). Run this once:" + echo "" + for i in "${!BROKER_POD_ARRAY[@]}"; do + echo " sudo ifconfig lo0 alias 127.0.0.$((i + 2))" + done + exit 1 + fi + + echo "" + echo "=== Starting port-forwards ===" + + # Port-forward each broker to its loopback IP. + SEED_BROKER="" + for i in "${!BROKER_POD_ARRAY[@]}"; do + pod="${BROKER_POD_ARRAY[$i]}" + loopback_ip="127.0.0.$((i + 2))" + + echo " $pod → $loopback_ip:$BROKER_PORT" + + kubectl --context "$KUBE_CONTEXT" -n "$KUBE_NAMESPACE" \ + port-forward --address "$loopback_ip" "pod/$pod" "${BROKER_PORT}:${BROKER_PORT}" & + PF_PIDS+=($!) + + if [[ -z "$SEED_BROKER" ]]; then + SEED_BROKER="${loopback_ip}:${BROKER_PORT}" + fi + done + + # Port-forward staging ClickHouse. + echo " Staging ClickHouse → localhost:$CH_STAGING_PORT" + kubectl --context "$KUBE_CONTEXT" -n "$KUBE_NAMESPACE" \ + port-forward "$CLICKHOUSE_SERVICE" "${CH_STAGING_PORT}:8123" & + PF_PIDS+=($!) + + # Add /etc/hosts entries for broker hostnames. + sudo sed -i '' '/# xatu-staging-test$/d' /etc/hosts + for i in "${!BROKER_POD_ARRAY[@]}"; do + pod="${BROKER_POD_ARRAY[$i]}" + loopback_ip="127.0.0.$((i + 2))" + printf '%s %s%s # xatu-staging-test\n' "$loopback_ip" "$pod" "$BROKER_SVC_SUFFIX" | sudo tee -a /etc/hosts >/dev/null + done + sudo dscacheutil -flushcache 2>/dev/null || true + sudo killall -HUP mDNSResponder 2>/dev/null || true + echo " /etc/hosts entries added for ${NUM_BROKERS} brokers" + + echo "" + echo "Waiting for port-forwards to establish..." + for i in $(seq 1 15); do + if curl -sf "http://localhost:${CH_STAGING_PORT}/?query=SELECT+1" >/dev/null 2>&1; then + echo "Port-forwards ready." 
+ break + fi + if [[ "$i" -eq 15 ]]; then + echo "ERROR: Staging ClickHouse not reachable at localhost:${CH_STAGING_PORT} after 15s" + exit 1 + fi + sleep 1 + done +else + echo "=== Skipping port-forward setup ===" + SEED_BROKER="${SEED_BROKER:-127.0.0.2:${BROKER_PORT}}" + CH_STAGING_PORT="${CH_STAGING_LOCAL_PORT:-$CH_STAGING_PORT}" +fi + +# --- Banner --- +echo "" +echo "=== Staging Topic Survey ===" +echo " Seed broker: ${SEED_BROKER}" +echo " Staging CH port-forward: localhost:${CH_STAGING_PORT}" +echo " Topics pattern: ${TOPICS}" +echo " Consume duration: ${CONSUME_SECONDS}s" +echo "" + +# --- Consume (unless --compare-only) --- +if [[ "$COMPARE_ONLY" != "true" ]]; then + # Generate consumoor config. + cat > "$CONFIG_FILE" </dev/null 2>&1; then + echo "Local ClickHouse healthy." + break + fi + if [[ "$i" -eq 60 ]]; then + echo "ERROR: Local ClickHouse not healthy after 120s." + exit 1 + fi + sleep 2 + done + + # Wait for consumoor-init to finish (creates consumoor database). + echo "Waiting for consumoor-init..." + for i in $(seq 1 30); do + init_status=$(docker inspect -f '{{.State.Status}}' xatu-clickhouse-consumoor-init 2>/dev/null || echo "unknown") + if [[ "$init_status" == "exited" ]]; then + echo "consumoor-init completed." + break + fi + if [[ "$i" -eq 30 ]]; then + echo "WARNING: consumoor-init may not have completed. Continuing anyway." + fi + sleep 2 + done + + # Start consumoor as native binary. + echo "=== Starting consumoor (native binary) ===" + "$XATU_BINARY" consumoor --config "$CONFIG_FILE" & + CONSUMOOR_PID=$! + + sleep 3 + if ! kill -0 "$CONSUMOOR_PID" 2>/dev/null; then + echo "ERROR: consumoor exited immediately. Check output above for errors." + exit 1 + fi + echo "consumoor started (PID $CONSUMOOR_PID)." + + echo "=== Consuming for ${CONSUME_SECONDS}s ===" + sleep "$CONSUME_SECONDS" + + # Stop consumoor but keep ClickHouse running for the survey. + echo "Stopping consumoor..." 
+ kill "$CONSUMOOR_PID" 2>/dev/null || true + wait "$CONSUMOOR_PID" 2>/dev/null || true + CONSUMOOR_PID="" +fi + +# --- Survey tables --- +echo "" +echo "=== Survey Results ===" +echo "" + +# Known V2 events: these have V1 counterparts that are deprecated/empty. +# Data lands in the same table from the V2 topic. +# Format: v2_topic_suffix → v1_topic_suffix +declare -A V2_EVENTS=( + ["beacon-api-eth-v1-events-attestation-v2"]="beacon-api-eth-v1-events-attestation" + ["beacon-api-eth-v1-events-block-v2"]="beacon-api-eth-v1-events-block" + ["beacon-api-eth-v1-events-chain-reorg-v2"]="beacon-api-eth-v1-events-chain-reorg" + ["beacon-api-eth-v1-events-finalized-checkpoint-v2"]="beacon-api-eth-v1-events-finalized-checkpoint" + ["beacon-api-eth-v1-events-head-v2"]="beacon-api-eth-v1-events-head" + ["beacon-api-eth-v1-events-voluntary-exit-v2"]="beacon-api-eth-v1-events-voluntary-exit" + ["beacon-api-eth-v1-events-contribution-and-proof-v2"]="beacon-api-eth-v1-events-contribution-and-proof" + ["beacon-api-eth-v1-debug-fork-choice-v2"]="beacon-api-eth-v1-debug-fork-choice" + ["beacon-api-eth-v1-debug-fork-choice-reorg-v2"]="beacon-api-eth-v1-debug-fork-choice-reorg" + ["beacon-api-eth-v2-beacon-block-v2"]="beacon-api-eth-v2-beacon-block" + ["mempool-transaction-v2"]="mempool-transaction" +) + +# Get all consumoor _local tables. +tables=$(ch_local "SELECT name FROM system.tables WHERE database='consumoor' AND name LIKE '%_local' AND engine NOT IN ('MaterializedView') ORDER BY name") + +if [[ -z "$tables" ]]; then + echo "ERROR: No consumoor _local tables found. Did consumoor-init run?" + exit 1 +fi + +# Print header. 
+printf "%-60s │ %7s │ %7s │ %-30s │ %s\n" "TABLE" "ROWS" "STG" "TOPIC" "NOTES" +printf "%-60s─┼─%7s─┼─%7s─┼─%-30s─┼─%s\n" \ + "────────────────────────────────────────────────────────────" \ + "───────" "───────" "──────────────────────────────" "──────────────" + +matched=0 +empty=0 +no_stg=0 +total=0 + +while IFS= read -r local_table; do + [[ -z "$local_table" ]] && continue + total=$((total + 1)) + + base="${local_table%_local}" + + # Count consumoor rows. + rows=$(ch_local "SELECT count() FROM consumoor.\`${local_table}\`" 2>/dev/null) || rows="0" + + # Infer topic name. + topic=$(infer_topic "$base") + + # Check if this is a V2 event table. + topic_short="${topic#xatu-protobuf-}" + notes="" + + # Check if the inferred topic has a V2 variant. + if [[ -n "${V2_EVENTS[${topic_short}-v2]+x}" ]] || [[ -n "${V2_EVENTS[${topic_short}]+x}" ]]; then + if [[ -n "${V2_EVENTS[${topic_short}-v2]+x}" ]]; then + notes="V1+V2 → same table" + topic="${topic}-v2 (+ v1)" + fi + fi + + if [[ "$rows" == "0" ]]; then + printf "%-60s │ %7s │ %7s │ %-30s │ %s\n" "$base" "0" "-" "$topic_short" "${notes:-no data}" + empty=$((empty + 1)) + continue + fi + + # Get time range for staging comparison. + time_range=$(ch_local "SELECT min(event_date_time), max(event_date_time) FROM consumoor.\`${local_table}\`") + min_time=$(echo "$time_range" | cut -f1) + max_time=$(echo "$time_range" | cut -f2) + + # Check staging table (try distributed, then _local). + stg_table="$base" + stg_exists=$(ch_staging "SELECT count() FROM system.tables WHERE database='default' AND name='${stg_table}'" 2>/dev/null) || stg_exists="0" + + if [[ "$stg_exists" == "0" ]]; then + stg_table="${local_table}" + stg_exists=$(ch_staging "SELECT count() FROM system.tables WHERE database='default' AND name='${stg_table}'" 2>/dev/null) || stg_exists="0" + fi + + if [[ "$stg_exists" == "0" ]]; then + printf "%-60s │ %7s │ %7s │ %-30s │ %s\n" "$base" "$rows" "?" 
"$topic_short" "no staging table" + no_stg=$((no_stg + 1)) + continue + fi + + # Count staging rows in same time window. + stg_rows=$(ch_staging "SELECT count() FROM default.\`${stg_table}\` WHERE event_date_time >= '${min_time}' AND event_date_time <= '${max_time}'" 2>/dev/null) || stg_rows="err" + + printf "%-60s │ %7s │ %7s │ %-30s │ %s\n" "$base" "$rows" "$stg_rows" "$topic_short" "$notes" + matched=$((matched + 1)) + +done <<< "$tables" + +echo "" +echo "=== Summary ===" +echo " Tables with data: $matched" +echo " Tables empty: $empty" +echo " No staging table: $no_stg" +echo " Total tables checked: $total" diff --git a/deploy/local/docker-compose/xatu-consumoor.yaml b/deploy/local/docker-compose/xatu-consumoor.yaml new file mode 100644 index 000000000..87e7359aa --- /dev/null +++ b/deploy/local/docker-compose/xatu-consumoor.yaml @@ -0,0 +1,24 @@ +logging: "debug" +metricsAddr: ":9091" + +kafka: + brokers: + - xatu-kafka:29092 + topics: + - "^proto-.+" + consumerGroup: xatu-consumoor-v3 + encoding: protobuf + commitInterval: 1s + +clickhouse: + dsn: "clickhouse://xatu-clickhouse-01:9000/consumoor" + tableSuffix: "_local" + chgo: + maxConns: 32 + queryTimeout: 120s + defaults: + batchSize: 1000 + flushInterval: 1s + bufferSize: 10000 + insertSettings: + insert_quorum: 0 From 5858e1878ce3d46a9b6d844a300e1b08ddf15ce0 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 24 Feb 2026 13:30:04 +1000 Subject: [PATCH 2/2] feat(consumoor): add docker-compose services and proto Kafka topics - Add xatu-consumoor service and clickhouse-consumoor-init container - Create proto-* Kafka topics for protobuf-encoded consumoor pipeline - Update example_consumoor.yaml with per-topic stream comments - Gitignore local staging config override --- .gitignore | 1 + docker-compose.yml | 52 ++++++++++++++++++++++++++++++++++++++++++ example_consumoor.yaml | 10 ++++---- 3 files changed, 57 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index cc66be56d..e8183c18b 
100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ sage.yaml relay-monitor.yaml cities1000.txt countries.txt +deploy/local/docker-compose/xatu-consumoor-staging.yaml diff --git a/docker-compose.yml b/docker-compose.yml index f71bd53f0..b78eacf85 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -162,6 +162,7 @@ services: dockerfile: Dockerfile ports: - "${XATU_SERVER_ADDRESS:-0.0.0.0}:${XATU_SERVER_PORT:-8080}:8080" + - "${XATU_SERVER_HTTP_ADDRESS:-0.0.0.0}:${XATU_SERVER_HTTP_PORT:-8087}:8087" # environment: # SERVER_EVENT_INGESTER_BASIC_AUTH_USERNAME: ${SERVER_EVENT_INGESTER_BASIC_AUTH_USERNAME:-xatu} # SERVER_EVENT_INGESTER_BASIC_AUTH_PASSWORD: ${SERVER_EVENT_INGESTER_BASIC_AUTH_PASSWORD:-example} @@ -356,6 +357,17 @@ services: kafka-topics --create --if-not-exists --bootstrap-server xatu-kafka:29092 --partitions 1 --replication-factor 1 --config cleanup.policy=delete --config retention.ms=259200000 --config compression.type=snappy --config segment.bytes=536870912 --topic "$$topic" done + # Proto topics for consumoor (protobuf encoding) + for topic in "$${large_message_topics[@]}"; do + echo "Creating proto topic with 10MB message limit: proto-$$topic"; + kafka-topics --create --if-not-exists --bootstrap-server xatu-kafka:29092 --partitions 1 --replication-factor 1 --config cleanup.policy=delete --config retention.ms=259200000 --config compression.type=snappy --config segment.bytes=536870912 --config max.message.bytes=10485760 --topic "proto-$$topic" + done + + for topic in "$${regular_topics[@]}"; do + echo "Creating proto topic: proto-$$topic"; + kafka-topics --create --if-not-exists --bootstrap-server xatu-kafka:29092 --partitions 1 --replication-factor 1 --config cleanup.policy=delete --config retention.ms=259200000 --config compression.type=snappy --config segment.bytes=536870912 --topic "proto-$$topic" + done + sleep 3; depends_on: xatu-kafka: @@ -566,6 +578,24 @@ services: condition: service_healthy networks: - xatu-net + 
xatu-clickhouse-consumoor-init:
+    profiles:
+      - ""
+    image: "clickhouse/clickhouse-server:${CHVER:-latest}"
+    container_name: xatu-clickhouse-consumoor-init
+    entrypoint: ["bash", "/init.sh"]
+    volumes:
+      - ./deploy/local/docker-compose/consumoor-clickhouse-init.sh:/init.sh:ro
+    depends_on:
+      xatu-clickhouse-migrator:
+        condition: service_completed_successfully
+      xatu-clickhouse-01:
+        condition: service_healthy
+      xatu-clickhouse-02:
+        condition: service_healthy
+    networks:
+      - xatu-net
+
   tempo-init:
     image: busybox:latest
     user: root
@@ -640,6 +670,29 @@ services:
     depends_on:
       - xatu-server
 
+  xatu-consumoor:
+    profiles:
+      - ""
+    command: consumoor --config /etc/consumoor/config.yaml
+    container_name: xatu-consumoor
+    build:
+      context: .
+      dockerfile: Dockerfile
+    volumes:
+      # :ro — consumoor only reads its config; match the read-only init-script mount above
+      - ./deploy/local/docker-compose/xatu-consumoor.yaml:/etc/consumoor/config.yaml:ro
+    networks:
+      - xatu-net
+    depends_on:
+      xatu-kafka:
+        condition: service_healthy
+      xatu-clickhouse-01:
+        condition: service_healthy
+      xatu-clickhouse-consumoor-init:
+        condition: service_completed_successfully
+      xatu-init-kafka:
+        condition: service_completed_successfully
+
   xatu-cannon:
     profiles:
       - "cannon"
diff --git a/example_consumoor.yaml b/example_consumoor.yaml
index 09d2829d8..985db2112 100644
--- a/example_consumoor.yaml
+++ b/example_consumoor.yaml
@@ -6,17 +6,15 @@ kafka:
   brokers:
     - localhost:9092
   topics:
-    - "^general-.+"
-  consumerGroup: xatu-consumoor-general
+    - "^general-.+" # regex patterns — each matched topic gets its own isolated stream
+  consumerGroup: xatu-consumoor-general # base name — per-topic suffix appended (e.g. 
xatu-consumoor-general-general-BEACON_API_ETH_V1_EVENTS_HEAD_V2) encoding: json # switch to "protobuf" after Vector removal - # offsetDefault: oldest + # offsetDefault: earliest # fetchMinBytes: 1 - # fetchWaitMaxMs: 500 + # fetchWaitMaxMs: 250 # maxPartitionFetchBytes: 10485760 # sessionTimeoutMs: 30000 - # heartbeatIntervalMs: 3000 commitInterval: 5s # Kafka offset commit interval - # deliveryMode: batch # batch (faster) or message (safer, per-message flush) # rejectedTopic: xatu-consumoor-rejected # tls: false # sasl: