From b54c43526ac3f897971e222a649a97e169216eb5 Mon Sep 17 00:00:00 2001 From: Julius de Bruijn Date: Fri, 24 Feb 2017 18:01:15 +0100 Subject: [PATCH] The ability to stop consumers --- src/channel.rs | 35 ++++++++++++++++++++++++++--------- src/session.rs | 18 ++++++++++-------- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 9037d92..8d2babf 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,5 +1,6 @@ use amqp_error::{AMQPResult, AMQPError}; use std::sync::mpsc::Receiver; +use std::thread; use framing::{MethodFrame, ContentHeaderFrame, Frame, FrameType}; use table::Table; @@ -13,7 +14,10 @@ use connection::Connection; use std::collections::HashMap; use std::cell::RefCell; use std::rc::Rc; +use std::time::Duration; use method::Method; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; pub trait Consumer: Send { fn handle_delivery(&mut self, @@ -55,17 +59,19 @@ pub struct Channel { consumers: Rc>>>, receiver: Receiver>, connection: Connection, + control: Arc } unsafe impl Send for Channel {} impl Channel { - pub fn new(id: u16, receiver: Receiver>, connection: Connection) -> Channel { + pub fn new(id: u16, receiver: Receiver>, connection: Connection, control: Arc) -> Channel { Channel { id: id, receiver: receiver, consumers: Rc::new(RefCell::new(HashMap::new())), connection: connection, + control: control, } } @@ -88,14 +94,25 @@ impl Channel { /// Will block until it reads a frame, other than `basic.deliver`. pub fn read(&mut self) -> AMQPResult { let mut unprocessed_frame = None; - while unprocessed_frame.is_none() { - let frame = try!(self.receiver - .recv() - .map_err(|_| AMQPError::Protocol("Error reading packet from channel".to_owned())) - .and_then(|frame| frame)); - unprocessed_frame = try!(self.try_consume(frame)); + + while self.control.load(Ordering::Relaxed) && unprocessed_frame.is_none() { + match self.receiver.try_recv() { + Ok(Ok(frame)) => { + unprocessed_frame = try!(self.try_consume(frame)); + }, + Ok(Err(e)) => { + return Err(AMQPError::Protocol("Error reading packet from channel".to_owned())); + }, + Err(_) => { + thread::park_timeout(Duration::from_millis(100)); + }, + }; + } + + match unprocessed_frame { + Some(frame) => Ok(frame), + None => Err(AMQPError::Protocol("Exiting...".to_owned())) } - Ok(unprocessed_frame.unwrap()) } pub fn write(&mut self, frame: Frame) -> AMQPResult<()> { @@ -247,7 +264,7 @@ impl Channel { // Will run the infinite loop, which will receive frames on the given channel & // call consumers. pub fn start_consuming(&mut self) { - loop { + while self.control.load(Ordering::Relaxed) { if let Err(err) = self.read() { error!("Error consuming {:?}", err); return; diff --git a/src/session.rs b/src/session.rs index da228dd..07ba370 100644 --- a/src/session.rs +++ b/src/session.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::sync::mpsc::{SyncSender, sync_channel}; use std::thread; use std::cmp; +use std::sync::atomic::AtomicBool; use url::{Url, percent_encoding}; @@ -65,6 +66,7 @@ pub struct Session { channels: Arc>>>>, channel_max_limit: u16, channel_zero: channel::Channel, + control: Arc, } impl Session { @@ -77,9 +79,9 @@ impl Session { /// `"amqp://localhost//"` and it will connect to rabbitmq server, /// running on `localhost` on port `5672`, /// with login `"guest"`, password: `"guest"` to vhost `"/"` - pub fn open_url(url_string: &str) -> AMQPResult { + pub fn open_url(url_string: &str, control: Arc) -> AMQPResult { let options = try!(parse_url(url_string)); - Session::new(options) + Session::new(options, control) } /// Initialize new rabbitmq session. @@ -93,11 +95,11 @@ impl Session { /// Err(error) => panic!("Failed openning an amqp session: {:?}", error) /// }; /// ``` - pub fn new(options: Options) -> AMQPResult { + pub fn new(options: Options, control: Arc) -> AMQPResult { let connection = try!(get_connection(&options)); let channels = Arc::new(Mutex::new(HashMap::new())); let (channel_zero_sender, channel_receiver) = sync_channel(CHANNEL_BUFFER_SIZE); //channel0 - let channel_zero = channel::Channel::new(0, channel_receiver, connection.clone()); + let channel_zero = channel::Channel::new(0, channel_receiver, connection.clone(), control.clone()); try!(channels.lock().map_err(|_| AMQPError::SyncError)).insert(0, channel_zero_sender); let con1 = connection.clone(); let channels_clone = channels.clone(); @@ -107,6 +109,7 @@ impl Session { channels: channels, channel_max_limit: 65535, channel_zero: channel_zero, + control: control, }; try!(session.init(options)); Ok(session) @@ -217,7 +220,7 @@ impl Session { pub fn open_channel(&mut self, channel_id: u16) -> AMQPResult { debug!("Openning channel: {}", channel_id); let (sender, receiver) = sync_channel(CHANNEL_BUFFER_SIZE); - let mut channel = channel::Channel::new(channel_id, receiver, self.connection.clone()); + let mut channel = channel::Channel::new(channel_id, receiver, self.connection.clone(), self.control.clone()); try!(self.channels.lock().map_err(|_| AMQPError::SyncError)).insert(channel_id, sender); try!(channel.open()); Ok(channel) @@ -236,10 +239,9 @@ impl Session { class_id: 0, method_id: 0, }; - let _: protocol::connection::CloseOk = self.channel_zero + let _: Option = self.channel_zero .rpc(&close, "connection.close-ok") - .ok() - .unwrap(); + .ok(); } // Receives and dispatches frames from the connection to the corresponding