Pipeline de engenharia de dados para ingestão, transformação e disponibilização dos dados cadastrais públicos de empresas da Receita Federal do Brasil (CNPJ), seguindo a arquitetura Medallion.
Processar os dados abertos do CNPJ da RFB em um formato estruturado e analítico, passando pelas camadas Bronze, Silver e Gold com rastreabilidade e qualidade de dados em cada etapa.
- Databricks Community Edition — plataforma de processamento distribuído
- Delta Lake — formato de armazenamento nas camadas Bronze, Silver e Gold
- dbt — transformações SQL a partir da camada Bronze
- Python / PySpark — ingestão e carga inicial
- Databricks SDK — upload de arquivos para Volumes
- Scrapy — crawler para coleta dos arquivos no portal da RFB
- sidrapy — fonte de dados do PIB (IBGE/SIDRA)
- Astro — Gerenciador do Airflow
Fonte (RFB)
│
▼
Staging (Volume) ← arquivos .csv brutos da RFB e IBGE
│
▼
Bronze (Delta Table) ← dados sem tratamento com schema/colunas basicas aplicadas (apenas para organizar em tabelas)
│
▼
Silver (dbt) ← schema aplicado, tipos corretos, colunas nomeadas
│
▼
Gold (dbt) ← agregações e visões analíticas
rfb/
├── dags/
│ ├── ingestion/
│ └── transformation/
├── rfb_crawler/
│ ├── spiders/
│ │ └── rfb_spider.py
│ ├── pipelines.py
│ └── settings.py
├── src/
│ ├── ingestion/
│ │ ├── download_rfb_data.py
│ │ ├── send_data_to_remote.py
│ │ └── dto/
│ └── dbt/
│ ├── models/
│ ├── macros/
│ ├── seeds/
│ └── dbt_project.yml
└── tests/
└── dags/
- CNPJ (RFB): Portal de Dados Abertos da Receita Federal, atualizado mensalmente
- PIB: API do IBGE via
sidrapy
Pré-requisito: é necessário ter acesso ao Databricks (pode ser a Community Edition) com Unity Catalog habilitado.
No Databricks, abra um notebook e execute os comandos abaixo para criar o catálogo e os schemas necessários:
spark.sql("CREATE CATALOG IF NOT EXISTS rfb")
spark.sql("CREATE SCHEMA IF NOT EXISTS rfb.transient")
spark.sql("CREATE SCHEMA IF NOT EXISTS rfb.bronze")
spark.sql("CREATE SCHEMA IF NOT EXISTS rfb.silver")
spark.sql("CREATE SCHEMA IF NOT EXISTS rfb.gold")pip install poetry
poetry installCrie um arquivo .env na raiz do projeto com as credenciais do Databricks:
DATABRICKS_HOST=https://<your-workspace>.azuredatabricks.net
DATABRICKS_TOKEN=<your-token># Inicia uma instancia do airflow, podendo rodar maunalmente as dags ou com agendamento
astro dev startCrie as variaveis do databricks para as dags conseguirem conectar com fontes externas. (Ex: Databricks)
cd src/dbt
dbt deps
dbt runObs: Caso queira testar todo o fluxo pode rodar direto as dags no airflow.
As DAGs de ingestão e transformação ficam disponíveis na interface do Airflow em localhost:8080.
