Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions lib/Net/AMQP.pm
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ our $VERSION = 0.06;
use constant {
_HEADER_LEN => 7, # 'CnN'
_FOOTER_LEN => 1, # 'C'
_HEADER_FOOTER_LEN => 8, # = _HEADER_LEN + _FOOTER_LEN
};

=head1 CLASS METHODS
Expand All @@ -68,9 +69,9 @@ sub parse_raw_frames {
my ($class, $input_ref) = @_;

my @frames;
while (length($$input_ref) >= _HEADER_LEN + _FOOTER_LEN) {
while (length($$input_ref) >= _HEADER_FOOTER_LEN) {
my ($type_id, $channel, $size) = unpack 'CnN', $$input_ref;
last if length($$input_ref) < _HEADER_LEN + $size + _FOOTER_LEN;
last if length($$input_ref) < $size + _HEADER_FOOTER_LEN;
substr $$input_ref, 0, _HEADER_LEN, '';

my $payload = substr $$input_ref, 0, $size, '';
Expand All @@ -81,9 +82,7 @@ sub parse_raw_frames {
}

push @frames, Net::AMQP::Frame->factory(
type_id => $type_id,
channel => $channel,
payload => $payload,
$type_id, $channel, $payload,
);
}
return @frames;
Expand Down
39 changes: 23 additions & 16 deletions lib/Net/AMQP/Common.pm
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,7 @@ sub pack_timestamp { goto &pack_unsigned_long_long_integer }
sub unpack_timestamp { goto &unpack_unsigned_long_long_integer }

sub pack_short_string {
my $str = shift;
$str = '' unless defined $str;
my $str = $_[0] || '';
return pack('C', length $str) . $str;
}

Expand All @@ -167,8 +166,7 @@ sub pack_long_string {
# Here for Connection::StartOk->response
return pack_field_table(@_);
}
my $str = shift;
$str = '' unless defined $str;
my $str = $_[0] || '';
return pack('N', length $str) . $str;
}

Expand All @@ -179,15 +177,14 @@ sub unpack_long_string {
}

sub pack_field_table {
my $table = shift;
$table = {} unless defined $table;

my $table = $_[0] || {};
my $table_packed = '';
foreach my $key (sort keys %$table) { # sort so I can compare raw frames
my $value = $table->{$key};
$table_packed .= pack_short_string($key);
$table_packed .= _pack_field_value($table->{$key});
}

while( my ($key, $val) = each %{$table}) {
$table_packed .= pack_short_string($key);
$table_packed .= _pack_field_value($val);
}

return pack('N', length $table_packed) . $table_packed;
}

Expand Down Expand Up @@ -234,19 +231,29 @@ sub _pack_field_value {
}

my %_unpack_field_types = (
V => sub { undef },
S => \&unpack_long_string,
I => \&unpack_long_integer,
A => \&unpack_field_array,
B => \&unpack_unsigned_short_integer,
b => \&unpack_short_integer,
D => sub {
my $input_ref = shift;
my $exp = unpack_octet($input_ref);
my $num = unpack_long_integer($input_ref);
$num / 10.0 ** $exp;
},
#d => \&unpack_double,
F => \&unpack_field_table,
A => \&unpack_field_array,
#f => \&unpack_float,
I => \&unpack_long_integer,
i => \&unpack_unsigned_long_integer,
L => \&unpack_long_long_integer,
l => \&unpack_unsigned_long_long_integer,
S => \&unpack_long_string,
s => \&unpack_short_string,
T => \&unpack_timestamp,
t => \&unpack_boolean,
U => \&unpack_short_integer,
u => \&unpack_unsigned_short_integer,
V => sub { undef },
);

sub unpack_field_table {
Expand Down
62 changes: 37 additions & 25 deletions lib/Net/AMQP/Frame.pm
Original file line number Diff line number Diff line change
Expand Up @@ -47,42 +47,54 @@ sub new {
=head2 factory

Net::AMQP::Frame->factory(
type_id => 1,
channel => 1,
payload => '',
$type_id, # type_id => 1,
$channel, # channel => 1,
$payload, # payload => '',
);

Will attempt to identify a L<Net::AMQP::Frame> subclass for further parsing, and will croak on failure. Returns a L<Net::AMQP::Frame> subclass object.

=cut

sub factory {
my ($class, %args) = @_;
my ($class, $type_id, $channel, $payload) = @_;

unless (exists $args{type_id}) { croak "Mandatory parameter 'type_id' missing in call to Net::AMQP::Frame::factory"; }
unless (exists $args{channel}) { croak "Mandatory parameter 'channel' missing in call to Net::AMQP::Frame::factory"; }
unless (exists $args{payload}) { croak "Mandatory parameter 'payload' missing in call to Net::AMQP::Frame::factory"; }
unless (keys %args == 3) { croak "Invalid parameter passed in call to Net::AMQP::Frame::factory"; }
unless (defined $type_id) { croak "Mandatory parameter 'type_id' missing in call to Net::AMQP::Frame::factory"; }

my $subclass;
if ($args{type_id} == 1) {
$subclass = 'Method';
if ($type_id == 1) {
$subclass = 'Net::AMQP::Frame::Method';
}
elsif ($args{type_id} == 2) {
$subclass = 'Header';
elsif ($type_id == 2) {
$subclass = 'Net::AMQP::Frame::Header';
}
elsif ($args{type_id} == 3) {
$subclass = 'Body';
elsif ($type_id == 3) {
unless (defined $channel) { croak "Mandatory parameter 'channel' missing in call to Net::AMQP::Frame::factory"; }
unless (defined $payload) { croak "Mandatory parameter 'payload' missing in call to Net::AMQP::Frame::factory"; }

# see Net::AMQP::Frame::Body::parse_payload() - empty function
return bless {
type_id => $type_id,
channel => $channel,
payload => $payload,
}, 'Net::AMQP::Frame::Body';
}
elsif ($args{type_id} == 8) {
$subclass = 'Heartbeat';
elsif ($type_id == 8) {
$subclass = 'Net::AMQP::Frame::Heartbeat';
}
else {
croak "Unknown type_id $args{type_id}";
croak "Unknown type_id $type_id";
}

$subclass = 'Net::AMQP::Frame::' . $subclass;
my $object = bless \%args, $subclass;
unless (defined $channel) { croak "Mandatory parameter 'channel' missing in call to Net::AMQP::Frame::factory"; }
unless (defined $payload) { croak "Mandatory parameter 'payload' missing in call to Net::AMQP::Frame::factory"; }

#@ my $object = bless \%args, $subclass;
my $object = bless {
type_id => $type_id,
channel => $channel,
payload => $payload,
}, $subclass;
$object->parse_payload();
return $object;
}
Expand Down Expand Up @@ -123,13 +135,13 @@ sub to_raw_frame {
my $self = shift;
my $class = ref $self;

if (! defined $self->channel) {
$self->channel(0);
}
my $channel = ($self->channel || 0);
my $raw_payload = $self->to_raw_payload();

return pack('Cn', $self->type_id, $self->channel)
. pack_long_string($self->to_raw_payload())
. pack('C', 206);
return pack('CnN', $self->type_id, $channel, length($raw_payload))
# . pack_long_string($raw_payload) = length($raw_payload) . $raw_payload
. $raw_payload
. "\x{ce}" # . "\x{ce}" = pack('C', 206); # faster, duration of pack() = 1usec
}

=head2 type_string
Expand Down