Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ $ npm install -g dynalite
TODO
----

- Implement DynamoDB Streams
- Implement `ReturnItemCollectionMetrics` on all remaining endpoints
- Implement size info for tables and indexes
- Add ProvisionedThroughput checking
Expand Down
4 changes: 4 additions & 0 deletions actions/createTable.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ module.exports = function createTable(store, data, cb) {
index.ProvisionedThroughput.NumberOfDecreasesToday = 0
})
}
if (data.StreamSpecification) {
data.LatestStreamLabel = (new Date()).toISOString().replace('Z', '')
data.LatestStreamArn = 'arn:aws:dynamodb:' + tableDb.awsRegion + ':' + tableDb.awsAccountId + ':table/' + data.TableName + '/stream/' + data.LatestStreamLabel
}

tableDb.put(key, data, function(err) {
if (err) return cb(err)
Expand Down
14 changes: 12 additions & 2 deletions actions/deleteItem.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
var db = require('../db')
var db = require('../db'),
streams = require('../db/streams')

module.exports = function deleteItem(store, data, cb) {

Expand Down Expand Up @@ -29,7 +30,16 @@ module.exports = function deleteItem(store, data, cb) {

itemDb.del(key, function(err) {
if (err) return cb(err)
cb(null, returnObj)

if (table.LatestStreamArn) {
streams.createStreamRecord(store, table, existingItem, null, function(err) {
if (err) return cb(err)

cb(null, returnObj)
})
} else {
cb(null, returnObj)
}
})
})
})
Expand Down
30 changes: 30 additions & 0 deletions actions/describeStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
module.exports = function describeStream(store, data, cb) {
var streamArnParts = data.StreamArn.split('/stream/'),
tableArn = streamArnParts[0],
tableArnParts = tableArn.split(':table/'),
tableName = tableArnParts[1]

store.getTable(tableName, false, function(err, table) {
if (err) return cb(err)

cb(null, {
StreamDescription: {
CreationRequestDateTime: table.CreationDateTime,
KeySchema: table.KeySchema,
Shards: [
{
ShardId: 'shardId-00000000000000000000-00000001',
SequenceNumberRange: {
StartingSequenceNumber: '100000000000000000001',
},
},
],
StreamArn: table.LatestStreamArn,
StreamLabel: table.LatestStreamLabel,
StreamStatus: 'ENABLED',
StreamViewType: table.StreamSpecification.StreamViewType,
TableName: table.TableName,
},
})
})
}
51 changes: 51 additions & 0 deletions actions/getRecords.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
var once = require('once'),
db = require('../db'),
streams = require('../db/streams')

module.exports = function getRecords(store, data, cb) {
cb = once(cb)

var opts, limit = data.Limit || 100,
iterator = streams.decodeIterator(data.ShardIterator),
tableName = streams.getTableNameFromStreamArn(iterator.StreamArn)

if (iterator.SequenceNumber) {
switch (iterator.ShardIteratorType) {
case 'AT_SEQUENCE_NUMBER':
opts = {gte: iterator.SequenceNumber}
break
case 'AFTER_SQUENCE_NUMBER':
opts = {gt: iterator.SequenceNumber}
break
}
}

store.getTable(tableName, false, function(err, table) {
if (err) return cb(err)

var streamDb = store.getStreamDb(table.LatestStreamArn)
db.lazy(streamDb.createValueStream(opts), cb)
.take(limit + 1)
.join(function(records) {
var result = {}
if (records.length > limit) {
iterator.ShardIteratorType = 'AT_SEQUENCE_NUMBER'
iterator.SequenceNumber = records[limit].eventID

records.splice(limit)

result.NextShardIterator = streams.encodeIterator(iterator)
} else if (records.length > 0) {
iterator.ShardIteratorType = 'AFTER_SQUENCE_NUMBER'
iterator.SequenceNumber = records[records.length - 1].eventID

result.NextShardIterator = streams.encodeIterator(iterator)
} else {
result.NextShardIterator = data.ShardIterator
}
result.Records = records

cb(null, result)
})
})
}
12 changes: 12 additions & 0 deletions actions/getShardIterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
var once = require('once'),
streams = require('../db/streams')

module.exports = function getShardIterator(store, data, cb) {
cb = once(cb)

streams.createShardIteratorToken(store, data, function(err, token) {
if (err) return cb(err)

cb(null, {ShardIterator: token})
})
}
34 changes: 34 additions & 0 deletions actions/listStreams.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
var once = require('once'),
db = require('../db'),
streams = require('../db/streams')

module.exports = function listStreams(store, data, cb) {
cb = once(cb)
var opts, limit = data.Limit || 100

if (data.ExclusiveStartStreamArn) {
opts = {gt: streams.getTableNameFromStreamArn(data.ExclusiveStartStreamArn)}
}

db.lazy(store.tableDb.createValueStream(opts), cb)
.filter(function(table) {
return table.hasOwnProperty('StreamSpecification')
})
.take(limit + 1)
.map(function(table) {
return {
StreamArn: table.LatestStreamArn,
StreamLabel: table.LatestStreamLabel,
TableName: table.TableName,
}
})
.join(function(streams) {
var result = {}
if (streams.length > limit) {
streams.splice(limit)
result.LastEvaluatedStreamArn = streams[streams.length - 1].StreamArn
}
result.Streams = streams
cb(null, result)
})
}
14 changes: 12 additions & 2 deletions actions/putItem.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
var db = require('../db')
var db = require('../db'),
streams = require('../db/streams')

module.exports = function putItem(store, data, cb) {

Expand Down Expand Up @@ -29,7 +30,16 @@ module.exports = function putItem(store, data, cb) {

itemDb.put(key, data.Item, function(err) {
if (err) return cb(err)
cb(null, returnObj)

if (table.LatestStreamArn) {
streams.createStreamRecord(store, table, null, data.Item, function(err) {
if (err) return cb(err)

cb(null, returnObj)
})
} else {
cb(null, returnObj)
}
})
})
})
Expand Down
14 changes: 12 additions & 2 deletions actions/updateItem.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
var Big = require('big.js'),
db = require('../db')
db = require('../db'),
streams = require('../db/streams')

module.exports = function updateItem(store, data, cb) {

Expand Down Expand Up @@ -52,7 +53,16 @@ module.exports = function updateItem(store, data, cb) {

itemDb.put(key, item, function(err) {
if (err) return cb(err)
cb(null, returnObj)

if (table.LatestStreamArn) {
streams.createStreamRecord(store, table, oldItem, item, function(err) {
if (err) return cb(err)

cb(null, returnObj)
})
} else {
cb(null, returnObj)
}
})
})
})
Expand Down
2 changes: 1 addition & 1 deletion cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ if (argv.help) {
].join('\n'))
}

var server = require('./index.js')(argv).listen(argv.port || 4567, function() {
var server = require('./index.js').server(argv).listen(argv.port || 4567, function() {
var address = server.address(), protocol = argv.ssl ? 'https' : 'http'
// eslint-disable-next-line no-console
console.log('Listening at %s://%s:%s', protocol, address.address, address.port)
Expand Down
10 changes: 10 additions & 0 deletions db/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ function create(options) {
deleteSubDb('index-' + indexType.toLowerCase() + '~' + tableName + '~' + indexName, cb)
}

function getStreamDb(streamArn) {
return getSubDb('stream-' + streamArn)
}

function deleteStreamDb(streamArn, cb) {
deleteSubDb('stream-' + streamArn, cb)
}

function getSubDb(name) {
if (!subDbs[name]) {
subDbs[name] = sublevelDb.sublevel(name, {valueEncoding: 'json'})
Expand Down Expand Up @@ -127,6 +135,8 @@ function create(options) {
deleteItemDb: deleteItemDb,
getIndexDb: getIndexDb,
deleteIndexDb: deleteIndexDb,
getStreamDb: getStreamDb,
deleteStreamDb: deleteStreamDb,
getTable: getTable,
recreate: recreate,
}
Expand Down
121 changes: 121 additions & 0 deletions db/streams.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
var db = require('.'),
once = require('once'),
Big = require('big.js')

exports.createStreamRecord = createStreamRecord
exports.createShardIteratorToken = createShardIteratorToken
exports.getTableNameFromStreamArn = getTableNameFromStreamArn
exports.encodeIterator = encodeIterator
exports.decodeIterator = decodeIterator

function createStreamRecord(store, table, oldItem, newItem, cb) {
if (!oldItem && !newItem) {
throw new Error('Both old and new item are undefined')
}

var streamDb = store.getStreamDb(table.LatestStreamArn)

var insertStreamRecord = once(function(key) {
if (key) {
key = Big(key).plus(1).toString()
} else {
key = '100000000000000000001'
}

var record = {
awsRegion: store.tableDb.awsRegion,
dynamodb: {
ApproximateCreationDateTime: Math.floor(Date.now() / 1000),
Keys: {},
SequenceNumber: key,
SizeBytes: 0,
StreamViewType: table.StreamSpecification.StreamViewType,
},
eventID: key,
eventSource: 'aws:dynamodb',
eventVersion: '1.1',
}

if (oldItem) {
record.dynamodb.OldImage = oldItem
record.dynamodb.SizeBytes += db.itemSize(oldItem, false, true)
record.eventName = 'REMOVE'
}
if (newItem) {
record.dynamodb.NewImage = newItem
record.dynamodb.SizeBytes += db.itemSize(newItem, false, true)

if (record.eventName) {
record.eventName = 'MODIFY'
} else {
record.eventName = 'INSERT'
}
}

db.traverseKey(table, function(attr) {
if (newItem) {
return record.dynamodb.Keys[attr] = newItem[attr]
} else {
return record.dynamodb.Keys[attr] = oldItem[attr]
}
})

streamDb.put(key, record, cb)
})

db.lazy(streamDb.createKeyStream({reverse: true, limit: 1}), cb)
.on('end', insertStreamRecord)
.head(insertStreamRecord)
}

function createShardIteratorToken(store, iterator, cb) {
if (iterator.ShardIteratorType == 'LATEST') {
var processKey = once(function(key) {
if (key) {
iterator.ShardIteratorType = 'AFTER_SQUENCE_NUMBER'
iterator.SequenceNumber = key
} else {
iterator.ShardIteratorType = 'TRIM_HORIZON'
}

cb(null, encodeIterator(iterator))
})

var tableName = getTableNameFromStreamArn(iterator.StreamArn)
store.getTable(tableName, false, function(err, table) {
if (err) return cb(err)

var streamDb = store.getStreamDb(table.LatestStreamArn)
db.lazy(streamDb.createKeyStream({reverse: true, limit: 1}), cb)
.on('end', processKey)
.head(processKey)
})
} else {
cb(null, encodeIterator(iterator))
}
}

function getTableNameFromStreamArn(streamArn) {
var streamArnParts = streamArn.split('/stream/'),
tableArn = streamArnParts[0],
tableArnParts = tableArn.split(':table/'),
tableName = tableArnParts[1]

return tableName
}

function encodeIterator(iterator) {
var encodedString = Buffer.from(JSON.stringify(iterator)).toString('base64'),
result = iterator.StreamArn + '|1|' + encodedString

return result
}

function decodeIterator(token) {
var iteratorParts = token.split('|1|'),
encodedString = iteratorParts[1],
decodedString = Buffer.from(encodedString, 'base64').toString('utf8'),
iterator = JSON.parse(decodedString)

return iterator
}
Loading