Skip to content

Commit d6bf61b

Browse files
committed
Default restatement van external models + QC
In `uv run pipeline` worden de 'raw.*'-modellen (uit dlt) nu standaard gerestate; dat zorgt ervoor dat stg/silver-lagen refreshen bij nieuwe runs. Verder enkele kleine quality-issues opgelost (o.a. Docker health-check).
1 parent 3b86cb7 commit d6bf61b

6 files changed

Lines changed: 386 additions & 14 deletions

File tree

.github/copilot-instructions.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,19 @@ uv run dev # Full dev setup: Docker + Oracle + d
2222
uv run dev --dest mssql # Dev with MSSQL target
2323
uv run pipeline --dest postgres # Production pipeline
2424
uv run pipeline --dest postgres --dry-run # Preview commands
25+
uv run pipeline --dest postgres --no-restate-raw # Skip restatement (model changes only)
2526
uv run sqlmesh -p sqlmesh plan --auto-apply # Apply SQLMesh transformations
2627
uv run python scripts/validate_schema.py # Validate silver vs DDL (runs in CI)
2728
uv run pytest # Run all tests
2829
```
2930

31+
### External Model Restatement
32+
33+
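By default, `uv run pipeline` adds `--restate-model raw.*` to the `sqlmesh plan` command so that stg/silver models are refreshed when new data is loaded. This is required because raw tables are external models managed by dlt: SQLMesh does not load them itself, so without a restatement it does not detect that new data has arrived and will not backfill the downstream models.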
34+
35+
- **Default behavior**: All raw.* models are restated, triggering cascading backfill of stg.* and silver.*
36+
- **Skip restatement**: Use `--no-restate-raw` to only apply model changes without refreshing data
37+
3038
## SQL Model Patterns
3139

3240
### Staging (`sqlmesh/models/stg/*.sql`)

Dockerfile

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,15 @@ RUN uv sync --frozen --no-dev --no-install-project
5252
# Copy the rest of the application
5353
COPY . .
5454

55-
# Set Python path to include the project root
56-
ENV PYTHONPATH=/app
57-
5855
# Default environment variables (can be overridden)
5956
# Gateway is auto-detected from destination
6057
ENV GGM_DESTINATION=postgres
6158

6259
# Healthcheck - verify Python and key modules are available
60+
# Use `-P` to avoid current working directory shadowing installed packages
61+
# (this repo contains top-level `dlt/` and `sqlmesh/` project folders).
6362
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
64-
CMD uv run python -c "import dlt; import sqlmesh; print('OK')" || exit 1
63+
CMD uv run python -P -c "from dlt.sources.sql_database import sql_database; from sqlmesh import Context; print('OK')" || exit 1
6564

6665
# Default entrypoint runs the pipeline script
6766
ENTRYPOINT ["uv", "run", "python", "scripts/pipeline.py"]

dlt/pipeline.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@
2424
import sys
2525
from pathlib import Path
2626

27-
# Add parent directory to path to allow imports when running directly
28-
sys.path.insert(0, str(Path(__file__).parent.parent))
27+
# NOTE: Do NOT add the repo root to sys.path here.
28+
# This repository contains a top-level `dlt/` directory (this one) which would
29+
# shadow the third-party `dlt` package via Python namespace-package resolution.
30+
# Keeping imports isolated to the script directory avoids that class of failure.
2931

30-
# Import dlt library (using full package name to avoid confusion)
32+
# Import dlt library
3133
import dlt as dlt_lib
3234
from dlt.sources.sql_database import sql_database
3335

scripts/dev.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,16 @@ def main() -> None:
132132
sqlmesh_env = {**os.environ}
133133
subprocess.run(
134134
_get_sqlmesh_command()
135-
+ ["-p", "sqlmesh", "--gateway", gateway, "plan", "--auto-apply"],
135+
+ [
136+
"-p",
137+
"sqlmesh",
138+
"--gateway",
139+
gateway,
140+
"plan",
141+
"--auto-apply",
142+
"--restate-model",
143+
"raw.*",
144+
],
136145
cwd=project_root,
137146
check=True,
138147
env=sqlmesh_env,

scripts/pipeline.py

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -149,17 +149,44 @@ def run_dlt(
149149
return result.returncode
150150

151151

152-
def run_sqlmesh(gateway: str, auto_apply: bool, dry_run: bool, verbose: bool) -> int:
153-
"""Run SQLMesh: raw -> stg -> silver."""
152+
def run_sqlmesh(
153+
gateway: str,
154+
auto_apply: bool,
155+
restate_models: list[str] | None = None,
156+
dry_run: bool = False,
157+
verbose: bool = False,
158+
) -> int:
159+
"""Run SQLMesh: raw -> stg -> silver.
160+
161+
Args:
162+
gateway: SQLMesh gateway to use
163+
auto_apply: If True, auto-apply the plan without prompting
164+
restate_models: List of model patterns to restate. Defaults to ["raw.*"]
165+
to ensure stg/silver refresh on new data loads. Pass empty list to skip.
166+
dry_run: If True, only show what would be executed
167+
verbose: If True, show detailed output
168+
"""
169+
# Default to restating raw.* to ensure stg/silver refresh on new data
170+
if restate_models is None:
171+
restate_models = ["raw.*"]
172+
154173
print(f"\n{'=' * 60}")
155174
print(f" SQLMesh: Transforming raw -> stg -> silver (gateway: {gateway})")
175+
if restate_models:
176+
print(f" Restating: {', '.join(restate_models)}")
156177
print(f"{'=' * 60}\n")
157178

158179
# Use sqlmesh CLI directly to avoid local 'sqlmesh/' directory shadowing the package
159180
cmd = _get_sqlmesh_command() + ["-p", "sqlmesh", "--gateway", gateway, "plan"]
160181
if auto_apply:
161182
cmd.append("--auto-apply")
162183

184+
# Add restate-model flags for each pattern
185+
# This triggers cascading backfill for downstream models (stg, silver)
186+
if restate_models:
187+
for model in restate_models:
188+
cmd.extend(["--restate-model", model])
189+
163190
return run_command(cmd, dry_run=dry_run, verbose=verbose)
164191

165192

@@ -227,6 +254,12 @@ def main() -> int:
227254
action="store_true",
228255
help="Don't auto-apply SQLMesh plan (interactive mode)",
229256
)
257+
parser.add_argument(
258+
"--restate-raw",
259+
action=argparse.BooleanOptionalAction,
260+
default=True,
261+
help="Restate raw.* external models to refresh stg/silver (default: True)",
262+
)
230263

231264
# General options
232265
parser.add_argument(
@@ -253,6 +286,9 @@ def main() -> int:
253286
dataset = args.dataset or get_env("GGM_DATASET", DEFAULT_DATASET)
254287
dlt_backend = args.dlt_backend or get_env("GGM_DLT_BACKEND", "auto")
255288
auto_apply = not args.no_auto_apply
289+
# Restate raw.* by default (unless --no-restate-raw is passed)
290+
# This ensures stg/silver are refreshed when new data is loaded to raw
291+
restate_raw = args.restate_raw
256292

257293
# Print configuration
258294
print("\n" + "=" * 60)
@@ -265,6 +301,7 @@ def main() -> int:
265301
print(f" dlt : {'skip' if args.skip_dlt else 'run'}")
266302
print(f" SQLMesh : {'skip' if args.skip_sqlmesh else 'run'}")
267303
print(f" Auto-apply : {auto_apply}")
304+
print(f" Restate raw : {restate_raw}")
268305
print("=" * 60)
269306

270307
if args.skip_dlt and args.skip_sqlmesh:
@@ -280,7 +317,13 @@ def main() -> int:
280317

281318
# Run SQLMesh
282319
if not args.skip_sqlmesh:
283-
rc = run_sqlmesh(gateway, auto_apply, args.dry_run, args.verbose)
320+
# Determine which models to restate
321+
# Restating external models (raw.*) triggers cascading backfill of stg/silver
322+
# Pass empty list to explicitly disable restatement (None uses default of raw.*)
323+
restate_models = ["raw.*"] if restate_raw else []
324+
rc = run_sqlmesh(
325+
gateway, auto_apply, restate_models, args.dry_run, args.verbose
326+
)
284327
if rc != 0:
285328
print(f"\n[!] SQLMesh failed with exit code {rc}")
286329
return rc
@@ -289,9 +332,7 @@ def main() -> int:
289332
print("\n" + "=" * 60)
290333
print(" Pipeline complete!")
291334
print("=" * 60)
292-
print(
293-
f"\n Explore with: uv run sqlmesh -p sqlmesh --gateway {gateway} ui"
294-
)
335+
print(f"\n Explore with: uv run sqlmesh -p sqlmesh --gateway {gateway} ui")
295336
print(
296337
f' Query data: uv run sqlmesh -p sqlmesh --gateway {gateway} fetchdf "SELECT * FROM silver.client LIMIT 10"'
297338
)

0 commit comments

Comments
 (0)