diff --git a/README.md b/README.md index 06f12da..d224455 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,6 @@ -# wamp.rt: A WAMP V2 nodejs router -##Copyright Orange 2014, All Rights Reserved +# FOX.WAMP is a WAMP v2 message router implementation -wamp.rt is a WebSocket Application Messaging Protocol [WAMP](http://wamp.ws/) V2 router implementation based on nodejs. - -The router is compliant with the WAMP V2 [Basic Profile](https://github.com/tavendo/WAMP/blob/master/spec/basic.md). - -wamp.rt implements both [Dealer](https://github.com/tavendo/WAMP/blob/master/spec/basic.md#peers-and-roles) and [Broker](https://github.com/tavendo/WAMP/blob/master/spec/basic.md#peers-and-roles) roles. +The message router is compliant with the [WAMP V2 Basic Profile](http://wamp-proto.org/). ## Build Instructions @@ -13,7 +8,27 @@ Install using npm. Depending on what you want to do, your mileage may vary. ## Credits -wamp.rt has been inspired by the following Open Source projects: +fox.wamp has been inspired by the following Open Source projects: + +- [wamp.rt](https://github.com/Orange-OpenSource/wamp.rt) +- [wamp.io](https://github.com/nicokaiser/wamp.io) + + +## Changes: +2017-05-17: +- Concrete topic published to +- Progressive Calls (receive_progress & progress) + +2017-05-07: +- exclude_me option of publish + +2017-04-26: +- integration with [StatsD](https://github.com/etsy/statsd) -- [wamp.io](https://github.com/nicokaiser/wamp.io) +2016-04-03: +- ticket auth support added +2016-03-09: +- internal api moved to realm +- callrpc method has args & kwargs arguments +- publish method does not require message id diff --git a/democli/backend.js b/democli/backend.js new file mode 100644 index 0000000..3e26e96 --- /dev/null +++ b/democli/backend.js @@ -0,0 +1,114 @@ +AUTOBAHN_DEBUG = true; +var autobahn = require('autobahn'); +var program = require('commander'); + +program + .option('-p, --port ', 'Server IP port', 9000) + .option('-i, --ip ', 'Server IP address','127.0.0.1') + .parse(process.argv); + +var connectUrl = 'ws://' + program.ip + ':' + program.port; +console.log('connectUrl:', connectUrl); + +var connection = new autobahn.Connection({ + url: connectUrl, + realm: 'realm1'} +); + +connection.onopen = function (session) { + + var reg = null; + var reg2 = null; + + function utcprogress(args, kwargs, options) { + console.log("Someone is calling utc function", args, kwargs, options); + if (options.progress) { + setTimeout(function () { + var now = new Date(); + options.progress(now.toISOString()); + }, 100); + setTimeout(function () { + var now = new Date(); + options.progress(now.toISOString()); + }, 200); + return new Promise((resolve, reject) => { + setTimeout(function () { + var now = new Date(); + resolve(now.toISOString()); + }, 300); + }); + } + else { + var now = new Date(); + return now.toISOString(); + } + } + + session.register('com.timeservice.now', utcprogress).then( + function (registration) { + console.log("Procedure registered:", registration.id); + reg = registration; + }, + function (error) { + console.log("Registration failed:", error); + } + ); + + function echo(args,kwargs) { + console.log("args",args,"kwargs",kwargs); + return new autobahn.Result(args, kwargs); + } + + session.register('com.echoservice.echo', echo).then( + function (registration) { + console.log("Procedure echo registered:", registration.id); + reg2 = registration; + }, + function (error) { + console.log("Registration failed:", error); + } + ); + + // Define an event handler + function onEvent(publishArgs, kwargs, opts) { + console.log('Event', opts.topic, 'received args', publishArgs, 'kwargs ',kwargs); + } + + // Subscribe to a topic + session.subscribe('com.myapp.topic1', onEvent).then( + function(subscription) { + console.log("subscription successfull", subscription); + }, + function(error) { + console.log("subscription failed", error); + } + ); + + session.subscribe('wamp.session.on_join', onEvent).then( + function(subscription) { + console.log("subscription successfull", subscription); + }, + function(error) { + console.log("subscription failed", error); + } + ); + + session.subscribe('wamp.session.on_leave', onEvent).then( + function(subscription) { + console.log("subscription successfull", subscription); + }, + function(error) { + console.log("subscription failed", error); + } + ); + + setTimeout(function() { + console.log("Unregistration"); + session.unregister(reg); + session.unregister(reg2); + }, + 20000 + ); +}; + +connection.open(); diff --git a/test/frontend.js b/democli/frontend.js similarity index 72% rename from test/frontend.js rename to democli/frontend.js index 0518505..9b2faae 100644 --- a/test/frontend.js +++ b/democli/frontend.js @@ -3,24 +3,39 @@ var autobahn = require('autobahn'); var program = require('commander'); program - .option('-p, --port ', 'Server IP port', parseInt,9000) + .option('-p, --port ', 'Server IP port', 9000) .option('-i, --ip ', 'Server IP address','127.0.0.1') .parse(process.argv); -var connection = new autobahn.Connection({ - url: 'ws://' + program.ip + ':' + program.port, - realm: 'realm1'} -); +var connectUrl = 'ws://' + program.ip + ':' + program.port + '/ws'; +console.log('connectUrl:', connectUrl); + +var user = "joe"; +var key = "joe-secret"; + +// this callback is fired during authentication +function onchallenge (session, method, extra) { + if (method === "ticket") { + return key; + } else { + throw "don't know how to authenticate using '" + method + "'"; + } +} -var session = null; +var connection = new autobahn.Connection({ + url: connectUrl, + realm: 'realm1', +// authmethods: ["ticket", "wampcra"], + authid: user, + onchallenge: onchallenge +}); -connection.onopen = function (new_session) { +connection.onopen = function (session, details) { - session = new_session; session.log("Session open."); var starttime = Date.now(); - session.call('com.timeservice.now').then( + var c1 = session.call('com.timeservice.now', [], {}, {receive_progress:true}).then( function (now) { // this method returns a plain value session.log("Call com.timeservice.now completed in " + @@ -28,8 +43,12 @@ connection.onopen = function (new_session) { " ms: result =", now); }, function (error) { - console.log("Call failed:", error); - }); + console.log("Call failed:", error); + }, + function (progress) { + console.log("Call progress:", progress); + } + ); session.call('com.echoservice.echo').then( function (res) { @@ -90,12 +109,11 @@ connection.onopen = function (new_session) { } ); - session.call('wamp.rt.foo',["test"]).then( + session.call('test.foo', ["test"], {foo:'bar'}).then( function (res) { - session.log("Call wamp.rt.foo completed in " + + session.log("Call test.foo completed in " + (Date.now() - starttime) + " ms: result =", res); - }, function (error) { console.log("Call failed:", error); @@ -111,20 +129,23 @@ connection.onopen = function (new_session) { session.publish('com.myapp.topic1', ["Arg1", "Arg2" ], { "kwarg1": "kwarg1", "kwarg2": "kwarg2"}, { acknowledge : false }); - session.publish('com.myapp.topic1', [ "Arg_1", "Arg_2" ], {}, { acknowledge : true }).then( + var p1 = session.publish('com.myapp.topic1', [ "Arg_1", "Arg_2" ], {}, { acknowledge : true }).then( function(publication) { console.log("published, publication ID is ", publication); - connection.close(); }, - function(error) { console.log("publication error", error); - connection.close(); + function(error) { + console.log("publication error", error); + return Promise.resolve(true); } ); + + Promise.all([c1,p1]).then(function () { + connection.close(); + }); }; connection.onclose = function (reason, details) { - console.log("connection 1", reason, details); + console.log("close connection:", reason, details); }; connection.open(); - diff --git a/test/pubsub/publisher.js b/democli/pubsub/publisher.js similarity index 91% rename from test/pubsub/publisher.js rename to democli/pubsub/publisher.js index 412d6d0..5428c8f 100644 --- a/test/pubsub/publisher.js +++ b/democli/pubsub/publisher.js @@ -4,12 +4,15 @@ var program = require('commander'); var when = require('when'); program - .option('-p, --port ', 'Server IP port', parseInt,9000) + .option('-p, --port ', 'Server IP port', 9000) .option('-i, --ip ', 'Server IP address','127.0.0.1') .parse(process.argv); +var connectUrl = 'ws://' + program.ip + ':' + program.port; +console.log('connectUrl:', connectUrl); + var connection = new autobahn.Connection({ - url: 'ws://' + program.ip + ':' + program.port, + url: connectUrl, realm: 'realm1'} ); diff --git a/test/pubsub/subscriber.js b/democli/pubsub/subscriber.js similarity index 88% rename from test/pubsub/subscriber.js rename to democli/pubsub/subscriber.js index 25ce560..39d7d09 100644 --- a/test/pubsub/subscriber.js +++ b/democli/pubsub/subscriber.js @@ -3,12 +3,15 @@ var autobahn = require('autobahn'); var program = require('commander'); program - .option('-p, --port ', 'Server IP port', parseInt,9000) + .option('-p, --port ', 'Server IP port', 9000) .option('-i, --ip ', 'Server IP address','127.0.0.1') .parse(process.argv); +var connectUrl = 'ws://' + program.ip + ':' + program.port; +console.log('connectUrl:', connectUrl); + var connection = new autobahn.Connection({ - url: 'ws://' + program.ip + ':' + program.port, + url: connectUrl, realm: 'realm1'} ); diff --git a/test/rpc/client.js b/democli/rpc/client.js similarity index 94% rename from test/rpc/client.js rename to democli/rpc/client.js index 41b313b..404745c 100644 --- a/test/rpc/client.js +++ b/democli/rpc/client.js @@ -4,13 +4,16 @@ var program = require('commander'); var when = require('when'); program - .option('-p, --port ', 'Server IP port', parseInt,9000) + .option('-p, --port ', 'Server IP port', 9000) .option('-i, --ip ', 'Server IP address','127.0.0.1') .parse(process.argv); +var connectUrl = 'ws://' + program.ip + ':' + program.port; +console.log('connectUrl:', connectUrl); + var connection = new autobahn.Connection({ - url: 'ws://' + program.ip + ':' + program.port, - realm: 'realm1'} + url: connectUrl, + realm: 'realm1'} ); connection.onopen = function (session) { diff --git a/test/rpc/server.js b/democli/rpc/server.js similarity index 88% rename from test/rpc/server.js rename to democli/rpc/server.js index d6e59b9..eb6339c 100644 --- a/test/rpc/server.js +++ b/democli/rpc/server.js @@ -3,13 +3,16 @@ var autobahn = require('autobahn'); var program = require('commander'); program - .option('-p, --port ', 'Server IP port', parseInt,9000) + .option('-p, --port ', 'Server IP port', 9000) .option('-i, --ip ', 'Server IP address','127.0.0.1') .parse(process.argv); +var connectUrl = 'ws://' + program.ip + ':' + program.port; +console.log('connectUrl:', connectUrl); + var connection = new autobahn.Connection({ - url: 'ws://' + program.ip + ':' + program.port, - realm: 'realm1'} + url: connectUrl, + realm: 'realm1'} ); connection.onopen = function (session) { diff --git a/examples/basic.js b/examples/basic.js index d9369d6..e0225ff 100644 --- a/examples/basic.js +++ b/examples/basic.js @@ -10,30 +10,20 @@ WAMPRT_TRACE = true; -var Router = require('../lib/wamp.rt'); +var MSG = require('../lib/messages'); +var WampRouter = require('../lib/wamp.rt'); var program = require('commander'); - program - .option('-p, --port ', 'Server IP port', parseInt,9000); - - -function onRPCRegistered(uri) { - console.log('onRPCRegistered RPC registered', uri); -} + .option('-p, --port ', 'Server IP port', 9000) + .parse(process.argv); -function onRPCUnregistered(uri) { - console.log('onRPCUnregistered RPC unregistered', uri); -} - -function onPublish(topicUri, args) { - console.log('onPublish Publish', topicUri, args); -} +console.log('Listening port:', program.port); // // WebSocket server // -var app = new Router( +var app = new WampRouter( { port: program.port, // The router will select the appropriate protocol, // but we can still deny the connection @@ -46,11 +36,23 @@ var app = new Router( } ); -app.on('RPCRegistered', onRPCRegistered); -app.on('RPCUnregistered', onRPCUnregistered); -app.on('Publish', onPublish); +app.on('RPCRegistered', function (realm, uri) { + console.log('onRPCRegistered RPC registered', uri); +}); +app.on('RPCUnregistered', function (realm, uri) { + console.log('onRPCUnregistered RPC unregistered', uri); +}); +app.on('Publish', function onPublish(realm, topicUri, args) { + console.log('onPublish Publish', topicUri, args); +}); +app.on(MSG.REALM_CREATED, function (realm, realmName) { + console.log('new Relm:', realmName); +}); -app.regrpc('wamp.rt.foo', function(id,args) { - console.log('called with ' + args); - app.resrpc(id,["bar", "bar2"], {"key1": "bar1", "key2": "bar2"}); +app.getRealm('realm1', function (realm) { + var api = realm.api(); + api.regrpc('test.foo', function(id, args, kwargs) { + console.log('called with ', args, kwargs); + api.resrpc(id, null /* no error */, ["bar", "bar2"], {"key1": "bar1", "key2": "bar2"}); + }); }); diff --git a/examples/with_auth.js b/examples/with_auth.js new file mode 100644 index 0000000..11476ad --- /dev/null +++ b/examples/with_auth.js @@ -0,0 +1,41 @@ +// +// This is authenticate router example +// + +WAMPRT_TRACE = true; + +var WampRouter = require('../lib/wamp.rt'); +var program = require('commander'); + +program + .option('-p, --port ', 'Server IP port', 9000) + .parse(process.argv); + +console.log('Listening port:', program.port); + +var Auth = function () { + this.authenticate = function (realmName, secureDetails, secret, callback) { + console.log('AUTH:', secureDetails, secret); + if (secureDetails.authid+'-secret' === secret) + callback(); + else + callback('authorization_failed'); + }; +}; + +// +// WebSocket server +// +var app = new WampRouter( + { port: program.port, + // The router will select the appropriate protocol, + // but we can still deny the connection + // TODO: this should be the other way round, really ... + handleProtocols: function(protocols,cb) { + console.log(protocols); + cb(true,protocols[0]); + //cb(false); + } + }, + new Auth() +); diff --git a/examples/with_statsd.js b/examples/with_statsd.js new file mode 100644 index 0000000..ae435e3 --- /dev/null +++ b/examples/with_statsd.js @@ -0,0 +1,18 @@ +// +// This is a basic router example with sonnectivity to the statsd server +// + +var WampRouter = require('../lib/wamp.rt'); +var program = require('commander'); +var StatsD = require('../ext/statsd'); + +StatsD.init(program); + +program + .option('-p, --port ', 'Server IP port', 9000) + .parse(process.argv); + +console.log('Listening port:', program.port); + +var app = new WampRouter({port: program.port}); +var trace = new StatsD.TraceRouter(program, app); diff --git a/ext/statsd.js b/ext/statsd.js new file mode 100644 index 0000000..9925a5b --- /dev/null +++ b/ext/statsd.js @@ -0,0 +1,42 @@ +/*jshint node: true */ +'use strict'; + +var + StatsD = require('node-statsd'); + +function init(program) { + program + .option('-t, --statsd-port ', 'StatsD Server IP port', 8125) + .option('-s, --statsd-server ', 'StatsD Server IP', 'localhost'); +}; + +function TraceRouter(program, router) { + + var client = new StatsD({ + host: program.statsdServer, + port: program.statsdPort, + prefix: 'wamp.' + }); + + router.on('session.Tx', function (session, data) { + var realmName = 'UNKNOWN'; + if (session.realm) + realmName = session.getRealmName(); + + client.increment(realmName+'.Tx.count', 1); + client.increment(realmName+'.Tx.size', data.length); + }); + + router.on('session.Rx', function (session, data) { + var realmName = 'UNKNOWN'; + if (session.realm) + realmName = session.getRealmName(); + + client.increment(realmName+'.Rx.count', 1); + client.increment(realmName+'.Rx.size', data.length); + }); + +}; + +exports.init = init; +exports.TraceRouter = TraceRouter; diff --git a/lib/handlers.js b/lib/handlers.js index 59f6459..61bdd35 100644 --- a/lib/handlers.js +++ b/lib/handlers.js @@ -1,268 +1,217 @@ -// wamp.rt -// Copyright Orange 2014 +/*jshint node: true */ +'use strict'; -var WAMP = require('./protocol'), - util = require('./util'), - log = require('./log'); +var + WAMP = require('./protocol'); var handlers = {}; -// This handlers are meant to be called in the context of the router object +var Facade = function () { + this.checkRealm = function (wampCommand, requestId) { + if (this.realm) { + return true; + } else { + this.sendError(wampCommand, requestId, "wamp.error.not_authorized"); + return false; + } + }; + this.sendWelcome = function (details) { + details.roles = {"dealer": {}}; + this.send([WAMP.WELCOME, this.sessionId, details]); + }; + this.sendChallenge = function (authmethod) { + this.send([WAMP.CHALLENGE, authmethod, {}]); + }; + this.sendRegistered = function (requestId, registrationId) { + this.send([WAMP.REGISTERED, requestId, registrationId]); + }; + this.sendUnregistered = function (requestId) { + if (requestId) // do not send on disconnect + this.send([WAMP.UNREGISTERED, requestId]); + }; + this.sendInvoke = function (regId, invId, args, kwargs, options) { + var msg = [ + WAMP.INVOCATION, + invId, + regId, + options, + ]; + if (undefined !== args) msg.push(args); + if (undefined !== kwargs) msg.push(kwargs); + this.send(msg); + }; + this.sendResult = function (invId, err, args, kwargs, options) { + if (err) { + this.sendError(WAMP.CALL, invId, "wamp.error.callee_failure"); + } + else { + var msg = [ + WAMP.RESULT, + invId, + options, + ]; + if (undefined !== args) msg.push(args); + if (undefined !== kwargs) msg.push(kwargs); + this.send(msg); + } + }; + this.sendSubscribed = function (requestId, topicId) { + this.send([WAMP.SUBSCRIBED, requestId, topicId]); + }; + this.sendUnsubscribed = function (requestId) { + if (requestId) // do not send on disconnect + this.send([WAMP.UNSUBSCRIBED, requestId]); + }; + this.sendPublished = function (requestId, publicationId) { + this.send([WAMP.PUBLISHED, requestId, publicationId]); + }; + this.sendEvent = function (subscriptionId, publicationId, args, kwargs, eventOpts) { + var msg = [ + WAMP.EVENT, + subscriptionId, + publicationId, + eventOpts + ]; + // Manage optional parameters args + kwargs + if (args !== undefined) { + msg.push(args); + } + if (kwargs !== undefined) { + msg.push(kwargs); + } + this.send(msg); + }; + this.sendError = function (cmd, requestId, txt, args) { + if (requestId) { // do not send on disconnect + var msg = [WAMP.ERROR, cmd, requestId, {}, txt]; + if (args) + msg.push(args); + this.send(msg); + } + }; + this.sendGoodbye = function () { + // Graceful termination + var msg = [WAMP.GOODBYE, {}, "wamp.error.goodbye_and_out"]; + this.send(msg, function (error) { + this.terminate(1000, "Server closed WAMP session"); + }.bind(this)); + }; + this.sendAbort = function (reason) { // auth failed + var msg = [WAMP.ABORT, {}, reason]; + this.send(msg, function (error) { + this.terminate(1000, "Server closed WAMP session"); + }.bind(this)); + }; + this.handle = function (msg) { + if (!Array.isArray(msg)) { + this.terminate(1003, "protocol violation"); + return; + } + var msgType = msg.shift(); + if (!handlers[msgType]) { + this.terminate(1003, "protocol violation"); + return; + } + handlers[msgType].call(this, msg); + }; +}; + +// This handlers are meant to be called in the context of the SESSION object -handlers[WAMP.HELLO] = function(session, args) { - var realm = args.shift(); +handlers[WAMP.HELLO] = function(args) { + var realmName = args.shift(); var details = args.shift(); - if (typeof session.id === 'undefined') { - session.id = util.randomId(); - log.trace('New session :' + session.id); - // Send welcome message - var msg = [ - WAMP.WELCOME, - session.id, - { - "roles": { - "dealer": {} - } - }]; - session.send(msg); + if (this.realm === null) { + this.hello(realmName, details); + } else { + this.terminate(1002, "protocol violation"); + } +}; + +handlers[WAMP.AUTHENTICATE] = function(args) { + var secret = args.shift(); + if (this.realm === null) { + this.authenticate(secret); } else { - session.terminate(1002, "protocol violation"); + this.terminate(1002, "protocol violation"); } }; -handlers[WAMP.GOODBYE] = function(session, args) { +handlers[WAMP.GOODBYE] = function(args) { // Ack the goodbye - var msg = [ - WAMP.GOODBYE, - {}, - "wamp.error.goodbye_and_out" - ]; - session.send(msg, function (error) { - session.terminate(1000, "Client closed WAMP session"); - }); + this.sendGoodbye(); }; -handlers[WAMP.REGISTER] = function (session, args) { - var request = args.shift(); +handlers[WAMP.REGISTER] = function (args) { + var requestId = args.shift(); var options = args.shift(); var procUri = args.shift(); - args = args || []; - var msg; - if (typeof this.getrpc(procUri) === 'undefined') { - var regId = session.register(procUri); - this.regrpc(procUri, function (invId, args) { - log.trace('Invoking RPC ' + procUri, args); - var msg = [ - WAMP.INVOCATION, - invId, - regId, - {}, - ]; - // Manage optional parameters args + kwargs - for(var i = 0; i < args.length && i < 2; i++) { - msg.push(args[i]); - } - session.send(msg); - }); - msg = [ - WAMP.REGISTERED, - request, - regId - ]; - } else { - msg = [ - WAMP.ERROR, - WAMP.REGISTER, - request, - {}, - "wamp.error.procedure_already_exists" - ]; - } - session.send(msg); + if (this.checkRealm(WAMP.REGISTER, requestId)) + this.realm.regrpc(this, requestId, procUri, options); }; -handlers[WAMP.CALL] = function (session, args) { +handlers[WAMP.CALL] = function (args) { var callId = args.shift(); var options = args.shift(); var procUri = args.shift(); - args = args || []; - var resultCallback = function(err, args) { - if (err) { - var msg = [ - WAMP.ERROR, - WAMP.CALL, - callId, - {}, - "wamp.error.callee_failure" - ]; - session.send(msg); - } else { - var msg = [ - WAMP.RESULT, - callId, - {}, - ]; - // Manage optional parameters args + kwargs - for(var i = 0; i < args.length && i < 2; i++) { - msg.push(args[i]); - } - session.send(msg); - } - }; - if (!this.callrpc(procUri, args, resultCallback)) { - var msg = [ - WAMP.ERROR, - WAMP.CALL, - callId, - {}, - "wamp.error.no_such_procedure" - ]; - session.send(msg); - } + var fArgs = args.shift() || []; + var kwArgs = args.shift() || {}; + if (this.checkRealm(WAMP.CALL, callId)) + this.realm.callrpc(this, callId, procUri, options, fArgs, kwArgs); }; -handlers[WAMP.UNREGISTER] = function (session, args) { +handlers[WAMP.UNREGISTER] = function (args) { var requestId = args.shift(); var registrationId = args.shift(); - var msg; - var uri = session.unregister(registrationId); - if (typeof uri === 'undefined') { - msg = [ - WAMP.ERROR, - WAMP.UNREGISTER, - requestId, - {}, - "wamp.error.no_such_registration" - ]; - } else { - this.unregrpc(uri); - msg = [ - WAMP.UNREGISTERED, - requestId - ]; - } - session.send(msg); + if (this.checkRealm(WAMP.UNREGISTER, requestId)) + this.realm.unregrpc(this, requestId, registrationId); }; -handlers[WAMP.YIELD] = function (session, args) { +handlers[WAMP.YIELD] = function (args) { var invId = args.shift(); var options = args.shift(); - args = args || []; - this.resrpc(invId, null, args); + var fArgs = args.shift() || []; + var kwArgs = args.shift(); + if (this.checkRealm(WAMP.CALL, invId)) + this.realm.resrpc(this, invId, null /* no error */, fArgs, kwArgs, options); }; -handlers[WAMP.SUBSCRIBE] = function(session, args) { +handlers[WAMP.SUBSCRIBE] = function(args) { var requestId = args.shift(); var options = args.shift(); var topicUri = args.shift(); - args = args || []; - var msg; - - var eventCallback = function(publicationId, args, kwargs) { - log.trace('eventCallback', publicationId, args, kwargs); - var msg = [ - WAMP.EVENT, - subsId, - publicationId, - {} - ]; - // Manage optional parameters args + kwargs - if (args !== undefined) { - msg.push(args); - } - if (kwargs !== undefined) { - msg.push(kwargs); - } - session.send(msg); - }; - - var subsId = session.subscribe(topicUri); - this.substopic(topicUri, subsId, eventCallback); - msg = [ - WAMP.SUBSCRIBED, - requestId, - subsId - ]; - log.trace('Subscribe Topic ' + topicUri); - session.send(msg); + if (this.checkRealm(WAMP.SUBSCRIBE, requestId)) + this.realm.substopic(this, requestId, topicUri, options); }; -handlers[WAMP.UNSUBSCRIBE] = function(session, args) { +handlers[WAMP.UNSUBSCRIBE] = function(args) { var requestId = args.shift(); - var subsid = args.shift(); - var topicUri = session.unsubscribe(subsid); - args = args || []; - var msg; - - if (typeof this.gettopic(topicUri) === 'undefined') { - msg = [ - WAMP.ERROR, - WAMP.UNSUBSCRIBE, - requestId, - {}, - "wamp.error.no_such_subscription" - ]; - log.trace('Unsubscription error ' + topicUri); - } else { - this.unsubstopic(topicUri, subsid); - msg = [ - WAMP.UNSUBSCRIBED, - requestId - ]; - log.trace('Unsubscribe Topic ' + topicUri); - } - session.send(msg); + var topicUri = args.shift(); + if (this.checkRealm(WAMP.UNSUBSCRIBE, requestId)) + this.realm.unsubstopic(this, requestId, topicUri); }; -handlers[WAMP.PUBLISH] = function(session, msg) { +handlers[WAMP.PUBLISH] = function(msg) { var requestId = msg.shift(); var options = msg.shift(); var topicUri = msg.shift(); - var ack = options && options.acknowledge; - var publicationId = util.randomId(); var args = msg.shift() || []; var kwargs = msg.shift() || {}; - - if (ack) { - msg = [ - WAMP.PUBLISHED, - requestId, - publicationId - ]; - session.send(msg); - log.trace('Publish Topic with ack ' + topicUri + ' ' + publicationId); - } else { - log.trace('Publish Topic without ack ' + topicUri + ' ' + publicationId); - } - - // Router (this) is in charge of the events dispatching - this.publish(topicUri, publicationId, args, kwargs); -}; - -handlers[WAMP.EVENT] = function(session, args) { - var subscriptionId = args.shift(); - var publicationId = args.shift(); - args = args || []; - - log.trace('Event received subscriptionId ' + subscriptionId - + ' publicationId ' + publicationId); + if (this.checkRealm(WAMP.PUBLISH, requestId)) + this.realm.publish(this, requestId, topicUri, options, args, kwargs); }; -handlers[WAMP.ERROR] = function(session, msg) { +handlers[WAMP.ERROR] = function(msg) { var requestType = msg.shift(); var requestId = msg.shift(); var details = msg.shift(); var errorUri = msg.shift(); var args = msg.shift() || []; - var kwargs = msg.shift() || {}; + var kwargs = msg.shift(); - var err = new Error(details); - if (requestType === WAMP.INVOCATION) { - // An invocation failed - var invId = requestId; - this.resrpc(invId, err, args); - } - -} + // An invocation failed + if (this.checkRealm(WAMP.ERROR, requestId) && requestType === WAMP.INVOCATION) + this.realm.resrpc(this, requestId, new Error(details), args, kwargs); +}; -module.exports = handlers; +module.exports = Facade; diff --git a/lib/log.js b/lib/log.js deleted file mode 100644 index de8bd53..0000000 --- a/lib/log.js +++ /dev/null @@ -1,12 +0,0 @@ -// wamp.rt -// Copyright Orange 2014 - -var trace = function () {}; - -if ('WAMPRT_TRACE' in global && WAMPRT_TRACE && 'console' in global) { - trace = function () { - console.log.apply(console, arguments); - }; -} - -exports.trace = trace; diff --git a/lib/messages.js b/lib/messages.js new file mode 100644 index 0000000..9128f51 --- /dev/null +++ b/lib/messages.js @@ -0,0 +1,8 @@ +module.exports = { + REALM_CREATED: 'realm.created', + SESSION_JOIN: 'session.join', + SESSION_LEAVE: 'session.leave', + SESSION_COUNT: 'session.count', + SESSION_LIST: 'session.list', + SESSION_GET: 'session.get', +}; diff --git a/lib/realm.js b/lib/realm.js new file mode 100644 index 0000000..33ed003 --- /dev/null +++ b/lib/realm.js @@ -0,0 +1,251 @@ +/*jshint node: true */ +'use strict'; + +var WAMP = require('./protocol'), + handlers = require('./handlers'), + tools = require('./tools'); + +function Api(router, realm) { + var _callback = {}; + var _rpc = {}; + + handlers.call(this); + this.sessionId = tools.randomId(); + router.registerSession(this); + + // API functions + // regrpc callback = function(id, args, kwargs, options) + this.regrpc = function(uri, callback) { + var regId = realm.regrpc(this, tools.randomId(), uri); + if (regId) { + _rpc[regId] = callback; + } + return regId; + }; + this.unregrpc = function(regId) { + var uri = realm.unregrpc(this, tools.randomId(), regId); + delete _rpc[regId]; + return uri; + }; + this.callrpc = function (uri, args, kwargs, callback, options) { + var callId = tools.randomId(); + if (realm.callrpc(this, callId, uri, options, args, kwargs)) { + _callback[callId] = callback; + } + }; + this.resrpc = function (invId, err, args, kwargs, options) { + return realm.resrpc(this, invId, err, args, kwargs, options); + }; + this.substopic = function(topicUri, callback) { + var topicId = realm.substopic(this, tools.randomId(), topicUri, {}); + _rpc[topicId] = callback; + return topicId; + }; + this.unsubstopic = function(topicId) { + delete _rpc[topicId]; + return realm.unsubstopic(this, tools.randomId(), topicId); + }; + this.publish = function (topicUri, args, kwargs, options) { + var requestId = tools.randomId(); + realm.publish(this, requestId, topicUri, options, args, kwargs); + }; + + // override/internal part + this.sendInvoke = function (regId, invId, args, kwargs, options) { + if (_rpc.hasOwnProperty(regId)) { + _rpc[regId](invId, args, kwargs, options); + } + }; + this.sendResult = function (callId, err, args, kwargs, options) { + var callback = _callback[callId]; + if (!options || !options.progress) { + delete _callback[callId]; + } + callback(err, args, kwargs, options); + }; + this.sendEvent = function (subscriptionId, publicationId, args, kwargs) { + if (_rpc.hasOwnProperty(subscriptionId)) { + _rpc[subscriptionId](publicationId, args, kwargs); + } + }; + this.send = function (msg) {}; +} + +function Realm(router) { + var _sessRPC = {}; + var _sessTopic = {}; // topics by sessionId + + var _rpcs = {}; + var _topics = {}; // topics by uri + var _pending = {}; + var _api = null; + this.isSecured = false; + + this.api = function () { + if (!_api) { + _api = new Api(router, this); + } + return _api; + }; + + // RPC Management + this.regrpc = function(session, requestId, procUri, options) { + if (_rpcs.hasOwnProperty(procUri)) { + session.sendError(WAMP.REGISTER, requestId, "wamp.error.procedure_already_exists"); + return false; + } + var registrationId = tools.randomId(); + _rpcs[procUri] = {sessionId:session.sessionId, regId:registrationId}; + if (!_sessRPC.hasOwnProperty(session.sessionId)) + _sessRPC[session.sessionId] = {}; + _sessRPC[session.sessionId][registrationId] = procUri; + router.emit('RPCRegistered', this, procUri); + session.sendRegistered(requestId, registrationId); + return registrationId; + }; + this.unregrpc = function(session, requestId, regId) { + var procUri = ''; + if (_sessRPC.hasOwnProperty(session.sessionId) && _sessRPC[session.sessionId].hasOwnProperty(regId)) { + procUri = _sessRPC[session.sessionId][regId]; + delete _rpcs[procUri]; + delete _sessRPC[session.sessionId][regId]; + router.emit('RPCUnRegistered', this, procUri); + session.sendUnregistered(requestId); + } else { + session.sendError(WAMP.UNREGISTER, requestId, "wamp.error.no_such_registration"); + } + return procUri; + }; + + this.callrpc = function(session, callId, procUri, options, args, kwargs) { + if (!_rpcs.hasOwnProperty(procUri)) { + session.sendError(WAMP.CALL, callId, "wamp.error.no_such_procedure", ['no callee registered for procedure <'+procUri+'>']); + return false; + } + var destSession = router.getSession(_rpcs[procUri].sessionId); + if (destSession) { + var invId = tools.randomId(); + _pending[invId] = [callId, session.sessionId]; + var invOpts = {}; + if (options && options.receive_progress) { + invOpts.receive_progress = true; + } + destSession.sendInvoke(_rpcs[procUri].regId, invId, args, kwargs, invOpts); + return invId; + } else { + delete _rpcs[procUri]; + } + return false; + }; + + this.resrpc = function(session, invId, err, args, kwargs, options) { + var resOpts = {}; + if (options && options.progress) { + resOpts.progress = true; + }; + if (_pending.hasOwnProperty(invId)) { + var destSession = router.getSession(_pending[invId][1]); + if (destSession) { + destSession.sendResult(_pending[invId][0], err, args, kwargs, resOpts); + } + } + if (!resOpts.progress) { + delete _pending[invId]; + } + }; + + // Topic Management + this.substopic = function(session, requestId, topicUri, options) { + var topicId = tools.randomId(); + if (!_sessTopic.hasOwnProperty(session.sessionId)) { + _sessTopic[session.sessionId] = {}; + } + _sessTopic[session.sessionId][topicId] = topicUri; + if (!_topics.hasOwnProperty(topicUri)) { + _topics[topicUri] = {}; + } + _topics[topicUri][topicId] = session.sessionId; + + router.emit('Subscribed', this, topicUri); + session.sendSubscribed(requestId, topicId); + return topicId; + }; + + this.unsubstopic = function(session, requestId, topicId) { + var topicUri = ''; + if (_sessTopic.hasOwnProperty(session.sessionId) && _sessTopic[session.sessionId].hasOwnProperty(topicId)) { + topicUri = _sessTopic[session.sessionId][topicId]; + delete _topics[topicUri][topicId]; + delete _sessTopic[session.sessionId][topicId]; + router.emit('UnSubscribed', this, topicUri); + session.sendUnsubscribed(requestId); + } else { + session.sendError(WAMP.UNSUBSCRIBE, requestId, "wamp.error.no_such_subscription"); + } + return topicUri; + }; + + // By default, a Publisher of an event will not itself receive an event published, even when subscribed to the topic the Publisher is publishing to. + // If supported by the Broker, this behavior can be overridden via the option exclude_me set to false. + // session.publish('com.myapp.hello', ['Hello, world!'], {}, {exclude_me: false}); + + this.publish = function(session, requestId, topicUri, options, args, kwargs) { + var publicationId = tools.randomId(); + if (_topics.hasOwnProperty(topicUri)) { + var eventOpts = {topic:topicUri}; + for(var subscriptionId in _topics[topicUri]) { + var destSession = router.getSession(_topics[topicUri][subscriptionId]); + if (destSession) { + if (session.sessionId !== destSession.sessionId || + (options && false === options.exclude_me) + ) + destSession.sendEvent(parseInt(subscriptionId), publicationId, args, kwargs, eventOpts); + } else { + delete _topics[topicUri][subscriptionId]; + } + } + } + var ack = options && options.acknowledge; + router.emit('Publish', this, topicUri, args, kwargs, ack); + if (ack) + session.sendPublished(requestId, publicationId); + return publicationId; + }; + + this.cleanupRPC = function (session) { + var procIds = []; + var procUris = []; + if (_sessRPC.hasOwnProperty(session.sessionId)) { + for (var regId in _sessRPC[session.sessionId]) + procIds.push(regId); + for (var i=0; i "+data); + }); + + this.on('session.Rx', function (session, data) { + trace("["+session.sessionId+"] RX > "+data); + }); + + this.on('session.debug', function (session, msg) { + trace("["+session.sessionId+"] "+msg); + }); + + this.on('session.warning', function (session, msg, data) { + trace("["+session.sessionId+"] "+msg+' '+data); + }); } + +module.exports = Router; diff --git a/lib/session.js b/lib/session.js index 53dfef4..4eb450b 100644 --- a/lib/session.js +++ b/lib/session.js @@ -1,109 +1,83 @@ -// wamp.rt -// Copyright Orange 2014 +/*jshint node: true */ +'use strict'; -var WAMP = require('./protocol'), - handlers = require('./handlers'), - util = require('./util'), - log = require('./log'); +var + MSG = require('./messages'), + handlers = require('./handlers'), + inherits = require('util').inherits; -module.exports = Session; +// requires sender with +// sender.send(msg, callback) +// sender.close(code, reason) -function Session (router, wsclient) { - var _registeredUris = {}; - var _subscribedUris = {}; - var _trace = function (msg) { - var trace = "[SESSION][" + - ((typeof this.id === 'undefined') ? "?" : this.id) + - "] " + msg; - log.trace(trace); - }.bind(this); - this.register = function (uri) { - var registrationId = util.randomId(); - _registeredUris[registrationId] = uri; - return registrationId; - }; - this.unregister = function (id) { - var uri = _registeredUris[id]; - if (typeof uri !== 'undefined') { - delete _registeredUris[id]; - } - return uri; - }; +// authHandler.authenticate(realmName, secureDetails, secret, callback) + +function Session (router, sender, sessionId) { + var secureRealmName; + var secureDetails; + var authHandler = null; + this.realm = null; + this.sessionId = sessionId; + handlers.call(this); - this.subscribe = function (uri) { - var subscriptionId = util.randomId(); - _subscribedUris[subscriptionId] = uri; - return subscriptionId; + // setup auth module that is configured in transport + this.setAuthHandler = function (auth) { + authHandler = auth; }; - this.unsubscribe = function (id) { - var uri = _subscribedUris[id]; - if (typeof uri !== 'undefined') { - delete _subscribedUris[id]; + this.hello = function (realmName, details) { + secureRealmName = realmName; + if (authHandler) { + secureDetails = details; + if (details.hasOwnProperty('authmethods') && details.authmethods.indexOf('ticket') >= 0) { + this.sendChallenge('ticket', {}); + } else { + this.sendAbort("wamp.error.authorization_failed"); + } + } + else { + router.getRealm(realmName, function (realm) { + this.realm = realm; + router.emit(MSG.SESSION_JOIN, this, realm); + this.sendWelcome({}); + }.bind(this)); } - return uri; }; - - this.send = function (msg, callback) { - data = JSON.stringify(msg); - var defaultCallback = function (error) { - if (error) { - log.trace("Failed to send message: " + error); - this.terminate(1011, "Unexpected error"); + this.authenticate = function (secret) { + authHandler.authenticate(secureRealmName, secureDetails, secret, function (err) { + if (err) { + this.sendAbort("wamp.error.authorization_failed"); + } else { + router.getRealm(secureRealmName, function (realm) { + this.realm = realm; + router.emit(MSG.SESSION_JOIN, this, realm); + var details = { + authid:secureDetails.authid, + authmethod:"ticket" + }; + this.sendWelcome(details); + }.bind(this)); } - }.bind(this); - _trace('TX > ' + data); - wsclient.send(data, (typeof callback === 'function') ? - callback : defaultCallback); + }.bind(this)); }; - wsclient.on('message', function(data) { - var msg; - - try { - msg = JSON.parse(data); - } catch (e) { - log.trace('invalid json'); - this.terminate(1003, "protocol violation"); - return; - } - if (!Array.isArray(msg)) { - log.trace('msg not a list'); - this.terminate(1003, "protocol violation"); - return; - } - var type = msg.shift(); - if (!handlers[type]) { - log.trace('unknown message type'); - this.terminate(1003, "protocol violation"); - return; - } - _trace('RX < ' + data); - handlers[type].apply(router, [this, msg]); - }.bind(this)); - this.close = function () { - // Graceful termination - var msg = [ - WAMP.GOODBYE, - {}, - "wamp.error.close_realm" - ]; - this.send(msg,function (error) { - session.terminate(1000, "Server closed WAMP session"); - }); + this.send = function (msg, callback) { + sender.send(msg, callback); }; this.terminate = function (code, reason) { - log.trace('Closing WebSocket connection: [' + - code + '] ' + reason); - wsclient.close(code, reason); - }; - this.cleanup = function () { - _trace('Cleaning up session'); - for( var regId in _registeredUris) { - router.unregrpc(_registeredUris[regId]); - delete _registeredUris[regId]; - } - for (var subId in _subscribedUris) { - router.unsubstopic(_subscribedUris[subId],subId); - delete _subscribedUris[subId]; - } + if (this.realm) + router.emit(MSG.SESSION_LEAVE, this, this.realm); + sender.close(code, reason); }; + this.getRealmName = function() { + return secureRealmName; + } } + +module.exports = Session; +inherits(Session, handlers); + +Session.prototype.cleanup = function () { + if (this.realm) { + this.realm.cleanup(this); + this.realm = null; + } +}; diff --git a/lib/util.js b/lib/tools.js similarity index 100% rename from lib/util.js rename to lib/tools.js diff --git a/lib/transport.js b/lib/transport.js new file mode 100644 index 0000000..45f6ab2 --- /dev/null +++ b/lib/transport.js @@ -0,0 +1,69 @@ +/*jshint node: true */ +'use strict'; + +var + WebSocketServer = require('ws').Server; + +module.exports = Transport; + +function WsParser(wsclient, router, session) { + wsclient.on('message', function(data) { + var msg; + try { + msg = JSON.parse(data); + } catch (e) { + router.emit('session.warning', session, 'invalid json', data); + session.terminate(1003, "protocol violation"); + return; + } + router.emit('session.Rx', session, data); + session.handle(msg); + }); + + wsclient.on('close', function() { + router.emit('session.debug', session, 'WebSocket is closed.'); + session.cleanup(); + router.removeSession(session); + }); +} + +function WsSender(wsclient, router, sessionId) { + var defaultCallback = function (error) { + if (error) { + router.emit('session.warning', "Failed to send message:", error); + this.close(1011, "Unexpected error"); + } + }.bind(this); + + this.send = function (msg, callback) { + var data = JSON.stringify(msg); + router.emit('session.Tx', this.session, data); + wsclient.send(data, (typeof callback === 'function') ? + callback : defaultCallback); + }; + + this.close = function (code, reason) { + router.emit('session.debug', this.session, 'Closing WebSocket connection: [' + code + '] ' + reason); + wsclient.close(code, reason); + }; +} + +function Transport(router, auth, SessionClass, wsOptions) { + var _wss = new WebSocketServer(wsOptions); + // Create a Session object for the lifetime of each + // WebSocket client object + _wss.on('connection', function (wsclient) { + var sessionId = router.getNewSessionId(); + var sender = new WsSender(wsclient, router, sessionId); + var session = new SessionClass(router, sender, sessionId); + sender.session = session; + session.setAuthHandler(auth); + router.registerSession(session); + var parser = new WsParser(wsclient, router, session); + router.emit('session.debug', session, 'New session'); + }); + + this.close = function() { + _wss.close(); + }; +} diff --git a/lib/wamp.rt.js b/lib/wamp.rt.js index 6bf4312..3120aa0 100644 --- a/lib/wamp.rt.js +++ b/lib/wamp.rt.js @@ -1 +1,35 @@ -module.exports = require('./router'); +var + inherits = require('util').inherits, + Session = require('./session'), + Transport = require('./transport'), + Router = require('./router'); + +RouterTransport = function (options, auth) { + Router.call(this); + + var _options = options || {}; + + if ( !_options.disableProtocolCheck ) { + // We need to verify that the subprotocol is wamp.2.json + var cb = _options.handleProtocols; + _options.handleProtocols = function (protocols, callback) { + var i=0; + var result = false; + while(i < protocols.length && result === false) { + result = (protocols[i] == "wamp.2.json"); + i++; + } + if (result && typeof cb == 'function') { + // If a handleProtocol function was provided by the + // calling script, just filter out the results + cb([protocols[i-1]], callback); + } else { + callback(result, result ? protocols[i-1] : null); + } + }; + } + _transport = new Transport(this, auth, Session, _options); +}; + +inherits(RouterTransport, Router); +module.exports = RouterTransport; diff --git a/package.json b/package.json index 78afbd8..0396841 100644 --- a/package.json +++ b/package.json @@ -1,28 +1,38 @@ { - "name": "wamp.rt", - "version": "0.1.4", + "name": "fox.wamp", + "version": "0.2.0", "description": "Basic WebSocket Application Messaging Protocol (WAMP) V2 router library", "author": { - "name": "David Corvoysier", - "email": "david.corvoysier@orange.com" + "name": "Anatoly Tsapkov", + "email": "anatoly.tsapkov@gmail.com" }, "contributors": [ - "Dominique ALOË " + "Dominique ALOË ", + "David Corvoysier " ], "repository": { "type": "git", - "url": "https://github.com/Orange-OpenSource/wamp.rt.git" + "url": "https://github.com/kalmyk/fox.wamp.git" }, "dependencies": { - "ws": "~0.4.31", - "commander": "~2.0.0" + "ws": "~1", + "commander": "*", + "node-statsd": "*" }, "devDependencies": { - "autobahn": "*", - "when": "*" + "autobahn": "~0.9.4", + "when": "*", + "chai": "*", + "chai-as-promised": "*", + "chai-spies": "*" }, "main": "lib/wamp.rt", "engines": { "node": ">=0.10.0" + }, + "scripts": { + "mocha-node-test": "mocha", + "lint": "jshint lib/*.js test/*.js", + "test": "npm run-script lint && npm run-script mocha-node-test" } } diff --git a/test/auth.js b/test/auth.js new file mode 100644 index 0000000..c63f6b3 --- /dev/null +++ b/test/auth.js @@ -0,0 +1,82 @@ +/*jshint mocha: true */ +/*jshint node: true */ +/*jshint expr: true */ +'use strict'; + +var + chai = require('chai'), + spies = require('chai-spies'), + expect = chai.expect, + WAMP = require('../lib/protocol'), + Session = require('../lib/session'), + Router = require('../lib/router'); + +chai.use(spies); + +var Auth = function () { + this.authenticate = function (realmName, secureDetails, secret, callback) { + if (realmName+'-'+secureDetails.authid+'-secret' === secret) + callback(); + else + callback('authorization_failed'); + }; +}; + +describe('authenticate', function() { + var + router, + sender, + cli; + + beforeEach(function(){ + sender = {}; + router = new Router(); + + cli = new Session(router, sender, router.getNewSessionId()); + cli.setAuthHandler(new Auth()); + router.registerSession(cli); + }); + + afterEach(function(){ + }); + + it('Joe AUTH:FAIL', function () { + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.CHALLENGE); + expect(msg[1]).to.equal('ticket'); + } + ); + cli.handle([WAMP.HELLO, 'test', {authid: 'joe', authmethods:['ticket']}]); + expect(sender.send).to.have.been.called.once(); + + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.ABORT); +// callback(); + } + ); + cli.handle([WAMP.AUTHENTICATE, 'incorrect-secret']); + expect(sender.send).to.have.been.called.once(); + }); + + it('Joe AUTH:OK', function () { + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.CHALLENGE); + expect(msg[1]).to.equal('ticket'); + } + ); + cli.handle([WAMP.HELLO, 'test', {authid: 'joe', authmethods:['ticket']}]); + expect(sender.send).to.have.been.called.once(); + + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.WELCOME); + } + ); + cli.handle([WAMP.AUTHENTICATE, 'test-joe-secret']); + expect(sender.send).to.have.been.called.once(); + }); + +}); diff --git a/test/backend.js b/test/backend.js deleted file mode 100644 index d5ad504..0000000 --- a/test/backend.js +++ /dev/null @@ -1,83 +0,0 @@ -AUTOBAHN_DEBUG = true; -var autobahn = require('autobahn'); -var program = require('commander'); - -program - .option('-p, --port ', 'Server IP port', parseInt,9000) - .option('-i, --ip ', 'Server IP address','127.0.0.1') - .parse(process.argv); - -var connection = new autobahn.Connection({ - url: 'ws://' + program.ip + ':' + program.port, - realm: 'realm1'} -); - -connection.onopen = function (session) { - - var reg = null; - var reg2 = null; - - function utcnow() { - console.log("Someone is calling me;)"); - now = new Date(); - return now.toISOString(); - } - - session.register('com.timeservice.now', utcnow).then( - function (registration) { - console.log("Procedure registered:", registration.id); - reg = registration; - }, - function (error) { - console.log("Registration failed:", error); - } - ); - - function echo(args,kwargs) { - console.log("args",args,"kwargs",kwargs); - return new autobahn.Result(args, kwargs); - } - - session.register('com.echoservice.echo', echo).then( - function (registration) { - console.log("Procedure echo registered:", registration.id); - reg2 = registration; - }, - function (error) { - console.log("Registration failed:", error); - } - ); - - var currentSubscription = null; - var counter = 0; - - // Define an event handler - function onEvent(publishArgs, kwargs) { - console.log('Event received args', publishArgs, 'kwargs ',kwargs); - counter++; - if (counter > 20) { - session.unsubscribe(currentSubscription).then(function(gone) { - console.log("unsubscribe successfull"); - }, function(error) { - console.log("unsubscribe failed", error); - }); - } - } - - // Subscribe to a topic - session.subscribe('com.myapp.topic1', onEvent).then( - function(subscription) { - console.log("subscription successfull", subscription); - currentSubscription = subscription; - }, - function(error) { - console.log("subscription failed", error); - } - ); - - setTimeout(function() {console.log("Unregistration");session.unregister(reg);session.unregister(reg2);},20000); - - -}; - -connection.open(); diff --git a/test/realm.js b/test/realm.js new file mode 100644 index 0000000..bc86462 --- /dev/null +++ b/test/realm.js @@ -0,0 +1,349 @@ +/*jshint mocha: true */ +/*jshint node: true */ +/*jshint expr: true */ +'use strict'; + +var + chai = require('chai'), + spies = require('chai-spies'), + expect = chai.expect, + WAMP = require('../lib/protocol'), + Realm = require('../lib/realm'), + Session = require('../lib/session'), + Router = require('../lib/router'); + +chai.use(spies); + +describe('protocol', function() { + var + router, + realm, + sender, + cli, + api; + + beforeEach(function(){ + sender = {}; + router = new Router(); + realm = new Realm(router); + api = realm.api(); + cli = new Session(router, sender, router.getNewSessionId()); + router.registerSession(cli); + cli.realm = realm; + }); + + afterEach(function(){ + }); + + it('empty cleanup', function () { + realm.cleanup(api); + }); + + it('CALL to RPC not exist', function () { + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.ERROR); + expect(msg[1]).to.equal(WAMP.CALL); + expect(msg[2]).to.equal(1234); + expect(msg[4]).to.equal('wamp.error.no_such_procedure'); + expect(msg[5]).to.deep.equal([ 'no callee registered for procedure ' ]); + } + ); + cli.handle([WAMP.CALL, 1234, {}, 'any.function.name', []]); + expect(sender.send).to.have.been.called.once(); + }); + + it('cleanup RPC API', function () { + var procSpy = chai.spy(function() {}); + api.regrpc('func1', procSpy); + expect(realm.cleanupRPC(api)).to.deep.equal(['func1']); + expect(realm.cleanupRPC(api)).to.deep.equal([]); + expect(procSpy).to.not.have.been.called(); + }); + + it('CALL to router', function () { + var procSpy = chai.spy(function(id, args, kwargs) { + api.resrpc(id, undefined, ['result.1','result.2'], {kVal:'kRes'}); + }); + var regId = api.regrpc('func1', procSpy); + + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.RESULT); + expect(msg[1]).to.equal(1234); + expect(msg[3]).to.deep.equal(['result.1','result.2']); + expect(msg[4]).to.deep.equal({kVal:'kRes'}); + } + ); + cli.handle([WAMP.CALL, 1234, {}, 'func1', ['arg1', 'arg2'], {'kArg':'kVal'}]); + expect(procSpy, 'RPC delivered').to.have.been.called.once(); + expect(sender.send, 'result delivered').to.have.been.called.once(); + expect(api.unregrpc(regId)).to.equal('func1'); + }); + + it('CALL to router with error', function () { + var callId = null; + var procSpy = chai.spy(function(id, args, kwargs) { + callId = id; + }); + api.regrpc('func1', procSpy); + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.ERROR); + expect(msg[1]).to.equal(WAMP.CALL); + expect(msg[2]).to.equal(1234); + expect(msg[4]).to.deep.equal('wamp.error.callee_failure'); + } + ); + cli.handle([WAMP.CALL, 1234, {}, 'func1', ['arg1', 'arg2'], {'kArg':'kVal'}]); + api.resrpc(callId, 1, ['result.1','result.2'], {kVal:'kRes'}); + expect(procSpy).to.have.been.called.once(); + expect(sender.send).to.have.been.called.once(); + }); + + it('UNREGISTER error', function () { + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.ERROR); + expect(msg[1]).to.equal(WAMP.UNREGISTER); + expect(msg[2]).to.equal(2345); + // 3 options + expect(msg[4]).to.equal('wamp.error.no_such_registration'); + } + ); + cli.handle([WAMP.UNREGISTER, 2345, 1234567890]); + expect(sender.send, 'unregistration confirmed').to.have.been.called.once(); + }); + + it('UNREGISTER', function () { + var registrationId = null; + + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.REGISTERED); + expect(msg[1]).to.equal(1234); + registrationId = msg[2]; + } + ); + cli.handle([WAMP.REGISTER, 1234, {}, 'func1']); + expect(sender.send, 'registration confirmed').to.have.been.called.once(); + + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.UNREGISTERED); + expect(msg[1]).to.equal(2345); + } + ); + cli.handle([WAMP.UNREGISTER, 2345, registrationId]); + expect(sender.send, 'unregistration confirmed').to.have.been.called.once(); + }); + + it('CALL to remote', function () { + var registrationId = null; + + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.REGISTERED); + expect(msg[1]).to.equal(1234); + registrationId = msg[2]; + } + ); + cli.handle([WAMP.REGISTER, 1234, {}, 'func1']); + expect(sender.send, 'registration confirmed').to.have.been.called.once(); + + var callId = null; + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.INVOCATION); + callId = msg[1]; + expect(msg[2]).to.equal(registrationId); + expect(msg[3]).to.deep.equal({}); // options + expect(msg[4]).to.deep.equal(['arg.1','arg.2']); + expect(msg[5]).to.deep.equal({kVal:'kRes'}); + } + ); + var callResponse = chai.spy(function(err, args, kwargs) { + expect(err).to.equal(null); + expect(args).to.deep.equal(['result.1','result.2'], 'args call spy response'); + expect(kwargs).to.deep.equal({foo:'bar'}, 'kwargs call spy response'); + }); + api.callrpc('func1', ['arg.1','arg.2'], {kVal:'kRes'}, callResponse); + expect(sender.send, 'invocation received').to.have.been.called.once(); + + // return the function result + cli.handle([WAMP.YIELD, callId, {}, ['result.1','result.2'], {foo:'bar'}]); + + expect(callResponse, 'result delivered').to.have.been.called.once(); + }); + + it('CALL error to remote', function () { + sender.send = function () {}; + cli.handle([WAMP.REGISTER, 1234, {}, 'func1']); + + var callId = null; + sender.send = chai.spy( + function (msg, callback) { + callId = msg[1]; + } + ); + var callSpy = chai.spy(function(err, args) { + expect(err).to.be.an('error'); + expect(args).to.deep.equal(['err.detail.1','err.detail.2']); + }); + api.callrpc('func1', ['arg.1','arg.2'], {kVal:'kRes'}, callSpy); + expect(sender.send, 'invocation received').to.have.been.called.once(); + + cli.handle([WAMP.ERROR, WAMP.INVOCATION, callId, {}, 'wamp.error.runtime_error', ['err.detail.1','err.detail.2']]); + expect(callSpy, 'error delivered').to.have.been.called.once(); + }); + + it('Progress remote CALL', function () { + sender.send = function (msg, callback) {}; + cli.handle([WAMP.REGISTER, 1234, {}, 'func1']); + + var callId = null; + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.INVOCATION); + callId = msg[1]; + // registrationId + expect(msg[3]).to.deep.equal({receive_progress:true}); + } + ); + var result; + var options; + var callResponse = chai.spy(function(err, args, kwargs, options) { + expect(err).to.equal(null); + expect(args).to.deep.equal(result, 'args call spy response'); + expect(options).to.deep.equal(options, 'progress 1'); + }); + api.callrpc('func1', [], {}, callResponse, {receive_progress:1}); + expect(sender.send, 'invocation received').to.have.been.called.once(); + + result = ['result.1']; + options = {progress:true}; + cli.handle([WAMP.YIELD, callId, {progress:true}, ['result.1']]); + + result = ['result.2']; + options = {progress:true}; + cli.handle([WAMP.YIELD, callId, {progress:true}, ['result.2']]); + + result = ['result.3.final']; + options = {}; + cli.handle([WAMP.YIELD, callId, {}, ['result.3.final']]); + + cli.handle([WAMP.YIELD, callId, {}, ['result.not_delivered']]); + + expect(callResponse, 'result delivered').to.have.been.called.exactly(3); + }); + + it('UNSUBSCRIBE-ERROR', function () { + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.ERROR); + expect(msg[1]).to.equal(WAMP.UNSUBSCRIBE); + expect(msg[2]).to.equal(2345); + // 3 options + expect(msg[4]).to.equal('wamp.error.no_such_subscription'); + } + ); + cli.handle([WAMP.UNSUBSCRIBE, 2345, 1234567890]); + expect(sender.send, 'unsubscription confirmed').to.have.been.called.once(); + }); + + it('UNSUBSCRIBE-OK', function () { + var subscriptionId = null; + + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.SUBSCRIBED); + expect(msg[1]).to.equal(1234); + subscriptionId = msg[2]; + } + ); + cli.handle([WAMP.SUBSCRIBE, 1234, {}, 'topic1']); + expect(sender.send, 'subscription confirmed').to.have.been.called.once(); + + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.UNSUBSCRIBED); + expect(msg[1]).to.equal(2345); + } + ); + cli.handle([WAMP.UNSUBSCRIBE, 2345, subscriptionId]); + expect(sender.send, 'unsubscription confirmed').to.have.been.called.once(); + }); + + it('cleanup Topic API', function () { + var subSpy = chai.spy(function () {}); + api.substopic('topic1', subSpy); + expect(cli.realm.cleanupTopic(api)).to.deep.equal(['topic1']); + expect(cli.realm.cleanupTopic(api)).to.deep.equal([]); + expect(subSpy).to.not.have.been.called(); + }); + + it('PUBLISH default exclude_me:true', function () { + var subSpy = chai.spy(function () {}); + api.substopic('topic1', subSpy); + api.publish('topic1', [], {}); + expect(subSpy).to.not.have.been.called(); + }); + + it('PUBLISH exclude_me:false', function () { + var subSpy = chai.spy(function () {}); + api.substopic('topic1', subSpy); + api.publish('topic1', [], {}, {exclude_me:false}); + expect(subSpy).to.have.been.called.once(); + }); + + it('PUBLISH to remote', function () { + var subscriptionId = null; + + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.SUBSCRIBED); + expect(msg[1]).to.equal(1234); + subscriptionId = msg[2]; + } + ); + cli.handle([WAMP.SUBSCRIBE, 1234, {}, 'topic1']); + expect(sender.send, 'subscription confirmed').to.have.been.called.once(); + + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.EVENT); + expect(msg[1]).to.equal(subscriptionId); + // 2 published message Id + expect(msg[3]).to.deep.equal({topic:'topic1'}); + expect(msg[4]).to.deep.equal(['arg.1','arg.2']); + expect(msg[5]).to.deep.equal({foo:'bar'}); + } + ); + api.publish('topic1', ['arg.1','arg.2'], {foo:'bar'}); + expect(sender.send, 'publication received').to.have.been.called.once(); + }); + + it('SUBSCRIBE to remote', function () { + var subSpy = chai.spy( + function (publicationId, args, kwargs) { + expect(args).to.deep.equal(['arg.1','arg.2']); + expect(kwargs).to.deep.equal({foo:'bar'}); + } + ); + var subId = api.substopic('topic1', subSpy); + + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.PUBLISHED); + expect(msg[1]).to.equal(2345); + } + ); + cli.handle([WAMP.PUBLISH, 1234, {}, "topic1", ['arg.1','arg.2'],{foo:'bar'}]); + expect(sender.send, 'published').to.not.have.been.called(); + cli.handle([WAMP.PUBLISH, 2345, {"acknowledge":true}, "topic1", ['arg.1','arg.2'],{foo:'bar'}]); + expect(sender.send, 'published').to.have.been.called.once(); + + expect(subSpy, 'publication done').to.have.been.called.twice(); + expect(api.unsubstopic(subId)).to.equal('topic1'); + }); +}); diff --git a/test/session.js b/test/session.js new file mode 100644 index 0000000..683f157 --- /dev/null +++ b/test/session.js @@ -0,0 +1,89 @@ +/*jshint mocha: true */ +/*jshint node: true */ +/*jshint expr: true */ +'use strict'; + +var + chai = require('chai'), + spies = require('chai-spies'), + expect = chai.expect, + WAMP = require('../lib/protocol'), + Session = require('../lib/session'), + Router = require('../lib/router'); + +chai.use(spies); + +describe('session', function() { + var + router, + sender, + cli; + + beforeEach(function(){ + sender = {}; + router = new Router(); + cli = new Session(router, sender, router.getNewSessionId()); + router.registerSession(cli); + }); + + afterEach(function(){ + }); + + it('HELLO/WELCOME', function () { + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.WELCOME); + } + ); + cli.handle([WAMP.HELLO, 'test', {}]); + expect(sender.send).to.have.been.called.once(); + + // second hello command raises error and disconnects the user + sender.send = chai.spy(function (msg, callback) {}); + sender.close = chai.spy(function (error, reason) {}); + cli.handle([WAMP.HELLO, 'test', {}]); + expect(sender.send).to.not.have.been.called(); + expect(sender.close).to.have.been.called.once(); + }); + + it('GOODBYE', function () { + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.GOODBYE); + callback(); + } + ); + sender.close = chai.spy( + function (error) {} + ); + cli.handle([WAMP.GOODBYE]); + expect(sender.send).to.have.been.called.once(); + expect(sender.close).to.have.been.called.once(); + }); + + it('CALL to no realm RPC', function () { + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.ERROR); + expect(msg[1]).to.equal(WAMP.CALL); + expect(msg[2]).to.equal(1234); + expect(msg[4]).to.equal('wamp.error.not_authorized'); + } + ); + cli.handle([WAMP.CALL, 1234, {}, 'any.function.name', []]); + expect(sender.send).to.have.been.called.once(); + }); + + it('REGISTER to no realm', function () { + sender.send = chai.spy( + function (msg, callback) { + expect(msg[0]).to.equal(WAMP.ERROR); + expect(msg[1]).to.equal(WAMP.REGISTER); + expect(msg[2]).to.equal(1234); + expect(msg[4]).to.equal('wamp.error.not_authorized'); + } + ); + cli.handle([WAMP.REGISTER, 1234, {}, 'func1']); + expect(sender.send, 'registration failed').to.have.been.called.once(); + }); +});