Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
{{$NEXT}}
- fix testing behaviour under Test::Routine::Common 0.25 (thanks
Ilmari!)
- make tests more robust by waiting for the server to actually create
the trace subdirectories, instead of waiting for a specific amount
of time (thanks Ilmari!)

1.13 2015-07-23 17:38:50+01:00 Europe/London
- if the incoming frame does not declare a content-type, assume
Expand Down
110 changes: 103 additions & 7 deletions lib/Plack/Handler/Stomp.pm
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package Plack::Handler::Stomp;
use Moose;
use MooseX::Types::Moose qw(Bool);
use MooseX::Types::Moose qw(Bool HashRef);
use MooseX::Types::Common::Numeric qw(PositiveNum);
use Plack::Handler::Stomp::Types qw(Logger PathMap);
use Net::Stomp::MooseHelpers::Types qw(NetStompish);
use Plack::Handler::Stomp::PathInfoMunger 'munge_path_info';
Expand Down Expand Up @@ -160,6 +161,9 @@ sub run {
$exception = $_;
};
if ($exception) {
# if we 'return', we'll exit from ->reconnect_on_failure,
# so the last few lines of this 'run' method will be
# executed
if (!blessed $exception) {
$exception = "unhandled exception $exception";
return;
Expand All @@ -174,6 +178,8 @@ sub run {
$exception=undef;
return;
}
# ok, something went wrong with the connection, let's
# 'die', ->reconnect_on_failure will reconnect
die $exception;
}
});
Expand All @@ -186,6 +192,12 @@ sub run {
Loop forever receiving frames from the STOMP connection. Call
L</handle_stomp_frame> for each frame.

If L</receipt_for_ack> is true, and we are waiting for any receipt,
set a timeout (of L</receipt_timeout>) on each call to
L<Net::Stomp/receive_frame>: if we don't get anything before timing
out, the connection is dead, so we throw an exception and depend on
L</run> to re-connect.

If L</one_shot> is set, this function exits after having consumed
exactly 1 frame.

Expand All @@ -195,10 +207,14 @@ sub frame_loop {
my ($self,$app) = @_;

while (1) {
my $frame = $self->connection->receive_frame();
my $frame = $self->connection->receive_frame(
$self->receipts_to_wait_for
? { timeout => $self->receipt_timeout }
: ()
);
if(!$frame || !ref($frame)) {
Net::Stomp::MooseHelpers::Exceptions::Stomp->throw({
stomp_error => 'empty frame received',
stomp_error => 'empty frame, connection broken?',
});
}
$self->handle_stomp_frame($app, $frame);
Expand Down Expand Up @@ -248,13 +264,67 @@ sub handle_stomp_error {
$self->logger->warn($error);
}

=attr C<receipt_for_ack>

If true, request a receipt from the broker for each C<ACK> we
send. This ensures that a broken connection is recognised very soon,
since if the receipt does not arrive in a few seconds (see
L</receipt_timeout>), we can confindently say that something went
wrong. Defaults to false.

=cut

has receipt_for_ack => (
is => 'ro',
isa => Bool,
default => 0,
);

=attr C<receipt_timeout>

Floating point number, greater than 0. How long to wait (in seconds)
for a receipt (or, in fact, any other frame from the broker) before
considering the connection lost, if we are actually waiting for a
receipt.

L</frame_loop> will always wait indefinitely if no receipt is
expected: maybe we only get one message a month!

Defaults to 10 seconds.

=cut

has receipt_timeout => (
is => 'ro',
isa => PositiveNum,
default => 10,
);

has _waiting_receipts => (
is => 'ro',
init_arg => undef,
isa => HashRef,
default => sub { +{} },
traits => ['Hash'],
handles => {
check_and_clear_waiting_receipt => 'delete',
new_receipt_to_wait_for => 'set',
receipts_to_wait_for => 'count',
},
);

around new_receipt_to_wait_for => sub {
my ($orig,$self,$receipt) = @_;
$self->$orig($receipt,1);
};

=method C<handle_stomp_message>

Calls L</build_psgi_env> to convert the STOMP message into a PSGI
environment.

The environment is then passed to L</process_the_message>, and the
frame is acknowledged.
frame is acknowledged via L</ack_message>.

=cut

Expand All @@ -265,14 +335,33 @@ sub handle_stomp_message {
try {
$self->process_the_message($app,$env);

$self->connection->ack({ frame => $frame });
$self->ack_message($frame);
} catch {
Plack::Handler::Stomp::Exceptions::AppError->throw({
app_error => $_
});
};
}

=method C<ack_message>

Sends a C<ACK> frame to the broker. If L</receipt_for_ack> is true,
the frame will request a receipt.

=cut

sub ack_message {
my ($self,$frame) = @_;
my %args = ( frame => $frame );

if ($self->receipt_for_ack) {
$args{receipt} = $frame->headers->{'message-id'} . '-ack';
$self->new_receipt_to_wait_for($args{receipt});
}

$self->connection->ack(\%args);
}

=method C<process_the_message>

Runs a PSGI environment through the application, then flattens the
Expand Down Expand Up @@ -335,8 +424,15 @@ receipts.
sub handle_stomp_receipt {
my ($self, $app, $frame) = @_;

$self->logger->debug('ignored RECEIPT frame for '
.$frame->headers->{'receipt-id'});
my $receipt = $frame->headers->{'receipt-id'};
if ($self->check_and_clear_waiting_receipt($receipt)) {
$self->logger->debug("got RECEIPT frame for $receipt");
}
else {
$self->logger->debug("ignored RECEIPT frame for $receipt");
}

return;
}

=method C<maybe_send_reply>
Expand Down
2 changes: 1 addition & 1 deletion t/lib/BrokerTestApp.pm
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ my $app = sub {
my $body;
(delete $env->{'psgi.input'})->read($body,1000000);
my $data = JSON::XS::decode_json($body);
my $response = {};
my $response = { payload => ($data->{payload} || {}) };

exit 0 if $data->{exit_now};

Expand Down
25 changes: 22 additions & 3 deletions t/lib/RunTestApp.pm
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use BrokerTestApp;
use Test::More;
use Moose::Util 'apply_all_roles';
use File::Temp 'tempdir';
use Path::Class;
use JSON::XS;

my $mq;

Expand Down Expand Up @@ -62,6 +64,12 @@ sub _build_trace_dir {
return tempdir(CLEANUP => ( $ENV{TEST_VERBOSE} ? 0 : 1 ));
}

has receipt_for_ack => (
is => 'ro',
lazy_build => 1,
);
sub _build_receipt_for_ack { 0 }

sub _build_child {
my ($self) = @_;

Expand All @@ -85,6 +93,7 @@ sub _build_child {
},
path_info => '/topic/ch2', },
],
receipt_for_ack => $self->receipt_for_ack,
});
apply_all_roles($runner,'Net::Stomp::MooseHelpers::TraceStomp');
$runner->trace_basedir($trace_dir);
Expand All @@ -98,22 +107,27 @@ sub _build_child {
}
else {
diag "server started, waiting for spinup...";
sleep($ENV{NET_STOMP_DELAY}||5);
sleep 1 until dir($trace_dir)->children;
return $pid;
}
}

sub DEMOLISH {
sub kill_application {
my ($self) = @_;

return unless $self->has_child;

my $child = $self->child;
kill 'TERM',$child;
diag "waitpid for child\n";
waitpid($child,0);
$self->clear_child;
}

sub DEMOLISH {}
after DEMOLISH => sub {
shift->kill_application;
};

has reply_to => ( is => 'rw' );

before 'run_test' => sub {
Expand All @@ -134,16 +148,21 @@ before 'run_test' => sub {
'subscribe to temp queue');

$self->reply_to($reply_to);
$self->child; # start the child process
return;
};

after 'run_test' => sub {
my ($self) = @_;

$self->kill_application;
my $conn = $self->server_conn;

$conn->disconnect;
ok(!$conn->socket->connected, 'disconnected');
$self->reply_to(undef);
$self->clear_server_conn;
return;
};

1;
8 changes: 5 additions & 3 deletions t/lib/RunTestAppNoNet.pm
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use BrokerTestApp;
use Test::More;
use Moose::Util 'apply_all_roles';
use File::Temp 'tempdir';
use Path::Class;
use Net::Stomp::Producer;

has producer => (
Expand Down Expand Up @@ -63,18 +64,19 @@ sub _build_child {
}
else {
diag "server started, waiting for spinup...";
sleep($ENV{NET_STOMP_DELAY}||5);
sleep 1 until dir($trace_dir)->children;
return $pid;
}
}

sub DEMOLISH {
sub DEMOLISH {}
after DEMOLISH => sub {
my ($self) = @_;
return unless $self->has_child;
my $child = $self->child;
kill 'TERM',$child;
diag "waitpid for child";
waitpid($child,0);
}
};

1;
Loading