diff --git a/couch2pg/src/importer.js b/couch2pg/src/importer.js index bad3f11..32a2f4b 100644 --- a/couch2pg/src/importer.js +++ b/couch2pg/src/importer.js @@ -74,7 +74,7 @@ const buildBulkInsertQuery = (allDocs, source) => { allDocs.rows.forEach((row) => { removeSecurityDetails(row.doc); insertStmts.push(`($${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++})`); - docsToInsert.push(now, row.id, !!row.deleted, source, sanitise(JSON.stringify(row.doc))); + docsToInsert.push(now, row.doc._id, !!row.deleted, source, sanitise(JSON.stringify(row.doc))); }); return { @@ -95,18 +95,26 @@ const addDeletesToResult = (deletedDocs, allDocs) => { }; /* - Downloads all given documents from couchdb and stores them in Postgres, in batches. - We presume if a document is on this list it has changed, and thus needs updating. + Processes documents from changes feed and stores them in Postgres. + Documents are already included in the changes feed with include_docs=true. */ -const loadAndStoreDocs = async (couchdb, docsToDownload, source) => { - if (!docsToDownload.length) { +const loadAndStoreDocs = async (couchdb, changes, source) => { + if (!changes.length) { return; } - const deletedDocs = docsToDownload.filter(change => change.deleted); - const docIds = docsToDownload.filter(change => !change.deleted).map(change => change.id); - const allDocsResult = await couchdb.allDocs({ keys: docIds, include_docs: true }); - console.info('Pulled ' + allDocsResult.rows.length + ' results from couchdb'); + const deletedDocs = changes.filter(change => change.deleted); + const nonDeletedDocs = changes.filter(change => !change.deleted && change.doc); + + const allDocsResult = { + rows: nonDeletedDocs.map(change => ({ + id: change.id, + key: change.id, + doc: change.doc + })) + }; + + console.info('Processing ' + allDocsResult.rows.length + ' documents from changes feed'); const docsToStore = addDeletesToResult(deletedDocs, allDocsResult); @@ -144,14 +152,19 @@ const importChangesBatch = async (couchDb, source) => { pending = null; } - const changes = await couchDb.changes({ limit: BATCH_SIZE, since: seq, seq_interval: BATCH_SIZE }); + const changes = await couchDb.changes({ + limit: BATCH_SIZE, + since: seq, + seq_interval: BATCH_SIZE, + batch_size: BATCH_SIZE, + include_docs: true + }); console.log(`There are ${changes.results.length} changes to process in ${dbName}`); - const docsToDelete = []; - const docsToDownload = []; - changes.results.forEach(change => change.deleted ? docsToDelete.push(change) : docsToDownload.push(change)); + const deletedChanges = changes.results.filter(change => change.deleted); + const nonDeletedChanges = changes.results.filter(change => !change.deleted); - console.info(`There are ${docsToDelete.length} deletions and ${docsToDownload.length} new / changed documents ` + + console.info(`There are ${deletedChanges.length} deletions and ${nonDeletedChanges.length} new / changed documents ` + `in ${dbName}`); console.log(`There are approximately ${pending} changes left in ${dbName}`); @@ -183,4 +196,3 @@ export default async (couchdb) => { return totalDocs; }; - diff --git a/couch2pg/tests/unit/importer.spec.js b/couch2pg/tests/unit/importer.spec.js index 7496f3e..c2a4813 100644 --- a/couch2pg/tests/unit/importer.spec.js +++ b/couch2pg/tests/unit/importer.spec.js @@ -104,7 +104,9 @@ describe('importer', () => { await importer(couchDb); - expect(couchDb.changes.args).to.deep.equal([[{ limit: 1000, seq_interval: 1000, since: 0 }]]); + expect(couchDb.changes.args).to.deep.equal([[{ + limit: 1000, seq_interval: 1000, since: 0, batch_size: 1000, include_docs: true + }]]); expect(pgClient.query.calledThrice).to.equal(true); expect(pgClient.query.args[0]).to.deep.equal([getSeqMatch(), ['host/db']]); expect(pgClient.query.args[1]).to.deep.equal([insertSeqMatch(), [0, null, 'host/db']]); @@ -119,21 +121,21 @@ describe('importer', () => { await importer(couchDb); - expect(couchDb.changes.args).to.deep.equal([[{ limit: 1000, seq_interval: 1000, since: '22-123' }]]); + expect(couchDb.changes.args).to.deep.equal([[{ + limit: 1000, seq_interval: 1000, since: '22-123', batch_size: 1000, include_docs: true + }]]); }); it('should import one batch of documents', async () => { const now = new Date('2023-01-01'); clock.setSystemTime(now.valueOf()); - const changes = [{ id: 'doc1' }, { id: 'doc2' }, { id: 'doc3' }]; - const docs = [ + const changes = [ { id: 'doc1', doc: { _id: 'doc1', _rev: '23-1234', field: 'test1' } }, { id: 'doc2', doc: { _id: 'doc2', _rev: '3-fdsfs', field: 'test2' } }, { id: 'doc3', doc: { _id: 'doc3', _rev: '123-dsadadadssa', field: 'test3' } } ]; couchDb.changes.onCall(0).resolves({ results: changes, last_seq: '23-ppp' }); couchDb.changes.onCall(1).resolves({ results: [], last_seq: '25-vvv' }); - couchDb.allDocs.resolves({ rows: docs }); seqQueries.get .onCall(0).resolves({ rows: [{ seq: '1-22' }] }) @@ -144,8 +146,12 @@ describe('importer', () => { await importer(couchDb); expect(couchDb.changes.calledTwice).to.equal(true); - expect(couchDb.changes.args[0]).to.deep.equal([{ limit: 1000, seq_interval: 1000, since: '1-22' }]); - expect(couchDb.changes.args[1]).to.deep.equal([{ limit: 1000, seq_interval: 1000, since: '23-ppp' }]); + expect(couchDb.changes.args[0]).to.deep.equal([{ + limit: 1000, seq_interval: 1000, since: '1-22', batch_size: 1000, include_docs: true + }]); + expect(couchDb.changes.args[1]).to.deep.equal([{ + limit: 1000, seq_interval: 1000, since: '23-ppp', batch_size: 1000, include_docs: true + }]); expect(seqQueries.update.calledTwice).to.equal(true); expect(seqQueries.update.args).to.deep.equal([ @@ -153,8 +159,7 @@ describe('importer', () => { [updateSeqMatch(), ['25-vvv', 0, 'thehost/medic']], ]); - expect(couchDb.allDocs.calledOnce).to.equal(true); - expect(couchDb.allDocs.args).to.deep.equal([[{ include_docs: true, keys: ['doc1', 'doc2', 'doc3'] }]]); + expect(couchDb.allDocs.called).to.equal(false); expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[ 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' + @@ -164,19 +169,19 @@ describe('importer', () => { 'doc1', false, 'thehost/medic', - JSON.stringify(docs[0].doc), + JSON.stringify(changes[0].doc), now.toISOString(), 'doc2', false, 'thehost/medic', - JSON.stringify(docs[1].doc), + JSON.stringify(changes[1].doc), now.toISOString(), 'doc3', false, 'thehost/medic', - JSON.stringify(docs[2].doc), + JSON.stringify(changes[2].doc), ] ]]); expect(db.getPgClient.callCount).to.equal(pgClient.end.callCount); @@ -184,10 +189,12 @@ describe('importer', () => { it('should import multiple batches of documents', async () => { const genChanges = (iteration, count) => { - const changes = Array.from({ length: count }).map((_, i) => ({ id: `doc${iteration}-${i}` })); - const docs = changes.map(change => ({ id: change.id, doc: { _id: change.id, _rev: '1-rev', field: 2 } })); + const changes = Array.from({ length: count }).map((_, i) => ({ + id: `doc${iteration}-${i}`, + doc: { _id: `doc${iteration}-${i}`, _rev: '1-rev', field: 2 } + })); - return { changes, docs }; + return { changes }; }; const now = new Date('2023-01-01'); @@ -201,9 +208,6 @@ describe('importer', () => { couchDb.changes.onCall(1).resolves({ results: iterationTwo.changes, last_seq: '6-seq' }); couchDb.changes.onCall(2).resolves({ results: iterationThree.changes, last_seq: '9-seq' }); couchDb.changes.onCall(3).resolves({ results: [], last_seq: '9-seq' }); - couchDb.allDocs.onCall(0).resolves({ rows: iterationOne.docs }); - couchDb.allDocs.onCall(1).resolves({ rows: iterationTwo.docs }); - couchDb.allDocs.onCall(2).resolves({ rows: iterationThree.docs }); seqQueries.get .onCall(0).resolves({ rows: [{ seq: '1-seq' }] }) @@ -216,10 +220,18 @@ describe('importer', () => { await importer(couchDb); expect(couchDb.changes.callCount).to.equal(4); - expect(couchDb.changes.args[0]).to.deep.equal([{ limit: 1000, seq_interval: 1000, since: '1-seq' }]); - expect(couchDb.changes.args[1]).to.deep.equal([{ limit: 1000, seq_interval: 1000, since: '3-seq' }]); - expect(couchDb.changes.args[2]).to.deep.equal([{ limit: 1000, seq_interval: 1000, since: '6-seq' }]); - expect(couchDb.changes.args[3]).to.deep.equal([{ limit: 1000, seq_interval: 1000, since: '9-seq' }]); + expect(couchDb.changes.args[0]).to.deep.equal([{ + limit: 1000, seq_interval: 1000, since: '1-seq', batch_size: 1000, include_docs: true + }]); + expect(couchDb.changes.args[1]).to.deep.equal([{ + limit: 1000, seq_interval: 1000, since: '3-seq', batch_size: 1000, include_docs: true + }]); + expect(couchDb.changes.args[2]).to.deep.equal([{ + limit: 1000, seq_interval: 1000, since: '6-seq', batch_size: 1000, include_docs: true + }]); + expect(couchDb.changes.args[3]).to.deep.equal([{ + limit: 1000, seq_interval: 1000, since: '9-seq', batch_size: 1000, include_docs: true + }]); expect(seqQueries.update.callCount).to.equal(4); expect(seqQueries.update.args).to.deep.equal([ @@ -229,12 +241,7 @@ describe('importer', () => { [updateSeqMatch(), ['9-seq', 0, 'thehost/medic']], ]); - expect(couchDb.allDocs.callCount).to.equal(3); - expect(couchDb.allDocs.args).to.deep.equal([ - [{ include_docs: true, keys: iterationOne.changes.map(c => c.id) }], - [{ include_docs: true, keys: iterationTwo.changes.map(c => c.id) }], - [{ include_docs: true, keys: iterationThree.changes.map(c => c.id) }], - ]); + expect(couchDb.allDocs.called).to.equal(false); expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).callCount).to.equal(3); expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args[0]).to.deep.equal([ @@ -242,22 +249,22 @@ describe('importer', () => { '($1, $2, $3, $4, $5),($6, $7, $8, $9, $10),($11, $12, $13, $14, $15) ' + ON_CONFLICT_STMT, [ now.toISOString(), - iterationOne.docs[0].id, + iterationOne.changes[0].id, false, 'thehost/medic', - JSON.stringify(iterationOne.docs[0].doc), + JSON.stringify(iterationOne.changes[0].doc), now.toISOString(), - iterationOne.docs[1].id, + iterationOne.changes[1].id, false, 'thehost/medic', - JSON.stringify(iterationOne.docs[1].doc), + JSON.stringify(iterationOne.changes[1].doc), now.toISOString(), - iterationOne.docs[2].id, + iterationOne.changes[2].id, false, 'thehost/medic', - JSON.stringify(iterationOne.docs[2].doc), + JSON.stringify(iterationOne.changes[2].doc), ] ]); @@ -266,22 +273,22 @@ describe('importer', () => { '($1, $2, $3, $4, $5),($6, $7, $8, $9, $10),($11, $12, $13, $14, $15) ' + ON_CONFLICT_STMT, [ now.toISOString(), - iterationTwo.docs[0].id, + iterationTwo.changes[0].id, false, 'thehost/medic', - JSON.stringify(iterationTwo.docs[0].doc), + JSON.stringify(iterationTwo.changes[0].doc), now.toISOString(), - iterationTwo.docs[1].id, + iterationTwo.changes[1].id, false, 'thehost/medic', - JSON.stringify(iterationTwo.docs[1].doc), + JSON.stringify(iterationTwo.changes[1].doc), now.toISOString(), - iterationTwo.docs[2].id, + iterationTwo.changes[2].id, false, 'thehost/medic', - JSON.stringify(iterationTwo.docs[2].doc), + JSON.stringify(iterationTwo.changes[2].doc), ] ]); @@ -290,22 +297,22 @@ describe('importer', () => { '($1, $2, $3, $4, $5),($6, $7, $8, $9, $10),($11, $12, $13, $14, $15) ' + ON_CONFLICT_STMT, [ now.toISOString(), - iterationThree.docs[0].id, + iterationThree.changes[0].id, false, 'thehost/medic', - JSON.stringify(iterationThree.docs[0].doc), + JSON.stringify(iterationThree.changes[0].doc), now.toISOString(), - iterationThree.docs[1].id, + iterationThree.changes[1].id, false, 'thehost/medic', - JSON.stringify(iterationThree.docs[1].doc), + JSON.stringify(iterationThree.changes[1].doc), now.toISOString(), - iterationThree.docs[2].id, + iterationThree.changes[2].id, false, 'thehost/medic', - JSON.stringify(iterationThree.docs[2].doc), + JSON.stringify(iterationThree.changes[2].doc), ] ]); expect(db.getPgClient.callCount).to.equal(pgClient.end.callCount); @@ -313,20 +320,21 @@ describe('importer', () => { it('should sanitize input', async () => { clock.setSystemTime(); - couchDb.changes.onCall(0).resolves({ results: [{ id: 'change' }], last_seq: '2' }); - couchDb.changes.onCall(1).resolves({ results: [], last_seq: '2' }); const brokenDoc = { _id: 'change', _rev: '1', field: '\u0000', otherField: 'something\u0000something\\u0000something', }; - couchDb.allDocs.resolves({ rows: [{ id: 'change', doc: brokenDoc }] }); + couchDb.changes.onCall(0).resolves({ results: [{ id: 'change', doc: brokenDoc }], last_seq: '2' }); + couchDb.changes.onCall(1).resolves({ results: [], last_seq: '2' }); seqQueries.get.resolves({ rows: [{ seq: '1-22' }] }); await importer(couchDb); + expect(couchDb.allDocs.called).to.equal(false); + expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[ 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' + '($1, $2, $3, $4, $5) ' + ON_CONFLICT_STMT, @@ -347,8 +355,6 @@ describe('importer', () => { it('should remove security details from user docs', async () => { clock.setSystemTime(); - couchDb.changes.onCall(0).resolves({ results: [{ id: 'change' }], last_seq: '2' }); - couchDb.changes.onCall(1).resolves({ results: [], last_seq: '2' }); const userDoc = { _id: 'org.couchdb.user:paul', _rev: '1', @@ -359,18 +365,21 @@ describe('importer', () => { name: 'paul', field: 2 }; - couchDb.allDocs.resolves({ rows: [{ id: 'change', doc: userDoc }] }); + couchDb.changes.onCall(0).resolves({ results: [{ id: 'org.couchdb.user:paul', doc: userDoc }], last_seq: '2' }); + couchDb.changes.onCall(1).resolves({ results: [], last_seq: '2' }); seqQueries.get.resolves({ rows: [{ seq: '1-22' }] }); await importer(couchDb); + expect(couchDb.allDocs.called).to.equal(false); + expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[ 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' + '($1, $2, $3, $4, $5) ' + ON_CONFLICT_STMT, [ new Date().toISOString(), - 'change', + 'org.couchdb.user:paul', false, 'thehost/medic', JSON.stringify({ @@ -389,22 +398,17 @@ describe('importer', () => { clock.setSystemTime(now.valueOf()); const changes = [ { id: 'doc1', deleted: true, changes: [{ rev: 1 }] }, - { id: 'doc2' }, - { id: 'doc3', deleted: true } - ]; - const docs = [ { id: 'doc2', doc: { _id: 'doc2', _rev: '3-fdsfs', field: 'test2' } }, + { id: 'doc3', deleted: true } ]; couchDb.changes.onCall(0).resolves({ results: changes, last_seq: '2' }); couchDb.changes.onCall(1).resolves({ results: [], last_seq: '2' }); - couchDb.allDocs.resolves({ rows: docs }); seqQueries.get.resolves({ rows: [{ seq: '1-22' }] }); await importer(couchDb); - expect(couchDb.allDocs.calledOnce).to.equal(true); - expect(couchDb.allDocs.args).to.deep.equal([[{ include_docs: true, keys: ['doc2'] }]]); + expect(couchDb.allDocs.called).to.equal(false); expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[ 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' + @@ -414,7 +418,7 @@ describe('importer', () => { 'doc2', false, 'thehost/medic', - JSON.stringify(docs[0].doc), + JSON.stringify(changes[1].doc), now.toISOString(), 'doc1', @@ -479,30 +483,16 @@ describe('importer', () => { expect(insertQuery.called).to.equal(false); }); - it('should throw error when getting docs fails', async () => { - seqQueries.get.resolves({ rows: [] }); - couchDb.changes.resolves({ results: [{ id: 3 }], last_seq: '2' }); - couchDb.allDocs.rejects(new Error('502')); - - await expect(importer(couchDb)).to.eventually.be.rejectedWith('502'); - - expect(couchDb.changes.called).to.equal(true); - expect(couchDb.allDocs.called).to.equal(true); - expect(seqQueries.update.called).to.equal(false); - expect(seqQueries.get.called).to.equal(true); - expect(insertQuery.called).to.equal(false); - }); it('should throw error when saving docs fails', async () => { seqQueries.get.resolves({ rows: [] }); - couchDb.changes.resolves({ results: [{ id: 3 }], last_seq: '2' }); - couchDb.allDocs.resolves({ rows: [{ id: 3, doc: { _id: 3, _rev: '3-fdsfs' }}] }); + couchDb.changes.resolves({ results: [{ id: 3, doc: { _id: 3, _rev: '3-fdsfs' } }], last_seq: '2' }); insertQuery.rejects(new Error('out of space or something')); await expect(importer(couchDb)).to.eventually.be.rejectedWith('out of space or something'); expect(couchDb.changes.called).to.equal(true); - expect(couchDb.allDocs.called).to.equal(true); + expect(couchDb.allDocs.called).to.equal(false); expect(seqQueries.update.called).to.equal(false); expect(seqQueries.get.called).to.equal(true); expect(insertQuery.called).to.equal(true); @@ -510,15 +500,14 @@ describe('importer', () => { it('should throw error when writing seq fails', async () => { seqQueries.get.resolves({ rows: [] }); - couchDb.changes.resolves({ results: [{ id: 3 }], last_seq: '2' }); - couchDb.allDocs.resolves({ rows: [{ id: 3, doc: { _id: 3, _rev: '3-fdsfs' }}] }); + couchDb.changes.resolves({ results: [{ id: 3, doc: { _id: 3, _rev: '3-fdsfs' } }], last_seq: '2' }); insertQuery.resolves(); seqQueries.update.rejects(new Error('done')); await expect(importer(couchDb)).to.eventually.be.rejectedWith('done'); expect(couchDb.changes.called).to.equal(true); - expect(couchDb.allDocs.called).to.equal(true); + expect(couchDb.allDocs.called).to.equal(false); expect(seqQueries.update.called).to.equal(true); expect(seqQueries.get.called).to.equal(true); expect(insertQuery.called).to.equal(true); @@ -531,15 +520,16 @@ describe('importer', () => { axios.get = sinon.stub().resolves(pendingResponse); seqQueries.get.resolves({ rows: [] }); - couchDb.changes.onCall(0).resolves({ results: [{ id: 'doc1' }], last_seq: '23-ppp' }); + couchDb.changes.onCall(0).resolves({ + results: [{ id: 'doc1', doc: { _id: 'doc1', _rev: '3-fdsfs' } }], last_seq: '23-ppp' + }); couchDb.changes.onCall(1).resolves({ results: [], last_seq: '25-vvv' }); - couchDb.allDocs.resolves({ rows: [{ id: 3, doc: { _id: 3, _rev: '3-fdsfs' }}] }); insertQuery.resolves(); await importer(couchDb); expect(couchDb.changes.called).to.equal(true); - expect(couchDb.allDocs.called).to.equal(true); + expect(couchDb.allDocs.called).to.equal(false); expect(seqQueries.update.called).to.equal(true); expect(seqQueries.get.called).to.equal(true); expect(insertQuery.called).to.equal(true); @@ -548,15 +538,13 @@ describe('importer', () => { it('should retry on deadlock', async () => { const now = new Date('2023-01-01'); clock.setSystemTime(now.valueOf()); - const changes = [{ id: 'doc1' }, { id: 'doc2' }, { id: 'doc3' }]; - const docs = [ + const changes = [ { id: 'doc1', doc: { _id: 'doc1', field: 'test1' } }, { id: 'doc2', doc: { _id: 'doc2', field: 'test2' } }, { id: 'doc3', doc: { _id: 'doc3', field: 'test3' } } ]; couchDb.changes.onCall(0).resolves({ results: changes, last_seq: '23-ppp' }); couchDb.changes.onCall(1).resolves({ results: [], last_seq: '25-vvv' }); - couchDb.allDocs.resolves({ rows: docs }); seqQueries.get .onCall(0).resolves({ rows: [{ seq: '1-22' }] }) @@ -570,8 +558,12 @@ describe('importer', () => { await importer(couchDb); expect(couchDb.changes.calledTwice).to.equal(true); - expect(couchDb.changes.args[0]).to.deep.equal([{ limit: 1000, seq_interval: 1000, since: '1-22' }]); - expect(couchDb.changes.args[1]).to.deep.equal([{ limit: 1000, seq_interval: 1000, since: '23-ppp' }]); + expect(couchDb.changes.args[0]).to.deep.equal([{ + limit: 1000, seq_interval: 1000, since: '1-22', batch_size: 1000, include_docs: true + }]); + expect(couchDb.changes.args[1]).to.deep.equal([{ + limit: 1000, seq_interval: 1000, since: '23-ppp', batch_size: 1000, include_docs: true + }]); expect(seqQueries.update.calledTwice).to.equal(true); expect(seqQueries.update.args).to.deep.equal([ @@ -579,8 +571,7 @@ describe('importer', () => { [updateSeqMatch(), ['25-vvv', 0, 'thehost/medic']], ]); - expect(couchDb.allDocs.calledOnce).to.equal(true); - expect(couchDb.allDocs.args).to.deep.equal([[{ include_docs: true, keys: ['doc1', 'doc2', 'doc3'] }]]); + expect(couchDb.allDocs.called).to.equal(false); const queryArgs = [ 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' + @@ -590,19 +581,19 @@ describe('importer', () => { 'doc1', false, 'thehost/medic', - JSON.stringify(docs[0].doc), + JSON.stringify(changes[0].doc), now.toISOString(), 'doc2', false, 'thehost/medic', - JSON.stringify(docs[1].doc), + JSON.stringify(changes[1].doc), now.toISOString(), 'doc3', false, 'thehost/medic', - JSON.stringify(docs[2].doc), + JSON.stringify(changes[2].doc), ] ];