Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d8e8324
fix(#182): add dbt-local-path
witash Nov 21, 2024
c7bf41f
fix(#182): changing path name
witash Nov 21, 2024
6e54662
chore(#182): changing tests to use local path
witash Nov 21, 2024
0074e05
Merge branch 'main' into 182-add-local-path
witash Jan 15, 2025
6f7fc59
fix: adding dbt-thread-count to local docker-compose
witash Jan 15, 2025
4fc4018
feat: using dbt selectors to test separating dbt runs in k8s
witash Jan 15, 2025
e2ee2ff
fix(#172): adding selector to manifest query
witash Jan 30, 2025
5c872f7
fate(#172): adding source column to couch table
witash Jan 30, 2025
8a6feb4
fix(#172): fixing test
witash Jan 30, 2025
8731421
fix(#172): list affected models
witash Jan 31, 2025
fdc48a7
chore(#172): add tests and docker profiles
witash Feb 7, 2025
3d6b50e
fix(#172): adding back restart always
witash Feb 10, 2025
2ce148b
fix(#172): try increasing timeout to fix test
witash Feb 11, 2025
fd8bd63
feat(#156): add batch size
witash Feb 11, 2025
c93e5d8
Merge branch '182-add-local-path' into v2
witash Feb 17, 2025
53bd60f
fix(#182): fix tests using local path
witash Feb 17, 2025
109c42f
Merge branch '156-batch-with-limit-only' into v2
witash Feb 17, 2025
2f1ad5a
fix(#127): removing unused service and changing app names
witash Jul 31, 2024
0be1d02
fix: heml chart fixes and inclusions from Moh Ke
witash Feb 18, 2025
e1677e0
chore: adding migrations scripts for v2
witash Feb 18, 2025
26f4303
fix: changing dockerhub username
witash Feb 19, 2025
fab35ce
fix: changing from multiple files to profiles
witash Feb 28, 2025
5a684ec
fix: fixing tests
witash Mar 3, 2025
bc19b82
fix: allowing local profile w/o DBT_LOCAL_PATH
witash Mar 5, 2025
57a8d15
fix: small fixes for docker-compose
witash Mar 7, 2025
f7b12ae
fix: add dbt_selector to migration script
witash Mar 7, 2025
bf34239
Revert "fix: changing dockerhub username"
witash Mar 14, 2025
3d23268
fix: add selector to model change command
witash Mar 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions couch2pg/src/importer.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ const UPDATE_SEQ_STMT = `
SET seq = $1, pending = $2, updated_at = CURRENT_TIMESTAMP
WHERE source = $3
`;
const INSERT_DOCS_STMT = `INSERT INTO ${db.postgresTable} (saved_timestamp, _id, _deleted, doc) VALUES`;
const INSERT_DOCS_STMT = `INSERT INTO ${db.postgresTable} (saved_timestamp, _id, _deleted, source, doc) VALUES`;
const ON_CONFLICT_STMT = `
ON CONFLICT (_id) DO UPDATE SET
saved_timestamp = EXCLUDED.saved_timestamp,
_deleted = EXCLUDED._deleted,
source = EXCLUDED.source,
doc = EXCLUDED.doc
`;

Expand Down Expand Up @@ -63,7 +64,7 @@ const storeSeq = async (seq, pending, source) => {
await client.end();
};

const buildBulkInsertQuery = (allDocs) => {
const buildBulkInsertQuery = (allDocs, source) => {
const now = new Date().toISOString();

let idx = 1;
Expand All @@ -72,8 +73,8 @@ const buildBulkInsertQuery = (allDocs) => {

allDocs.rows.forEach((row) => {
removeSecurityDetails(row.doc);
insertStmts.push(`($${idx++}, $${idx++}, $${idx++}, $${idx++})`);
docsToInsert.push(now, row.id, !!row.deleted, sanitise(JSON.stringify(row.doc)));
insertStmts.push(`($${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++})`);
docsToInsert.push(now, row.id, !!row.deleted, source, sanitise(JSON.stringify(row.doc)));
});

return {
Expand All @@ -97,7 +98,7 @@ const addDeletesToResult = (deletedDocs, allDocs) => {
Downloads all given documents from couchdb and stores them in Postgres, in batches.
We presume if a document is on this list it has changed, and thus needs updating.
*/
const loadAndStoreDocs = async (couchdb, docsToDownload) => {
const loadAndStoreDocs = async (couchdb, docsToDownload, source) => {
if (!docsToDownload.length) {
return;
}
Expand All @@ -109,13 +110,13 @@ const loadAndStoreDocs = async (couchdb, docsToDownload) => {

const docsToStore = addDeletesToResult(deletedDocs, allDocsResult);

await storeDocs(docsToStore);
await storeDocs(docsToStore, source);
};

const storeDocs = async (allDocsResult) => {
const storeDocs = async (allDocsResult, source) => {
let client;
try {
const { query, values } = buildBulkInsertQuery(allDocsResult);
const { query, values } = buildBulkInsertQuery(allDocsResult, source);

client = await db.getPgClient();
await client.query(query, values);
Expand All @@ -124,7 +125,7 @@ const storeDocs = async (allDocsResult) => {
if (err.code === '40P01') {
// deadlock detected
await client.end();
return storeDocs(allDocsResult);
return storeDocs(allDocsResult, source);
}
throw err;
}
Expand Down Expand Up @@ -154,7 +155,7 @@ const importChangesBatch = async (couchDb, source) => {
`in ${dbName}`);

console.log(`There are approximately ${pending} changes left in ${dbName}`);
await loadAndStoreDocs(couchDb, changes.results);
await loadAndStoreDocs(couchDb, changes.results, source);
await storeSeq(changes.last_seq, pending, source);

return changes.results.length;
Expand Down
6 changes: 6 additions & 0 deletions couch2pg/src/setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ CREATE TABLE IF NOT EXISTS ${db.postgresTable} (
saved_timestamp TIMESTAMP,
_id VARCHAR PRIMARY KEY,
_deleted BOOLEAN,
source varchar,
doc jsonb
)`;

Expand All @@ -25,12 +26,17 @@ const createTimestampIndex = `
CREATE INDEX CONCURRENTLY IF NOT EXISTS saved_timestamp ON ${db.postgresTable}(saved_timestamp);
`;

const createSourceIndex = `
CREATE INDEX CONCURRENTLY IF NOT EXISTS source ON ${db.postgresTable}(source);
`;

export const createDatabase = async () => {
const client = await db.getPgClient();
await client.query(createSchema);
await client.query(createTable);
await client.query(createDeleteIndex);
await client.query(createTimestampIndex);
await client.query(createSourceIndex);
await client.query(createProgressTable);
await client.end();
};
55 changes: 38 additions & 17 deletions couch2pg/tests/unit/importer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ const updateSeqMatch = () => `
WHERE source = $3
`;

const insertDocsMatch = () => `INSERT INTO ${db.postgresTable} (saved_timestamp, _id, _deleted, doc) VALUES`;
const insertDocsMatch = () => `INSERT INTO ${db.postgresTable} (saved_timestamp, _id, _deleted, source, doc) VALUES`;

const ON_CONFLICT_STMT = `
ON CONFLICT (_id) DO UPDATE SET
saved_timestamp = EXCLUDED.saved_timestamp,
_deleted = EXCLUDED._deleted,
source = EXCLUDED.source,
doc = EXCLUDED.doc
`;

Expand Down Expand Up @@ -156,22 +157,25 @@ describe('importer', () => {
expect(couchDb.allDocs.args).to.deep.equal([[{ include_docs: true, keys: ['doc1', 'doc2', 'doc3'] }]]);

expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT,
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' +
'($1, $2, $3, $4, $5),($6, $7, $8, $9, $10),($11, $12, $13, $14, $15) ' + ON_CONFLICT_STMT,
[
now.toISOString(),
'doc1',
false,
'thehost/medic',
JSON.stringify(docs[0].doc),

now.toISOString(),
'doc2',
false,
'thehost/medic',
JSON.stringify(docs[1].doc),

now.toISOString(),
'doc3',
false,
'thehost/medic',
JSON.stringify(docs[2].doc),
]
]]);
Expand Down Expand Up @@ -234,64 +238,73 @@ describe('importer', () => {

expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).callCount).to.equal(3);
expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args[0]).to.deep.equal([
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT,
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' +
'($1, $2, $3, $4, $5),($6, $7, $8, $9, $10),($11, $12, $13, $14, $15) ' + ON_CONFLICT_STMT,
[
now.toISOString(),
iterationOne.docs[0].id,
false,
'thehost/medic',
JSON.stringify(iterationOne.docs[0].doc),

now.toISOString(),
iterationOne.docs[1].id,
false,
'thehost/medic',
JSON.stringify(iterationOne.docs[1].doc),

now.toISOString(),
iterationOne.docs[2].id,
false,
'thehost/medic',
JSON.stringify(iterationOne.docs[2].doc),
]
]);

expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args[1]).to.deep.equal([
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT,
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' +
'($1, $2, $3, $4, $5),($6, $7, $8, $9, $10),($11, $12, $13, $14, $15) ' + ON_CONFLICT_STMT,
[
now.toISOString(),
iterationTwo.docs[0].id,
false,
'thehost/medic',
JSON.stringify(iterationTwo.docs[0].doc),

now.toISOString(),
iterationTwo.docs[1].id,
false,
'thehost/medic',
JSON.stringify(iterationTwo.docs[1].doc),

now.toISOString(),
iterationTwo.docs[2].id,
false,
'thehost/medic',
JSON.stringify(iterationTwo.docs[2].doc),
]
]);

expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args[2]).to.deep.equal([
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT,
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' +
'($1, $2, $3, $4, $5),($6, $7, $8, $9, $10),($11, $12, $13, $14, $15) ' + ON_CONFLICT_STMT,
[
now.toISOString(),
iterationThree.docs[0].id,
false,
'thehost/medic',
JSON.stringify(iterationThree.docs[0].doc),

now.toISOString(),
iterationThree.docs[1].id,
false,
'thehost/medic',
JSON.stringify(iterationThree.docs[1].doc),

now.toISOString(),
iterationThree.docs[2].id,
false,
'thehost/medic',
JSON.stringify(iterationThree.docs[2].doc),
]
]);
Expand All @@ -315,12 +328,13 @@ describe('importer', () => {
await importer(couchDb);

expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4) ' + ON_CONFLICT_STMT,
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' +
'($1, $2, $3, $4, $5) ' + ON_CONFLICT_STMT,
[
new Date().toISOString(),
'change',
false,
'thehost/medic',
JSON.stringify({
_id: 'change',
_rev: '1',
Expand Down Expand Up @@ -352,12 +366,13 @@ describe('importer', () => {
await importer(couchDb);

expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4) ' + ON_CONFLICT_STMT,
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' +
'($1, $2, $3, $4, $5) ' + ON_CONFLICT_STMT,
[
new Date().toISOString(),
'change',
false,
'thehost/medic',
JSON.stringify({
_id: 'org.couchdb.user:paul',
_rev: '1',
Expand Down Expand Up @@ -392,22 +407,25 @@ describe('importer', () => {
expect(couchDb.allDocs.args).to.deep.equal([[{ include_docs: true, keys: ['doc2'] }]]);

expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT,
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' +
'($1, $2, $3, $4, $5),($6, $7, $8, $9, $10),($11, $12, $13, $14, $15) ' + ON_CONFLICT_STMT,
[
now.toISOString(),
'doc2',
false,
'thehost/medic',
JSON.stringify(docs[0].doc),

now.toISOString(),
'doc1',
true,
'thehost/medic',
JSON.stringify({ _id: 'doc1', _rev: 1, _deleted: true }),

now.toISOString(),
'doc3',
true,
'thehost/medic',
JSON.stringify({ _id: 'doc3', _rev: undefined, _deleted: true }),
]
]]);
Expand Down Expand Up @@ -565,22 +583,25 @@ describe('importer', () => {
expect(couchDb.allDocs.args).to.deep.equal([[{ include_docs: true, keys: ['doc1', 'doc2', 'doc3'] }]]);

const queryArgs = [
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT,
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' +
'($1, $2, $3, $4, $5),($6, $7, $8, $9, $10),($11, $12, $13, $14, $15) ' + ON_CONFLICT_STMT,
[
now.toISOString(),
'doc1',
false,
'thehost/medic',
JSON.stringify(docs[0].doc),

now.toISOString(),
'doc2',
false,
'thehost/medic',
JSON.stringify(docs[1].doc),

now.toISOString(),
'doc3',
false,
'thehost/medic',
JSON.stringify(docs[2].doc),
]
];
Expand Down
6 changes: 5 additions & 1 deletion couch2pg/tests/unit/setup.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ describe('setup', () => {
await setup.createDatabase();

expect(db.getPgClient.calledOnce).to.equal(true);
expect(pgClient.query.callCount).to.equal(5);
expect(pgClient.query.callCount).to.equal(6);
expect(pgClient.end.calledOnce).to.equal(true);
expect(pgClient.query.args[0]).to.deep.equal(['CREATE SCHEMA IF NOT EXISTS v1']);
expect(pgClient.query.args[1]).to.deep.equal([ `
CREATE TABLE IF NOT EXISTS v1.whatever (
saved_timestamp TIMESTAMP,
_id VARCHAR PRIMARY KEY,
_deleted BOOLEAN,
source varchar,
doc jsonb
)`]);
expect(pgClient.query.args[2]).to.deep.equal([`
Expand All @@ -49,6 +50,9 @@ CREATE INDEX CONCURRENTLY IF NOT EXISTS _deleted ON v1.whatever(_deleted);
CREATE INDEX CONCURRENTLY IF NOT EXISTS saved_timestamp ON v1.whatever(saved_timestamp);
`]);
expect(pgClient.query.args[4]).to.deep.equal([`
CREATE INDEX CONCURRENTLY IF NOT EXISTS source ON v1.whatever(source);
`]);
expect(pgClient.query.args[5]).to.deep.equal([`
CREATE TABLE IF NOT EXISTS v1.couchdb_progress (
seq varchar,
pending integer,
Expand Down
Loading