Skip to content
This repository was archived by the owner on Aug 6, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ dist: trusty
sudo: false
group: beta
scala:
- 2.12.1
- 2.11.8
- 2.12.8
- 2.11.12
jdk:
- oraclejdk8
cache:
Expand Down
7 changes: 3 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import com.typesafe.sbt.SbtScalariform.ScalariformKeys
import interplay.ScalaVersions._
import scalariform.formatter.preferences._

val specsVersion = "3.8.9"
val specsVersion = "4.3.6"
val specsBuild = Seq(
"specs2-core",
"specs2-junit",
Expand All @@ -27,7 +27,7 @@ lazy val `play-iteratees` = project
scalaVersion := scala212,
crossScalaVersions := Seq(scala212, scala211),
libraryDependencies ++= Seq(
"org.scala-stm" %% "scala-stm" % "0.8"
"org.scala-stm" %% "scala-stm" % "0.9"
) ++ specsBuild.map(_ % Test)
)

Expand All @@ -39,7 +39,7 @@ lazy val `play-iteratees-reactive-streams` = project
scalaVersion := scala212,
crossScalaVersions := Seq(scala212, scala211),
libraryDependencies ++= Seq(
"org.reactivestreams" % "reactive-streams" % "1.0.0"
"org.reactivestreams" % "reactive-streams" % "1.0.2"
) ++ specsBuild.map(_ % Test)
).dependsOn(`play-iteratees`)

Expand All @@ -50,5 +50,4 @@ lazy val `play-iteratees-root` = (project in file("."))
scalaVersion := scala212,
crossScalaVersions := Seq(scala212, scala211)
)

playBuildRepoName in ThisBuild := "play-iteratees"
41 changes: 17 additions & 24 deletions iteratees/src/main/scala/play/api/libs/iteratee/Concurrent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,6 @@ object Concurrent {
* $paramEcSingle
*/
def buffer[E](maxBuffer: Int, length: Input[E] => Int)(implicit ec: ExecutionContext): Enumeratee[E, E] = new Enumeratee[E, E] {
val pec = ec.prepare()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If required for other versions, it cannot be removed as-it or will break back compat

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of the prepare function is not encouraged.

  /** Prepares for the execution of a task. Returns the prepared
     *  execution context. The recommended implementation of
     *  `prepare` is to return `this`.
     *
     *  This method should no longer be overridden or called. It was
     *  originally expected that `prepare` would be called by
     *  all libraries that consume ExecutionContexts, in order to
     *  capture thread local context. However, this usage has proven
     *  difficult to implement in practice and instead it is
     *  now better to avoid using `prepare` entirely.
     *
     *  Instead, if an `ExecutionContext` needs to capture thread
     *  local context, it should capture that context when it is
     *  constructed, so that it doesn't need any additional
     *  preparation later.
     */


import scala.collection.immutable.Queue
import scala.concurrent.stm._
import play.api.libs.iteratee.Enumeratee.CheckDone
Expand Down Expand Up @@ -322,7 +320,7 @@ object Concurrent {
Iteratee.flatten(last.future)

case other =>
Iteratee.flatten(Future(length(other))(pec).map { chunkLength =>
Iteratee.flatten(Future(length(other))(ec).map { chunkLength =>
val s = state.single.getAndTransform {
case Queueing(q, l) if maxBuffer > 0 && l <= maxBuffer => Queueing(q.enqueue(other), l + chunkLength)

Expand Down Expand Up @@ -446,8 +444,6 @@ object Concurrent {
onComplete: => Unit = (),
onError: (String, Input[E]) => Unit = (_: String, _: Input[E]) => ()
)(implicit ec: ExecutionContext) = new Enumerator[E] {
implicit val pec = ec.prepare()

import scala.concurrent.stm.Ref

def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = {
Expand Down Expand Up @@ -491,13 +487,13 @@ object Concurrent {
val next = k(item)
next.fold {
case Step.Done(a, in) => {
Future(onComplete)(pec).map { _ =>
Future(onComplete)(ec).map { _ =>
promise.success(next)
None
}(dec)
}
case Step.Error(msg, e) =>
Future(onError(msg, e))(pec).map { _ =>
Future(onError(msg, e))(ec).map { _ =>
promise.success(next)
None
}(dec)
Expand All @@ -512,7 +508,7 @@ object Concurrent {
}(dec)
}
}
Future(onStart(pushee))(pec).flatMap(_ => promise.future)(dec)
Future(onStart(pushee))(ec).flatMap(_ => promise.future)(dec)
}

}
Expand All @@ -529,8 +525,7 @@ object Concurrent {
* input, and the broadcaster.
*/
def broadcast[E](e: Enumerator[E], interestIsDownToZero: Broadcaster => Unit = _ => ())(implicit ec: ExecutionContext): (Enumerator[E], Broadcaster) = {
val pec = ec.prepare()
lazy val h: Hub[E] = hub(e, () => interestIsDownToZero(h))(pec)
lazy val h: Hub[E] = hub(e, () => interestIsDownToZero(h))(ec)
(h.getPatchCord(), h)
}

Expand Down Expand Up @@ -562,8 +557,6 @@ object Concurrent {
}

private def hub[E](e: Enumerator[E], interestIsDownToZero: () => Unit = () => ())(implicit ec: ExecutionContext): Hub[E] = {
val pec = ec.prepare()

import scala.concurrent.stm._

val iteratees: Ref[List[(Iteratee[E, _], Promise[Iteratee[E, _]])]] = Ref(List())
Expand Down Expand Up @@ -621,23 +614,23 @@ object Concurrent {

}
def result(): Iteratee[E, Unit] = if (in == Input.EOF || closeFlag) Done((), Input.Empty) else Cont(step)
if (downToZero) Future(interestIsDownToZero())(pec).map(_ => result())(dec) else Future.successful(result())
if (downToZero) Future(interestIsDownToZero())(ec).map(_ => result())(dec) else Future.successful(result())

}(dec))
}

new Hub[E] {

def noCords() = iteratees.single().isEmpty
def noCords(): Boolean = iteratees.single().isEmpty

def close(): Unit = {
closeFlag = true
}

def closed() = closeFlag
def closed(): Boolean = closeFlag

val redeemed = Ref(None: Option[Try[Iteratee[E, Unit]]])
def getPatchCord() = new Enumerator[E] {
def getPatchCord(): Enumerator[E] = new Enumerator[E] {

def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = {
val result = Promise[Iteratee[E, A]]()
Expand Down Expand Up @@ -707,8 +700,6 @@ object Concurrent {
* $paramEcSingle
*/
def patchPanel[E](patcher: PatchPanel[E] => Unit)(implicit ec: ExecutionContext): Enumerator[E] = new Enumerator[E] {
val pec = ec.prepare()

import scala.concurrent.stm._

def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = {
Expand Down Expand Up @@ -788,7 +779,7 @@ object Concurrent {
false
})
}
}))(pec).flatMap(_ => result.future)(dec)
}))(ec).flatMap(_ => result.future)(dec)

}
}
Expand Down Expand Up @@ -819,8 +810,9 @@ object Concurrent {
}
case err => folder(err)
}(ec)
toReturn.onFailure {
case e => doneIteratee.failure(e)
toReturn.onComplete {
case Failure(e) => doneIteratee.failure(e)
case _ =>
}(dec)
toReturn
}
Expand Down Expand Up @@ -848,10 +840,11 @@ object Concurrent {
val (consumeRemaining, remaining) = Concurrent.joined[E]
result.success((a, remaining))
consumeRemaining
}(dec)).onFailure {
case e => result.tryFailure(e)
}(dec)).onComplete {
case Failure(e) => result.tryFailure(e)
case _ =>
}(dec)

result.future
}
}
}
47 changes: 13 additions & 34 deletions iteratees/src/main/scala/play/api/libs/iteratee/Enumeratee.scala
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ object Enumeratee {
* $paramEcSingle
*/
def zipWith[E, A, B, C](inner1: Iteratee[E, A], inner2: Iteratee[E, B])(zipper: (A, B) => C)(implicit ec: ExecutionContext): Iteratee[E, C] = {
val pec = ec.prepare()
val pec = ec
import Execution.Implicits.{ defaultExecutionContext => ec } // Shadow ec to make this the only implicit EC in scope

def getNext(it1: Iteratee[E, A], it2: Iteratee[E, B]): Iteratee[E, C] = {
Expand Down Expand Up @@ -202,12 +202,10 @@ object Enumeratee {
*/
def mapInput[From] = new MapInput[From] {
def apply[To](f: Input[From] => Input[To])(implicit ec: ExecutionContext) = new CheckDone[From, To] {
val pec = ec.prepare()

def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = {
case in @ (Input.El(_) | Input.Empty) => new CheckDone[From, To] {
def continue[A](k: K[To, A]) = Cont(step(k))
} &> Iteratee.flatten(Future(f(in))(pec).map(in => k(in))(dec))
} &> Iteratee.flatten(Future(f(in))(ec).map(in => k(in))(dec))

case Input.EOF => Done(Cont(k), Input.EOF)
}
Expand Down Expand Up @@ -274,11 +272,9 @@ object Enumeratee {
*/
def mapFlatten[From] = new MapFlatten[From] {
def apply[To](f: From => Enumerator[To])(implicit ec: ExecutionContext) = new CheckDone[From, To] {
val pec = ec.prepare()

def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = {
case Input.El(e) =>
new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> Iteratee.flatten(Future(f(e))(pec).flatMap(_.apply(Cont(k)))(dec))
new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> Iteratee.flatten(Future(f(e))(ec).flatMap(_.apply(Cont(k)))(dec))

case Input.Empty =>
new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> k(Input.Empty)
Expand Down Expand Up @@ -308,11 +304,9 @@ object Enumeratee {
*/
def mapInputFlatten[From] = new MapInputFlatten[From] {
def apply[To](f: Input[From] => Enumerator[To])(implicit ec: ExecutionContext) = new CheckDone[From, To] {
val pec = ec.prepare()

def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = {
case in =>
new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> Iteratee.flatten(Future(f(in))(pec).flatMap(_.apply(Cont(k)))(dec))
new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> Iteratee.flatten(Future(f(in))(ec).flatMap(_.apply(Cont(k)))(dec))
}

def continue[A](k: K[To, A]) = Cont(step(k))
Expand All @@ -337,11 +331,9 @@ object Enumeratee {
*/
def mapInputM[From] = new MapInputM[From] {
def apply[To](f: Input[From] => Future[Input[To]])(implicit ec: ExecutionContext) = new CheckDone[From, To] {
val pec = ec.prepare()

def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = {
case in @ (Input.El(_) | Input.Empty) =>
new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> Iteratee.flatten(executeFuture(f(in))(pec).map(k(_))(dec))
new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> Iteratee.flatten(executeFuture(f(in))(ec).map(k(_))(dec))

case Input.EOF => Done(Cont(k), Input.EOF)
}
Expand Down Expand Up @@ -510,11 +502,9 @@ object Enumeratee {
* $paramEcSingle
*/
def filter[E](predicate: E => Boolean)(implicit ec: ExecutionContext): Enumeratee[E, E] = new CheckDone[E, E] {
val pec = ec.prepare()

def step[A](k: K[E, A]): K[E, Iteratee[E, A]] = {

case in @ Input.El(e) => Iteratee.flatten(Future(predicate(e))(pec).map { b =>
case in @ Input.El(e) => Iteratee.flatten(Future(predicate(e))(ec).map { b =>
if (b) (new CheckDone[E, E] { def continue[A](k: K[E, A]) = Cont(step(k)) } &> k(in)) else Cont(step(k))
}(dec))

Expand Down Expand Up @@ -556,8 +546,6 @@ object Enumeratee {
*/
def collect[From] = new Collect[From] {
def apply[To](transformer: PartialFunction[From, To])(implicit ec: ExecutionContext): Enumeratee[From, To] = new CheckDone[From, To] {
val pec = ec.prepare()

def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = {

case in @ Input.El(e) => Iteratee.flatten(Future {
Expand All @@ -566,7 +554,7 @@ object Enumeratee {
} else {
Cont(step(k))
}
}(pec))
}(ec))

case Input.Empty =>
new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> k(Input.Empty)
Expand Down Expand Up @@ -607,12 +595,11 @@ object Enumeratee {
* $paramEcSingle
*/
def dropWhile[E](p: E => Boolean)(implicit ec: ExecutionContext): Enumeratee[E, E] = {
val pec = ec.prepare()
new CheckDone[E, E] {

def step[A](k: K[E, A]): K[E, Iteratee[E, A]] = {

case in @ Input.El(e) => Iteratee.flatten(Future(p(e))(pec).map {
case in @ Input.El(e) => Iteratee.flatten(Future(p(e))(ec).map {
b => if (b) Cont(step(k)) else (passAlong[E] &> k(in))
}(dec))

Expand All @@ -635,12 +622,11 @@ object Enumeratee {
* $paramEcSingle
*/
def takeWhile[E](p: E => Boolean)(implicit ec: ExecutionContext): Enumeratee[E, E] = {
val pec = ec.prepare()
new CheckDone[E, E] {

def step[A](k: K[E, A]): K[E, Iteratee[E, A]] = {

case in @ Input.El(e) => Iteratee.flatten(Future(p(e))(pec).map {
case in @ Input.El(e) => Iteratee.flatten(Future(p(e))(ec).map {
b => if (b) (new CheckDone[E, E] { def continue[A](k: K[E, A]) = Cont(step(k)) } &> k(in)) else Done(Cont(k), in)
}(dec))

Expand All @@ -663,10 +649,9 @@ object Enumeratee {
* $paramEcSingle
*/
def breakE[E](p: E => Boolean)(implicit ec: ExecutionContext) = new Enumeratee[E, E] {
val pec = ec.prepare()
def applyOn[A](inner: Iteratee[E, A]): Iteratee[E, Iteratee[E, A]] = {
def step(inner: Iteratee[E, A])(in: Input[E]): Iteratee[E, Iteratee[E, A]] = in match {
case Input.El(e) => Iteratee.flatten(Future(p(e))(pec).map(b => if (b) Done(inner, in) else stepNoBreak(inner)(in))(dec))
case Input.El(e) => Iteratee.flatten(Future(p(e))(ec).map(b => if (b) Done(inner, in) else stepNoBreak(inner)(in))(dec))
case _ => stepNoBreak(inner)(in)
}
def stepNoBreak(inner: Iteratee[E, A])(in: Input[E]): Iteratee[E, Iteratee[E, A]] =
Expand Down Expand Up @@ -723,10 +708,7 @@ object Enumeratee {
* $paramEcSingle
*/
def onIterateeDone[E](action: () => Unit)(implicit ec: ExecutionContext): Enumeratee[E, E] = new Enumeratee[E, E] {
val pec = ec.prepare()

def applyOn[A](iteratee: Iteratee[E, A]): Iteratee[E, Iteratee[E, A]] = passAlong[E](iteratee).map(_.map { a => action(); a }(pec))(dec)

def applyOn[A](iteratee: Iteratee[E, A]): Iteratee[E, Iteratee[E, A]] = passAlong[E](iteratee).map(_.map { a => action(); a }(ec))(dec)
}

/**
Expand All @@ -736,12 +718,10 @@ object Enumeratee {
* $paramEcSingle
*/
def onEOF[E](action: () => Unit)(implicit ec: ExecutionContext): Enumeratee[E, E] = new CheckDone[E, E] {
val pec = ec.prepare()

def step[A](k: K[E, A]): K[E, Iteratee[E, A]] = {

case Input.EOF =>
Iteratee.flatten(Future(action())(pec).map(_ => Done[E, Iteratee[E, A]](Cont(k), Input.EOF))(dec))
Iteratee.flatten(Future(action())(ec).map(_ => Done[E, Iteratee[E, A]](Cont(k), Input.EOF))(dec))

case in =>
new CheckDone[E, E] { def continue[A](k: K[E, A]) = Cont(step(k)) } &> k(in)
Expand Down Expand Up @@ -769,7 +749,6 @@ object Enumeratee {
* $paramEcSingle
*/
def recover[E](f: (Throwable, Input[E]) => Unit = (_: Throwable, _: Input[E]) => ())(implicit ec: ExecutionContext): Enumeratee[E, E] = {
val pec = ec.prepare()
new Enumeratee[E, E] {

def applyOn[A](it: Iteratee[E, A]): Iteratee[E, Iteratee[E, A]] = {
Expand All @@ -790,7 +769,7 @@ object Enumeratee {
case NonFatal(e) =>
f(e, in)
Cont(step(it))
})(pec)
})(ec)
Iteratee.flatten(next)
case Input.EOF =>
Done(it, Input.Empty)
Expand Down
Loading