Writing a Mini ESB
- Practice with ZeroMQ
- Writing connectors and adapters
- Use the VM
- ZeroMQ site for documentation and examples
- Install ZeroMQ with this terminal command:
  sudo apt install libzmq3-dev
- Install the Boost libraries for process control:
  sudo apt install libboost-dev
- Install Gnuplot for plotting:
  sudo apt install gnuplot
Introduction
For this assignment, you will implement a simple Service Bus
architecture using ZeroMQ. Your architecture will be as follows:

In general, this architecture will support applications that monitor a running system, capture statistics provided by the statistics generator, and send those statistics to interested applications. Statistics monitors are one such type of interested application. A statistics monitor picks up statistics messages and shows them in a graphical display. In particular, your ESB will enable two applications that do not normally communicate with each other to do so. For this assignment, the statistics generator will be a command line tool called vmstat, and the statistics monitor will be another command line utility called Gnuplot. Neither of these utilities is network enabled.
The Message Queue will allow the Statistics Generator to send information
to the Statistics Monitor. Each message in the Message Queue takes the
form of a message identifier followed by a comma and one or more
comma-separated values for the statistic. For this assignment, you will
only be concerned with a statistic indicating the amount of free memory
available. In particular, the message that flows through the queue from
the statistics generator will look like the following:
MSG_MEMSTAT,*epoch*,*free_mem*0
*epoch* is the time in seconds since the epoch (generated by the
time() function). *free_mem* is the amount of free memory reported by
vmstat. The message is terminated with a zero byte (shown as the
trailing 0 above).
The Statistics Connector needs to run vmstat, capture its output, parse
that output to determine the free-memory statistic, call the adapter to
convert the output to the acceptable message format, and send the
message to the Message Queue.
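As a rough illustration of the message format described above, the following sketch builds the text of one message and computes how many bytes would go on the wire once the terminating zero byte is counted. The helper names format_memstat and wire_size are made up for this example and are not part of the assignment or of ZeroMQ.

```cpp
#include <cstddef>
#include <ctime>
#include <string>

// Hypothetical helper: builds the text part of a memory-statistics message,
// e.g. "MSG_MEMSTAT,1700000000,123456".
std::string format_memstat(std::time_t epoch, long free_mem) {
    return "MSG_MEMSTAT," + std::to_string(static_cast<long long>(epoch)) +
           "," + std::to_string(free_mem);
}

// Hypothetical helper: number of bytes to send, counting the terminating
// zero byte. msg.c_str() always ends in '\0', so sending wire_size(msg)
// bytes starting at msg.c_str() includes the terminator.
std::size_t wire_size(const std::string& msg) {
    return msg.size() + 1;
}
```

A caller would typically pass the result of time(nullptr) as the epoch argument.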
The Monitor Connector spawns the Statistics Monitor, which is Gnuplot in this case, and waits for incoming messages. Once a message arrives, the Monitor Connector should get the message from the queue, call the adapter to convert the message to properly formatted data for the Statistics Monitor, and then send the data to the Statistics Monitor. The Statistics Monitor should then execute the command to view the data.
Implementation Details
Statistics Connector
The Statistics Connector should be written in C/C++, and should be its
own program (have its own main()). It should connect to the message
queue, and then it should create a vmstat child process. It should also
create a stream connected to the vmstat process's standard output. The
parent should enter a loop in which it reads from that stream, converts
the read data into a message, and sends the message to the message
queue. The data read will be the output of vmstat (what vmstat prints).
Below is the pseudocode for the Statistics Connector:
// The next two lines are real C++ code
#include <boost/process.hpp>
namespace bp = boost::process;
void statistics_connector()
create ZMQ_PUB socket and connect to message queue
create input and output stream for child using Boost
spawn child // use Boost
read first line from read end of pipe and throw it away
read first two header lines from vmstat and discard them
for each line read from the child's input stream
call adapter_vmstat_to_csv(line, converted)
call zmq_send to send converted to message queue
end for
close the child's input stream and its pipe
wait for the child to exit
end
int adapter_vmstat_to_csv(line, converted)
get epoch using time()
parse free memory stat from line
construct string: MSG_MEMSTAT,cur_time,free_mem0 and put it into converted
return length of converted
end adapter_vmstat_to_csv
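The "parse free memory stat from line" step could look roughly like the sketch below. It assumes the default vmstat column layout, in which free is the fourth whitespace-separated field (r, b, swpd, free, ...); check the header line on your own machine before relying on that. The function name parse_free_memory is invented for this example.

```cpp
#include <sstream>
#include <string>

// Hypothetical helper: extracts the "free" column from one vmstat data line.
// Assumption: "free" is the 4th whitespace-separated numeric field.
// Returns true and stores the value in free_mem on success; returns false
// for header lines, blank lines, or lines with too few numeric columns.
bool parse_free_memory(const std::string& line, long& free_mem) {
    std::istringstream in(line);
    long field = 0;
    for (int column = 1; column <= 4; ++column) {
        if (!(in >> field)) {
            return false;  // non-numeric token or not enough columns
        }
    }
    free_mem = field;  // the 4th value read is the free-memory column
    return true;
}
```

Because the helper rejects non-numeric lines, it also gives a simple way to notice a header line that was not discarded.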
To make process creation and management easy, use the Boost library. Below is an example to show you how process creation and communication works using the Boost library. The example spawns the Unix cat program. Cat simply prints to standard output whatever it reads from standard input. The example program then passes cat one string via its standard input. The string is "Hello from the parent!". The program then reads the output from cat, which will be that same string, and prints it. If you save the code in the file bp.cpp, then you compile this example as follows: g++ -Wall -g bp.cpp -o bp
#include <iostream>
#include <string>
#include <vector>
#include <boost/process.hpp>
namespace bp = boost::process;
int main() {
    // 'cat' is a simple program that echoes back its input on Unix-like systems.
    std::string program = "cat";
    std::vector<std::string> args;
    // Define the pipes for both writing to stdin and reading from stdout.
    bp::opstream child_stdin;
    bp::ipstream child_stdout;
    try {
        // Launch the child process with its I/O redirected to our pipes.
        bp::child child(program,
                        bp::std_in < child_stdin,
                        bp::std_out > child_stdout,
                        args);
        // Parent writes to the child's standard input.
        child_stdin << "Hello from the parent!" << std::endl;
        child_stdin.pipe().close();
        child_stdin.close(); // Important: Close the input stream to signal EOF.
        // Parent reads from the child's standard output.
        std::string line;
        if (std::getline(child_stdout, line)) {
            std::cout << "Parent received: '" << line << "'" << std::endl;
        }
        // Wait for the child process to exit.
        child.wait();
        std::cout << "Child process finished with exit code " << child.exit_code() << std::endl;
    } catch (const bp::process_error& e) {
        std::cerr << "Error spawning process: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}
Message Queue
The message queue should be its own program (have its own main()). It
requires very little code to implement, because ZeroMQ has implemented
the Forwarder paradigm for you. Simply write a program that creates two
zmq sockets. One should be of type ZMQ_XSUB, and the other should be
of type ZMQ_XPUB (note that you can use ZMQ_PUSH and ZMQ_PULL instead;
either approach will work). After creating the sockets, call
zmq_proxy(frontend, backend, NULL), where frontend is the XSUB socket
and backend is the XPUB socket. The resulting code will forward all
messages from the frontend socket to the backend socket.
Monitor Connector
The Monitor Connector should be written in C/C++, and should be its own
program (have its own main()). It should first create a local queue data
structure to keep a history of the statistics. It will use this history
to plot the last 20 values reported. After creating the queue data
structure, it should connect to the message queue, and then spawn a
Gnuplot child process. It should also create an input stream and an
output stream to communicate with the Gnuplot process. The parent should
enter a loop in which it gets a message from the message queue, converts
the message to a data point for Gnuplot, and then sends a command to
Gnuplot to plot the last 20 data points. Below is the pseudocode for the
Monitor Connector.
Monitor_Connector()
create a local queue data structure
create a ZMQ_SUB socket and connect to zmq message queue
create input and output stream for child using Boost
spawn child // use Boost
print "set term x11\n" to child's input stream
flush the input stream
loop forever
zmq_recv next message from queue
call adapter_csv_to_plot(msg, converted);
push converted onto the back of the local queue
if the queue has more than 20 things in it
dequeue oldest // just dequeue
end if
print plot command 'plot "-" with linespoints\n' to the child's input stream
for each value in queue
print value plus a newline to the child's input stream
end for
print "e\n" to the child's input stream // ends the plot
flush the child's input stream
end loop
end Monitor_Connector
adapter_csv_to_plot(line, converted)
// line looks like: MSG_MEMSTAT,epoch,free_mem\n0
// the first number is the epoch, and the second is the amount of free memory
parse line to get epoch
convert epoch to a time struct using gmtime()
convert time to seconds since beginning of the day (hrs*3600+min*60+sec)
construct string containing the seconds, followed by a space, followed by
the amount of free memory
append a newline onto the string and make sure it is zero terminated
copy string to converted
return length of converted
end adapter_csv_to_plot
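One way the conversion above might be written is sketched below. It departs from the pseudocode in two labeled ways: it uses std::string instead of character buffers, and it returns a success flag rather than a length. The function name csv_to_plot is invented for this example.

```cpp
#include <ctime>
#include <exception>
#include <sstream>
#include <string>

// Hypothetical adapter sketch:
//   "MSG_MEMSTAT,epoch,free_mem" -> "seconds_since_midnight free_mem\n"
// Returns false if the message does not have the expected three fields.
bool csv_to_plot(const std::string& msg, std::string& converted) {
    std::istringstream in(msg);
    std::string id, epoch_text, free_text;
    if (!std::getline(in, id, ',') || id != "MSG_MEMSTAT" ||
        !std::getline(in, epoch_text, ',') || !std::getline(in, free_text)) {
        return false;
    }
    long long epoch = 0;
    long free_mem = 0;
    try {
        epoch = std::stoll(epoch_text);
        free_mem = std::stol(free_text);
    } catch (const std::exception&) {
        return false;  // a field was not a number
    }
    const std::time_t t = static_cast<std::time_t>(epoch);
    const std::tm* utc = std::gmtime(&t);  // broken-down time in UTC
    if (utc == nullptr) {
        return false;
    }
    const long secs = utc->tm_hour * 3600L + utc->tm_min * 60L + utc->tm_sec;
    converted = std::to_string(secs) + " " + std::to_string(free_mem) + "\n";
    return true;
}
```

For example, the epoch value 1700000000 corresponds to 22:13:20 UTC, so the message MSG_MEMSTAT,1700000000,123456 converts to the data point "80000 123456" followed by a newline.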
Putting it all together
Once you have all three programs written, you can start the Message Queue, the Statistics Connector, and the Monitor Connector, and you should immediately see a plot of the machine's available memory.
| Criteria | Points |
|---|---|
| Correct implementation of Connectors and adapters | 30 |
| Correct implementation of message queuing paradigm | 20 |
| Correct implementation of IPC and process management | 20 |
| Makefile and test cases | 20 |
| Code quality and documentation | 10 |
| Total | 100 |
Submit a single compressed file (in zip, 7zip, or tar.gz format) to the Moodle dropbox below. Name the file your_moodle_id.zip (or .7z or .tar.gz), where your_moodle_id is your Moodle login name. Your submission should contain your source code and a Makefile. The Makefile should have an all target at the top that builds all three executables (but does not run them), a run target that runs the executables, a killall target that kills all of your running processes, and a clean target that removes all build files.