From e852ca1094f7b3e29e18e5f0c2eea8372b5b0e4f Mon Sep 17 00:00:00 2001 From: Kiril Pirozenko Date: Tue, 20 Dec 2016 12:29:09 +0200 Subject: [PATCH 1/5] Linter added --- package.json | 4 +++- validations/query.js | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index f9e1929..642cbcc 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,8 @@ "scripts": { "build": "for file in ./db/*.pegjs; do pegjs \"$file\"; done", "test": "mocha --require should --reporter spec -t $([ $REMOTE ] && echo 30s || echo 4s)", - "coverage": "istanbul cover ./node_modules/.bin/_mocha -- --require should -t 4s" + "coverage": "istanbul cover ./node_modules/.bin/_mocha -- --require should -t 4s", + "lint": "eslint ." }, "repository": "mhart/dynalite", "keywords": [ @@ -38,6 +39,7 @@ }, "devDependencies": { "aws4": "^1.3.2", + "eslint": "^3.12.1", "istanbul": "^0.4.3", "mocha": "^2.4.5", "pegjs": "^0.9.0", diff --git a/validations/query.js b/validations/query.js index 8eb6a0c..2cc48ce 100644 --- a/validations/query.js +++ b/validations/query.js @@ -102,7 +102,7 @@ exports.custom = function(data) { ['AttributesToGet', 'QueryFilter', 'ConditionalOperator', 'KeyConditions']) if (msg) return msg - var i, key + var key msg = validations.validateConditions(data.QueryFilter) if (msg) return msg From 5aac42525f07906ddc6056685daf315b51dbf828 Mon Sep 17 00:00:00 2001 From: Kiril Pirozenko Date: Tue, 20 Dec 2016 12:31:05 +0200 Subject: [PATCH 2/5] Test fixed to not depend on order of items --- test/scan.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/scan.js b/test/scan.js index 8ad25c2..50e6ede 100644 --- a/test/scan.js +++ b/test/scan.js @@ -3051,13 +3051,15 @@ describe('scan', function() { if (err) return done(err) res.statusCode.should.equal(200) - request(opts({TableName: helpers.testHashTable, AttributesToGet: ['a'], Limit: 100000}), function(err, res) { + request(opts({TableName: helpers.testHashTable, AttributesToGet: ['b'], Limit: 100000}), function(err, res) { if (err) return done(err) res.statusCode.should.equal(200) res.body.Count.should.equal(res.body.ScannedCount) should.not.exist(res.body.LastEvaluatedKey) for (var i = 0, lastIx = 0; i < res.body.Count; i++) { - if (res.body.Items[i].a.S < 5) lastIx = i + if (res.body.Items[i]['b'] && + res.body.Items[i].b['S'] && + res.body.Items[i].b.S == b.S) lastIx = i } var totalItems = res.body.Count request(opts({TableName: helpers.testHashTable, ScanFilter: scanFilter, Limit: lastIx}), function(err, res) { From 5e70a5fceb35b6470139779f5b24d5c0bf1ede9b Mon Sep 17 00:00:00 2001 From: Kiril Pirozenko Date: Tue, 20 Dec 2016 12:34:05 +0200 Subject: [PATCH 3/5] Support for DynamoDB Streams endpoints Hardcoded AWS region also got removed. --- cli.js | 2 +- index.js | 39 ++++++++++++++++++++---------- test/connection.js | 12 ++++++---- test/helpers.js | 60 +++++++++++++++++++++++++++++++++++----------- test/listTables.js | 5 ++-- 5 files changed, 83 insertions(+), 35 deletions(-) diff --git a/cli.js b/cli.js index 3489432..09569ca 100755 --- a/cli.js +++ b/cli.js @@ -24,7 +24,7 @@ if (argv.help) { ].join('\n')) } -var server = require('./index.js')(argv).listen(argv.port || 4567, function() { +var server = require('./index.js').server(argv).listen(argv.port || 4567, function() { var address = server.address(), protocol = argv.ssl ? 
'https' : 'http' // eslint-disable-next-line no-console console.log('Listening at %s://%s:%s', protocol, address.address, address.port) diff --git a/index.js b/index.js index a0f5da6..1a2084a 100644 --- a/index.js +++ b/index.js @@ -10,13 +10,21 @@ var http = require('http'), var MAX_REQUEST_BYTES = 16 * 1024 * 1024 -var validApis = ['DynamoDB_20111205', 'DynamoDB_20120810'], - validOperations = ['BatchGetItem', 'BatchWriteItem', 'CreateTable', 'DeleteItem', 'DeleteTable', - 'DescribeTable', 'GetItem', 'ListTables', 'PutItem', 'Query', 'Scan', 'UpdateItem', 'UpdateTable'], +var dynamoApi = 'DynamoDB_20120810', + streamsApi = 'DynamoDBStreams_20120810' + +var validApis = {}, actions = {}, actionValidations = {} -module.exports = dynalite +validApis[dynamoApi] = ['BatchGetItem', 'BatchWriteItem', 'CreateTable', 'DeleteItem', 'DeleteTable', + 'DescribeTable', 'GetItem', 'ListTables', 'PutItem', 'Query', 'Scan', 'UpdateItem', 'UpdateTable'] +validApis[streamsApi] = [] + +exports.server = dynalite +exports.dynamoApi = dynamoApi +exports.streamsApi = streamsApi +exports.validApis = validApis function dynalite(options) { options = options || {} @@ -48,11 +56,15 @@ function dynalite(options) { return server } -validOperations.forEach(function(action) { - action = validations.toLowerFirst(action) - actions[action] = require('./actions/' + action) - actionValidations[action] = require('./validations/' + action) -}) +for (var api in validApis) { + if (validApis.hasOwnProperty(api)) { + validApis[api].forEach(function(action) { + action = validations.toLowerFirst(action) + actions[action] = require('./actions/' + action) + actionValidations[action] = require('./validations/' + action) + }) + } +} function rand52CharId() { // 39 bytes turns into 52 base64 characters @@ -113,10 +125,11 @@ function httpHandler(store, req, res) { if (req.method == 'GET') { req.removeAllListeners() + body = 'healthy: dynamodb.' 
+ store.tableDb.awsRegion + '.amazonaws.com ' res.statusCode = 200 - res.setHeader('x-amz-crc32', 3128867991) - res.setHeader('Content-Length', 42) - return res.end('healthy: dynamodb.us-east-1.amazonaws.com ') + res.setHeader('x-amz-crc32', crc32.unsigned(body)) + res.setHeader('Content-Length', Buffer.byteLength(body, 'utf8')) + return res.end(body) } var contentType = (req.headers['content-type'] || '').split(';')[0].trim() @@ -146,7 +159,7 @@ function httpHandler(store, req, res) { var target = (req.headers['x-amz-target'] || '').split('.') - if (target.length != 2 || !~validApis.indexOf(target[0]) || !~validOperations.indexOf(target[1])) + if (target.length != 2 || !validApis[target[0]] || !~validApis[target[0]].indexOf(target[1])) return sendData(req, res, {__type: 'com.amazon.coral.service#UnknownOperationException'}, 400) var authHeader = req.headers.authorization diff --git a/test/connection.js b/test/connection.js index 2ba65be..d00b68d 100644 --- a/test/connection.js +++ b/test/connection.js @@ -1,7 +1,9 @@ var https = require('https'), once = require('once'), + crc32 = require('buffer-crc32'), dynalite = require('..'), - request = require('./helpers').request + helpers = require('./helpers'), + request = helpers.request describe('dynalite connections', function() { @@ -73,9 +75,9 @@ describe('dynalite connections', function() { request({method: 'GET', noSign: true}, function(err, res) { if (err) return done(err) res.statusCode.should.equal(200) - res.body.should.equal('healthy: dynamodb.us-east-1.amazonaws.com ') - res.headers['x-amz-crc32'].should.equal('3128867991') - res.headers['content-length'].should.equal('42') + res.body.should.equal('healthy: dynamodb.' + helpers.awsRegion + '.amazonaws.com ') + res.headers['x-amz-crc32'].should.equal(String(crc32.unsigned(res.body))) + res.headers['content-length'].should.equal(String(Buffer.byteLength(res.body, 'utf8'))) res.headers['x-amzn-requestid'].should.match(/^[0-9A-Z]{52}$/) done() }) @@ -106,7 +108,7 @@ describe('dynalite connections', function() { }) it('should connect to SSL', function(done) { - var port = 10000 + Math.round(Math.random() * 10000), dynaliteServer = dynalite({ssl: true}) + var port = 10000 + Math.round(Math.random() * 10000), dynaliteServer = dynalite.server({ssl: true}) dynaliteServer.listen(port, function(err) { if (err) return done(err) diff --git a/test/helpers.js b/test/helpers.js index 1c6809d..4b91ab8 100644 --- a/test/helpers.js +++ b/test/helpers.js @@ -8,7 +8,6 @@ http.globalAgent.maxSockets = Infinity exports.MAX_SIZE = 409600 exports.awsRegion = process.env.AWS_REGION || process.env.AWS_DEFAULT_REGION || 'us-east-1' -exports.version = 'DynamoDB_20120810' exports.prefix = '__dynalite_test_' exports.request = request exports.opts = opts @@ -45,36 +44,47 @@ exports.testRangeBTable = randomName() // exports.testRangeNTable = '__dynalite_test_4' // exports.testRangeBTable = '__dynalite_test_5' -var port = 10000 + Math.round(Math.random() * 10000), - requestOpts = process.env.REMOTE ? - {host: 'dynamodb.us-east-1.amazonaws.com', method: 'POST'} : - {host: '127.0.0.1', port: port, method: 'POST'} +var endpointLocal = {host: '127.0.0.1', port: 10000 + Math.round(Math.random() * 10000), method: 'POST'}, + endpointDymamo = {host: 'dynamodb.' + exports.awsRegion + '.amazonaws.com', method: 'POST'}, + endpointStreams = {host: 'streams.dynamodb.' 
+ exports.awsRegion + '.amazonaws.com', method: 'POST'}, + requestOpts = {} -var dynaliteServer = dynalite({path: process.env.DYNALITE_PATH}) +requestOpts[dynalite.dynamoApi] = process.env.REMOTE ? endpointDymamo : endpointLocal +requestOpts[dynalite.streamsApi] = process.env.REMOTE ? endpointStreams : endpointLocal + +var dynaliteServer = dynalite.server({path: process.env.DYNALITE_PATH}) before(function(done) { this.timeout(200000) - dynaliteServer.listen(port, function(err) { - if (err) return done(err) + if (process.env.REMOTE) { createTestTables(done) - // done() - }) + } else { + dynaliteServer.listen(endpointLocal.port, function(err) { + if (err) return done(err) + createTestTables(done) + }) + } }) after(function(done) { this.timeout(200000) deleteTestTables(function(err) { if (err) return done(err) - dynaliteServer.close(done) + if (process.env.REMOTE) { + done() + } else { + dynaliteServer.close(done) + } }) }) function request(opts, cb) { if (typeof opts === 'function') { cb = opts; opts = {} } cb = once(cb) - for (var key in requestOpts) { + var api = getApiForOpts(opts) + for (var key in requestOpts[api]) { if (opts[key] === undefined) - opts[key] = requestOpts[key] + opts[key] = requestOpts[api][key] } if (!opts.noSign) { aws4.sign(opts) @@ -104,12 +114,34 @@ function opts(target, data) { return { headers: { 'Content-Type': 'application/x-amz-json-1.0', - 'X-Amz-Target': exports.version + '.' + target, + 'X-Amz-Target': getApiForTarget(target) + '.' + target, }, body: JSON.stringify(data), } } +function getApiForTarget(target) { + for (var api in dynalite.validApis) { + if (dynalite.validApis[api].indexOf(target) !== -1) + return api + } + + throw new Error('Unsupported target: ' + target) +} + +function getApiForOpts(opts) { + if (opts && opts['headers'] && opts.headers['X-Amz-Target']) { + var target = opts.headers['X-Amz-Target'].split('.') + if (target.length === 2) { + var api = target[0] + if (dynalite.validApis[api]) + return api + } + } + + return dynalite.dynamoApi +} + function randomString() { return ('AAAAAAAAA' + randomNumber()).slice(-10) } diff --git a/test/listTables.js b/test/listTables.js index 802b210..fa10455 100644 --- a/test/listTables.js +++ b/test/listTables.js @@ -1,6 +1,7 @@ var should = require('should'), async = require('async'), - helpers = require('./helpers') + helpers = require('./helpers'), + dynalite = require('..') var target = 'ListTables', request = helpers.request, @@ -15,7 +16,7 @@ describe('listTables', function() { describe('serializations', function() { it('should return 400 if no body', function(done) { - request({headers: {'x-amz-target': helpers.version + '.' + target}}, function(err, res) { + request({headers: {'x-amz-target': dynalite.dynamoApi + '.' 
+ target}}, function(err, res) { if (err) return done(err) res.statusCode.should.equal(400) res.body.should.eql({__type: 'com.amazon.coral.service#SerializationException'}) From cb2fb95e0de9014406c58cdf7d51dc6b5aff7c18 Mon Sep 17 00:00:00 2001 From: Kiril Pirozenko Date: Tue, 20 Dec 2016 12:42:28 +0200 Subject: [PATCH 4/5] DynamoDB Streams implementation --- README.md | 1 - actions/createTable.js | 4 + actions/deleteItem.js | 14 +- actions/describeStream.js | 30 ++++ actions/getRecords.js | 51 ++++++ actions/getShardIterator.js | 12 ++ actions/listStreams.js | 34 ++++ actions/putItem.js | 14 +- actions/updateItem.js | 14 +- db/index.js | 10 ++ db/streams.js | 121 +++++++++++++ index.js | 2 +- test/describeStream.js | 71 ++++++++ test/getRecords.js | 310 ++++++++++++++++++++++++++++++++ test/getShardIterator.js | 60 +++++++ test/listStreams.js | 88 +++++++++ validations/createTable.js | 12 ++ validations/describeStream.js | 20 +++ validations/getRecords.js | 13 ++ validations/getShardIterator.js | 27 +++ validations/listStreams.js | 18 ++ 21 files changed, 918 insertions(+), 8 deletions(-) create mode 100644 actions/describeStream.js create mode 100644 actions/getRecords.js create mode 100644 actions/getShardIterator.js create mode 100644 actions/listStreams.js create mode 100644 db/streams.js create mode 100644 test/describeStream.js create mode 100644 test/getRecords.js create mode 100644 test/getShardIterator.js create mode 100644 test/listStreams.js create mode 100644 validations/describeStream.js create mode 100644 validations/getRecords.js create mode 100644 validations/getShardIterator.js create mode 100644 validations/listStreams.js diff --git a/README.md b/README.md index 65cf6fd..a69ec19 100644 --- a/README.md +++ b/README.md @@ -90,7 +90,6 @@ $ npm install -g dynalite TODO ---- -- Implement DynamoDB Streams - Implement `ReturnItemCollectionMetrics` on all remaining endpoints - Implement size info for tables and indexes - Add ProvisionedThroughput checking diff --git a/actions/createTable.js b/actions/createTable.js index ff8ddb0..ddbcdca 100644 --- a/actions/createTable.js +++ b/actions/createTable.js @@ -42,6 +42,10 @@ module.exports = function createTable(store, data, cb) { index.ProvisionedThroughput.NumberOfDecreasesToday = 0 }) } + if (data.StreamSpecification) { + data.LatestStreamLabel = (new Date()).toISOString().replace('Z', '') + data.LatestStreamArn = 'arn:aws:dynamodb:' + tableDb.awsRegion + ':' + tableDb.awsAccountId + ':table/' + data.TableName + '/stream/' + data.LatestStreamLabel + } tableDb.put(key, data, function(err) { if (err) return cb(err) diff --git a/actions/deleteItem.js b/actions/deleteItem.js index b94e5c0..e0a40e8 100644 --- a/actions/deleteItem.js +++ b/actions/deleteItem.js @@ -1,4 +1,5 @@ -var db = require('../db') +var db = require('../db'), + streams = require('../db/streams') module.exports = function deleteItem(store, data, cb) { @@ -29,7 +30,16 @@ module.exports = function deleteItem(store, data, cb) { itemDb.del(key, function(err) { if (err) return cb(err) - cb(null, returnObj) + + if (table.LatestStreamArn) { + streams.createStreamRecord(store, table, existingItem, null, function(err) { + if (err) return cb(err) + + cb(null, returnObj) + }) + } else { + cb(null, returnObj) + } }) }) }) diff --git a/actions/describeStream.js b/actions/describeStream.js new file mode 100644 index 0000000..dda1a85 --- /dev/null +++ b/actions/describeStream.js @@ -0,0 +1,30 @@ +module.exports = function describeStream(store, data, cb) { + var streamArnParts = 
data.StreamArn.split('/stream/'), + tableArn = streamArnParts[0], + tableArnParts = tableArn.split(':table/'), + tableName = tableArnParts[1] + + store.getTable(tableName, false, function(err, table) { + if (err) return cb(err) + + cb(null, { + StreamDescription: { + CreationRequestDateTime: table.CreationDateTime, + KeySchema: table.KeySchema, + Shards: [ + { + ShardId: 'shardId-00000000000000000000-00000001', + SequenceNumberRange: { + StartingSequenceNumber: '100000000000000000001', + }, + }, + ], + StreamArn: table.LatestStreamArn, + StreamLabel: table.LatestStreamLabel, + StreamStatus: 'ENABLED', + StreamViewType: table.StreamSpecification.StreamViewType, + TableName: table.TableName, + }, + }) + }) +} diff --git a/actions/getRecords.js b/actions/getRecords.js new file mode 100644 index 0000000..8d12d67 --- /dev/null +++ b/actions/getRecords.js @@ -0,0 +1,51 @@ +var once = require('once'), + db = require('../db'), + streams = require('../db/streams') + +module.exports = function getRecords(store, data, cb) { + cb = once(cb) + + var opts, limit = data.Limit || 100, + iterator = streams.decodeIterator(data.ShardIterator), + tableName = streams.getTableNameFromStreamArn(iterator.StreamArn) + + if (iterator.SequenceNumber) { + switch (iterator.ShardIteratorType) { + case 'AT_SEQUENCE_NUMBER': + opts = {gte: iterator.SequenceNumber} + break + case 'AFTER_SQUENCE_NUMBER': + opts = {gt: iterator.SequenceNumber} + break + } + } + + store.getTable(tableName, false, function(err, table) { + if (err) return cb(err) + + var streamDb = store.getStreamDb(table.LatestStreamArn) + db.lazy(streamDb.createValueStream(opts), cb) + .take(limit + 1) + .join(function(records) { + var result = {} + if (records.length > limit) { + iterator.ShardIteratorType = 'AT_SEQUENCE_NUMBER' + iterator.SequenceNumber = records[limit].eventID + + records.splice(limit) + + result.NextShardIterator = streams.encodeIterator(iterator) + } else if (records.length > 0) { + iterator.ShardIteratorType = 'AFTER_SQUENCE_NUMBER' + iterator.SequenceNumber = records[records.length - 1].eventID + + result.NextShardIterator = streams.encodeIterator(iterator) + } else { + result.NextShardIterator = data.ShardIterator + } + result.Records = records + + cb(null, result) + }) + }) +} diff --git a/actions/getShardIterator.js b/actions/getShardIterator.js new file mode 100644 index 0000000..a7ef92d --- /dev/null +++ b/actions/getShardIterator.js @@ -0,0 +1,12 @@ +var once = require('once'), + streams = require('../db/streams') + +module.exports = function getShardIterator(store, data, cb) { + cb = once(cb) + + streams.createShardIteratorToken(store, data, function(err, token) { + if (err) return cb(err) + + cb(null, {ShardIterator: token}) + }) +} diff --git a/actions/listStreams.js b/actions/listStreams.js new file mode 100644 index 0000000..df2e943 --- /dev/null +++ b/actions/listStreams.js @@ -0,0 +1,34 @@ +var once = require('once'), + db = require('../db'), + streams = require('../db/streams') + +module.exports = function listStreams(store, data, cb) { + cb = once(cb) + var opts, limit = data.Limit || 100 + + if (data.ExclusiveStartStreamArn) { + opts = {gt: streams.getTableNameFromStreamArn(data.ExclusiveStartStreamArn)} + } + + db.lazy(store.tableDb.createValueStream(opts), cb) + .filter(function(table) { + return table.hasOwnProperty('StreamSpecification') + }) + .take(limit + 1) + .map(function(table) { + return { + StreamArn: table.LatestStreamArn, + StreamLabel: table.LatestStreamLabel, + TableName: table.TableName, + } + }) + 
.join(function(streams) { + var result = {} + if (streams.length > limit) { + streams.splice(limit) + result.LastEvaluatedStreamArn = streams[streams.length - 1].StreamArn + } + result.Streams = streams + cb(null, result) + }) +} diff --git a/actions/putItem.js b/actions/putItem.js index e299dc0..9951fcf 100644 --- a/actions/putItem.js +++ b/actions/putItem.js @@ -1,4 +1,5 @@ -var db = require('../db') +var db = require('../db'), + streams = require('../db/streams') module.exports = function putItem(store, data, cb) { @@ -29,7 +30,16 @@ module.exports = function putItem(store, data, cb) { itemDb.put(key, data.Item, function(err) { if (err) return cb(err) - cb(null, returnObj) + + if (table.LatestStreamArn) { + streams.createStreamRecord(store, table, null, data.Item, function(err) { + if (err) return cb(err) + + cb(null, returnObj) + }) + } else { + cb(null, returnObj) + } }) }) }) diff --git a/actions/updateItem.js b/actions/updateItem.js index 72f7f7c..c78b240 100644 --- a/actions/updateItem.js +++ b/actions/updateItem.js @@ -1,5 +1,6 @@ var Big = require('big.js'), - db = require('../db') + db = require('../db'), + streams = require('../db/streams') module.exports = function updateItem(store, data, cb) { @@ -52,7 +53,16 @@ module.exports = function updateItem(store, data, cb) { itemDb.put(key, item, function(err) { if (err) return cb(err) - cb(null, returnObj) + + if (table.LatestStreamArn) { + streams.createStreamRecord(store, table, oldItem, item, function(err) { + if (err) return cb(err) + + cb(null, returnObj) + }) + } else { + cb(null, returnObj) + } }) }) }) diff --git a/db/index.js b/db/index.js index 7e5ef5a..62a6981 100644 --- a/db/index.js +++ b/db/index.js @@ -72,6 +72,14 @@ function create(options) { deleteSubDb('index-' + indexType.toLowerCase() + '~' + tableName + '~' + indexName, cb) } + function getStreamDb(streamArn) { + return getSubDb('stream-' + streamArn) + } + + function deleteStreamDb(streamArn, cb) { + deleteSubDb('stream-' + streamArn, cb) + } + function getSubDb(name) { if (!subDbs[name]) { subDbs[name] = sublevelDb.sublevel(name, {valueEncoding: 'json'}) @@ -127,6 +135,8 @@ function create(options) { deleteItemDb: deleteItemDb, getIndexDb: getIndexDb, deleteIndexDb: deleteIndexDb, + getStreamDb: getStreamDb, + deleteStreamDb: deleteStreamDb, getTable: getTable, recreate: recreate, } diff --git a/db/streams.js b/db/streams.js new file mode 100644 index 0000000..41e09e7 --- /dev/null +++ b/db/streams.js @@ -0,0 +1,121 @@ +var db = require('.'), + once = require('once'), + Big = require('big.js') + +exports.createStreamRecord = createStreamRecord +exports.createShardIteratorToken = createShardIteratorToken +exports.getTableNameFromStreamArn = getTableNameFromStreamArn +exports.encodeIterator = encodeIterator +exports.decodeIterator = decodeIterator + +function createStreamRecord(store, table, oldItem, newItem, cb) { + if (!oldItem && !newItem) { + throw new Error('Both old and new item are undefined') + } + + var streamDb = store.getStreamDb(table.LatestStreamArn) + + var insertStreamRecord = once(function(key) { + if (key) { + key = Big(key).plus(1).toString() + } else { + key = '100000000000000000001' + } + + var record = { + awsRegion: store.tableDb.awsRegion, + dynamodb: { + ApproximateCreationDateTime: Date.now(), + Keys: {}, + SequenceNumber: key, + SizeBytes: 0, + StreamViewType: table.StreamSpecification.StreamViewType, + }, + eventID: key, + eventSource: 'aws:dynamodb', + eventVersion: '1.1', + } + + if (oldItem) { + record.dynamodb.OldImage = oldItem + 
record.dynamodb.SizeBytes += db.itemSize(oldItem, false, true) + record.eventName = 'REMOVE' + } + if (newItem) { + record.dynamodb.NewImage = newItem + record.dynamodb.SizeBytes += db.itemSize(newItem, false, true) + + if (record.eventName) { + record.eventName = 'MODIFY' + } else { + record.eventName = 'INSERT' + } + } + + db.traverseKey(table, function(attr) { + if (newItem) { + return record.dynamodb.Keys[attr] = newItem[attr] + } else { + return record.dynamodb.Keys[attr] = oldItem[attr] + } + }) + + streamDb.put(key, record, cb) + }) + + db.lazy(streamDb.createKeyStream({reverse: true, limit: 1}), cb) + .on('end', insertStreamRecord) + .head(insertStreamRecord) +} + +function createShardIteratorToken(store, iterator, cb) { + if (iterator.ShardIteratorType == 'LATEST') { + var processKey = once(function(key) { + if (key) { + iterator.ShardIteratorType = 'AFTER_SQUENCE_NUMBER' + iterator.SequenceNumber = key + } else { + iterator.ShardIteratorType = 'TRIM_HORIZON' + } + + cb(null, encodeIterator(iterator)) + }) + + var tableName = getTableNameFromStreamArn(iterator.StreamArn) + store.getTable(tableName, false, function(err, table) { + if (err) return cb(err) + + var streamDb = store.getStreamDb(table.LatestStreamArn) + db.lazy(streamDb.createKeyStream({reverse: true, limit: 1}), cb) + .on('end', processKey) + .head(processKey) + }) + } else { + cb(null, encodeIterator(iterator)) + } +} + +function getTableNameFromStreamArn(streamArn) { + var streamArnParts = streamArn.split('/stream/'), + tableArn = streamArnParts[0], + tableArnParts = tableArn.split(':table/'), + tableName = tableArnParts[1] + + return tableName +} + +function encodeIterator(iterator) { + var encodedString = Buffer.from(JSON.stringify(iterator)).toString('base64'), + result = iterator.StreamArn + '|1|' + encodedString + + return result +} + +function decodeIterator(token) { + var iteratorParts = token.split('|1|'), + encodedString = iteratorParts[1], + decodedString = Buffer.from(encodedString, 'base64').toString('utf8'), + iterator = JSON.parse(decodedString) + + return iterator +} diff --git a/index.js b/index.js index 1a2084a..45e6d47 100644 --- a/index.js +++ b/index.js @@ -19,7 +19,7 @@ var validApis = {}, validApis[dynamoApi] = ['BatchGetItem', 'BatchWriteItem', 'CreateTable', 'DeleteItem', 'DeleteTable', 'DescribeTable', 'GetItem', 'ListTables', 'PutItem', 'Query', 'Scan', 'UpdateItem', 'UpdateTable'] -validApis[streamsApi] = [] +validApis[streamsApi] = ['ListStreams', 'DescribeStream', 'GetShardIterator', 'GetRecords'] exports.server = dynalite exports.dynamoApi = dynamoApi diff --git a/test/describeStream.js b/test/describeStream.js new file mode 100644 index 0000000..bf31a6e --- /dev/null +++ b/test/describeStream.js @@ -0,0 +1,71 @@ +var helpers = require('./helpers') + +var target = 'DescribeStream', + request = helpers.request, + randomName = helpers.randomName, + opts = helpers.opts.bind(null, target) + +describe('describeStream', function() { + + describe('serializations', function() { + }) + + describe('validations', function() { + }) + + describe('functionality', function() { + + it('should return stream description', function(done) { + var table = { + TableName: randomName(), + AttributeDefinitions: [{AttributeName: 'a', AttributeType: 'S'}], + KeySchema: [{AttributeName: 'a', KeyType: 'HASH'}], + ProvisionedThroughput: {ReadCapacityUnits: 1, WriteCapacityUnits: 1}, + StreamSpecification: {StreamEnabled: true, StreamViewType: 'NEW_AND_OLD_IMAGES'}, + } + + request(helpers.opts('CreateTable', table), 
function(err, res) { + if (err) return done(err) + + res.statusCode.should.be.equal(200) + res.body.TableDescription.LatestStreamArn.should.not.be.empty + res.body.TableDescription.LatestStreamLabel.should.not.be.empty + + var streamArn = res.body.TableDescription.LatestStreamArn, + streamLabel = res.body.TableDescription.LatestStreamLabel + + helpers.waitUntilActive(table.TableName, function(err) { + if (err) return done(err) + + request(opts({StreamArn: streamArn}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + res.body.StreamDescription.CreationRequestDateTime.should.not.be.empty + res.body.StreamDescription.KeySchema.should.be.Array() + res.body.StreamDescription.KeySchema.should.have.length(1) + res.body.StreamDescription.KeySchema[0].AttributeName.should.be.eql(table.KeySchema[0].AttributeName) + res.body.StreamDescription.KeySchema[0].KeyType.should.be.eql(table.KeySchema[0].KeyType) + res.body.StreamDescription.StreamArn.should.be.equal(streamArn) + res.body.StreamDescription.StreamLabel.should.be.equal(streamLabel) + res.body.StreamDescription.StreamStatus.should.be.equal('ENABLED') + res.body.StreamDescription.StreamViewType.should.be.equal(table.StreamSpecification.StreamViewType) + res.body.StreamDescription.TableName.should.be.equal(table.TableName) + + // This is newly created stream, so it's very unlikely it would have more than one shard + res.body.StreamDescription.Shards.should.be.Array() + res.body.StreamDescription.Shards.should.have.length(1) + res.body.StreamDescription.Shards[0].should.not.have.property('ParentShardId') + res.body.StreamDescription.Shards[0].ShardId.should.not.be.empty + res.body.StreamDescription.Shards[0].should.have.property('SequenceNumberRange') + res.body.StreamDescription.Shards[0].SequenceNumberRange.StartingSequenceNumber.should.not.be.empty + res.body.StreamDescription.Shards[0].SequenceNumberRange.should.not.have.property('EndingSequenceNumber') + + helpers.deleteWhenActive(table.TableName, done) + }) + }) + }) + }) + + }) + +}) diff --git a/test/getRecords.js b/test/getRecords.js new file mode 100644 index 0000000..0efee9f --- /dev/null +++ b/test/getRecords.js @@ -0,0 +1,310 @@ +var helpers = require('./helpers') + +var target = 'GetRecords', + request = helpers.request, + randomName = helpers.randomName, + opts = helpers.opts.bind(null, target) + +var tableName = randomName(), + streamViewType = 'NEW_AND_OLD_IMAGES', + streamArn, shardId + +describe('getRecords', function() { + + describe('serializations', function() { + }) + + describe('validations', function() { + }) + + describe('functionality', function() { + before(function(done) { + var table = { + TableName: tableName, + AttributeDefinitions: [{AttributeName: 'a', AttributeType: 'S'}], + KeySchema: [{AttributeName: 'a', KeyType: 'HASH'}], + ProvisionedThroughput: {ReadCapacityUnits: 1, WriteCapacityUnits: 1}, + StreamSpecification: {StreamEnabled: true, StreamViewType: streamViewType}, + } + + request(helpers.opts('CreateTable', table), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + streamArn = res.body.TableDescription.LatestStreamArn + + helpers.waitUntilActive(tableName, function(err) { + if (err) return done(err) + + var itemKey1 = {S: 'foo'}, item = { + a: itemKey1, + val: {S: 'bar'}, + other: {S: 'baz'}, + } + request(helpers.opts('PutItem', {TableName: tableName, Item: item}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + var item = { + a: {S: 
'meaning-of-life'}, + answer: {N: '42'}, + } + request(helpers.opts('PutItem', {TableName: tableName, Item: item}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + var req = { + TableName: tableName, + Key: {a: itemKey1}, + UpdateExpression: 'SET #ATT = :VAL', + ExpressionAttributeNames: {'#ATT': 'val'}, + ExpressionAttributeValues: {':VAL': {S: 'New value'}}, + } + request(helpers.opts('UpdateItem', req), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + var req = { + TableName: tableName, + Key: {a: itemKey1}, + } + request(helpers.opts('DeleteItem', req), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + request(helpers.opts('DescribeStream', {StreamArn: streamArn}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + shardId = res.body.StreamDescription.Shards[0].ShardId + + done() + }) + }) + }) + }) + }) + }) + }) + }) + + after(function(done) { + helpers.deleteWhenActive(tableName, done) + }) + + it('should return all stream records', function(done) { + var req = { + StreamArn: streamArn, + ShardId: shardId, + ShardIteratorType: 'TRIM_HORIZON', + } + request(helpers.opts('GetShardIterator', req), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + var iterator = res.body.ShardIterator + request(opts({ShardIterator: iterator}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + res.body.NextShardIterator.should.not.be.empty + res.body.Records.should.be.Array() + res.body.Records.should.have.length(4) + + var lastSequenceNumber = 0 + for (var i = 0; i < 4; i++) { + res.body.Records[i].awsRegion.should.be.equal(helpers.awsRegion) + res.body.Records[i].eventID.should.not.be.empty + res.body.Records[i].eventSource.should.be.equal('aws:dynamodb') + res.body.Records[i].eventVersion.should.be.equal('1.1') + + res.body.Records[i].dynamodb.ApproximateCreationDateTime.should.not.be.empty + res.body.Records[i].dynamodb.SequenceNumber.should.be.above(lastSequenceNumber) + res.body.Records[i].dynamodb.SizeBytes.should.be.above(0) + res.body.Records[i].dynamodb.StreamViewType.should.be.equal(streamViewType) + + lastSequenceNumber = res.body.Records[i].dynamodb.SequenceNumber + } + + res.body.Records[0].eventName.should.be.equal('INSERT') + res.body.Records[0].dynamodb.Keys.should.be.eql({a: {S: 'foo'}}) + res.body.Records[0].dynamodb.should.not.have.property('OldImage') + res.body.Records[0].dynamodb.NewImage.should.be.eql({a: {S: 'foo'}, val: {S: 'bar'}, other: {S: 'baz'}}) + + res.body.Records[1].eventName.should.be.equal('INSERT') + res.body.Records[1].dynamodb.Keys.should.be.eql({a: {S: 'meaning-of-life'}}) + res.body.Records[1].dynamodb.should.not.have.property('OldImage') + res.body.Records[1].dynamodb.NewImage.should.be.eql({a: {S: 'meaning-of-life'}, answer: {N: '42'}}) + + res.body.Records[2].eventName.should.be.equal('MODIFY') + res.body.Records[2].dynamodb.Keys.should.be.eql({a: {S: 'foo'}}) + res.body.Records[2].dynamodb.OldImage.should.be.eql({a: {S: 'foo'}, val: {S: 'bar'}, other: {S: 'baz'}}) + res.body.Records[2].dynamodb.NewImage.should.be.eql({a: {S: 'foo'}, val: {S: 'New value'}, other: {S: 'baz'}}) + + res.body.Records[3].eventName.should.be.equal('REMOVE') + res.body.Records[3].dynamodb.Keys.should.be.eql({a: {S: 'foo'}}) + res.body.Records[3].dynamodb.OldImage.should.be.eql({a: {S: 'foo'}, val: {S: 'New value'}, other: {S: 'baz'}}) + 
res.body.Records[3].dynamodb.should.not.have.property('NewImage') + + done() + }) + }) + }) + + it('should return respect limit and return valid next iterator', function(done) { + var req = { + StreamArn: streamArn, + ShardId: shardId, + ShardIteratorType: 'TRIM_HORIZON', + } + request(helpers.opts('GetShardIterator', req), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + var iterator = res.body.ShardIterator + request(opts({ShardIterator: iterator, Limit: 2}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + res.body.NextShardIterator.should.not.be.empty + res.body.Records.should.be.Array() + res.body.Records.should.have.length(2) + + var lastSequenceNumber = 0 + for (var i = 0; i < 2; i++) { + res.body.Records[i].awsRegion.should.be.equal(helpers.awsRegion) + res.body.Records[i].eventID.should.not.be.empty + res.body.Records[i].eventName.should.be.equal('INSERT') + res.body.Records[i].eventSource.should.be.equal('aws:dynamodb') + res.body.Records[i].eventVersion.should.be.equal('1.1') + + res.body.Records[i].dynamodb.should.not.have.property('OldImage') + res.body.Records[i].dynamodb.ApproximateCreationDateTime.should.not.be.empty + res.body.Records[i].dynamodb.SequenceNumber.should.be.above(lastSequenceNumber) + res.body.Records[i].dynamodb.SizeBytes.should.be.above(0) + res.body.Records[i].dynamodb.StreamViewType.should.be.equal(streamViewType) + + lastSequenceNumber = res.body.Records[i].dynamodb.SequenceNumber + } + + res.body.Records[0].dynamodb.Keys.should.be.eql({a: {S: 'foo'}}) + res.body.Records[0].dynamodb.NewImage.should.be.eql({a: {S: 'foo'}, val: {S: 'bar'}, other: {S: 'baz'}}) + + res.body.Records[1].dynamodb.Keys.should.be.eql({a: {S: 'meaning-of-life'}}) + res.body.Records[1].dynamodb.NewImage.should.be.eql({a: {S: 'meaning-of-life'}, answer: {N: '42'}}) + + var iterator = res.body.NextShardIterator + request(opts({ShardIterator: iterator, Limit: 2}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + res.body.NextShardIterator.should.not.be.empty + res.body.Records.should.be.Array() + res.body.Records.should.have.length(2) + + var lastSequenceNumber = 0 + for (var i = 0; i < 2; i++) { + res.body.Records[i].awsRegion.should.be.equal(helpers.awsRegion) + res.body.Records[i].eventID.should.not.be.empty + res.body.Records[i].eventSource.should.be.equal('aws:dynamodb') + res.body.Records[i].eventVersion.should.be.equal('1.1') + + res.body.Records[i].dynamodb.Keys.should.be.eql({a: {S: 'foo'}}) + res.body.Records[i].dynamodb.ApproximateCreationDateTime.should.not.be.empty + res.body.Records[i].dynamodb.SequenceNumber.should.be.above(lastSequenceNumber) + res.body.Records[i].dynamodb.SizeBytes.should.be.above(0) + res.body.Records[i].dynamodb.StreamViewType.should.be.equal(streamViewType) + + lastSequenceNumber = res.body.Records[i].dynamodb.SequenceNumber + } + + res.body.Records[0].eventName.should.be.equal('MODIFY') + res.body.Records[0].dynamodb.OldImage.should.be.eql({a: {S: 'foo'}, val: {S: 'bar'}, other: {S: 'baz'}}) + res.body.Records[0].dynamodb.NewImage.should.be.eql({a: {S: 'foo'}, val: {S: 'New value'}, other: {S: 'baz'}}) + + res.body.Records[1].eventName.should.be.equal('REMOVE') + res.body.Records[1].dynamodb.OldImage.should.be.eql({a: {S: 'foo'}, val: {S: 'New value'}, other: {S: 'baz'}}) + res.body.Records[1].dynamodb.should.not.have.property('NewImage') + + var iterator = res.body.NextShardIterator + request(opts({ShardIterator: 
iterator}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + res.body.NextShardIterator.should.not.be.empty + res.body.Records.should.be.Array() + res.body.Records.should.be.empty + + done() + }) + }) + }) + }) + }) + + it('latest iterator should return only new records', function(done) { + var req = { + StreamArn: streamArn, + ShardId: shardId, + ShardIteratorType: 'LATEST', + } + request(helpers.opts('GetShardIterator', req), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + var iterator = res.body.ShardIterator + request(opts({ShardIterator: iterator}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + res.body.NextShardIterator.should.not.be.empty + res.body.Records.should.be.Array() + res.body.Records.should.be.empty + + var iterator = res.body.NextShardIterator + + request(helpers.opts('PutItem', {TableName: tableName, Item: {a: {S: 'New Item'}}}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + request(opts({ShardIterator: iterator}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + res.body.NextShardIterator.should.not.be.empty + res.body.Records.should.be.Array() + res.body.Records.should.have.length(1) + + res.body.Records[0].awsRegion.should.be.equal(helpers.awsRegion) + res.body.Records[0].eventID.should.not.be.empty + res.body.Records[0].eventSource.should.be.equal('aws:dynamodb') + res.body.Records[0].eventVersion.should.be.equal('1.1') + res.body.Records[0].eventName.should.be.equal('INSERT') + res.body.Records[0].dynamodb.Keys.should.be.eql({a: {S: 'New Item'}}) + res.body.Records[0].dynamodb.ApproximateCreationDateTime.should.not.be.empty + res.body.Records[0].dynamodb.SequenceNumber.should.not.be.empty + res.body.Records[0].dynamodb.SizeBytes.should.be.above(0) + res.body.Records[0].dynamodb.StreamViewType.should.be.equal(streamViewType) + res.body.Records[0].dynamodb.NewImage.should.be.eql({a: {S: 'New Item'}}) + res.body.Records[0].dynamodb.should.not.have.property('OldImage') + + var iterator = res.body.NextShardIterator + request(opts({ShardIterator: iterator}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + res.body.NextShardIterator.should.not.be.empty + res.body.Records.should.be.Array() + res.body.Records.should.be.empty + + done() + }) + }) + }) + }) + }) + }) + + }) + +}) diff --git a/test/getShardIterator.js b/test/getShardIterator.js new file mode 100644 index 0000000..b6c171e --- /dev/null +++ b/test/getShardIterator.js @@ -0,0 +1,60 @@ +var helpers = require('./helpers') + +var target = 'GetShardIterator', + request = helpers.request, + randomName = helpers.randomName, + opts = helpers.opts.bind(null, target) + +describe('getShardIterator', function() { + + describe('serializations', function() { + }) + + describe('validations', function() { + }) + + describe('functionality', function() { + + it('should return shard iterator', function(done) { + var table = { + TableName: randomName(), + AttributeDefinitions: [{AttributeName: 'a', AttributeType: 'S'}], + KeySchema: [{AttributeName: 'a', KeyType: 'HASH'}], + ProvisionedThroughput: {ReadCapacityUnits: 1, WriteCapacityUnits: 1}, + StreamSpecification: {StreamEnabled: true, StreamViewType: 'NEW_AND_OLD_IMAGES'}, + } + + request(helpers.opts('CreateTable', table), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + + var 
streamArn = res.body.TableDescription.LatestStreamArn + + helpers.waitUntilActive(table.TableName, function(err) { + if (err) return done(err) + + request(helpers.opts('DescribeStream', {StreamArn: streamArn}), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + res.body.StreamDescription.Shards[0].ShardId.should.not.be.empty + + var req = { + StreamArn: streamArn, + ShardId: res.body.StreamDescription.Shards[0].ShardId, + ShardIteratorType: 'TRIM_HORIZON', + } + request(opts(req), function(err, res) { + if (err) return done(err) + res.statusCode.should.be.equal(200) + res.body.ShardIterator.should.not.be.empty + + helpers.deleteWhenActive(table.TableName, done) + }) + }) + }) + }) + }) + + }) + +}) diff --git a/test/listStreams.js b/test/listStreams.js new file mode 100644 index 0000000..2777725 --- /dev/null +++ b/test/listStreams.js @@ -0,0 +1,88 @@ +var async = require('async'), + helpers = require('./helpers') + +var target = 'ListStreams', + request = helpers.request, + randomName = helpers.randomName, + opts = helpers.opts.bind(null, target) + +describe('listStreams', function() { + + describe('serializations', function() { + }) + + describe('validations', function() { + }) + + describe('functionality', function() { + + it('should return empty array if no streams are present', function(done) { + request(opts({}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + res.body.Streams.should.be.Array() + res.body.Streams.should.be.empty + done() + }) + }) + + it('should return all streams if no table name is provided', function(done) { + var tableTemplate = { + AttributeDefinitions: [{AttributeName: 'a', AttributeType: 'S'}], + KeySchema: [{AttributeName: 'a', KeyType: 'HASH'}], + ProvisionedThroughput: {ReadCapacityUnits: 1, WriteCapacityUnits: 1}, + StreamSpecification: {StreamEnabled: true, StreamViewType: 'NEW_AND_OLD_IMAGES'}, + }, + table1 = {TableName: randomName()}, + table2 = {TableName: randomName()} + + for (var key in tableTemplate) { + table1[key] = tableTemplate[key] + table2[key] = tableTemplate[key] + } + + async.parallel([ + request.bind(null, helpers.opts('CreateTable', table1)), + request.bind(null, helpers.opts('CreateTable', table2)), + ], function(err, res) { + if (err) return done(err) + res.should.have.length(2) + + res[0].statusCode.should.be.equal(200) + res[0].body.TableDescription.LatestStreamArn.should.not.be.empty + res[0].body.TableDescription.LatestStreamLabel.should.not.be.empty + var streamArn1 = res[0].body.TableDescription.LatestStreamArn, + streamLabel1 = res[0].body.TableDescription.LatestStreamLabel + + res[1].statusCode.should.be.equal(200) + res[1].body.TableDescription.LatestStreamArn.should.not.be.empty + var streamArn2 = res[1].body.TableDescription.LatestStreamArn, + streamLabel2 = res[1].body.TableDescription.LatestStreamLabel + + async.parallel([ + helpers.waitUntilActive.bind(null, table1.TableName), + helpers.waitUntilActive.bind(null, table2.TableName), + ], function(err) { + if (err) return done(err) + + request(opts({}), function(err, res) { + if (err) return done(err) + res.statusCode.should.equal(200) + + // DynamoDB saves streams for 24h and it's impossible to force delete them + // therefore we should expect to receive old streams as well + res.body.Streams.should.be.Array() + res.body.Streams.length.should.be.aboveOrEqual(2) + + res.body.Streams.should.containEql({StreamArn: streamArn1, StreamLabel: streamLabel1, TableName: table1.TableName}) + 
res.body.Streams.should.containEql({StreamArn: streamArn2, StreamLabel: streamLabel2, TableName: table2.TableName}) + + done() + }) + }) + }) + }) + + }) + +}) diff --git a/validations/createTable.js b/validations/createTable.js index 9246896..030e6e0 100644 --- a/validations/createTable.js +++ b/validations/createTable.js @@ -173,6 +173,18 @@ exports.types = { }, }, }, + StreamSpecification: { + type: 'Structure', + children: { + StreamEnabled: { + type: 'Boolean', + }, + StreamViewType: { + type: 'String', + enum: ['KEYS_ONLY', 'NEW_IMAGE', 'OLD_IMAGE', 'NEW_AND_OLD_IMAGES'], + }, + }, + }, } exports.custom = function(data) { diff --git a/validations/describeStream.js b/validations/describeStream.js new file mode 100644 index 0000000..178e25d --- /dev/null +++ b/validations/describeStream.js @@ -0,0 +1,20 @@ +exports.types = { + StreamArn: { + required: true, + type: 'String', + regex: '[a-zA-Z0-9\-\.:/_]+', + lengthGreaterThanOrEqual: 37, + lengthLessThanOrEqual: 1024, + }, + Limit: { + type: 'Integer', + greaterThanOrEqual: 1, + lessThanOrEqual: 100, + }, + ExclusiveStartShardId: { + type: 'String', + regex: '[a-zA-Z0-9\-]+', + lengthGreaterThanOrEqual: 28, + lengthLessThanOrEqual: 65, + }, +} diff --git a/validations/getRecords.js b/validations/getRecords.js new file mode 100644 index 0000000..424daa8 --- /dev/null +++ b/validations/getRecords.js @@ -0,0 +1,13 @@ +exports.types = { + ShardIterator: { + required: true, + type: 'String', + lengthGreaterThanOrEqual: 1, + lengthLessThanOrEqual: 2048, + }, + Limit: { + type: 'Integer', + greaterThanOrEqual: 1, + lessThanOrEqual: 1000, + }, +} diff --git a/validations/getShardIterator.js b/validations/getShardIterator.js new file mode 100644 index 0000000..777da1c --- /dev/null +++ b/validations/getShardIterator.js @@ -0,0 +1,27 @@ +exports.types = { + ShardId: { + required: true, + type: 'String', + regex: '[a-zA-Z0-9\-]+', + lengthGreaterThanOrEqual: 28, + lengthLessThanOrEqual: 65, + }, + ShardIteratorType: { + required: true, + type: 'String', + enum: ['TRIM_HORIZON', 'LATEST', 'AT_SEQUENCE_NUMBER', 'AFTER_SEQUENCE_NUMBER'], + }, + StreamArn: { + required: true, + type: 'String', + regex: '[a-zA-Z0-9\-\.:/_]+', + lengthGreaterThanOrEqual: 37, + lengthLessThanOrEqual: 1024, + }, + SequenceNumber: { + type: 'String', + regex: '\d+', + lengthGreaterThanOrEqual: 21, + lengthLessThanOrEqual: 40, + }, +} diff --git a/validations/listStreams.js b/validations/listStreams.js new file mode 100644 index 0000000..b6f2574 --- /dev/null +++ b/validations/listStreams.js @@ -0,0 +1,18 @@ +exports.types = { + Limit: { + type: 'Integer', + greaterThanOrEqual: 1, + lessThanOrEqual: 100, + }, + ExclusiveStartStreamArn: { + type: 'String', + regex: '[a-zA-Z0-9\-\.:/]+', + lengthGreaterThanOrEqual: 37, + lengthLessThanOrEqual: 1024, + }, + TableName: { + type: 'String', + tableName: true, + regex: '[a-zA-Z0-9_.-]+', + }, +} From 7f795b88ec4443b03bdc967a3c5e305eef820144 Mon Sep 17 00:00:00 2001 From: Andrii Berezhynskyi Date: Thu, 28 Sep 2017 12:24:32 +0200 Subject: [PATCH 5/5] fix ApproximateCreationDateTime Date.now() returns milliseconds which is interpreted incorectly by boto3. For example boto3 will interprete the ApproximateCreationDateTime as something like `Fri Jan 08 49712 17:18:22`. This crashes boto3 with an error `year is out of range`. 
We need a Unix timestamp (in seconds) there.
---
 db/streams.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/db/streams.js b/db/streams.js
index 41e09e7..2c2628e 100644
--- a/db/streams.js
+++ b/db/streams.js
@@ -25,7 +25,7 @@ function createStreamRecord(store, table, oldItem, newItem, cb) {
     var record = {
       awsRegion: store.tableDb.awsRegion,
       dynamodb: {
-        ApproximateCreationDateTime: Date.now(),
+        ApproximateCreationDateTime: Math.floor(Date.now() / 1000),
         Keys: {},
         SequenceNumber: key,
         SizeBytes: 0,
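
Usage sketch (illustrative only, not part of the patch series): one way the Streams endpoints added above could be consumed end to end through the dynalite.server() export introduced in PATCH 3/5. It assumes the aws-sdk v2 AWS.DynamoDBStreams client (the test suite itself exercises the endpoints via raw aws4-signed HTTP) and a table that already exists with a StreamSpecification enabled; the data path, default port 4567, region and dummy credentials are placeholders.

var dynalite = require('dynalite'),
    AWS = require('aws-sdk')

var server = dynalite.server({path: './mydb'})

server.listen(4567, function(err) {
  if (err) throw err

  // Placeholder credentials; dynalite checks that requests are signed,
  // it does not verify the signature against a real account
  var streams = new AWS.DynamoDBStreams({
    endpoint: 'http://localhost:4567',
    region: 'us-east-1',
    accessKeyId: 'akid',
    secretAccessKey: 'secret',
  })

  // ListStreams -> DescribeStream -> GetShardIterator -> GetRecords
  streams.listStreams({}, function(err, listData) {
    if (err) throw err
    var streamArn = listData.Streams[0].StreamArn

    streams.describeStream({StreamArn: streamArn}, function(err, descData) {
      if (err) throw err
      var shardId = descData.StreamDescription.Shards[0].ShardId

      streams.getShardIterator({
        StreamArn: streamArn,
        ShardId: shardId,
        ShardIteratorType: 'TRIM_HORIZON',
      }, function(err, iterData) {
        if (err) throw err

        streams.getRecords({ShardIterator: iterData.ShardIterator}, function(err, recData) {
          if (err) throw err
          console.log(recData.Records) // INSERT / MODIFY / REMOVE records, oldest first
          server.close()
        })
      })
    })
  })
})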