Skip to content

Commit 31bdd4b

Browse files
Add implicit catalog dependency detection for DLT pipelines
Pipelines have a Catalog field that was used to look up schemas but was never itself resolved to an explicit ${resources.catalogs.<key>.name} reference. Co-authored-by: Isaac
1 parent 2782447 commit 31bdd4b

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

bundle/config/mutator/resourcemutator/capture_uc_dependencies.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ func (m *captureUCDependencies) Apply(ctx context.Context, b *bundle.Bundle) dia
114114
// exclusive i.e. only one can be set at a time.
115115
p.Schema = resolveSchema(b, p.Catalog, p.Schema)
116116
p.Target = resolveSchema(b, p.Catalog, p.Target)
117+
p.Catalog = resolveCatalog(b, p.Catalog)
117118
}
118119
for _, qm := range b.Config.Resources.QualityMonitors {
119120
if qm == nil || qm.OutputSchemaName == "" {

bundle/config/mutator/resourcemutator/capture_uc_dependencies_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,51 @@ func TestCaptureUCDependenciesForPipelinesWithSchema(t *testing.T) {
276276
}
277277
}
278278

279+
func TestCaptureUCDependenciesForPipelineCatalog(t *testing.T) {
280+
b := &bundle.Bundle{
281+
Config: config.Root{
282+
Resources: config.Resources{
283+
Catalogs: map[string]*resources.Catalog{
284+
"catalog1": {
285+
CreateCatalog: catalog.CreateCatalog{
286+
Name: "catalog1",
287+
},
288+
},
289+
},
290+
Pipelines: map[string]*resources.Pipeline{
291+
"pipeline1": {
292+
CreatePipeline: pipelines.CreatePipeline{
293+
Catalog: "catalog1",
294+
Schema: "foobar",
295+
},
296+
},
297+
"pipeline2": {
298+
CreatePipeline: pipelines.CreatePipeline{
299+
Catalog: "catalogX",
300+
Schema: "foobar",
301+
},
302+
},
303+
"pipeline3": {
304+
CreatePipeline: pipelines.CreatePipeline{
305+
Catalog: "",
306+
Schema: "foobar",
307+
},
308+
},
309+
"nilPipeline": nil,
310+
},
311+
},
312+
},
313+
}
314+
315+
d := bundle.Apply(t.Context(), b, CaptureUCDependencies())
316+
require.Nil(t, d)
317+
318+
assert.Equal(t, "${resources.catalogs.catalog1.name}", b.Config.Resources.Pipelines["pipeline1"].Catalog)
319+
assert.Equal(t, "catalogX", b.Config.Resources.Pipelines["pipeline2"].Catalog)
320+
assert.Equal(t, "", b.Config.Resources.Pipelines["pipeline3"].Catalog)
321+
assert.Nil(t, b.Config.Resources.Pipelines["nilPipeline"])
322+
}
323+
279324
func TestCaptureUCDependenciesForRegisteredModel(t *testing.T) {
280325
b := &bundle.Bundle{
281326
Config: config.Root{

0 commit comments

Comments
 (0)