diff --git a/actions/transactWriteItems.js b/actions/transactWriteItems.js new file mode 100644 index 0000000..921205b --- /dev/null +++ b/actions/transactWriteItems.js @@ -0,0 +1,251 @@ +var async = require('async'), + db = require('../db') + +module.exports = function transactWriteItem(store, data, cb) { + var seenKeys = {} + var batchOpts = {} + var releaseLocks = [] + var indexUpdates = {} + + async.eachSeries(data.TransactItems, addActions, function (err) { + if (err) { + if (err.body && (/Missing the key/.test(err.body.message) || /Type mismatch for key/.test(err.body.message))) + err.body.message = 'The provided key element does not match the schema' + return cb(err) + } + + // this does NOT ensure atomicity across tables - but the items on each table are already locked + // and the actions have been validated. at this point the only thing that would fail would be the + // database itself, and that's a lot of work to get around so I'm just ¯\_(ツ)_/¯ not gonna do that + var operationsbyTable = Object.entries(batchOpts) + + async.each(operationsbyTable, function([tableName, ops], callback) { + var itemDb = store.getItemDb(tableName) + + store.getTable(tableName, function(err, table) { + indexUpdates[tableName].forEach(update => { + db.updateIndexes(store, table, update.existingItem, update.item, function(err) { + if (err) return callback(err) + }) + }) + itemDb.batch(ops, function(err, results) { + if (err) callback(err) + callback(results) + }) + }) + }, function(err) { + releaseLocks.forEach(release => release()()) + if (err) return cb(err) + + var res = {UnprocessedItems: {}}, tableUnits = {} + + if (~['TOTAL', 'INDEXES'].indexOf(data.ReturnConsumedCapacity)) { + operationsbyTable.forEach(([table, operations]) => { + tableUnits[table] = 0 + operations.forEach(op => { + let readCapacity = db.capacityUnits(op.value, true, true) + let writeCapacity = db.capacityUnits(op.value, false, true) + tableUnits[table] += readCapacity + writeCapacity + }) + }) + res.ConsumedCapacity = Object.keys(tableUnits).map(function (table) { + return { + CapacityUnits: tableUnits[table], + TableName: table, + Table: data.ReturnConsumedCapacity == 'INDEXES' ? {CapacityUnits: tableUnits[table]} : undefined, + } + }) + } + + cb(null, res) + }) + }) + + function addActions(transactItem, cb) { + var options = {} + var tableName + + if (data.ReturnConsumedCapacity) options.ReturnConsumedCapacity = data.ReturnConsumedCapacity + + if (transactItem.Put) { + tableName = transactItem.Put.TableName + + store.getTable(tableName, function (err, table) { + if (err) return cb(err) + if ((err = db.validateItem(transactItem.Put.Item, table)) != null) return cb(err) + + let value = transactItem.Put.Item + let key = db.createKey(transactItem.Put.Item, table) + if (seenKeys[key]) { + return cb(db.transactionCancelledException('Transaction cancelled, please refer cancellation reasons for specific reasons')) + } + + seenKeys[key] = true + + var itemDb = store.getItemDb(tableName) + + itemDb.get(key, function(err, oldItem) { + if (oldItem) { + itemDb.lock(key, function(release) { + releaseLocks.push(release) + }) + } + + if (err && err.name != 'NotFoundError') return cb(err) + + if ((err = db.checkConditional(transactItem.Put, oldItem)) != null) return cb(err) + + let operation = { + type: 'put', + key, + value + } + + let indexUpdate = { + existingItem: oldItem, + item: value + } + + if (batchOpts[tableName]) { + batchOpts[tableName].push(operation) + indexUpdates[tableName].push(indexUpdate) + } else { + batchOpts[tableName] = [operation] + indexUpdates[tableName] = [indexUpdate] + } + + return cb() + }) + }) + } else if (transactItem.Delete) { + tableName = transactItem.Delete.TableName + + store.getTable(tableName, function (err, table) { + if (err) return cb(err) + if ((err = db.validateKey(transactItem.Delete.Key, table) != null)) return cb(err) + + let key = db.createKey(transactItem.Delete.Key, table) + if (seenKeys[key]) { + return cb(db.transactionCancelledException('Transaction cancelled, please refer cancellation reasons for specific reasons')) + } + + seenKeys[key] = true + + var itemDb = store.getItemDb(tableName) + + itemDb.lock(key, function(release) { + releaseLocks.push(release) + itemDb.get(key, function(err, oldItem) { + if (err && err.name != 'NotFoundError') return cb(err) + + if ((err = db.checkConditional(transactItem.Delete, oldItem)) != null) return cb(err) + + let operation = { + type: 'del', + key + } + + let indexUpdate = { + existingItem: oldItem + } + + if (batchOpts[tableName]) { + batchOpts[tableName].push(operation) + indexUpdates[tableName].push(indexUpdate) + } else { + batchOpts[tableName] = [operation] + indexUpdates[tableName] = [indexUpdate] + } + return cb() + }) + }) + }) + } else if (transactItem.Update) { + tableName = transactItem.Update.TableName + + store.getTable(tableName, function (err, table) { + if (err) return cb(err) + + if ((err = db.validateUpdates(transactItem.Update.AttributeUpdates, transactItem.Update._updates, table)) != null) return cb(err) + + let key = db.createKey(transactItem.Update.Key, table) + if (seenKeys[key]) { + return cb(db.transactionCancelledException('Transaction cancelled, please refer cancellation reasons for specific reasons')) + } + + seenKeys[key] = true + + var itemDb = store.getItemDb(tableName) + + itemDb.lock(key, function(release) { + releaseLocks.push(release) + itemDb.get(key, function(err, oldItem) { + if (err && err.name != 'NotFoundError') return cb(err) + + if ((err = db.checkConditional(transactItem.Update, oldItem)) != null) return cb(err) + + var item = transactItem.Update.Key + + if (oldItem) { + item = db.deepClone(oldItem) + } + + err = transactItem.Update._updates ? db.applyUpdateExpression(transactItem.Update._updates.sections, table, item) : + db.applyAttributeUpdates(transactItem.Update.AttributeUpdates, table, item) + if (err) return cb(err) + + if (db.itemSize(item) > store.options.maxItemSize) + return cb(db.validationError('Item size to update has exceeded the maximum allowed size')) + + let operation = { + type: 'put', + key, + value: item + } + + let indexUpdate = { + existingItem: oldItem, + item: item + } + + if (batchOpts[tableName]) { + batchOpts[tableName].push(operation) + indexUpdates[tableName].push(indexUpdate) + } else { + batchOpts[tableName] = [operation] + indexUpdates[tableName] = [indexUpdate] + } + + return cb() + }) + }) + }) + } else if (transactItem.ConditionCheck) { + tableName = transactItem.ConditionCheck.TableName + + store.getTable(tableName, function (err, table) { + if (err) return cb(err) + + let key = db.createKey(transactItem.ConditionCheck.Key, table) + if (seenKeys[key]) { + return cb(db.transactionCancelledException('Transaction cancelled, please refer cancellation reasons for specific reasons')) + } + + seenKeys[key] = true + + var itemDb = store.getItemDb(tableName) + + itemDb.lock(key, function(release) { + releaseLocks.push(release) + itemDb.get(key, function(err, oldItem) { + if (err && err.name != 'NotFoundError') return cb(err) + + if ((err = db.checkConditional(transactItem.ConditionCheck, oldItem)) != null) return cb(err) + + return cb() + }) + }) + }) + } + } +} diff --git a/actions/updateItem.js b/actions/updateItem.js index 72f7f7c..1e91501 100644 --- a/actions/updateItem.js +++ b/actions/updateItem.js @@ -1,5 +1,4 @@ -var Big = require('big.js'), - db = require('../db') +var db = require('../db') module.exports = function updateItem(store, data, cb) { @@ -24,7 +23,7 @@ module.exports = function updateItem(store, data, cb) { paths = data._updates ? data._updates.paths : Object.keys(data.AttributeUpdates || {}) if (oldItem) { - item = deepClone(oldItem) + item = db.deepClone(oldItem) if (data.ReturnValues == 'ALL_OLD') { returnObj.Attributes = oldItem } else if (data.ReturnValues == 'UPDATED_OLD') { @@ -32,8 +31,8 @@ module.exports = function updateItem(store, data, cb) { } } - err = data._updates ? applyUpdateExpression(data._updates.sections, table, item) : - applyAttributeUpdates(data.AttributeUpdates, table, item) + err = data._updates ? db.applyUpdateExpression(data._updates.sections, table, item) : + db.applyAttributeUpdates(data.AttributeUpdates, table, item) if (err) return cb(err) if (db.itemSize(item) > store.options.maxItemSize) @@ -59,179 +58,3 @@ module.exports = function updateItem(store, data, cb) { }) }) } - -// Relatively fast deep clone of simple objects/arrays -function deepClone(obj) { - if (typeof obj != 'object' || obj == null) return obj - var result - if (Array.isArray(obj)) { - result = new Array(obj.length) - for (var i = 0; i < obj.length; i++) { - result[i] = deepClone(obj[i]) - } - } else { - result = Object.create(null) - for (var attr in obj) { - result[attr] = deepClone(obj[attr]) - } - } - return result -} - -function applyAttributeUpdates(updates, table, item) { - for (var attr in updates) { - var update = updates[attr] - if (update.Action == 'PUT' || update.Action == null) { - item[attr] = update.Value - } else if (update.Action == 'ADD') { - if (update.Value.N) { - if (item[attr] && !item[attr].N) - return db.validationError('Type mismatch for attribute to update') - if (!item[attr]) item[attr] = {N: '0'} - item[attr].N = new Big(item[attr].N).plus(update.Value.N).toFixed() - } else { - var type = Object.keys(update.Value)[0] - if (item[attr] && !item[attr][type]) - return db.validationError('Type mismatch for attribute to update') - if (!item[attr]) item[attr] = {} - if (!item[attr][type]) item[attr][type] = [] - var val = type == 'L' ? update.Value[type] : update.Value[type].filter(function(a) { // eslint-disable-line no-loop-func - return !~item[attr][type].indexOf(a) - }) - item[attr][type] = item[attr][type].concat(val) - } - } else if (update.Action == 'DELETE') { - if (update.Value) { - type = Object.keys(update.Value)[0] - if (item[attr] && !item[attr][type]) - return db.validationError('Type mismatch for attribute to update') - if (item[attr] && item[attr][type]) { - item[attr][type] = item[attr][type].filter(function(val) { // eslint-disable-line no-loop-func - return !~update.Value[type].indexOf(val) - }) - if (!item[attr][type].length) delete item[attr] - } - } else { - delete item[attr] - } - } - } -} - -function applyUpdateExpression(sections, table, item) { - var toSquash = [] - for (var i = 0; i < sections.length; i++) { - var section = sections[i] - if (section.type == 'set') { - section.val = resolveValue(section.val, item) - if (typeof section.val == 'string') { - return db.validationError(section.val) - } - } - } - for (i = 0; i < sections.length; i++) { - section = sections[i] - var parent = db.mapPath(section.path.slice(0, -1), item) - var attr = section.path[section.path.length - 1] - if (parent == null || (typeof attr == 'number' ? parent.L : parent.M) == null) { - return db.validationError('The document path provided in the update expression is invalid for update') - } - var existing = parent.M ? parent.M[attr] : parent.L[attr] - var alreadyExists = existing != null - if (section.type == 'remove') { - deleteFromParent(parent, attr) - } else if (section.type == 'delete') { - if (alreadyExists && Object.keys(existing)[0] != section.attrType) { - return db.validationError('An operand in the update expression has an incorrect data type') - } - if (alreadyExists) { - existing[section.attrType] = existing[section.attrType].filter(function(val) { // eslint-disable-line no-loop-func - return !~section.val[section.attrType].indexOf(val) - }) - if (!existing[section.attrType].length) { - deleteFromParent(parent, attr) - } - } - } else if (section.type == 'add') { - if (alreadyExists && Object.keys(existing)[0] != section.attrType) { - return db.validationError('An operand in the update expression has an incorrect data type') - } - if (section.attrType == 'N') { - if (!existing) existing = {N: '0'} - existing.N = new Big(existing.N).plus(section.val.N).toFixed() - } else { - if (!existing) existing = {} - if (!existing[section.attrType]) existing[section.attrType] = [] - existing[section.attrType] = existing[section.attrType].concat(section.val[section.attrType].filter(function(a) { // eslint-disable-line no-loop-func - return !~existing[section.attrType].indexOf(a) - })) - } - if (!alreadyExists) { - addToParent(parent, attr, existing, toSquash) - } - } else if (section.type == 'set') { - if (section.path.length == 1) { - var err = db.traverseIndexes(table, function(attr, type) { - if (section.path[0] == attr && section.val[type] == null) { - return db.validationError('The update expression attempted to update the secondary index key to unsupported type') - } - }) - if (err) return err - } - addToParent(parent, attr, section.val, toSquash) - } - } - toSquash.forEach(function(obj) { obj.L = obj.L.filter(Boolean) }) -} - -function resolveValue(val, item) { - if (Array.isArray(val)) { - val = db.mapPath(val, item) - } else if (val.type == 'add' || val.type == 'subtract') { - var val1 = resolveValue(val.args[0], item) - if (typeof val1 == 'string') return val1 - if (val1.N == null) { - return 'An operand in the update expression has an incorrect data type' - } - var val2 = resolveValue(val.args[1], item) - if (typeof val2 == 'string') return val2 - if (val2.N == null) { - return 'An operand in the update expression has an incorrect data type' - } - val = {N: new Big(val1.N)[val.type == 'add' ? 'plus' : 'minus'](val2.N).toFixed()} - } else if (val.type == 'function' && val.name == 'if_not_exists') { - val = db.mapPath(val.args[0], item) || resolveValue(val.args[1], item) - } else if (val.type == 'function' && val.name == 'list_append') { - val1 = resolveValue(val.args[0], item) - if (typeof val1 == 'string') return val1 - if (val1.L == null) { - return 'An operand in the update expression has an incorrect data type' - } - val2 = resolveValue(val.args[1], item) - if (typeof val2 == 'string') return val2 - if (val2.L == null) { - return 'An operand in the update expression has an incorrect data type' - } - return {L: val1.L.concat(val2.L)} - } - return val || 'The provided expression refers to an attribute that does not exist in the item' -} - -function deleteFromParent(parent, attr) { - if (parent.M) { - delete parent.M[attr] - } else if (parent.L) { - parent.L.splice(attr, 1) - } -} - -function addToParent(parent, attr, val, toSquash) { - if (parent.M) { - parent.M[attr] = val - } else if (parent.L) { - if (attr > parent.L.length && !~toSquash.indexOf(parent)) { - toSquash.push(parent) - } - parent.L[attr] = val - } -} diff --git a/db/index.js b/db/index.js index 03f29d4..9765f1b 100644 --- a/db/index.js +++ b/db/index.js @@ -25,6 +25,7 @@ exports.toRangeStr = toRangeStr exports.toLexiStr = toLexiStr exports.hashPrefix = hashPrefix exports.validationError = validationError +exports.transactionCancelledException = transactionCancelledException exports.limitError = limitError exports.checkConditional = checkConditional exports.itemSize = itemSize @@ -38,6 +39,12 @@ exports.mapPath = mapPath exports.queryTable = queryTable exports.updateIndexes = updateIndexes exports.getIndexActions = getIndexActions +exports.deepClone = deepClone +exports.applyAttributeUpdates = applyAttributeUpdates +exports.applyUpdateExpression = applyUpdateExpression +exports.resolveValue = resolveValue +exports.deleteFromParent = deleteFromParent +exports.addToParent = addToParent function create(options) { options = options || {} @@ -478,6 +485,16 @@ function limitError(msg) { return err } +function transactionCancelledException(msg) { + var err = new Error(msg) + err.statusCode = 400 + err.body = { + __type: 'com.amazonaws.dynamodb.v20120810#TransactionCanceledException', + message: msg + } + return err +} + function itemSize(item, compress, addMetaSize, rangeKey) { // Size of compressed item (for checking query/scan limit) seems complicated, // probably due to some internal serialization format. @@ -971,3 +988,179 @@ function getIndexActions(indexes, existingItem, item, table) { }) return {puts: puts, deletes: deletes} } + +// Relatively fast deep clone of simple objects/arrays +function deepClone(obj) { + if (typeof obj != 'object' || obj == null) return obj + var result + if (Array.isArray(obj)) { + result = new Array(obj.length) + for (var i = 0; i < obj.length; i++) { + result[i] = deepClone(obj[i]) + } + } else { + result = Object.create(null) + for (var attr in obj) { + result[attr] = deepClone(obj[attr]) + } + } + return result +} + +function applyAttributeUpdates(updates, table, item) { + for (var attr in updates) { + var update = updates[attr] + if (update.Action == 'PUT' || update.Action == null) { + item[attr] = update.Value + } else if (update.Action == 'ADD') { + if (update.Value.N) { + if (item[attr] && !item[attr].N) + return validationError('Type mismatch for attribute to update') + if (!item[attr]) item[attr] = {N: '0'} + item[attr].N = new Big(item[attr].N).plus(update.Value.N).toFixed() + } else { + var type = Object.keys(update.Value)[0] + if (item[attr] && !item[attr][type]) + return validationError('Type mismatch for attribute to update') + if (!item[attr]) item[attr] = {} + if (!item[attr][type]) item[attr][type] = [] + var val = type == 'L' ? update.Value[type] : update.Value[type].filter(function(a) { // eslint-disable-line no-loop-func + return !~item[attr][type].indexOf(a) + }) + item[attr][type] = item[attr][type].concat(val) + } + } else if (update.Action == 'DELETE') { + if (update.Value) { + type = Object.keys(update.Value)[0] + if (item[attr] && !item[attr][type]) + return validationError('Type mismatch for attribute to update') + if (item[attr] && item[attr][type]) { + item[attr][type] = item[attr][type].filter(function(val) { // eslint-disable-line no-loop-func + return !~update.Value[type].indexOf(val) + }) + if (!item[attr][type].length) delete item[attr] + } + } else { + delete item[attr] + } + } + } +} + +function applyUpdateExpression(sections, table, item) { + var toSquash = [] + for (var i = 0; i < sections.length; i++) { + var section = sections[i] + if (section.type == 'set') { + section.val = resolveValue(section.val, item) + if (typeof section.val == 'string') { + return validationError(section.val) + } + } + } + for (i = 0; i < sections.length; i++) { + section = sections[i] + var parent = mapPath(section.path.slice(0, -1), item) + var attr = section.path[section.path.length - 1] + if (parent == null || (typeof attr == 'number' ? parent.L : parent.M) == null) { + return validationError('The document path provided in the update expression is invalid for update') + } + var existing = parent.M ? parent.M[attr] : parent.L[attr] + var alreadyExists = existing != null + if (section.type == 'remove') { + deleteFromParent(parent, attr) + } else if (section.type == 'delete') { + if (alreadyExists && Object.keys(existing)[0] != section.attrType) { + return validationError('An operand in the update expression has an incorrect data type') + } + if (alreadyExists) { + existing[section.attrType] = existing[section.attrType].filter(function(val) { // eslint-disable-line no-loop-func + return !~section.val[section.attrType].indexOf(val) + }) + if (!existing[section.attrType].length) { + deleteFromParent(parent, attr) + } + } + } else if (section.type == 'add') { + if (alreadyExists && Object.keys(existing)[0] != section.attrType) { + return validationError('An operand in the update expression has an incorrect data type') + } + if (section.attrType == 'N') { + if (!existing) existing = {N: '0'} + existing.N = new Big(existing.N).plus(section.val.N).toFixed() + } else { + if (!existing) existing = {} + if (!existing[section.attrType]) existing[section.attrType] = [] + existing[section.attrType] = existing[section.attrType].concat(section.val[section.attrType].filter(function(a) { // eslint-disable-line no-loop-func + return !~existing[section.attrType].indexOf(a) + })) + } + if (!alreadyExists) { + addToParent(parent, attr, existing, toSquash) + } + } else if (section.type == 'set') { + if (section.path.length == 1) { + var err = traverseIndexes(table, function(attr, type) { + if (section.path[0] == attr && section.val[type] == null) { + return validationError('The update expression attempted to update the secondary index key to unsupported type') + } + }) + if (err) return err + } + addToParent(parent, attr, section.val, toSquash) + } + } + toSquash.forEach(function(obj) { obj.L = obj.L.filter(Boolean) }) +} + +function resolveValue(val, item) { + if (Array.isArray(val)) { + val = mapPath(val, item) + } else if (val.type == 'add' || val.type == 'subtract') { + var val1 = resolveValue(val.args[0], item) + if (typeof val1 == 'string') return val1 + if (val1.N == null) { + return 'An operand in the update expression has an incorrect data type' + } + var val2 = resolveValue(val.args[1], item) + if (typeof val2 == 'string') return val2 + if (val2.N == null) { + return 'An operand in the update expression has an incorrect data type' + } + val = {N: new Big(val1.N)[val.type == 'add' ? 'plus' : 'minus'](val2.N).toFixed()} + } else if (val.type == 'function' && val.name == 'if_not_exists') { + val = mapPath(val.args[0], item) || resolveValue(val.args[1], item) + } else if (val.type == 'function' && val.name == 'list_append') { + val1 = resolveValue(val.args[0], item) + if (typeof val1 == 'string') return val1 + if (val1.L == null) { + return 'An operand in the update expression has an incorrect data type' + } + val2 = resolveValue(val.args[1], item) + if (typeof val2 == 'string') return val2 + if (val2.L == null) { + return 'An operand in the update expression has an incorrect data type' + } + return {L: val1.L.concat(val2.L)} + } + return val || 'The provided expression refers to an attribute that does not exist in the item' +} + +function deleteFromParent(parent, attr) { + if (parent.M) { + delete parent.M[attr] + } else if (parent.L) { + parent.L.splice(attr, 1) + } +} + +function addToParent(parent, attr, val, toSquash) { + if (parent.M) { + parent.M[attr] = val + } else if (parent.L) { + if (attr > parent.L.length && !~toSquash.indexOf(parent)) { + toSquash.push(parent) + } + parent.L[attr] = val + } +} diff --git a/index.js b/index.js index 1c169ca..6a7385d 100644 --- a/index.js +++ b/index.js @@ -13,7 +13,7 @@ var MAX_REQUEST_BYTES = 16 * 1024 * 1024 var validApis = ['DynamoDB_20111205', 'DynamoDB_20120810'], validOperations = ['BatchGetItem', 'BatchWriteItem', 'CreateTable', 'DeleteItem', 'DeleteTable', 'DescribeTable', 'DescribeTimeToLive', 'GetItem', 'ListTables', 'PutItem', 'Query', 'Scan', 'TagResource', - 'UntagResource', 'ListTagsOfResource', 'UpdateItem', 'UpdateTable'], + 'UntagResource', 'ListTagsOfResource', 'UpdateItem', 'UpdateTable', 'TransactWriteItems'], actions = {}, actionValidations = {} diff --git a/test/helpers.js b/test/helpers.js index 9e10b89..b6ab43e 100644 --- a/test/helpers.js +++ b/test/helpers.js @@ -25,6 +25,7 @@ exports.batchBulkPut = batchBulkPut exports.assertSerialization = assertSerialization exports.assertType = assertType exports.assertValidation = assertValidation +exports.assertTransactionCanceled = assertTransactionCanceled exports.assertNotFound = assertNotFound exports.assertInUse = assertInUse exports.assertConditional = assertConditional @@ -578,6 +579,23 @@ function assertValidation(target, data, msg, done) { }) } +function assertTransactionCanceled(target, data, msg, done) { + request(opts(target, data), function(err, res) { + if (err) return done(err) + if (typeof res.body !== 'object') { + return done(new Error('Not JSON: ' + res.body)) + } + res.body.__type.should.equal('com.amazonaws.dynamodb.v20120810#TransactionCanceledException') + if (msg instanceof RegExp) { + res.body.message.should.match(msg) + } else { + res.body.message.should.equal(msg) + } + res.statusCode.should.equal(400) + done() + }) +} + function assertNotFound(target, data, msg, done) { request(opts(target, data), function(err, res) { if (err) return done(err) diff --git a/test/transactWriteItem.js b/test/transactWriteItem.js new file mode 100644 index 0000000..be4efe3 --- /dev/null +++ b/test/transactWriteItem.js @@ -0,0 +1,1283 @@ +var async = require('async'), + helpers = require('./helpers'), + db = require('../db') + +var target = 'TransactWriteItems', + request = helpers.request, + randomName = helpers.randomName, + opts = helpers.opts.bind(null, target), + assertType = helpers.assertType.bind(null, target), + assertValidation = helpers.assertValidation.bind(null, target), + assertTransactionCanceled = helpers.assertTransactionCanceled.bind(null, target) + assertNotFound = helpers.assertNotFound.bind(null, target) + +describe('transactWriteItem', function() { + + describe('serializations', function() { + + it('should return SerializationException when TransactItems is not a list', function(done) { + assertType('TransactItems', 'List', done) + }) + + it('should return SerializationException when TransactItems.0.Delete.Key is not a map', function(done) { + assertType('TransactItems.0.Delete.Key', 'Map', done) + }) + + it('should return SerializationException when TransactItems.0.Delete.Key.Attr is not an attr struct', function(done) { + this.timeout(60000) + assertType('TransactItems.0.Delete.Key.Attr', 'AttrStruct', done) + }) + + it('should return SerializationException when TransactItems.0.Put is not a struct', function(done) { + assertType('TransactItems.0.Put', 'FieldStruct', done) + }) + + it('should return SerializationException when TransactItems.0.Put.Item is not a map', function(done) { + assertType('TransactItems.0.Put.Item', 'Map', done) + }) + + it('should return SerializationException when TransactItems.0.Put.Item.Attr is not an attr struct', function(done) { + this.timeout(60000) + assertType('TransactItems.0.Put.Item.Attr', 'AttrStruct', done) + }) + + it('should return SerializationException when TransactItems.0.Update is not a struct', function(done) { + assertType('TransactItems.0.Update', 'FieldStruct', done) + }) + + it('should return SerializationException when TransactItems.0.Update.UpdateExpression is not a string', function(done) { + assertType('TransactItems.0.Update.UpdateExpression', 'String', done) + }) + + it('should return SerializationException when ReturnConsumedCapacity is not a string', function(done) { + assertType('ReturnConsumedCapacity', 'String', done) + }) + + it('should return SerializationException when ReturnItemCollectionMetrics is not a string', function(done) { + assertType('ReturnItemCollectionMetrics', 'String', done) + }) + }) + + describe('validations', function() { + + it('should return ValidationException for empty body', function (done) { + assertValidation({}, + '1 validation error detected: ' + + 'Value null at \'transactItems\' failed to satisfy constraint: ' + + 'Member must not be null', done) + }) + + it('should return ValidationException for missing TransactItems', function (done) { + assertValidation({ReturnConsumedCapacity: 'hi', ReturnItemCollectionMetrics: 'hi'}, [ + 'Value \'hi\' at \'returnConsumedCapacity\' failed to satisfy constraint: ' + + 'Member must satisfy enum value set: [INDEXES, TOTAL, NONE]', + 'Value \'hi\' at \'returnItemCollectionMetrics\' failed to satisfy constraint: ' + + 'Member must satisfy enum value set: [SIZE, NONE]', + 'Value null at \'transactItems\' failed to satisfy constraint: ' + + 'Member must not be null', + ], done) + }) + + it('should return ValidationException for empty TransactItems', function (done) { + assertValidation({TransactItems: []}, + '1 validation error detected: ' + + 'Value \'[]\' at \'transactItems\' failed to satisfy constraint: ' + + 'Member must have length greater than or equal to 1', done) + }) + + it('should return ValidationException for invalid update request in TransactItems', function (done) { + assertValidation({TransactItems: [{Update: {}}]}, + '1 validation error detected: ' + + 'Value null at \'transactItems.1.member.update.key\' failed to satisfy constraint: ' + + 'Member must not be null', done) + }) + + + it('should return ValidationException for invalid put request in TransactItems', function (done) { + assertValidation({TransactItems: [{Put: {}}]}, + '1 validation error detected: ' + + 'Value null at \'transactItems.1.member.put.item\' failed to satisfy constraint: ' + + 'Member must not be null', done) + }) + + + it('should return ValidationException for invalid delete request in TransactItems', function (done) { + assertValidation({TransactItems: [{Delete: {}}]}, + '1 validation error detected: ' + + 'Value null at \'transactItems.1.member.delete.key\' failed to satisfy constraint: ' + + 'Member must not be null', done) + }) + + it('should return ValidationException for invalid ConditionCheck request in TransactItems', function (done) { + assertValidation({TransactItems: [{ConditionCheck: {}}]}, + '1 validation error detected: ' + + 'Value null at \'transactItems.1.member.conditionCheck.key\' failed to satisfy constraint: ' + + 'Member must not be null', done) + }) + + it('should return ValidationException for invalid metadata and missing requests', function (done) { + assertValidation({TransactItems: [], ReturnConsumedCapacity: 'hi', ReturnItemCollectionMetrics: 'hi'}, [ + 'Value \'hi\' at \'returnConsumedCapacity\' failed to satisfy constraint: ' + + 'Member must satisfy enum value set: [INDEXES, TOTAL, NONE]', + 'Value \'hi\' at \'returnItemCollectionMetrics\' failed to satisfy constraint: ' + + 'Member must satisfy enum value set: [SIZE, NONE]', + 'Value \'[]\' at \'transactItems\' failed to satisfy constraint: ' + + 'Member must have length greater than or equal to 1', + ], done) + }) + + it('should return ValidationException for incorrect attributes', function (done) { + assertValidation({ + TransactItems: [{Put: {}, Delete: {}}], + ReturnConsumedCapacity: 'hi', ReturnItemCollectionMetrics: 'hi' + }, [ + 'Value \'hi\' at \'returnConsumedCapacity\' failed to satisfy constraint: ' + + 'Member must satisfy enum value set: [INDEXES, TOTAL, NONE]', + 'Value \'hi\' at \'returnItemCollectionMetrics\' failed to satisfy constraint: ' + + 'Member must satisfy enum value set: [SIZE, NONE]', + 'Value null at \'transactItems.1.member.delete.key\' failed to satisfy constraint: ' + + 'Member must not be null', + 'Value null at \'transactItems.1.member.put.item\' failed to satisfy constraint: ' + + 'Member must not be null', + ], done) + }) + + it('should return ValidationException when writing more than 25 items', function (done) { + var requests = [], i + for (i = 0; i < 26; i++) { + requests.push(i % 2 ? {Delete: {Key: {a: {S: String(i)}}}} : {Put: {Item: {a: {S: String(i)}}}}) + } + assertValidation({TransactItems: requests}, + [new RegExp('Member must have length less than or equal to 25')], done) + }) + + it('should return ResourceNotFoundException when fetching exactly 25 items and table does not exist', function (done) { + var requests = [], i + for (i = 0; i < 25; i++) { + requests.push(i % 2 ? {Delete: {TableName: 'a', Key: {a: {S: String(i)}}}} : { + Put: { + TableName: 'a', + Item: {a: {S: String(i)}} + } + }) + } + assertNotFound({TransactItems: requests}, + 'Requested resource not found', done) + }) + + it('should check table exists first before checking for duplicate keys', function (done) { + assertNotFound({ + TransactItems: [{Delete: {TableName: 'a', Key: {a: {S: '1'}}}}, { + Put: { + TableName: 'a', + Item: {a: {S: '1'}} + } + }] + }, + 'Requested resource not found', done) + }) + + it('should return TransactionCanceledException for puts and deletes of the same item with delete first', function (done) { + var transaction = { + TransactItems: [{ + Delete: { + TableName: helpers.testHashTable, + Key: {a: {S: 'aaaaa'}} + } + }, {Put: {TableName: helpers.testHashTable, Item: {a: {S: 'aaaaa'}}}}] + } + assertTransactionCanceled(transaction, 'Transaction cancelled, please refer cancellation reasons for specific reasons', done) + }) + + it('should return TransactionCanceledException for puts and deletes of the same item with put first', function (done) { + var transaction = { + TransactItems: [{ + Put: { + TableName: helpers.testHashTable, + Item: {a: {S: 'aaaaa'}} + } + }, {Delete: {TableName: helpers.testHashTable, Key: {a: {S: 'aaaaa'}}}}] + } + assertTransactionCanceled(transaction, 'Transaction cancelled, please refer cancellation reasons for specific reasons', done) + }) + + it('should return TransactionCanceledException for puts and updates of the same item with put first', function (done) { + var transaction = { + TransactItems: [{ + Put: { + TableName: helpers.testHashTable, + Item: {a: {S: 'aaaaa'}} + } + }, { + Update: { + TableName: helpers.testHashTable, + Key: {a: {S: 'aaaaa'}}, + UpdateExpression: 'SET b = :b', + ExpressionAttributeValues: { + ':b': { + S: 'b' + } + } + } + }] + } + assertTransactionCanceled(transaction, 'Transaction cancelled, please refer cancellation reasons for specific reasons', done) + }) + + it('should return TransactionCanceledException for puts and condition checks of the same item with put first', function (done) { + var transaction = { + TransactItems: [{ + Put: { + TableName: helpers.testHashTable, + Item: {a: {S: 'aaaaa'}} + } + }, { + ConditionCheck: { + TableName: helpers.testHashTable, + Key: {a: {S: 'aaaaa'}}, + ConditionExpression: 'attribute_exists(a)' + } + }] + } + assertTransactionCanceled(transaction, 'Transaction cancelled, please refer cancellation reasons for specific reasons', done) + }) + + it('should return ValidationException for item too large', function(done) { + var key = {a: {S: helpers.randomString()}} + var expressionAttributeValues = { + ':b': {S: new Array(helpers.MAX_SIZE).join('a')}, + ':c': {N: new Array(38 + 1).join('1') + new Array(89).join('0')}, + } + assertValidation({TransactItems: [ + {Update: + {TableName: helpers.testHashTable, + Key: key, + UpdateExpression: 'SET b = :b, c = :c', + ExpressionAttributeValues: expressionAttributeValues + }}]}, + 'Item size to update has exceeded the maximum allowed size', + done) + }) + + it('should return ValidationException for key type mismatch in Put Item', function (done) { + async.forEach([ + {NULL: true}, + {M: {a: {S: ''}}}, + {L: [{M: {a: {S: ''}}}]} + ], function (expr, cb) { + assertValidation({TransactItems: [{Put: {TableName: helpers.testHashTable, Item: {a: expr}}}]}, + 'The provided key element does not match the schema', cb) + }, done) + }) + + it('should return ValidationException for single invalid action', function (done) { + var transaction = { + TransactItems: [{ + NotARealAction: { + TableName: helpers.testHashTable, + Item: {a: {S: 'aaaaa'}} + } + }] + } + assertValidation(transaction, 'The action or operation requested is invalid. Verify that the action is typed correctly.', done) + }) + + it('should return ValidationException for one invalid action and one valid action', function (done) { + var transaction = { + TransactItems: [{ + NotARealAction: { + TableName: helpers.testHashTable, + Item: {a: {S: 'aaaaa'}} + } + }, + { + Put: { + TableName: helpers.testHashTable, + Item: {a: {S: 'aaaaa'}} + } + }] + } + assertValidation(transaction, 'The action or operation requested is invalid. Verify that the action is typed correctly.', done) + }) + + + it('should return ValidationException for multiple invalid actions', function (done) { + var transaction = { + TransactItems: [{ + NotARealAction: { + TableName: helpers.testHashTable, + Item: {a: {S: 'aaaaa'}} + } + }, + { + AnotherFakeAction: { + TableName: helpers.testHashTable, + Item: {a: {S: 'aaaaa'}} + } + }] + } + assertValidation(transaction, 'The action or operation requested is invalid. Verify that the action is typed correctly.', done) + }) + + describe('functionality', function() { + it('should write a single item', function(done) { + var item = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + batchReq = {TransactItems: []} + batchReq.TransactItems = [{Put: {TableName: helpers.testHashTable, Item: item}}] + request(opts(batchReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({UnprocessedItems: {}}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item.a}, ConsistentRead: true}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({Item: item}) + done() + }) + }) + }) + + it('should put multiple items', function(done) { + var item = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + item2 = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + transactReq = {TransactItems: []} + transactReq.TransactItems = [{Put: {TableName: helpers.testHashTable, Item: item}}, {Put: {TableName: helpers.testHashTable, Item: item2}}] + request(opts(transactReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({UnprocessedItems: {}}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item.a}, ConsistentRead: true}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({Item: item}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item2.a}, ConsistentRead: true}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({Item: item2}) + done() + }) + }) + }) + }) + + it('should update multiple items', function(done) { + var item = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + item2 = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + transactReq = {TransactItems: []} + + transactReq.TransactItems = [ + { + Update: { + TableName: helpers.testHashTable, + Key: { + a: item.a + }, + UpdateExpression: 'SET c=:d', + ExpressionAttributeValues: { + ':d': { + S: 'd' + } + } + } + }, + { + Update: { + TableName: helpers.testHashTable, + Key: { + a: item2.a + }, + UpdateExpression: 'SET c=:d', + ExpressionAttributeValues: { + ':d': { + S: 'd' + } + } + } + } + ] + + request(helpers.opts('PutItem', {TableName: helpers.testHashTable, Item: item}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(helpers.opts('PutItem', {TableName: helpers.testHashTable, Item: item2}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(opts(transactReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({UnprocessedItems: {}}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item.a}, ConsistentRead: true}), function(err, res) { + // update item + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Item.should.eql({...item, c: {S: 'd'}}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item2.a}, ConsistentRead: true}), function(err, res) { + // put item + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Item.should.eql({...item2, c: {S: 'd'}}) + done() + }) + }) + }) + }) + }) + }) + + it('should delete multiple items', function(done) { + var item = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + item2 = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + transactReq = {TransactItems: []} + + transactReq.TransactItems = [ + { + Delete: { + TableName: helpers.testHashTable, + Key: { + a: item.a + } + } + }, + { + Delete: { + TableName: helpers.testHashTable, + Key: { + a: item2.a + } + } + } + ] + + request(helpers.opts('PutItem', {TableName: helpers.testHashTable, Item: item}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(helpers.opts('PutItem', {TableName: helpers.testHashTable, Item: item2}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(opts(transactReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({UnprocessedItems: {}}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item.a}, ConsistentRead: true}), function(err, res) { + // update item + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item2.a}, ConsistentRead: true}), function(err, res) { + // put item + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({}) + done() + }) + }) + }) + }) + }) + }) + + it('should write, update, and delete in one transaction', function(done) { + var item = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + item2 = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + item3 = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + transactReq = {TransactItems: []} + + transactReq.TransactItems = [ + { + Put: { + TableName: helpers.testHashTable, + Item: item3 + } + }, + { + Update: { + TableName: helpers.testHashTable, + Key: { + a: item.a + }, + UpdateExpression: 'SET c=:d', + ExpressionAttributeValues: { + ':d': { + S: 'd' + } + } + } + }, + { + Delete: { + TableName: helpers.testHashTable, + Key: { + a: item2.a + } + } + } + ] + + request(helpers.opts('PutItem', {TableName: helpers.testHashTable, Item: item}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(helpers.opts('PutItem', {TableName: helpers.testHashTable, Item: item2}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(opts(transactReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({UnprocessedItems: {}}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item.a}, ConsistentRead: true}), function(err, res) { + // update item + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Item.should.eql({...item, c: {S: 'd'}}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item3.a}, ConsistentRead: true}), function(err, res) { + // put item + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({Item: item3}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item2.a}, ConsistentRead: true}), function(err, res) { + // delete item + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({}) + done() + }) + }) + }) + }) + }) + }) + }) + + it('should write & update with condition expression in one transaction', function(done) { + var item = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + item2 = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + transactReq = {TransactItems: []} + + transactReq.TransactItems = [ + { + Put: { + TableName: helpers.testHashTable, + Item: item + } + }, + { + Update: { + TableName: helpers.testHashTable, + Key: { + a: item2.a + }, + ConditionExpression: 'attribute_not_exists(f)', + UpdateExpression: 'SET c=:d', + ExpressionAttributeValues: { + ':d': { + S: 'd' + } + } + } + } + ] + + request(helpers.opts('PutItem', {TableName: helpers.testHashTable, Item: item2}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(opts(transactReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({UnprocessedItems: {}}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item.a}, ConsistentRead: true}), function(err, res) { + // update item + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Item.should.eql(item) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item2.a}, ConsistentRead: true}), function(err, res) { + // put item + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Item.should.eql({...item2, c: {S: 'd'}}) + done() + }) + }) + }) + }) + }) + + it('should fail to write & update with failed condition expression in one transaction', function(done) { + var item = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + item2 = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + transactReq = {TransactItems: []} + + transactReq.TransactItems = [ + { + Put: { + TableName: helpers.testHashTable, + Item: item + } + }, + { + Update: { + TableName: helpers.testHashTable, + Key: { + a: item2.a + }, + ConditionExpression: 'attribute_not_exists(c)', + UpdateExpression: 'SET c=:d', + ExpressionAttributeValues: { + ':d': { + S: 'd' + } + } + } + } + ] + + request(helpers.opts('PutItem', {TableName: helpers.testHashTable, Item: item2}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(opts(transactReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(400) + res.body.message.should.equal('The conditional request failed') + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item.a}, ConsistentRead: true}), function(err, res) { + // update item + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item2.a}, ConsistentRead: true}), function(err, res) { + // put item + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Item.should.eql(item2) + done() + }) + }) + }) + }) + }) + + it('should fail to put with failed condition expression in one transaction', function(done) { + var item = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + transactReq = {TransactItems: []} + + transactReq.TransactItems = [ + { + Put: { + TableName: helpers.testHashTable, + Item: { + ...item, + c: {S: 'd'} + }, + ConditionExpression: 'attribute_not_exists(c)', + } + } + ] + + request(helpers.opts('PutItem', {TableName: helpers.testHashTable, Item: item}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(opts(transactReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(400) + res.body.message.should.equal('The conditional request failed') + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item.a}, ConsistentRead: true}), function(err, res) { + // update item + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({Item: item}) + done() + }) + }) + }) + }) + + it('should fail to write with failed ConditionCheck in one transaction', function(done) { + var item = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + item2 = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + transactReq = {TransactItems: []} + + transactReq.TransactItems = [ + { + Put: { + TableName: helpers.testHashTable, + Item: item + } + }, + { + ConditionCheck: { + TableName: helpers.testHashTable, + Key: { + a: item2.a + }, + ConditionExpression: 'attribute_not_exists(c)' + } + } + ] + + request(helpers.opts('PutItem', {TableName: helpers.testHashTable, Item: item2}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(opts(transactReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(400) + res.body.message.should.equal('The conditional request failed') + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item.a}, ConsistentRead: true}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({}) + done() + }) + }) + }) + }) + + it('should succeed to write with ConditionCheck in one transaction', function(done) { + var item = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + item2 = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + transactReq = {TransactItems: []} + + transactReq.TransactItems = [ + { + Put: { + TableName: helpers.testHashTable, + Item: item + } + }, + { + ConditionCheck: { + TableName: helpers.testHashTable, + Key: { + a: item2.a + }, + ConditionExpression: 'attribute_not_exists(c)' + } + } + ] + + request(helpers.opts('PutItem', {TableName: helpers.testHashTable, Item: item2}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(opts(transactReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(400) + res.body.message.should.equal('The conditional request failed') + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item.a}, ConsistentRead: true}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({}) + done() + }) + }) + }) + }) + + it('should succeed to write in table A with succeeded ConditionCheck in table B in one transaction', function(done) { + var item = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + item2 = {a: {S: helpers.randomString()}, b: {S: helpers.randomString()}, g: {N: '23'}}, + transactReq = {TransactItems: []} + + transactReq.TransactItems = [ + { + Put: { + TableName: helpers.testHashTable, + Item: item + } + }, + { + ConditionCheck: { + TableName: helpers.testRangeTable, + Key: {a: item2.a, b: item2.b}, + ConditionExpression: 'attribute_not_exists(c)' + } + } + ] + + request(helpers.opts('PutItem', {TableName: helpers.testRangeTable, Item: item2}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(opts(transactReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item.a}, ConsistentRead: true}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Item.should.eql(item) + done() + }) + }) + }) + }) + + it('should fail to write in table A with failed ConditionCheck in table B in one transaction', function(done) { + var item = { + a: {S: helpers.randomString()}, + c: {S: 'c'}}, + item2 = {a: {S: helpers.randomString()}, b: {S: helpers.randomString()}, g: {N: '23'}}, + transactReq = {TransactItems: []} + + transactReq.TransactItems = [ + { + Put: { + TableName: helpers.testHashTable, + Item: item + } + }, + { + ConditionCheck: { + TableName: helpers.testRangeTable, + Key: {a: item2.a, b: item2.b}, + ConditionExpression: 'attribute_not_exists(g)' + } + } + ] + + request(helpers.opts('PutItem', {TableName: helpers.testRangeTable, Item: item2}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(opts(transactReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(400) + res.body.message.should.equal('The conditional request failed') + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item.a}, ConsistentRead: true}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({}) + done() + }) + }) + }) + }) + + it('should write to two different tables', function(done) { + var hashItem = {a: {S: helpers.randomString()}, c: {S: 'c'}}, + rangeItem = {a: {S: helpers.randomString()}, b: {S: helpers.randomString()}, g: {N: '23'}} + transactReq = {TransactItems: []} + transactReq.TransactItems = [{Put: {TableName: helpers.testHashTable, Item: hashItem}}, {Put: {TableName: helpers.testRangeTable, Item: rangeItem}}] + request(opts(transactReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({UnprocessedItems: {}}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: hashItem.a}, ConsistentRead: true}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({Item: hashItem}) + request(helpers.opts('GetItem', {TableName: helpers.testRangeTable, Key: {a: rangeItem.a, b: rangeItem.b}, ConsistentRead: true}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({Item: rangeItem}) + done() + }) + }) + }) + }) + + it('should return ConsumedCapacity from each specified table when putting and deleting small item', function(done) { + var a = helpers.randomString(), b = new Array(1010 - a.length).join('b'), + item = {a: {S: a}, b: {S: b}, c: {N: '12.3456'}, d: {B: 'AQI='}, e: {BS: ['AQI=', 'Ag==', 'AQ==']}}, + key2 = helpers.randomString(), key3 = helpers.randomNumber(), + batchReq = {TransactItems: {}, ReturnConsumedCapacity: 'TOTAL'} + batchReq.TransactItems = [ + {Put: {Item: item, TableName: helpers.testHashTable}}, + {Put: {Item: {a: {S: key2}}, TableName: helpers.testHashTable}}, + {Put: {Item: {a: {N: key3}}, TableName: helpers.testHashNTable}} + ] + request(opts(batchReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 4, TableName: helpers.testHashTable}) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 2, TableName: helpers.testHashNTable}) + batchReq.ReturnConsumedCapacity = 'INDEXES' + request(opts(batchReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 4, Table: {CapacityUnits: 4}, TableName: helpers.testHashTable}) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 2, Table: {CapacityUnits: 2}, TableName: helpers.testHashNTable}) + batchReq.ReturnConsumedCapacity = 'TOTAL' + batchReq.TransactItems[helpers.testHashTable] = [{DeleteRequest: {Key: {a: item.a}}}, {DeleteRequest: {Key: {a: {S: key2}}}}] + batchReq.TransactItems[helpers.testHashNTable] = [{DeleteRequest: {Key: {a: {N: key3}}}}] + request(opts(batchReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 4, TableName: helpers.testHashTable}) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 2, TableName: helpers.testHashNTable}) + batchReq.ReturnConsumedCapacity = 'INDEXES' + request(opts(batchReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 4, Table: {CapacityUnits: 4}, TableName: helpers.testHashTable}) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 2, Table: {CapacityUnits: 2}, TableName: helpers.testHashNTable}) + done() + }) + }) + }) + }) + }) + + it('should return ConsumedCapacity from each specified table when putting and deleting larger item', function(done) { + var a = helpers.randomString(), b = new Array(1012 - a.length).join('b'), + item = {a: {S: a}, b: {S: b}, c: {N: '12.3456'}, d: {B: 'AQI='}, e: {BS: ['AQI=', 'Ag==']}}, + key2 = helpers.randomString(), key3 = helpers.randomNumber(), + batchReq = {TransactItems: [], ReturnConsumedCapacity: 'TOTAL'} + batchReq.TransactItems = [ + {Put: {Item: item, TableName: helpers.testHashTable}}, + {Put: {Item: {a: {S: key2}}, TableName: helpers.testHashTable}}, + {Put: {Item: {a: {N: key3}}, TableName: helpers.testHashNTable}} + ] + request(opts(batchReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 5, TableName: helpers.testHashTable}) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 2, TableName: helpers.testHashNTable}) + batchReq.ReturnConsumedCapacity = 'INDEXES' + request(opts(batchReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 5, Table: {CapacityUnits: 5}, TableName: helpers.testHashTable}) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 2, Table: {CapacityUnits: 2}, TableName: helpers.testHashNTable}) + batchReq.ReturnConsumedCapacity = 'TOTAL' + batchReq.TransactItems[helpers.testHashTable] = [{DeleteRequest: {Key: {a: item.a}}}, {DeleteRequest: {Key: {a: {S: key2}}}}] + batchReq.TransactItems[helpers.testHashNTable] = [{DeleteRequest: {Key: {a: {N: key3}}}}] + request(opts(batchReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 5, TableName: helpers.testHashTable}) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 2, TableName: helpers.testHashNTable}) + batchReq.ReturnConsumedCapacity = 'INDEXES' + request(opts(batchReq), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 5, Table: {CapacityUnits: 5}, TableName: helpers.testHashTable}) + res.body.ConsumedCapacity.should.containEql({CapacityUnits: 2, Table: {CapacityUnits: 2}, TableName: helpers.testHashNTable}) + done() + }) + }) + }) + }) + }) + + it('should complete successfully with updates and atomic counter decrement', function (done) { + var atomicCounter = { + a: { + S: 'atomicCounter' + }, + counter: { + N: '1' + } + } + + var item = { + a: { + S: 'item' + }, + b: { + M: { + 'one': { + S: 'itemname', + }, + 'two': { + N: '123456' + } + } + } + } + + var transaction = { + TransactItems: [{ + Update: { + TableName: helpers.testHashTable, + Key: { + a: atomicCounter.a + }, + UpdateExpression: `SET #counter = if_not_exists(#counter, :zero) + :increment`, + ExpressionAttributeNames: { + '#counter': 'counter' + }, + ExpressionAttributeValues: { + ':increment': { + N: '-1' + }, + ':zero': { + N: '0' + } + } + } + }, + { + Update: { + TableName: helpers.testHashTable, + Key: {a: item.a}, + UpdateExpression: 'SET b = :b', + ExpressionAttributeValues: { + ':b': { + M: { + 'one': { + S: 'itemname', + }, + 'two': { + N: '654321' + } + } + } + } + } + } + ] + } + + request(helpers.opts('PutItem', {TableName: helpers.testHashTable, Item: atomicCounter}), function(err, res) { + if (err) return done(err) + res.statusCode.should.eql(200) + request(helpers.opts('PutItem', {TableName: helpers.testHashTable, Item: item}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(opts(transaction), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({UnprocessedItems: {}}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: item.a}, ConsistentRead: true}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Item.b.M.should.eql({'one': { + S: 'itemname', + }, + 'two': { + N: '654321' + }}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: atomicCounter.a}, ConsistentRead: true}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Item.counter.should.eql({N: '0'}) + done() + }) + }) + }) + }) + }) + }) + + it('should update the same row multiple times', function (done) { + var atomicCounter = { + a: { + S: 'atomicCounter' + }, + counter: { + N: '1' + } + } + + var transaction = { + TransactItems: [{ + Update: { + TableName: helpers.testHashTable, + Key: { + a: atomicCounter.a + }, + UpdateExpression: `SET #counter = if_not_exists(#counter, :zero) + :increment`, + ExpressionAttributeNames: { + '#counter': 'counter' + }, + ExpressionAttributeValues: { + ':increment': { + N: '1' + }, + ':zero': { + N: '0' + } + } + } + } + ] + } + + request(opts(transaction), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({UnprocessedItems: {}}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: atomicCounter.a}, ConsistentRead: true}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Item.counter.N.should.eql('1') + request(opts(transaction), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.should.eql({UnprocessedItems: {}}) + request(helpers.opts('GetItem', {TableName: helpers.testHashTable, Key: {a: atomicCounter.a}, ConsistentRead: true}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Item.counter.N.should.eql('2') + done() + }) + }) + }) + }) + }) + + it('should update the index', function (done) { + var key = {a: {S: helpers.randomString()}, b: {S: helpers.randomString()}} + var transaction = { + TransactItems: [{ + Update: { + TableName: helpers.testRangeTable, + Key: key, + UpdateExpression: 'set c = a, d = b, e = a, f = b' + } + } + ] + } + request(opts(transaction), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(helpers.opts('Query', { + TableName: helpers.testRangeTable, + ConsistentRead: true, + IndexName: 'index1', + KeyConditions: { + a: {ComparisonOperator: 'EQ', AttributeValueList: [key.a]}, + c: {ComparisonOperator: 'EQ', AttributeValueList: [key.a]}, + }, + }), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Items.should.eql([{a: key.a, b: key.b, c: key.a, d: key.b, e: key.a, f: key.b}]) + request(helpers.opts('Query', { + TableName: helpers.testRangeTable, + ConsistentRead: true, + IndexName: 'index2', + KeyConditions: { + a: {ComparisonOperator: 'EQ', AttributeValueList: [key.a]}, + d: {ComparisonOperator: 'EQ', AttributeValueList: [key.b]}, + }, + }), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Items.should.eql([{a: key.a, b: key.b, c: key.a, d: key.b}]) + transaction.TransactItems[0].Update = { + TableName: helpers.testRangeTable, + Key: key, + UpdateExpression: 'set c = b, d = a, e = b, f = a', + } + request(opts(transaction), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + request(helpers.opts('Query', { + TableName: helpers.testRangeTable, + ConsistentRead: true, + IndexName: 'index1', + KeyConditions: { + a: {ComparisonOperator: 'EQ', AttributeValueList: [key.a]}, + c: {ComparisonOperator: 'EQ', AttributeValueList: [key.a]}, + }, + }), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Items.should.eql([]) + request(helpers.opts('Query', { + TableName: helpers.testRangeTable, + ConsistentRead: true, + IndexName: 'index2', + KeyConditions: { + a: {ComparisonOperator: 'EQ', AttributeValueList: [key.a]}, + d: {ComparisonOperator: 'EQ', AttributeValueList: [key.b]}, + }, + }), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Items.should.eql([]) + request(helpers.opts('Query', { + TableName: helpers.testRangeTable, + ConsistentRead: true, + IndexName: 'index1', + KeyConditions: { + a: {ComparisonOperator: 'EQ', AttributeValueList: [key.a]}, + c: {ComparisonOperator: 'EQ', AttributeValueList: [key.b]}, + }, + }), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Items.should.eql([{a: key.a, b: key.b, c: key.b, d: key.a, e: key.b, f: key.a}]) + request(helpers.opts('Query', { + TableName: helpers.testRangeTable, + ConsistentRead: true, + IndexName: 'index2', + KeyConditions: { + a: {ComparisonOperator: 'EQ', AttributeValueList: [key.a]}, + d: {ComparisonOperator: 'EQ', AttributeValueList: [key.a]}, + }, + }), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Items.should.eql([{a: key.a, b: key.b, c: key.b, d: key.a}]) + request(helpers.opts('Query', { + TableName: helpers.testRangeTable, + IndexName: 'index3', + KeyConditions: { + c: {ComparisonOperator: 'EQ', AttributeValueList: [key.b]}, + }, + }), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Items.should.eql([{a: key.a, b: key.b, c: key.b, d: key.a, e: key.b, f: key.a}]) + request(helpers.opts('Query', { + TableName: helpers.testRangeTable, + IndexName: 'index4', + KeyConditions: { + c: {ComparisonOperator: 'EQ', AttributeValueList: [key.b]}, + d: {ComparisonOperator: 'EQ', AttributeValueList: [key.a]}, + }, + }), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Items.should.eql([{a: key.a, b: key.b, c: key.b, d: key.a, e: key.b}]) + done() + }) + }) + }) + }) + }) + }) + }) + }) + }) + }) + }) + + }) + }) +}) \ No newline at end of file diff --git a/validations/transactWriteItems.js b/validations/transactWriteItems.js new file mode 100644 index 0000000..a0c0efa --- /dev/null +++ b/validations/transactWriteItems.js @@ -0,0 +1,190 @@ +var validations = require('./index'), + db = require('../db') + +exports.types = { + ReturnConsumedCapacity: { + type: 'String', + enum: ['INDEXES', 'TOTAL', 'NONE'], + }, + ReturnItemCollectionMetrics: { + type: 'String', + enum: ['SIZE', 'NONE'], + }, + ClientRequestToken: { + type: 'String' + }, + TransactItems: { + type: 'List', + notNull: true, + lengthGreaterThanOrEqual: 1, + lengthLessThanOrEqual: 25, + children: { + type: 'ValueStruct', + children: { + Put: { + type: 'FieldStruct', + children: { + TableName: { + type: 'String', + }, + ExpressionAttributeValues: { + type: 'Map', + children: 'AttrStruct', + }, + ExpressionAttributeNames: { + type: 'Map', + children: 'String', + }, + ReturnValuesOnConditionCheckFailure: { + type: 'String', + enum: ['ALL_OLD', 'NONE'], + }, + ConditionExpression: { + type: 'String', + }, + Item: { + type: 'Map', + notNull: true, + children: 'AttrStruct', + }, + }, + }, + Update: { + type: 'FieldStruct', + children: { + TableName: { + type: 'String', + }, + ExpressionAttributeValues: { + type: 'Map', + children: 'AttrStruct', + }, + ExpressionAttributeNames: { + type: 'Map', + children: 'String', + }, + ReturnValuesOnConditionCheckFailure: { + type: 'String', + enum: ['ALL_OLD', 'NONE'], + }, + ConditionExpression: { + type: 'String', + }, + Key: { + type: 'Map', + notNull: true, + children: 'AttrStruct', + }, + UpdateExpression: { + type: 'String', + }, + }, + }, + Delete: { + type: 'FieldStruct', + children: { + TableName: { + type: 'String', + }, + ExpressionAttributeValues: { + type: 'Map', + children: 'AttrStruct', + }, + ExpressionAttributeNames: { + type: 'Map', + children: 'String', + }, + ReturnValuesOnConditionCheckFailure: { + type: 'String', + enum: ['ALL_OLD', 'NONE'], + }, + ConditionExpression: { + type: 'String', + }, + Key: { + type: 'Map', + notNull: true, + children: 'AttrStruct', + }, + }, + }, + ConditionCheck: { + type: 'FieldStruct', + children: { + TableName: { + type: 'String', + }, + ExpressionAttributeValues: { + type: 'Map', + children: 'AttrStruct', + }, + ExpressionAttributeNames: { + type: 'Map', + children: 'String', + }, + ReturnValuesOnConditionCheckFailure: { + type: 'String', + enum: ['ALL_OLD', 'NONE'], + }, + ConditionExpression: { + type: 'String', + }, + Key: { + type: 'Map', + notNull: true, + children: 'AttrStruct', + }, + }, + }, + }, + }, + }, +} + +exports.custom = function(data, store) { + var i, request, msg, key + for (i = 0; i < data.TransactItems.length; i++) { + request = data.TransactItems[i] + if (request.Put) { + for (key in request.Put.Item) { + msg = validations.validateAttributeValue(request.Put.Item[key]) + if (msg) return msg + } + if (db.itemSize(request.Put.Item) > store.options.maxItemSize) + return 'Item size has exceeded the maximum allowed size' + msg = validations.validateExpressions(request.Put) + if (msg) return msg + } else if (request.Delete) { + for (key in request.Delete.Key) { + msg = validations.validateAttributeValue(request.Delete.Key[key]) + if (msg) return msg + } + msg = validations.validateExpressions(request.Delete) + if (msg) return msg + } else if (request.Update) { + for (key in request.Update.Key) { + msg = validations.validateAttributeValue(request.Update.Key[key]) + if (msg) return msg + } + msg = validations.validateExpressionParams(request.Update, + ['UpdateExpression', 'ConditionExpression'], + ['AttributeUpdates', 'Expected']) + if (msg) return msg + msg = validations.validateAttributeConditions(request.Update) + if (msg) return msg + msg = validations.validateExpressions(request.Update) + if (msg) return msg + } else if (request.ConditionCheck) { + for (key in request.ConditionCheck.Key) { + msg = validations.validateAttributeValue(request.ConditionCheck.Key[key]) + if (msg) return msg + } + msg = validations.validateExpressionParams(request.ConditionCheck, ['ConditionExpression'], []) + if (msg) return msg + msg = validations.validateExpressions(request.ConditionCheck) + if (msg) return msg + } else { + return 'The action or operation requested is invalid. Verify that the action is typed correctly.' + } + } +}