Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ function Invoke-BufferedWrite {
Invoke-DuckDBUpsert -Connection $Connection -TableName $TableName `
-Data $normalizedData -PKColumns $PKColumns `
-UseCsvImport:$UseCsvImport -SimpleTypesOnly:$SimpleTypesOnly
# Result object (Inserts, Updates) is passed through to the caller
}
24 changes: 22 additions & 2 deletions SqlPipeline/SqlPipeline/Private/duckdb/Invoke-DuckDBUpsert.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,43 @@ function Invoke-DuckDBUpsert {
$setClause = ($setCols | ForEach-Object { """$_"" = excluded.""$_""" }) -join ', '
$pkList = $PKColumns -join ', '

# Count inserts vs updates before the merge
$joinClause = ($PKColumns | ForEach-Object { "t.`"$_`" = s.`"$_`"" }) -join " AND "
$countResult = Get-DuckDBData -Connection $Connection -Query @"
SELECT
COUNT(*) FILTER (WHERE EXISTS (SELECT 1 FROM $TableName t WHERE $joinClause)) AS Updates,
COUNT(*) FILTER (WHERE NOT EXISTS (SELECT 1 FROM $TableName t WHERE $joinClause)) AS Inserts
FROM $stagingTable s
"@
$insertCount = [long]$countResult.Rows[0]["Inserts"]
$updateCount = [long]$countResult.Rows[0]["Updates"]

Write-Verbose "[$TableName] Performing UPSERT with PK columns: $pkList"
Invoke-DuckDBQuery -Connection $Connection -Query @"
INSERT INTO $TableName
SELECT * FROM $stagingTable
ON CONFLICT ($pkList) DO UPDATE SET $setClause
"@
} else {
# No PK defined - plain INSERT
# No PK defined - plain INSERT; count staging rows
$countResult = Get-DuckDBData -Connection $Connection -Query "SELECT COUNT(*) AS cnt FROM $stagingTable"
$insertCount = [long]$countResult.Rows[0]["cnt"]
$updateCount = 0L

Invoke-DuckDBQuery -Connection $Connection -Query @"
INSERT INTO $TableName
SELECT * FROM $stagingTable
"@
}

Write-Verbose "[$TableName] Merge completed."
Write-Verbose "[$TableName] Merge completed. Inserts: $insertCount, Updates: $updateCount."

# Clean up staging table
Invoke-DuckDBQuery -Connection $Connection -Query "DROP TABLE IF EXISTS $stagingTable"
Write-Verbose "[$TableName] UPSERT completed."

[PSCustomObject]@{
Inserts = $insertCount
Updates = $updateCount
}
}
19 changes: 16 additions & 3 deletions SqlPipeline/SqlPipeline/Public/duckdb/Add-RowsToDuckDB.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ function Add-RowsToDuckDB {
}
$buffer = [System.Collections.Generic.List[PSObject]]::new()
$rowCount = 0
$totalInserts = 0L
$totalUpdates = 0L
Write-Verbose "[$TableName] Add-RowsToDuckDB started (UseTransaction=$UseTransaction, BatchSize=$BatchSize)"
}

Expand All @@ -100,8 +102,10 @@ function Add-RowsToDuckDB {
# Without UseTransaction: write in batches once BatchSize is reached
if (-not $UseTransaction -and $buffer.Count -ge $BatchSize) {
Write-Verbose "[$TableName] Batch write: $($buffer.Count) rows"
Invoke-BufferedWrite -Connection $Connection -TableName $TableName `
$batchResult = Invoke-BufferedWrite -Connection $Connection -TableName $TableName `
-Data $buffer -PKColumns $PKColumns -UseCsvImport:$UseCsvImport -SimpleTypesOnly:$SimpleTypesOnly
$totalInserts += $batchResult.Inserts
$totalUpdates += $batchResult.Updates
$buffer.Clear()
}
}
Expand All @@ -113,13 +117,22 @@ function Add-RowsToDuckDB {
}

Write-Verbose "[$TableName] Final write: $($buffer.Count) rows (total: $rowCount)"
Invoke-BufferedWrite -Connection $Connection -TableName $TableName `
$finalResult = Invoke-BufferedWrite -Connection $Connection -TableName $TableName `
-Data $buffer -PKColumns $PKColumns -UseCsvImport:$UseCsvImport -SimpleTypesOnly:$SimpleTypesOnly
Write-Information "[$TableName] $rowCount rows inserted via pipeline."
$totalInserts += $finalResult.Inserts
$totalUpdates += $finalResult.Updates
Write-Information "[$TableName] $rowCount rows processed: $totalInserts inserts, $totalUpdates updates."

# Force DuckDB to flush changes to disk (important for in-memory connections or when using transactions)
Invoke-DuckDBQuery -Connection $Connection -Query "FORCE CHECKPOINT"

[PSCustomObject]@{
TableName = $TableName
RowsInserted = $totalInserts
RowsUpdated = $totalUpdates
RowsTotal = $rowCount
}

}
}
#endregion
3 changes: 2 additions & 1 deletion SqlPipeline/SqlPipeline/SqlPipeline.psd1
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
RootModule = 'SqlPipeline.psm1'

# Die Versionsnummer dieses Moduls
ModuleVersion = '0.3.5'
ModuleVersion = '0.3.6'

# Unterstützte PSEditions
# CompatiblePSEditions = @()
Expand Down Expand Up @@ -126,6 +126,7 @@ PrivateData = @{

# 'ReleaseNotes' des Moduls
ReleaseNotes = '
0.3.6 Adding functionality to count updates and inserts when executing the MERGE
0.3.5 Added function to show open DuckDB connections: Show-DuckDBConnection
0.3.4 Fixing package installation with PowerShell 5.1 because Expand-Archive only supports *.zip files
0.3.3 Extending Install-SqlPipeline to install DuckDB.net 1.4.4 when using PowerShell 5.1 (latest supported version), pwsh is supporting all latest versions
Expand Down
114 changes: 114 additions & 0 deletions SqlPipeline/Tests/SqlPipeline_DuckDB.Tests.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ Describe "Add-RowsToDuckDB" -Skip:(-not $script:duckDBAvailable) {
Invoke-DuckDBQuery -Query "DROP TABLE IF EXISTS ard_upsert" -ErrorAction SilentlyContinue
Invoke-DuckDBQuery -Query "DROP TABLE IF EXISTS ard_schema" -ErrorAction SilentlyContinue
Invoke-DuckDBQuery -Query "DROP TABLE IF EXISTS ard_tx" -ErrorAction SilentlyContinue
Invoke-DuckDBQuery -Query "DROP TABLE IF EXISTS ard_result" -ErrorAction SilentlyContinue
Invoke-DuckDBQuery -Query "DROP TABLE IF EXISTS ard_multi_pk" -ErrorAction SilentlyContinue
}

It "Inserts PSCustomObject rows" {
Expand Down Expand Up @@ -165,6 +167,118 @@ Describe "Add-RowsToDuckDB" -Skip:(-not $script:duckDBAvailable) {
[int]$result.Rows[0]["cnt"] | Should -Be 25
}

# ---------------------------------------------------------------------------
# Result object (TableName / RowsInserted / RowsUpdated / RowsTotal)
# ---------------------------------------------------------------------------

It "Returns a result object with TableName, RowsInserted, RowsUpdated and RowsTotal properties" {
    # Load a single row; only the shape of the returned summary object matters here.
    $singleRow = [PSCustomObject]@{ Id = 1; Val = "a" }
    $summary = $singleRow | Add-RowsToDuckDB -TableName "ard_result" -PKColumns "Id"

    $summary | Should -Not -BeNullOrEmpty

    # Each expected property name must appear on the summary object.
    $propertyNames = $summary.PSObject.Properties.Name
    foreach ($expectedName in "TableName", "RowsInserted", "RowsUpdated", "RowsTotal") {
        $propertyNames | Should -Contain $expectedName
    }
}

It "TableName in result matches the target table" {
    # The summary is expected to echo back the table name given to -TableName.
    $targetTable = "ard_result"
    $summary = [PSCustomObject]@{ Id = 1 } | Add-RowsToDuckDB -TableName $targetTable

    $summary.TableName | Should -Be $targetTable
}

It "Reports all rows as inserts and zero updates on plain INSERT (no PKColumns)" {
    # No -PKColumns is supplied, so all three rows should be counted as inserts.
    $summary = @(
        [PSCustomObject]@{ Id = 1; Val = "a" }
        [PSCustomObject]@{ Id = 2; Val = "b" }
        [PSCustomObject]@{ Id = 3; Val = "c" }
    ) | Add-RowsToDuckDB -TableName "ard_result"

    $summary.RowsInserted | Should -Be 3
    $summary.RowsUpdated  | Should -Be 0
    $summary.RowsTotal    | Should -Be 3
}

It "Reports all rows as inserts and zero updates on first UPSERT load" {
    # NOTE(review): assumes ard_upsert holds no rows with these Ids yet -
    # presumably the surrounding setup/teardown drops the table; confirm.
    $summary = @(
        [PSCustomObject]@{ Id = 1; Val = "first" }
        [PSCustomObject]@{ Id = 2; Val = "first" }
    ) | Add-RowsToDuckDB -TableName "ard_upsert" -PKColumns "Id"

    $summary.RowsInserted | Should -Be 2
    $summary.RowsUpdated  | Should -Be 0
    $summary.RowsTotal    | Should -Be 2
}

It "Reports all rows as updates and zero inserts when every PK already exists" {
    # Seed the table with Ids 1 and 2; the summary of this first load is discarded.
    $seedRows = @(
        [PSCustomObject]@{ Id = 1; Val = "original" }
        [PSCustomObject]@{ Id = 2; Val = "original" }
    )
    $null = $seedRows | Add-RowsToDuckDB -TableName "ard_upsert" -PKColumns "Id"

    # Load the same two Ids again with changed values: both should count as updates.
    $changedRows = @(
        [PSCustomObject]@{ Id = 1; Val = "updated" }
        [PSCustomObject]@{ Id = 2; Val = "updated" }
    )
    $summary = $changedRows | Add-RowsToDuckDB -TableName "ard_upsert" -PKColumns "Id"

    $summary.RowsInserted | Should -Be 0
    $summary.RowsUpdated  | Should -Be 2
    $summary.RowsTotal    | Should -Be 2
}

It "Reports correct split when some rows are inserts and some are updates" {
    # Seed the table with Ids 1 and 2; the summary of this first load is discarded.
    $seedRows = @(
        [PSCustomObject]@{ Id = 1; Val = "original" }
        [PSCustomObject]@{ Id = 2; Val = "original" }
    )
    $null = $seedRows | Add-RowsToDuckDB -TableName "ard_upsert" -PKColumns "Id"

    # Second load: Id 2 was seeded above (update); Ids 3 and 4 were not (inserts).
    $mixedRows = @(
        [PSCustomObject]@{ Id = 2; Val = "updated" }
        [PSCustomObject]@{ Id = 3; Val = "new" }
        [PSCustomObject]@{ Id = 4; Val = "new" }
    )
    $summary = $mixedRows | Add-RowsToDuckDB -TableName "ard_upsert" -PKColumns "Id"

    $summary.RowsInserted | Should -Be 2
    $summary.RowsUpdated  | Should -Be 1
    $summary.RowsTotal    | Should -Be 3
}

It "Accumulates insert counts correctly across multiple batches" {
    # 25 rows with -BatchSize 10: the totals must add up across every write.
    # NOTE(review): presumably this triggers several batch writes plus a final write - confirm.
    $rows = foreach ($number in 1..25) {
        [PSCustomObject]@{ Num = $number }
    }
    $summary = $rows | Add-RowsToDuckDB -TableName "ard_result" -BatchSize 10

    $summary.RowsInserted | Should -Be 25
    $summary.RowsUpdated  | Should -Be 0
    $summary.RowsTotal    | Should -Be 25
}

It "Accumulates update counts correctly across multiple batches" {
    # Pre-load Ids 1..25; the summary of this first load is discarded.
    $oldRows = foreach ($id in 1..25) {
        [PSCustomObject]@{ Id = $id; Val = "old" }
    }
    $null = $oldRows | Add-RowsToDuckDB -TableName "ard_upsert" -PKColumns "Id"

    # Re-load the same 25 Ids with -BatchSize 10: every row should count as an update.
    $newRows = foreach ($id in 1..25) {
        [PSCustomObject]@{ Id = $id; Val = "new" }
    }
    $summary = $newRows | Add-RowsToDuckDB -TableName "ard_upsert" -PKColumns "Id" -BatchSize 10

    $summary.RowsInserted | Should -Be 0
    $summary.RowsUpdated  | Should -Be 25
    $summary.RowsTotal    | Should -Be 25
}

It "Reports correct counts with a composite (multi-column) primary key" {
    # Both columns together form the key handed to -PKColumns.
    $keyColumns = "RegionId", "ProductId"

    # Seed two rows; the summary of this first load is discarded.
    $seedRows = @(
        [PSCustomObject]@{ RegionId = 1; ProductId = 10; Sales = 100 }
        [PSCustomObject]@{ RegionId = 1; ProductId = 20; Sales = 200 }
    )
    $null = $seedRows | Add-RowsToDuckDB -TableName "ard_multi_pk" -PKColumns $keyColumns

    # (1, 10) matches a seeded key pair (update); (2, 10) does not (insert).
    $secondLoad = @(
        [PSCustomObject]@{ RegionId = 1; ProductId = 10; Sales = 999 }
        [PSCustomObject]@{ RegionId = 2; ProductId = 10; Sales = 50 }
    )
    $summary = $secondLoad | Add-RowsToDuckDB -TableName "ard_multi_pk" -PKColumns $keyColumns

    $summary.RowsInserted | Should -Be 1
    $summary.RowsUpdated  | Should -Be 1
    $summary.RowsTotal    | Should -Be 2
}

}


Expand Down