diff --git a/actions/createTable.js b/actions/createTable.js index ded5809..2269727 100644 --- a/actions/createTable.js +++ b/actions/createTable.js @@ -1,24 +1,49 @@ -var crypto = require('crypto') +var async = require('async'), + crypto = require('crypto'), + kinesaliteCreateStream = require('kinesalite/actions/createStream') module.exports = function createTable(store, data, cb) { var key = data.TableName, tableDb = store.tableDb - tableDb.lock(key, function(release) { - cb = release(cb) - - tableDb.get(key, function(err) { - if (err && err.name != 'NotFoundError') return cb(err) - if (!err) { - err = new Error - err.statusCode = 400 - err.body = { - __type: 'com.amazonaws.dynamodb.v20120810#ResourceInUseException', - message: '', + async.auto({ + lock: function(callback) { + tableDb.lock(key, function(release) { + callback(null, release) + }) + }, + checkTable: ['lock', function(results, callback) { + tableDb.get(key, function(err) { + if (err && err.name != 'NotFoundError') return callback(err) + if (!err) { + err = new Error + err.statusCode = 400 + err.body = { + __type: 'com.amazonaws.dynamodb.v20120810#ResourceInUseException', + message: '', + } + return callback(err) } - return cb(err) + + callback() + }) + }], + streamUpdates: ['checkTable', function(results, callback) { + if (!data.StreamSpecification) { + return callback() } + kinesaliteCreateStream(store.kinesalite, {StreamName: data.TableName, ShardCount: 1}, function(err) { + if (err) return callback(err) + + callback(null, { + StreamSpecification: data.StreamSpecification, + LatestStreamLabel: (new Date()).toISOString().replace('Z', ''), + LatestStreamArn: 'arn:aws:dynamodb:' + tableDb.awsRegion + ':' + tableDb.awsAccountId + ':table/' + data.TableName + '/stream/' + data.LatestStreamLabel, + }) + }) + }], + createTable: ['streamUpdates', function(results, callback) { data.TableArn = 'arn:aws:dynamodb:' + tableDb.awsRegion + ':' + tableDb.awsAccountId + ':table/' + data.TableName data.TableId = 
uuidV4() data.CreationDateTime = Date.now() / 1000 @@ -45,31 +70,43 @@ module.exports = function createTable(store, data, cb) { }) } - tableDb.put(key, data, function(err) { - if (err) return cb(err) - - setTimeout(function() { + if (results.streamUpdates) { + data.LatestStreamLabel = (new Date()).toISOString().replace('Z', '') + data.LatestStreamArn = 'arn:aws:dynamodb:' + tableDb.awsRegion + ':' + tableDb.awsAccountId + ':table/' + data.TableName + '/stream/' + data.LatestStreamLabel + } - // Shouldn't need to lock/fetch as nothing should have changed - data.TableStatus = 'ACTIVE' - if (data.GlobalSecondaryIndexes) { - data.GlobalSecondaryIndexes.forEach(function(index) { - index.IndexStatus = 'ACTIVE' - }) - } + tableDb.put(key, data, callback) + }], + setActive: ['createTable', function(results, callback) { + setTimeout(function() { - tableDb.put(key, data, function(err) { - // eslint-disable-next-line no-console - if (err && !/Database is not open/.test(err)) console.error(err.stack || err) + // Shouldn't need to lock/fetch as nothing should have changed + data.TableStatus = 'ACTIVE' + if (data.GlobalSecondaryIndexes) { + data.GlobalSecondaryIndexes.forEach(function(index) { + index.IndexStatus = 'ACTIVE' }) + } - }, store.options.createTableMs) + tableDb.put(key, data, function(err) { + // eslint-disable-next-line no-console + if (err && !/Database is not open/.test(err)) console.error(err.stack || err) + }) - cb(null, {TableDescription: data}) - }) - }) - }) + }, store.options.createTableMs) + + callback() + }], + }, function(err, results) { + var release = results.lock + cb = release(cb) + if (err) { + return cb(err) + } + + cb(null, {TableDescription: data}) + }) } function uuidV4() { diff --git a/actions/deleteItem.js b/actions/deleteItem.js index b94e5c0..1d3d6a0 100644 --- a/actions/deleteItem.js +++ b/actions/deleteItem.js @@ -1,4 +1,5 @@ -var db = require('../db') +var db = require('../db'), + utils = require('../utils') module.exports = function 
deleteItem(store, data, cb) { @@ -29,7 +30,16 @@ module.exports = function deleteItem(store, data, cb) { itemDb.del(key, function(err) { if (err) return cb(err) - cb(null, returnObj) + + if (!table.LatestStreamArn) { + return cb(null, returnObj) + } + + var streamRecord = utils.createStreamRecord(table, existingItem, null) + utils.writeStreamRecord(store, data.TableName, streamRecord, function(err) { + if (err) return cb(err) + cb(null, returnObj) + }) }) }) }) diff --git a/actions/describeStream.js b/actions/describeStream.js new file mode 100644 index 0000000..1b1a850 --- /dev/null +++ b/actions/describeStream.js @@ -0,0 +1,31 @@ +var kinesaliteDescribeStream = require('kinesalite/actions/describeStream'), + utils = require('../utils') + +module.exports = function describeStream(store, data, cb) { + var tableName = utils.getTableNameFromStreamArn(data.StreamArn) + + store.getTable(tableName, false, function(err, table) { + if (err) return cb(err) + + kinesaliteDescribeStream(store.kinesalite, {StreamName: tableName}, function(err, stream) { + if (err) return cb(err) + + cb(null, { + StreamDescription: { + CreationRequestDateTime: stream.StreamDescription.StreamCreationTimestamp, + KeySchema: table.KeySchema, + Shards: stream.StreamDescription.Shards.map(function(shard) { + shard.ShardId = utils.makeShardIdLong(shard.ShardId) + shard.ParentShardId = utils.makeShardIdLong(shard.ParentShardId) + return shard + }), + StreamArn: table.LatestStreamArn, + StreamLabel: table.LatestStreamLabel, + StreamStatus: stream.StreamDescription.StreamStatus, + StreamViewType: table.StreamSpecification.StreamViewType, + TableName: tableName, + }, + }) + }) + }) +} diff --git a/actions/getRecords.js b/actions/getRecords.js new file mode 100644 index 0000000..11d2e89 --- /dev/null +++ b/actions/getRecords.js @@ -0,0 +1,16 @@ +var kinesaliteGetRecords = require('kinesalite/actions/getRecords') + +module.exports = function getRecords(store, data, cb) { + 
kinesaliteGetRecords(store.kinesalite, data, function(err, results) { + if (err) return cb(err) + + cb(null, { + NextShardIterator: results.NextShardIterator, + Records: results.Records.map(function(record) { + var recordData = JSON.parse(record.Data) + recordData.dynamodb.SequenceNumber = record.SequenceNumber + return recordData + }), + }) + }) +} diff --git a/actions/getShardIterator.js b/actions/getShardIterator.js new file mode 100644 index 0000000..21a358a --- /dev/null +++ b/actions/getShardIterator.js @@ -0,0 +1,13 @@ +var kinesaliteGetShardIterator = require('kinesalite/actions/getShardIterator'), + utils = require('../utils') + +module.exports = function getShardIterator(store, data, cb) { + var options = { + ShardId: utils.makeShardIdShort(data.ShardId), + ShardIteratorType: data.ShardIteratorType, + StartingSequenceNumber: data.SequenceNumber, + StreamName: utils.getTableNameFromStreamArn(data.StreamArn), + } + + kinesaliteGetShardIterator(store.kinesalite, options, cb) +} diff --git a/actions/listStreams.js b/actions/listStreams.js new file mode 100644 index 0000000..561d2d0 --- /dev/null +++ b/actions/listStreams.js @@ -0,0 +1,34 @@ +var once = require('once'), + db = require('../db'), + getTableNameFromStreamArn = require('../utils').getTableNameFromStreamArn + +module.exports = function listStreams(store, data, cb) { + cb = once(cb) + var opts, limit = data.Limit || 100 + + if (data.ExclusiveStartStreamArn) { + opts = {gt: getTableNameFromStreamArn(data.ExclusiveStartStreamArn)} + } + + db.lazy(store.tableDb.createValueStream(opts), cb) + .filter(function(table) { + return table.hasOwnProperty('StreamSpecification') + }) + .take(limit + 1) + .map(function(table) { + return { + StreamArn: table.LatestStreamArn, + StreamLabel: table.LatestStreamLabel, + TableName: table.TableName, + } + }) + .join(function(streams) { + var result = {} + if (streams.length > limit) { + streams.splice(limit) + result.LastEvaluatedStreamArn = streams[streams.length - 
1].StreamArn + } + result.Streams = streams + cb(null, result) + }) +} diff --git a/actions/putItem.js b/actions/putItem.js index e299dc0..4ba2f78 100644 --- a/actions/putItem.js +++ b/actions/putItem.js @@ -1,4 +1,5 @@ -var db = require('../db') +var db = require('../db'), + utils = require('../utils') module.exports = function putItem(store, data, cb) { @@ -29,7 +30,16 @@ module.exports = function putItem(store, data, cb) { itemDb.put(key, data.Item, function(err) { if (err) return cb(err) - cb(null, returnObj) + + if (!table.LatestStreamArn) { + return cb(null, returnObj) + } + + var streamRecord = utils.createStreamRecord(table, existingItem, data.Item) + utils.writeStreamRecord(store, data.TableName, streamRecord, function(err) { + if (err) return cb(err) + cb(null, returnObj) + }) }) }) }) diff --git a/actions/updateItem.js b/actions/updateItem.js index 72f7f7c..f76e7d1 100644 --- a/actions/updateItem.js +++ b/actions/updateItem.js @@ -1,5 +1,6 @@ var Big = require('big.js'), - db = require('../db') + db = require('../db'), + utils = require('../utils') module.exports = function updateItem(store, data, cb) { @@ -52,7 +53,16 @@ module.exports = function updateItem(store, data, cb) { itemDb.put(key, item, function(err) { if (err) return cb(err) - cb(null, returnObj) + + if (!table.LatestStreamArn) { + return cb(null, returnObj) + } + + var streamRecord = utils.createStreamRecord(table, oldItem, item) + utils.writeStreamRecord(store, data.TableName, streamRecord, function(err) { + if (err) return cb(err) + cb(null, returnObj) + }) }) }) }) diff --git a/actions/updateTable.js b/actions/updateTable.js index 2302631..609825e 100644 --- a/actions/updateTable.js +++ b/actions/updateTable.js @@ -1,21 +1,87 @@ -var db = require('../db') +var async = require('async'), + db = require('../db'), + createStream = require('kinesalite/actions/createStream'), + deleteStream = require('kinesalite/actions/deleteStream') module.exports = function updateTable(store, data, cb) { 
var key = data.TableName, tableDb = store.tableDb - tableDb.lock(key, function(release) { - cb = release(cb) + async.auto({ + lock: function(callback) { + tableDb.lock(key, function(release) { + callback(null, release) + }) + }, + table: ['lock', function(results, callback) { + store.getTable(key, false, function(err, table) { + if (err) return callback(err) + + if (table.TableStatus == 'CREATING') { + err = new Error + err.statusCode = 400 + err.body = { + __type: 'com.amazonaws.dynamodb.v20120810#ResourceInUseException', + message: 'Attempt to change a resource which is still in use: Table is being created: ' + key, + } + return callback(err) + } + + callback(null, table) + }) + }], + streamUpdates: ['table', function(results, callback) { + var table = results.table + + if (!data.StreamSpecification) { + return callback() + } - store.getTable(key, false, function(err, table) { - if (err) return cb(err) + if (table.LatestStreamLabel && data.StreamSpecification.StreamEnabled === false) { + return deleteStream(store.kinesalite, { StreamName: data.TableName }, function(err) { + if (err) return callback(err) - var updates, i, update, dataThroughput, tableThroughput, readDiff, writeDiff + callback(null, { + StreamSpecification: {}, + LatestStreamLabel: null, + LatestStreamArn: null, + }) + }) + } + + if (!table.LatestStreamLabel && data.StreamSpecification.StreamEnabled === true) { + return createStream(store.kinesalite, { StreamName: data.TableName, ShardCount: 1 }, function(err) { + if (err) return callback(err) + + var latestStreamLabel = (new Date()).toISOString().replace('Z', '') + callback(null, { + StreamSpecification: data.StreamSpecification, + LatestStreamLabel: latestStreamLabel, + LatestStreamArn: 'arn:aws:dynamodb:' + tableDb.awsRegion + ':' + tableDb.awsAccountId + ':table/' + data.TableName + '/stream/' + latestStreamLabel, + }) + }) + } + + // cf. 
https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateTable.html + // "You will receive a ResourceInUseException if you attempt to enable a + // stream on a table that already has a stream, or if you attempt to disable + // a stream on a table which does not have a stream." + var err = new Error + err.statusCode = 400 + err.body = { + __type: 'com.amazonaws.dynamodb.v20120810#ResourceInUseException', + message: '', + } + callback(err) + }], + tableUpdates: ['table', function(results, callback) { + var table = results.table, + updates, i, update, dataThroughput, tableThroughput, readDiff, writeDiff try { updates = getThroughputUpdates(data, table) } catch (err) { - return cb(err) + return callback(err) } for (i = 0; i < updates.length; i++) { @@ -26,7 +92,7 @@ module.exports = function updateTable(store, data, cb) { writeDiff = dataThroughput.WriteCapacityUnits - tableThroughput.WriteCapacityUnits if (!readDiff && !writeDiff) - return cb(db.validationError( + return callback(db.validationError( 'The provisioned throughput for the table will not change. The requested value equals the current value. ' + 'Current ReadCapacityUnits provisioned for the table: ' + tableThroughput.ReadCapacityUnits + '. Requested ReadCapacityUnits: ' + dataThroughput.ReadCapacityUnits + '. 
' + @@ -43,41 +109,61 @@ module.exports = function updateTable(store, data, cb) { update.writeDiff = writeDiff } - tableDb.put(key, table, function(err) { - if (err) return cb(err) + callback(null, updates) + }], + updateTable: ['tableUpdates', 'streamUpdates', function(results, callback) { + var table = results.table - setTimeout(function() { + if (results.streamUpdates) { + table.StreamSpecification = results.streamUpdates.StreamSpecification + table.LatestStreamLabel = results.streamUpdates.LatestStreamLabel + table.LatestStreamArn = results.streamUpdates.LatestStreamArn + } - // Shouldn't need to lock/fetch as nothing should have changed - updates.forEach(function(update) { - dataThroughput = update.dataThroughput - tableThroughput = update.tableThroughput + tableDb.put(key, table, callback) + }], + setActive: ['updateTable', function(results, callback) { + var table = results.table, updates = results.tableUpdates - update.setStatus('ACTIVE') + setTimeout(function() { - if (update.readDiff > 0 || update.writeDiff > 0) { - tableThroughput.LastIncreaseDateTime = Date.now() / 1000 - } else if (update.readDiff < 0 || update.writeDiff < 0) { - tableThroughput.LastDecreaseDateTime = Date.now() / 1000 - tableThroughput.NumberOfDecreasesToday++ - } + // Shouldn't need to lock/fetch as nothing should have changed + updates.forEach(function(update) { + dataThroughput = update.dataThroughput + tableThroughput = update.tableThroughput - tableThroughput.ReadCapacityUnits = dataThroughput.ReadCapacityUnits - tableThroughput.WriteCapacityUnits = dataThroughput.WriteCapacityUnits - }) + update.setStatus('ACTIVE') - tableDb.put(key, table, function(err) { - // eslint-disable-next-line no-console - if (err && !/Database is not open/.test(err)) console.error(err.stack || err) - }) + if (update.readDiff > 0 || update.writeDiff > 0) { + tableThroughput.LastIncreaseDateTime = Date.now() / 1000 + } else if (update.readDiff < 0 || update.writeDiff < 0) { + 
tableThroughput.LastDecreaseDateTime = Date.now() / 1000 + tableThroughput.NumberOfDecreasesToday++ + } - }, store.options.updateTableMs) + tableThroughput.ReadCapacityUnits = dataThroughput.ReadCapacityUnits + tableThroughput.WriteCapacityUnits = dataThroughput.WriteCapacityUnits + }) - cb(null, {TableDescription: table}) - }) - }) - }) + tableDb.put(key, table, function(err) { + // eslint-disable-next-line no-console + if (err && !/Database is not open/.test(err)) console.error(err.stack || err) + }) + + }, store.options.updateTableMs) + + callback() + }], + }, function(err, results) { + var release = results.lock + cb = release(cb) + if (err) { + return cb(err) + } + + cb(null, {TableDescription: results.table}) + }) } function getThroughputUpdates(data, table) { diff --git a/cli.js b/cli.js index 34e29e5..d2434c7 100755 --- a/cli.js +++ b/cli.js @@ -12,6 +12,7 @@ if (argv.help) { '', 'Options:', '--help Display this help message and exit', + '--verbose Enable verbose logging', '--port The port to listen on (default: 4567)', '--path The path to use for the LevelDB store (in-memory by default)', '--ssl Enable SSL for the web server (default: false)', diff --git a/db/index.js b/db/index.js index 81a74d6..103b695 100644 --- a/db/index.js +++ b/db/index.js @@ -3,11 +3,13 @@ var crypto = require('crypto'), async = require('async'), Lazy = require('lazy'), levelup = require('levelup'), + kinesaliteDb = require('kinesalite/db'), memdown = require('memdown'), sub = require('subleveldown'), lock = require('lock'), Big = require('big.js'), - once = require('once') + once = require('once'), + utils = require('../utils') exports.MAX_SIZE = 409600 // TODO: get rid of this? or leave for backwards compat? exports.create = create @@ -49,13 +51,21 @@ function create(options) { var db = levelup(options.path ? 
require('leveldown')(options.path) : memdown()), tableDb = sub(db, 'table', {valueEncoding: 'json'}), - subDbs = Object.create(null) + subDbs = Object.create(null), + kinesaliteOptions = {}, + kinesaliteStore + + if (options.path) { + kinesaliteOptions.path = options.path + '/kinesalite' + } + + var kinesaliteStore = kinesaliteDb.create(kinesaliteOptions) tableDb.lock = lock.Lock() // XXX: Is there a better way to get this? tableDb.awsAccountId = (process.env.AWS_ACCOUNT_ID || '0000-0000-0000').replace(/[^\d]/g, '') - tableDb.awsRegion = process.env.AWS_REGION || process.env.AWS_DEFAULT_REGION || 'us-east-1' + tableDb.awsRegion = utils.awsRegion function getItemDb(name) { return getSubDb('item-' + name) @@ -131,6 +141,7 @@ function create(options) { deleteIndexDb: deleteIndexDb, getTable: getTable, recreate: recreate, + kinesalite: kinesaliteStore, } } diff --git a/index.js b/index.js index 4b2631a..049f1a1 100644 --- a/index.js +++ b/index.js @@ -10,10 +10,10 @@ var http = require('http'), var MAX_REQUEST_BYTES = 16 * 1024 * 1024 -var validApis = ['DynamoDB_20111205', 'DynamoDB_20120810'], +var validApis = ['DynamoDB_20111205', 'DynamoDB_20120810', 'DynamoDBStreams_20120810'], validOperations = ['BatchGetItem', 'BatchWriteItem', 'CreateTable', 'DeleteItem', 'DeleteTable', 'DescribeTable', 'GetItem', 'ListTables', 'PutItem', 'Query', 'Scan', 'TagResource', 'UntagResource', - 'UpdateItem', 'UpdateTable'], + 'UpdateItem', 'UpdateTable', 'DescribeStream', 'GetRecords', 'GetShardIterator', 'ListStreams'], actions = {}, actionValidations = {} @@ -62,7 +62,7 @@ function rand52CharId() { return bytes.toString('base64').toUpperCase().replace(/\+|\//g, '0') } -function sendData(req, res, data, statusCode) { +function _sendData(verbose, req, res, data, statusCode) { var body = JSON.stringify(data) req.removeAllListeners() res.statusCode = statusCode || 200 @@ -73,10 +73,18 @@ function sendData(req, res, data, statusCode) { // res.setHeader('Connection', '') // 
res.shouldKeepAlive = false res.end(body) + + if (verbose) { + console.log('[' + req.id + '] Status ' + res.statusCode) + console.log('[' + req.id + '] Response: ' + body) + } } function httpHandler(store, req, res) { - var body + var body, + sendData = _sendData.bind(null, store.options.verbose) + + req.id = crypto.randomBytes(5).toString('hex') req.on('error', function(err) { throw err }) req.on('data', function(data) { var newLength = data.length + (body ? body.length : 0) @@ -92,6 +100,11 @@ function httpHandler(store, req, res) { body = body ? body.toString() : '' + if (store.options.verbose) { + console.log('[' + req.id + '] ' + req.method + ' ' + req.url + ' (' + req.headers['x-amz-target'] + ')') + console.log('[' + req.id + '] Request: ' + body) + } + // All responses after this point have a RequestId res.setHeader('x-amzn-RequestId', rand52CharId()) diff --git a/package-lock.json b/package-lock.json index 1f2f62d..e88c0f7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -90,6 +90,11 @@ "resolved": "https://registry.npmjs.org/big.js/-/big.js-5.2.2.tgz", "integrity": "sha512-vyL2OymJxmarO8gxMr0mhChsO9QGwhynfuu4+MHTAW6czfq9humCB7rKpUjDd9YUiDPU4mzpyupFSvOClAwbmQ==" }, + "bignumber.js": { + "version": "8.1.1", + "resolved": "https://registry.npmjs.org/bignumber.js/-/bignumber.js-8.1.1.tgz", + "integrity": "sha512-QD46ppGintwPGuL1KqmwhR0O+N2cZUg8JG/VzwI2e28sM9TqHjQB10lI4QAaMHVbLzwVLLAwEglpKPViWX+5NQ==" + }, "bindings": { "version": "1.3.0", "resolved": "https://registry.npmjs.org/bindings/-/bindings-1.3.0.tgz", @@ -597,6 +602,24 @@ "is-buffer": "^1.1.5" } }, + "kinesalite": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/kinesalite/-/kinesalite-2.0.0.tgz", + "integrity": "sha512-NApm1Utrw5lT/bR6MM3kCBV+O63CSdP4/2XusmnoEcohrLDcvW31lnfU37177q3G4S96OXbaZeRzxqzbjJk2LA==", + "requires": { + "async": "^2.6.1", + "bignumber.js": "^8.0.2", + "lazy": "^1.0.11", + "leveldown": "^4.0.1", + "levelup": "^4.0.0", + "lock": "^1.1.0", + "memdown": 
"^3.0.0", + "minimist": "^1.2.0", + "node-uuid": "^1.4.7", + "once": "^1.3.3", + "subleveldown": "^3.0.1" + } + }, "lazy": { "version": "1.0.11", "resolved": "https://registry.npmjs.org/lazy/-/lazy-1.0.11.tgz", @@ -831,6 +854,11 @@ "semver": "^5.4.1" } }, + "node-uuid": { + "version": "1.4.8", + "resolved": "https://registry.npmjs.org/node-uuid/-/node-uuid-1.4.8.tgz", + "integrity": "sha1-sEDrCSOWivq/jTL7HxfxFn/auQc=" + }, "noop-logger": { "version": "0.1.1", "resolved": "https://registry.npmjs.org/noop-logger/-/noop-logger-0.1.1.tgz", diff --git a/package.json b/package.json index 6fbbfa8..70ffa81 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ "async": "^2.1.4", "big.js": "^5.2.2", "buffer-crc32": "^0.2.13", + "kinesalite": "^2.0.0", "lazy": "^1.0.11", "levelup": "^4.0.0", "lock": "^1.1.0", diff --git a/test/connection.js b/test/connection.js index f47b4cd..3ea380e 100644 --- a/test/connection.js +++ b/test/connection.js @@ -1,7 +1,8 @@ var https = require('https'), once = require('once'), dynalite = require('..'), - helpers = require('./helpers') + helpers = require('./helpers'), + utils = require('../utils') var request = helpers.request @@ -75,7 +76,7 @@ describe('dynalite connections', function() { request({method: 'GET', noSign: true}, function(err, res) { if (err) return done(err) res.statusCode.should.equal(200) - res.body.should.equal('healthy: dynamodb.' + helpers.awsRegion + '.amazonaws.com ') + res.body.should.equal('healthy: dynamodb.' 
+ utils.awsRegion + '.amazonaws.com ') res.headers['x-amz-crc32'].should.match(/^[0-9]+$/) res.headers['content-length'].should.equal(res.body.length.toString()) res.headers['x-amzn-requestid'].should.match(/^[0-9A-Z]{52}$/) diff --git a/test/createTable.js b/test/createTable.js index fba64f2..56ecc57 100644 --- a/test/createTable.js +++ b/test/createTable.js @@ -1,12 +1,14 @@ var helpers = require('./helpers'), - should = require('should') + should = require('should'), + utils = require('../utils') var target = 'CreateTable', request = helpers.request, randomName = helpers.randomName, opts = helpers.opts.bind(null, target), assertType = helpers.assertType.bind(null, target), - assertValidation = helpers.assertValidation.bind(null, target) + assertValidation = helpers.assertValidation.bind(null, target), + awsRegion = utils.awsRegion describe('createTable', function() { @@ -1141,7 +1143,7 @@ describe('createTable', function() { desc.CreationDateTime.should.be.above(createdAt - 5) delete desc.CreationDateTime desc.TableArn.should.match(new RegExp( - 'arn:aws:dynamodb:' + helpers.awsRegion + ':\\d+:table/' + table.TableName)) + 'arn:aws:dynamodb:' + awsRegion + ':\\d+:table/' + table.TableName)) delete desc.TableArn table.ItemCount = 0 table.ProvisionedThroughput.NumberOfDecreasesToday = 0 @@ -1214,11 +1216,11 @@ describe('createTable', function() { desc.CreationDateTime.should.be.above(createdAt - 5) delete desc.CreationDateTime desc.TableArn.should.match(new RegExp( - 'arn:aws:dynamodb:' + helpers.awsRegion + ':\\d+:table/' + table.TableName)) + 'arn:aws:dynamodb:' + awsRegion + ':\\d+:table/' + table.TableName)) delete desc.TableArn desc.LocalSecondaryIndexes.forEach(function(index) { index.IndexArn.should.match(new RegExp( - 'arn:aws:dynamodb:' + helpers.awsRegion + ':\\d+:table/' + table.TableName + '/index/' + index.IndexName)) + 'arn:aws:dynamodb:' + awsRegion + ':\\d+:table/' + table.TableName + '/index/' + index.IndexName)) delete index.IndexArn }) 
table.ItemCount = 0 @@ -1284,11 +1286,11 @@ describe('createTable', function() { desc.CreationDateTime.should.be.above(createdAt - 5) delete desc.CreationDateTime desc.TableArn.should.match(new RegExp( - 'arn:aws:dynamodb:' + helpers.awsRegion + ':\\d+:table/' + table.TableName)) + 'arn:aws:dynamodb:' + awsRegion + ':\\d+:table/' + table.TableName)) delete desc.TableArn desc.GlobalSecondaryIndexes.forEach(function(index) { index.IndexArn.should.match(new RegExp( - 'arn:aws:dynamodb:' + helpers.awsRegion + ':\\d+:table/' + table.TableName + '/index/' + index.IndexName)) + 'arn:aws:dynamodb:' + awsRegion + ':\\d+:table/' + table.TableName + '/index/' + index.IndexName)) delete index.IndexArn }) table.ItemCount = 0 diff --git a/test/getRecords.js b/test/getRecords.js new file mode 100644 index 0000000..d073382 --- /dev/null +++ b/test/getRecords.js @@ -0,0 +1,311 @@ +var helpers = require('./helpers'), + utils = require('../utils') + +var target = 'GetRecords', + request = helpers.request, + randomName = helpers.randomName, + opts = helpers.opts.bind(null, target) + +var tableName = randomName(), + streamViewType = 'NEW_AND_OLD_IMAGES', + streamArn, shardId + +describe('getRecords', function() { + + describe('serializations', function() { + }) + + describe('validations', function() { + }) + + describe('functionality', function() { + before(function(done) { + var table = { + TableName: tableName, + AttributeDefinitions: [{AttributeName: 'a', AttributeType: 'S'}], + KeySchema: [{AttributeName: 'a', KeyType: 'HASH'}], + ProvisionedThroughput: {ReadCapacityUnits: 1, WriteCapacityUnits: 1}, + StreamSpecification: {StreamEnabled: true, StreamViewType: streamViewType}, + } + + request(helpers.opts('CreateTable', table), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + streamArn = res.body.TableDescription.LatestStreamArn + + helpers.waitUntilActive(tableName, function(err) { + if (err) return done(err) + + var itemKey1 = {S: 'foo'}, 
item = { + a: itemKey1, + val: {S: 'bar'}, + other: {S: 'baz'}, + } + request(helpers.opts('PutItem', {TableName: tableName, Item: item}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + var item = { + a: {S: 'meaning-of-life'}, + answer: {N: '42'}, + } + request(helpers.opts('PutItem', {TableName: tableName, Item: item}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + var req = { + TableName: tableName, + Key: {a: itemKey1}, + UpdateExpression: 'SET #ATT = :VAL', + ExpressionAttributeNames: {'#ATT': 'val'}, + ExpressionAttributeValues: {':VAL': {S: 'New value'}}, + } + request(helpers.opts('UpdateItem', req), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + var req = { + TableName: tableName, + Key: {a: itemKey1}, + } + request(helpers.opts('DeleteItem', req), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + request(helpers.opts('DescribeStream', {StreamArn: streamArn}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + shardId = res.body.StreamDescription.Shards[0].ShardId + + done() + }) + }) + }) + }) + }) + }) + }) + }) + + after(function(done) { + helpers.deleteWhenActive(tableName, done) + }) + + it('should return all stream records', function(done) { + var req = { + StreamArn: streamArn, + ShardId: shardId, + ShardIteratorType: 'TRIM_HORIZON', + } + request(helpers.opts('GetShardIterator', req), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + var iterator = res.body.ShardIterator + request(opts({ShardIterator: iterator}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + res.body.NextShardIterator.should.not.be.empty + res.body.Records.should.be.Array() + res.body.Records.should.have.length(4) + + var lastSequenceNumber = 0 + for (var i = 0; i < 4; i++) { + 
res.body.Records[i].awsRegion.should.be.equal(utils.awsRegion) + res.body.Records[i].eventID.should.not.be.empty + res.body.Records[i].eventSource.should.be.equal('aws:dynamodb') + res.body.Records[i].eventVersion.should.be.equal('1.1') + + res.body.Records[i].dynamodb.ApproximateCreationDateTime.should.not.be.empty + res.body.Records[i].dynamodb.SequenceNumber.should.be.above(lastSequenceNumber) + res.body.Records[i].dynamodb.SizeBytes.should.be.above(0) + res.body.Records[i].dynamodb.StreamViewType.should.be.equal(streamViewType) + + lastSequenceNumber = res.body.Records[i].dynamodb.SequenceNumber + } + + res.body.Records[0].eventName.should.be.equal('INSERT') + res.body.Records[0].dynamodb.Keys.should.be.eql({a: {S: 'foo'}}) + res.body.Records[0].dynamodb.should.not.have.property('OldImage') + res.body.Records[0].dynamodb.NewImage.should.be.eql({a: {S: 'foo'}, val: {S: 'bar'}, other: {S: 'baz'}}) + + res.body.Records[1].eventName.should.be.equal('INSERT') + res.body.Records[1].dynamodb.Keys.should.be.eql({a: {S: 'meaning-of-life'}}) + res.body.Records[1].dynamodb.should.not.have.property('OldImage') + res.body.Records[1].dynamodb.NewImage.should.be.eql({a: {S: 'meaning-of-life'}, answer: {N: '42'}}) + + res.body.Records[2].eventName.should.be.equal('MODIFY') + res.body.Records[2].dynamodb.Keys.should.be.eql({a: {S: 'foo'}}) + res.body.Records[2].dynamodb.OldImage.should.be.eql({a: {S: 'foo'}, val: {S: 'bar'}, other: {S: 'baz'}}) + res.body.Records[2].dynamodb.NewImage.should.be.eql({a: {S: 'foo'}, val: {S: 'New value'}, other: {S: 'baz'}}) + + res.body.Records[3].eventName.should.be.equal('REMOVE') + res.body.Records[3].dynamodb.Keys.should.be.eql({a: {S: 'foo'}}) + res.body.Records[3].dynamodb.OldImage.should.be.eql({a: {S: 'foo'}, val: {S: 'New value'}, other: {S: 'baz'}}) + res.body.Records[3].dynamodb.should.not.have.property('NewImage') + + done() + }) + }) + }) + + it('should return respect limit and return valid next iterator', function(done) { + var req 
= { + StreamArn: streamArn, + ShardId: shardId, + ShardIteratorType: 'TRIM_HORIZON', + } + request(helpers.opts('GetShardIterator', req), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + var iterator = res.body.ShardIterator + request(opts({ShardIterator: iterator, Limit: 2}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + res.body.NextShardIterator.should.not.be.empty + res.body.Records.should.be.Array() + res.body.Records.should.have.length(2) + + var lastSequenceNumber = 0 + for (var i = 0; i < 2; i++) { + res.body.Records[i].awsRegion.should.be.equal(utils.awsRegion) + res.body.Records[i].eventID.should.not.be.empty + res.body.Records[i].eventName.should.be.equal('INSERT') + res.body.Records[i].eventSource.should.be.equal('aws:dynamodb') + res.body.Records[i].eventVersion.should.be.equal('1.1') + + res.body.Records[i].dynamodb.should.not.have.property('OldImage') + res.body.Records[i].dynamodb.ApproximateCreationDateTime.should.not.be.empty + res.body.Records[i].dynamodb.SequenceNumber.should.be.above(lastSequenceNumber) + res.body.Records[i].dynamodb.SizeBytes.should.be.above(0) + res.body.Records[i].dynamodb.StreamViewType.should.be.equal(streamViewType) + + lastSequenceNumber = res.body.Records[i].dynamodb.SequenceNumber + } + + res.body.Records[0].dynamodb.Keys.should.be.eql({a: {S: 'foo'}}) + res.body.Records[0].dynamodb.NewImage.should.be.eql({a: {S: 'foo'}, val: {S: 'bar'}, other: {S: 'baz'}}) + + res.body.Records[1].dynamodb.Keys.should.be.eql({a: {S: 'meaning-of-life'}}) + res.body.Records[1].dynamodb.NewImage.should.be.eql({a: {S: 'meaning-of-life'}, answer: {N: '42'}}) + + var iterator = res.body.NextShardIterator + request(opts({ShardIterator: iterator, Limit: 2}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + res.body.NextShardIterator.should.not.be.empty + res.body.Records.should.be.Array() + 
res.body.Records.should.have.length(2) + + var lastSequenceNumber = 0 + for (var i = 0; i < 2; i++) { + res.body.Records[i].awsRegion.should.be.equal(utils.awsRegion) + res.body.Records[i].eventID.should.not.be.empty + res.body.Records[i].eventSource.should.be.equal('aws:dynamodb') + res.body.Records[i].eventVersion.should.be.equal('1.1') + + res.body.Records[i].dynamodb.Keys.should.be.eql({a: {S: 'foo'}}) + res.body.Records[i].dynamodb.ApproximateCreationDateTime.should.not.be.empty + res.body.Records[i].dynamodb.SequenceNumber.should.be.above(lastSequenceNumber) + res.body.Records[i].dynamodb.SizeBytes.should.be.above(0) + res.body.Records[i].dynamodb.StreamViewType.should.be.equal(streamViewType) + + lastSequenceNumber = res.body.Records[i].dynamodb.SequenceNumber + } + + res.body.Records[0].eventName.should.be.equal('MODIFY') + res.body.Records[0].dynamodb.OldImage.should.be.eql({a: {S: 'foo'}, val: {S: 'bar'}, other: {S: 'baz'}}) + res.body.Records[0].dynamodb.NewImage.should.be.eql({a: {S: 'foo'}, val: {S: 'New value'}, other: {S: 'baz'}}) + + res.body.Records[1].eventName.should.be.equal('REMOVE') + res.body.Records[1].dynamodb.OldImage.should.be.eql({a: {S: 'foo'}, val: {S: 'New value'}, other: {S: 'baz'}}) + res.body.Records[1].dynamodb.should.not.have.property('NewImage') + + var iterator = res.body.NextShardIterator + request(opts({ShardIterator: iterator}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + res.body.NextShardIterator.should.not.be.empty + res.body.Records.should.be.Array() + res.body.Records.should.be.empty + + done() + }) + }) + }) + }) + }) + + it('latest iterator should return only new records', function(done) { + var req = { + StreamArn: streamArn, + ShardId: shardId, + ShardIteratorType: 'LATEST', + } + request(helpers.opts('GetShardIterator', req), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + var iterator = res.body.ShardIterator + 
request(opts({ShardIterator: iterator}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + res.body.NextShardIterator.should.not.be.empty + res.body.Records.should.be.Array() + res.body.Records.should.be.empty + + var iterator = res.body.NextShardIterator + + request(helpers.opts('PutItem', {TableName: tableName, Item: {a: {S: 'New Item'}}}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + request(opts({ShardIterator: iterator}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + res.body.NextShardIterator.should.not.be.empty + res.body.Records.should.be.Array() + res.body.Records.should.have.length(1) + + res.body.Records[0].awsRegion.should.be.equal(utils.awsRegion) + res.body.Records[0].eventID.should.not.be.empty + res.body.Records[0].eventSource.should.be.equal('aws:dynamodb') + res.body.Records[0].eventVersion.should.be.equal('1.1') + res.body.Records[0].eventName.should.be.equal('INSERT') + res.body.Records[0].dynamodb.Keys.should.be.eql({a: {S: 'New Item'}}) + res.body.Records[0].dynamodb.ApproximateCreationDateTime.should.not.be.empty + res.body.Records[0].dynamodb.SequenceNumber.should.not.be.empty + res.body.Records[0].dynamodb.SizeBytes.should.be.above(0) + res.body.Records[0].dynamodb.StreamViewType.should.be.equal(streamViewType) + res.body.Records[0].dynamodb.NewImage.should.be.eql({a: {S: 'New Item'}}) + res.body.Records[0].dynamodb.should.not.have.property('OldImage') + + var iterator = res.body.NextShardIterator + request(opts({ShardIterator: iterator}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + res.body.NextShardIterator.should.not.be.empty + res.body.Records.should.be.Array() + res.body.Records.should.be.empty + + done() + }) + }) + }) + }) + }) + }) + + }) + +}) diff --git a/test/helpers.js b/test/helpers.js index a40df77..a5be8c2 100644 --- a/test/helpers.js +++ 
b/test/helpers.js @@ -2,12 +2,12 @@ var http = require('http'), aws4 = require('aws4'), async = require('async'), once = require('once'), - dynalite = require('..') + dynalite = require('..'), + utils = require('../utils') http.globalAgent.maxSockets = Infinity exports.MAX_SIZE = 409600 -exports.awsRegion = process.env.AWS_REGION || process.env.AWS_DEFAULT_REGION || 'us-east-1' exports.version = 'DynamoDB_20120810' exports.prefix = '__dynalite_test_' exports.request = request @@ -47,7 +47,7 @@ exports.testRangeBTable = randomName() var port = 10000 + Math.round(Math.random() * 10000), requestOpts = process.env.REMOTE ? - {host: 'dynamodb.' + exports.awsRegion + '.amazonaws.com', method: 'POST'} : + {host: 'dynamodb.' + utils.awsRegion + '.amazonaws.com', method: 'POST'} : {host: '127.0.0.1', port: port, method: 'POST'} var dynaliteServer = dynalite({path: process.env.DYNALITE_PATH}) diff --git a/test/updateTable.js b/test/updateTable.js index ede61ec..3210601 100644 --- a/test/updateTable.js +++ b/test/updateTable.js @@ -5,6 +5,7 @@ var target = 'UpdateTable', opts = helpers.opts.bind(null, target), assertType = helpers.assertType.bind(null, target), assertValidation = helpers.assertValidation.bind(null, target), + assertInUse = helpers.assertInUse.bind(null, target), assertNotFound = helpers.assertNotFound.bind(null, target) describe('updateTable', function() { @@ -193,7 +194,7 @@ describe('updateTable', function() { it('should return ValidationException for empty GlobalSecondaryIndexUpdates', function(done) { assertValidation({TableName: 'abc', GlobalSecondaryIndexUpdates: []}, - 'At least one of ProvisionedThroughput, UpdateStreamEnabled, GlobalSecondaryIndexUpdates or SSESpecification is required', done) + 'At least one of ProvisionedThroughput, BillingMode, UpdateStreamEnabled, GlobalSecondaryIndexUpdates or SSESpecification is required', done) }) it('should return ValidationException for empty Update', function(done) { @@ -310,6 +311,30 @@
describe('updateTable', function() { describe('functionality', function() { + it('should reject updates while table is in CREATING state', function(done) { + this.timeout(100000) + var tableName = helpers.randomName(), + table = { + TableName: tableName, + AttributeDefinitions: [{AttributeName: 'a', AttributeType: 'S'}], + KeySchema: [{KeyType: 'HASH', AttributeName: 'a'}], + ProvisionedThroughput: {ReadCapacityUnits: 1, WriteCapacityUnits: 1}, + } + + request(helpers.opts('CreateTable', table), function(err, res) { + if (err) return done(err) + res.body.TableDescription.TableStatus.should.equal('CREATING') + + var data = { + TableName: tableName, + ProvisionedThroughput: {ReadCapacityUnits: 2, WriteCapacityUnits: 2} + } + + assertInUse(data, 'Attempt to change a resource which is still in use: ' + + 'Table is being created: ' + table.TableName, done) + }) + }) + it('should triple rates and then reduce if requested', function(done) { this.timeout(200000) exports.writeCapacity diff --git a/utils.js b/utils.js new file mode 100644 index 0000000..5dfb715 --- /dev/null +++ b/utils.js @@ -0,0 +1,91 @@ +var crypto = require('crypto'), + db = require('./db'), + kinesalitePutRecord = require('kinesalite/actions/putRecord') + +var awsRegion = process.env.AWS_REGION || process.env.AWS_DEFAULT_REGION || 'us-east-1' + +exports.awsRegion = awsRegion +exports.getTableNameFromStreamArn = getTableNameFromStreamArn +exports.createStreamRecord = createStreamRecord +exports.writeStreamRecord = writeStreamRecord +exports.makeShardIdLong = makeShardIdLong +exports.makeShardIdShort = makeShardIdShort + +function getTableNameFromStreamArn(streamArn) { + var streamArnParts = streamArn.split('/stream/'), + tableArn = streamArnParts[0], + tableArnParts = tableArn.split(':table/'), + tableName = tableArnParts[1] + + return tableName +} + +function createStreamRecord(table, oldItem, newItem) { + var record = { + awsRegion: awsRegion, + dynamodb: { + ApproximateCreationDateTime: 
Math.floor(Date.now() / 1000), + Keys: {}, + SizeBytes: 0, + StreamViewType: table.StreamSpecification.StreamViewType, + }, + eventID: crypto.randomBytes(20).toString('hex'), + eventSource: 'aws:dynamodb', + eventVersion: '1.1', + } + + if (oldItem) { + record.dynamodb.OldImage = oldItem + record.dynamodb.SizeBytes += db.itemSize(oldItem, false, true) + record.eventName = 'REMOVE' + } + if (newItem) { + record.dynamodb.NewImage = newItem + record.dynamodb.SizeBytes += db.itemSize(newItem, false, true) + + if (record.eventName) { + record.eventName = 'MODIFY' + } else { + record.eventName = 'INSERT' + } + } + + db.traverseKey(table, function(attr) { + if (newItem) { + return record.dynamodb.Keys[attr] = newItem[attr] + } else { + return record.dynamodb.Keys[attr] = oldItem[attr] + } + }) + + return record +} + +function writeStreamRecord(store, tableName, streamRecord, cb) { + kinesalitePutRecord(store.kinesalite, { + StreamName: tableName, + Data: JSON.stringify(streamRecord), + PartitionKey: 'foo' + }, function(err) { + if (err) return cb(err) + + if (store.options.verbose) { + console.log('Wrote ' + streamRecord.eventName + ' record to stream for table ' + tableName) + } + + cb() + }) +} + +// Kinesis shard ids only need to be 1 character long, but Dynamo shard ids +// need to be 28 characters long. The shard ids generated by kinesalite are +// too short, so (1) expose a longer version from Dynalite, and (2) shorten +// them again before sending to kinesalite. 
+var shardIdPrefix = 'dynamo-compatible-' +function makeShardIdLong(kinesisShardId) { + return shardIdPrefix + kinesisShardId +} + +function makeShardIdShort(dynamoShardId) { + return dynamoShardId.slice(shardIdPrefix.length) +} diff --git a/validations/createTable.js b/validations/createTable.js index 90afebc..5ef027a 100644 --- a/validations/createTable.js +++ b/validations/createTable.js @@ -171,6 +171,18 @@ exports.types = { }, }, }, + StreamSpecification: { + type: 'ValueStruct', + children: { + StreamEnabled: { + type: 'Boolean', + }, + StreamViewType: { + type: 'String', + enum: ['KEYS_ONLY', 'NEW_IMAGE', 'OLD_IMAGE', 'NEW_AND_OLD_IMAGES'], + }, + }, + }, } exports.custom = function(data) { diff --git a/validations/describeStream.js b/validations/describeStream.js new file mode 100644 index 0000000..178e25d --- /dev/null +++ b/validations/describeStream.js @@ -0,0 +1,20 @@ +exports.types = { + StreamArn: { + required: true, + type: 'String', + regex: '[a-zA-Z0-9\-\.:/_]+', + lengthGreaterThanOrEqual: 37, + lengthLessThanOrEqual: 1024, + }, + Limit: { + type: 'Integer', + greaterThanOrEqual: 1, + lessThanOrEqual: 100, + }, + ExclusiveStartShardId: { + type: 'String', + regex: '[a-zA-Z0-9\-]+', + lengthGreaterThanOrEqual: 28, + lengthLessThanOrEqual: 65, + }, +} diff --git a/validations/getRecords.js b/validations/getRecords.js new file mode 100644 index 0000000..424daa8 --- /dev/null +++ b/validations/getRecords.js @@ -0,0 +1,13 @@ +exports.types = { + ShardIterator: { + required: true, + type: 'String', + lengthGreaterThanOrEqual: 1, + lengthLessThanOrEqual: 2048, + }, + Limit: { + type: 'Integer', + greaterThanOrEqual: 1, + lessThanOrEqual: 1000, + }, +} diff --git a/validations/getShardIterator.js b/validations/getShardIterator.js new file mode 100644 index 0000000..777da1c --- /dev/null +++ b/validations/getShardIterator.js @@ -0,0 +1,27 @@ +exports.types = { + ShardId: { + required: true, + type: 'String', + regex: '[a-zA-Z0-9\-]+', + 
lengthGreaterThanOrEqual: 28, + lengthLessThanOrEqual: 65, + }, + ShardIteratorType: { + required: true, + type: 'String', + enum: ['TRIM_HORIZON', 'LATEST', 'AT_SEQUENCE_NUMBER', 'AFTER_SEQUENCE_NUMBER'], + }, + StreamArn: { + required: true, + type: 'String', + regex: '[a-zA-Z0-9\-\.:/_]+', + lengthGreaterThanOrEqual: 37, + lengthLessThanOrEqual: 1024, + }, + SequenceNumber: { + type: 'String', + regex: '\\d+', + lengthGreaterThanOrEqual: 21, + lengthLessThanOrEqual: 40, + }, +} diff --git a/validations/listStreams.js b/validations/listStreams.js new file mode 100644 index 0000000..b6f2574 --- /dev/null +++ b/validations/listStreams.js @@ -0,0 +1,18 @@ +exports.types = { + Limit: { + type: 'Integer', + greaterThanOrEqual: 1, + lessThanOrEqual: 100, + }, + ExclusiveStartStreamArn: { + type: 'String', + regex: '[a-zA-Z0-9\-\.:/]+', + lengthGreaterThanOrEqual: 37, + lengthLessThanOrEqual: 1024, + }, + TableName: { + type: 'String', + tableName: true, + regex: '[a-zA-Z0-9_.-]+', + }, +} diff --git a/validations/updateTable.js b/validations/updateTable.js index 737d8eb..0b1d5de 100644 --- a/validations/updateTable.js +++ b/validations/updateTable.js @@ -130,13 +130,25 @@ exports.types = { }, }, }, + StreamSpecification: { + type: 'ValueStruct', + children: { + StreamEnabled: { + type: 'Boolean', + }, + StreamViewType: { + type: 'String', + enum: ['KEYS_ONLY', 'NEW_IMAGE', 'OLD_IMAGE', 'NEW_AND_OLD_IMAGES'], + }, + }, + }, } exports.custom = function(data) { - if (!data.ProvisionedThroughput && !data.UpdateStreamEnabled && + if (!data.ProvisionedThroughput && !data.BillingMode && !data.StreamSpecification && !data.UpdateStreamEnabled && (!data.GlobalSecondaryIndexUpdates || !data.GlobalSecondaryIndexUpdates.length) && !data.SSESpecification) { - return 'At least one of ProvisionedThroughput, UpdateStreamEnabled, GlobalSecondaryIndexUpdates or SSESpecification is required' + return 'At least one of ProvisionedThroughput, BillingMode, UpdateStreamEnabled, GlobalSecondaryIndexUpdates or
SSESpecification is required' } if (data.ProvisionedThroughput) {