An idiomatic Scala port of Clojure core.async, with facilities for asynchronous programming and communication using Channels.
Channels has no external dependencies, though it is intended to be used with Scala Async blocks.
Add SBT dependencies:
scalaVersion := "2.11.4"
libraryDependencies += "com.ververve" %% "channels" % "0.1"
// Optional (for async/await style)
libraryDependencies += "org.scala-lang.modules" %% "scala-async" % "0.9.3"
Create your first Channel:
import scala.concurrent.ExecutionContext.Implicits.global
import scala.async.Async.{async, await}
import com.ververve.channels._
val c = channel[String]()
async {
  val res = await(c.take)
  println(res)
}
c.put("Hello")
c.close
In the above we:
- Create a Channel that can accept and deal out String values.
- In an async block we wait for a future value to take from the Channel.
- Outside the async block we then put the value "Hello", which allows the async take to complete.
- Finally we close the Channel.
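The hand-off described above can be pictured with nothing but the Scala standard library. The sketch below is only an analogy and makes no claim about how Channels is implemented: a Promise stands in for a pending take, and completing it stands in for put. The names HandOffSketch, pending and taken are hypothetical and do not come from the library.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object HandOffSketch {
  def run(): Option[String] = {
    // Stands in for a pending take: a future that has no value yet.
    val pending = Promise[Option[String]]()

    // Stands in for the async block: runs only once a value arrives.
    val taken: Future[Option[String]] = pending.future.map { res =>
      println(res)
      res
    }

    // Stands in for put: supplying the value lets the waiting take complete.
    pending.success(Some("Hello"))

    // Block briefly so the caller can observe the result.
    Await.result(taken, 1.second)
  }
}
```

Calling HandOffSketch.run() returns Some("Hello"). Unlike a real Channel, a Promise can be completed only once, so this models a single put/take pair rather than a stream of values.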
Create an unbuffered 'rendezvous' channel that accepts Long values:
val c = channel[Long]() // c: Channel[Long]
Create a channel with a fixed-size buffer that accepts String values:
val c = channel[String](5) // c: Channel[String]
Close a channel with Channel.close:
c.close()
Synchronous (blocking) operations Channel.put_! and Channel.take_!:
import scala.concurrent.Future
val c = channel[String]
// Put from another thread so the blocking take below can complete
Future {
  c.put_!("Hello")
}
val res = c.take_!() // res: Option[String]
assert(res == Some("Hello"))
Asynchronous (non-blocking) operations Channel.put and Channel.take:
import scala.concurrent.Await
import scala.concurrent.duration._
val c = channel[String]
c.put("Hello")
val f = c.take() // f: Future[Option[String]]
val res = Await.result(f, 1.second) // res: Option[String]
assert(res == Some("Hello"))
Asynchronous (non-blocking) operations using async blocks:
val c = channel[String]
async {
  val res = await(c.take) // res: Option[String]
  assert(res == Some("Hello"))
}
c.put("Hello")
Mixing synchronous and asynchronous operations on the same Channel:
val c = channel[String]
c.put("Hello")
val res = c.take_!() // res: Option[String]
assert(res == Some("Hello"))
Select the first available Channel.take result with alts:
val c1 = channel[Int]
val c2 = channel[String]
async {
  while (true) {
    val res = await(alts(c1, c2))
    println("Got " + res)
  }
}
c2.put("Hello")
c1.put(34)
// Outputs:
// > Got (ChannelInternal@7f5ff567, Some(Hello))
// > Got (ChannelInternal@7232ab12, Some(34))
We can even select the first available Channel.take or Channel.put with alts:
val c1 = channel[Int]
val c2 = channel[String]
async {
  while (true) {
    await(alts(34 -> c1, c2)) match {
      case (`c1`, _) => // Put complete
      case (`c2`, res) => // Take result
    }
  }
}
c2.put("Hello")
assert(c1.take_! == Some(34))
The timeout function creates a Channel that closes after the specified duration:
val t = timeout[String](5.seconds)
A timeout channel can be used in alts to do timed Channel operations:
val c = channel[String]
val t = timeout[String](2.minutes)
alts_!(c, t)
// After 2 minutes... returns (`t`, None)
A timeout instance can be used in multiple alts to ensure all operations have identical timeout conditions:
val c1 = channel[String]
val c2 = channel[String]
val t = timeout[String](9.seconds)
alts("Hi" -> c1, t)
alts(c2, t)
Channels and async blocks are lightweight, so you can create many of them cheaply. Below we create and chain together 100,000 in under 100 milliseconds:
val length = 100000
val first = channel[Int]()
var last = first
for (i <- 0 until length) {
  val dest = channel[Int]()
  val src = last
  async {
    val v = await(src.take)
    dest.put(v.get + 1)
  }
  last = dest
}
first.put(1)
assert(last.take_! == Some(length + 1))
Released under the Eclipse Public License v1.0.