diff --git a/build.sbt b/build.sbt index c6c10b0..de0873a 100644 --- a/build.sbt +++ b/build.sbt @@ -11,6 +11,11 @@ crossScalaVersions := Seq("2.12.7") conflictManager := ConflictManager.strict +val JunitVersion = "4.12" +val ScalaTestVersion = "3.0.5" +val LogbackVersion = "1.2.3" +val CuratorVersion = "4.2.0" + val customScalacOptions = Seq( "-unchecked", "-deprecation", @@ -23,8 +28,8 @@ val customScalacOptions = Seq( ) val customDependencies = Seq( - "junit" % "junit" % "4.12" % Test, - "org.scalatest" %% "scalatest" % "3.0.5" % Test + "junit" % "junit" % JunitVersion % Test, + "org.scalatest" %% "scalatest" % ScalaTestVersion % Test ) lazy val root = (project in file(".")) @@ -43,9 +48,11 @@ lazy val zookeeper = (project in file("zookeeper")) name := "cockpit-zookeeper", scalacOptions ++= customScalacOptions, libraryDependencies ++= customDependencies ++ Seq( - "org.apache.curator" % "curator-framework" % "4.2.0" + "ch.qos.logback" % "logback-classic" % LogbackVersion, + "org.apache.curator" % "curator-framework" % CuratorVersion, + "org.apache.curator" % "curator-test" % CuratorVersion % Test, ) - ).dependsOn(core % "compile->compile,test->test") + ).dependsOn(core % "compile->compile;test->test") lazy val akkaHttp = (project in file("akka-http")) .settings( @@ -54,6 +61,5 @@ lazy val akkaHttp = (project in file("akka-http")) libraryDependencies ++= customDependencies ) -//coverageEnabled := true - +// TODO coverageEnabled := true // TODO scalastyle \ No newline at end of file diff --git a/core/src/main/scala/ru/alesavin/cockpit/impl/HoldersDesk.scala b/core/src/main/scala/ru/alesavin/cockpit/impl/HoldersDesk.scala index f275bc1..f70caf7 100644 --- a/core/src/main/scala/ru/alesavin/cockpit/impl/HoldersDesk.scala +++ b/core/src/main/scala/ru/alesavin/cockpit/impl/HoldersDesk.scala @@ -44,7 +44,7 @@ abstract class HoldersDesk[F[_]](ft: ControlTypes, init: String): Control[V] protected def updateInner(name: String) - (current2next: String => String): F[Unit] + (updater: String => String): F[Unit] } object HoldersDesk { diff --git a/core/src/main/scala/ru/alesavin/cockpit/impl/InMemoryDesk.scala b/core/src/main/scala/ru/alesavin/cockpit/impl/InMemoryDesk.scala index ccdc806..bf394b0 100644 --- a/core/src/main/scala/ru/alesavin/cockpit/impl/InMemoryDesk.scala +++ b/core/src/main/scala/ru/alesavin/cockpit/impl/InMemoryDesk.scala @@ -44,10 +44,10 @@ class InMemoryDesk(ft: ControlTypes, } override protected def updateInner(name: String) - (current2next: String => String): Future[Unit] = + (updater: String => String): Future[Unit] = synchronized { Future.fromTry(Try { val old = featureMap(name) - val n = current2next(old) + val n = updater(old) featureMap.update(name, n) })} } diff --git a/core/src/main/scala/ru/alesavin/cockpit/model/Control.scala b/core/src/main/scala/ru/alesavin/cockpit/model/Control.scala index 716cde4..077a90a 100644 --- a/core/src/main/scala/ru/alesavin/cockpit/model/Control.scala +++ b/core/src/main/scala/ru/alesavin/cockpit/model/Control.scala @@ -18,6 +18,8 @@ trait Control[V] { override def hashCode: Int = name.hashCode + + override def toString: String = s"Control($name)" } object Control { diff --git a/core/src/test/scala/ru/alesavin/cockpit/impl/DeskSpecBase.scala b/core/src/test/scala/ru/alesavin/cockpit/impl/DeskSpecBase.scala index 9b4bfa4..c46d1a5 100644 --- a/core/src/test/scala/ru/alesavin/cockpit/impl/DeskSpecBase.scala +++ b/core/src/test/scala/ru/alesavin/cockpit/impl/DeskSpecBase.scala @@ -102,7 +102,7 @@ trait DeskSpecBase d.register(name, 122) d.delete(name).futureValue shouldBe true - d.list.futureValue.isEmpty shouldBe true + d.list.futureValue shouldBe empty } "return false on calling delete for nonexistent feature" in { val d = desk() diff --git a/zookeeper/src/main/scala/ru/alesavin/cockpit/impl/CachedStorage.scala b/zookeeper/src/main/scala/ru/alesavin/cockpit/impl/CachedStorage.scala new file mode 100644 index 0000000..6549ea9 --- /dev/null +++ b/zookeeper/src/main/scala/ru/alesavin/cockpit/impl/CachedStorage.scala @@ -0,0 +1,40 @@ +package ru.alesavin.cockpit.impl + +import com.google.common.cache.CacheBuilder + +import scala.concurrent.duration.FiniteDuration +import scala.util.{Success, Try} + +/** + * Caching mix-in for [[Storage]] + * + * @author alesavin + */ +trait CachedStorage extends Storage { + + def duration: FiniteDuration + + private val Cache: com.google.common.cache.Cache[String, String] = + CacheBuilder.newBuilder() + .expireAfterWrite(duration.length, duration.unit) + .build() + + abstract override def get(key: String): Try[Option[String]] = + for { + cached <- Try(Option(Cache.getIfPresent(key))) + r <- cached match { + case s@Some(_) => Success(s) + case _ => super.get(key).map { v => + v.foreach(Cache.put(key, _)) + v + } + } + } yield r + + + abstract override def set(key: String, value: String): Try[Unit] = + for { + _ <- Try(Cache.invalidate(key)) + r <- super.set(key, value) + } yield r +} diff --git a/zookeeper/src/main/scala/ru/alesavin/cockpit/impl/Storage.scala b/zookeeper/src/main/scala/ru/alesavin/cockpit/impl/Storage.scala new file mode 100644 index 0000000..72fe98f --- /dev/null +++ b/zookeeper/src/main/scala/ru/alesavin/cockpit/impl/Storage.scala @@ -0,0 +1,17 @@ +package ru.alesavin.cockpit.impl + +import scala.util.Try + +/** + * Provide simple interface to storage + * TODO move to core + * + * @author alesavin + */ +trait Storage { + + def keys: Try[Iterable[String]] // TODO Curator Async => Future + def get(key: String): Try[Option[String]] + def set(key: String, value: String): Try[Unit] + def remove(key: String): Try[Boolean] +} diff --git a/zookeeper/src/main/scala/ru/alesavin/cockpit/impl/ZookeeperDesk.scala b/zookeeper/src/main/scala/ru/alesavin/cockpit/impl/ZookeeperDesk.scala new file mode 100644 index 0000000..534f195 --- /dev/null +++ b/zookeeper/src/main/scala/ru/alesavin/cockpit/impl/ZookeeperDesk.scala @@ -0,0 +1,78 @@ +package ru.alesavin.cockpit.impl + +import java.util.NoSuchElementException + +import org.apache.curator.framework.CuratorFramework +import ru.alesavin.cockpit.impl.HoldersDesk.HolderControlType +import ru.alesavin.cockpit.impl.ZookeeperDesk.registrationLock +import ru.alesavin.cockpit.model.{Control, ControlType, ControlTypes, Holder} + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.{Failure, Success, Try} + +/** + * Impl of [[ru.alesavin.cockpit.model.Desk]] over Zookeeper + * + * @author alesavin + */ +class ZookeeperDesk(client: CuratorFramework, + baseZkPath: String, + ft: ControlTypes, + hType: ControlType[Holder] = HolderControlType, + cacheDuration: FiniteDuration = 30.seconds) + extends HoldersDesk[Future](ft, hType) { + + private val zkStorage = + new ZookeeperStorage(client, baseZkPath) + with CachedStorage { + override def duration: FiniteDuration = cacheDuration + } + + override def list: Future[Seq[Control[Holder]]] = + Future.fromTry( + for { + keys <- zkStorage.keys + result <- keys.foldLeft(Success(Seq.empty) : Try[Seq[Control[Holder]]]) { + case (f@Failure(_), _) => f + case (Success(s), k) => + val nc = for { + optV <- zkStorage.get(k) + v <- Try(optV.getOrElse(throw new NoSuchElementException(s"No $k"))) + } yield Control(k, _ => hType.from(v).get) + nc.map(ch => s :+ ch) + } + } yield result + ) + + override def delete(name: String): Future[Boolean] = + Future.fromTry(zkStorage.remove(name)) + + override protected def registerInner[V](name: String, + init: String): Control[V] = + registrationLock.synchronized { // do registrations sequential due of zk performance + (for { + exist <- zkStorage.get(name) + _ <- exist match { + case None => zkStorage.set(name, init) + case _ => Success(()) + } + } yield Control[V](name, n => decode[V](zkStorage.get(n).get.get))) // TODO + .get + } + + protected def updateInner(name: String) + (updater: String => String): Future[Unit] = + Future.fromTry { + for { + optExist <- zkStorage.get(name) + exist <- Try(optExist.getOrElse(throw new NoSuchElementException(s"No $name"))) + _ <- zkStorage.set(name, updater(exist)) + } yield () + } +} + +object ZookeeperDesk { + + private val registrationLock = new Object +} diff --git a/zookeeper/src/main/scala/ru/alesavin/cockpit/impl/ZookeeperStorage.scala b/zookeeper/src/main/scala/ru/alesavin/cockpit/impl/ZookeeperStorage.scala new file mode 100644 index 0000000..fb65fd1 --- /dev/null +++ b/zookeeper/src/main/scala/ru/alesavin/cockpit/impl/ZookeeperStorage.scala @@ -0,0 +1,61 @@ +package ru.alesavin.cockpit.impl + +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.utils.ZKPaths +import org.apache.zookeeper.{CreateMode, KeeperException} +import ru.alesavin.cockpit.impl.ZookeeperStorage._ + +import scala.util.Try +import scala.collection.JavaConverters._ + + +/** + * Impl of [[Storage]] over Zookeeper + * + * @author alesavin + */ +class ZookeeperStorage(client: CuratorFramework, + baseZkPath: String) extends Storage { + + require(baseZkPath.nonEmpty, "Empty zookeeper path") + + override def keys: Try[Iterable[String]] = + Try(client.getChildren.forPath(baseZkPath).asScala) + + override def get(key: String): Try[Option[String]] = + Try { + Option(client.getData.forPath(ZKPaths.makePath(baseZkPath, key))).map(to) + }.recover { + case _: KeeperException.NoNodeException => None + } + + override def set(key: String, value: String): Try[Unit] = + Try { + client + .create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(ZKPaths.makePath(baseZkPath, key), from(value)) + () + }.recover { + case _: KeeperException.NodeExistsException => + client + .setData() + .forPath(ZKPaths.makePath(baseZkPath, key), from(value)) + () + } + + override def remove(key: String): Try[Boolean] = + Try { + client.delete().forPath(ZKPaths.makePath(baseZkPath, key)) + true + }.recover { + case _: KeeperException.NoNodeException => false + } +} + +object ZookeeperStorage { + + def to(data: Array[Byte]): String = new String(data, "UTF-8") + def from(data: String): Array[Byte] = data.getBytes("UTF-8") +} \ No newline at end of file diff --git a/zookeeper/src/test/resources/logback-test.xml b/zookeeper/src/test/resources/logback-test.xml new file mode 100644 index 0000000..99a16b2 --- /dev/null +++ b/zookeeper/src/test/resources/logback-test.xml @@ -0,0 +1,12 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/zookeeper/src/test/scala/ru/alesavin/cockpit/impl/ZookeeperDeskSpec.scala b/zookeeper/src/test/scala/ru/alesavin/cockpit/impl/ZookeeperDeskSpec.scala new file mode 100644 index 0000000..1301cbf --- /dev/null +++ b/zookeeper/src/test/scala/ru/alesavin/cockpit/impl/ZookeeperDeskSpec.scala @@ -0,0 +1,34 @@ +package ru.alesavin.cockpit.impl + +import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} +import org.apache.curator.retry.ExponentialBackoffRetry +import org.apache.curator.test.TestingCluster +import ru.alesavin.cockpit.model.{ControlTypes, Desk} + +import scala.concurrent.Future + +/** + * Specd on [[ZookeeperDesk]] + * + * @author alesavin + */ +class ZookeeperDeskSpec + extends DeskSpecBase { + + def desk(ft: ControlTypes): Desk[Future] = { + val zkCluster = new TestingCluster(3) + zkCluster.start() + + val Curator: CuratorFramework = { + val curatorFramework = CuratorFrameworkFactory.newClient( + zkCluster.getConnectString, + new ExponentialBackoffRetry(100, 3)) + curatorFramework.start() + curatorFramework + } + val BasePath = "/test" + Curator.create().creatingParentsIfNeeded().forPath(BasePath) + + new ZookeeperDesk(Curator, BasePath, ft) + } +} diff --git a/zookeeper/src/test/scala/ru/alesavin/cockpit/impl/ZookeeperStorageSpec.scala b/zookeeper/src/test/scala/ru/alesavin/cockpit/impl/ZookeeperStorageSpec.scala new file mode 100644 index 0000000..167f859 --- /dev/null +++ b/zookeeper/src/test/scala/ru/alesavin/cockpit/impl/ZookeeperStorageSpec.scala @@ -0,0 +1,100 @@ +package ru.alesavin.cockpit.impl + +import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} +import org.apache.curator.retry.ExponentialBackoffRetry +import org.apache.zookeeper.KeeperException +import org.scalatest.{Matchers, WordSpec} + +import scala.util.{Failure, Success} + +/** + * Specs on [[ZookeeperStorage]] + * + * @author alesavin + */ +class ZookeeperStorageSpec + extends WordSpec + with Matchers { + + import org.apache.curator.test.TestingCluster + + private val zkCluster = new TestingCluster(3) + zkCluster.start() + + "ZookeeperStorage" should { + + val Curator: CuratorFramework = { + val curatorFramework = CuratorFrameworkFactory.newClient( + zkCluster.getConnectString, + new ExponentialBackoffRetry(100, 3)) + curatorFramework.start() + curatorFramework + } + val BasePath = "/test" + val zkStorage = + new ZookeeperStorage(Curator, BasePath) + + "fail if have no base path" in { + zkStorage.keys match { + case Failure(_ : KeeperException.NoNodeException) => info("Done") + case other => fail(s"Unexpected $other") + } + } + "list keys" in { + Curator.create().creatingParentsIfNeeded().forPath(BasePath) + zkStorage.keys match { + case Success(ks) if ks.isEmpty => info("Done") + case other => fail(s"Unexpected $other") + } + } + "get non-exist key" in { + zkStorage.get("k") match { + case Success(None) => info("Done") + case other => fail(s"Unexpected $other") + } + } + "set data for new key" in { + zkStorage.set("k", "v") match { + case Success(()) => info("Done") + case other => fail(s"Unexpected $other") + } + zkStorage.get("k") match { + case Success(Some("v")) => info("Done") + case other => fail(s"Unexpected $other") + } + } + "update data for key" in { + zkStorage.get("k") match { + case Success(Some("v")) => info("Done") + case other => fail(s"Unexpected $other") + } + zkStorage.set("k", "v2") match { + case Success(()) => info("Done") + case other => fail(s"Unexpected $other") + } + zkStorage.get("k") match { + case Success(Some("v2")) => info("Done") + case other => fail(s"Unexpected $other") + } + } + "remove keys" in { + zkStorage.remove("k2") match { + case Success(false) => info("Done") + case other => fail(s"Unexpected $other") + } + zkStorage.get("k") match { + case Success(Some("v2")) => info("Done") + case other => fail(s"Unexpected $other") + } + zkStorage.remove("k") match { + case Success(true) => info("Done") + case other => fail(s"Unexpected $other") + } + zkStorage.get("k") match { + case Success(None) => info("Done") + case other => fail(s"Unexpected $other") + } + } + } +} +