diff --git a/.travis.yml b/.travis.yml index 2a5359f..02768d7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,8 +3,8 @@ dist: trusty sudo: false group: beta scala: -- 2.12.1 -- 2.11.8 +- 2.12.8 +- 2.11.12 jdk: - oraclejdk8 cache: diff --git a/build.sbt b/build.sbt index a1e3176..737d84d 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ import com.typesafe.sbt.SbtScalariform.ScalariformKeys import interplay.ScalaVersions._ import scalariform.formatter.preferences._ -val specsVersion = "3.8.9" +val specsVersion = "4.3.6" val specsBuild = Seq( "specs2-core", "specs2-junit", @@ -27,7 +27,7 @@ lazy val `play-iteratees` = project scalaVersion := scala212, crossScalaVersions := Seq(scala212, scala211), libraryDependencies ++= Seq( - "org.scala-stm" %% "scala-stm" % "0.8" + "org.scala-stm" %% "scala-stm" % "0.9" ) ++ specsBuild.map(_ % Test) ) @@ -39,7 +39,7 @@ lazy val `play-iteratees-reactive-streams` = project scalaVersion := scala212, crossScalaVersions := Seq(scala212, scala211), libraryDependencies ++= Seq( - "org.reactivestreams" % "reactive-streams" % "1.0.0" + "org.reactivestreams" % "reactive-streams" % "1.0.2" ) ++ specsBuild.map(_ % Test) ).dependsOn(`play-iteratees`) @@ -50,5 +50,4 @@ lazy val `play-iteratees-root` = (project in file(".")) scalaVersion := scala212, crossScalaVersions := Seq(scala212, scala211) ) - playBuildRepoName in ThisBuild := "play-iteratees" diff --git a/iteratees/src/main/scala/play/api/libs/iteratee/Concurrent.scala b/iteratees/src/main/scala/play/api/libs/iteratee/Concurrent.scala index 92e0dd0..b935813 100644 --- a/iteratees/src/main/scala/play/api/libs/iteratee/Concurrent.scala +++ b/iteratees/src/main/scala/play/api/libs/iteratee/Concurrent.scala @@ -287,8 +287,6 @@ object Concurrent { * $paramEcSingle */ def buffer[E](maxBuffer: Int, length: Input[E] => Int)(implicit ec: ExecutionContext): Enumeratee[E, E] = new Enumeratee[E, E] { - val pec = ec.prepare() - import scala.collection.immutable.Queue import scala.concurrent.stm._ import play.api.libs.iteratee.Enumeratee.CheckDone @@ -322,7 +320,7 @@ object Concurrent { Iteratee.flatten(last.future) case other => - Iteratee.flatten(Future(length(other))(pec).map { chunkLength => + Iteratee.flatten(Future(length(other))(ec).map { chunkLength => val s = state.single.getAndTransform { case Queueing(q, l) if maxBuffer > 0 && l <= maxBuffer => Queueing(q.enqueue(other), l + chunkLength) @@ -446,8 +444,6 @@ object Concurrent { onComplete: => Unit = (), onError: (String, Input[E]) => Unit = (_: String, _: Input[E]) => () )(implicit ec: ExecutionContext) = new Enumerator[E] { - implicit val pec = ec.prepare() - import scala.concurrent.stm.Ref def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = { @@ -491,13 +487,13 @@ object Concurrent { val next = k(item) next.fold { case Step.Done(a, in) => { - Future(onComplete)(pec).map { _ => + Future(onComplete)(ec).map { _ => promise.success(next) None }(dec) } case Step.Error(msg, e) => - Future(onError(msg, e))(pec).map { _ => + Future(onError(msg, e))(ec).map { _ => promise.success(next) None }(dec) @@ -512,7 +508,7 @@ object Concurrent { }(dec) } } - Future(onStart(pushee))(pec).flatMap(_ => promise.future)(dec) + Future(onStart(pushee))(ec).flatMap(_ => promise.future)(dec) } } @@ -529,8 +525,7 @@ object Concurrent { * input, and the broadcaster. */ def broadcast[E](e: Enumerator[E], interestIsDownToZero: Broadcaster => Unit = _ => ())(implicit ec: ExecutionContext): (Enumerator[E], Broadcaster) = { - val pec = ec.prepare() - lazy val h: Hub[E] = hub(e, () => interestIsDownToZero(h))(pec) + lazy val h: Hub[E] = hub(e, () => interestIsDownToZero(h))(ec) (h.getPatchCord(), h) } @@ -562,8 +557,6 @@ object Concurrent { } private def hub[E](e: Enumerator[E], interestIsDownToZero: () => Unit = () => ())(implicit ec: ExecutionContext): Hub[E] = { - val pec = ec.prepare() - import scala.concurrent.stm._ val iteratees: Ref[List[(Iteratee[E, _], Promise[Iteratee[E, _]])]] = Ref(List()) @@ -621,23 +614,23 @@ object Concurrent { } def result(): Iteratee[E, Unit] = if (in == Input.EOF || closeFlag) Done((), Input.Empty) else Cont(step) - if (downToZero) Future(interestIsDownToZero())(pec).map(_ => result())(dec) else Future.successful(result()) + if (downToZero) Future(interestIsDownToZero())(ec).map(_ => result())(dec) else Future.successful(result()) }(dec)) } new Hub[E] { - def noCords() = iteratees.single().isEmpty + def noCords(): Boolean = iteratees.single().isEmpty def close(): Unit = { closeFlag = true } - def closed() = closeFlag + def closed(): Boolean = closeFlag val redeemed = Ref(None: Option[Try[Iteratee[E, Unit]]]) - def getPatchCord() = new Enumerator[E] { + def getPatchCord(): Enumerator[E] = new Enumerator[E] { def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = { val result = Promise[Iteratee[E, A]]() @@ -707,8 +700,6 @@ object Concurrent { * $paramEcSingle */ def patchPanel[E](patcher: PatchPanel[E] => Unit)(implicit ec: ExecutionContext): Enumerator[E] = new Enumerator[E] { - val pec = ec.prepare() - import scala.concurrent.stm._ def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = { @@ -788,7 +779,7 @@ object Concurrent { false }) } - }))(pec).flatMap(_ => result.future)(dec) + }))(ec).flatMap(_ => result.future)(dec) } } @@ -819,8 +810,9 @@ object Concurrent { } case err => folder(err) }(ec) - toReturn.onFailure { - case e => doneIteratee.failure(e) + toReturn.onComplete { + case Failure(e) => doneIteratee.failure(e) + case _ => }(dec) toReturn } @@ -848,10 +840,11 @@ object Concurrent { val (consumeRemaining, remaining) = Concurrent.joined[E] result.success((a, remaining)) consumeRemaining - }(dec)).onFailure { - case e => result.tryFailure(e) + }(dec)).onComplete { + case Failure(e) => result.tryFailure(e) + case _ => }(dec) result.future } -} +} \ No newline at end of file diff --git a/iteratees/src/main/scala/play/api/libs/iteratee/Enumeratee.scala b/iteratees/src/main/scala/play/api/libs/iteratee/Enumeratee.scala index 1510bcb..1108193 100644 --- a/iteratees/src/main/scala/play/api/libs/iteratee/Enumeratee.scala +++ b/iteratees/src/main/scala/play/api/libs/iteratee/Enumeratee.scala @@ -129,7 +129,7 @@ object Enumeratee { * $paramEcSingle */ def zipWith[E, A, B, C](inner1: Iteratee[E, A], inner2: Iteratee[E, B])(zipper: (A, B) => C)(implicit ec: ExecutionContext): Iteratee[E, C] = { - val pec = ec.prepare() + val pec = ec import Execution.Implicits.{ defaultExecutionContext => ec } // Shadow ec to make this the only implicit EC in scope def getNext(it1: Iteratee[E, A], it2: Iteratee[E, B]): Iteratee[E, C] = { @@ -202,12 +202,10 @@ object Enumeratee { */ def mapInput[From] = new MapInput[From] { def apply[To](f: Input[From] => Input[To])(implicit ec: ExecutionContext) = new CheckDone[From, To] { - val pec = ec.prepare() - def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = { case in @ (Input.El(_) | Input.Empty) => new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) - } &> Iteratee.flatten(Future(f(in))(pec).map(in => k(in))(dec)) + } &> Iteratee.flatten(Future(f(in))(ec).map(in => k(in))(dec)) case Input.EOF => Done(Cont(k), Input.EOF) } @@ -274,11 +272,9 @@ object Enumeratee { */ def mapFlatten[From] = new MapFlatten[From] { def apply[To](f: From => Enumerator[To])(implicit ec: ExecutionContext) = new CheckDone[From, To] { - val pec = ec.prepare() - def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = { case Input.El(e) => - new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> Iteratee.flatten(Future(f(e))(pec).flatMap(_.apply(Cont(k)))(dec)) + new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> Iteratee.flatten(Future(f(e))(ec).flatMap(_.apply(Cont(k)))(dec)) case Input.Empty => new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> k(Input.Empty) @@ -308,11 +304,9 @@ object Enumeratee { */ def mapInputFlatten[From] = new MapInputFlatten[From] { def apply[To](f: Input[From] => Enumerator[To])(implicit ec: ExecutionContext) = new CheckDone[From, To] { - val pec = ec.prepare() - def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = { case in => - new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> Iteratee.flatten(Future(f(in))(pec).flatMap(_.apply(Cont(k)))(dec)) + new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> Iteratee.flatten(Future(f(in))(ec).flatMap(_.apply(Cont(k)))(dec)) } def continue[A](k: K[To, A]) = Cont(step(k)) @@ -337,11 +331,9 @@ object Enumeratee { */ def mapInputM[From] = new MapInputM[From] { def apply[To](f: Input[From] => Future[Input[To]])(implicit ec: ExecutionContext) = new CheckDone[From, To] { - val pec = ec.prepare() - def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = { case in @ (Input.El(_) | Input.Empty) => - new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> Iteratee.flatten(executeFuture(f(in))(pec).map(k(_))(dec)) + new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> Iteratee.flatten(executeFuture(f(in))(ec).map(k(_))(dec)) case Input.EOF => Done(Cont(k), Input.EOF) } @@ -510,11 +502,9 @@ object Enumeratee { * $paramEcSingle */ def filter[E](predicate: E => Boolean)(implicit ec: ExecutionContext): Enumeratee[E, E] = new CheckDone[E, E] { - val pec = ec.prepare() - def step[A](k: K[E, A]): K[E, Iteratee[E, A]] = { - case in @ Input.El(e) => Iteratee.flatten(Future(predicate(e))(pec).map { b => + case in @ Input.El(e) => Iteratee.flatten(Future(predicate(e))(ec).map { b => if (b) (new CheckDone[E, E] { def continue[A](k: K[E, A]) = Cont(step(k)) } &> k(in)) else Cont(step(k)) }(dec)) @@ -556,8 +546,6 @@ object Enumeratee { */ def collect[From] = new Collect[From] { def apply[To](transformer: PartialFunction[From, To])(implicit ec: ExecutionContext): Enumeratee[From, To] = new CheckDone[From, To] { - val pec = ec.prepare() - def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = { case in @ Input.El(e) => Iteratee.flatten(Future { @@ -566,7 +554,7 @@ object Enumeratee { } else { Cont(step(k)) } - }(pec)) + }(ec)) case Input.Empty => new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> k(Input.Empty) @@ -607,12 +595,11 @@ object Enumeratee { * $paramEcSingle */ def dropWhile[E](p: E => Boolean)(implicit ec: ExecutionContext): Enumeratee[E, E] = { - val pec = ec.prepare() new CheckDone[E, E] { def step[A](k: K[E, A]): K[E, Iteratee[E, A]] = { - case in @ Input.El(e) => Iteratee.flatten(Future(p(e))(pec).map { + case in @ Input.El(e) => Iteratee.flatten(Future(p(e))(ec).map { b => if (b) Cont(step(k)) else (passAlong[E] &> k(in)) }(dec)) @@ -635,12 +622,11 @@ object Enumeratee { * $paramEcSingle */ def takeWhile[E](p: E => Boolean)(implicit ec: ExecutionContext): Enumeratee[E, E] = { - val pec = ec.prepare() new CheckDone[E, E] { def step[A](k: K[E, A]): K[E, Iteratee[E, A]] = { - case in @ Input.El(e) => Iteratee.flatten(Future(p(e))(pec).map { + case in @ Input.El(e) => Iteratee.flatten(Future(p(e))(ec).map { b => if (b) (new CheckDone[E, E] { def continue[A](k: K[E, A]) = Cont(step(k)) } &> k(in)) else Done(Cont(k), in) }(dec)) @@ -663,10 +649,9 @@ object Enumeratee { * $paramEcSingle */ def breakE[E](p: E => Boolean)(implicit ec: ExecutionContext) = new Enumeratee[E, E] { - val pec = ec.prepare() def applyOn[A](inner: Iteratee[E, A]): Iteratee[E, Iteratee[E, A]] = { def step(inner: Iteratee[E, A])(in: Input[E]): Iteratee[E, Iteratee[E, A]] = in match { - case Input.El(e) => Iteratee.flatten(Future(p(e))(pec).map(b => if (b) Done(inner, in) else stepNoBreak(inner)(in))(dec)) + case Input.El(e) => Iteratee.flatten(Future(p(e))(ec).map(b => if (b) Done(inner, in) else stepNoBreak(inner)(in))(dec)) case _ => stepNoBreak(inner)(in) } def stepNoBreak(inner: Iteratee[E, A])(in: Input[E]): Iteratee[E, Iteratee[E, A]] = @@ -723,10 +708,7 @@ object Enumeratee { * $paramEcSingle */ def onIterateeDone[E](action: () => Unit)(implicit ec: ExecutionContext): Enumeratee[E, E] = new Enumeratee[E, E] { - val pec = ec.prepare() - - def applyOn[A](iteratee: Iteratee[E, A]): Iteratee[E, Iteratee[E, A]] = passAlong[E](iteratee).map(_.map { a => action(); a }(pec))(dec) - + def applyOn[A](iteratee: Iteratee[E, A]): Iteratee[E, Iteratee[E, A]] = passAlong[E](iteratee).map(_.map { a => action(); a }(ec))(dec) } /** @@ -736,12 +718,10 @@ object Enumeratee { * $paramEcSingle */ def onEOF[E](action: () => Unit)(implicit ec: ExecutionContext): Enumeratee[E, E] = new CheckDone[E, E] { - val pec = ec.prepare() - def step[A](k: K[E, A]): K[E, Iteratee[E, A]] = { case Input.EOF => - Iteratee.flatten(Future(action())(pec).map(_ => Done[E, Iteratee[E, A]](Cont(k), Input.EOF))(dec)) + Iteratee.flatten(Future(action())(ec).map(_ => Done[E, Iteratee[E, A]](Cont(k), Input.EOF))(dec)) case in => new CheckDone[E, E] { def continue[A](k: K[E, A]) = Cont(step(k)) } &> k(in) @@ -769,7 +749,6 @@ object Enumeratee { * $paramEcSingle */ def recover[E](f: (Throwable, Input[E]) => Unit = (_: Throwable, _: Input[E]) => ())(implicit ec: ExecutionContext): Enumeratee[E, E] = { - val pec = ec.prepare() new Enumeratee[E, E] { def applyOn[A](it: Iteratee[E, A]): Iteratee[E, Iteratee[E, A]] = { @@ -790,7 +769,7 @@ object Enumeratee { case NonFatal(e) => f(e, in) Cont(step(it)) - })(pec) + })(ec) Iteratee.flatten(next) case Input.EOF => Done(it, Input.Empty) diff --git a/iteratees/src/main/scala/play/api/libs/iteratee/Enumerator.scala b/iteratees/src/main/scala/play/api/libs/iteratee/Enumerator.scala index 3032acf..112ad95 100644 --- a/iteratees/src/main/scala/play/api/libs/iteratee/Enumerator.scala +++ b/iteratees/src/main/scala/play/api/libs/iteratee/Enumerator.scala @@ -112,13 +112,11 @@ trait Enumerator[E] { * $paramEcSingle */ def onDoneEnumerating(callback: => Unit)(implicit ec: ExecutionContext): Enumerator[E] = new Enumerator[E] { - private val pec = ec.prepare() - def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = parent.apply(it).andThen { case someTry => callback someTry.get - }(pec) + }(ec) } @@ -155,8 +153,9 @@ trait Enumerator[E] { * @return enumerator */ def flatMap[U](f: E => Enumerator[U])(implicit ec: ExecutionContext): Enumerator[U] = { - val pec = ec.prepare() + val pec = ec import Execution.Implicits.{ defaultExecutionContext => ec } // Shadow ec to make this the only implicit EC in scope + new Enumerator[U] { def apply[A](iteratee: Iteratee[U, A]): Future[Iteratee[U, A]] = { val folder = Iteratee.fold2[E, Iteratee[U, A]](iteratee) { (it, e) => @@ -366,10 +365,8 @@ object Enumerator { * $paramEcSingle */ def unfoldM[S, E](s: S)(f: S => Future[Option[(S, E)]])(implicit ec: ExecutionContext): Enumerator[E] = checkContinue1(s)(new TreatCont1[E, S] { - private val pec = ec.prepare() - def apply[A](loop: (Iteratee[E, A], S) => Future[Iteratee[E, A]], s: S, k: Input[E] => Iteratee[E, A]): Future[Iteratee[E, A]] = { - executeFuture(f(s))(pec).flatMap { + executeFuture(f(s))(ec).flatMap { case Some((newS, e)) => loop(k(Input.El(e)), newS) case None => Future.successful(Cont(k)) }(dec) @@ -394,9 +391,7 @@ object Enumerator { * $paramEcSingle */ def unfold[S, E](s: S)(f: S => Option[(S, E)])(implicit ec: ExecutionContext): Enumerator[E] = checkContinue1(s)(new TreatCont1[E, S] { - private val pec = ec.prepare() - - def apply[A](loop: (Iteratee[E, A], S) => Future[Iteratee[E, A]], s: S, k: Input[E] => Iteratee[E, A]): Future[Iteratee[E, A]] = Future(f(s))(pec).flatMap { + def apply[A](loop: (Iteratee[E, A], S) => Future[Iteratee[E, A]], s: S, k: Input[E] => Iteratee[E, A]): Future[Iteratee[E, A]] = Future(f(s))(ec).flatMap { case Some((s, e)) => loop(k(Input.El(e)), s) case None => Future.successful(Cont(k)) }(dec) @@ -409,10 +404,7 @@ object Enumerator { * $paramEcSingle */ def repeat[E](e: => E)(implicit ec: ExecutionContext): Enumerator[E] = checkContinue0(new TreatCont0[E] { - private val pec = ec.prepare() - - def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]) = Future(e)(pec).flatMap(ee => loop(k(Input.El(ee))))(dec) - + def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]) = Future(e)(ec).flatMap(ee => loop(k(Input.El(ee))))(dec) }) /** @@ -422,10 +414,7 @@ object Enumerator { * $paramEcSingle */ def repeatM[E](e: => Future[E])(implicit ec: ExecutionContext): Enumerator[E] = checkContinue0(new TreatCont0[E] { - private val pec = ec.prepare() - - def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]) = executeFuture(e)(pec).flatMap(ee => loop(k(Input.El(ee))))(dec) - + def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]) = executeFuture(e)(ec).flatMap(ee => loop(k(Input.El(ee))))(dec) }) /** @@ -436,9 +425,7 @@ object Enumerator { * future eventually redeemed with None if the end of the stream has been reached. */ def generateM[E](e: => Future[Option[E]])(implicit ec: ExecutionContext): Enumerator[E] = checkContinue0(new TreatCont0[E] { - private val pec = ec.prepare() - - def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]) = executeFuture(e)(pec).flatMap { + def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]) = executeFuture(e)(ec).flatMap { case Some(e) => loop(k(Input.El(e))) case None => Future.successful(Cont(k)) }(dec) @@ -499,7 +486,6 @@ object Enumerator { onComplete: () => Unit = () => (), onError: (String, Input[E]) => Unit = (_: String, _: Input[E]) => () )(implicit ec: ExecutionContext) = new Enumerator[E] { - private val pec = ec.prepare() def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = { val iterateeP = Promise[Iteratee[E, A]]() @@ -508,7 +494,7 @@ object Enumerator { val next = it.fold { case Step.Cont(k) => { - executeFuture(retriever(initial))(pec).map { + executeFuture(retriever(initial))(ec).map { case None => { val remainingIteratee = k(Input.EOF) iterateeP.success(remainingIteratee) @@ -529,15 +515,16 @@ object Enumerator { Future.successful(None) }(dec) - next.onFailure { - case reason: Exception => + next.onComplete { + case Failure(reason) => onError(reason.getMessage(), Input.Empty) + case _ => }(dec) next.onComplete { case Success(Some(i)) => step(i) - case Success(None) => Future(onComplete())(pec) + case Success(None) => Future(onComplete())(ec) case Failure(e) => iterateeP.failure(e) }(dec) @@ -559,7 +546,6 @@ object Enumerator { * @param ec The ExecutionContext to execute blocking code. */ def fromStream(input: java.io.InputStream, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = { - implicit val pec = ec.prepare() generateM({ val buffer = new Array[Byte](chunkSize) val bytesRead = blocking { input.read(buffer) } @@ -572,7 +558,7 @@ object Enumerator { Some(input) } Future.successful(chunk) - })(pec).onDoneEnumerating(input.close)(pec) + })(ec).onDoneEnumerating(input.close)(ec) } /** diff --git a/iteratees/src/main/scala/play/api/libs/iteratee/Iteratee.scala b/iteratees/src/main/scala/play/api/libs/iteratee/Iteratee.scala index 0558642..22b16df 100644 --- a/iteratees/src/main/scala/play/api/libs/iteratee/Iteratee.scala +++ b/iteratees/src/main/scala/play/api/libs/iteratee/Iteratee.scala @@ -4,7 +4,6 @@ package play.api.libs.iteratee import scala.concurrent.{ ExecutionContext, Future } -import scala.util.control.NonFatal import play.api.libs.iteratee.Execution.Implicits.{ defaultExecutionContext => dec } import play.api.libs.iteratee.internal.{ eagerFuture, executeFuture, executeIteratee, prepared } @@ -53,12 +52,11 @@ object Iteratee { * $paramEcSingle */ def foldM[E, A](state: A)(f: (A, E) => Future[A])(implicit ec: ExecutionContext): Iteratee[E, A] = { - val pec = ec.prepare() def step(s: A)(i: Input[E]): Iteratee[E, A] = i match { case Input.EOF => Done(s, Input.EOF) case Input.Empty => Cont[E, A](step(s)) - case Input.El(e) => { val newS = executeFuture(f(s, e))(pec); flatten(newS.map(s1 => Cont[E, A](step(s1)))(dec)) } + case Input.El(e) => { val newS = executeFuture(f(s, e))(ec); flatten(newS.map(s1 => Cont[E, A](step(s1)))(dec)) } } (Cont[E, A](step(state))) } @@ -72,12 +70,11 @@ object Iteratee { * $paramEcSingle */ def fold2[E, A](state: A)(f: (A, E) => Future[(A, Boolean)])(implicit ec: ExecutionContext): Iteratee[E, A] = { - val pec = ec.prepare() def step(s: A)(i: Input[E]): Iteratee[E, A] = i match { case Input.EOF => Done(s, Input.EOF) case Input.Empty => Cont[E, A](step(s)) - case Input.El(e) => { val newS = executeFuture(f(s, e))(pec); flatten(newS.map[Iteratee[E, A]] { case (s1, done) => if (!done) Cont[E, A](step(s1)) else Done(s1, Input.Empty) }(dec)) } + case Input.El(e) => { val newS = executeFuture(f(s, e))(ec); flatten(newS.map[Iteratee[E, A]] { case (s1, done) => if (!done) Cont[E, A](step(s1)) else Done(s1, Input.Empty) }(dec)) } } (Cont[E, A](step(state))) } @@ -93,7 +90,7 @@ object Iteratee { * $paramEcSingle */ def fold1[E, A](state: Future[A])(f: (A, E) => Future[A])(implicit ec: ExecutionContext): Iteratee[E, A] = { - prepared(ec)(pec => flatten(state.map(s => foldM(s)(f)(pec))(dec))) + prepared(ec)(pec => flatten(state.map(s => foldM(s)(f)(ec))(dec))) } /** @@ -545,8 +542,7 @@ trait Iteratee[E, +A] { case Step.Error(msg, e) => Error(msg, e) }(dec) case Step.Cont(k) => { - implicit val pec = ec.prepare() - Cont((in: Input[E]) => executeIteratee(k(in))(dec).flatMap(f)(pec)) + Cont((in: Input[E]) => executeIteratee(k(in))(dec).flatMap(f)(ec)) } case Step.Error(msg, e) => Error(msg, e) } @@ -572,10 +568,9 @@ trait Iteratee[E, +A] { * $paramEcSingle Note: input concatenation is performed in the iteratee default execution context, not in the user-supplied context. */ def flatMapTraversable[B, X](f: A => Iteratee[E, B])(implicit p: E => scala.collection.TraversableLike[X, E], bf: scala.collection.generic.CanBuildFrom[E, X, E], ec: ExecutionContext): Iteratee[E, B] = { - val pec = ec.prepare() self.pureFlatFold { case Step.Done(a, Input.Empty) => f(a) - case Step.Done(a, e) => executeIteratee(f(a))(pec).pureFlatFold { + case Step.Done(a, e) => executeIteratee(f(a))(ec).pureFlatFold { case Step.Done(a, eIn) => { val fullIn = (e, eIn) match { case (Input.Empty, in) => in @@ -590,7 +585,7 @@ trait Iteratee[E, +A] { case Step.Cont(k) => k(e) case Step.Error(msg, e) => Error(msg, e) }(dec) - case Step.Cont(k) => Cont((in: Input[E]) => k(in).flatMap(f)(pec)) + case Step.Cont(k) => Cont((in: Input[E]) => k(in).flatMap(f)(ec)) case Step.Error(msg, e) => Error(msg, e) }(dec) } @@ -629,8 +624,7 @@ trait Iteratee[E, +A] { * @return */ def recoverM[B >: A](pf: PartialFunction[Throwable, Future[B]])(implicit ec: ExecutionContext): Iteratee[E, B] = { - val pec = ec.prepare() - recoverWith { case t: Throwable if pf.isDefinedAt(t) => Iteratee.flatten(pf(t).map(b => Done[E, B](b))(pec)) }(ec) + recoverWith { case t: Throwable if pf.isDefinedAt(t) => Iteratee.flatten(pf(t).map(b => Done[E, B](b))(ec)) }(ec) } /** @@ -642,8 +636,6 @@ trait Iteratee[E, +A] { * @return */ def recoverWith[B >: A](pf: PartialFunction[Throwable, Iteratee[E, B]])(implicit ec: ExecutionContext): Iteratee[E, B] = { - implicit val pec = ec.prepare() - def recoveringIteratee(it: Iteratee[E, A]): Iteratee[E, B] = Iteratee.flatten(it.pureFold[Iteratee[E, B]] { case Step.Cont(k) => @@ -653,7 +645,7 @@ trait Iteratee[E, +A] { throw new IterateeException(msg) case done => done.it - }(dec).recover(pf)(pec)) + }(dec).recover(pf)(ec)) recoveringIteratee(this) } @@ -753,7 +745,7 @@ private final class FutureIteratee[E, A]( ) extends Iteratee[E, A] { def fold[B](folder: Step[E, A] => Future[B])(implicit ec: ExecutionContext): Future[B] = - itFut.flatMap { _.fold(folder)(ec.prepare()) }(dec) + itFut.flatMap { _.fold(folder)(ec) }(dec) } diff --git a/iteratees/src/main/scala/play/api/libs/iteratee/RunQueue.scala b/iteratees/src/main/scala/play/api/libs/iteratee/RunQueue.scala index 8fae15b..68de3cb 100644 --- a/iteratees/src/main/scala/play/api/libs/iteratee/RunQueue.scala +++ b/iteratees/src/main/scala/play/api/libs/iteratee/RunQueue.scala @@ -63,7 +63,7 @@ private[play] final class RunQueue { * The operation will execute in the given ExecutionContext. */ def schedule[A](body: => Future[A])(implicit ec: ExecutionContext): Unit = { - schedule(Op(() => body.asInstanceOf[Future[Unit]], ec.prepare)) + schedule(Op(() => body.asInstanceOf[Future[Unit]], ec)) } /** @@ -154,7 +154,7 @@ private object RunQueue { * A reified operation to be executed. * * @param thunk The logic to execute. - * @param ec The ExecutionContext to use for execution. Already prepared. + * @param ec The ExecutionContext to use for execution. */ final case class Op(thunk: () => Future[Unit], ec: ExecutionContext) diff --git a/iteratees/src/main/scala/play/api/libs/iteratee/TraversableIteratee.scala b/iteratees/src/main/scala/play/api/libs/iteratee/TraversableIteratee.scala index 486f793..9d0eb00 100644 --- a/iteratees/src/main/scala/play/api/libs/iteratee/TraversableIteratee.scala +++ b/iteratees/src/main/scala/play/api/libs/iteratee/TraversableIteratee.scala @@ -96,12 +96,10 @@ object Traversable { * $paramEcSingle */ def splitOnceAt[M, E](p: E => Boolean)(implicit traversableLike: M => scala.collection.TraversableLike[E, M], ec: ExecutionContext): Enumeratee[M, M] = new CheckDone[M, M] { - val pec = ec.prepare() - def step[A](k: K[M, A]): K[M, Iteratee[M, A]] = { case in @ Input.El(e) => - Iteratee.flatten(Future(e.span(p))(pec).map { + Iteratee.flatten(Future(e.span(p))(ec).map { case (prefix, suffix) if suffix.isEmpty => new CheckDone[M, M] { def continue[A](k: K[M, A]) = Cont(step(k)) } &> k(Input.El(prefix)) case (prefix, suffix) => Done(if (prefix.isEmpty) Cont(k) else k(Input.El(prefix)), Input.El(suffix.drop(1))) }(dec)) diff --git a/iteratees/src/main/scala/play/api/libs/iteratee/package.scala b/iteratees/src/main/scala/play/api/libs/iteratee/package.scala index 5af1641..16dc91a 100644 --- a/iteratees/src/main/scala/play/api/libs/iteratee/package.scala +++ b/iteratees/src/main/scala/play/api/libs/iteratee/package.scala @@ -51,10 +51,7 @@ package play.api.libs.iteratee { * def myFunc(implicit ec: ExecutionContext) = prepared(ec)(pec => ...) * }}} */ - def prepared[A](ec: ExecutionContext)(f: ExecutionContext => A): A = { - val pec = ec.prepare() - f(pec) - } + def prepared[A](ec: ExecutionContext)(f: ExecutionContext => A): A = f(ec) val identityFunc: (Any => Any) = (x: Any) => x } diff --git a/iteratees/src/test/scala/play/api/libs/iteratee/ConcurrentSpec.scala b/iteratees/src/test/scala/play/api/libs/iteratee/ConcurrentSpec.scala index a743884..3a0c8e1 100644 --- a/iteratees/src/test/scala/play/api/libs/iteratee/ConcurrentSpec.scala +++ b/iteratees/src/test/scala/play/api/libs/iteratee/ConcurrentSpec.scala @@ -111,11 +111,10 @@ object ConcurrentSpec extends Specification case in => throw new MatchError(in) // Shouldn't occur, but here to suppress compiler warning }, Duration(100, MILLISECONDS))) val fastEnumerator = Enumerator[Long](1, 2, 3, 4, 5, 6, 7, 8, 9, 10) >>> Enumerator.eof - val preparedMapEC = mapEC.prepare() val result = fastEnumerator |>>> (Concurrent.buffer(20) &>> - slowIteratee).flatMap { l => Iteratee.getChunks.map(l ++ (_: List[Long]))(preparedMapEC) }(flatMapEC) + slowIteratee).flatMap { l => Iteratee.getChunks.map(l ++ (_: List[Long]))(mapEC) }(flatMapEC) Await.result(result, Duration.Inf) must not equalTo (List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) flatMapEC.executionCount must beGreaterThan(0) diff --git a/iteratees/src/test/scala/play/api/libs/iteratee/ExecutionSpecification.scala b/iteratees/src/test/scala/play/api/libs/iteratee/ExecutionSpecification.scala index a943645..7ccab01 100644 --- a/iteratees/src/test/scala/play/api/libs/iteratee/ExecutionSpecification.scala +++ b/iteratees/src/test/scala/play/api/libs/iteratee/ExecutionSpecification.scala @@ -14,8 +14,7 @@ trait ExecutionSpecification { def testExecution[A](f: TestExecutionContext => A): A = { val ec = TestExecutionContext() - val result = ec.preparable(f(ec)) - result + f(ec) } def testExecution[A](f: (TestExecutionContext, TestExecutionContext) => A): A = { diff --git a/iteratees/src/test/scala/play/api/libs/iteratee/IterateesSpec.scala b/iteratees/src/test/scala/play/api/libs/iteratee/IterateesSpec.scala index d816dfb..fd1e161 100644 --- a/iteratees/src/test/scala/play/api/libs/iteratee/IterateesSpec.scala +++ b/iteratees/src/test/scala/play/api/libs/iteratee/IterateesSpec.scala @@ -37,8 +37,7 @@ object IterateesSpec extends Specification def checkFutureFoldFailure[A, E](i: Iteratee[A, E]) = { mustExecute(1, 1) { (foldEC, folderEC) => val e = new Exception("exception") - val preparedFolderEC = folderEC.prepare() - val result = ready(i.fold(_ => Future(throw e)(preparedFolderEC))(foldEC)) + val result = ready(i.fold(_ => Future(throw e)(folderEC))(foldEC)) result.value must equalTo(Some(Failure(e))) } } diff --git a/iteratees/src/test/scala/play/api/libs/iteratee/TestExecutionContext.scala b/iteratees/src/test/scala/play/api/libs/iteratee/TestExecutionContext.scala index e1920c4..2d7fb9d 100644 --- a/iteratees/src/test/scala/play/api/libs/iteratee/TestExecutionContext.scala +++ b/iteratees/src/test/scala/play/api/libs/iteratee/TestExecutionContext.scala @@ -20,43 +20,18 @@ object TestExecutionContext { * @param delegate The underlying `ExecutionContext` to delegate execution to. */ class TestExecutionContext(delegate: ExecutionContext) extends ExecutionContext { - top => val count = new java.util.concurrent.atomic.AtomicInteger() - val local = new ThreadLocal[java.lang.Boolean] - - def preparable[A](body: => A): A = { - local.set(true) - try body finally local.set(null) - } - def execute(runnable: Runnable): Unit = { - throw new RuntimeException("Cannot execute unprepared TestExecutionContext") + count.getAndIncrement() + delegate.execute(runnable) } def reportFailure(t: Throwable): Unit = { println(t) } - override def prepare(): ExecutionContext = { - val isLocal = Option(local.get()).getOrElse(false: java.lang.Boolean) - if (!isLocal) throw new RuntimeException("Can only prepare TestExecutionContext within 'preparable' scope") - val preparedDelegate = delegate.prepare() - return new ExecutionContext { - - def execute(runnable: Runnable): Unit = { - count.getAndIncrement() - preparedDelegate.execute(runnable) - } - - def reportFailure(t: Throwable): Unit = { - println(t) - } - - } - } - def executionCount: Int = count.get() } diff --git a/project/build.properties b/project/build.properties index 1fc4b80..c0bab04 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.2.8 \ No newline at end of file +sbt.version=1.2.8 diff --git a/project/plugins.sbt b/project/plugins.sbt index 1c63e2d..850599f 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -3,5 +3,3 @@ resolvers ++= DefaultOptions.resolvers(snapshot = true) addSbtPlugin("com.typesafe.play" % "interplay" % "2.0.5") addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.3.0") addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.2") - - diff --git a/streams/src/main/scala/play/api/libs/iteratee/streams/IterateeStreams.scala b/streams/src/main/scala/play/api/libs/iteratee/streams/IterateeStreams.scala index a549d3a..056c97b 100644 --- a/streams/src/main/scala/play/api/libs/iteratee/streams/IterateeStreams.scala +++ b/streams/src/main/scala/play/api/libs/iteratee/streams/IterateeStreams.scala @@ -113,7 +113,7 @@ object IterateeStreams { * by iterateeDoneToPublisher to extract the value of a Done iteratee. */ private def iterateeFoldToPublisher[T, U, V](iter: Iteratee[T, U], f: Step[T, U] => Future[V])(implicit ec: ExecutionContext): Publisher[V] = { - val fut: Future[V] = iter.fold(f)(ec.prepare) + val fut: Future[V] = iter.fold(f)(ec) val pubr: Publisher[V] = futureToPublisher(fut) pubr } diff --git a/streams/src/main/scala/play/api/libs/iteratee/streams/impl/SubscriberIteratee.scala b/streams/src/main/scala/play/api/libs/iteratee/streams/impl/SubscriberIteratee.scala index 87a5b0e..0d81931 100644 --- a/streams/src/main/scala/play/api/libs/iteratee/streams/impl/SubscriberIteratee.scala +++ b/streams/src/main/scala/play/api/libs/iteratee/streams/impl/SubscriberIteratee.scala @@ -55,13 +55,12 @@ private[streams] class SubscriberIteratee[T](subscriber: Subscriber[T]) extends def fold[B](folder: (Step[T, Unit]) => Future[B])(implicit ec: ExecutionContext): Future[B] = { val promise = Promise[B]() - val pec = ec.prepare() exclusive { case NotSubscribed => - state = awaitDemand(promise, folder, pec) + state = awaitDemand(promise, folder, ec) subscriber.onSubscribe(this) case NoDemand => - state = awaitDemand(promise, folder, pec) + state = awaitDemand(promise, folder, ec) case AwaitingDemand(_, _) => throw new IllegalStateException("fold invoked while already waiting for demand") case Demand(n) => @@ -70,9 +69,9 @@ private[streams] class SubscriberIteratee[T](subscriber: Subscriber[T]) extends } else { state = Demand(n - 1) } - demand(promise, folder, pec) + demand(promise, folder, ec) case Cancelled => - cancelled(promise, folder, pec) + cancelled(promise, folder, ec) } promise.future