Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion src/main/scala/spray/can/server/websockets/Pipelines.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import spray.http._
import akka.util.CompactByteString
import spray.can.Http.MessageCommand
import spray.can.server.StatsSupport.StatsHolder
import java.net.InetSocketAddress
import spray.can.server.websockets.SocketPhases.FrameCommand

/**
* Sister class to HttpListener, but with a pipeline that supports websockets
Expand All @@ -27,11 +29,38 @@ private class SocketListener(bindCommander: ActorRef,
private[this] val settingsX = bind.settings getOrElse ServerSettings(context.system)
private[this] val statsHolderX = if (settingsX.statsSupport) Some(new StatsHolder) else None
val pipelineStage = SocketListener.pipelineStage(settingsX, statsHolderX)

/**
* Used to let the frameHandler send back unwrapped Frames, which it
* will wrap before putting into the pipeline
*/
private class SocketServerConnection(
tcpConnection: ActorRef,
userLevelListener: ActorRef,
pipelineStage: RawPipelineStage[ServerFrontend.Context with SslTlsContext],
remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress,
settings: ServerSettings) extends HttpServerConnection(
tcpConnection,
userLevelListener,
pipelineStage,
remoteAddress,
localAddress,
settings) {

override def running(tcpConnection: ActorRef, pipelines: Pipelines): Receive = {
val sup = super.running(tcpConnection, pipelines)
({
case f: model.Frame => pipelines.commandPipeline(FrameCommand(f))
}: Receive) orElse sup
}
}

override def connected(tcpListener: ActorRef): Receive = {
case Tcp.Connected(remoteAddress, localAddress) ⇒
val conn = sender
context.actorOf(
props = Props(new HttpServerConnection(conn, bind.listener, pipelineStage, remoteAddress, localAddress, settingsX))
props = Props(new SocketServerConnection(conn, bind.listener, pipelineStage, remoteAddress, localAddress, settingsX))
.withDispatcher(httpSettings.ConnectionDispatcher),
name = connectionCounterX.next().toString)

Expand Down
13 changes: 1 addition & 12 deletions src/main/scala/spray/can/server/websockets/SocketPhases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,9 @@ import SocketPhases.{FrameCommand,FrameEvent}
*/
case class WebsocketFrontEnd(handler: ActorRef) extends PipelineStage{

/**
* Used to let the frameHandler send back unwrapped Frames, which it
* will wrap before putting into the pipeline
*/
class ReceiverProxy(pcontext: PipelineContext) extends Actor{
def receive = {
case f: model.Frame => pcontext.actorContext.self ! FrameCommand(f)
case Tcp.Close => pcontext.actorContext.self ! Tcp.Close
}
}

def apply(pcontext: PipelineContext, commandPL: CPL, eventPL: EPL): Pipelines =
new Pipelines{
val receiveAdapter = pcontext.actorContext.actorOf(Props(new ReceiverProxy(pcontext)))
val receiveAdapter = pcontext.actorContext.self
val commandPipeline: CPL = commandPL
val eventPipeline: EPL = {
case f @ FrameEvent(e) => commandPL(Pipeline.Tell(handler, e, receiveAdapter))
Expand Down