
E-commerce Data Pipeline

Production-ready Airflow data pipeline with modular, reusable components.

πŸ—οΈ Project Structure

airflow_project/
β”œβ”€β”€ dags/                          # DAG definitions
β”‚   β”œβ”€β”€ ecommerce_pipeline.py     # Main production pipeline
β”‚   └── simple_pipeline_example.py # Examples
β”‚
β”œβ”€β”€ config/                        # Configuration files
β”‚   β”œβ”€β”€ sources.py                # Data source configs
β”‚   β”œβ”€β”€ reports.py                # Report definitions
β”‚   └── settings.py               # General settings
β”‚
β”œβ”€β”€ tasks/                         # Business logic
β”‚   β”œβ”€β”€ extraction/
β”‚   β”‚   └── api_extractor.py     # Extraction logic
β”‚   └── transformation/
β”‚       └── cleaner.py            # Cleaning logic
β”‚
β”œβ”€β”€ task_groups/                   # Reusable task groups
β”‚   β”œβ”€β”€ extraction_group.py
β”‚   β”œβ”€β”€ transformation_group.py
β”‚   └── reporting_group.py
β”‚
└── utils/                         # Utilities
    └── notifications.py          # Slack/email alerts

πŸš€ Quick Start

1. Add a New Data Source

Edit config/sources.py:

DATA_SOURCES = {
    'your_source': {
        'name': 'Your Source',
        'type': 'rest_api',
        'connection_id': 'your_conn',
        'api_config': {...},
        'tables': ['table1', 'table2'],
        'output_path': '/path/to/output',
    }
}

That's it! The extraction task group will automatically create tasks for your new source.
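The pattern behind this is a loop over the config. As a plain-Python sketch (the function and source entries here are illustrative, not this repo's actual API):

```python
# Illustrative sketch of the configuration-driven pattern: the
# extraction group loops over DATA_SOURCES, so adding a config entry
# is enough to get a new task.
DATA_SOURCES = {
    'shopify': {'name': 'Shopify', 'tables': ['orders', 'customers']},
    'stripe':  {'name': 'Stripe',  'tables': ['charges']},
}

def build_extraction_tasks(source_keys):
    """Return one (task_id, source_config) pair per requested source."""
    return [(f'extract_{key}', DATA_SOURCES[key]) for key in source_keys]

tasks = build_extraction_tasks(['shopify', 'stripe'])
```

In the real task group the pairs become Airflow tasks, but the fan-out logic is the same dictionary walk.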

2. Add a New Report

Edit config/reports.py:

REPORTS.append({
    'name': 'your_report',
    'required_sources': ['shopify', 'stripe'],
    'output': {
        'table': 'analytics.your_report',
        'format': 'parquet',
    }
})

That's it! The reporting task group will automatically create a task for your report.
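Because each report declares its `required_sources`, the reporting group can also decide which reports are runnable for a given pipeline. A plain-Python sketch of that selection logic (the helper name is illustrative):

```python
# Sketch of the reporting group's selection step: a report runs only
# if every source it requires was extracted in this pipeline.
REPORTS = [
    {'name': 'daily_sales', 'required_sources': ['shopify', 'stripe']},
    {'name': 'ad_spend', 'required_sources': ['facebook_ads']},
]

def runnable_reports(extracted_sources):
    """Return names of reports whose required sources are all available."""
    available = set(extracted_sources)
    return [r['name'] for r in REPORTS
            if set(r['required_sources']) <= available]
```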

3. Create a New Pipeline

Create dags/your_pipeline.py:

from airflow import DAG

from task_groups.extraction_group import create_extraction_group
from task_groups.reporting_group import create_reporting_group

with DAG('your_pipeline', ...) as dag:
    extractions = create_extraction_group(['source1', 'source2'], date)
    reports = create_reporting_group(extractions)

    extractions >> reports

That's it! All the heavy lifting is done by reusable components.

🎯 Key Features

βœ… Modular Architecture

  • Separate configuration from code
  • Reusable task groups
  • Easy to test and maintain

βœ… Automatic Parallelization

  • All sources extract in parallel
  • All reports generate in parallel
  • No manual task dependency management

βœ… Configuration-Driven

  • Add sources without touching DAG code
  • Add reports without touching DAG code
  • Change schedules in one place

βœ… Production-Ready

  • Built-in error handling
  • Retry logic with exponential backoff
  • Data quality checks
  • Slack/email notifications
  • Comprehensive logging

πŸ“Š Example Pipelines

Main Production Pipeline

ecommerce_data_pipeline
β”œβ”€β”€ Extract (4 sources in parallel)
β”‚   β”œβ”€β”€ Shopify
β”‚   β”œβ”€β”€ Stripe
β”‚   β”œβ”€β”€ Google Analytics
β”‚   └── Facebook Ads
β”œβ”€β”€ Clean (4 cleaning tasks in parallel)
β”œβ”€β”€ Generate Reports (4 reports in parallel)
└── Notify

Payment Processing Pipeline

payment_processing_pipeline
β”œβ”€β”€ Extract (2 sources in parallel)
β”‚   β”œβ”€β”€ Shopify
β”‚   └── Stripe
β”œβ”€β”€ Clean
β”œβ”€β”€ Generate Reports (priority 1 only)
└── Notify

πŸ”§ Configuration

Environment Variables

Create .env file:

ENVIRONMENT=production
LAKEHOUSE_PATH=/data/lakehouse
WAREHOUSE_TYPE=snowflake
WAREHOUSE_DB=analytics
SLACK_WEBHOOK_URL=https://hooks.slack.com/...
LOG_LEVEL=INFO
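A sketch of how `config/settings.py` can pick these variables up (the variable names match the `.env` above; the defaults are illustrative):

```python
import os

# Read pipeline settings from the environment, with safe fallbacks
# for local development. Names mirror the .env file above.
ENVIRONMENT = os.getenv('ENVIRONMENT', 'development')
LAKEHOUSE_PATH = os.getenv('LAKEHOUSE_PATH', '/data/lakehouse')
WAREHOUSE_TYPE = os.getenv('WAREHOUSE_TYPE', 'snowflake')
WAREHOUSE_DB = os.getenv('WAREHOUSE_DB', 'analytics')
SLACK_WEBHOOK_URL = os.getenv('SLACK_WEBHOOK_URL', '')
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
```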

Airflow Connections

Create these connections in Airflow UI:

  • shopify_conn - Shopify API credentials
  • stripe_conn - Stripe API credentials
  • google_analytics_conn - GA credentials
  • facebook_ads_conn - Facebook credentials
  • warehouse_conn - Data warehouse credentials

πŸ§ͺ Testing

# Test extraction logic
pytest tests/test_extractors.py

# Test transformation logic
pytest tests/test_transformers.py

# Test DAG structure
pytest tests/test_dags.py
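Because the business logic lives in plain functions under `tasks/`, it can be unit-tested without a running Airflow instance. A sketch of such a test, where `clean_records` is a hypothetical stand-in for the logic in `tasks/transformation/cleaner.py`:

```python
# clean_records is illustrative: drop rows without an id and strip
# whitespace from names. Replace with the real cleaner's behavior.
def clean_records(records):
    return [
        {**r, 'name': r['name'].strip()}
        for r in records
        if r.get('id') is not None
    ]

def test_clean_records_drops_missing_ids():
    raw = [{'id': 1, 'name': ' Ada '}, {'id': None, 'name': 'x'}]
    assert clean_records(raw) == [{'id': 1, 'name': 'Ada'}]
```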

πŸ“ Adding Custom Logic

Custom Task Group

from airflow.decorators import task_group, task

@task_group(group_id='my_custom_group')
def my_custom_processing(data):
    @task
    def process_data(item):
        # Your logic here
        return item  # replace with your transformed result

    results = [process_data(item) for item in data]
    return results

Custom Extractor

Inherit from APIExtractor:

from tasks.extraction.api_extractor import APIExtractor

class MyCustomExtractor(APIExtractor):
    def extract_table(self, table_name: str):
        # Custom extraction logic
        return super().extract_table(table_name)

πŸŽ“ Best Practices

  1. Always use configuration files - Don't hardcode values in DAGs
  2. Use task groups for related tasks - Better organization in UI
  3. Handle errors gracefully - Use try/except and retry logic
  4. Add data quality checks - Catch issues early
  5. Monitor with notifications - Set up Slack/email alerts
  6. Test before deploying - Unit test your logic
  7. Document changes - Keep README updated
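Practice 4 can be as simple as a guard that fails the task before bad data propagates downstream. A minimal sketch (the function name and threshold are illustrative):

```python
# Minimal data quality check: fail fast when an extracted batch is
# suspiciously small, so Airflow's retry/alerting kicks in early.
def check_row_count(rows, minimum=1):
    """Raise if a batch is below the threshold; return the count."""
    if len(rows) < minimum:
        raise ValueError(f'quality check failed: {len(rows)} < {minimum}')
    return len(rows)
```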

πŸ†˜ Troubleshooting

Pipeline runs slowly

  • Check parallel task limits in config/settings.py
  • Increase PARALLEL_TASKS values

Source extraction fails

  • Verify Airflow connection is configured
  • Check API credentials and rate limits
  • Review logs in Airflow UI

Reports have no data

  • Verify source data was extracted successfully
  • Check cleaning tasks passed quality checks
  • Review required sources in config/reports.py


🀝 Contributing

  1. Add your feature
  2. Update configuration files
  3. Add tests
  4. Update this README
  5. Create pull request

Made with ❀️ by the Data Engineering Team
