Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions lib/bundleManager.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
var EventEmitter = require('events').EventEmitter
, fs = require('fs')
, _ = require('underscore')
, winston = require('./logging').winston
, log = require('./logging').log
, engine = require('./engine')
, util = require('util')
, watch = require('watch')
Expand Down Expand Up @@ -44,7 +44,7 @@ var BundleManager = function() {
// Either this is the first load or a bundle has been added, deleted or changed
this.refreshBundles = function( targetFile, callback ) {

winston.info('bundleManager refreshBundles');
log.info('bundleManager refreshBundles');

var bundles = {}
, files = fs.readdirSync(bundlesFolder);
Expand All @@ -57,12 +57,12 @@ var BundleManager = function() {
try {
var tempBundle = require(bundlesFolder + "/" + file);
_.each(tempBundle, function(part, key) {
winston.info("Bundle \""+key+"\": loaded from \"" + file + "\"");
log.info("Bundle \""+key+"\": loaded from \"" + file + "\"");
bundles[key] = part;
bundles[key].locked = false;
});
} catch(err) {
winston.error("Error parsing bundle \""+file+"\": "+err);
log.error("Error parsing bundle \""+file+"\": "+err);
}
}

Expand All @@ -82,7 +82,7 @@ var BundleManager = function() {
_.each(bundle, function (api, key) {
if (api.schedule) {
var job = new cronJob(api.schedule, function(){
winston.info('cronjob '+key+' called');
log.info('cronjob '+key+' called');
engine.refresh(api, key, bid, bundle);
}, null, true);
GLOBAL.cronjobs.push(job);
Expand All @@ -99,14 +99,14 @@ var BundleManager = function() {

monitor.on("created", function (file, stat) {
if ( file.endsWith('.js') ) {
winston.event('Bundle file created: ' + file);
log.event('Bundle file created: ' + file);
self.refreshBundles();
}
});

monitor.on("changed", function (file, curr, prev) {
if ( file.endsWith('.js') ) {
winston.event('Bundle file changed: ' + file);
log.event('Bundle file changed: ' + file);

// To bust node's module caching we rename the file before calling updateBundle
var tempFile = '.'+uuid.v4();
Expand All @@ -120,7 +120,7 @@ var BundleManager = function() {

monitor.on("removed", function (file, stat) {
if ( file.endsWith('.js') ) {
winston.event('Bundle file removed: ' + file);
log.event('Bundle file removed: ' + file);
self.refreshBundles();
}
});
Expand Down
94 changes: 52 additions & 42 deletions lib/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ var
, _ = require('underscore')._
, neuron = require('neuron')
, zlib = require('zlib')
, winston = require('./logging').winston
, logger = require('./logging').log
, oauth = require("./oauth")
, oauth2 = require('./oauth2')
;
Expand Down Expand Up @@ -42,6 +42,7 @@ var filter = function( source, map) {
// ## Function to send the response to the user
//
var sendResponse = function(jDoc, myRes, ip, bid, callback, gzip) {
var log = logger.child({bid: bid, ip: ip});

// Convert the string representation of date to a Date object
jDoc.expires = new Date(jDoc.expires);
Expand Down Expand Up @@ -103,48 +104,51 @@ var sendResponse = function(jDoc, myRes, ip, bid, callback, gzip) {
responseHeaders['content-encoding'] = 'gzip';
zlib.gzip(doc, function(err, zbuf) {
if (!err) {
winston.event('Send gzipped response for ' + bid +', ' + zbuf.toString().length + ', ' + ip);
var size = zbuf.toString().length;
log.event({size: size}, 'Send gzipped response for ' + bid +', ' + size + ', ' + ip);
myRes.writeHead(200, responseHeaders);
myRes.end(zbuf);
}
});
} else {
// If a callback name was passed, use it. Otherwise, just output the object
var size = doc.length;
var tbuf = new Buffer(doc);
myRes.writeHead(200, responseHeaders);
winston.event('Send response for ' + bid +', ' + doc.length + ', ' + ip);
log.event({size: size}, 'Send response for ' + bid +', ' + size + ', ' + ip);
myRes.end(tbuf);
}

}

}
};

//
// ## Perform scheduled refresh
//
exports.refresh = function(api, key, bid, bundle) {
var log = logger.child({bid: bid, key: key});

winston.info('exports.refresh: ' + api);
log.info('exports.refresh: ' + bid + ' ' + key);
// We're forcing a refresh of the content so run the api.code
api.resource( api.params, api.credentials, function( err, res ) {
if ( err ) {

// We got an error so set our output object to be the error and expire immediately
api.expires = ( new Date() );
var tout = {
expires: api.expires,
result: err,
iid: bid+key,
cname: key,
scheduled: true
};
var tout = {
expires: api.expires,
result: err,
iid: bid+key,
cname: key,
scheduled: true
};

// Why are we doing this? Nothing happens here.
// Why are we doing this? Nothing happens here.

} else {

winston.event('Get data for ' + bid + ' from ' + key + ', ' + res.size);
log.event('Get data for ' + bid + ' from ' + key + ', ' + res.size);

// Perform cleanup function on API response
if (_.has(api, 'cleanup')) {
Expand All @@ -161,11 +165,11 @@ exports.refresh = function(api, key, bid, bundle) {
bundle[key] = api;

var tout = {
expires: api.expires,
result: res,
iid: api.iid,
cname: key,
scheduled: true
expires: api.expires,
result: res,
iid: api.iid,
cname: key,
scheduled: true
};

// Save the API response to Redis
Expand All @@ -183,8 +187,9 @@ exports.refresh = function(api, key, bid, bundle) {
// ## Retrieve the requested bundle
//
exports.fulfill = function ( myRes, ip, bid, callback, gzip, override ) {
var log = logger.child({bid: bid, ip: ip, override: override});

winston.info('exports.fulfill: ' + bid);
log.info('exports.fulfill: ' + bid);

var bundle = GLOBAL.bundles[bid],
now = new Date();
Expand Down Expand Up @@ -235,7 +240,7 @@ exports.fulfill = function ( myRes, ip, bid, callback, gzip, override ) {
// There was an error so force refresh on bundle
exports.fulfill( myRes, ip, bid, callback, gzip, true );
} else {
winston.debug('bid'+bid+':' + doc);
log.debug('bid'+bid+':' + doc);
jDoc = JSON.parse( doc );
GLOBAL.bundles[bid].expiration = new Date(jDoc.expires);
jDoc.fromcache = true;
Expand All @@ -250,8 +255,9 @@ exports.fulfill = function ( myRes, ip, bid, callback, gzip, override ) {

manager.addJob('fulfillPart', {
work: function(api, bid, key, override, cachedPart) {
var log = logger.child({bid: bid, key: key, override: override, ip: ip});

winston.info('manager:fulfillPart: ' + bid + '.' + key + ', override: '+override);
log.info('manager:fulfillPart: ' + bid + '.' + key + ', override: '+override);

var self = this;

Expand All @@ -266,8 +272,9 @@ exports.fulfill = function ( myRes, ip, bid, callback, gzip, override ) {

doc = JSON.parse( doc );
doc.expires = new Date(doc.expires);
var secleft = -1;
if ( ('expires' in doc) && _.isDate(doc.expires) ) {
var secleft = doc.expires.getSecondsBetween( now ) * -1;
secleft = doc.expires.getSecondsBetween( now ) * -1;
}
if (secleft < 0) {
self.finished = true;
Expand All @@ -284,7 +291,7 @@ exports.fulfill = function ( myRes, ip, bid, callback, gzip, override ) {

if (_.has( api, 'auth')) {

winston.info('Bundle uses auth type ' + api.auth.type);
log.info('Bundle uses auth type ' + api.auth.type);
// If the API request object has an auth scheme defined
if (api.auth.type == 'oauth') {
oauth.authorize (api, bid, key, function( result, authParams ) {
Expand All @@ -307,7 +314,7 @@ exports.fulfill = function ( myRes, ip, bid, callback, gzip, override ) {
self.finished = true;
});
} else {
winston.error('auth type ' + api.auth.type + ' not recognized');
log.error('auth type ' + api.auth.type + ' not recognized');
//Could potentially perma lock here if all APIs have bad types
}
} else {
Expand All @@ -323,8 +330,9 @@ exports.fulfill = function ( myRes, ip, bid, callback, gzip, override ) {

manager.addJob('startRequest', {
work: function( api, key, cachedPart, bid ) {
var log = logger.child({bid: bid, key: key, ip: ip});

winston.info('manager:startRequest: ' + key);
log.info('manager:startRequest: ' + key);

var self = this;

Expand All @@ -342,11 +350,11 @@ exports.fulfill = function ( myRes, ip, bid, callback, gzip, override ) {
}
manager.enqueue('finishRequest', cachedPart );
self.finished = true;
}, api.timeout, self)
}, api.timeout, self);
}

api.resource( api.params, api.credentials, function( err, res ) {
clearTimeout(self.timeout)
clearTimeout(self.timeout);
delete self.timeout;

if ( err ) {
Expand All @@ -358,11 +366,11 @@ exports.fulfill = function ( myRes, ip, bid, callback, gzip, override ) {
tout.result = _.isUndefined(cachedPart) ? {} : cachedPart;
tout.fromcache = true;
tout.err = err;
winston.error('Problem retrieving data for ' + bid + ' from ' + key + ': ' + JSON.stringify(err));
log.error('Problem retrieving data for ' + bid + ' from ' + key + ': ' + JSON.stringify(err));

} else {

winston.event('Get data for ' + bid + ' from ' + key + ', ' + res.size);
log.event('Get data for ' + bid + ' from ' + key + ', ' + res.size);

// Perform cleanup function on API response
if (_.has(api, 'cleanup')) {
Expand Down Expand Up @@ -392,22 +400,24 @@ exports.fulfill = function ( myRes, ip, bid, callback, gzip, override ) {
self.finished = true;
});
}
})
});

manager.addJob('finishRequest', {
work: function(apiResponse) {
var key = apiResponse.cname;
var log = logger.child({bid: bid, key: key, ip: ip});

winston.info('manager:finishRequest');
log.info('manager:finishRequest');

queriesInThisBundle--;

if (_.has(apiResponse, 'redirect')) {
thisResponse["redirect"] = apiResponse.redirect;
thisResponse["guid"] = apiResponse.guid || '';
thisResponse["authBundle"] = bid;
thisResponse["authPart"] = apiResponse.cname;
thisResponse["authPart"] = key;
}
thisResponse[apiResponse.cname] = apiResponse;
thisResponse[key] = apiResponse;

if (queriesInThisBundle === 0) {
manager.enqueue('composeResponse', bid);
Expand All @@ -419,7 +429,7 @@ exports.fulfill = function ( myRes, ip, bid, callback, gzip, override ) {
manager.addJob('composeResponse', {
work: function() {

winston.info('manager:composeResponse');
log.info('manager:composeResponse');

// Update the expiration date on the bundle
var tout = {
Expand All @@ -428,11 +438,11 @@ exports.fulfill = function ( myRes, ip, bid, callback, gzip, override ) {
};

if (_.has( thisResponse, 'redirect')) {
tout.redirect = thisResponse.redirect,
tout.guid = thisResponse.guid,
tout.authBundle = thisResponse.authBundle,
tout.authPart = thisResponse.authPart
};
tout.redirect = thisResponse.redirect;
tout.guid = thisResponse.guid;
tout.authBundle = thisResponse.authBundle;
tout.authPart = thisResponse.authPart;
}

// Insert api responses into bundle
_.each( thisResponse, function( val, idx ) {
Expand Down Expand Up @@ -463,7 +473,7 @@ exports.fulfill = function ( myRes, ip, bid, callback, gzip, override ) {
manager.addJob('sendResponse', {
work: function(doc) {

winston.info('manager:sendResponse');
log.info('manager:sendResponse');

if (_.has(doc, 'redirect')) {

Expand Down Expand Up @@ -506,4 +516,4 @@ exports.fulfill = function ( myRes, ip, bid, callback, gzip, override ) {

manager.enqueue('fulfillBundle');
}
}
};
Loading