diff --git a/CometdGrailsPlugin.groovy b/CometdGrailsPlugin.groovy index 3f82d3a..915af54 100644 --- a/CometdGrailsPlugin.groovy +++ b/CometdGrailsPlugin.groovy @@ -17,15 +17,19 @@ import grails.util.Environment import org.codehaus.groovy.grails.commons.ConfigurationHolder +import org.codehaus.groovy.grails.commons.ServiceArtefactHandler import org.cometd.server.BayeuxServerImpl import org.cometd.server.CometdServlet import org.cometd.bayeux.server.BayeuxServer +import grails.plugin.cometd.ServiceCometdProcessor import org.springframework.web.context.support.ServletContextAttributeExporter +import org.apache.commons.logging.LogFactory + class CometdGrailsPlugin { - def version = "0.2.2" + def version = "0.2.5" def grailsVersion = "1.2.1 > *" def dependsOn = [:] def pluginExcludes = [ @@ -44,9 +48,13 @@ This plugin allows your Grails application to send asynchronous notifications to CometD and the Bayeux protocol. ''' def documentation = "http://www.grails.org/plugin/cometd" + def loadAfter = ['services', 'controllers'] + def observe = ['services', 'controllers'] def doWithWebDescriptor = { xml -> def conf = ConfigurationHolder.config.plugins.cometd + + LOG.debug("Initializing with continutationFilter: ${!conf.continuationFilter.disable}") if (!conf.continuationFilter.disable) { def filters = xml.'filter' filters[filters.size() - 1] + { @@ -70,6 +78,17 @@ CometD and the Bayeux protocol. servlet { 'servlet-name'('cometd') 'servlet-class'(CometdServlet.class.name) + + // Add servlet init params from the config file + if (conf.init?.params) { + LOG.debug("Initializing with init-params: ${conf.init.params}") + conf.init.params.each { key, value -> + 'init-param' { + 'param-name'(key) + 'param-value'(value) + } + } + } } } @@ -83,7 +102,7 @@ CometD and the Bayeux protocol. } def doWithSpring = { - bayeux(BayeuxServerImpl, true) { bean -> + bayeux(BayeuxServerImpl) { bean -> bean.destroyMethod = 'stop' } @@ -92,4 +111,20 @@ CometD and the Bayeux protocol. attributes = [(BayeuxServer.ATTRIBUTE): ref('bayeux')] } } + + def processor = new ServiceCometdProcessor() + + def doWithDynamicMethods = { context -> + application.serviceClasses?.each { service -> + processor.process(service.referenceInstance, context) + } + } + + def onChange = { event -> + if (application.isServiceClass(event.source)) { + def artefact = application.addArtefact(ServiceArtefactHandler.TYPE, event.source) + + processor.process(artefact.referenceInstance, event.ctx) + } + } } diff --git a/README.md b/README.md new file mode 100644 index 0000000..b9afa73 --- /dev/null +++ b/README.md @@ -0,0 +1,91 @@ +# Grails Cometd Plugin + +[CometD](http://cometd.org) is a scalable HTTP-based event routing bus that uses an Ajax Push technology pattern known as [Comet](http://en.wikipedia.org/wiki/Comet_(programming\)). This plugin allows your Grails application to push asynchronous notifications to HTTP clients using CometD and the [Bayeux](http://cometd.org/documentation/bayeux) protocol. + +## Installation +NOTE: Because of a bug this won't work in grails 1.3.2 (fixed in 1.3.3): + + grails install-plugin cometd + +You can always download the [source](http://github.com/marcusb/grails-cometd) and then from the plugin directory: + + grails package-plugin + +And then from your application directory: + + grails install-plugin /path/to/grails-cometd-plugin-zip-file + +## Usage + +### CometD Servlet +*** + +The plugin configures a CometdServlet, mapped to the path cometd relative to your web application's context path. + +### Bayeux Service +*** + +A bean named bayeux is made available to your application. It is an instance of [BayeuxServer](http://download.cometd.org/bayeux-api-2.0.beta0-javadoc/org/cometd/bayeux/server/BayeuxServer.html). This is used to interact with the Comet server. + +### Annotations +*** + +You can annotate your service classes and the plugin will wire everything up for you. This needs a bit of Cometd knowledge, but it's fairly straight forward. In order to register your service as a Cometd service you just have to add a static property to the service, much like the JMS and XFire plugins. (though this might be moving to a class level annotation, we'll see.) + + static exposes = ["cometd"] + +#### @ChannelInitializer +When you annotate a service method with the @ChannelInitializer annotation, the plugin will register it as a ConfigurableServerChannel.Initializer with the bayeux server. Your service method will be called with one argument, which is the ConfigurableServerChannel instance of the channel. Example: + + @ChannelInitializer("/foo/bar") + def configure(channel) { + channel.persistent = true + } + +#### @MessageListener +Annotating a service method with the @MessageListener annotation will register the method as a ServerChannel.MessageListener on the provided channel. The functionality is similar to that found with the AbstractService that comes with the Cometd distribution. Depending on the signature of your method, it will be called with different arguments. Possible options include: + + def service(session, data) // The "from" Session and Message.data + def service(session, data, id) // The "from" Session, Message.data, and the client id + def service(session, channel, message, id) // The "from" Session, the channel(String), the Message, and the client id + +If you were to explicitly define the parameter type for message you will get the actual Message object and not just the data field on the Message: + + def service(session, Message message) // The "from" Session and Message + +If you return false from the method, the message will not be broadcast. If you return nothing or true, the message will be broadcast. If you return an object or message, that message will be delivered to the client that initialized the service call. + +The actual use of this annotation would look thusly: + + @MessageListener("/foo/bar") + def onFooBar(session, data) { + // do something with the data + + [message: "thanks for the info mr. client"] + } + +### Configuration +*** + +The plugin is configured in Config.groovy, with options prefixed with "plugins.cometd". The following options are defined: + +* **plugins.cometd.continuationFilter.disable**: if set, do not install the to [ContinuationFilter](http://download.eclipse.org/jetty/stable-7/apidocs/org/eclipse/jetty/continuation/ContinuationFilter.html) +* **plugins.cometd.init.params**: a map of init-name -> init-value pairs for an init-param element to be applied to the cometd servlet definition. Make sure you read the javadoc for org.cometd.bayeux.server.ServerTransport for how the init params are applied. Some useful settings include: + * **timeout**: In the default long polling http transport is the period of time the server waits before answering to a long poll + * **interval**: In the default long polling http transport is the period of time that the client waits between long polls + * **maxInterval**: In the default long polling http transport is the period of time that must elapse before the server consider the client being lost + +## Contributing +Contributions are welcome, preferably by pull request on GitHub + + git clone git://github.com/marcusb/grails-cometd.git + +## History + +### Version 0.2.1 +*** +* Install the [ContinuationFilter](http://download.eclipse.org/jetty/stable-7/apidocs/org/eclipse/jetty/continuation/ContinuationFilter.html) by default, it is needed for Tomcat 6 + +### Version 0.2 +*** +* Rewritten from scratch for CometD 2.0 \ No newline at end of file diff --git a/application.properties b/application.properties index 021ea07..9288920 100644 --- a/application.properties +++ b/application.properties @@ -1,9 +1,9 @@ #Grails Metadata file -#Tue Oct 19 04:56:43 GMT+01:00 2010 -app.grails.version=1.3.5 +#Thu Apr 14 14:23:13 BRT 2011 +app.grails.version=1.3.7 app.name=grails-cometd app.servlet.version=2.4 app.version=0.1 plugins.functional-test=1.2.7 -plugins.hibernate=1.3.5 -plugins.tomcat=1.3.5 +plugins.hibernate=1.3.7 +plugins.tomcat=1.3.7 diff --git a/grails-app/conf/BuildConfig.groovy b/grails-app/conf/BuildConfig.groovy index 84be2a4..f3594fd 100644 --- a/grails-app/conf/BuildConfig.groovy +++ b/grails-app/conf/BuildConfig.groovy @@ -21,10 +21,14 @@ grails.project.dependency.resolution = { inherits 'global' log "warn" // log level of Ivy resolver, either 'error', 'warn', 'info', 'debug' or 'verbose' repositories { + grailsPlugins() + grailsHome() + grailsCentral() + mavenLocal() mavenCentral() } dependencies { - def cometdVer = '2.0.0' + def cometdVer = '2.2.0' compile(group: 'org.cometd.java', name: 'cometd-java-server', version: cometdVer) { excludes 'servlet-api' } diff --git a/grails-app/services/grails/plugins/cometd/test/EchoService.groovy b/grails-app/services/grails/plugins/cometd/test/EchoService.groovy deleted file mode 100644 index 7d7262e..0000000 --- a/grails-app/services/grails/plugins/cometd/test/EchoService.groovy +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright © 2010 MBTE Sweden AB - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package grails.plugins.cometd.test - -import org.cometd.bayeux.server.ServerChannel - -import org.springframework.beans.factory.InitializingBean - -class EchoService implements InitializingBean -{ - def bayeux - - void afterPropertiesSet() - { - assert bayeux : 'bayeux object must be set' - def localSession = bayeux.newLocalSession('echo') - localSession.handshake() - def serverSession = localSession.getServerSession() - def channel = bayeux.getChannel('/echo', true) - channel.addListener({ from, chan, msg -> - if (from != localSession.getServerSession()) { - from.deliver serverSession, chan.id, [echo: msg.data.msg], null - } - true - } as ServerChannel.MessageListener) - } -} diff --git a/src/groovy/grails/plugin/cometd/ServiceCometdProcessor.groovy b/src/groovy/grails/plugin/cometd/ServiceCometdProcessor.groovy new file mode 100644 index 0000000..7de55ba --- /dev/null +++ b/src/groovy/grails/plugin/cometd/ServiceCometdProcessor.groovy @@ -0,0 +1,171 @@ +package grails.plugin.cometd + +import grails.util.GrailsNameUtils as GNU +import org.codehaus.groovy.grails.commons.GrailsClassUtils as GCU + +import org.cometd.bayeux.Message +import org.cometd.bayeux.server.ServerChannel +import org.cometd.bayeux.server.ConfigurableServerChannel + +class ServiceCometdProcessor { + final static EXPOSES = "exposes" + final static EXPOSE = "expose" + final static COMETD = "cometd" + + def _context + def _messageListenerRemovers = [:] + def _configurations = [ + "initializers": [:], + "messageListeners": [:] + ] + + /** + * Determine if the service class would like to register itself as a cometd service. It does so by declaring a staic variable "exposes" + * or "expose", which is an array that must contain "cometd" + */ + static exposesCometd(service) { + GCU.getStaticPropertyValue(service, EXPOSES)?.contains(COMETD) || GCU.getStaticPropertyValue(service, EXPOSE)?.contains(COMETD) + } + + def process(service, context) { + def clazz = service.class + def bayeux = context.bayeux + + _context = context; + + // If the service has declared itself a cometd service + if (ServiceCometdProcessor.exposesCometd(clazz)) { + // Setup a localsession for the service + def localSession = bayeux.newLocalSession(clazz.simpleName) + localSession.handshake(); + + clazz.metaClass.seeOwnPublishes = false + clazz.metaClass.localSession = localSession + clazz.metaClass.serverSession = localSession.serverSession + + // Process each method looking for cometd annotations + clazz.methods?.each { method -> + processServiceMethod(service, method) + } + + // Initialize all of the configurations. This allows us to control the order in which annotations are processed across many services + _configurations.initializers.each { channel, configurations -> + configureInitializers(channel, configurations, bayeux) + } + + // If listeners for this class exist, we need to remove them + _messageListenerRemovers.get(clazz.name, []).each({remover -> remover.call()}).clear() + + // Initialize all of the messageListeners + _configurations.messageListeners.each { channel, configurations -> + configureMessageListeners(channel, configurations, bayeux) + } + } + } + + /** + * Processes a method for cometd configuration annotations. + */ + def processServiceMethod(service, method) { + processInitializer(service, method) + processMessageListener(service, method) + } + + /** + * Tries to determine if the method is a channel initializer. + */ + def processInitializer(service, method) { + def annotation = method.getAnnotation(ChannelInitializer) + + if (annotation) { + // Method must have one parameter. + if (method.parameterTypes.length != 1) { + throw new IllegalArgumentException(getParameterMessage(ChannelInitializer.class.simpleName, service, method, "one parameter")) + } + + // Add the method to the list of initializers for the given channel + _configurations["initializers"].get(annotation.value(), []) << [annotation: annotation, service: service, method: method] + } + } + + /** + * Determines if the method is a message listener, and adds it to the configurations accordingly + */ + def processMessageListener(service, method) { + def annotation = method.getAnnotation(MessageListener) + + if (annotation) { + // Method must declare 2..4 parameters + if (!(2..4).contains(method.parameterTypes.length)) { + throw new IllegalArgumentException(getParameterMessage(MessageListener.class.simpleName, service, method, "2 to 4 parameters")) + } + + // Add the method to the list of message listeners for the channel + _configurations["messageListeners"].get(annotation.value() ?: annotation.channel(), []) << [annotation: annotation, service: service, method: method] + } + } + + /** + * Each channel can have multiple initializers, possibly from multiple services. Here we initialize them all for a given channel. + */ + def configureInitializers(channel, configurations, bayeux) { + bayeux.createIfAbsent(channel, { configurableServerChannel -> + configurations.each { configuration -> + configuration.method.invoke(configuration.service, configurableServerChannel) + } + } as ConfigurableServerChannel.Initializer) + } + + def configureMessageListeners(channelId, configurations, bayeux) { + bayeux.createIfAbsent(channelId) + + configurations.each { configuration -> + def method = configuration.method + def serviceClass = configuration.service.class + def service = _context[GNU.getPropertyNameRepresentation(serviceClass)] + def arguments = configuration.method.parameterTypes + def channel = bayeux.getChannel(channelId) + + def listener = { session, listenerChannel, message -> + try { + if (service.seeOwnPublishes || session != service.serverSession) { + def data = Message.class.isAssignableFrom(arguments[1]) ? message : message.data + def reply + + switch (arguments.length) { + case 2: + reply = method.invoke(service, session, data) + break; + case 3: + reply = method.invoke(service, session, data, message.id) + break; + case 4: + reply = method.invoke(service, session, message.channel, data, message.id) + } + + if (reply){ + session.deliver(service.serverSession, message.channel, reply, message.id); + } + } + } catch (e) { + e.printStackTrace(); + } + + return configuration.annotation.broadcast() + } as ServerChannel.MessageListener + + // Add a remover closure, so that we can easily detatch the listener and ditch the configuration + _messageListenerRemovers.get(serviceClass.name, []) << { + channel.removeListener(listener) + configurations.remove(configuration) + } + + channel.addListener(listener) + } + + } + + private getParameterMessage(annotation, service, method, requirement) { + return "@${annotation}: '${service.class.simpleName}.${method.name}' declared ${method.parameterTypes.length} parameters, but must declare ${requirement}" + } +} \ No newline at end of file diff --git a/src/java/grails/plugin/cometd/ChannelInitializer.java b/src/java/grails/plugin/cometd/ChannelInitializer.java new file mode 100644 index 0000000..f9024f4 --- /dev/null +++ b/src/java/grails/plugin/cometd/ChannelInitializer.java @@ -0,0 +1,8 @@ +package grails.plugin.cometd; + +import java.lang.annotation.*; + +@Retention(RetentionPolicy.RUNTIME) +public @interface ChannelInitializer { + String value() default ""; +} \ No newline at end of file diff --git a/src/java/grails/plugin/cometd/MessageListener.java b/src/java/grails/plugin/cometd/MessageListener.java new file mode 100644 index 0000000..fb6278f --- /dev/null +++ b/src/java/grails/plugin/cometd/MessageListener.java @@ -0,0 +1,10 @@ +package grails.plugin.cometd; + +import java.lang.annotation.*; + +@Retention(RetentionPolicy.RUNTIME) +public @interface MessageListener { + String value() default ""; + String channel() default ""; + boolean broadcast() default true; +} \ No newline at end of file diff --git a/test/unit/grails/plugin/cometd/ServiceCometdProcessorSpec.groovy b/test/unit/grails/plugin/cometd/ServiceCometdProcessorSpec.groovy new file mode 100644 index 0000000..02c644e --- /dev/null +++ b/test/unit/grails/plugin/cometd/ServiceCometdProcessorSpec.groovy @@ -0,0 +1,176 @@ +package grails.plugin.cometd + +import grails.plugin.spock.* + +import org.cometd.bayeux.Message +import org.cometd.server.BayeuxServerImpl + +class ServiceCometdProcessorSpec extends UnitSpec { + def bayeux = new BayeuxServerImpl() + def processor = new ServiceCometdProcessor() + def context = [bayeux: bayeux] + + def "can identify services that expose cometd functionality"() { + given: "a cometd service" + def service = CometdExposingService + + expect: "the processor to find that the service exposes cometd functionality" + ServiceCometdProcessor.exposesCometd(service) == true + } + + def "will ignore services that don't expose cometd functionality"() { + given: "a non-cometd service" + def service = NonCometdService + + expect: "the processor to find that the service doesn't expose cometd functionality" + ServiceCometdProcessor.exposesCometd(service) == false + } + + def "can find channel initializer method"() { + given: "a cometd service which has a @ChannelInitializer annotated method" + def service = new MethodInitializerService() + + when: "the service is processed" + processor.process(service, context) + + then: "a channel should be created and it should be persistent" + def channel = bayeux.getChannel("/foo/bar") + channel != null + channel.persistent == true + } + + def "encounters an exception when the annotated initializer method doesn't have the correct arguments"() { + given: "a cometd service which has a @ChannelInitializer annotated method with no arguments defined" + def service = new MethodInitializerExceptionService() + + when: "the service is processed" + processor.process(service, context) + + then: "an exception should be thrown" + IllegalArgumentException iae = thrown() + } + + def "can register multiple initializers for one channel"() { + when: "the service is processed" + processor.process(service, context) + + then: "the channel should be initialized" + bayeux.getChannel("/foo/bar").persistent == true + + where: "there are two services" + service << [new MethodInitializerService(), new MethodInitializerService()] + } + + def "can find message listener method and initialize it so that it is listening to channel publish events"() { + given: "a cometed service which has a @MessageListener annotated method" + def service = new MessageListenerService() + context["messageListenerService"] = service + def local = bayeux.newLocalSession("local") + local.handshake() + + when: "the service is processed and the channel is published to" + processor.process(service, context) + bayeux.getChannel("/foo/bar").publish(local.serverSession, [body: "hola"], null) + + then: "the method listener should be registered and called" + bayeux.getChannel("/foo/bar") != null + service.body == "hola" + } + + def "can correctly initialize message listeners with different signatures"() { + given: "a cometed service which has @MessageListener annotated methods all with different method signatures" + def service = new MessageListenerSignaturesService() + context["messageListenerSignaturesService"] = service + def local = bayeux.newLocalSession("local") + local.handshake() + + when: "the service is processed and a message is published on the channel" + processor.process(service, context) + bayeux.getChannel("/foo/bar").publish(local.serverSession, [body: "hola"], "17") + + then: "all listeners should be initialized and called" + service.called.sort() == ["two", "twoTyped", "three", "four"].sort() + } + + def "can reload message listeners gracefully"() { + given: "a cometed service with @MessageListeners" + def service = new MessageListenerService() + context["messageListenerService"] = service + def local = bayeux.newLocalSession("local") + local.handshake() + + when: "the service is processed and then reprocessed" + processor.process(service, context) + processor.process(service, context) + + then: "the channel should have the correct listeners attached" + bayeux.getChannel("/foo/bar").listeners.size() == 1 + } +} + +class CometdExposingService { + static exposes = ["jms", "xfire", "cometd"] +} + +class NonCometdService { + static exposes = ["jms"] +} + +class MethodInitializerService { + static exposes = ["cometd"] + + @ChannelInitializer("/foo/bar") + def configure(channel) { + channel.persistent = true + } +} + +class MethodInitializerExceptionService { + static exposes = ["cometd"] + + @ChannelInitializer("/foo/bar") + def configure() {} +} + +class MessageListenerService { + static exposes = ["cometd"] + def body + + @MessageListener(channel = "/foo/bar", broadcast = false) + def receive(session, message) { + body = message.body + } +} + +class MessageListenerSignaturesService { + static exposes = ["cometd"] + def called = [] + + @MessageListener("/foo/bar") + def two(session, message) { + if (message.body == "hola") { + called << "two" + } + } + + @MessageListener("/foo/bar") + def twoTyped(session, Message message) { + if (message.data.body == "hola") { + called << "twoTyped" + } + } + + @MessageListener("/foo/bar") + def three(session, message, id) { + if (message.body == "hola" && id == "17") { + called << "three" + } + } + + @MessageListener("/foo/bar") + def four(session, channel, message, id) { + if (message.body == "hola" && id == "17" && channel == "/foo/bar") { + called << "four" + } + } +} \ No newline at end of file