Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions src/channel.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use amqp_error::{AMQPResult, AMQPError};
use std::sync::mpsc::Receiver;
use std::thread;

use framing::{MethodFrame, ContentHeaderFrame, Frame, FrameType};
use table::Table;
Expand All @@ -13,7 +14,10 @@ use connection::Connection;
use std::collections::HashMap;
use std::cell::RefCell;
use std::rc::Rc;
use std::time::Duration;
use method::Method;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

pub trait Consumer: Send {
fn handle_delivery(&mut self,
Expand Down Expand Up @@ -55,17 +59,19 @@ pub struct Channel {
consumers: Rc<RefCell<HashMap<String, Box<Consumer>>>>,
receiver: Receiver<AMQPResult<Frame>>,
connection: Connection,
control: Arc<AtomicBool>
}

unsafe impl Send for Channel {}

impl Channel {
pub fn new(id: u16, receiver: Receiver<AMQPResult<Frame>>, connection: Connection) -> Channel {
pub fn new(id: u16, receiver: Receiver<AMQPResult<Frame>>, connection: Connection, control: Arc<AtomicBool>) -> Channel {
Channel {
id: id,
receiver: receiver,
consumers: Rc::new(RefCell::new(HashMap::new())),
connection: connection,
control: control,
}
}

Expand All @@ -88,14 +94,25 @@ impl Channel {
/// Will block until it reads a frame, other than `basic.deliver`.
pub fn read(&mut self) -> AMQPResult<Frame> {
let mut unprocessed_frame = None;
while unprocessed_frame.is_none() {
let frame = try!(self.receiver
.recv()
.map_err(|_| AMQPError::Protocol("Error reading packet from channel".to_owned()))
.and_then(|frame| frame));
unprocessed_frame = try!(self.try_consume(frame));

while self.control.load(Ordering::Relaxed) && unprocessed_frame.is_none() {
match self.receiver.try_recv() {
Ok(Ok(frame)) => {
unprocessed_frame = try!(self.try_consume(frame));
},
Ok(Err(e)) => {
return Err(AMQPError::Protocol("Error reading packet from channel".to_owned()));
},
Err(_) => {
thread::park_timeout(Duration::from_millis(100));
},
};
}

match unprocessed_frame {
Some(frame) => Ok(frame),
None => Err(AMQPError::Protocol("Exiting...".to_owned()))
}
Ok(unprocessed_frame.unwrap())
}

pub fn write(&mut self, frame: Frame) -> AMQPResult<()> {
Expand Down Expand Up @@ -247,7 +264,7 @@ impl Channel {
// Will run the infinite loop, which will receive frames on the given channel &
// call consumers.
pub fn start_consuming(&mut self) {
loop {
while self.control.load(Ordering::Relaxed) {
if let Err(err) = self.read() {
error!("Error consuming {:?}", err);
return;
Expand Down
18 changes: 10 additions & 8 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::collections::HashMap;
use std::sync::mpsc::{SyncSender, sync_channel};
use std::thread;
use std::cmp;
use std::sync::atomic::AtomicBool;


use url::{Url, percent_encoding};
Expand Down Expand Up @@ -65,6 +66,7 @@ pub struct Session {
channels: Arc<Mutex<HashMap<u16, SyncSender<AMQPResult<Frame>>>>>,
channel_max_limit: u16,
channel_zero: channel::Channel,
control: Arc<AtomicBool>,
}

impl Session {
Expand All @@ -77,9 +79,9 @@ impl Session {
/// `"amqp://localhost//"` and it will connect to rabbitmq server,
/// running on `localhost` on port `5672`,
/// with login `"guest"`, password: `"guest"` to vhost `"/"`
pub fn open_url(url_string: &str) -> AMQPResult<Session> {
pub fn open_url(url_string: &str, control: Arc<AtomicBool>) -> AMQPResult<Session> {
let options = try!(parse_url(url_string));
Session::new(options)
Session::new(options, control)
}

/// Initialize new rabbitmq session.
Expand All @@ -93,11 +95,11 @@ impl Session {
/// Err(error) => panic!("Failed openning an amqp session: {:?}", error)
/// };
/// ```
pub fn new(options: Options) -> AMQPResult<Session> {
pub fn new(options: Options, control: Arc<AtomicBool>) -> AMQPResult<Session> {
let connection = try!(get_connection(&options));
let channels = Arc::new(Mutex::new(HashMap::new()));
let (channel_zero_sender, channel_receiver) = sync_channel(CHANNEL_BUFFER_SIZE); //channel0
let channel_zero = channel::Channel::new(0, channel_receiver, connection.clone());
let channel_zero = channel::Channel::new(0, channel_receiver, connection.clone(), control.clone());
try!(channels.lock().map_err(|_| AMQPError::SyncError)).insert(0, channel_zero_sender);
let con1 = connection.clone();
let channels_clone = channels.clone();
Expand All @@ -107,6 +109,7 @@ impl Session {
channels: channels,
channel_max_limit: 65535,
channel_zero: channel_zero,
control: control,
};
try!(session.init(options));
Ok(session)
Expand Down Expand Up @@ -217,7 +220,7 @@ impl Session {
pub fn open_channel(&mut self, channel_id: u16) -> AMQPResult<channel::Channel> {
debug!("Openning channel: {}", channel_id);
let (sender, receiver) = sync_channel(CHANNEL_BUFFER_SIZE);
let mut channel = channel::Channel::new(channel_id, receiver, self.connection.clone());
let mut channel = channel::Channel::new(channel_id, receiver, self.connection.clone(), self.control.clone());
try!(self.channels.lock().map_err(|_| AMQPError::SyncError)).insert(channel_id, sender);
try!(channel.open());
Ok(channel)
Expand All @@ -236,10 +239,9 @@ impl Session {
class_id: 0,
method_id: 0,
};
let _: protocol::connection::CloseOk = self.channel_zero
let _: Option<protocol::connection::CloseOk> = self.channel_zero
.rpc(&close, "connection.close-ok")
.ok()
.unwrap();
.ok();
}

// Receives and dispatches frames from the connection to the corresponding
Expand Down