Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ object ConnectorSettings {
ConnectorSettings(connectionProvider = LocalConnectionProvider)
}

/**
* Settings for configuring STOMP client connections.
*
* @param connectionProvider The provider responsible for creating STOMP client connections.
* @param destination Optional destination (topic or queue) to subscribe to or send messages to.
* Required for [[com.nachinius.akka.stream.stomp.scaladsl.StompClientSource]].
* Optional for [[com.nachinius.akka.stream.stomp.scaladsl.StompClientSink]]
* (can be set per message instead).
* @param withAck Whether to use client acknowledgment mode. When true, messages must be
* acknowledged before the next message is requested. Defaults to true.
*/
case class ConnectorSettings(
connectionProvider: ConnectionProvider,
destination: Option[String] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@ import java.util
import io.vertx.core.buffer.{Buffer => VertxBuffer}
import io.vertx.ext.stomp.{Frame => VertxFrame}

import scala.collection.JavaConverters._

/**
* Represents a STOMP message frame with headers and body.
*
* @param headers Map of header key-value pairs for the STOMP frame.
* @param body Message body as a vector of bytes.
*/
case class SendingFrame(headers: Map[String, String], body: Vector[Byte]) {

import scala.collection.JavaConverters._
def toVertexFrame: VertxFrame = {
val obj = new VertxFrame().setCommand(VertxFrame.Command.SEND)
val h: util.Map[String, String] = headers.asJava
Expand All @@ -29,7 +36,12 @@ case class SendingFrame(headers: Map[String, String], body: Vector[Byte]) {
}

object SendingFrame {
import scala.collection.JavaConverters._
/**
* Creates a SendingFrame from a Vert.x STOMP Frame.
*
* @param frame The Vert.x frame to convert.
* @return A SendingFrame with the frame's headers and body.
*/
def from(frame: VertxFrame): SendingFrame = {
val body: scala.collection.immutable.Vector[Byte] =
if (!frame.hasEmptyBody) frame.getBodyAsByteArray.toVector
Expand All @@ -41,10 +53,32 @@ object SendingFrame {
} else frame.getHeaders.asScala.toMap
SendingFrame(headers, body)
}

/**
* Creates a SendingFrame from a string body.
*
* @param bodyStr The string to use as the message body.
* @return A SendingFrame with the string converted to bytes.
*/
def from(bodyStr: String): SendingFrame =
from(bodyStr.getBytes().toVector)

/**
* Creates a SendingFrame from a byte vector body.
*
* @param body The message body as bytes.
* @return A SendingFrame with no headers and the given body.
*/
def from(body: Vector[Byte]): SendingFrame =
SendingFrame(Map(), body)

/**
* Creates a SendingFrame with a destination and body.
*
* @param destination The STOMP destination (topic or queue) for this message.
* @param body The message body as bytes.
* @return A SendingFrame with destination header and the given body.
*/
def from(destination: String, body: Vector[Byte]): SendingFrame =
from(body).withDestination(destination)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import akka.stream._
import akka.stream.stage._
import io.vertx.ext.stomp.{Frame, StompClientConnection}

import scala.collection.{JavaConverters, mutable}
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}

/**
Expand All @@ -36,24 +37,29 @@ final class SourceStage(settings: ConnectorSettings)

var pending: Option[Frame] = None
override def whenConnected(): Unit = {
val receiveSubscriptionMessage = getAsyncCallback[Frame] { frame =>
{
pending = Some(frame)
handleDelivery(frame)
}

}
settings.destination match {
case Some(dest) =>
val receiveSubscriptionMessage = getAsyncCallback[Frame] { frame =>
{
pending = Some(frame)
handleDelivery(frame)
}

import JavaConverters._
}

connection.subscribe(
settings.destination.get,
headers.asJava, { frame: Frame =>
receiveSubscriptionMessage.invoke(frame)
}, { frame: Frame =>
acknowledge(frame)
}
)
connection.subscribe(
dest,
headers.asJava, { frame: Frame =>
receiveSubscriptionMessage.invoke(frame)
}, { frame: Frame =>
acknowledge(frame)
}
)
case None =>
val ex = new IllegalArgumentException("destination is required for StompClientSource")
promise.tryFailure(ex)
failStage(ex)
}
}

override def receiveHandler(connection: StompClientConnection): Unit =
Expand Down Expand Up @@ -102,7 +108,7 @@ final class SourceStage(settings: ConnectorSettings)

override def shape: SourceShape[SendingFrame] = SourceShape.of(out)

override def toString: String = "StompClientSink"
override def toString: String = "StompClientSource"

override protected def initialAttributes: Attributes = SourceStage.defaultAttributes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,21 @@ import scala.concurrent.Future
object StompClientSink {

/**
* Scala API: Connects to a STOMP server upon materialization and sends incoming messages to the server.
* Each materialized sink will create one connection to the broker. This stage sends messages to the destination
* named in the settings options, if present, instead of the one written in the incoming message to the Sink.
* Scala API: Creates a Sink that connects to a STOMP server and sends messages.
*
* Upon materialization, this sink establishes a connection to the STOMP server specified in the
* settings. Messages received by the sink are sent to the STOMP destination. The destination
* can be set in the settings (applied to all messages) or in each individual SendingFrame.
* If both are set, the per-message destination takes precedence.
*
* When `withAck` is enabled in settings, the sink waits for server acknowledgment before
* requesting the next message, providing flow control.
*
* The materialized Future[Done] completes when the stream finishes normally or fails if
* there's a STOMP connection error.
*
* This stage materializes to a Future[Done], which can be used to know when the Sink completes, either normally
* or because of a stomp failure.
* @param settings Configuration for the STOMP connection. Destination is optional for sink.
* @return A Sink accepting SendingFrame messages, materializing to a Future[Done].
*/
def apply(settings: ConnectorSettings): Sink[SendingFrame, Future[Done]] = Sink.fromGraph(new SinkStage(settings))
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,18 @@ import scala.concurrent.Future
object StompClientSource {

/**
* Scala API: Upon materialization this source [[StompClientSource]] connects and subscribes to a topic (set in settings) published in a Stomp server. Each message may be Ack, and handles backpressure.
* Scala API: Creates a Source that connects to a STOMP server and subscribes to a topic or queue.
*
* Upon materialization, this source establishes a connection to the STOMP server specified in the
* settings and subscribes to the destination topic or queue. The materialized Future[Done] completes
* when the connection is successfully established and subscription is active.
*
* Messages received from the server are emitted downstream with automatic backpressure handling.
* When `withAck` is enabled in settings, messages are acknowledged after being pushed downstream.
*
* @param settings Configuration for the STOMP connection. Must include a destination.
* @return A Source of SendingFrame messages, materializing to a Future[Done] that completes when connected.
* @throws IllegalArgumentException if settings.destination is None.
*/
def apply(settings: ConnectorSettings): Source[SendingFrame, Future[Done]] =
Source.fromGraph(new SourceStage(settings))
Expand Down