Commit df65ccb — Added Integrations
1 parent c5d5ad1 commit df65ccb

10 files changed: +1042 −0 lines changed

examples/integrations/README.md

# DataScreenIQ — Integrations

Ready-to-copy integrations that plug DataScreenIQ into your existing tools. Each integration is a complete, working example you can drop into your project.

## Integrations

| Integration | What it does | Setup time |
|------------|-------------|------------|
| [**GitHub Action**](./github-action/) | Screen CSV/JSON files on every PR. Block merges when data quality fails. | 2 min |
| [**Airflow DAG**](./airflow/) | Quality gate task between extract and load. Stops the pipeline on BLOCK. | 5 min |
| [**dbt post-hook**](./dbt/) | Screen model output after `dbt run`. Catch drift in transformed data. | 5 min |
| [**Prefect flow**](./prefect/) | Quality gate flow with alerting on BLOCK. | 5 min |
| [**Google Colab**](./colab/) | Interactive notebook — try DataScreenIQ in 60 seconds. | 1 min |
## Quick start

```bash
pip install datascreeniq
export DATASCREENIQ_API_KEY=dsiq_live_...
```

Get a free API key (500K rows/month, no credit card): [datascreeniq.com](https://datascreeniq.com)
## How it fits

```
Your source → Extract → DataScreenIQ → PASS ✓  → Load → Warehouse
                                     → WARN ⚠  → Load + alert
                                     → BLOCK ✗ → Dead-letter queue
```

DataScreenIQ is not a replacement for dbt tests or Great Expectations. It fills a different gap: the **pre-storage screening layer** that catches problems before they propagate.
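The PASS/WARN/BLOCK routing above can be sketched as a small dispatcher. A minimal sketch, assuming only the three statuses shown in the diagram; `Report` here is a hypothetical stand-in for the SDK's report object, not the real class:

```python
from dataclasses import dataclass

@dataclass
class Report:
    """Hypothetical stand-in for a DataScreenIQ quality report."""
    status: str  # "PASS", "WARN", or "BLOCK"

def route(report: Report, rows: list[dict]) -> str:
    """Decide where screened rows go, mirroring the diagram above."""
    if report.status == "PASS":
        return "warehouse"          # load as normal
    if report.status == "WARN":
        print(f"warning: quality issues in {len(rows)} rows, loading anyway")
        return "warehouse"          # load, but alert
    return "dead_letter_queue"      # BLOCK: quarantine for review

print(route(Report("PASS"), []))   # warehouse
print(route(Report("BLOCK"), []))  # dead_letter_queue
```

The point of the WARN branch is that data still lands in the warehouse; only BLOCK diverts rows away from storage.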
## Links

- [Python SDK (PyPI)](https://pypi.org/project/datascreeniq/)
- [API reference](https://datascreeniq.com/api-reference.html)
- [GitHub](https://github.com/AppDevIQ/datascreeniq-python)
- [Documentation](https://datascreeniq.com/docs)
examples/integrations/airflow/README.md

# DataScreenIQ — Airflow Integration

A complete Airflow DAG that screens extracted data through DataScreenIQ before loading it to the warehouse.

## Flow

```
extract_data → quality_gate → load_to_warehouse
                    ↓ (on BLOCK)
              alert_on_failure
```

## Setup

```bash
pip install datascreeniq apache-airflow
```

Set your API key as an Airflow Variable:

```bash
airflow variables set DATASCREENIQ_API_KEY dsiq_live_...
```

Or as an environment variable:

```bash
export DATASCREENIQ_API_KEY=dsiq_live_...
```

Copy `quality_gate_dag.py` to your `dags/` folder and customise the `extract_data()` and `load_to_warehouse()` tasks.

## Behaviour

| Quality result | Pipeline action |
|---------------|----------------|
| **PASS** | Proceeds to load |
| **WARN** | Proceeds with warnings logged |
| **BLOCK** | Stops the pipeline, triggers the alert task |
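The table above reduces to a single gate function: raise on BLOCK so the Airflow task fails and downstream tasks never run, log and continue on WARN. This is a sketch; `DataQualityError` is defined locally here as a stand-in for the SDK's exception, and the string statuses are an assumption:

```python
class DataQualityError(Exception):
    """Local stand-in for datascreeniq.exceptions.DataQualityError."""

def apply_gate(status: str, summary: str) -> str:
    """Map a screening status to the pipeline action in the table above."""
    if status == "BLOCK":
        # Raising fails the Airflow task, so load_to_warehouse never runs.
        raise DataQualityError(f"Quality gate FAILED: {summary}")
    if status == "WARN":
        # Proceed, but leave a trail in the task logs.
        print(f"proceeding with warnings: {summary}")
    return "load"

apply_gate("PASS", "all checks passed")
```

Failing the task (rather than branching) is what lets a `trigger_rule="one_failed"` alert task pick up the BLOCK case.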
41+
## Get a free API key
42+
43+
[datascreeniq.com](https://datascreeniq.com) — 500K rows/month, no credit card required.
examples/integrations/airflow/quality_gate_dag.py

```python
"""
DataScreenIQ — Airflow Integration: Quality Gate DAG

A complete Airflow DAG that screens extracted data through DataScreenIQ
before loading it to the warehouse. If the data is BLOCKED, the pipeline
stops and sends an alert.

Setup:
  1. pip install datascreeniq apache-airflow
  2. Set the DATASCREENIQ_API_KEY environment variable (or Airflow Variable)
  3. Copy this file to your dags/ folder
  4. Customise extract_data() and load_to_warehouse() for your pipeline

Get a free API key (500K rows/month): https://datascreeniq.com
"""

from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable

import datascreeniq as dsiq
from datascreeniq.exceptions import DataQualityError


default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="etl_with_quality_gate",
    default_args=default_args,
    description="ETL pipeline with DataScreenIQ quality screening before load",
    schedule="0 6 * * *",  # daily at 6am
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["data-quality", "datascreeniq", "etl"],
) as dag:

    @task()
    def extract_data() -> list[dict]:
        """
        Extract data from your source.
        Replace this with your actual extraction logic.
        """
        # Example: fetch from an API, read from S3, query a database, etc.
        # Simulated extraction — replace with your real source
        rows = [
            {"order_id": "ORD-001", "amount": 99.50, "email": "alice@corp.com", "status": "paid"},
            {"order_id": "ORD-002", "amount": 150.00, "email": "bob@corp.com", "status": "paid"},
            {"order_id": "ORD-003", "amount": 75.00, "email": None, "status": "pending"},
            {"order_id": "ORD-004", "amount": 220.50, "email": "carol@corp.com", "status": "paid"},
        ]

        print(f"Extracted {len(rows)} rows")
        return rows

    @task()
    def quality_gate(rows: list[dict]) -> dict:
        """
        Screen extracted data through DataScreenIQ.
        Raises an exception if data is BLOCKED, stopping the pipeline.
        Returns the quality report for downstream tasks.
        """
        # Get the API key from an Airflow Variable or the environment
        api_key = Variable.get("DATASCREENIQ_API_KEY", default_var=None)
        client = dsiq.Client(api_key)  # falls back to the env var if None

        report = client.screen(rows, source="orders")

        print(f"Quality report: {report.summary()}")
        print(f"  Status: {report.status}")
        print(f"  Health: {report.health_pct}")
        print(f"  Rows: {report.rows_received}")
        print(f"  Latency: {report.latency_ms}ms")

        if report.is_blocked:
            raise DataQualityError(
                f"Data quality gate FAILED for 'orders': {report.summary()}",
                report=report,
            )

        if report.is_warn:
            print("⚠️ Quality warnings detected — proceeding with caution")
            if report.type_mismatches:
                print(f"  Type mismatches: {report.type_mismatches}")
            if report.null_rates:
                print(f"  Null rates: {report.null_rates}")

        return report.to_dict()

    @task()
    def load_to_warehouse(rows: list[dict], report: dict):
        """
        Load clean data to your warehouse.
        Only runs if quality_gate passed.
        Replace this with your actual load logic.
        """
        status = report.get("status", "UNKNOWN")
        print(f"Loading {len(rows)} rows to warehouse (quality: {status})")

        # Example: write to BigQuery, Snowflake, Postgres, S3, etc.
        # bigquery_client.insert_rows_json(table, rows)

        print(f"✅ Loaded {len(rows)} rows successfully")

    @task(trigger_rule="one_failed")
    def alert_on_failure():
        """
        Send an alert when the quality gate blocks data.
        Customise with your alerting: Slack, PagerDuty, email, etc.
        """
        print("🚨 Quality gate BLOCKED the pipeline — sending alert")
        # Example:
        # slack.post_message("#data-alerts", "Pipeline blocked by DataScreenIQ")
        # pagerduty.trigger("Data quality failure in orders pipeline")

    # ── DAG flow ──
    extracted = extract_data()
    report = quality_gate(extracted)
    load_to_warehouse(extracted, report)

    # trigger_rule="one_failed" only fires relative to upstream tasks, so the
    # alert task must be wired downstream of the quality gate explicitly.
    alert = alert_on_failure()
    report >> alert
```
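The integrations README routes BLOCK results to a dead-letter queue rather than the warehouse. A minimal file-based DLQ sketch, assuming a local directory is acceptable; the path, filename scheme, and record shape are illustrative, not part of the SDK:

```python
import json
import time
from pathlib import Path

def send_to_dead_letter(rows: list[dict], reason: str, dlq_dir: str = "dlq") -> Path:
    """Quarantine blocked rows as a timestamped JSON file for later review."""
    out_dir = Path(dlq_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    # One file per blocked batch, carrying the rows plus the gate's reason.
    path = out_dir / f"blocked-{int(time.time() * 1000)}.json"
    path.write_text(json.dumps({"reason": reason, "rows": rows}, indent=2))
    return path

blocked_path = send_to_dead_letter(
    [{"order_id": "ORD-003", "email": None}],
    reason="BLOCK: null email rate exceeded threshold",
)
print(blocked_path)
```

In production the same idea maps onto an object-store prefix or a dedicated queue; the invariant is that blocked rows are kept replayable instead of silently dropped.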
