Skip to content

Улучшение DQ в DWH через внедрение Data Quality DAG с блокировкой некорректных данных и алертами #313

@VladislavVoskoboinik

Description

@VladislavVoskoboinik

Улучшение контроля качества данных в DWH 🚦

Текущая ситуация и её проблемы ❗
Данные из stg, ods, dwh и dm загружаются друг за другом без жёстких проверок качества.

Ошибки и «мусор» чаще всего обнаруживаются уже после попадания данных в основные таблицы.

Хотя алерты есть, плохие данные всё равно успевают просочиться, портя аналитику и усложняя поиск причин.

Отсутствие автоматической блокировки некорректных данных приводит к долгим расследованиям и снижает доверие к хранилищу.

Нет быстрого способа локализовать и выделить проблемные данные для анализа.

Как это решаем? Предлагаемая архитектура 🔧

Создаём отдельный Data Quality DAG, который запускается сразу после загрузки в staging-таблицы.

В этом DAG реализуем задачи на проверку качества данных:

Проверяем формат, полноту, уникальность, соответствие бизнес-правилам — с помощью SQL.

Если проверки проходят — данные коммитятся в ods/dwh.(продолжаются текущие реализованные пайплайны)

Если есть ошибки — проблемные записи перемещаются в отдельную таблицу quarantine для разбора.

Используем ветвление DAG, чтобы при ошибках останавливать загрузку в основные таблицы.

Автоматически отправляем детализированные алерты в Slack/email/Telegram с описанием причин.

Преимущества такого подхода ⭐
🚫 Плохие данные не попадают в DWH — аналитика становится точнее.

⏱️ Ошибки видны и разбираются сразу — легче быстро реагировать.

📦 Есть отдельное хранилище для проблемных данных — удобно анализировать и исправлять.

🔄 Процесс интегрируется в Airflow, легко поддерживается и расширяется.

📊 В дальнейшем можно собрать метрики качества для дашборда и контроля.

Архитектура пайплайна подробно 🛠️

Запуск DAG
После загрузки сырых данных в stg из бекенда, Telegram и GitHub.

Этапы внутри Data Quality DAG

Проверка данных (Data Quality Checks) с помощью SQL:

Проверка на NULL в обязательных полях

Проверка правильности форматов (даты, email и др.)

Проверка уникальности ключей

Проверка бизнес-правил (напр. значения в пределах)
Каждый чек выдаёт Pass/Fail

Агрегация результатов с BranchPythonOperator:

Если все проверки успешны, идём дальше

Если хотя бы одна провалена — запускается обработка ошибок

Если прошло — коммит в ods:

Перенос данных из stg → ods (Insert/Update)

Если ошибки — quarantine:

Копируем проблемные записи в отдельную quarantine таблицу

Храним причины ошибок, время, источник

Можно удалить проблемные данные из stg, не давая им идти дальше

Алерты:

Уведомляем команду через тг

В сообщениях — ссылки и пояснения по ошибкам

Ветвление (BranchPythonOperator)
Позволяет контролировать поток: коммит или quarantine.

Логирование и мониторинг

Полные логи в Airflow UI

Возможно добавление метрик для Data Quality Dashboard (уже в редаше потом собирать будем)

Sub-issues

Metadata

Metadata

Labels

epic 💯Масштабная задача, целый сервис

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions