diff --git a/Changes b/Changes index 9add750..fd30130 100644 --- a/Changes +++ b/Changes @@ -1,4 +1,9 @@ {{$NEXT}} + - fix testing behaviour under Test::Routine::Common 0.25 (thanks + Ilmari!) + - make tests more robust by waiting for the server to actually create + the trace subdirectories, instead of waiting for a specific amount + of time (thanks Ilmari!) 1.13 2015-07-23 17:38:50+01:00 Europe/London - if the incoming frame does not declare a content-type, assume diff --git a/lib/Plack/Handler/Stomp.pm b/lib/Plack/Handler/Stomp.pm index 70a36e6..ecb41c4 100644 --- a/lib/Plack/Handler/Stomp.pm +++ b/lib/Plack/Handler/Stomp.pm @@ -1,6 +1,7 @@ package Plack::Handler::Stomp; use Moose; -use MooseX::Types::Moose qw(Bool); +use MooseX::Types::Moose qw(Bool HashRef); +use MooseX::Types::Common::Numeric qw(PositiveNum); use Plack::Handler::Stomp::Types qw(Logger PathMap); use Net::Stomp::MooseHelpers::Types qw(NetStompish); use Plack::Handler::Stomp::PathInfoMunger 'munge_path_info'; @@ -160,6 +161,9 @@ sub run { $exception = $_; }; if ($exception) { + # if we 'return', we'll exit from ->reconnect_on_failure, + # so the last few lines of this 'run' method will be + # executed if (!blessed $exception) { $exception = "unhandled exception $exception"; return; @@ -174,6 +178,8 @@ sub run { $exception=undef; return; } + # ok, something went wrong with the connection, let's + # 'die', ->reconnect_on_failure will reconnect die $exception; } }); @@ -186,6 +192,12 @@ sub run { Loop forever receiving frames from the STOMP connection. Call L for each frame. +If L is true, and we are waiting for any receipt, +set a timeout (of L) on each call to +L: if we don't get anything before timing +out, the connection is dead, so we throw an exception and depend on +L to re-connect. + If L is set, this function exits after having consumed exactly 1 frame. @@ -195,10 +207,14 @@ sub frame_loop { my ($self,$app) = @_; while (1) { - my $frame = $self->connection->receive_frame(); + my $frame = $self->connection->receive_frame( + $self->receipts_to_wait_for + ? { timeout => $self->receipt_timeout } + : () + ); if(!$frame || !ref($frame)) { Net::Stomp::MooseHelpers::Exceptions::Stomp->throw({ - stomp_error => 'empty frame received', + stomp_error => 'empty frame, connection broken?', }); } $self->handle_stomp_frame($app, $frame); @@ -248,13 +264,67 @@ sub handle_stomp_error { $self->logger->warn($error); } +=attr C + +If true, request a receipt from the broker for each C we +send. This ensures that a broken connection is recognised very soon, +since if the receipt does not arrive in a few seconds (see +L), we can confindently say that something went +wrong. Defaults to false. + +=cut + +has receipt_for_ack => ( + is => 'ro', + isa => Bool, + default => 0, +); + +=attr C + +Floating point number, greater than 0. How long to wait (in seconds) +for a receipt (or, in fact, any other frame from the broker) before +considering the connection lost, if we are actually waiting for a +receipt. + +L will always wait indefinitely if no receipt is +expected: maybe we only get one message a month! + +Defaults to 10 seconds. + +=cut + +has receipt_timeout => ( + is => 'ro', + isa => PositiveNum, + default => 10, +); + +has _waiting_receipts => ( + is => 'ro', + init_arg => undef, + isa => HashRef, + default => sub { +{} }, + traits => ['Hash'], + handles => { + check_and_clear_waiting_receipt => 'delete', + new_receipt_to_wait_for => 'set', + receipts_to_wait_for => 'count', + }, +); + +around new_receipt_to_wait_for => sub { + my ($orig,$self,$receipt) = @_; + $self->$orig($receipt,1); +}; + =method C Calls L to convert the STOMP message into a PSGI environment. The environment is then passed to L, and the -frame is acknowledged. +frame is acknowledged via L. =cut @@ -265,7 +335,7 @@ sub handle_stomp_message { try { $self->process_the_message($app,$env); - $self->connection->ack({ frame => $frame }); + $self->ack_message($frame); } catch { Plack::Handler::Stomp::Exceptions::AppError->throw({ app_error => $_ @@ -273,6 +343,25 @@ sub handle_stomp_message { }; } +=method C + +Sends a C frame to the broker. If L is true, +the frame will request a receipt. + +=cut + +sub ack_message { + my ($self,$frame) = @_; + my %args = ( frame => $frame ); + + if ($self->receipt_for_ack) { + $args{receipt} = $frame->headers->{'message-id'} . '-ack'; + $self->new_receipt_to_wait_for($args{receipt}); + } + + $self->connection->ack(\%args); +} + =method C Runs a PSGI environment through the application, then flattens the @@ -335,8 +424,15 @@ receipts. sub handle_stomp_receipt { my ($self, $app, $frame) = @_; - $self->logger->debug('ignored RECEIPT frame for ' - .$frame->headers->{'receipt-id'}); + my $receipt = $frame->headers->{'receipt-id'}; + if ($self->check_and_clear_waiting_receipt($receipt)) { + $self->logger->debug("got RECEIPT frame for $receipt"); + } + else { + $self->logger->debug("ignored RECEIPT frame for $receipt"); + } + + return; } =method C diff --git a/t/lib/BrokerTestApp.pm b/t/lib/BrokerTestApp.pm index 95fca15..394527d 100644 --- a/t/lib/BrokerTestApp.pm +++ b/t/lib/BrokerTestApp.pm @@ -7,7 +7,7 @@ my $app = sub { my $body; (delete $env->{'psgi.input'})->read($body,1000000); my $data = JSON::XS::decode_json($body); - my $response = {}; + my $response = { payload => ($data->{payload} || {}) }; exit 0 if $data->{exit_now}; diff --git a/t/lib/RunTestApp.pm b/t/lib/RunTestApp.pm index df64e07..335e9fa 100644 --- a/t/lib/RunTestApp.pm +++ b/t/lib/RunTestApp.pm @@ -7,6 +7,8 @@ use BrokerTestApp; use Test::More; use Moose::Util 'apply_all_roles'; use File::Temp 'tempdir'; +use Path::Class; +use JSON::XS; my $mq; @@ -62,6 +64,12 @@ sub _build_trace_dir { return tempdir(CLEANUP => ( $ENV{TEST_VERBOSE} ? 0 : 1 )); } +has receipt_for_ack => ( + is => 'ro', + lazy_build => 1, +); +sub _build_receipt_for_ack { 0 } + sub _build_child { my ($self) = @_; @@ -85,6 +93,7 @@ sub _build_child { }, path_info => '/topic/ch2', }, ], + receipt_for_ack => $self->receipt_for_ack, }); apply_all_roles($runner,'Net::Stomp::MooseHelpers::TraceStomp'); $runner->trace_basedir($trace_dir); @@ -98,22 +107,27 @@ sub _build_child { } else { diag "server started, waiting for spinup..."; - sleep($ENV{NET_STOMP_DELAY}||5); + sleep 1 until dir($trace_dir)->children; return $pid; } } -sub DEMOLISH { +sub kill_application { my ($self) = @_; - return unless $self->has_child; my $child = $self->child; kill 'TERM',$child; diag "waitpid for child\n"; waitpid($child,0); + $self->clear_child; } +sub DEMOLISH {} +after DEMOLISH => sub { + shift->kill_application; +}; + has reply_to => ( is => 'rw' ); before 'run_test' => sub { @@ -134,16 +148,21 @@ before 'run_test' => sub { 'subscribe to temp queue'); $self->reply_to($reply_to); + $self->child; # start the child process + return; }; after 'run_test' => sub { my ($self) = @_; + $self->kill_application; my $conn = $self->server_conn; $conn->disconnect; ok(!$conn->socket->connected, 'disconnected'); $self->reply_to(undef); + $self->clear_server_conn; + return; }; 1; diff --git a/t/lib/RunTestAppNoNet.pm b/t/lib/RunTestAppNoNet.pm index 3022a3e..05f6454 100644 --- a/t/lib/RunTestAppNoNet.pm +++ b/t/lib/RunTestAppNoNet.pm @@ -5,6 +5,7 @@ use BrokerTestApp; use Test::More; use Moose::Util 'apply_all_roles'; use File::Temp 'tempdir'; +use Path::Class; use Net::Stomp::Producer; has producer => ( @@ -63,18 +64,19 @@ sub _build_child { } else { diag "server started, waiting for spinup..."; - sleep($ENV{NET_STOMP_DELAY}||5); + sleep 1 until dir($trace_dir)->children; return $pid; } } -sub DEMOLISH { +sub DEMOLISH {} +after DEMOLISH => sub { my ($self) = @_; return unless $self->has_child; my $child = $self->child; kill 'TERM',$child; diag "waitpid for child"; waitpid($child,0); -} +}; 1; diff --git a/t/real_broker.t b/t/real_broker.t index 733c8a7..efddd31 100644 --- a/t/real_broker.t +++ b/t/real_broker.t @@ -1,4 +1,5 @@ #!perl +package Test::Plack::Handler::Stomp::RealBroker; use lib 't/lib'; use Test::Routine; use Test::Routine::Util; @@ -7,80 +8,99 @@ use JSON::XS; use Net::Stomp::MooseHelpers::ReadTrace; with 'RunTestApp'; -test 'talk to the app' => sub { - my ($self) = @_; +sub send_message { + my ($self,$case) = @_; + + my $message = { + payload => $case->{payload}, + reply_to => $self->reply_to, + type => 'testaction', + }; + + $self->server_conn->send( { + destination => $case->{destination}, + body => JSON::XS::encode_json($message), + JMSType => $case->{JMSType}, + custom_header => $case->{custom_header}, + } ); +} + +sub check_reply { + my ($self,$case) = @_; + + my $reply_frame = $self->server_conn->receive_frame(); + cmp_ok($reply_frame->command,'eq','MESSAGE','received the response'); + + my $response = JSON::XS::decode_json($reply_frame->body); + cmp_ok( + $response->{path_info},'eq',$case->{path_info}, + 'correct response path', + ); + cmp_deeply( + $response->{payload}, + $case->{payload}, + 'correct response payload', + ); +} - my $child = $self->child; - my $conn = $self->server_conn; - my $reply_to = $self->reply_to; +has cases => ( + is => 'ro', + isa => 'ArrayRef', + lazy_build => 1, +); - my @cases = ( +sub _build_cases { + return [ { destination => '/queue/plack-handler-stomp-test', + payload => { foo => 1, bar => 2 }, JMSType => 'anything', custom_header => '3', path_info => '/queue/plack-handler-stomp-test', }, { destination => '/topic/plack-handler-stomp-test', + payload => { foo => 2, bar => 3 }, JMSType => 'test_foo', custom_header => '3', path_info => '/topic/ch1', }, { destination => '/topic/plack-handler-stomp-test', + payload => { foo => 3, bar => 4 }, JMSType => 'anything', custom_header => '1', path_info => '/topic/ch1', }, { destination => '/topic/plack-handler-stomp-test', + payload => { foo => 4, bar => 5 }, JMSType => 'test_bar', custom_header => '3', path_info => '/topic/ch2', }, { destination => '/topic/plack-handler-stomp-test', + payload => { foo => 5, bar => 6 }, JMSType => 'anything', custom_header => '2', path_info => '/topic/ch2', }, - ); - - subtest 'send & reply' => sub { - for my $case (@cases) { - my $message = { - payload => { foo => 1, bar => 2 }, - reply_to => $reply_to, - type => 'testaction', - }; - - $conn->send( { - destination => $case->{destination}, - body => JSON::XS::encode_json($message), - JMSType => $case->{JMSType}, - custom_header => $case->{custom_header}, - } ); - - my $reply_frame = $conn->receive_frame(); - ok($reply_frame, 'got a reply'); - - my $response = JSON::XS::decode_json($reply_frame->body); - ok($response, 'response ok'); - ok($response->{path_info} eq $case->{path_info}, 'worked'); - } - }; + ]; +} - subtest 'tracing' => sub { - my $reader = Net::Stomp::MooseHelpers::ReadTrace->new({ - trace_basedir => $self->trace_dir, - }); - my @frames = $reader->sorted_frames(); +sub case_comparers { + my ($self) = @_; - my @case_comparers = map { + return ( + methods(command=>'CONNECT'), + methods(command=>'CONNECTED'), + (methods(command=>'SUBSCRIBE')) x 3, + map { my %h=%$_; $h{type}=delete $h{JMSType}; my $pi=delete $h{path_info}; + delete $h{payload}; ( methods(command=>'MESSAGE', @@ -94,32 +114,45 @@ test 'talk to the app' => sub { ), methods(command=>'ACK'), ) - } @cases; - - cmp_deeply(\@frames, - [ - methods(command=>'CONNECT'), - methods(command=>'CONNECTED'), - (methods(command=>'SUBSCRIBE')) x 3, - @case_comparers, - ], - 'tracing works' - ); - }; + } @{$self->cases}, + ); +} - # we send the "exit now" command on the topic, so we're sure we - # won't find it on the next run - # - # BrokerTestApp exits without ACK-ing the message, so it would - # remain on the queue, ready to stop the application the next time - # we try to run the test - $conn->send( { - destination => '/topic/plack-handler-stomp-test', - body => JSON::XS::encode_json({exit_now=>1}), - JMSType => 'test_foo', - } ); +sub check_trace { + my ($self,$frames) = @_; + + my @case_comparers = $self->case_comparers; + + cmp_deeply( + $frames, + \@case_comparers, + 'tracing works' + ) or explain $frames; +} + +test 'talk to the app' => sub { + my ($self) = @_; + subtest 'send & reply' => sub { + for my $case (@{$self->cases}) { + $self->send_message($case); + $self->check_reply($case); + } + }; + sleep(1); # let's wait a bit in case the app needs to read some + # more frames, we need this to make + # real_broker_receipt.t work a bit more reliably + + subtest 'tracing' => sub { + my $reader = Net::Stomp::MooseHelpers::ReadTrace->new({ + trace_basedir => $self->trace_dir, + }); + my @frames = $reader->sorted_frames(); + $self->check_trace(\@frames); + }; }; -run_me; -done_testing; +unless (caller) { + run_me; + done_testing(); +} diff --git a/t/real_broker_receipt.t b/t/real_broker_receipt.t new file mode 100644 index 0000000..29290c1 --- /dev/null +++ b/t/real_broker_receipt.t @@ -0,0 +1,66 @@ +#!perl +package Test::Plack::Handler::Stomp::RealBroker::Receipt; +use lib 't/lib'; +use Test::Routine; +use Test::Routine::Util; +use MyTesting; +require 't/real_broker.t'; +with 'Test::Plack::Handler::Stomp::RealBroker'; + +sub _build_receipt_for_ack { 1 } + +sub check_trace { + my ($self,$frames) = @_; + + my @case_comparers = $self->case_comparers; + my %acks_without_receipt; + + for my $frame (@$frames) { + if ($frame->command eq 'RECEIPT') { + my $ack = $frame->headers->{'receipt-id'}; + ok(delete $acks_without_receipt{$ack},'got receipt for ack'); + } + else { + if ($frame->command eq 'ACK') { + my $ack = $frame->headers->{'receipt'}; + $acks_without_receipt{$ack} = 1; + } + my $should_match = shift @case_comparers; + cmp_deeply( + $frame, + $should_match, + 'tracing works', + ) or explain $frame; + } + } + ok(!%acks_without_receipt,'all ack receipted'); +} + +test 'all messages up front, with receipts on ack' => sub { + my ($self) = @_; + + # we need to send all messages on the same destination, otherwise + # they'll be delivered in a random order and the tests will + # sometimes fail + my @cases = map { + { + %{$_}, + destination => '/queue/plack-handler-stomp-test', + path_info => '/queue/plack-handler-stomp-test', + }, + } @{$self->cases}; + + subtest 'send & reply' => sub { + for my $case (@cases) { + $self->send_message($case); + } + for my $case (@cases) { + $self->check_reply($case); + } + }; +}; + +unless (caller) { + run_me; + done_testing(); +}