-
Notifications
You must be signed in to change notification settings - Fork 141
ServiceQueue
QBit is made up of queues. There are request queues, response queues and event queues.
A serviceQueue is a set of three queues, namely requests (methodCalls), responses and events. The serviceQueue turns an ordinary POJO (plain old Java Object) into a Service Actor. A serviceQueue is building block of QBit.
-
serviceQueueturns a POJO into a Service Actor -
serviceBundlegroupsserviceQueues under different addresses, shares a response queue, allows for service pools, serviceSharding, etc. -
serviceServerexposes aserviceBundleto REST and WebSocket RPC.
QBit allows you to adapt POJOs to become Service Actors. A Service Actor is a form of an active object. Method calls to a Service Actor are delivered asynchronously, and handled on one thread which can handle tens of millions or more method calls per second. Let's demonstrate by creating a simple POJO and turning it into a Service Actor.
ServiceQueue serviceQueue;
...
// Create a serviceQueue with a serviceBuilder.
final ServiceBuilder serviceBuilder = serviceBuilder();
//Start the serviceQueue.
serviceQueue = serviceBuilder
.setServiceObject(new TodoManagerImpl())
.buildAndStartAll();The above code registers the POJO TodoManagerImpl with a serviceQueue by using the method serviceBuilder.setServiceObject. The serviceQueue is started by the buildAndStartAll method of ServiceBuilder.
ServiceQueue is an interface (io.advantageous.qbit.service.ServiceQueue). The ServiceQueue is created with a ServiceBuilder (io.advantageous.qbit.service.ServiceBuilder).
You create a Service Actor by associating a POJO with a serviceQueue. You make this association between the serviceQueue and your service POJO with the `ServiceBuilder.
Once started the serviceQueue can handle method calls on behalf of the TodoManagerImpl and recieve events and deliver them to TodoManagerImpl. TodoManagerImpl can sit behind the serviceQueue. If you only access TodoManagerImpl POJO service from a serviceQueue then it will only ever be accessed by one thread. TodoManagerImpl can handle tens of millions of calls per second, and all of those calls will be thread safe. Here is a simple example of a POJO that we will expose as a Service Actor.
package com.mammatustech.todo;
import io.advantageous.qbit.reactive.Callback;
import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;
public class TodoManagerImpl {
private final Map<String, Todo> todoMap = new TreeMap<>();
public TodoManagerImpl() {
}
public void add(final Callback<Boolean> callback, final Todo todo) {
todoMap.put(todo.getId(), todo);
callback.resolve(true);
}
public void remove(final Callback<Boolean> callback, final String id) {
final Todo removed = todoMap.remove(id);
callback.resolve(removed != null);
}
public void list(final Callback<ArrayList<Todo>> callback) {
callback.resolve(new ArrayList<>(todoMap.values()));
}
}Notice that this example does not return values, instead it uses the callback to send a response back to the client. A call to
callback.resolve(someValue) will send that value to the responseQueue. Method calls come in on the requestQueue. The responses go out on the the responseQueue. Let's explore this concept.
The serviceQueue has the following interface.
/**
* Manages a service that sits behind a queue.
* created by Richard on 7/21/14.
*
* @author rhightower
*/
public interface ServiceQueue extends ... {
...
Object service();
SendQueue<MethodCall<Object>> requests();
SendQueue<Event<Object>> events();
ReceiveQueue<Response<Object>> responses();
...These methods are not typically accessed. They are for integration and internal usage but they can help you understand QBit microservices a bit better.
You can access the POJO that the serviceQueue is wrapping with service(). You can send method calls directly to the serviceQueue by using the requests() method to get a sendQueue (SendQueue<MethodCall<Object>>). You can send events directly to the serviceQueue by using the events() method to get a sendQueue. Note that the sendQueue you receive will not be thread safe (they implement micro-batching), so each thread will need to get its own copy of an event or methodCall (request) sendQueue. A sendQueue is the client's view of the queue.
On the receiver side (service side) events and methodCalls queues are handled by the same thread so that all events and methodCalls go to the POJO (e.g., TodoManagerImpl) on the same thread. This is what makes that POJO a Service Actor (active object).
Typically to make calls to a Service Actor, you use a service client proxy, which is just an interface. The service client proxy can return Promises or take a Callback as the first or last argument of the method. A promise is a deferred result that you can handle asynchronously. The Promise interface is similar to ES6 promises.
package com.mammatustech.todo;
import io.advantageous.reakt.promise.Promise;
import java.util.List;
public interface TodoManagerClient {
Promise<Boolean> add(Todo todo);
Promise<Boolean> remove(String id);
Promise<List<Todo>> list();
}package com.mammatustech.todo;
public class Todo {
private final String name;
private final String description;
private final long createTime;
private String id;
public Todo(String name, String description, long createTime) {
...
}
//normal getters, equals, hashCode
}To create an use a service client proxy you use the serviceQueue.
TodoManagerClient client;
ServiceQueue serviceQueue;
//Create a client proxy to communicate with the service actor.
client = serviceQueue
.createProxyWithAutoFlush(TodoManagerClient.class,
Duration.milliseconds(5));
//Add an item
final Promise<Boolean> promise = Promises.blockingPromiseBoolean();
// Add the todo item.
client.add(new Todo("write", "Write tutorial", timer.time()))
.invokeWithPromise(promise);
assertTrue("The call was successful", promise.success());
assertTrue("The return from the add call", promise.get());
//Get a list of items
final Promise<List<Todo>> promiseList = Promises.blockingPromiseList(Todo.class);
// Get a list of todo items.
client.list().invokeWithPromise(promiseList);
// See if the Todo item we created is in the listing.
final List<Todo> todoList =
promiseList.get().stream()...
//Remove an item
// Remove promise
final Promise<Boolean> removePromise =
Promises.blockingPromiseBoolean();
client.remove(todo.getId())
.invokeWithPromise(removePromise);
Note Blocking Promises are great for testing and integration but not something you typically use in your reactive microserivce (sot of defeats the whole purpose).
Here is a simple unit test showing what we have done and talked about so far, after this let's show a non-blocking example and some call coordination.
package com.mammatustech.todo;
import io.advantageous.qbit.service.ServiceBuilder;
import io.advantageous.qbit.service.ServiceQueue;
import io.advantageous.qbit.time.Duration;
import io.advantageous.qbit.util.Timer;
import io.advantageous.reakt.promise.Promise;
import io.advantageous.reakt.promise.Promises;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.stream.Collectors;
import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TodoManagerImplTest {
TodoManagerClient client;
ServiceQueue serviceQueue;
final Timer timer = Timer.timer();
@Before
public void setup() {
// Create a serviceQueue with a serviceBuilder.
final ServiceBuilder serviceBuilder = serviceBuilder();
//Start the serviceQueue.
serviceQueue = serviceBuilder
.setServiceObject(new TodoManagerImpl())
.buildAndStartAll();
//Create a client proxy to communicate with the service actor.
client = serviceQueue.createProxyWithAutoFlush(TodoManagerClient.class, Duration.milliseconds(5));
}
@Test
public void test() throws Exception {
final Promise<Boolean> promise = Promises.blockingPromiseBoolean();
// Add the todo item.
client.add(new Todo("write", "Write tutorial", timer.time()))
.invokeWithPromise(promise);
assertTrue("The call was successful", promise.success());
assertTrue("The return from the add call", promise.get());
final Promise<List<Todo>> promiseList = Promises.blockingPromiseList(Todo.class);
// Get a list of todo items.
client.list().invokeWithPromise(promiseList);
// See if the Todo item we created is in the listing.
final List<Todo> todoList = promiseList.get().stream()
.filter(todo -> todo.getName().equals("write")
&& todo.getDescription().equals("Write tutorial")).collect(Collectors.toList());
// Make sure we found it.
assertEquals("Make sure there is one", 1, todoList.size());
// Remove promise
final Promise<Boolean> removePromise = Promises.blockingPromiseBoolean();
client.remove(todoList.get(0).getId()).invokeWithPromise(removePromise);
final Promise<List<Todo>> promiseList2 = Promises.blockingPromiseList(Todo.class);
// Make sure it is removed.
client.list().invokeWithPromise(promiseList2);
// See if the Todo item we created is removed.
final List<Todo> todoList2 = promiseList2.get().stream()
.filter(todo -> todo.getName().equals("write")
&& todo.getDescription().equals("Write tutorial")).collect(Collectors.toList());
// Make sure we don't find it.
assertEquals("Make sure there is one", 0, todoList2.size());
}
@After
public void tearDown() {
serviceQueue.stop();
}
}You can find this source code at this github repo.
Here is a build file for the example so you can see the dependencies.
group 'qbit-ex'
version '1.0-SNAPSHOT'
apply plugin: 'java'
apply plugin: 'application'
mainClassName = "com.mammatustech.todo.TodoServiceMain"
compileJava {
sourceCompatibility = 1.8
}
repositories {
mavenCentral()
mavenLocal()
}
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.11'
compile 'io.advantageous.qbit:qbit-vertx:1.8.3'
compile 'io.advantageous.qbit:qbit-admin:1.8.3'
}We can execute a bunch of methods at once and use Promises.all to do the next item when they all succeed or Promises.any to something when any of them succeed.
@Test
public void testUsingAll() throws Exception {
/* A list of promises for things we want to do all at once. */
final List<Promise<Boolean>> promises = new ArrayList<>(3);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean success = new AtomicBoolean();
/** Add a todoItem to the client add method */
final Todo todo = new Todo("write", "Write tutorial", timer.time());
final Promise<Boolean> promise
= client.add(todo);
promises.add(promise);
/** Add two more. */
promises.add(client.add(new Todo("callMom", "Call Mom", timer.time())));
promises.add(client.add(new Todo("callSis", "Call Sister", timer.time())));
/** Now async wait for them all to come back. */
Promises.all(promises).then(done -> {
success.set(true);
latch.countDown();
}).catchError(e-> {
success.set(false);
latch.countDown();
});
/** Invoke the promises. */
promises.forEach(Promise::invoke);
/** They are all going to come back async. */
latch.await();
assertTrue(success.get());
}The serviceQueue can be started and stopped. There are several options to start a serviceQueue. You can start it with two threads, one thread for response handling and another thread for request/event handling (startAll()). You can start the serviceQueue with just the request/event handling thread (start()). You can also start it with one thread managing request/event and responses. Caution must be exercised with the last way since if a callback or promise blocks then your serviceQueue will be blocked. Typically you use startAll or you use a serviceBundle where one response queue is shared with many serviceQueues. The serviceQueue was meant to be composable so you can access the queues and provide your own thread model if needed or desired.
Typically you handle a exception from a Service Actor by calling callback.reject(exception) to pass the exception downstream to the client or you catch it and handle it in whatever way makes sense. If you do not catch an exception then the thread for your Service Actor will terminate. However, QBit will log the exception that you did not handle and restart a new thread to manage your Service Actor.
In the QBit microservice lib it is common to call other async services, remote Service Actors, REST services, and async NoSQL database drivers. If you Service Actor is stateful (which is common with high-speed services), then you will want to do use a Reactor. There is the Reactor that comes with QBit which is EOL (since we are replacing it with the one we wrote for Reakt), and then there is the Reactor that comes Reakt. The serviceQueue allows events/method calls to all come to the Service Actor on one thread. The reactor is a way to also allow method call callbacks to happen on the same thread, and since the callbacks happen on the same thread as the Service Actor access to the Service Actors data (fields, collaborating objects, etc.) are also thread safe. You only need to use a Reactor if you want to handle callback on the same thread as the Service Actor, which is not always needed. You can also use the Reactor to handle streaming data on the same thread as the Service Actor. The Reactor can also be used for scheduling async tasks or just scheduling a task to be run on the Service Actor as soon as possible.
You can get notified of different Service Actor lifecycle events like started, stopped, when the micro batch limit was met, when the request queue is empty, and more. These lifecycle events allow you to do thing in batches and thus effectively pass data from one service to another (both remote and local). The reactor for example has a process method that is usually called when the request queue has reached a limit or is empty. There are two ways to do this. You can use a QueueCallbackHandler with a ServiceBuilder (or ServiceBundle) or you can use the annotation @QueueCallback.
The Admin package adds Consul discovery, and StatsD support to QBit microservices, and provides a simplified builder for creating a set of managed services which you can easily expose via REST or WebSocket RPC.
It is quite easy to build bridges into the QBit world and we have done so via Kafka, Vert.x event bus and even JMS. QBit was meant to be composeable so you can pick your messaging platform and plug QBit into it.
Two main packages of note in the QBit admin packages are the ManagedServiceBuilder and the ServiceManagementBundle. The ManagedServiceBuilder gives you access to building a group of services and then easily wiring them to the same health monitor, discovery system and metrics/stats system. Whilst the ServiceManagementBundle allows services to interact with common QBit services like stats, health and discovery.
Let's show some simple examples using these that we will continue on in our discussion of the ServiceBundle and the ServiceEndpointServer.
QBit Website What is Microservices Architecture?
QBit Java Micorservices lib tutorials
The Java microservice lib. QBit is a reactive programming lib for building microservices - JSON, HTTP, WebSocket, and REST. QBit uses reactive programming to build elastic REST, and WebSockets based cloud friendly, web services. SOA evolved for mobile and cloud. ServiceDiscovery, Health, reactive StatService, events, Java idiomatic reactive programming for Microservices.
Reactive Programming, Java Microservices, Rick Hightower
Java Microservices Architecture
[Microservice Service Discovery with Consul] (http://www.mammatustech.com/Microservice-Service-Discovery-with-Consul)
Microservices Service Discovery Tutorial with Consul
[Reactive Microservices] (http://www.mammatustech.com/reactive-microservices)
[High Speed Microservices] (http://www.mammatustech.com/high-speed-microservices)
Reactive Microservices Tutorial, using the Reactor
QBit is mentioned in the Restlet blog
All code is written using JetBrains Idea - the best IDE ever!
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting
Tutorials
- QBit tutorials
- Microservices Intro
- Microservice KPI Monitoring
- Microservice Batteries Included
- RESTful APIs
- QBit and Reakt Promises
- Resourceful REST
- Microservices Reactor
- Working with JSON maps and lists
__
Docs
Getting Started
- First REST Microservice
- REST Microservice Part 2
- ServiceQueue
- ServiceBundle
- ServiceEndpointServer
- REST with URI Params
- Simple Single Page App
Basics
- What is QBit?
- Detailed Overview of QBit
- High level overview
- Low-level HTTP and WebSocket
- Low level WebSocket
- HttpClient
- HTTP Request filter
- HTTP Proxy
- Queues and flushing
- Local Proxies
- ServiceQueue remote and local
- ManagedServiceBuilder, consul, StatsD, Swagger support
- Working with Service Pools
- Callback Builders
- Error Handling
- Health System
- Stats System
- Reactor callback coordination
- Early Service Examples
Concepts
REST
Callbacks and Reactor
Event Bus
Advanced
Integration
- Using QBit in Vert.x
- Reactor-Integrating with Cassandra
- Using QBit with Spring Boot
- SolrJ and service pools
- Swagger support
- MDC Support
- Reactive Streams
- Mesos, Docker, Heroku
- DNS SRV
QBit case studies
QBit 2 Roadmap
-- Related Projects
- QBit Reactive Microservices
- Reakt Reactive Java
- Reakt Guava Bridge
- QBit Extensions
- Reactive Microservices
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting