diff --git a/build.sbt b/build.sbt index bf7803a..39f5d5c 100644 --- a/build.sbt +++ b/build.sbt @@ -6,7 +6,7 @@ version := "1.5-SNAPSHOT" scalaVersion := "2.11.7" -scalacOptions ++= Seq("-feature", "-language:postfixOps") +scalacOptions ++= Seq("-feature", "-language:postfixOps") resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..c091b86 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.16 diff --git a/src/main/scala/com/github/sstone/amqp/ChannelOwner.scala b/src/main/scala/com/github/sstone/amqp/ChannelOwner.scala index 807a00b..d4d528b 100644 --- a/src/main/scala/com/github/sstone/amqp/ChannelOwner.scala +++ b/src/main/scala/com/github/sstone/amqp/ChannelOwner.scala @@ -157,7 +157,7 @@ class ChannelOwner(init: Seq[Request] = Seq.empty[Request], channelParams: Optio import ChannelOwner._ - var requestLog: Vector[Request] = init.toVector + var requestLog: Vector[(Request, Option[ActorRef])] = init.map(_ -> None).toVector val statusListeners = mutable.HashSet.empty[ActorRef] override def preStart() = context.parent ! ConnectionOwner.CreateChannel @@ -185,13 +185,13 @@ class ChannelOwner(init: Seq[Request] = Seq.empty[Request], channelParams: Optio forwarder ! AddShutdownListener(self) forwarder ! AddReturnListener(self) onChannel(channel, forwarder) - requestLog.map(r => self forward r) + requestLog.foreach { case (request, sender) => self.tell(request, sender.getOrElse(context.sender())) } log.info(s"got channel $channel") statusListeners.map(a => a ! Connected) context.become(connected(channel, forwarder)) } case Record(request: Request) => { - requestLog :+= request + requestLog :+= request -> Some(sender()) } case AddStatusListener(actor) => addStatusListener(actor) @@ -203,7 +203,7 @@ class ChannelOwner(init: Seq[Request] = Seq.empty[Request], channelParams: Optio def connected(channel: Channel, forwarder: ActorRef): Receive = LoggingReceive { case Amqp.Ok(_, _) => () case Record(request: Request) => { - requestLog :+= request + requestLog :+= request -> Some(sender()) self forward request } case AddStatusListener(listener) => { diff --git a/src/test/scala/com/github/sstone/amqp/ChannelSpec.scala b/src/test/scala/com/github/sstone/amqp/ChannelSpec.scala index f179d87..14afee4 100644 --- a/src/test/scala/com/github/sstone/amqp/ChannelSpec.scala +++ b/src/test/scala/com/github/sstone/amqp/ChannelSpec.scala @@ -13,11 +13,11 @@ import com.rabbitmq.client.ConnectionFactory import com.github.sstone.amqp.Amqp._ import scala.util.Random -class ChannelSpec extends TestKit(ActorSystem("TestSystem")) with WordSpecLike with ShouldMatchers with BeforeAndAfter with ImplicitSender { +trait ChannelSpecNoTestKit extends WordSpecLike with ShouldMatchers with BeforeAndAfter { + implicit val system: ActorSystem + implicit val timeout = Timeout(5 seconds) val connFactory = new ConnectionFactory() - val uri = system.settings.config.getString("amqp-client-test.rabbitmq.uri") - connFactory.setUri(uri) var conn: ActorRef = _ var channelOwner: ActorRef = _ val random = new Random() @@ -32,6 +32,8 @@ class ChannelSpec extends TestKit(ActorSystem("TestSystem")) with WordSpecLike w before { println("before") + val uri = system.settings.config.getString("amqp-client-test.rabbitmq.uri") + connFactory.setUri(uri) conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second)) channelOwner = ConnectionOwner.createChildActor(conn, ChannelOwner.props()) waitForConnection(system, conn, channelOwner).await(5, TimeUnit.SECONDS) @@ -42,3 +44,5 @@ class ChannelSpec extends TestKit(ActorSystem("TestSystem")) with WordSpecLike w Await.result(gracefulStop(conn, 5 seconds), 6 seconds) } } + +class ChannelSpec extends TestKit(ActorSystem("TestSystem")) with ChannelSpecNoTestKit with ImplicitSender diff --git a/src/test/scala/com/github/sstone/amqp/ConsumerSpec.scala b/src/test/scala/com/github/sstone/amqp/ConsumerSpec.scala index 37b9c1d..c71a35d 100644 --- a/src/test/scala/com/github/sstone/amqp/ConsumerSpec.scala +++ b/src/test/scala/com/github/sstone/amqp/ConsumerSpec.scala @@ -1,21 +1,28 @@ package com.github.sstone.amqp +import akka.actor.ActorSystem +import akka.pattern.{ask, gracefulStop} import akka.testkit.TestProbe +import akka.util.Timeout import com.github.sstone.amqp.Amqp._ -import concurrent.duration._ import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner +import scala.concurrent.Await +import scala.concurrent.duration._ + @RunWith(classOf[JUnitRunner]) -class ConsumerSpec extends ChannelSpec { +class ConsumerSpec extends ChannelSpecNoTestKit { + override implicit val system: ActorSystem = ActorSystem("ConsumerSpec") "Consumers" should { "receive messages sent by producers" in { val exchange = ExchangeParameters(name = "amq.direct", exchangeType = "", passive = true) val queue = QueueParameters(name = "", passive = false, exclusive = true) - ignoreMsg { + val probe = TestProbe() + implicit val sender = probe.ref + probe.ignoreMsg { case Amqp.Ok(p:Publish, _) => true } - val probe = TestProbe() val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(listener = Some(probe.ref)), timeout = 5000 millis) val producer = ConnectionOwner.createChildActor(conn, ChannelOwner.props()) consumer ! AddStatusListener(probe.ref) @@ -23,7 +30,7 @@ class ConsumerSpec extends ChannelSpec { probe.expectMsg(1 second, ChannelOwner.Connected) probe.expectMsg(1 second, ChannelOwner.Connected) consumer ! AddBinding(Binding(exchange, queue, "my_key")) - val check = receiveOne(1 second) + val check = probe.receiveOne(1 second) println(check) val message = "yo!".getBytes producer ! Publish(exchange.name, "my_key", message) @@ -32,12 +39,16 @@ class ConsumerSpec extends ChannelSpec { "be able to set their channel's prefetch size" in { val queue = randomQueue val probe = TestProbe() + implicit val sender = probe.ref + probe.ignoreMsg { + case Amqp.Ok(p:Publish, _) => true + } val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(listener = probe.ref, autoack = false, channelParams = Some(ChannelParameters(qos = 3))), timeout = 5000 millis) consumer ! AddStatusListener(probe.ref) probe.expectMsg(1 second, ChannelOwner.Connected) consumer ! AddQueue(queue) - val Amqp.Ok(AddQueue(_), _) = receiveOne(1 second) + val Amqp.Ok(AddQueue(_), _) = probe.receiveOne(1 second) consumer ! Publish("", queue.name, "test".getBytes("UTF-8")) val delivery1 = probe.expectMsgClass(200 milliseconds, classOf[Delivery]) @@ -52,14 +63,15 @@ class ConsumerSpec extends ChannelSpec { // but if we ack one our our messages we shoule get the 4th delivery consumer ! Ack(deliveryTag = delivery1.envelope.getDeliveryTag) - val Amqp.Ok(Ack(_), _) = receiveOne(1 second) + val Amqp.Ok(Ack(_), _) = probe.receiveOne(1 second) val delivery4 = probe.expectMsgClass(200 milliseconds, classOf[Delivery]) } "be restarted if their channel crashes" in { val exchange = ExchangeParameters(name = "amq.direct", exchangeType = "", passive = true) val queue = randomQueue val probe = TestProbe() - ignoreMsg { + implicit val sender = probe.ref + probe.ignoreMsg { case Amqp.Ok(p:Publish, _) => true } val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(listener = Some(probe.ref)), timeout = 5000 millis) @@ -69,7 +81,7 @@ class ConsumerSpec extends ChannelSpec { probe.expectMsg(1 second, ChannelOwner.Connected) probe.expectMsg(1 second, ChannelOwner.Connected) consumer ! Record(AddBinding(Binding(exchange, queue, "my_key"))) - val Amqp.Ok(AddBinding(_), _) = receiveOne(1 second) + val Amqp.Ok(AddBinding(_), _) = probe.receiveOne(1 second) val message = "yo!".getBytes producer ! Publish(exchange.name, "my_key", message) @@ -77,8 +89,9 @@ class ConsumerSpec extends ChannelSpec { // crash the consumer's channel consumer ! DeclareExchange(ExchangeParameters(name = "foo", passive = true, exchangeType ="")) - receiveOne(1 second) + val Amqp.Error(DeclareExchange(_), _) = probe.receiveOne(1 second) probe.expectMsgAllOf(1 second, ChannelOwner.Disconnected, ChannelOwner.Connected) + val Ok(AddBinding(Binding(`exchange`, `queue`, "my_key")), Some(_)) = probe.receiveOne(1 second) Thread.sleep(100) producer ! Publish(exchange.name, "my_key", message) @@ -89,7 +102,8 @@ class ConsumerSpec extends ChannelSpec { val exchange = ExchangeParameters(name = randomExchangeName, exchangeType = "direct", passive = false, durable = false, autodelete = true) val queue = randomQueue val probe = TestProbe() - ignoreMsg { + implicit val sender = probe.ref + probe.ignoreMsg { case Amqp.Ok(p:Publish, _) => true } val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(listener = Some(probe.ref)), timeout = 5000 millis) @@ -99,12 +113,12 @@ class ConsumerSpec extends ChannelSpec { probe.expectMsg(1 second, ChannelOwner.Connected) probe.expectMsg(1 second, ChannelOwner.Connected) consumer ! AddBinding(Binding(exchange, queue, "test_key")) - val Amqp.Ok(AddBinding(_), _) = receiveOne(1 second) + val Amqp.Ok(AddBinding(_), _) = probe.receiveOne(1 second) // check that our exchange was created val exchange1 = exchange.copy(passive = true) consumer ! DeclareExchange(exchange1) - val Amqp.Ok(DeclareExchange(_), _) = receiveOne(1 second) + val Amqp.Ok(DeclareExchange(_), _) = probe.receiveOne(1 second) // check that publishing works producer ! Publish(exchange.name, "test_key", "test message".getBytes("UTF-8")) @@ -114,7 +128,8 @@ class ConsumerSpec extends ChannelSpec { val queue1 = randomQueue val queue2 = randomQueue val probe = TestProbe() - ignoreMsg { + implicit val sender = probe.ref + probe.ignoreMsg { case Amqp.Ok(p:Publish, _) => true } val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(listener = Some(probe.ref), autoack = false), timeout = 5000 millis) @@ -125,9 +140,9 @@ class ConsumerSpec extends ChannelSpec { probe.expectMsg(1 second, ChannelOwner.Connected) consumer ! AddQueue(queue1) - val Amqp.Ok(AddQueue(_), Some(consumerTag1: String)) = receiveOne(1 second) + val Amqp.Ok(AddQueue(_), Some(consumerTag1: String)) = probe.receiveOne(1 second) consumer ! AddQueue(queue2) - val Amqp.Ok(AddQueue(_), Some(consumerTag2: String)) = receiveOne(1 second) + val Amqp.Ok(AddQueue(_), Some(consumerTag2: String)) = probe.receiveOne(1 second) producer ! Publish("", queue1.name, "test1".getBytes("UTF-8")) val delivery1: Delivery = probe.expectMsgClass(classOf[Delivery]) @@ -138,13 +153,17 @@ class ConsumerSpec extends ChannelSpec { assert(delivery2.consumerTag === consumerTag2) consumer ! CancelConsumer(consumerTag1) - val Amqp.Ok(CancelConsumer(_), _) = receiveOne(1 second) + val Amqp.Ok(CancelConsumer(_), _) = probe.receiveOne(1 second) producer ! Publish("", queue1.name, "test1".getBytes("UTF-8")) probe.expectNoMsg() } "send consumer cancellation notifications" in { val probe = TestProbe() + implicit val sender = probe.ref + probe.ignoreMsg { + case Amqp.Ok(p:Publish, _) => true + } val queue = randomQueue val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(listener = Some(probe.ref), autoack = false), timeout = 5000 millis) val producer = ConnectionOwner.createChildActor(conn, ChannelOwner.props()) @@ -154,18 +173,22 @@ class ConsumerSpec extends ChannelSpec { probe.expectMsg(1 second, ChannelOwner.Connected) consumer ! AddQueue(queue) - val Amqp.Ok(AddQueue(_), Some(consumerTag: String)) = receiveOne(1 second) + val Amqp.Ok(AddQueue(_), Some(consumerTag: String)) = probe.receiveOne(1 second) producer ! Publish("", queue.name, "test".getBytes("UTF-8")) val delivery: Delivery = probe.expectMsgClass(classOf[Delivery]) assert(delivery.consumerTag === consumerTag) producer ! DeleteQueue(queue.name) - val Ok(DeleteQueue(_, _, _), result) = receiveOne(1 second) + val Ok(DeleteQueue(_, _, _), result) = probe.receiveOne(1 second) probe.expectMsg(1 second, ConsumerCancelled(consumerTag)) } "create exclusive consumers" in { val probe = TestProbe() + implicit val sender = probe.ref + probe.ignoreMsg { + case Amqp.Ok(p:Publish, _) => true + } val queue = randomQueue val consumer = ConnectionOwner.createChildActor(conn, Consumer.props( listener = Some(probe.ref), autoack = false, init = Seq.empty[Request], channelParams = None, @@ -179,7 +202,7 @@ class ConsumerSpec extends ChannelSpec { probe.expectMsg(1 second, ChannelOwner.Connected) consumer ! AddQueue(queue) - val Amqp.Ok(AddQueue(_), Some(consumerTag: String)) = receiveOne(1 second) + val Amqp.Ok(AddQueue(_), Some(consumerTag: String)) = probe.receiveOne(1 second) producer ! Publish("", queue.name, "test".getBytes("UTF-8")) val delivery: Delivery = probe.expectMsgClass(classOf[Delivery]) @@ -194,7 +217,17 @@ class ConsumerSpec extends ChannelSpec { // you cannot have more than 1 exclusive consumer on the same queue consumer1 ! AddQueue(queue) - val Amqp.Error(_, reason) = receiveOne(1 second) + val Amqp.Error(_, reason) = probe.receiveOne(1 second) + } + "save sender for requests while disconnected" in { + val declareExchange = DeclareExchange(ExchangeParameters(name = "amq.direct", passive = true, exchangeType = "")) + val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second)) + try { + val channelOwner = ConnectionOwner.createChildActor(conn, ChannelOwner.props()) + val Ok(`declareExchange`, Some(_)) = Await.result(channelOwner.ask(Record(declareExchange))(Timeout(1 second)), 1 second) + } finally { + Await.result(gracefulStop(conn, 5 seconds), 6 seconds) + } } } }