Version: 3.0
Target Environment: Kubernetes (On-Premise / Private Cloud)
Architecture Pattern: Lakehouse (Compute/Storage Separation)
Storage Backend: SeaweedFS
Focus: Data Engineering Best Practices
This document provides a production-ready data lakehouse architecture focusing on:
- Scalable storage with SeaweedFS (object + file system capabilities)
- Data organization patterns for real-world projects
- Engineering workflows for batch and streaming pipelines
- Operational excellence for monitoring and maintenance
DevOps-specific components (Rancher, Nginx, cert-manager) are intentionally excluded as they are managed by your infrastructure team.
Phase 1 (MVP - 6 months):
Data Storage Nodes (SeaweedFS):
- 4 nodes minimum for redundancy
- 8 vCPU, 32GB RAM, 8TB HDD each
- Total raw capacity: 32TB
- Usable capacity: ~21TB (with replication factor 1.5)
Compute Nodes (Trino/Spark):
- 3 nodes for query processing
- 16 vCPU, 64GB RAM, 500GB SSD each
Metadata & Services Nodes:
- 2 nodes for PostgreSQL, Airflow, Nessie
- 8 vCPU, 32GB RAM, 1TB SSD each
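The Phase 1 storage figures can be sanity-checked with a quick calculation; the 1.5 effective replication factor is this plan's assumption, not a SeaweedFS constant:

```python
def usable_capacity_tb(nodes: int, disk_tb_per_node: float, replication_factor: float) -> float:
    """Usable capacity after dividing raw capacity by the effective replication overhead."""
    return nodes * disk_tb_per_node / replication_factor

# Phase 1: 4 storage nodes x 8 TB HDD, effective replication factor 1.5
print(round(usable_capacity_tb(4, 8, 1.5), 1))  # ~21.3 TB, matching the plan above
```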
Phase 2 (Scale - 12 months):
- Add 4 more storage nodes (SeaweedFS volume servers)
- Add 3 more compute nodes
- Total: 16 nodes
Phase 3 (Advanced - 18 months):
- Add 2 dedicated Spark nodes: 32 vCPU, 128GB RAM each
- Add 3 Kafka nodes: 8 vCPU, 32GB RAM, 2TB SSD each
- Total: 21 nodes
- Internal: 10Gbps between nodes (minimum)
- Storage Network: Dedicated VLAN for SeaweedFS traffic (recommended)
- Latency: <1ms between storage and compute nodes
Goal: Establish secure, queryable data lake with organized data structure
SeaweedFS Advantages:
- Dual Interface: Both S3 API (object storage) AND POSIX (file system)
- Better Small Files: Handles millions of small files efficiently (critical for Iceberg metadata)
- Lower Latency: Built for high-throughput, low-latency access
- Resource Efficient: Lower memory footprint than MinIO
- Replication: Built-in replication across racks/datacenters
Real-world analogy:
- MinIO is like a warehouse (good for big boxes)
- SeaweedFS is like a warehouse + filing cabinet (good for both big boxes and small documents)
Components:
Master Servers (3 replicas for HA):
- Manages cluster topology
- Handles file-to-volume mapping
- Resource: 2 vCPU, 4GB RAM each
- Storage: 100GB SSD (metadata only)
Volume Servers (4 nodes minimum):
- Stores actual data chunks
- Resource: 8 vCPU, 32GB RAM, 8TB HDD each
- Replication: 010 (one replica on a different rack in the same data center; SeaweedFS replication codes are "xyz" = copies in other data centers, on other racks, and on other servers of the same rack)
Filer Servers (2 replicas for HA):
- Provides POSIX file system interface
- Enables directory browsing
- Resource: 4 vCPU, 8GB RAM each
- Backed by PostgreSQL for metadata
# seaweedfs-values.yaml
master:
replicas: 3
resources:
requests:
cpu: 2
memory: 4Gi
limits:
cpu: 4
memory: 8Gi
volume:
replicas: 4
dataDirs:
- /data1
- /data2
resources:
requests:
cpu: 8
memory: 32Gi
limits:
cpu: 16
memory: 64Gi
filer:
replicas: 2
s3:
enabled: true
port: 8333
encryptionKey: <your-key-here>
resources:
requests:
cpu: 4
memory: 8Gi
Connection Endpoints:
- S3 API: http://seaweedfs-s3.data-platform.svc.cluster.local:8333
- Filer API: http://seaweedfs-filer.data-platform.svc.cluster.local:8888
- WebDAV: http://seaweedfs-filer.data-platform.svc.cluster.local:7333
Creating Access Credentials:
# Generate S3-compatible credentials
weed shell
> s3.configure -access_key=datalake_user -secret_key=<strong-password> -buckets=bronze,silver,gold -user=datalake_user -actions=Read,Write,List
The #1 mistake in data lakes: random file organization, leading to:
- Files scattered everywhere
- Unable to find data after 6 months
- Difficult to implement lifecycle policies
- Slow queries due to poor partitioning
Our Production-Ready Structure:
/datalake/
├── bronze/ # Raw ingested data
│ ├── source_systems/ # Organized by source
│ │ ├── erp_sap/
│ │ │ ├── customers/ # Table/entity name
│ │ │ │ ├── _metadata/ # Iceberg metadata
│ │ │ │ │ ├── snap-*.avro
│ │ │ │ │ └── v1-metadata.json
│ │ │ │ ├── data/ # Actual data files
│ │ │ │ │ ├── year=2024/
│ │ │ │ │ │ ├── month=12/
│ │ │ │ │ │ │ ├── day=01/
│ │ │ │ │ │ │ │ └── part-00000.parquet
│ │ │ │ │ │ │ └── day=02/
│ │ │ │ │ └── month=11/
│ │ │ │ └── _ingestion_logs/ # Audit trail
│ │ │ │ └── 2024-12-01.log
│ │ │ ├── orders/
│ │ │ └── products/
│ │ ├── crm_salesforce/
│ │ │ ├── accounts/
│ │ │ ├── opportunities/
│ │ │ └── contacts/
│ │ └── web_analytics/
│ │ ├── clickstream/
│ │ └── sessions/
│ └── external_data/ # Third-party data
│ ├── weather_api/
│ └── market_data/
│
├── silver/ # Cleaned & conformed data
│ ├── domain/ # Organized by business domain
│ │ ├── sales/
│ │ │ ├── fact_orders/ # Fact tables
│ │ │ │ ├── _metadata/
│ │ │ │ └── data/
│ │ │ │ └── order_date=2024-12-01/
│ │ │ ├── dim_customers/ # Dimension tables
│ │ │ └── dim_products/
│ │ ├── finance/
│ │ │ ├── fact_transactions/
│ │ │ └── dim_accounts/
│ │ └── marketing/
│ │ ├── fact_campaigns/
│ │ └── dim_channels/
│ └── _audit/ # Data quality results
│ └── great_expectations/
│
├── gold/ # Business-ready analytics tables
│ ├── reporting/ # Pre-aggregated for dashboards
│ │ ├── daily_sales_summary/
│ │ ├── monthly_revenue/
│ │ └── customer_lifetime_value/
│ ├── ml_features/ # Feature store for ML
│ │ ├── customer_features/
│ │ └── product_features/
│ └── exports/ # Data for external systems
│ ├── bi_tool_extracts/
│ └── partner_data_shares/
│
├── staging/ # Temporary processing area
│ ├── spark_temp/ # Spark shuffle/temp files
│ ├── trino_spill/ # Trino memory spill
│ └── dbt_temp/ # dbt incremental models temp
│
├── checkpoints/ # Streaming job checkpoints
│ ├── kafka_to_iceberg/
│ │ ├── customers_stream/
│ │ └── orders_stream/
│ └── debezium_cdc/
│
├── archives/ # Long-term retention
│ ├── bronze/
│ │ └── year=2023/ # Old raw data (compliance)
│ └── silver/
│ └── year=2022/
│
└── metadata/ # Platform metadata
├── schemas/ # Schema definitions
│ └── avro_schemas/
├── mappings/ # Source-to-target mappings
└── data_contracts/ # Data SLAs and contracts
Bronze Layer - Partition by Ingestion Time:
bronze/erp_sap/customers/data/
└── ingest_date=2024-12-01/ # When data arrived
└── hour=14/ # For CDC or real-time
└── part-00000.parquet
Why: Easy to implement lifecycle policies (delete data older than 90 days)
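A minimal sketch of such a lifecycle policy (hypothetical helper, stdlib only): given partition names in the ingest_date=YYYY-MM-DD form used above, select those older than the retention window for deletion.

```python
from datetime import date, timedelta

def expired_partitions(partitions: list[str], today: date, retention_days: int = 90) -> list[str]:
    """Return the ingest_date=YYYY-MM-DD partitions older than the retention window."""
    cutoff = today - timedelta(days=retention_days)
    expired = []
    for name in partitions:
        ingest_date = date.fromisoformat(name.split("=", 1)[1])
        if ingest_date < cutoff:
            expired.append(name)
    return expired

parts = ["ingest_date=2024-08-01", "ingest_date=2024-11-30", "ingest_date=2024-12-01"]
print(expired_partitions(parts, today=date(2024, 12, 1)))  # ['ingest_date=2024-08-01']
```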
Silver Layer - Partition by Business Time:
silver/sales/fact_orders/data/
└── order_date=2024-12-01/ # When business event occurred
└── region=US/ # Optional: high-cardinality dimension
└── part-00000.parquet
Why: Optimizes query performance for analytical patterns
Gold Layer - Partition by Report Period:
gold/reporting/daily_sales_summary/data/
└── report_date=2024-12-01/ # Reporting period
└── part-00000.parquet
Why: Matches how business users slice data
Pattern: {layer}_{domain}_{entity}_{partition}_{timestamp}_{uuid}.parquet
Example:
bronze_erp_customers_20241201_143052_a1b2c3d4.parquet
silver_sales_orders_20241201_150000_e5f6g7h8.parquet
gold_rpt_daily_sales_20241201_000000_i9j0k1l2.parquet
Benefits:
- Sortable by time
- Identifies data lineage at a glance
- UUID prevents overwrites in concurrent jobs
Every table must have:
- README.md in table root:
# customers
**Owner:** Sales Engineering Team
**Update Frequency:** Daily at 2 AM UTC
**Retention:** 7 years (regulatory requirement)
**SLA:** Available by 6 AM UTC
## Schema
| Column | Type | Description |
|--------|------|-------------|
| customer_id | BIGINT | Primary key |
| email | VARCHAR | Masked in silver layer |
| created_at | TIMESTAMP | Account creation time |
## Dependencies
- Source: ERP SAP database (table: CUSTOMERS)
- Consumers: CRM Dashboard, ML Churn Model
## Known Issues
- Email field has nulls for customers before 2020
- CHANGELOG.md:
## 2024-12-01
- Added `phone_number` column
- Backfilled data to 2023-01-01
## 2024-11-15
- Changed partitioning from daily to monthly (performance optimization)
- data_contract.yaml:
# Data contract for downstream consumers
version: 1.0
table: bronze.erp_sap.customers
owner: data-engineering@company.com
schema:
- name: customer_id
type: bigint
nullable: false
primary_key: true
- name: email
type: varchar
nullable: true
pii: true
sla:
latency: 4 hours
availability: 99.5%
quality_checks:
- customer_id is unique
- email matches regex .*@.*\..*
What it does (Simple explanation): Git for your data. Every change to your data tables is tracked like code commits.
Real-world analogy:
- Without Nessie: You update a table, realize it's wrong 2 days later, but can't undo it
- With Nessie: You update a table on branch "dev", test it, then merge to "main" only when confident
Deployment:
nessie:
replicas: 2
versionStoreType: JDBC
postgres:
host: postgresql.data-platform.svc.cluster.local
database: nessie
authentication:
enabled: true
oidc:
issuerUri: https://keycloak.company.com/realms/datalake
Branch Strategy for Real Projects:
main # Production data
├── dev # Development/testing
├── staging # Pre-production validation
└── data-science # Experimental models (isolated)
Example workflow:
-- Data engineer creates new feature
CREATE BRANCH dev FROM main;
USE REFERENCE dev;
-- Build new table
CREATE TABLE silver.sales.customer_segments AS
SELECT customer_id, segment
FROM silver.sales.customers
WHERE active = true;
-- Test queries
SELECT segment, COUNT(*) FROM silver.sales.customer_segments GROUP BY 1;
-- If good, merge to production
MERGE BRANCH dev INTO main;
Configuration for SeaweedFS:
# nessie-catalog.yaml
catalogs:
- name: iceberg
warehouse: s3://datalake/
s3:
endpoint: http://seaweedfs-s3:8333
access-key-id: datalake_user
secret-access-key: <secret>
path-style-access: true
Deployment:
coordinator:
replicas: 1
resources:
requests:
cpu: 4
memory: 16Gi
config:
query:
maxMemory: 10GB
maxMemoryPerNode: 8GB
workers:
replicas: 3
autoscaling:
enabled: true
minReplicas: 3
maxReplicas: 10
targetCPUUtilizationPercentage: 70
resources:
requests:
cpu: 8
memory: 32Gi
Catalogs Configuration:
# iceberg.properties
connector.name=iceberg
iceberg.catalog.type=nessie
iceberg.nessie.uri=http://nessie:19120/api/v1
iceberg.nessie.ref=main
iceberg.nessie.auth.type=BEARER
fs.native-s3.enabled=true
s3.endpoint=http://seaweedfs-s3:8333
s3.path-style-access=true
s3.aws-access-key=datalake_user
s3.aws-secret-key=<secret>
Resource Groups (Prevent Query Starvation):
{
"rootGroups": [
{
"name": "global",
"softMemoryLimit": "80%",
"hardConcurrencyLimit": 100,
"subGroups": [
{
"name": "bi_dashboards",
"softMemoryLimit": "30%",
"hardConcurrencyLimit": 20,
"schedulingPolicy": "fair"
},
{
"name": "data_engineering",
"softMemoryLimit": "50%",
"hardConcurrencyLimit": 10,
"schedulingPolicy": "weighted",
"schedulingWeight": 2
},
{
"name": "adhoc_analysts",
"softMemoryLimit": "20%",
"hardConcurrencyLimit": 30,
"schedulingPolicy": "fair"
}
]
}
],
"selectors": [
{
"source": "tableau",
"group": "global.bi_dashboards"
},
{
"user": "airflow",
"group": "global.data_engineering"
}
]
}
Access Control (File-Based):
{
"catalogs": [
{
"catalog": "iceberg",
"allow": ["data_engineer", "analyst", "data_scientist"],
"schemas": [
{
"schema": "bronze.*",
"owner": "data_engineer",
"allow": ["data_engineer"]
},
{
"schema": "silver.*",
"owner": "data_engineer",
"tables": [
{
"table": "*",
"privileges": ["SELECT"],
"allow": ["analyst", "data_scientist"]
}
]
},
{
"schema": "gold.*",
"owner": "data_engineer",
"tables": [
{
"table": "*",
"privileges": ["SELECT"],
"allow": ["analyst", "bi_tool"]
}
]
}
]
}
]
}
Deployment (Kubernetes Executor):
executor: KubernetesExecutor
webserver:
replicas: 2
resources:
requests:
cpu: 2
memory: 4Gi
scheduler:
replicas: 2
resources:
requests:
cpu: 2
memory: 4Gi
postgresql:
enabled: true
persistence:
size: 100Gi
config:
core:
dags_folder: /opt/airflow/dags
load_examples: false
logging:
remote_logging: true
remote_base_log_folder: s3://datalake/airflow_logs
remote_log_conn_id: seaweedfs_s3
Real-world problem: DAGs grow to hundreds of files and become unmaintainable.
Solution: Organized DAG directory:
/opt/airflow/dags/
├── _common/ # Shared utilities
│ ├── operators/
│ │ ├── iceberg_operator.py
│ │ └── data_quality_operator.py
│ ├── sensors/
│ │ └── seaweedfs_sensor.py
│ └── utils/
│ ├── slack_alerts.py
│ └── metadata_logger.py
│
├── ingestion/ # Bronze layer jobs
│ ├── erp_sap/
│ │ ├── dag_ingest_customers.py
│ │ ├── dag_ingest_orders.py
│ │ └── config.yaml
│ ├── crm_salesforce/
│ │ └── dag_ingest_accounts.py
│ └── api_connectors/
│ └── dag_ingest_weather.py
│
├── transformation/ # Silver layer jobs
│ ├── sales_domain/
│ │ ├── dag_build_fact_orders.py
│ │ ├── dag_build_dim_customers.py
│ │ └── sql/
│ │ ├── fact_orders.sql
│ │ └── dim_customers.sql
│ └── finance_domain/
│ └── dag_build_fact_transactions.py
│
├── aggregation/ # Gold layer jobs
│ ├── reporting/
│ │ ├── dag_daily_sales_summary.py
│ │ └── dag_monthly_revenue.py
│ └── ml_features/
│ └── dag_build_customer_features.py
│
├── maintenance/ # Housekeeping jobs
│ ├── dag_iceberg_compaction.py # Compact small files
│ ├── dag_expire_snapshots.py # Clean old versions
│ ├── dag_vacuum_tables.py # Remove orphan files
│ └── dag_health_checks.py # Monitor platform
│
└── data_quality/ # Quality checks
├── dag_validate_bronze.py
├── dag_validate_silver.py
└── expectations/ # Great Expectations suites
├── bronze_checks.json
└── silver_checks.json
File: ingestion/erp_sap/dag_ingest_customers.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.operators.python import PythonOperator
from _common.utils.slack_alerts import send_slack_alert
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'email': ['data-alerts@company.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2),
'on_failure_callback': send_slack_alert,
}
dag = DAG(
'ingest_erp_customers',
default_args=default_args,
description='Ingest customer data from SAP ERP to Bronze layer',
schedule_interval='0 2 * * *', # Daily at 2 AM UTC
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['ingestion', 'bronze', 'erp', 'customers'],
)
# Task 1: Extract from source
extract_task = KubernetesPodOperator(
task_id='extract_from_sap',
name='extract-sap-customers',
namespace='data-platform',
image='company/data-extractors:1.0',
cmds=['python', 'extractors/sap_extractor.py'],
arguments=[
'--table', 'CUSTOMERS',
'--output', 's3://datalake/bronze/source_systems/erp_sap/customers/data/ingest_date={{ ds }}/hour={{ execution_date.hour }}/',
'--format', 'parquet',
'--partition-size', '100MB',
],
env_vars={
'SAP_HOST': '{{ var.value.sap_host }}',
'SAP_USER': '{{ var.value.sap_user }}',
'SAP_PASSWORD': '{{ var.value.sap_password }}',
},
dag=dag,
)
# Task 2: Register in Nessie/Iceberg catalog
def register_iceberg_table(**context):
    from trino import dbapi
    conn = dbapi.connect(
        host='trino-coordinator',
        port=8080,
        user='airflow',
        catalog='iceberg',
        schema='bronze_erp_sap',
    )
    cursor = conn.cursor()
    # Create the table if it does not exist. Note: this is Trino's Iceberg
    # connector DDL; "USING ICEBERG ... PARTITIONED BY" is Spark syntax and
    # will not parse in Trino.
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS bronze_erp_sap.customers (
            customer_id BIGINT,
            email VARCHAR,
            first_name VARCHAR,
            last_name VARCHAR,
            created_at TIMESTAMP,
            ingest_timestamp TIMESTAMP
        )
        WITH (
            location = 's3://datalake/bronze/source_systems/erp_sap/customers/',
            partitioning = ARRAY['day(ingest_timestamp)']
        )
    """)
    # No partition refresh step is needed: Iceberg tracks data files in its
    # own metadata, so newly committed files are immediately queryable.
    cursor.close()
    conn.close()
register_task = PythonOperator(
task_id='register_in_catalog',
python_callable=register_iceberg_table,
dag=dag,
)
# Task 3: Data quality checks
quality_check_task = KubernetesPodOperator(
task_id='validate_data_quality',
name='quality-check-customers',
namespace='data-platform',
image='company/data-quality:1.0',
cmds=['great_expectations', 'checkpoint', 'run'],
arguments=[
'bronze_customers_checkpoint',
'--batch-request', '{"datasource_name": "trino", "data_asset_name": "bronze_erp_sap.customers"}',
],
dag=dag,
)
# Task 4: Update metadata catalog
def update_openmetadata(**context):
    import requests
    from airflow.models import Variable
    payload = {
        "name": "bronze_erp_sap.customers",
        "tableType": "Iceberg",
        "columns": [...],  # Schema
        "owner": {"name": "data-engineering", "type": "team"},
        "tags": ["bronze", "pii", "gdpr"],
    }
    response = requests.post(
        'http://openmetadata:8585/api/v1/tables',
        json=payload,
        headers={'Authorization': f'Bearer {Variable.get("openmetadata_token")}'},
    )
    response.raise_for_status()
metadata_task = PythonOperator(
task_id='update_metadata_catalog',
python_callable=update_openmetadata,
dag=dag,
)
# Define dependencies
extract_task >> register_task >> quality_check_task >> metadata_task
Setup script:
# Connections
airflow connections add seaweedfs_s3 \
--conn-type aws \
--conn-host http://seaweedfs-s3:8333 \
--conn-login datalake_user \
--conn-password <secret>
airflow connections add trino \
--conn-type trino \
--conn-host trino-coordinator \
--conn-port 8080 \
--conn-schema iceberg
airflow connections add postgresql_metadata \
--conn-type postgres \
--conn-host postgresql \
--conn-schema nessie \
--conn-login airflow \
--conn-password <secret>
# Variables (stored in PostgreSQL backend, encrypted at rest)
airflow variables set sap_host "sap-prod.company.com"
airflow variables set data_retention_days_bronze 90
airflow variables set data_retention_days_silver 730
airflow variables set slack_webhook_url "https://hooks.slack.com/..."
Deployment:
prometheus:
replicas: 2
retention: 30d
storageSpec:
volumeClaimTemplate:
spec:
resources:
requests:
storage: 200Gi
# Critical metrics for data engineering
additionalScrapeConfigs:
- job_name: 'seaweedfs-master'
static_configs:
- targets: ['seaweedfs-master:9324']
- job_name: 'seaweedfs-volume'
kubernetes_sd_configs:
- role: pod
relabel_configs:
- source_labels: [__meta_kubernetes_pod_label_app]
regex: seaweedfs-volume
action: keep
- job_name: 'trino'
static_configs:
- targets: ['trino-coordinator:8080']
metrics_path: '/v1/metrics'
grafana:
replicas: 2
persistence:
enabled: true
size: 10Gi
# Pre-import dashboards
dashboards:
default:
seaweedfs-overview:
url: https://grafana.com/api/dashboards/12345/revisions/1/download
trino-cluster:
url: https://grafana.com/api/dashboards/12346/revisions/1/download
airflow-overview:
url: https://grafana.com/api/dashboards/12347/revisions/1/download
Critical Alerts for Data Engineers:
# prometheus-rules.yaml
groups:
- name: data_platform_alerts
interval: 30s
rules:
# Storage alerts
- alert: SeaweedFSHighDiskUsage
expr: (seaweedfs_volume_disk_used_bytes / seaweedfs_volume_disk_total_bytes) > 0.85
for: 10m
labels:
severity: warning
team: data-engineering
annotations:
summary: "SeaweedFS volume {{ $labels.instance }} disk usage > 85%"
description: "Current usage: {{ $value | humanizePercentage }}"
# Query performance
- alert: TrinoQueryQueueBacklog
expr: trino_queued_queries > 20
for: 5m
labels:
severity: warning
annotations:
summary: "Trino has {{ $value }} queries queued"
description: "Consider scaling workers or investigating slow queries"
# Data freshness
- alert: StaleDataInSilver
expr: (time() - max(iceberg_last_commit_timestamp{layer="silver"})) > 7200
for: 15m
labels:
severity: critical
annotations:
summary: "Silver layer data hasn't updated in 2+ hours"
description: "Check Airflow DAGs and upstream sources"
# Airflow health
- alert: AirflowDAGFailures
expr: rate(airflow_dag_failed[1h]) > 0.1
for: 5m
labels:
severity: critical
annotations:
summary: "DAG failure rate > 10%"
description: "Check Airflow logs and Slack alerts"
Custom Metrics for Data Engineering:
# custom_metrics_exporter.py
# Deploy as a sidecar to Airflow scheduler
from prometheus_client import start_http_server, Gauge, Counter
import time
from trino import dbapi
# Define metrics
data_freshness = Gauge('iceberg_table_freshness_seconds', 'Time since last update', ['layer', 'table'])
table_row_count = Gauge('iceberg_table_row_count', 'Number of rows', ['layer', 'table'])
data_quality_score = Gauge('data_quality_score', 'Quality check pass rate', ['layer', 'table'])
pipeline_duration = Gauge('pipeline_duration_seconds', 'Time to complete pipeline', ['dag_id'])
def collect_metrics():
    conn = dbapi.connect(host='trino-coordinator', catalog='iceberg')
    cursor = conn.cursor()
    # List silver tables, then read each table's Iceberg snapshot history
    # (the "$snapshots" metadata table) to find the last commit time
    cursor.execute("""
        SELECT table_name FROM iceberg.information_schema.tables
        WHERE table_schema = 'silver'
    """)
    tables = [row[0] for row in cursor.fetchall()]
    for table in tables:
        cursor.execute(
            f'SELECT to_unixtime(max(committed_at)) FROM iceberg.silver."{table}$snapshots"'
        )
        last_update = cursor.fetchone()[0]
        if last_update is not None:
            data_freshness.labels(layer='silver', table=table).set(time.time() - last_update)
    cursor.close()
    conn.close()
if __name__ == '__main__':
    start_http_server(8000)
    while True:
        collect_metrics()
        time.sleep(60)
Grafana Dashboard for Data Engineers:
{
"dashboard": {
"title": "Data Platform Health",
"panels": [
{
"title": "Data Freshness by Layer",
"targets": [{
"expr": "iceberg_table_freshness_seconds{layer='silver'} / 3600",
"legendFormat": "{{table}}"
}],
"thresholds": [
{"value": 2, "color": "green"},
{"value": 6, "color": "yellow"},
{"value": 12, "color": "red"}
]
},
{
"title": "Storage Growth Rate",
"targets": [{
"expr": "rate(seaweedfs_volume_disk_used_bytes[24h])"
}]
},
{
"title": "Pipeline Success Rate (24h)",
"targets": [{
"expr": "sum(rate(airflow_dag_success[24h])) / sum(rate(airflow_dag_run[24h]))"
}]
},
{
"title": "Query Performance (P95)",
"targets": [{
"expr": "histogram_quantile(0.95, trino_query_duration_seconds_bucket)"
}]
}
]
}
}
Deployment:
velero:
replicas: 1
configuration:
provider: aws
backupStorageLocation:
name: default
provider: aws
bucket: datalake-backups
config:
region: us-east-1
s3Url: http://seaweedfs-s3:8333
s3ForcePathStyle: true
volumeSnapshotLocation:
name: default
provider: aws
credentials:
secretContents:
cloud: |
[default]
aws_access_key_id=backup_user
aws_secret_access_key=<secret>
schedules:
# Metadata backups (frequent)
metadata-daily:
schedule: "0 2 * * *"
template:
includedNamespaces:
- data-platform
includedResources:
- configmap
- secret
- persistentvolumeclaim
labelSelector:
matchLabels:
backup: metadata
# Full platform backup (weekly)
full-weekly:
schedule: "0 3 * * 0"
template:
includedNamespaces:
- data-platform
ttl: 720h # 30 days retention
Tool: pgBackRest
# pgbackrest-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: pgbackrest-config
data:
pgbackrest.conf: |
[global]
repo1-type=s3
repo1-s3-endpoint=seaweedfs-s3.data-platform.svc.cluster.local
repo1-s3-bucket=datalake-backups
repo1-s3-region=us-east-1
repo1-s3-key=backup_user
repo1-s3-key-secret=<secret>
repo1-path=/pgbackrest
repo1-retention-full=4
repo1-retention-diff=8
process-max=4
log-level-console=info
[nessie]
pg1-host=postgresql-nessie
pg1-path=/var/lib/postgresql/data
pg1-user=postgres
[airflow]
pg1-host=postgresql-airflow
pg1-path=/var/lib/postgresql/data
pg1-user=postgres
Backup CronJob:
apiVersion: batch/v1
kind: CronJob
metadata:
name: postgres-backup
spec:
schedule: "0 2 * * *" # Daily at 2 AM
jobTemplate:
spec:
template:
spec:
containers:
- name: pgbackrest
image: pgbackrest/pgbackrest:latest
command:
- /bin/bash
- -c
- |
# Full backup on Sundays, incremental on other days
if [ $(date +%u) -eq 7 ]; then
pgbackrest --stanza=nessie backup --type=full
pgbackrest --stanza=airflow backup --type=full
else
pgbackrest --stanza=nessie backup --type=diff
pgbackrest --stanza=airflow backup --type=diff
fi
# Expire old backups
pgbackrest --stanza=nessie expire
pgbackrest --stanza=airflow expire
volumeMounts:
- name: pgbackrest-config
mountPath: /etc/pgbackrest
restartPolicy: OnFailure
Recovery Runbook:
#!/bin/bash
# disaster_recovery.sh
echo "=== Data Platform Disaster Recovery ==="
echo "Estimated time: 2-4 hours"
echo ""
# Step 1: Restore Kubernetes resources (15 min)
echo "[1/5] Restoring Kubernetes resources..."
velero restore create --from-backup metadata-daily-20241201 --wait
# Step 2: Restore PostgreSQL databases (30 min)
echo "[2/5] Restoring PostgreSQL databases..."
kubectl exec -it postgresql-nessie-0 -- pgbackrest --stanza=nessie restore
kubectl exec -it postgresql-airflow-0 -- pgbackrest --stanza=airflow restore
# Restart PostgreSQL pods
kubectl rollout restart statefulset postgresql-nessie
kubectl rollout restart statefulset postgresql-airflow
# Step 3: Verify SeaweedFS integrity (10 min)
echo "[3/5] Checking SeaweedFS data integrity..."
kubectl exec -it seaweedfs-master-0 -- weed shell <<EOF
volume.list
volume.fsck
EOF
# Step 4: Verify Iceberg table metadata (20 min)
echo "[4/5] Verifying Iceberg tables..."
kubectl exec -it trino-coordinator-0 -- trino --execute "
SHOW SCHEMAS IN iceberg;
SHOW TABLES IN iceberg.bronze_erp_sap;
SELECT COUNT(*) FROM iceberg.bronze_erp_sap.customers;
"
# Step 5: Test end-to-end pipeline (30 min)
echo "[5/5] Running smoke test DAG..."
kubectl exec -it airflow-scheduler-0 -- airflow dags test health_check_dag
echo ""
echo "=== Recovery Complete ==="
echo "Next steps:"
echo "1. Check Grafana dashboards for anomalies"
echo "2. Review Airflow logs for any failed tasks"
echo "3. Notify stakeholders that system is back online"
Deployment:
apiVersion: v1
kind: Namespace
metadata:
name: sealed-secrets
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: sealed-secrets-controller
namespace: sealed-secrets
spec:
replicas: 1
selector:
matchLabels:
app: sealed-secrets-controller
template:
spec:
containers:
- name: sealed-secrets-controller
image: quay.io/bitnami/sealed-secrets-controller:latest
command:
- controller
ports:
- containerPort: 8080
name: http
Usage Pattern for Data Engineering:
# Step 1: Get the public key (one-time)
kubeseal --fetch-cert > public-key.pem
# Step 2: Create a secret manifest
cat <<EOF > database-secret.yaml
apiVersion: v1
kind: Secret
metadata:
name: postgres-credentials
namespace: data-platform
stringData:
username: datalake_user
password: MySecurePassword123!
connection_string: postgresql://datalake_user:MySecurePassword123!@postgresql:5432/nessie
EOF
# Step 3: Encrypt it
kubeseal --format yaml --cert public-key.pem < database-secret.yaml > database-sealed-secret.yaml
# Step 4: Commit to Git (safe!)
git add database-sealed-secret.yaml
git commit -m "Add database credentials"
git push
# Step 5: Apply to cluster
kubectl apply -f database-sealed-secret.yaml
# The sealed-secrets controller automatically decrypts and creates the Secret
Secret Rotation Strategy:
# rotate_secrets.py
# Run quarterly via Airflow DAG
import subprocess
import secrets
import string
from trino import dbapi

def generate_password(length=32):
    # Use the cryptographically secure `secrets` module rather than `random`
    chars = string.ascii_letters + string.digits + "!@#$%^&*"
    return ''.join(secrets.choice(chars) for _ in range(length))
def rotate_database_password():
    # 1. Generate new password
    new_password = generate_password()
    # 2. Update in database. Connect to PostgreSQL directly (the Trino client
    # cannot run ALTER USER against Postgres); the character set above excludes
    # quotes, so inlining the password is safe here.
    import psycopg2
    conn = psycopg2.connect(host='postgresql', user='admin')
    conn.autocommit = True
    cursor = conn.cursor()
    cursor.execute(f"ALTER USER datalake_user WITH PASSWORD '{new_password}'")
    cursor.close()
    conn.close()
    # 3. Render the new secret and encrypt it with kubeseal
    manifest = subprocess.run(
        ['kubectl', 'create', 'secret', 'generic', 'postgres-credentials',
         '--from-literal', f'password={new_password}',
         '--dry-run=client', '-o', 'yaml'],
        check=True, capture_output=True, text=True,
    ).stdout
    sealed = subprocess.run(
        ['kubeseal', '--format', 'yaml', '--cert', 'public-key.pem'],
        input=manifest, check=True, capture_output=True, text=True,
    ).stdout
    with open('sealed-secret.yaml', 'w') as f:
        f.write(sealed)
    # 4. Apply new secret
    subprocess.run(['kubectl', 'apply', '-f', 'sealed-secret.yaml'], check=True)
    # 5. Rolling restart of pods using this secret
    subprocess.run(['kubectl', 'rollout', 'restart', 'deployment/trino-coordinator'], check=True)
    subprocess.run(['kubectl', 'rollout', 'restart', 'deployment/airflow-scheduler'], check=True)
    print("Password rotated successfully")

if __name__ == '__main__':
    rotate_database_password()
Goal: Enable complex ETL, transformations, and streaming pipelines
When Trino is NOT enough:
- Complex multi-stage transformations (Trino is single-pass)
- Iterative algorithms (machine learning)
- Heavy shuffles (large GROUP BY with billions of rows)
- UDF-heavy workloads (Python UDFs are slow in Trino)
Real-world decision tree:
Query needs to process < 1TB of data?
└─ YES → Use Trino (faster for simple queries)
└─ NO → Continue...
Query has complex joins across 5+ tables?
└─ YES → Use Spark (better optimization)
└─ NO → Use Trino
Need to train ML models?
└─ YES → Use Spark (MLlib support)
└─ NO → Use Trino
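The decision tree above can be encoded directly, which is handy for documenting the routing logic in pipeline code. A sketch (the thresholds are this document's rules of thumb, not hard limits; the ML check is applied first since iterative training always needs Spark):

```python
def choose_engine(data_tb: float, join_table_count: int = 0, needs_ml: bool = False) -> str:
    """Pick a query engine following the rules of thumb above."""
    if needs_ml:
        return "spark"   # MLlib / iterative algorithms
    if data_tb < 1:
        return "trino"   # faster for simple queries on smaller data
    if join_table_count >= 5:
        return "spark"   # better optimization for complex multi-way joins
    return "trino"

print(choose_engine(0.5))                    # trino
print(choose_engine(5, join_table_count=6))  # spark
print(choose_engine(0.2, needs_ml=True))     # spark
```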
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: silver-layer-transformer
namespace: data-platform
spec:
type: Scala
mode: cluster
image: company/spark-iceberg:3.5.0
imagePullPolicy: Always
mainClass: com.company.datalake.SilverLayerJob
mainApplicationFile: s3a://datalake/jars/silver-layer-job.jar
sparkVersion: "3.5.0"
# Auto-scaling configuration
dynamicAllocation:
enabled: true
initialExecutors: 3
minExecutors: 3
maxExecutors: 20
driver:
cores: 4
memory: "8g"
serviceAccount: spark-operator
labels:
app: spark-driver
layer: silver
executor:
cores: 4
instances: 3
memory: "16g"
labels:
app: spark-executor
layer: silver
# Iceberg + Nessie configuration
sparkConf:
# Iceberg catalog
"spark.sql.catalog.nessie": "org.apache.iceberg.spark.SparkCatalog"
"spark.sql.catalog.nessie.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog"
"spark.sql.catalog.nessie.uri": "http://nessie:19120/api/v1"
"spark.sql.catalog.nessie.ref": "main"
"spark.sql.catalog.nessie.warehouse": "s3a://datalake/"
# SeaweedFS S3 configuration
"spark.hadoop.fs.s3a.endpoint": "http://seaweedfs-s3:8333"
"spark.hadoop.fs.s3a.access.key": "datalake_user"
"spark.hadoop.fs.s3a.secret.key": "{{SECRET}}"
"spark.hadoop.fs.s3a.path.style.access": "true"
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
# Performance tuning
"spark.sql.adaptive.enabled": "true"
"spark.sql.adaptive.coalescePartitions.enabled": "true"
"spark.sql.adaptive.skewJoin.enabled": "true"
"spark.sql.files.maxPartitionBytes": "134217728" # 128MB
"spark.sql.shuffle.partitions": "200"
# Iceberg optimizations
"spark.sql.iceberg.handle-timestamp-without-timezone": "true"
"spark.sql.iceberg.vectorization.enabled": "true"
# Resource monitoring
monitoring:
prometheus:
jmxExporterJar: "/prometheus/jmx_prometheus_javaagent.jar"
port: 8090
# Automatic cleanup
restartPolicy:
type: OnFailure
onFailureRetries: 3
onFailureRetryInterval: 10
# Dependencies
deps:
jars:
- s3a://datalake/jars/iceberg-spark-runtime-3.5_2.12-1.4.0.jar
- s3a://datalake/jars/nessie-spark-extensions-3.5_2.12-0.74.0.jar
// SilverLayerJob.scala
package com.company.datalake
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.Window
object SilverLayerJob {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Silver Layer - Customer Transformation")
.getOrCreate()
import spark.implicits._
// Read from Bronze (raw data)
val bronzeCustomers = spark.table("nessie.bronze_erp_sap.customers")
// Data quality filters
val cleanedCustomers = bronzeCustomers
.filter($"customer_id".isNotNull)
.filter($"email".rlike("^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"))
.filter($"created_at".isNotNull)
// Data transformations
val silverCustomers = cleanedCustomers
.withColumn("email_domain", split($"email", "@").getItem(1))
.withColumn("full_name", concat_ws(" ", $"first_name", $"last_name"))
.withColumn("customer_age_days", datediff(current_date(), $"created_at"))
.withColumn("customer_segment",
when($"customer_age_days" < 30, "new")
.when($"customer_age_days" < 365, "active")
.otherwise("mature"))
.withColumn("processed_at", current_timestamp())
.withColumn("data_quality_score", lit(1.0)) // Placeholder for ML model
// Deduplication (keep latest record)
val dedupedCustomers = silverCustomers
.withColumn("row_num", row_number().over(
Window.partitionBy("customer_id").orderBy($"updated_at".desc)
))
.filter($"row_num" === 1)
.drop("row_num")
// Write to Silver layer (upsert via merge)
dedupedCustomers.createOrReplaceTempView("updates")
spark.sql("""
MERGE INTO nessie.silver_sales.dim_customers target
USING updates source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
// Collect metrics
val recordsProcessed = dedupedCustomers.count()
val duplicatesRemoved = cleanedCustomers.count() - dedupedCustomers.count()
println(s"Records processed: $recordsProcessed")
println(s"Duplicates removed: $duplicatesRemoved")
// Write metrics to monitoring table
Seq((recordsProcessed, duplicatesRemoved, new java.sql.Timestamp(System.currentTimeMillis())))
.toDF("records_processed", "duplicates_removed", "job_timestamp")
.write
.mode("append")
.saveAsTable("nessie.metadata.job_metrics")
spark.stop()
}
}

# dag_silver_customers.py
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-engineering',
'retries': 2,
'retry_delay': timedelta(minutes=10),
}
dag = DAG(
'silver_transform_customers',
default_args=default_args,
schedule_interval='0 4 * * *', # 4 AM daily
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['transformation', 'silver', 'spark'],
)
# Spark job operator
transform_task = SparkKubernetesOperator(
task_id='transform_bronze_to_silver',
namespace='data-platform',
application_file='silver-layer-transformer.yaml',
do_xcom_push=True,
dag=dag,
)
# Data quality validation
from airflow.providers.trino.operators.trino import TrinoOperator
validate_task = TrinoOperator(
task_id='validate_silver_data',
trino_conn_id='trino',
sql="""
SELECT
COUNT(*) as total_records,
COUNT(DISTINCT customer_id) as unique_customers,
SUM(CASE WHEN email IS NULL THEN 1 ELSE 0 END) as missing_emails
FROM nessie.silver_sales.dim_customers
WHERE DATE(processed_at) = CURRENT_DATE
""",
handler=lambda cur: print(f"Validation results: {cur.fetchall()}"),
dag=dag,
)
transform_task >> validate_task

Problem without dbt:
- SQL scattered across 100+ Airflow DAGs
- Hard to test transformations
- No lineage visibility
- Analysts can't contribute (need Airflow access)
Solution with dbt:
- All transformations in Git
- Built-in testing framework
- Auto-generated documentation
- Analysts write SQL, engineers deploy
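The lineage point above can be made concrete: every dbt invocation writes `target/manifest.json`, and its `parent_map` records each node's direct upstreams. A minimal sketch of walking it follows; the manifest excerpt is fabricated for illustration, and only the `parent_map` key mirrors the real artifact.

```python
# Sketch: recover source-level lineage from a dbt manifest's parent_map.
# The sample data below is invented for illustration; in practice you would
# load the real artifact with json.load(open("target/manifest.json")).
sample_manifest = {
    "parent_map": {
        "model.datalake.fct_orders": [
            "model.datalake.stg_erp__orders",
            "model.datalake.dim_customers",
        ],
        "model.datalake.stg_erp__orders": ["source.datalake.bronze_erp_sap.orders"],
        "model.datalake.dim_customers": ["model.datalake.stg_erp__customers"],
        "model.datalake.stg_erp__customers": ["source.datalake.bronze_erp_sap.customers"],
    }
}

def upstream_sources(manifest: dict, node: str) -> set:
    """Walk parent_map recursively and return the raw sources a node depends on."""
    sources = set()
    for parent in manifest["parent_map"].get(node, []):
        if parent.startswith("source."):
            sources.add(parent)
        else:
            sources |= upstream_sources(manifest, parent)
    return sources
```

Resolving `model.datalake.fct_orders` this way traces the mart back to its bronze source tables, which is exactly the lineage visibility that SQL scattered across Airflow DAGs lacks.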
# dbt-runner-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: dbt-profiles
namespace: data-platform
data:
profiles.yml: |
datalake:
target: prod
outputs:
prod:
type: trino
method: ldap
host: trino-coordinator
port: 8080
catalog: nessie
schema: silver_sales
threads: 8
user: dbt_service_account
password: "{{ env_var('DBT_PASSWORD') }}"
dev:
type: trino
host: trino-coordinator
port: 8080
catalog: nessie
schema: dev_{{ env_var('USER') }}
threads: 4

/dbt_project/
├── dbt_project.yml
├── profiles.yml
│
├── models/
│ ├── staging/ # Bronze → Silver
│ │ ├── _sources.yml
│ │ ├── stg_erp__customers.sql
│ │ ├── stg_erp__orders.sql
│ │ └── stg_crm__accounts.sql
│ │
│ ├── intermediate/ # Complex business logic
│ │ ├── _models.yml
│ │ ├── int_customer_orders.sql
│ │ └── int_customer_lifetime_value.sql
│ │
│ └── marts/ # Silver → Gold
│ ├── sales/
│ │ ├── _models.yml
│ │ ├── fct_orders.sql
│ │ ├── dim_customers.sql
│ │ └── dim_products.sql
│ │
│ └── finance/
│ ├── fct_transactions.sql
│ └── dim_accounts.sql
│
├── tests/
│ ├── generic/
│ │ └── test_email_format.sql
│ └── singular/
│ └── test_no_negative_revenue.sql
│
├── macros/
│ ├── generate_schema_name.sql
│ └── custom_tests.sql
│
├── snapshots/
│ └── snap_dim_customers.sql # SCD Type 2
│
└── seeds/
├── country_codes.csv
└── product_categories.csv
File: models/staging/stg_erp__customers.sql
{{
config(
materialized='incremental',
unique_key='customer_id',
on_schema_change='append_new_columns',
partition_by=['DATE_TRUNC(\'day\', updated_at)'],
tags=['staging', 'pii']
)
}}
WITH source AS (
SELECT *
FROM {{ source('bronze_erp_sap', 'customers') }}
{% if is_incremental() %}
-- Only process new/updated records
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
),
cleaned AS (
SELECT
customer_id,
TRIM(LOWER(email)) AS email,
TRIM(first_name) AS first_name,
TRIM(last_name) AS last_name,
created_at,
updated_at,
-- Data quality flags
CASE
WHEN email IS NULL THEN FALSE
WHEN email NOT LIKE '%@%.%' THEN FALSE
ELSE TRUE
END AS is_email_valid,
CASE
WHEN LENGTH(first_name) < 2 THEN FALSE
ELSE TRUE
END AS is_name_valid,
CURRENT_TIMESTAMP AS dbt_processed_at
FROM source
),
final AS (
SELECT
customer_id,
email,
first_name,
last_name,
created_at,
updated_at,
-- Only include valid records
is_email_valid,
is_name_valid,
-- Derived fields
SPLIT_PART(email, '@', 2) AS email_domain,
CONCAT(first_name, ' ', last_name) AS full_name,
DATE_DIFF('day', created_at, CURRENT_DATE) AS customer_age_days,
dbt_processed_at
FROM cleaned
WHERE is_email_valid = TRUE
AND is_name_valid = TRUE
)
SELECT * FROM final

File: models/staging/_sources.yml
version: 2
sources:
- name: bronze_erp_sap
description: Raw data from SAP ERP system
database: nessie
schema: bronze_erp_sap
tables:
- name: customers
description: Customer master data
loaded_at_field: ingest_timestamp
freshness:
warn_after: {count: 6, period: hour}
error_after: {count: 12, period: hour}
columns:
- name: customer_id
description: Primary key
tests:
- unique
- not_null
- name: email
description: Customer email address
tests:
- not_null
- name: created_at
description: Account creation timestamp
tests:
- not_null

File: models/marts/sales/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
partition_by=['DATE_TRUNC(\'month\', order_date)'],
tags=['marts', 'sales']
)
}}
WITH orders AS (
SELECT * FROM {{ ref('stg_erp__orders') }}
{% if is_incremental() %}
WHERE order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
),
customers AS (
SELECT * FROM {{ ref('dim_customers') }}
),
products AS (
SELECT * FROM {{ ref('dim_products') }}
),
joined AS (
SELECT
o.order_id,
o.customer_id,
o.product_id,
o.order_date,
o.quantity,
o.unit_price,
o.discount_amount,
-- Customer attributes
c.customer_segment,
c.customer_lifetime_value,
-- Product attributes
p.product_category,
p.product_subcategory,
-- Calculated measures
(o.quantity * o.unit_price) AS gross_revenue,
(o.quantity * o.unit_price - o.discount_amount) AS net_revenue,
(o.quantity * o.unit_price - o.discount_amount - o.quantity * p.unit_cost) AS profit,
CURRENT_TIMESTAMP AS dbt_updated_at
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
LEFT JOIN products p ON o.product_id = p.product_id
)
SELECT * FROM joined

# dag_dbt_silver_gold.py
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-engineering',
'retries': 1,
}
dag = DAG(
'dbt_silver_to_gold',
default_args=default_args,
schedule_interval='0 6 * * *', # 6 AM daily
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['dbt', 'transformation', 'gold'],
)
# Run dbt models
dbt_run = KubernetesPodOperator(
task_id='dbt_run_marts',
name='dbt-runner',
namespace='data-platform',
image='company/dbt-trino:1.7.0',
cmds=['dbt'],
arguments=[
'run',
'--profiles-dir', '/root/.dbt',
'--project-dir', '/dbt',
'--models', 'marts', # Only run marts (gold layer)
'--target', 'prod',
],
env_vars={
'DBT_PASSWORD': '{{ var.value.dbt_password }}',
},
get_logs=True,
dag=dag,
)
# Run dbt tests
dbt_test = KubernetesPodOperator(
task_id='dbt_test_marts',
name='dbt-tester',
namespace='data-platform',
image='company/dbt-trino:1.7.0',
cmds=['dbt'],
arguments=[
'test',
'--profiles-dir', '/root/.dbt',
'--project-dir', '/dbt',
'--models', 'marts',
],
env_vars={
'DBT_PASSWORD': '{{ var.value.dbt_password }}',
},
get_logs=True,
dag=dag,
)
# Generate documentation
dbt_docs = KubernetesPodOperator(
task_id='dbt_generate_docs',
name='dbt-docs',
namespace='data-platform',
image='company/dbt-trino:1.7.0',
cmds=['/bin/bash', '-c'],
arguments=[
'dbt docs generate --profiles-dir /root/.dbt --project-dir /dbt && '
'aws s3 cp /dbt/target s3://datalake/metadata/dbt_docs/ --recursive --endpoint-url http://seaweedfs-s3:8333'
],
dag=dag,
)
dbt_run >> dbt_test >> dbt_docs

Deployment:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: datalake-kafka
namespace: data-platform
spec:
kafka:
version: 3.6.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
log.retention.hours: 168 # 7 days
log.segment.bytes: 1073741824 # 1GB
compression.type: snappy
auto.create.topics.enable: false
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 1Ti
class: fast-ssd
deleteClaim: false
resources:
requests:
memory: 8Gi
cpu: 4
limits:
memory: 16Gi
cpu: 8
# Monitoring
metricsConfig:
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
name: kafka-metrics
key: kafka-metrics-config.yml
zookeeper:
replicas: 3
storage:
type: persistent-claim
size: 100Gi
class: fast-ssd
deleteClaim: false
resources:
requests:
memory: 4Gi
cpu: 2
limits:
memory: 8Gi
cpu: 4
entityOperator:
topicOperator: {}
userOperator: {}

# kafka-topics.yaml
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: cdc.erp.customers
namespace: data-platform
labels:
strimzi.io/cluster: datalake-kafka
spec:
partitions: 12 # Based on throughput requirements
replicas: 3
config:
retention.ms: 604800000 # 7 days
segment.ms: 3600000 # 1 hour
compression.type: snappy
cleanup.policy: delete
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: cdc.erp.orders
namespace: data-platform
labels:
strimzi.io/cluster: datalake-kafka
spec:
partitions: 24 # Higher volume
replicas: 3
config:
retention.ms: 604800000
compression.type: snappy

Topic Naming Convention:
{source_type}.{source_system}.{table_name}
Examples:
- cdc.erp.customers (Change Data Capture from ERP)
- cdc.crm.accounts
- events.web.clickstream (Event stream)
- events.iot.sensors
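A convention is only useful if it is enforced. A minimal sketch of a validator that could run in CI against KafkaTopic manifests; the regex and the accepted source types (`cdc`, `events`) are assumptions drawn from the examples above.

```python
import re

# Hypothetical check for the {source_type}.{source_system}.{table_name} convention.
# Accepted source types are taken from the examples above; extend as needed.
TOPIC_PATTERN = re.compile(r"(cdc|events)\.[a-z0-9_]+\.[a-z0-9_]+")

def is_valid_topic(name: str) -> bool:
    """Return True when a topic name follows the naming convention."""
    return TOPIC_PATTERN.fullmatch(name) is not None
```

Wiring this into a pre-merge check keeps ad-hoc names like `erp-customers-v2` out of the cluster before they become consumer dependencies.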
Deploy Kafka Connect:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: debezium-connect
namespace: data-platform
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: 3.6.0
replicas: 3
bootstrapServers: datalake-kafka-bootstrap:9092
config:
group.id: debezium-connect
offset.storage.topic: connect-offsets
config.storage.topic: connect-configs
status.storage.topic: connect-status
# Replication factors
offset.storage.replication.factor: 3
config.storage.replication.factor: 3
status.storage.replication.factor: 3
# Build custom image with Debezium connectors
build:
output:
type: docker
image: company/kafka-connect-debezium:latest
plugins:
- name: debezium-postgres
artifacts:
- type: tgz
url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.5.0.Final/debezium-connector-postgres-2.5.0.Final-plugin.tar.gz
- name: debezium-mysql
artifacts:
- type: tgz
url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.5.0.Final/debezium-connector-mysql-2.5.0.Final-plugin.tar.gz
resources:
requests:
memory: 4Gi
cpu: 2
limits:
memory: 8Gi
cpu: 4

Configure CDC Source:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: erp-customers-source
namespace: data-platform
labels:
strimzi.io/cluster: debezium-connect
spec:
class: io.debezium.connector.postgresql.PostgresConnector
tasksMax: 1
config:
# Database connection
database.hostname: erp-database.company.internal
database.port: 5432
database.user: debezium_user
database.password: ${file:/opt/kafka/external-configuration/debezium-credentials/password}
database.dbname: erp_production
# Connector settings
database.server.name: erp
table.include.list: public.customers,public.orders,public.products
# Output topic naming
topic.prefix: cdc.erp
# Snapshot settings
snapshot.mode: initial # Take full snapshot on first run
# Change event settings
include.schema.changes: true
tombstones.on.delete: true
# Performance tuning
max.batch.size: 2048
max.queue.size: 8192
poll.interval.ms: 100
# Transformations
transforms: unwrap
transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones: false
transforms.unwrap.delete.handling.mode: rewrite

// StreamingJob.scala
package com.company.datalake
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._
object KafkaToIcebergStreaming {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("CDC Stream - ERP Customers")
.getOrCreate()
import spark.implicits._
// Read from Kafka
val kafkaStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "datalake-kafka-bootstrap:9092")
.option("subscribe", "cdc.erp.customers")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 10000) // Rate limiting
.load()
// Parse JSON payload
val parsedStream = kafkaStream
.selectExpr("CAST(value AS STRING) as json_value")
.select(from_json($"json_value", schema).as("data"))
.select("data.*")
// Add metadata
val enrichedStream = parsedStream
.withColumn("ingested_at", current_timestamp())
.withColumn("source_system", lit("erp"))
.withColumn("cdc_operation", $"__op") // Debezium operation: c=create, u=update, d=delete
// Write to Iceberg (upsert using merge)
val query = enrichedStream
.writeStream
.format("iceberg")
.outputMode("append")
.trigger(Trigger.ProcessingTime("1 minute")) // Micro-batch every 1 min
.option("path", "s3a://datalake/bronze/source_systems/erp_sap/customers/")
.option("checkpointLocation", "s3a://datalake/checkpoints/cdc_erp_customers/")
.option("fanout-enabled", "true") // Better write performance
.start()
query.awaitTermination()
}
def schema = StructType(Seq(
StructField("customer_id", LongType, nullable = false),
StructField("email", StringType, nullable = true),
StructField("first_name", StringType, nullable = true),
StructField("last_name", StringType, nullable = true),
StructField("created_at", TimestampType, nullable = true),
StructField("updated_at", TimestampType, nullable = true),
StructField("__op", StringType, nullable = true), // Debezium metadata
))
}

Deploy as Long-Running Job:
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: streaming-cdc-customers
namespace: data-platform
spec:
type: Scala
mode: cluster
image: company/spark-streaming:3.5.0
mainClass: com.company.datalake.KafkaToIcebergStreaming
mainApplicationFile: s3a://datalake/jars/streaming-job.jar
sparkVersion: "3.5.0"
restartPolicy:
type: Always # Restart on failure (long-running)
driver:
cores: 2
memory: "4g"
executor:
cores: 4
instances: 4
memory: "8g"
sparkConf:
"spark.sql.streaming.checkpointLocation.enabled": "true"
"spark.sql.streaming.statefulOperator.checkCorrectness.enabled": "false"

Deployment:
apiVersion: v1
kind: Namespace
metadata:
name: openmetadata
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: openmetadata
namespace: openmetadata
spec:
replicas: 2
template:
spec:
containers:
- name: openmetadata
image: openmetadata/server:1.3.0
ports:
- containerPort: 8585
env:
- name: OPENMETADATA_CLUSTER_NAME
value: "datalake-prod"
- name: DB_DRIVER_CLASS
value: "org.postgresql.Driver"
- name: DB_HOST
value: "postgresql.data-platform"
- name: DB_PORT
value: "5432"
- name: DB_USER
valueFrom:
secretKeyRef:
name: openmetadata-secrets
key: db-user
- name: DB_PASSWORD
valueFrom:
secretKeyRef:
name: openmetadata-secrets
key: db-password
- name: ELASTICSEARCH_HOST
value: "elasticsearch:9200"
resources:
requests:
memory: 4Gi
cpu: 2
limits:
memory: 8Gi
cpu: 4
livenessProbe:
httpGet:
path: /api/v1/health
port: 8585
initialDelaySeconds: 120
periodSeconds: 30

Ingest from Trino:
# trino-ingestion.yaml
source:
type: trino
serviceName: trino-datalake
serviceConnection:
config:
type: Trino
hostPort: trino-coordinator:8080
catalog: nessie
username: openmetadata
authType: basic
sourceConfig:
config:
type: DatabaseMetadata
schemaFilterPattern:
includes:
- bronze_.*
- silver_.*
- gold_.*
tableFilterPattern:
excludes:
- .*_temp$
- .*_staging$
# Enable profiling (sample data for statistics)
enableDataProfiler: true
profileSample: 100 # Percentage of rows to profile (100 = all rows), not a row count
# Collect lineage
includeViews: true
includeTags: true
sink:
type: metadata-rest
config: {}
workflowConfig:
loggerLevel: INFO
openMetadataServerConfig:
hostPort: http://openmetadata:8585/api
authProvider: no-auth

Ingest from Airflow (Lineage):
# airflow-lineage-ingestion.yaml
source:
type: airflow
serviceName: airflow-orchestrator
serviceConnection:
config:
type: Airflow
hostPort: http://airflow-webserver:8080
connection:
type: Backend
sourceConfig:
config:
type: PipelineMetadata
includeLineage: true # Extract task dependencies
pipelineFilterPattern:
includes:
- "dag_.*"
sink:
type: metadata-rest
config: {}

Run ingestion via CronJob:
apiVersion: batch/v1
kind: CronJob
metadata:
name: openmetadata-ingestion
namespace: openmetadata
spec:
schedule: "0 */6 * * *" # Every 6 hours
jobTemplate:
spec:
template:
spec:
containers:
- name: ingestion
image: openmetadata/ingestion:1.3.0
command:
- metadata
- ingest
- -c
- /config/trino-ingestion.yaml
volumeMounts:
- name: config
mountPath: /config
restartPolicy: OnFailure
volumes:
- name: config
configMap:
name: ingestion-configs

Deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
name: great-expectations-runner
namespace: data-platform
spec:
replicas: 1
template:
spec:
containers:
- name: ge-runner
image: company/great-expectations:0.18.0
command: ["/bin/sh", "-c", "while true; do sleep 3600; done"] # Keep alive
volumeMounts:
- name: ge-config
mountPath: /great_expectations
- name: checkpoints
mountPath: /checkpoints
env:
- name: TRINO_HOST
value: "trino-coordinator"
- name: TRINO_PORT
value: "8080"

/great_expectations/
├── great_expectations.yml
├── checkpoints/
│ ├── bronze_customers_daily.yml
│ ├── silver_orders_daily.yml
│ └── gold_revenue_daily.yml
│
├── expectations/
│ ├── bronze/
│ │ └── erp_customers_suite.json
│ ├── silver/
│ │ ├── dim_customers_suite.json
│ │ └── fact_orders_suite.json
│ └── gold/
│ └── daily_revenue_suite.json
│
└── plugins/
└── custom_expectations/
└── expect_column_values_to_match_email_pattern.py
File: expectations/silver/dim_customers_suite.json
{
"data_asset_type": "Dataset",
"expectation_suite_name": "silver.dim_customers",
"expectations": [
{
"expectation_type": "expect_table_row_count_to_be_between",
"kwargs": {
"min_value": 100000,
"max_value": 10000000
},
"meta": {
"notes": "We expect between 100K-10M customers"
}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "customer_id"
}
},
{
"expectation_type": "expect_column_values_to_be_unique",
"kwargs": {
"column": "customer_id"
}
},
{
"expectation_type": "expect_column_values_to_match_regex",
"kwargs": {
"column": "email",
"regex": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
}
},
{
"expectation_type": "expect_column_values_to_be_between",
"kwargs": {
"column": "customer_age_days",
"min_value": 0,
"max_value": 36500
},
"meta": {
"notes": "Customer age should be 0-100 years"
}
},
{
"expectation_type": "expect_column_values_to_be_in_set",
"kwargs": {
"column": "customer_segment",
"value_set": ["new", "active", "mature", "churned"]
}
},
{
"expectation_type": "expect_column_mean_to_be_between",
"kwargs": {
"column": "customer_lifetime_value",
"min_value": 50,
"max_value": 5000
}
}
],
"meta": {
"great_expectations_version": "0.18.0"
}
}

# dag_data_quality_checks.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import great_expectations as gx
default_args = {
'owner': 'data-quality-team',
'retries': 1,
}
dag = DAG(
'data_quality_silver_layer',
default_args=default_args,
schedule_interval='0 8 * * *', # 8 AM daily
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['data-quality', 'silver'],
)
def run_checkpoint(checkpoint_name):
context = gx.get_context(project_root_dir="/great_expectations")
result = context.run_checkpoint(
checkpoint_name=checkpoint_name,
batch_request={
"datasource_name": "trino_datasource",
"data_connector_name": "default_runtime_data_connector",
"data_asset_name": "silver.dim_customers",
}
)
if not result["success"]:
failed_expectations = [
exp for exp in result.list_validation_results()[0].results
if not exp.success
]
error_msg = f"Data quality checks failed:\n"
for exp in failed_expectations:
error_msg += f"- {exp.expectation_config.expectation_type}\n"
raise ValueError(error_msg)
return result
validate_customers = PythonOperator(
task_id='validate_dim_customers',
python_callable=run_checkpoint,
op_kwargs={'checkpoint_name': 'silver_dim_customers_daily'},
dag=dag,
)
validate_orders = PythonOperator(
task_id='validate_fact_orders',
python_callable=run_checkpoint,
op_kwargs={'checkpoint_name': 'silver_fact_orders_daily'},
dag=dag,
)
[validate_customers, validate_orders]

Deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
name: mlflow-server
namespace: data-platform
spec:
replicas: 2
template:
spec:
containers:
- name: mlflow
image: company/mlflow:2.10.0
ports:
- containerPort: 5000
command:
- mlflow
- server
- --backend-store-uri
- postgresql://mlflow_user:${DB_PASSWORD}@postgresql:5432/mlflow
- --default-artifact-root
- s3://datalake/mlflow_artifacts/
- --host
- 0.0.0.0
env:
- name: AWS_ACCESS_KEY_ID
value: "mlflow_user"
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: seaweedfs-credentials
key: secret-key
- name: MLFLOW_S3_ENDPOINT_URL
value: "http://seaweedfs-s3:8333"
resources:
requests:
memory: 2Gi
cpu: 1
limits:
memory: 4Gi
cpu: 2

# train_churn_model.py
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from trino import dbapi
import pandas as pd
# Connect to data lake
conn = dbapi.connect(host='trino-coordinator', catalog='nessie', schema='gold_ml_features')
# Load training data
query = """
SELECT
customer_id,
customer_age_days,
total_orders,
total_revenue,
days_since_last_order,
churned -- Target variable
FROM customer_features
WHERE feature_date >= CURRENT_DATE - INTERVAL '90' DAY
"""
df = pd.read_sql(query, conn)
conn.close()
# Prepare data
X = df.drop(['customer_id', 'churned'], axis=1)
y = df['churned']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Start MLflow experiment
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("customer_churn_prediction")
with mlflow.start_run(run_name="random_forest_v1"):
# Log parameters
params = {
"n_estimators": 100,
"max_depth": 10,
"min_samples_split": 5,
}
mlflow.log_params(params)
# Train model
model = RandomForestClassifier(**params, random_state=42)
model.fit(X_train, y_train)
# Evaluate
train_score = model.score(X_train, y_train)
test_score = model.score(X_test, y_test)
mlflow.log_metric("train_accuracy", train_score)
mlflow.log_metric("test_accuracy", test_score)
# Log model
mlflow.sklearn.log_model(
model,
"model",
registered_model_name="churn_prediction_model"
)
print(f"Model logged. Test accuracy: {test_score:.3f}")

Problem: Iceberg tables accumulate small files and old snapshots, degrading performance.
Solution: Regular compaction and cleanup.
# dag_table_maintenance.py
from airflow import DAG
from airflow.providers.trino.operators.trino import TrinoOperator
from datetime import datetime, timedelta
dag = DAG(
'table_maintenance_weekly',
schedule_interval='0 2 * * 0', # Sunday 2 AM
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['maintenance'],
)
# List of tables to maintain
TABLES = [
'bronze_erp_sap.customers',
'bronze_erp_sap.orders',
'silver_sales.dim_customers',
'silver_sales.fact_orders',
'gold_reporting.daily_sales',
]
for table in TABLES:
# Task 1: Compact small files
compact_task = TrinoOperator(
task_id=f'compact_{table.replace(".", "_")}',
trino_conn_id='trino',
sql=f"""
ALTER TABLE nessie.{table}
EXECUTE optimize(file_size_threshold => '512MB')
""",
dag=dag,
)
# Task 2: Expire old snapshots (keep last 30 days)
expire_task = TrinoOperator(
task_id=f'expire_{table.replace(".", "_")}',
trino_conn_id='trino',
sql=f"""
ALTER TABLE nessie.{table}
EXECUTE expire_snapshots(retention_threshold => '30d')
""",
dag=dag,
)
# Task 3: Remove orphan files
vacuum_task = TrinoOperator(
task_id=f'vacuum_{table.replace(".", "_")}',
trino_conn_id='trino',
sql=f"""
ALTER TABLE nessie.{table}
EXECUTE remove_orphan_files(retention_threshold => '7d')
""",
dag=dag,
)
compact_task >> expire_task >> vacuum_task

#!/bin/bash
# disaster_recovery_drill.sh
# Run this monthly to ensure recovery procedures work
set -e
echo "=== DISASTER RECOVERY DRILL ==="
echo "This will:"
echo "1. Backup current state"
echo "2. Delete a non-critical table"
echo "3. Restore from backup"
echo ""
read -p "Continue? (yes/no): " confirm
if [ "$confirm" != "yes" ]; then
echo "Aborted."
exit 1
fi
# Step 1: Create test backup
echo "[Step 1/5] Creating backup..."
velero backup create dr-drill-$(date +%Y%m%d) \
--include-namespaces data-platform \
--wait
# Step 2: Record current state
echo "[Step 2/5] Recording current state..."
kubectl exec -it trino-coordinator-0 -- trino --execute "
SELECT COUNT(*) as row_count
FROM nessie.bronze_erp_sap.customers
" > /tmp/original_count.txt
# Step 3: Simulate disaster (delete table metadata)
echo "[Step 3/5] Simulating disaster (deleting table)..."
kubectl exec -it trino-coordinator-0 -- trino --execute "
DROP TABLE nessie.bronze_erp_sap.customers
"
# Step 4: Restore from backup
echo "[Step 4/5] Restoring from backup..."
velero restore create --from-backup dr-drill-$(date +%Y%m%d) --wait
# Step 5: Verify restoration
echo "[Step 5/5] Verifying restoration..."
kubectl exec -it trino-coordinator-0 -- trino --execute "
SELECT COUNT(*) as row_count
FROM nessie.bronze_erp_sap.customers
" > /tmp/restored_count.txt
if diff /tmp/original_count.txt /tmp/restored_count.txt; then
echo ""
echo "✓ DR DRILL PASSED"
echo "Recovery time: Check Velero logs for duration"
else
echo ""
echo "✗ DR DRILL FAILED"
echo "Row counts don't match!"
exit 1
fi

When to scale:
- Prometheus alert: trino_queued_queries > 10 for 10 minutes
- Manual request from users: "Queries are slow"
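The alert-driven trigger above can also be checked programmatically before running the scaling script. A sketch that polls the Prometheus HTTP API follows; the in-cluster service URL is an assumption, while the metric name and threshold come from the alert above.

```python
import json
import urllib.request

# Assumed in-cluster Prometheus endpoint; adjust to your environment.
PROMETHEUS_URL = "http://prometheus.monitoring:9090"
QUEUE_THRESHOLD = 10

def queued_queries(prometheus_url: str = PROMETHEUS_URL) -> float:
    """Fetch the current trino_queued_queries gauge via /api/v1/query."""
    url = f"{prometheus_url}/api/v1/query?query=trino_queued_queries"
    with urllib.request.urlopen(url, timeout=10) as resp:
        payload = json.load(resp)
    results = payload["data"]["result"]
    # An instant-vector result carries the sample as [timestamp, "value"].
    return float(results[0]["value"][1]) if results else 0.0

def should_scale(queued: float, threshold: int = QUEUE_THRESHOLD) -> bool:
    """Mirror the alert rule: scale out when the queue exceeds the threshold."""
    return queued > threshold
```

A cron wrapper could call `should_scale(queued_queries())` and invoke `scale_trino_workers.sh` only when it returns True.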
#!/bin/bash
# scale_trino_workers.sh
CURRENT_REPLICAS=$(kubectl get deployment trino-worker -n data-platform -o jsonpath='{.spec.replicas}')
NEW_REPLICAS=$((CURRENT_REPLICAS + 3))
echo "Current Trino workers: $CURRENT_REPLICAS"
echo "Scaling to: $NEW_REPLICAS"
kubectl scale deployment trino-worker \
--replicas=$NEW_REPLICAS \
-n data-platform
echo "Waiting for new workers to be ready..."
kubectl wait --for=condition=available \
--timeout=300s \
deployment/trino-worker \
-n data-platform
echo "✓ Scaling complete"
echo ""
echo "New workers:"
kubectl get pods -n data-platform -l app=trino-worker

1. Partitioning Strategy:
-- Bad: No partitioning
CREATE TABLE orders (...)
LOCATION 's3://datalake/gold/orders/';
-- Good: Partitioned by date
CREATE TABLE orders (
order_id BIGINT,
customer_id BIGINT,
order_date DATE,
amount DECIMAL(10,2)
)
PARTITIONED BY (months(order_date))
LOCATION 's3://datalake/gold/orders/';
-- Best: Partitioned + sorted
CREATE TABLE orders (
order_id BIGINT,
customer_id BIGINT,
order_date DATE,
region VARCHAR,
amount DECIMAL(10,2)
)
PARTITIONED BY (months(order_date))
SORTED BY (region, customer_id)
LOCATION 's3://datalake/gold/orders/';

2. File Size Management:
-- Check file sizes
SELECT
file_path,
file_size_in_bytes / 1024 / 1024 as size_mb,
record_count
FROM nessie.silver_sales."fact_orders$files"
WHERE file_size_in_bytes < 50 * 1024 * 1024 -- Files smaller than 50MB
ORDER BY file_size_in_bytes;
-- Compact small files (Trino Iceberg "optimize" procedure rewrites files below the threshold)
ALTER TABLE nessie.silver_sales.fact_orders
EXECUTE optimize(file_size_threshold => '512MB');

3. Snapshot Management:
-- List snapshots
SELECT
committed_at,
snapshot_id,
operation,
summary['total-records'] as total_records,
summary['total-data-files'] as data_files
FROM nessie.silver_sales."fact_orders$snapshots"
ORDER BY committed_at DESC
LIMIT 20;
-- Expire old snapshots (keep 30 days of history)
ALTER TABLE nessie.silver_sales.fact_orders
EXECUTE expire_snapshots(retention_threshold => '30d');

-- Remove orphan files (files not referenced by any snapshot)
ALTER TABLE nessie.silver_sales.fact_orders
EXECUTE remove_orphan_files(retention_threshold => '7d');

4. Query Performance Analysis:
-- Analyze query performance in Trino
SELECT
query_id,
query,
state,
DATE_DIFF('second', created, "end") as duration_seconds,
queued_time_ms / 1000 as queue_time_seconds,
analysis_time_ms / 1000 as analysis_seconds,
total_bytes / 1024 / 1024 / 1024 as data_gb,
total_rows / 1000000.0 as rows_millions
FROM system.runtime.queries
WHERE state = 'FINISHED'
AND created > CURRENT_TIMESTAMP - INTERVAL '1' HOUR
ORDER BY duration_seconds DESC
LIMIT 20;
-- Find slow queries
SELECT
query_id,
SUBSTR(query, 1, 100) as query_preview,
DATE_DIFF('second', created, "end") as duration_seconds,
user,
source
FROM system.runtime.queries
WHERE state = 'FINISHED'
AND DATE_DIFF('second', created, "end") > 300 -- Queries > 5 minutes
AND created > CURRENT_TIMESTAMP - INTERVAL '24' HOUR
ORDER BY duration_seconds DESC;

Pattern 1: Push Down Filters
-- Bad: Filters after join
SELECT o.*, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE o.order_date = DATE '2024-12-01';
-- Good: Filter before join
SELECT o.*, c.name
FROM (
SELECT *
FROM orders
WHERE order_date = DATE '2024-12-01'
) o
JOIN customers c ON o.customer_id = c.customer_id;
-- Best: Partition pruning
-- (Automatically done if partitioned by order_date)

Pattern 2: Avoid SELECT *
-- Bad: Selects all columns (includes large TEXT columns)
SELECT *
FROM customer_profiles
WHERE customer_id = 12345;
-- Good: Select only needed columns
SELECT customer_id, name, email, segment
FROM customer_profiles
WHERE customer_id = 12345;

Pattern 3: Use Approximate Aggregations
-- Bad: Exact count on billions of rows (slow)
SELECT COUNT(DISTINCT customer_id)
FROM clickstream
WHERE event_date >= CURRENT_DATE - INTERVAL '30' DAY;
-- Good: Approximate count (100x faster, 2% error)
SELECT APPROX_DISTINCT(customer_id)
FROM clickstream
WHERE event_date >= CURRENT_DATE - INTERVAL '30' DAY;

Pattern 4: Broadcast Small Tables
-- For joins where one table is small (< 100MB)
SELECT /*+ broadcast(d) */
f.order_id,
f.amount,
d.product_name
FROM fact_orders f
JOIN dim_products d ON f.product_id = d.product_id;

1. Volume Server Configuration:
# /etc/seaweedfs/volume.toml
# Increase concurrent connections
[volume]
max.connections = 500
# Enable read cache (speeds up repeated reads)
read.cache.size.mb = 8192 # 8GB cache
# Adjust compaction settings
compaction.bytes.per.second = 50000000 # 50MB/s
# Enable erasure coding for storage efficiency
[erasure_coding]
data.shards = 10
parity.shards = 4

2. Filer Configuration for Iceberg:
# /etc/seaweedfs/filer.toml
# Cache directory metadata
[filer.options]
dir.cache.size = 10000
dir.cache.ttl = "5m"
# Optimize for Iceberg small files
[filer.leveldb2]
enabled = true
dir = "/data/filerdb"
# S3 API optimizations
[s3]
port = 8333
cert.file = ""
key.file = ""

3. Monitoring SeaweedFS:
# Check volume server health
curl http://seaweedfs-volume-0:9333/stats/counter
# Check disk usage
curl http://seaweedfs-volume-0:9333/stats/disk
# List volumes
curl http://seaweedfs-master-0:9333/dir/status

# dag_data_retention.py
from airflow import DAG
from airflow.providers.trino.operators.trino import TrinoOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# Retention policies by layer
RETENTION_POLICIES = {
'bronze': 90, # 90 days
'silver': 730, # 2 years
'gold': 1825, # 5 years
}
dag = DAG(
'data_retention_enforcement',
schedule_interval='0 3 * * 0', # Weekly, Sunday 3 AM
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['governance', 'retention'],
)
def get_tables_to_archive(layer, retention_days):
    """Find partitions older than retention period"""
    from trino import dbapi
    conn = dbapi.connect(host='trino-coordinator', catalog='nessie')
    cursor = conn.cursor()
    cursor.execute(f"""
        SELECT
            table_schema,
            table_name,
            partition_value
        FROM nessie.information_schema.table_partitions
        WHERE table_schema LIKE '{layer}_%'
        AND TRY_CAST(partition_value AS DATE) < CURRENT_DATE - INTERVAL '{retention_days}' DAY
    """)
    return cursor.fetchall()

def archive_partition(schema, table, partition):
    """Move old partition to archive bucket"""
    from trino import dbapi
    conn = dbapi.connect(host='trino-coordinator', catalog='nessie')
    cursor = conn.cursor()
    # Copy to archive
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS nessie.archive_{schema}.{table}
        AS SELECT * FROM nessie.{schema}.{table}
        WHERE FALSE
    """)
    cursor.execute(f"""
        INSERT INTO nessie.archive_{schema}.{table}
        SELECT * FROM nessie.{schema}.{table}
        WHERE partition_col = '{partition}'
    """)
    # Delete from production
    cursor.execute(f"""
        DELETE FROM nessie.{schema}.{table}
        WHERE partition_col = '{partition}'
    """)
    conn.commit()
    cursor.close()
    conn.close()

for layer, retention_days in RETENTION_POLICIES.items():
    archive_task = PythonOperator(
        task_id=f'archive_{layer}_data',
        python_callable=lambda l=layer, r=retention_days: [
            archive_partition(schema, table, partition)
            for schema, table, partition in get_tables_to_archive(l, r)
        ],
        dag=dag,
    )

# dag_gdpr_data_deletion.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from trino import dbapi
from datetime import datetime
dag = DAG(
'gdpr_data_deletion',
schedule_interval=None, # Triggered manually
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['compliance', 'gdpr'],
)
def delete_customer_data(customer_id: int):
    """
    Delete all customer data across all layers
    according to GDPR requirements
    """
    customer_id = int(customer_id)  # values from dag_run.conf arrive as strings via templating
    # Tables containing customer PII
    TABLES_TO_CLEAN = [
        'bronze_erp_sap.customers',
        'bronze_crm.accounts',
        'silver_sales.dim_customers',
        'silver_sales.fact_orders',
        'gold_reporting.customer_lifetime_value',
    ]
    conn = dbapi.connect(host='trino-coordinator', catalog='nessie')
    cursor = conn.cursor()
    deletion_log = []
    for table in TABLES_TO_CLEAN:
        # Check if customer exists
        cursor.execute(f"""
            SELECT COUNT(*) as cnt
            FROM nessie.{table}
            WHERE customer_id = {customer_id}
        """)
        count = cursor.fetchone()[0]
        if count > 0:
            # Delete customer data
            cursor.execute(f"""
                DELETE FROM nessie.{table}
                WHERE customer_id = {customer_id}
            """)
            deletion_log.append({
                'table': table,
                'rows_deleted': count,
                'timestamp': datetime.now().isoformat()
            })
    # Log deletion for audit
    pg_hook = PostgresHook(postgres_conn_id='audit_db')
    pg_hook.run("""
        INSERT INTO gdpr_deletion_log (customer_id, tables_affected, deletion_timestamp)
        VALUES (%s, %s, %s)
    """, parameters=(customer_id, str(deletion_log), datetime.now()))
    conn.commit()
    cursor.close()
    conn.close()
    return f"Deleted data for customer {customer_id} from {len(deletion_log)} tables"

delete_task = PythonOperator(
    task_id='delete_customer_data',
    python_callable=delete_customer_data,
    op_kwargs={'customer_id': '{{ dag_run.conf.customer_id }}'},
    dag=dag,
)

-- Create view with masked PII for analysts
CREATE OR REPLACE VIEW silver_sales.dim_customers_masked AS
SELECT
customer_id,
CONCAT(
SUBSTR(email, 1, 2),
'***@',
SPLIT_PART(email, '@', 2)
) as email, -- john***@example.com
first_name, -- Keep first name
CONCAT(SUBSTR(last_name, 1, 1), '***') as last_name, -- D***
CASE
WHEN phone IS NOT NULL
THEN CONCAT('***-***-', SUBSTR(phone, -4)) -- ***-***-1234
ELSE NULL
END as phone,
customer_segment,
customer_lifetime_value,
created_at,
updated_at
FROM silver_sales.dim_customers;
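The same masking rules can be unit-tested outside the warehouse. A small Python mirror of the SQL expressions above (function names are illustrative, not part of the platform):

```python
from typing import Optional

def mask_email(email: str) -> str:
    """john.doe@example.com -> jo***@example.com (keep first 2 chars and the domain)."""
    domain = email.split('@', 1)[1]
    return f"{email[:2]}***@{domain}"

def mask_last_name(last_name: str) -> str:
    """Doe -> D*** (keep the first letter only)."""
    return f"{last_name[:1]}***"

def mask_phone(phone: Optional[str]) -> Optional[str]:
    """Keep only the last 4 digits: 555-123-1234 -> ***-***-1234."""
    return None if phone is None else f"***-***-{phone[-4:]}"
```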
-- Grant access to masked view only
GRANT SELECT ON silver_sales.dim_customers_masked TO ROLE analyst;
REVOKE SELECT ON silver_sales.dim_customers FROM ROLE analyst;

-- Calculate storage cost by layer
WITH storage_stats AS (
SELECT
SPLIT_PART(file_path, '/', 3) as layer, -- bronze/silver/gold
SUM(file_size_in_bytes) / 1024 / 1024 / 1024 as size_gb,
COUNT(*) as file_count,
AVG(file_size_in_bytes) / 1024 / 1024 as avg_file_size_mb
FROM nessie.information_schema.files
GROUP BY 1
)
SELECT
layer,
size_gb,
file_count,
avg_file_size_mb,
size_gb * 0.023 as estimated_monthly_cost_usd -- $0.023/GB/month for storage
FROM storage_stats
ORDER BY size_gb DESC;

-- Track compute costs by team/user
CREATE TABLE IF NOT EXISTS metadata.query_costs (
query_id VARCHAR,
user_name VARCHAR,
query_text VARCHAR,
execution_time_seconds INT,
data_scanned_gb DECIMAL(10,2),
estimated_cost_usd DECIMAL(10,4),
query_date DATE
);
-- Insert query metrics (run hourly)
INSERT INTO metadata.query_costs
SELECT
query_id,
"user" AS user_name, -- "user" and "end" are reserved words in Trino; quote them
SUBSTR(query, 1, 1000) as query_text,
DATE_DIFF('second', created, "end") as execution_time_seconds,
CAST(total_bytes AS DECIMAL) / 1024 / 1024 / 1024 as data_scanned_gb,
-- Cost model: $5/TB scanned + $0.01/compute-hour
(CAST(total_bytes AS DECIMAL) / 1024 / 1024 / 1024 / 1024 * 5) +
(DATE_DIFF('second', created, "end") / 3600.0 * 0.01) as estimated_cost_usd,
CAST(created AS DATE) as query_date
FROM system.runtime.queries
WHERE state = 'FINISHED'
AND created >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR;
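The cost formula embedded in that INSERT is easy to sanity-check in isolation. A sketch of the same model (the $5/TiB and $0.01/compute-hour rates are the assumed rates from the comment above, not vendor pricing):

```python
def estimated_query_cost_usd(bytes_scanned: int, execution_seconds: float) -> float:
    """Mirror of the SQL cost model: $5 per TiB scanned + $0.01 per compute-hour."""
    tib_scanned = bytes_scanned / 1024**4
    compute_hours = execution_seconds / 3600.0
    return tib_scanned * 5 + compute_hours * 0.01

# A query scanning 1 TiB for one hour costs 5.01 USD under this model:
# estimated_query_cost_usd(1024**4, 3600) -> 5.01
```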
-- Monthly cost report by team
SELECT
SUBSTR(user_name, 1, POSITION('@' IN user_name) - 1) as user,
DATE_TRUNC('month', query_date) as month,
COUNT(*) as query_count,
SUM(data_scanned_gb) as total_data_scanned_gb,
SUM(estimated_cost_usd) as total_cost_usd
FROM metadata.query_costs
WHERE query_date >= CURRENT_DATE - INTERVAL '90' DAY
GROUP BY 1, 2
ORDER BY total_cost_usd DESC;

Symptoms:
Query exceeded per-node total memory limit of 32GB
Diagnosis:
-- Check memory usage by query
SELECT
query_id,
state,
memory_pool,
total_memory_reservation,
peak_user_memory_reservation,
query
FROM system.runtime.queries
WHERE state = 'FAILED'
AND error_code = 'EXCEEDED_MEMORY_LIMIT'
ORDER BY created DESC
LIMIT 10;

Solutions:
- Increase memory per worker:
# trino-values.yaml
worker:
  resources:
    requests:
      memory: 64Gi # Increase from 32Gi
    limits:
      memory: 64Gi
  config:
    query.max-memory-per-node: 48GB

- Enable spill to disk:
# config.properties
spill-enabled=true
spiller-spill-path=/tmp/trino-spill
max-spill-per-node=100GB
query-max-spill-per-node=100GB

- Optimize query:
-- Bad: implicit comma join with no selective filters creates a huge intermediate result
SELECT a.*, b.*
FROM large_table_a a, large_table_b b
WHERE a.date = b.date;
-- Good: Explicit join with filters
SELECT a.*, b.*
FROM large_table_a a
JOIN large_table_b b ON a.id = b.id
WHERE a.date >= CURRENT_DATE - INTERVAL '7' DAY
AND b.status = 'active';

Symptoms:
Error writing file: no space left on device
Diagnosis:
# Check volume usage
kubectl exec -it seaweedfs-master-0 -- weed shell <<EOF
volume.list
EOF
# Check disk usage on volume servers
kubectl exec -it seaweedfs-volume-0 -- df -h /data

Solutions:
- Add new volume servers:
# Scale up volume servers
kubectl scale statefulset seaweedfs-volume --replicas=6 -n data-platform

- Run garbage collection:
# Remove deleted files
kubectl exec -it seaweedfs-master-0 -- weed shell <<EOF
volume.fsck
lock
volume.vacuum
unlock
EOF

- Implement data lifecycle policy (see Data Lifecycle Management section above)
Symptoms:
- DAG shows "success" but tasks didn't run
- DAG stuck in "running" state
Diagnosis:
# Check scheduler logs
kubectl logs -f airflow-scheduler-0 -n data-platform
# Check DAG details
kubectl exec -it airflow-scheduler-0 -- airflow dags list-runs -d <dag_id>
# Test DAG manually
kubectl exec -it airflow-scheduler-0 -- airflow dags test <dag_id> 2024-12-01

Common Causes & Fixes:
- DAG file syntax error:
# Validate DAG syntax
kubectl exec -it airflow-scheduler-0 -- python /opt/airflow/dags/my_dag.py

- Connection not configured:
# List connections
kubectl exec -it airflow-scheduler-0 -- airflow connections list
# Add missing connection
kubectl exec -it airflow-scheduler-0 -- airflow connections add trino \
--conn-type trino \
--conn-host trino-coordinator \
--conn-port 8080

- Task stuck in "running" (zombie tasks):
# Clear zombie task instances so the scheduler can reschedule them
kubectl exec -it airflow-scheduler-0 -- airflow tasks clear <dag_id> -s 2024-12-01 -e 2024-12-01

Symptoms:
org.apache.iceberg.exceptions.ValidationException: Found duplicate rows
Diagnosis:
-- Check table health
SELECT *
FROM nessie.silver_sales."fact_orders$snapshots"
ORDER BY committed_at DESC
LIMIT 10;
-- Check for orphan files
SELECT
file_path,
file_size_in_bytes,
record_count
FROM nessie.silver_sales."fact_orders$files"
WHERE file_path NOT IN (
SELECT file_path
FROM nessie.silver_sales."fact_orders$manifests"
);

Recovery:
- Rollback to previous snapshot:
-- Find last good snapshot
SELECT snapshot_id, committed_at, operation
FROM nessie.silver_sales."fact_orders$snapshots"
ORDER BY committed_at DESC;
-- Rollback
ALTER TABLE nessie.silver_sales.fact_orders
EXECUTE ROLLBACK_TO_SNAPSHOT(snapshot_id => 123456789);

- If rollback doesn't work, restore from backup:
# Restore table from Velero backup
velero restore create --from-backup metadata-daily-20241201 \
--include-resources iceberg-table \
--selector table=fact_orders

# prometheus-alerts.yaml
groups:
- name: data_platform_critical
  interval: 30s
  rules:
  # Data freshness
  - alert: StaleDataBronzeLayer
    expr: (time() - max(iceberg_last_commit_timestamp{layer="bronze"})) > 7200
    for: 15m
    labels:
      severity: critical
      team: data-engineering
    annotations:
      summary: "Bronze layer data hasn't updated in 2+ hours"
      description: "Table {{ $labels.table }} in bronze layer is stale. Last update: {{ $value | humanizeDuration }} ago"
      runbook: "https://wiki.company.com/runbooks/stale-data"
  # Pipeline failures
  - alert: HighAirflowDAGFailureRate
    expr: rate(airflow_dag_failed[1h]) / rate(airflow_dag_run[1h]) > 0.2
    for: 10m
    labels:
      severity: critical
    annotations:
      summary: "20%+ of Airflow DAGs are failing"
      description: "Current failure rate: {{ $value | humanizePercentage }}"
  # Storage capacity
  - alert: SeaweedFSHighDiskUsage
    expr: (seaweedfs_volume_disk_used_bytes / seaweedfs_volume_disk_total_bytes) > 0.85
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "SeaweedFS volume {{ $labels.instance }} disk usage > 85%"
      description: "Current usage: {{ $value | humanizePercentage }}. Consider adding storage or archiving old data"
  # Query performance
  - alert: TrinoHighQueueTime
    expr: avg(trino_queued_queries) > 20
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "Trino has {{ $value }} queries queued"
      description: "Consider scaling workers or investigating slow queries"
  # Data quality
  - alert: DataQualityChecksFailing
    expr: rate(great_expectations_validation_failed[1h]) > 0.1
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "Data quality checks are failing"
      description: "Check Great Expectations dashboard and Airflow logs"

{
"dashboard": {
"title": "Data Engineering Platform Overview",
"uid": "data-eng-overview",
"tags": ["data-platform", "overview"],
"timezone": "UTC",
"panels": [
{
"title": "Data Freshness by Layer",
"type": "stat",
"targets": [{
"expr": "time() - iceberg_last_commit_timestamp",
"legendFormat": "{{layer}}.{{table}}"
}],
"fieldConfig": {
"defaults": {
"thresholds": {
"steps": [
{"value": 0, "color": "green"},
{"value": 7200, "color": "yellow"},
{"value": 14400, "color": "red"}
]
},
"unit": "s"
}
}
},
{
"title": "Pipeline Success Rate (24h)",
"type": "gauge",
"targets": [{
"expr": "sum(rate(airflow_dag_success[24h])) / sum(rate(airflow_dag_run[24h]))"
}],
"fieldConfig": {
"defaults": {
"min": 0,
"max": 1,
"thresholds": {
"steps": [
{"value": 0, "color": "red"},
{"value": 0.95, "color": "yellow"},
{"value": 0.99, "color": "green"}
]
},
"unit": "percentunit"
}
}
},
{
"title": "Storage Growth Rate",
"type": "graph",
"targets": [{
"expr": "rate(seaweedfs_volume_disk_used_bytes[24h])",
"legendFormat": "{{volume}}"
}],
"yaxes": [{
"format": "bytes",
"label": "Bytes/day"
}]
},
{
"title": "Query Performance (P95 latency)",
"type": "graph",
"targets": [{
"expr": "histogram_quantile(0.95, rate(trino_query_duration_seconds_bucket[5m]))",
"legendFormat": "P95 latency"
}],
"yaxes": [{
"format": "s",
"label": "Seconds"
}]
},
{
"title": "Top 10 Slowest Queries (Today)",
"type": "table",
"targets": [{
"expr": "topk(10, trino_query_duration_seconds{state='FINISHED'})"
}]
},
{
"title": "Data Quality Score by Table",
"type": "heatmap",
"targets": [{
"expr": "data_quality_score",
"legendFormat": "{{layer}}.{{table}}"
}]
}
]
}
}

Week 1: Platform Fundamentals
- Access to Kubernetes cluster (kubectl configured)
- Access to Grafana dashboards
- Access to Airflow UI
- Access to Trino UI
- Read architecture documentation
- Run first query on Trino
- Explore directory structure in SeaweedFS
Week 2: Development Workflow
- Set up local dbt environment
- Create first dbt model (dev branch)
- Write first Airflow DAG (test environment)
- Run Great Expectations check
- Submit first PR for review
Week 3: Production Operations
- Shadow oncall engineer for 1 week
- Practice disaster recovery drill
- Scale Trino workers during load test
- Investigate a slow query
- Run table maintenance tasks
Week 4: Advanced Topics
- Build first Spark job
- Set up Kafka consumer
- Configure new data source
- Create custom Grafana dashboard
# === Kubernetes ===
# List all pods in data-platform namespace
kubectl get pods -n data-platform
# Get logs from Trino coordinator
kubectl logs -f trino-coordinator-0 -n data-platform
# Execute command in Airflow scheduler
kubectl exec -it airflow-scheduler-0 -n data-platform -- bash
# Port-forward to access UI locally
kubectl port-forward svc/trino-coordinator 8080:8080 -n data-platform
# === Trino CLI ===
# Connect to Trino
kubectl exec -it trino-coordinator-0 -- trino
# List catalogs
SHOW CATALOGS;
# List schemas in iceberg catalog
SHOW SCHEMAS IN nessie;
# Describe table
DESCRIBE nessie.silver_sales.dim_customers;
# === SeaweedFS ===
# Access SeaweedFS shell
kubectl exec -it seaweedfs-master-0 -- weed shell
# List volumes
volume.list
# Check file system integrity
volume.fsck
# === Airflow CLI ===
# List DAGs
kubectl exec -it airflow-scheduler-0 -- airflow dags list
# Trigger DAG manually
kubectl exec -it airflow-scheduler-0 -- airflow dags trigger <dag_id>
# Test DAG
kubectl exec -it airflow-scheduler-0 -- airflow dags test <dag_id> 2024-12-01
# Clear task instance
kubectl exec -it airflow-scheduler-0 -- airflow tasks clear <dag_id> -t <task_id> -s 2024-12-01
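Besides the CLI, DAGs can be triggered from outside the cluster through Airflow 2's stable REST API (`POST /api/v1/dags/{dag_id}/dagRuns`). A sketch that builds the request path and JSON body; host and authentication are deployment-specific and omitted here:

```python
import json

def build_dagrun_request(dag_id: str, conf: dict) -> tuple[str, bytes]:
    """Return (URL path, JSON body) for Airflow's stable REST API dagRuns endpoint."""
    path = f"/api/v1/dags/{dag_id}/dagRuns"
    body = json.dumps({"conf": conf}).encode("utf-8")
    return path, body

# Example: trigger the GDPR deletion DAG for one customer
# path, body = build_dagrun_request("gdpr_data_deletion", {"customer_id": 12345})
```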
# === dbt ===
# Run all models
dbt run
# Run specific model
dbt run --select dim_customers
# Test models
dbt test
# Generate documentation
dbt docs generate
dbt docs serve

All configuration files should be stored in a Git repository:
/infrastructure/
├── kubernetes/
│ ├── namespaces/
│ ├── seaweedfs/
│ ├── trino/
│ ├── airflow/
│ └── monitoring/
├── dbt_project/
├── airflow_dags/
└── docs/
├── architecture.md
├── runbooks/
└── training/
This data lakehouse platform provides:
✅ Scalable storage with SeaweedFS (handles both large files and small metadata)
✅ Flexible compute with Trino (interactive queries) and Spark (batch processing)
✅ Data organization following industry best practices
✅ Version control for data with Nessie (Git for tables)
✅ Quality assurance with Great Expectations
✅ Observability with Prometheus + Grafana
✅ Disaster recovery with Velero + pgBackRest
✅ Cost optimization through lifecycle policies
- Phase 1 (Months 1-6): Deploy core platform, ingest first data sources
- Phase 2 (Months 7-12): Add Spark and real-time streaming
- Phase 3 (Months 13-18): Implement governance and ML platform
- Internal Wiki: https://wiki.company.com/data-platform
- Slack Channel: #data-engineering
- Oncall Rotation: PagerDuty schedule
- Monthly Review: First Monday, 10 AM
Document Version: 3.0
Last Updated: December 2024
Owner: Data Engineering Team