Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ parameters for rabbitmq_plugin
# --rabbitmq-username guest
# --rabbitmq-password guest
# --rabbitmq-accept-trx-exchange trx.accepted
# --rabbitmq-accept-block-exchange block.accepted
# --rabbitmq-irreversible-block-exchange block.irreversible
# --rabbitmq-applied-trx-exchange trx.applied
# --rabbitmq-block-start 100
# --rabbitmq-queue-size 5000
# --rabbitmq-deserialize-trace-action-data true
```

## TODOs
Expand Down
61 changes: 53 additions & 8 deletions rabbitmq_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ namespace eosio {
using chain::packed_transaction;

static appbase::abstract_plugin& _rabbitmq_plugin = app().register_plugin<rabbitmq_plugin>();

using rabbitmq_producer_ptr = std::shared_ptr<class rabbitmq_producer>;

class rabbitmq_plugin_impl {
Expand Down Expand Up @@ -85,6 +86,8 @@ using rabbitmq_producer_ptr = std::shared_ptr<class rabbitmq_producer>;

void _process_irreversible_block(const chain::block_state_ptr &);

void deserialize_action_data(chain::action_trace& at);

void init();

bool configured{false};
Expand Down Expand Up @@ -112,10 +115,11 @@ using rabbitmq_producer_ptr = std::shared_ptr<class rabbitmq_producer>;
rabbitmq_producer_ptr producer;
std::string m_accept_trx_exchange = "";
std::string m_applied_trx_exchange = "";

};

std::string m_accept_block_exchange = "";
std::string m_irreversible_block_exchange = "";
bool m_deserialize_trace_action_data = true;

};


namespace {
Expand Down Expand Up @@ -367,7 +371,13 @@ using rabbitmq_producer_ptr = std::shared_ptr<class rabbitmq_producer>;

void rabbitmq_plugin_impl::_process_applied_transaction(const trasaction_info_st &t) {

uint64_t time = (t.block_time.time_since_epoch().count()/1000);
if(m_deserialize_trace_action_data) {
for (auto &trace : t.trace->action_traces) {
deserialize_action_data(trace);
}
}

uint64_t time = (t.block_time.time_since_epoch().count()/1000);
string transaction_metadata_json =
"{\"block_number\":" + std::to_string(t.block_number) + ",\"block_time\":" + std::to_string(time) +
",\"trace\":" + fc::json::to_string(t.trace).c_str() + "}";
Expand All @@ -377,10 +387,30 @@ using rabbitmq_producer_ptr = std::shared_ptr<class rabbitmq_producer>;

void rabbitmq_plugin_impl::_process_accepted_block( const chain::block_state_ptr& bs )
{
string block_metadata_json = fc::json::to_string(bs);
producer->trx_rabbitmq_sendmsg("", m_accept_block_exchange, block_metadata_json);
}

void rabbitmq_plugin_impl::_process_irreversible_block(const chain::block_state_ptr& bs)
{
string block_metadata_json = fc::json::to_string(bs);
producer->trx_rabbitmq_sendmsg("", m_irreversible_block_exchange, block_metadata_json);
}

void rabbitmq_plugin_impl::deserialize_action_data(chain::action_trace& at){

fc::variant v = chain_plug->chain().to_variant_with_abi(at.act, chain_plug->get_abi_serializer_max_time());
fc::variant_object vo = v.get_object();

string action_data = fc::json::to_string(vo.find("data")->value());

at.act.data = vector<char>(action_data.begin(), action_data.end());

if(!at.inline_traces.empty()){
for(auto& trace : at.inline_traces) {
deserialize_action_data(trace);
}
}
}

rabbitmq_plugin_impl::rabbitmq_plugin_impl()
Expand Down Expand Up @@ -423,8 +453,14 @@ using rabbitmq_producer_ptr = std::shared_ptr<class rabbitmq_producer>;

void rabbitmq_plugin::set_program_options(options_description &cli, options_description &cfg) {
cfg.add_options()
("rabbitmq-deserialize-trace-action-data", bpo::value<bool>()->default_value(true),
"Deserialize action traces into json from abi bin")
("rabbitmq-accept-trx-exchange", bpo::value<std::string>()->default_value("trx.accepted"),
"The exchange for accepted transaction.")
("rabbitmq-accept-block-exchange", bpo::value<std::string>()->default_value("block.accepted"),
"The exchange for accepted blocks." )
("rabbitmq-irreversible-block-exchange", bpo::value<std::string>()->default_value("block.irreversible"),
"The exchange for irreversible blocks." )
("rabbitmq-applied-trx-exchange", bpo::value<std::string>()->default_value("trx.applied"),
"The exchange for appiled transaction.")
("rabbitmq-username", bpo::value<std::string>()->default_value("guest"),
Expand All @@ -434,7 +470,7 @@ using rabbitmq_producer_ptr = std::shared_ptr<class rabbitmq_producer>;
("rabbitmq-hostname", bpo::value<std::string>()->default_value("127.0.0.1"),
"the rabbitmq hostname (e.g. localhost or 127.0.0.1)")
("rabbitmq-port", bpo::value<uint32_t>()->default_value(5672),
"the rabbitmq port (e.g. 5672)")
"the rabbitmq port (e.g. 5672)")
("rabbitmq-queue-size", bpo::value<uint32_t>()->default_value(10000),
"The target queue size between nodeos and rabbitmq plugin thread.")
("rabbitmq-block-start", bpo::value<uint32_t>()->default_value(0),
Expand All @@ -458,13 +494,22 @@ using rabbitmq_producer_ptr = std::shared_ptr<class rabbitmq_producer>;
if (options.count("rabbitmq-applied-trx-exchange") != 0) {
my->m_applied_trx_exchange = options.at("rabbitmq-applied-trx-exchange").as<std::string>();
}

if (options.count("rabbitmq-accept-block-exchange") != 0){
my->m_accept_block_exchange = options.at("rabbitmq-accept-block-exchange").as<std::string>();
}
if (options.count("rabbitmq-irreversible-block-exchange") != 0){
my->m_irreversible_block_exchange = options.at("rabbitmq-irreversible-block-exchange").as<std::string>();
}
if (options.count("rabbitmq-deserialize-trace-action-data") != 0){
my->m_deserialize_trace_action_data = options.at("rabbitmq-deserialize-trace-action-data").as<bool>();
}

if (0!=my->producer->trx_rabbitmq_init(hostname, port, username, password)){
elog("trx_rabbitmq_init fail");
} else{
elog("trx_rabbitmq_init ok");
}

ilog("initializing rabbitmq_plugin");
my->configured = true;

Expand All @@ -482,7 +527,7 @@ using rabbitmq_producer_ptr = std::shared_ptr<class rabbitmq_producer>;
my->chain_plug = app().find_plugin<chain_plugin>();
EOS_ASSERT(my->chain_plug, chain::missing_chain_plugin_exception, "");
auto &chain = my->chain_plug->chain();

my->accepted_block_connection.emplace( chain.accepted_block.connect( [&]( const chain::block_state_ptr& bs ) {
my->accepted_block(bs);
}));
Expand Down
2 changes: 1 addition & 1 deletion rabbitmq_producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ int rabbitmq_producer::trx_rabbitmq_init(std::string hostname, uint32_t port, st
void rabbitmq_producer::trx_rabbitmq_sendmsg(std::string routingKey, std::string exchange, std::string msgstr){
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.content_type = amqp_cstring_bytes("application/json");
props.delivery_mode = 2; /* persistent delivery mode */


Expand Down