Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 69 additions & 32 deletions actions/createTable.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,49 @@
var crypto = require('crypto')
var async = require('async'),
crypto = require('crypto'),
kinesaliteCreateStream = require('kinesalite/actions/createStream')

module.exports = function createTable(store, data, cb) {

var key = data.TableName, tableDb = store.tableDb

tableDb.lock(key, function(release) {
cb = release(cb)

tableDb.get(key, function(err) {
if (err && err.name != 'NotFoundError') return cb(err)
if (!err) {
err = new Error
err.statusCode = 400
err.body = {
__type: 'com.amazonaws.dynamodb.v20120810#ResourceInUseException',
message: '',
async.auto({
lock: function(callback) {
tableDb.lock(key, function(release) {
callback(null, release)
})
},
checkTable: ['lock', function(results, callback) {
tableDb.get(key, function(err) {
if (err && err.name != 'NotFoundError') return callback(err)
if (!err) {
err = new Error
err.statusCode = 400
err.body = {
__type: 'com.amazonaws.dynamodb.v20120810#ResourceInUseException',
message: '',
}
return callback(err)
}
return cb(err)

callback()
})
}],
streamUpdates: ['checkTable', function(results, callback) {
if (!data.StreamSpecification) {
return callback()
}

kinesaliteCreateStream(store.kinesalite, {StreamName: data.TableName, ShardCount: 1}, function(err) {
if (err) return callback(err)

callback(null, {
StreamSpecification: data.StreamSpecification,
LatestStreamLabel: (new Date()).toISOString().replace('Z', ''),
LatestStreamArn: 'arn:aws:dynamodb:' + tableDb.awsRegion + ':' + tableDb.awsAccountId + ':table/' + data.TableName + '/stream/' + data.LatestStreamLabel,
})
})
}],
createTable: ['streamUpdates', function(results, callback) {
data.TableArn = 'arn:aws:dynamodb:' + tableDb.awsRegion + ':' + tableDb.awsAccountId + ':table/' + data.TableName
data.TableId = uuidV4()
data.CreationDateTime = Date.now() / 1000
Expand All @@ -45,31 +70,43 @@ module.exports = function createTable(store, data, cb) {
})
}

tableDb.put(key, data, function(err) {
if (err) return cb(err)

setTimeout(function() {
if (results.streamUpdates) {
data.LatestStreamLabel = (new Date()).toISOString().replace('Z', '')
data.LatestStreamArn = 'arn:aws:dynamodb:' + tableDb.awsRegion + ':' + tableDb.awsAccountId + ':table/' + data.TableName + '/stream/' + data.LatestStreamLabel
}

// Shouldn't need to lock/fetch as nothing should have changed
data.TableStatus = 'ACTIVE'
if (data.GlobalSecondaryIndexes) {
data.GlobalSecondaryIndexes.forEach(function(index) {
index.IndexStatus = 'ACTIVE'
})
}
tableDb.put(key, data, callback)
}],
setActive: ['createTable', function(results, callback) {
setTimeout(function() {

tableDb.put(key, data, function(err) {
// eslint-disable-next-line no-console
if (err && !/Database is not open/.test(err)) console.error(err.stack || err)
// Shouldn't need to lock/fetch as nothing should have changed
data.TableStatus = 'ACTIVE'
if (data.GlobalSecondaryIndexes) {
data.GlobalSecondaryIndexes.forEach(function(index) {
index.IndexStatus = 'ACTIVE'
})
}

}, store.options.createTableMs)
tableDb.put(key, data, function(err) {
// eslint-disable-next-line no-console
if (err && !/Database is not open/.test(err)) console.error(err.stack || err)
})

cb(null, {TableDescription: data})
})
})
})
}, store.options.createTableMs)

callback()
}],
}, function(err, results) {
var release = results.lock
cb = release(cb)

if (err) {
return cb(err)
}

cb(null, {TableDescription: data})
})
}

function uuidV4() {
Expand Down
14 changes: 12 additions & 2 deletions actions/deleteItem.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
var db = require('../db')
var db = require('../db'),
utils = require('../utils')

module.exports = function deleteItem(store, data, cb) {

Expand Down Expand Up @@ -29,7 +30,16 @@ module.exports = function deleteItem(store, data, cb) {

itemDb.del(key, function(err) {
if (err) return cb(err)
cb(null, returnObj)

if (!table.LatestStreamArn) {
return cb(null, returnObj)
}

var streamRecord = utils.createStreamRecord(table, existingItem, null)
utils.writeStreamRecord(store, data.TableName, streamRecord, function(err) {
if (err) return cb(err)
cb(null, returnObj)
})
})
})
})
Expand Down
31 changes: 31 additions & 0 deletions actions/describeStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
var kinesaliteDescribeStream = require('kinesalite/actions/describeStream'),
utils = require('../utils')

module.exports = function describeStream(store, data, cb) {
var tableName = utils.getTableNameFromStreamArn(data.StreamArn)

store.getTable(tableName, false, function(err, table) {
if (err) return cb(err)

kinesaliteDescribeStream(store.kinesalite, {StreamName: tableName}, function(err, stream) {
if (err) return cb(err)

cb(null, {
StreamDescription: {
CreationRequestDateTime: stream.StreamDescription.StreamCreationTimestamp,
KeySchema: table.KeySchema,
Shards: stream.StreamDescription.Shards.map(function(shard) {
shard.ShardId = utils.makeShardIdLong(shard.ShardId)
shard.ParentShardId = utils.makeShardIdLong(shard.ParentShardId)
return shard
}),
StreamArn: table.LatestStreamArn,
StreamLabel: table.LatestStreamLabel,
StreamStatus: stream.StreamDescription.StreamStatus,
StreamViewType: table.StreamSpecification.StreamViewType,
TableName: tableName,
},
})
})
})
}
16 changes: 16 additions & 0 deletions actions/getRecords.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
var kinesaliteGetRecords = require('kinesalite/actions/getRecords')

module.exports = function getRecords(store, data, cb) {
kinesaliteGetRecords(store.kinesalite, data, function(err, results) {
if (err) return cb(err)

cb(null, {
NextShardIterator: results.NextShardIterator,
Records: results.Records.map(function(record) {
var recordData = JSON.parse(record.Data)
recordData.dynamodb.SequenceNumber = record.SequenceNumber
return recordData
}),
})
})
}
13 changes: 13 additions & 0 deletions actions/getShardIterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
var kinesaliteGetShardIterator = require('kinesalite/actions/getShardIterator'),
utils = require('../utils')

module.exports = function getShardIterator(store, data, cb) {
var options = {
ShardId: utils.makeShardIdShort(data.ShardId),
ShardIteratorType: data.ShardIteratorType,
StartingSequenceNumber: data.SequenceNumber,
StreamName: utils.getTableNameFromStreamArn(data.StreamArn),
}

kinesaliteGetShardIterator(store.kinesalite, options, cb)
}
34 changes: 34 additions & 0 deletions actions/listStreams.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
var once = require('once'),
db = require('../db'),
getTableNameFromStreamArn = require('../utils').getTableNameFromStreamArn

module.exports = function listStreams(store, data, cb) {
cb = once(cb)
var opts, limit = data.Limit || 100

if (data.ExclusiveStartStreamArn) {
opts = {gt: getTableNameFromStreamArn(data.ExclusiveStartStreamArn)}
}

db.lazy(store.tableDb.createValueStream(opts), cb)
.filter(function(table) {
return table.hasOwnProperty('StreamSpecification')
})
.take(limit + 1)
.map(function(table) {
return {
StreamArn: table.LatestStreamArn,
StreamLabel: table.LatestStreamLabel,
TableName: table.TableName,
}
})
.join(function(streams) {
var result = {}
if (streams.length > limit) {
streams.splice(limit)
result.LastEvaluatedStreamArn = streams[streams.length - 1].StreamArn
}
result.Streams = streams
cb(null, result)
})
}
14 changes: 12 additions & 2 deletions actions/putItem.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
var db = require('../db')
var db = require('../db'),
utils = require('../utils')

module.exports = function putItem(store, data, cb) {

Expand Down Expand Up @@ -29,7 +30,16 @@ module.exports = function putItem(store, data, cb) {

itemDb.put(key, data.Item, function(err) {
if (err) return cb(err)
cb(null, returnObj)

if (!table.LatestStreamArn) {
return cb(null, returnObj)
}

var streamRecord = utils.createStreamRecord(table, existingItem, data.Item)
utils.writeStreamRecord(store, data.TableName, streamRecord, function(err) {
if (err) return cb(err)
cb(null, returnObj)
})
})
})
})
Expand Down
14 changes: 12 additions & 2 deletions actions/updateItem.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
var Big = require('big.js'),
db = require('../db')
db = require('../db'),
utils = require('../utils')

module.exports = function updateItem(store, data, cb) {

Expand Down Expand Up @@ -52,7 +53,16 @@ module.exports = function updateItem(store, data, cb) {

itemDb.put(key, item, function(err) {
if (err) return cb(err)
cb(null, returnObj)

if (!table.LatestStreamArn) {
return cb(null, returnObj)
}

var streamRecord = utils.createStreamRecord(table, oldItem, item)
utils.writeStreamRecord(store, data.TableName, streamRecord, function(err) {
if (err) return cb(err)
cb(null, returnObj)
})
})
})
})
Expand Down
Loading