From d502ed17f85b0cea87efb6f49749713ea59f12e8 Mon Sep 17 00:00:00 2001 From: Steve Biondi Date: Fri, 17 Oct 2025 16:00:38 -0700 Subject: [PATCH] MLE-24755 - fix intermittent errors: change to use modern await and Promise objects to make test logic comprehension easier. Remove the incorrect assertion that on the first and subsequent call to onBatchSuccess that the number of transformed documents is always greater than or equal to the batch size. This is not always the case, sometimes on the first callback the number of transformed docs in the summary is less than the batch size. --- test-complete/nodejs-dmsdk-UpdAndRdAll.js | 121 +++---- .../nodejs-dmsdk-queryToTransformAll.js | 335 +++++++++--------- 2 files changed, 235 insertions(+), 221 deletions(-) diff --git a/test-complete/nodejs-dmsdk-UpdAndRdAll.js b/test-complete/nodejs-dmsdk-UpdAndRdAll.js index ecb4677a..c7dcb077 100644 --- a/test-complete/nodejs-dmsdk-UpdAndRdAll.js +++ b/test-complete/nodejs-dmsdk-UpdAndRdAll.js @@ -2,19 +2,22 @@ * Copyright (c) 2015-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. */ -var marklogic = require('../'); +const marklogic = require('../'); -var testconfig = require('../etc/test-config-qa.js'); +const testconfig = require('../etc/test-config-qa.js'); const stream = require('stream'); const { expect } = require('chai'); +const { pipeline } = require('stream/promises'); -var memStore = { }; +const memStore = { }; +const dbWriter = marklogic.createDatabaseClient(testconfig.dmsdkrestWriterConnection); +const inputJsonUris = []; +const inputContents = []; -var uriStream = new stream.Readable(); -var dbWriter = marklogic.createDatabaseClient(testconfig.dmsdkrestWriterConnection); -let inputJsonUris = []; -let inputContents = []; +let uriStream = new stream.Readable(); + +const TOTAL_DOCS = 1000; /* Based on example from @@ -42,9 +45,9 @@ class MLQASnapshotTransform extends stream.Transform { // Filter what we need and push. We will verify only 900.json piped from ReadAll if (chunk.uri === this.docId) { //Push transformed content onto the stream with changed key names such as Matched ID and Matched Name - var currId = chunk.content.id; - var currName = chunk.content.name; - var retStr = 'Matched ID:' + currId + ', Matched Name:' + currName; + let currId = chunk.content.id; + let currName = chunk.content.name; + let retStr = 'Matched ID:' + currId + ', Matched Name:' + currName; this.push(retStr); } return setImmediate(callback); @@ -68,20 +71,21 @@ class MLQAWritableStream extends stream.Writable { } _write(chunk, encoding, callback) { - var buffer = (Buffer.isBuffer(chunk)) ? + let buffer = (Buffer.isBuffer(chunk)) ? chunk : // already is Buffer use it - new Buffer(chunk, encoding); + Buffer.from(chunk, encoding); memStore[this.key] = Buffer.concat([memStore[this.key], buffer]); return setImmediate(callback); } } describe('Update doc and readAll with Snapshot', function () { - before(function (done) { - this.timeout(50000); - var jsonDocreadable = new stream.Readable({ objectMode: true }); - for (let i = 0; i < 1000; i++) { + before(async function () { + + const jsonDocreadable = new stream.Readable({ objectMode: true }); + + for (let i = 0; i < TOTAL_DOCS; i++) { const tempJson = { uri: '/data/dmsdk/Snap-update-then-readall/' + i + '.json', contentType: 'application/json', @@ -93,70 +97,67 @@ describe('Update doc and readAll with Snapshot', function () { inputContents.push(tempJson.content); } jsonDocreadable.push(null); - dbWriter.documents.writeAll(jsonDocreadable, { - onCompletion: ((summary) => { - setTimeout(() => { - var i = 0; i++; - }, 1000); - summary.docsWrittenSuccessfully.should.be.greaterThanOrEqual(1000); + + let summaryPromiseResolve; + + // The following pattern uses Promise.all to coordinate the completion of the writeAll operation and its onCompletion callback. + // The first promise initiates the writeAll process, while the second promise is resolved by the onCompletion callback with the summary object. + // This ensures that both the write operation and its completion summary are available before proceeding. + const [result, summary] = await Promise.all([ + dbWriter.documents.writeAll(jsonDocreadable, { + onCompletion: (summary) => { + summaryPromiseResolve(summary); + } + }), + new Promise(resolve => { + summaryPromiseResolve = resolve; }) - }); // End of pipe to writeAll - // Use uriStream as the input to readAll() + ]); + expect(summary.docsWrittenSuccessfully).to.be.greaterThanOrEqual(1000); + uriStream = new stream.PassThrough({ objectMode: true }); inputJsonUris.forEach(uri => uriStream.push(uri)); uriStream.push(null); - // wait for DB to finish writing - setTimeout(() => { - done(); - }, 10000); }); - after((function (done) { - this.timeout(10000); - - dbWriter.documents.remove(inputJsonUris) - .result(function (response) { - done(); - }) - .catch(err => done(err)) - .catch(done); - })); + after(async function () { + await dbWriter.documents.remove(inputJsonUris).result(); + }); // This test updates an existing doc and then performs readAll - it('update a doc and readAll with snapshot', function (done) { - this.timeout(30000); + it('update a doc and readAll with snapshot', async function () { + // Used in test that updates doc and then does readAll const UpdBeforeReadAllUriName = '/data/dmsdk/Snap-update-then-readall/900.json'; const filteredSnapshot = new MLQASnapshotTransform(UpdBeforeReadAllUriName, { objectMode: true }); - setTimeout(() => { - var i = 0; i++; - }, 3000); // Initiate a document change on doc id 900. - dbWriter.documents.write({ + const writeResponse = await dbWriter.documents.write({ uri: UpdBeforeReadAllUriName, collections: ['coll5', 'coll6'], contentType: 'application/json', quality: 250, properties: { prop1: 'bar', prop2: 1981 }, - content: { id: 88, name: 'David' } - }); - // Expected result + content: { id: 88, name: 'David' }, + }).result(); + + // Updated doc should be in db now. var exptdResult = 'Matched ID:88, Matched Name:David'; var mlqawstream = new MLQAWritableStream('before'); - // Have listeners before calling pipe. - setTimeout(() => { - var i = 0; i++; - }, 3000); - mlqawstream.on('finish', function () { - expect(memStore.before.toString()).to.equal(exptdResult); - }); - dbWriter.documents.readAll(uriStream, { - inputkind: 'Array', - consistentSnapshot: true, - batch: 50 - }).pipe(filteredSnapshot).pipe(mlqawstream);/* Add.pipe(process.stdout) to debug */ - done(); + + // Use pipeline with await to read and confirm, much cleaner and understandable. + await pipeline( + dbWriter.documents.readAll(uriStream, { + inputkind: 'Array', + consistentSnapshot: true, + batch: 50 + }), + filteredSnapshot, + mlqawstream + ); + + // confirm we wrote correct stream to memStore in mlqawstream + expect(memStore.before.toString()).to.equal(exptdResult); }); }); diff --git a/test-complete/nodejs-dmsdk-queryToTransformAll.js b/test-complete/nodejs-dmsdk-queryToTransformAll.js index 1ff17d77..a348f704 100644 --- a/test-complete/nodejs-dmsdk-queryToTransformAll.js +++ b/test-complete/nodejs-dmsdk-queryToTransformAll.js @@ -19,27 +19,23 @@ const q = marklogic.queryBuilder; const query = q.where(ctsQb.cts.directoryQuery('/test/dataMovement/requests/transformAll/')); describe('data movement transformAll - nodejs-dmsdk-queryToTransformAll', function () { - this.timeout(20000); - before(function (done) { - restAdminDB.config.transforms.write(transformName, 'javascript', fs.createReadStream(transformPath)) - .result(() => { - for (let i = 0; i < 100; i++) { - uris.push('/test/dataMovement/requests/transformAll/' + i + '.json'); - } - }) - .then(() => done()) - .catch(error => done(error)); + before(async function () { + await restAdminDB.config.transforms.write(transformName, 'javascript', + fs.createReadStream(transformPath)).result(); + for (let i = 0; i < 100; i++) { + uris.push('/test/dataMovement/requests/transformAll/' + i + '.json'); + } }); - beforeEach(function (done) { + beforeEach(async function () { let readable = new Stream.Readable({ objectMode: true }); transformStream = new Stream.PassThrough({ objectMode: true }); for (let i = 0; i < 100; i++) { const temp = { uri: '/test/dataMovement/requests/transformAll/' + i + '.json', contentType: 'application/json', - content: { ['key']: 'initialValue' } + content: { key: 'initialValue' } }; readable.push(temp); transformStream.push(temp.uri); @@ -47,244 +43,261 @@ describe('data movement transformAll - nodejs-dmsdk-queryToTransformAll', functi readable.push(null); transformStream.push(null); - dbWriter.documents.writeAll(readable, { - onCompletion: ((summary) => { - done(); - }) + return new Promise((resolve, reject) => { + dbWriter.documents.writeAll(readable, { + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); + } + }); }); - }); - afterEach((function (done) { - dbWriter.documents.remove(uris) - .result(function (response) { - done(); - }) - .catch(err => done(err)) - .catch(done); - })); - - it('should queryToTransformAll documents with onCompletion, transform, concurrentRequests and transformStrategy as ignore', - function (done) { + afterEach( async function () { + await dbWriter.documents.remove(uris).result(); + }); + it('should queryToTransformAll documents with onCompletion, transform, concurrentRequests and transformStrategy as ignore', async function () { + const summary = await new Promise((resolve, reject) => { dbWriter.documents.queryToTransformAll(query, { transform: [transformName, { newValue: 'transformedValue' }], concurrentRequests: { multipleOf: 'hosts', multiplier: 4 }, transformStrategy: 'ignore', onCompletion: ((summary) => { - try { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('initialValue', done); - } catch (err) { - done(err); - } - }) + resolve(summary); + }), + onError: (error) => { + reject(error); + } }); }); - it('should transformAll documents with transformStrategy as ignore', function (done) { - - dbWriter.documents.queryToTransformAll(query, { - transform: [transformName, { newValue: 'transformedValue' }], - concurrentRequests: { multipleOf: 'hosts', multiplier: 4 }, - transformStrategy: 'ignore', - onCompletion: ((summary) => { - try { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('initialValue', done); - } catch (err) { - done(err); - } - }) - }); + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); + + await verifyDocs('initialValue'); }); - it('should work with query and onCompletion function', function (done) { - dbWriter.documents.queryToTransformAll(query, { - transform: [transformName, { newValue: 'transformedValue' }], - onCompletion: ((summary) => { - try { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('transformedValue', done); - } catch (err) { - done(err); + it('should transformAll documents with transformStrategy as ignore', async function () { + const summary = await new Promise((resolve, reject) => { + dbWriter.documents.queryToTransformAll(query, { + transform: [transformName, { newValue: 'transformedValue' }], + transformStrategy: 'ignore', + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); } - }) + }); }); + + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); + + await verifyDocs('initialValue'); }); - it('should work with query and onCompletion function and batchSize', function (done) { - dbWriter.documents.queryToTransformAll(query, { - transform: [transformName, { newValue: 'transformedValue' }], - batchSize: 10, - onBatchSuccess: (function (progress, documents) { - try { - progress.docsTransformedSuccessfully.should.be.greaterThanOrEqual(10); - progress.docsFailedToBeTransformed.should.be.equal(0); - progress.timeElapsed.should.be.greaterThanOrEqual(0); - } catch (err) { - done(err); + it('should work with query and onCompletion function', async function () { + const summary = await new Promise((resolve, reject) => { + dbWriter.documents.queryToTransformAll(query, { + transform: [transformName, { newValue: 'transformedValue' }], + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); } + }); + }); - }), - onCompletion: ((summary) => { - try { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('transformedValue', done); - } catch (err) { - done(err); + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); + + await verifyDocs('transformedValue'); + }); + + it('should work with query and onCompletion function and batchSize', async function () { + const summary = await new Promise((resolve, reject) => { + dbWriter.documents.queryToTransformAll(query, { + transform: [transformName, { newValue: 'transformedValue' }], + batchSize: 10, + onBatchSuccess: ((progress) => { + try { + progress.docsFailedToBeTransformed.should.be.equal(0); + progress.timeElapsed.should.be.greaterThanOrEqual(0); + } catch (err) { + reject(err); + } + }), + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); } }) }); + + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); + + await verifyDocs('transformedValue'); }); - it('should transformAll documents with onCompletion, concurrentRequests and transform options', function (done) { - - dbWriter.documents.queryToTransformAll(query, { - transform: [transformName, { newValue: 'transformedValue' }], - concurrentRequests: { multipleOf: 'hosts', multiplier: 4 }, - onCompletion: ((summary) => { - try { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('transformedValue', done); - } catch (err) { - done(err); + it('should transformAll documents with onCompletion, concurrentRequests and transform options', async function () { + const summary = await new Promise((resolve, reject) => { + dbWriter.documents.queryToTransformAll(query, { + transform: [transformName, { newValue: 'transformedValue' }], + concurrentRequests: { multipleOf: 'hosts', multiplier: 4 }, + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); } - }) + }); }); - }); + + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); - it('should transformAll documents with inputKind as array', function (done) { + await verifyDocs('transformedValue'); + }); + it('should transformAll documents with inputKind as array', async function () { transformStream = new Stream.Readable({ objectMode: true }); for (let i = 0; i + 10 <= uris.length; i = i + 10) { transformStream.push(uris.slice(i, i + 10)); } transformStream.push(null); - dbWriter.documents.queryToTransformAll(query, { - transform: [transformName, { newValue: 'transformedValue' }], - inputKind: 'aRRaY', - onCompletion: ((summary) => { - try { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('transformedValue', done); - } catch (error) { - done(error); + const summary = await new Promise((resolve, reject) => { + dbWriter.documents.queryToTransformAll(query, { + transform: [transformName, { newValue: 'transformedValue' }], + inputKind: 'aRRaY', + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); } - }) + }); }); - }); - it('should queryToTransformAll documents with onCompletion option', function (done) { + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); + + await verifyDocs('transformedValue'); + }); - dbWriter.documents.queryToTransformAll(query, { - transform: [transformName, { newValue: 'transformedValue' }], - onCompletion: ((summary) => { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('transformedValue', done); - }) + it('should queryToTransformAll documents with onCompletion option', async function () { + const summary = await new Promise((resolve, reject) => { + dbWriter.documents.queryToTransformAll(query, { + transform: [transformName, { newValue: 'transformedValue' }], + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); + } + }); }); + + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); + + await verifyDocs('transformedValue'); }); - it('should work with batchSize less than 1', function (done) { - dbWriter.documents.queryToTransformAll(query, { - transform: [transformName, { newValue: 'transformedValue' }], - batchSize: 0, - onCompletion: ((summary) => { - try { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('transformedValue', done); - } catch (err) { - done(err); + it('should work with batchSize less than 1', async function () { + const summary = await new Promise((resolve, reject) => { + dbWriter.documents.queryToTransformAll(query, { + transform: [transformName, { newValue: 'transformedValue' }], + batchSize: 0, + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); } - }) + }); }); + + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); + + await verifyDocs('transformedValue'); }); - it('should throw error with no query', function (done) { + it('should throw error with no query', async function () { try { - dbWriter.documents.queryToTransformAll('invalid query', {}); + await dbWriter.documents.queryToTransformAll('invalid query', {}); } catch (err) { err.toString().should.equal('Error: Query needs to be a cts query.'); - done(); } }); - it('should throw error with null query', function (done) { + it('should throw error with null query', async function () { try { dbWriter.documents.queryToTransformAll(null, { transform: [transformName, { newValue: 'transformedValue' }], }); } catch (err) { err.toString().should.equal('Error: Query cannot be null or undefined.'); - done(); } }); - it('should throw error with onInitialTimestamp and wrong consistentSnapshot', function (done) { + it('should throw error with onInitialTimestamp and wrong consistentSnapshot', async function () { try { - dbWriter.documents.queryToTransformAll(query, { + await dbWriter.documents.queryToTransformAll(query, { transform: [transformName, { newValue: 'transformedValue' }], onInitialTimestamp: '1667222674', consistentSnapshot: false, }); } catch (err) { err.toString().should.equal('Error: consistentSnapshot needs to be true when onInitialTimestamp is provided.'); - done(); } }); - it('should throw error with consistentSnapshot another type', function (done) { + it('should throw error with consistentSnapshot another type', async function () { try { - dbWriter.documents.queryToTransformAll(query, { + await dbWriter.documents.queryToTransformAll(query, { transform: [transformName, { newValue: 'transformedValue' }], consistentSnapshot: 'true', }); } catch (err) { err.toString().should.equal('Error: consistentSnapshot needs to be a boolean or DatabaseClient.Timestamp object.'); - done(); } }); - it('should throw error with batchSize greater than 100000', function (done) { + it('should throw error with batchSize greater than 100000', async function () { try { - dbWriter.documents.queryToTransformAll(query, { + await dbWriter.documents.queryToTransformAll(query, { transform: [transformName, { newValue: 'transformedValue' }], batchSize: 1000000, }); } catch (err) { err.toString().should.equal('Error: batchSize cannot be greater than 100000'); - done(); } }); }); -function verifyDocs(value, done) { - dbWriter.documents.read(uris) - .result(function (documents) { - documents.length.should.equal(100); - for (let i = 0; i < documents.length; i++) { - documents[0].content.key.should.equal(value); - } - }) - .then(() => done()) - .catch(err => done(err)); +async function verifyDocs(value) { + const documents = await dbWriter.documents.read(uris).result(); + documents.length.should.equal(100); + for (let i = 0; i < documents.length; i++) { + documents[i].content.key.should.equal(value); + } } \ No newline at end of file