Skip to content
This repository was archived by the owner on Jun 11, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ version := "1.5-SNAPSHOT"

scalaVersion := "2.11.7"

scalacOptions ++= Seq("-feature", "-language:postfixOps")
scalacOptions ++= Seq("-feature", "-language:postfixOps")

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

Expand Down
1 change: 1 addition & 0 deletions project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=0.13.16
8 changes: 4 additions & 4 deletions src/main/scala/com/github/sstone/amqp/ChannelOwner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class ChannelOwner(init: Seq[Request] = Seq.empty[Request], channelParams: Optio

import ChannelOwner._

var requestLog: Vector[Request] = init.toVector
var requestLog: Vector[(Request, Option[ActorRef])] = init.map(_ -> None).toVector
val statusListeners = mutable.HashSet.empty[ActorRef]

override def preStart() = context.parent ! ConnectionOwner.CreateChannel
Expand Down Expand Up @@ -185,13 +185,13 @@ class ChannelOwner(init: Seq[Request] = Seq.empty[Request], channelParams: Optio
forwarder ! AddShutdownListener(self)
forwarder ! AddReturnListener(self)
onChannel(channel, forwarder)
requestLog.map(r => self forward r)
requestLog.foreach { case (request, sender) => self.tell(request, sender.getOrElse(context.sender())) }
Copy link
Author

@dontgitit dontgitit Jan 31, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we saved a sender (which we do for non-init requests), forward that sender. Otherwise, do current behavior (forward current sender)

log.info(s"got channel $channel")
statusListeners.map(a => a ! Connected)
context.become(connected(channel, forwarder))
}
case Record(request: Request) => {
requestLog :+= request
requestLog :+= request -> Some(sender())
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we get requests while disconnected, save the sender so that we reply to them later

}
case AddStatusListener(actor) => addStatusListener(actor)

Expand All @@ -203,7 +203,7 @@ class ChannelOwner(init: Seq[Request] = Seq.empty[Request], channelParams: Optio
def connected(channel: Channel, forwarder: ActorRef): Receive = LoggingReceive {
case Amqp.Ok(_, _) => ()
case Record(request: Request) => {
requestLog :+= request
requestLog :+= request -> Some(sender())
self forward request
}
case AddStatusListener(listener) => {
Expand Down
10 changes: 7 additions & 3 deletions src/test/scala/com/github/sstone/amqp/ChannelSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import com.rabbitmq.client.ConnectionFactory
import com.github.sstone.amqp.Amqp._
import scala.util.Random

class ChannelSpec extends TestKit(ActorSystem("TestSystem")) with WordSpecLike with ShouldMatchers with BeforeAndAfter with ImplicitSender {
trait ChannelSpecNoTestKit extends WordSpecLike with ShouldMatchers with BeforeAndAfter {
Copy link
Author

@dontgitit dontgitit Jan 31, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests didn't pass for me. Specifically, I had issues with ConsumerSpec. The test suite extends TestKit, and then uses TestProbe... but that actually reuses the same actor across all tests in the suite (I think due to mixing in ImplicitSender; that uses the actor from the testkit). The tests mostly worked when run individually, but would run into issues when run all together via sbt test.

This change lets each test create its own actor. See also https://hseeberger.gitlab.io/2017/09/13/how-to-use-akka-testkit.html

implicit val system: ActorSystem

implicit val timeout = Timeout(5 seconds)
val connFactory = new ConnectionFactory()
val uri = system.settings.config.getString("amqp-client-test.rabbitmq.uri")
connFactory.setUri(uri)
var conn: ActorRef = _
var channelOwner: ActorRef = _
val random = new Random()
Expand All @@ -32,6 +32,8 @@ class ChannelSpec extends TestKit(ActorSystem("TestSystem")) with WordSpecLike w

before {
println("before")
val uri = system.settings.config.getString("amqp-client-test.rabbitmq.uri")
connFactory.setUri(uri)
conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second))
channelOwner = ConnectionOwner.createChildActor(conn, ChannelOwner.props())
waitForConnection(system, conn, channelOwner).await(5, TimeUnit.SECONDS)
Expand All @@ -42,3 +44,5 @@ class ChannelSpec extends TestKit(ActorSystem("TestSystem")) with WordSpecLike w
Await.result(gracefulStop(conn, 5 seconds), 6 seconds)
}
}

class ChannelSpec extends TestKit(ActorSystem("TestSystem")) with ChannelSpecNoTestKit with ImplicitSender
75 changes: 54 additions & 21 deletions src/test/scala/com/github/sstone/amqp/ConsumerSpec.scala
Original file line number Diff line number Diff line change
@@ -1,29 +1,36 @@
package com.github.sstone.amqp

import akka.actor.ActorSystem
import akka.pattern.{ask, gracefulStop}
import akka.testkit.TestProbe
import akka.util.Timeout
import com.github.sstone.amqp.Amqp._
import concurrent.duration._
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner

import scala.concurrent.Await
import scala.concurrent.duration._

@RunWith(classOf[JUnitRunner])
class ConsumerSpec extends ChannelSpec {
class ConsumerSpec extends ChannelSpecNoTestKit {
override implicit val system: ActorSystem = ActorSystem("ConsumerSpec")
"Consumers" should {
"receive messages sent by producers" in {
val exchange = ExchangeParameters(name = "amq.direct", exchangeType = "", passive = true)
val queue = QueueParameters(name = "", passive = false, exclusive = true)
ignoreMsg {
val probe = TestProbe()
implicit val sender = probe.ref
probe.ignoreMsg {
case Amqp.Ok(p:Publish, _) => true
}
val probe = TestProbe()
val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(listener = Some(probe.ref)), timeout = 5000 millis)
val producer = ConnectionOwner.createChildActor(conn, ChannelOwner.props())
consumer ! AddStatusListener(probe.ref)
producer ! AddStatusListener(probe.ref)
probe.expectMsg(1 second, ChannelOwner.Connected)
probe.expectMsg(1 second, ChannelOwner.Connected)
consumer ! AddBinding(Binding(exchange, queue, "my_key"))
val check = receiveOne(1 second)
val check = probe.receiveOne(1 second)
println(check)
val message = "yo!".getBytes
producer ! Publish(exchange.name, "my_key", message)
Expand All @@ -32,12 +39,16 @@ class ConsumerSpec extends ChannelSpec {
"be able to set their channel's prefetch size" in {
val queue = randomQueue
val probe = TestProbe()
implicit val sender = probe.ref
probe.ignoreMsg {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test was doing Publishes but wasn't expecting Ok(Publish)

case Amqp.Ok(p:Publish, _) => true
}
val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(listener = probe.ref, autoack = false, channelParams = Some(ChannelParameters(qos = 3))), timeout = 5000 millis)
consumer ! AddStatusListener(probe.ref)
probe.expectMsg(1 second, ChannelOwner.Connected)

consumer ! AddQueue(queue)
val Amqp.Ok(AddQueue(_), _) = receiveOne(1 second)
val Amqp.Ok(AddQueue(_), _) = probe.receiveOne(1 second)

consumer ! Publish("", queue.name, "test".getBytes("UTF-8"))
val delivery1 = probe.expectMsgClass(200 milliseconds, classOf[Delivery])
Expand All @@ -52,14 +63,15 @@ class ConsumerSpec extends ChannelSpec {

// but if we ack one our our messages we shoule get the 4th delivery
consumer ! Ack(deliveryTag = delivery1.envelope.getDeliveryTag)
val Amqp.Ok(Ack(_), _) = receiveOne(1 second)
val Amqp.Ok(Ack(_), _) = probe.receiveOne(1 second)
val delivery4 = probe.expectMsgClass(200 milliseconds, classOf[Delivery])
}
"be restarted if their channel crashes" in {
val exchange = ExchangeParameters(name = "amq.direct", exchangeType = "", passive = true)
val queue = randomQueue
val probe = TestProbe()
ignoreMsg {
implicit val sender = probe.ref
probe.ignoreMsg {
case Amqp.Ok(p:Publish, _) => true
}
val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(listener = Some(probe.ref)), timeout = 5000 millis)
Expand All @@ -69,16 +81,17 @@ class ConsumerSpec extends ChannelSpec {
probe.expectMsg(1 second, ChannelOwner.Connected)
probe.expectMsg(1 second, ChannelOwner.Connected)
consumer ! Record(AddBinding(Binding(exchange, queue, "my_key")))
val Amqp.Ok(AddBinding(_), _) = receiveOne(1 second)
val Amqp.Ok(AddBinding(_), _) = probe.receiveOne(1 second)

val message = "yo!".getBytes
producer ! Publish(exchange.name, "my_key", message)
probe.expectMsgClass(1.second, classOf[Delivery])

// crash the consumer's channel
consumer ! DeclareExchange(ExchangeParameters(name = "foo", passive = true, exchangeType =""))
receiveOne(1 second)
val Amqp.Error(DeclareExchange(_), _) = probe.receiveOne(1 second)
probe.expectMsgAllOf(1 second, ChannelOwner.Disconnected, ChannelOwner.Connected)
val Ok(AddBinding(Binding(`exchange`, `queue`, "my_key")), Some(_)) = probe.receiveOne(1 second)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after the channel crashes and reconnects, it plays back the AddBinding as well

Thread.sleep(100)

producer ! Publish(exchange.name, "my_key", message)
Expand All @@ -89,7 +102,8 @@ class ConsumerSpec extends ChannelSpec {
val exchange = ExchangeParameters(name = randomExchangeName, exchangeType = "direct", passive = false, durable = false, autodelete = true)
val queue = randomQueue
val probe = TestProbe()
ignoreMsg {
implicit val sender = probe.ref
probe.ignoreMsg {
case Amqp.Ok(p:Publish, _) => true
}
val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(listener = Some(probe.ref)), timeout = 5000 millis)
Expand All @@ -99,12 +113,12 @@ class ConsumerSpec extends ChannelSpec {
probe.expectMsg(1 second, ChannelOwner.Connected)
probe.expectMsg(1 second, ChannelOwner.Connected)
consumer ! AddBinding(Binding(exchange, queue, "test_key"))
val Amqp.Ok(AddBinding(_), _) = receiveOne(1 second)
val Amqp.Ok(AddBinding(_), _) = probe.receiveOne(1 second)

// check that our exchange was created
val exchange1 = exchange.copy(passive = true)
consumer ! DeclareExchange(exchange1)
val Amqp.Ok(DeclareExchange(_), _) = receiveOne(1 second)
val Amqp.Ok(DeclareExchange(_), _) = probe.receiveOne(1 second)

// check that publishing works
producer ! Publish(exchange.name, "test_key", "test message".getBytes("UTF-8"))
Expand All @@ -114,7 +128,8 @@ class ConsumerSpec extends ChannelSpec {
val queue1 = randomQueue
val queue2 = randomQueue
val probe = TestProbe()
ignoreMsg {
implicit val sender = probe.ref
probe.ignoreMsg {
case Amqp.Ok(p:Publish, _) => true
}
val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(listener = Some(probe.ref), autoack = false), timeout = 5000 millis)
Expand All @@ -125,9 +140,9 @@ class ConsumerSpec extends ChannelSpec {
probe.expectMsg(1 second, ChannelOwner.Connected)

consumer ! AddQueue(queue1)
val Amqp.Ok(AddQueue(_), Some(consumerTag1: String)) = receiveOne(1 second)
val Amqp.Ok(AddQueue(_), Some(consumerTag1: String)) = probe.receiveOne(1 second)
consumer ! AddQueue(queue2)
val Amqp.Ok(AddQueue(_), Some(consumerTag2: String)) = receiveOne(1 second)
val Amqp.Ok(AddQueue(_), Some(consumerTag2: String)) = probe.receiveOne(1 second)

producer ! Publish("", queue1.name, "test1".getBytes("UTF-8"))
val delivery1: Delivery = probe.expectMsgClass(classOf[Delivery])
Expand All @@ -138,13 +153,17 @@ class ConsumerSpec extends ChannelSpec {
assert(delivery2.consumerTag === consumerTag2)

consumer ! CancelConsumer(consumerTag1)
val Amqp.Ok(CancelConsumer(_), _) = receiveOne(1 second)
val Amqp.Ok(CancelConsumer(_), _) = probe.receiveOne(1 second)

producer ! Publish("", queue1.name, "test1".getBytes("UTF-8"))
probe.expectNoMsg()
}
"send consumer cancellation notifications" in {
val probe = TestProbe()
implicit val sender = probe.ref
probe.ignoreMsg {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test publishes without expecting Ok

case Amqp.Ok(p:Publish, _) => true
}
val queue = randomQueue
val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(listener = Some(probe.ref), autoack = false), timeout = 5000 millis)
val producer = ConnectionOwner.createChildActor(conn, ChannelOwner.props())
Expand All @@ -154,18 +173,22 @@ class ConsumerSpec extends ChannelSpec {
probe.expectMsg(1 second, ChannelOwner.Connected)

consumer ! AddQueue(queue)
val Amqp.Ok(AddQueue(_), Some(consumerTag: String)) = receiveOne(1 second)
val Amqp.Ok(AddQueue(_), Some(consumerTag: String)) = probe.receiveOne(1 second)

producer ! Publish("", queue.name, "test".getBytes("UTF-8"))
val delivery: Delivery = probe.expectMsgClass(classOf[Delivery])
assert(delivery.consumerTag === consumerTag)

producer ! DeleteQueue(queue.name)
val Ok(DeleteQueue(_, _, _), result) = receiveOne(1 second)
val Ok(DeleteQueue(_, _, _), result) = probe.receiveOne(1 second)
probe.expectMsg(1 second, ConsumerCancelled(consumerTag))
}
"create exclusive consumers" in {
val probe = TestProbe()
implicit val sender = probe.ref
probe.ignoreMsg {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test also had a publish without expecting Ok(Publish)

case Amqp.Ok(p:Publish, _) => true
}
val queue = randomQueue
val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(
listener = Some(probe.ref), autoack = false, init = Seq.empty[Request], channelParams = None,
Expand All @@ -179,7 +202,7 @@ class ConsumerSpec extends ChannelSpec {
probe.expectMsg(1 second, ChannelOwner.Connected)

consumer ! AddQueue(queue)
val Amqp.Ok(AddQueue(_), Some(consumerTag: String)) = receiveOne(1 second)
val Amqp.Ok(AddQueue(_), Some(consumerTag: String)) = probe.receiveOne(1 second)

producer ! Publish("", queue.name, "test".getBytes("UTF-8"))
val delivery: Delivery = probe.expectMsgClass(classOf[Delivery])
Expand All @@ -194,7 +217,17 @@ class ConsumerSpec extends ChannelSpec {

// you cannot have more than 1 exclusive consumer on the same queue
consumer1 ! AddQueue(queue)
val Amqp.Error(_, reason) = receiveOne(1 second)
val Amqp.Error(_, reason) = probe.receiveOne(1 second)
}
"save sender for requests while disconnected" in {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a new test to make sure ask works if you make a request while the actor is in disconnected state.

previously, it would time out (because the reply didn't go back to the asker)

val declareExchange = DeclareExchange(ExchangeParameters(name = "amq.direct", passive = true, exchangeType = ""))
val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second))
try {
val channelOwner = ConnectionOwner.createChildActor(conn, ChannelOwner.props())
val Ok(`declareExchange`, Some(_)) = Await.result(channelOwner.ask(Record(declareExchange))(Timeout(1 second)), 1 second)
} finally {
Await.result(gracefulStop(conn, 5 seconds), 6 seconds)
}
}
}
}