From dd2ee869bb2a83127aea2158e55099b0d792f355 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Wed, 14 Oct 2015 23:55:56 +0200 Subject: [PATCH 01/19] Experiment with #26 --- .../src/main/scala/stamina/Persister.scala | 21 ++++++++++---- .../src/main/scala/stamina/Persisters.scala | 29 +++++++++++++++---- .../scala/stamina/StaminaAkkaSerializer.scala | 25 ++++++++-------- .../src/main/scala/stamina/stamina.scala | 11 +++++-- .../test/scala/stamina/PersistersSpec.scala | 16 +++++----- .../stamina/StaminaAkkaSerializerSpec.scala | 18 ++++++------ .../scala/stamina/TestOnlyPersister.scala | 2 +- .../src/main/scala/stamina/json/json.scala | 10 ++++--- .../stamina/json/JsonPersisterSpec.scala | 12 ++++---- .../stamina/testkit/StaminaTestKit.scala | 4 +-- .../testkit/ScalatestTestGenerationSpec.scala | 2 +- 11 files changed, 93 insertions(+), 57 deletions(-) diff --git a/stamina-core/src/main/scala/stamina/Persister.scala b/stamina-core/src/main/scala/stamina/Persister.scala index 0356340..bbaee8d 100644 --- a/stamina-core/src/main/scala/stamina/Persister.scala +++ b/stamina-core/src/main/scala/stamina/Persister.scala @@ -11,18 +11,24 @@ import scala.util._ abstract class Persister[T: ClassTag, V <: Version: VersionInfo](val key: String) { lazy val currentVersion = Version.numberFor[V] - def persist(t: T): Persisted + def persist(t: T): Array[Byte] def unpersist(persisted: Persisted): T + def unpersist(manifest: String, persisted: Array[Byte]): T = + // TODO I should really remove that one but this is easier for the PoC + unpersist(Persisted(Manifest.key(manifest), Manifest.version(manifest), persisted)) def canPersist(a: AnyRef): Boolean = convertToT(a).isDefined - def canUnpersist(p: Persisted): Boolean = p.key == key && p.version == currentVersion + lazy val currentManifest = Manifest.encode(key, currentVersion) + /* To be overridden when a Persister can persist multiple versions */ + def canUnpersist(p: Persisted): Boolean = canUnpersist(Manifest.encode(p.key, p.version)) + def canUnpersist(m: String): Boolean = Manifest.key(m) == key && Manifest.version(m) == currentVersion private[stamina] def convertToT(any: AnyRef): Option[T] = any match { case t: T ⇒ Some(t) case _ ⇒ None } - private[stamina] def persistAny(any: AnyRef): Persisted = { + private[stamina] def persistAny(any: AnyRef): Array[Byte] = { convertToT(any).map(persist(_)).getOrElse( throw new IllegalArgumentException( s"persistAny() was called on Persister[${implicitly[ClassTag[T]].runtimeClass}] with an instance of ${any.getClass}." @@ -30,10 +36,13 @@ abstract class Persister[T: ClassTag, V <: Version: VersionInfo](val key: String ) } - private[stamina] def unpersistAny(persisted: Persisted): AnyRef = { - Try(unpersist(persisted).asInstanceOf[AnyRef]) match { + private[stamina] def unpersistAny(manifest: String, persistedBytes: Array[Byte]): AnyRef = { + Try(unpersist(manifest, persistedBytes).asInstanceOf[AnyRef]) match { case Success(anyref) ⇒ anyref - case Failure(error) ⇒ throw UnrecoverableDataException(persisted, error) + case Failure(error) ⇒ + // TODO simplify + val persisted = Persisted(key, Manifest.version(manifest), ByteString(persistedBytes)) + throw UnrecoverableDataException(persisted, error) } } } diff --git a/stamina-core/src/main/scala/stamina/Persisters.scala b/stamina-core/src/main/scala/stamina/Persisters.scala index 377a5a0..52c640a 100644 --- a/stamina-core/src/main/scala/stamina/Persisters.scala +++ b/stamina-core/src/main/scala/stamina/Persisters.scala @@ -11,19 +11,38 @@ import scala.reflect.ClassTag */ case class Persisters(persisters: List[Persister[_, _]]) { def canPersist(a: AnyRef): Boolean = persisters.exists(_.canPersist(a)) - def canUnpersist(p: Persisted): Boolean = persisters.exists(_.canUnpersist(p)) + def canUnpersist(manifest: String): Boolean = persisters.exists(_.canUnpersist(manifest)) // format: OFF - def persist(anyref: AnyRef): Persisted = { + def manifest(anyref: AnyRef): String = { + persisters.find(_.canPersist(anyref)) + .map(_.currentManifest) + .getOrElse(throw UnregisteredTypeException(anyref)) + } + + def persist(anyref: AnyRef): Array[Byte] = { persisters.find(_.canPersist(anyref)) .map(_.persistAny(anyref)) .getOrElse(throw UnregisteredTypeException(anyref)) } + def unpersist(manifest: String, persisted: Array[Byte]): AnyRef = { + persisters.find(_.canUnpersist(manifest)) + .map(_.unpersistAny(manifest, persisted)) + .getOrElse(throw UnsupportedDataException(Manifest.key(manifest), Manifest.version(manifest))) + } + + def persistAndWrap(anyref: AnyRef): Persisted = { + persisters.find(_.canPersist(anyref)) + .map(p => Persisted(p.key, p.currentVersion, p.persistAny(anyref))) + .getOrElse(throw UnregisteredTypeException(anyref)) + } + def unpersist(persisted: Persisted): AnyRef = { - persisters.find(_.canUnpersist(persisted)) - .map(_.unpersistAny(persisted)) - .getOrElse(throw UnsupportedDataException(persisted)) + val manifest = Manifest.encode(persisted.key, persisted.version) + persisters.find(_.canUnpersist(manifest)) + .map(_.unpersistAny(manifest, persisted.bytes.toArray)) + .getOrElse(throw UnsupportedDataException(Manifest.key(manifest), Manifest.version(manifest))) } // format: ON diff --git a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala index 50ad105..ace0b22 100644 --- a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala +++ b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala @@ -5,15 +5,15 @@ import akka.serialization._ /** * A custom Akka Serializer specifically designed for use with Akka Persistence. */ -abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters, codec: PersistedCodec) extends Serializer { - def this(persisters: List[Persister[_, _]], codec: PersistedCodec = DefaultPersistedCodec) = this(Persisters(persisters), codec) - def this(persister: Persister[_, _], persisters: Persister[_, _]*) = this(Persisters(persister :: persisters.toList), DefaultPersistedCodec) +abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters) extends SerializerWithStringManifest { + def this(persisters: List[Persister[_, _]]) = this(Persisters(persisters)) + def this(persister: Persister[_, _], persisters: Persister[_, _]*) = this(Persisters(persister :: persisters.toList)) - /** We don't need class manifests since we're using keys to identify types. */ - val includeManifest: Boolean = false + /** Uniquely identifies this Serializer. */ + val identifier = 490304 - /** Uniquely identifies this Serializer by combining the codec with a unique number. */ - val identifier = 42 * codec.identifier + def manifest(obj: AnyRef): String = + persisters.manifest(obj) /** * @throws UnregisteredTypeException when the specified object is not supported by the persisters. @@ -21,18 +21,17 @@ abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters, c def toBinary(obj: AnyRef): Array[Byte] = { if (!persisters.canPersist(obj)) throw UnregisteredTypeException(obj) - codec.writePersisted(persisters.persist(obj)) + persisters.persist(obj) } /** * @throws UnsupportedDataException when the persisted key and/or version is not supported. * @throws UnrecoverableDataException when the key and version are supported but recovery throws an exception. */ - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { - val persisted = codec.readPersisted(bytes) + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { + if (manifest.isEmpty) throw new IllegalArgumentException("No manifest found") + if (!persisters.canUnpersist(manifest)) throw UnsupportedDataException(Manifest.key(manifest), Manifest.version(manifest)) - if (!persisters.canUnpersist(persisted)) throw UnsupportedDataException(persisted) - - persisters.unpersist(persisted) + persisters.unpersist(manifest, bytes) } } diff --git a/stamina-core/src/main/scala/stamina/stamina.scala b/stamina-core/src/main/scala/stamina/stamina.scala index bfd0d2d..b14cfaa 100644 --- a/stamina-core/src/main/scala/stamina/stamina.scala +++ b/stamina-core/src/main/scala/stamina/stamina.scala @@ -30,11 +30,18 @@ package stamina { extends RuntimeException(s"No persister registered for class: ${obj.getClass}") with NoStackTrace - case class UnsupportedDataException(persisted: Persisted) - extends RuntimeException(s"No unpersister registered for key: '${persisted.key}' and version: ${persisted.version}") + case class UnsupportedDataException(key: String, version: Int) + extends RuntimeException(s"No unpersister registered for key: '$key' and version: $version") with NoStackTrace case class UnrecoverableDataException(persisted: Persisted, error: Throwable) extends RuntimeException(s"Error while trying to unpersist data with key '${persisted.key}' and version ${persisted.version}. Cause: ${error}") with NoStackTrace + + // TODO this probably needs to change and move, just a PoC + object Manifest { + def encode(key: String, version: Int): String = version + "-" + key + def key(manifest: String): String = manifest.substring(manifest.indexOf('-') + 1) + def version(manifest: String): Int = Integer.valueOf(manifest.substring(0, manifest.indexOf('-'))) + } } diff --git a/stamina-core/src/test/scala/stamina/PersistersSpec.scala b/stamina-core/src/test/scala/stamina/PersistersSpec.scala index 3d2800d..7e51577 100644 --- a/stamina-core/src/test/scala/stamina/PersistersSpec.scala +++ b/stamina-core/src/test/scala/stamina/PersistersSpec.scala @@ -22,20 +22,20 @@ class PersistersSpec extends StaminaSpec { } "correctly implement canUnpersist()" in { - canUnpersist(itemPersister.persist(item1)) should be(true) - canUnpersist(cartPersister.persist(cart)) should be(true) + canUnpersist(itemPersister.currentManifest) should be(true) + canUnpersist(cartPersister.currentManifest) should be(true) - canUnpersist(cartCreatedPersister.persist(cartCreated)) should be(false) - canUnpersist(Persisted("unknown", 1, ByteString("..."))) should be(false) - canUnpersist(Persisted("item", 2, ByteString("..."))) should be(false) + canUnpersist(cartCreatedPersister.currentManifest) should be(false) + canUnpersist(Manifest.encode("unknown", 1)) should be(false) + canUnpersist(Manifest.encode("item", 2)) should be(false) // works because canUnpersist only looks at the key and the version, not at the raw data - canUnpersist(Persisted("item", 1, ByteString("Not an item at all!"))) should be(true) + canUnpersist(Manifest.encode("item", 1)) should be(true) } "correctly implement persist() and unpersist()" in { - unpersist(persist(item1)) should equal(item1) - unpersist(persist(cart)) should equal(cart) + unpersist(persistAndWrap(item1)) should equal(item1) + unpersist(persistAndWrap(cart)) should equal(cart) } "throw an UnregisteredTypeException when persisting an unregistered type" in { diff --git a/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala b/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala index cffa695..1729681 100644 --- a/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala +++ b/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala @@ -10,7 +10,7 @@ class StaminaAkkaSerializerSpec extends StaminaSpec { val cartCreatedPersister = persister[CartCreated]("cart-created") class MyAkkaSerializer1a extends StaminaAkkaSerializer(List(itemPersister, cartPersister, cartCreatedPersister)) - class MyAkkaSerializer1b extends StaminaAkkaSerializer(List(itemPersister, cartPersister, cartCreatedPersister), DefaultPersistedCodec) + class MyAkkaSerializer1b extends StaminaAkkaSerializer(List(itemPersister, cartPersister, cartCreatedPersister)) class MyAkkaSerializer2 extends StaminaAkkaSerializer(itemPersister, cartPersister, cartCreatedPersister) val serializer = new MyAkkaSerializer1a @@ -19,29 +19,29 @@ class StaminaAkkaSerializerSpec extends StaminaSpec { "The StaminaAkkaSerializer" should { "correctly serialize and deserialize the current version of the domain" in { - fromBinary(toBinary(item1)) should equal(item1) - fromBinary(toBinary(item2)) should equal(item2) - fromBinary(toBinary(cart)) should equal(cart) - fromBinary(toBinary(cartCreated)) should equal(cartCreated) + fromBinary(toBinary(item1), manifest(item1)) should equal(item1) + fromBinary(toBinary(item2), manifest(item2)) should equal(item2) + fromBinary(toBinary(cart), manifest(cart)) should equal(cart) + fromBinary(toBinary(cartCreated), manifest(cartCreated)) should equal(cartCreated) } "throw an UnregisteredTypeException when serializing an unregistered type" in { - a[UnregisteredTypeException] should be thrownBy toBinary("a raw String is not supported") + a[UnregisteredTypeException] should be thrownBy toBinary("a raw String is not supported", Manifest.encode("foo", 32)) } "throw an UnsupportedDataException when deserializing data with an unknown key" in { an[UnsupportedDataException] should - be thrownBy fromBinary(writePersisted(Persisted("unknown", 1, ByteString("...")))) + be thrownBy fromBinary(writePersisted(Persisted("unknown", 1, ByteString("..."))), Manifest.encode("unknown", 1)) } "throw an UnsupportedDataException when deserializing data with an unsupported version" in { an[UnsupportedDataException] should - be thrownBy fromBinary(writePersisted(Persisted("item", 2, ByteString("...")))) + be thrownBy fromBinary(writePersisted(Persisted("item", 2, ByteString("..."))), Manifest.encode("item", 2)) } "throw an UnrecoverableDataException when an exception occurs while deserializing" in { an[UnrecoverableDataException] should - be thrownBy fromBinary(writePersisted(Persisted("item", 1, ByteString("not an item")))) + be thrownBy fromBinary(writePersisted(Persisted("item", 1, ByteString("not an item"))), Manifest.encode("item", 1)) } } } diff --git a/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala b/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala index bc6c737..63256c1 100644 --- a/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala +++ b/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala @@ -12,7 +12,7 @@ object TestOnlyPersister { def persister[T <: AnyRef: ClassTag](key: String): Persister[T, V1] = new JavaPersister[T](key) private class JavaPersister[T <: AnyRef: ClassTag](key: String) extends Persister[T, V1](key) { - def persist(t: T): Persisted = Persisted(key, currentVersion, toBinary(t)) + def persist(t: T): Array[Byte] = toBinary(t) def unpersist(p: Persisted): T = { if (canUnpersist(p)) fromBinary(p.bytes.toArray).asInstanceOf[T] else throw new IllegalArgumentException("") diff --git a/stamina-json/src/main/scala/stamina/json/json.scala b/stamina-json/src/main/scala/stamina/json/json.scala index 6a72d44..5aa323b 100644 --- a/stamina-json/src/main/scala/stamina/json/json.scala +++ b/stamina-json/src/main/scala/stamina/json/json.scala @@ -55,7 +55,9 @@ package object json { */ def persister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo: MigratableVersion](key: String, migrator: JsonMigrator[V]): JsonPersister[T, V] = new VnJsonPersister[T, V](key, migrator) - private[json] def toJsonBytes[T](t: T)(implicit writer: RootJsonWriter[T]): ByteString = ByteString(writer.write(t).compactPrint) + import java.nio.charset.StandardCharsets + val UTF_8: String = StandardCharsets.UTF_8.name() + private[json] def toJsonBytes[T](t: T)(implicit writer: RootJsonWriter[T]): Array[Byte] = writer.write(t).compactPrint.getBytes(UTF_8) private[json] def fromJsonBytes[T](bytes: ByteString)(implicit reader: RootJsonReader[T]): T = reader.read(parseJson(bytes)) private[json] def parseJson(bytes: ByteString): JsValue = JsonParser(ParserInput(bytes.toArray)) } @@ -70,7 +72,7 @@ package json { } private[json] class V1JsonPersister[T: RootJsonFormat: ClassTag](key: String) extends JsonPersister[T, V1](key) { - def persist(t: T): Persisted = Persisted(key, currentVersion, toJsonBytes(t)) + def persist(t: T): Array[Byte] = toJsonBytes(t) def unpersist(p: Persisted): T = { if (canUnpersist(p)) fromJsonBytes[T](p.bytes) else throw new IllegalArgumentException(cannotUnpersist(p)) @@ -78,9 +80,9 @@ package json { } private[json] class VnJsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo: MigratableVersion](key: String, migrator: JsonMigrator[V]) extends JsonPersister[T, V](key) { - override def canUnpersist(p: Persisted): Boolean = p.key == key && migrator.canMigrate(p.version) + override def canUnpersist(m: String): Boolean = Manifest.key(m) == key && migrator.canMigrate(Manifest.version(m)) - def persist(t: T): Persisted = Persisted(key, currentVersion, toJsonBytes(t)) + def persist(t: T): Array[Byte] = toJsonBytes(t) def unpersist(p: Persisted): T = { if (canUnpersist(p)) migrator.migrate(parseJson(p.bytes), p.version).convertTo[T] else throw new IllegalArgumentException(cannotUnpersist(p)) diff --git a/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala b/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala index 6d0f0d2..1245262 100644 --- a/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala +++ b/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala @@ -21,19 +21,19 @@ class JsonPersisterSpec extends StaminaJsonSpec { "V1 persisters produced by SprayJsonPersister" should { "correctly persist and unpersist domain events " in { import v1CartCreatedPersister._ - unpersist(persist(v1CartCreated)) should equal(v1CartCreated) + unpersist(currentManifest, persist(v1CartCreated)) should equal(v1CartCreated) } } "V2 persisters with migrators produced by SprayJsonPersister" should { "correctly persist and unpersist domain events " in { import v2CartCreatedPersister._ - unpersist(persist(v2CartCreated)) should equal(v2CartCreated) + unpersist(currentManifest, persist(v2CartCreated)) should equal(v2CartCreated) } "correctly migrate and unpersist V1 domain events" in { val v1Persisted = v1CartCreatedPersister.persist(v1CartCreated) - val v2Unpersisted = v2CartCreatedPersister.unpersist(v1Persisted) + val v2Unpersisted = v2CartCreatedPersister.unpersist(v1CartCreatedPersister.currentManifest, v1Persisted) v2Unpersisted.cart.items.map(_.price).toSet should equal(Set(1000)) } @@ -42,15 +42,15 @@ class JsonPersisterSpec extends StaminaJsonSpec { "V3 persisters with migrators produced by SprayJsonPersister" should { "correctly persist and unpersist domain events " in { import v3CartCreatedPersister._ - unpersist(persist(v3CartCreated)) should equal(v3CartCreated) + unpersist(currentManifest, persist(v3CartCreated)) should equal(v3CartCreated) } "correctly migrate and unpersist V1 domain events" in { val v1Persisted = v1CartCreatedPersister.persist(v1CartCreated) val v2Persisted = v2CartCreatedPersister.persist(v2CartCreated) - val v1Unpersisted = v3CartCreatedPersister.unpersist(v1Persisted) - val v2Unpersisted = v3CartCreatedPersister.unpersist(v2Persisted) + val v1Unpersisted = v3CartCreatedPersister.unpersist(v1CartCreatedPersister.currentManifest, v1Persisted) + val v2Unpersisted = v3CartCreatedPersister.unpersist(v2CartCreatedPersister.currentManifest, v2Persisted) v1Unpersisted.cart.items.map(_.price).toSet should equal(Set(1000)) v2Unpersisted.timestamp should (be > 0L and be < System.currentTimeMillis) diff --git a/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala b/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala index 7036aec..d5a1f38 100644 --- a/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala +++ b/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala @@ -26,7 +26,7 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒ private def generateRoundtripTestFor(sample: PersistableSample) = { s"persist and unpersist $sample" in { - persisters.unpersist(persisters.persist(sample.persistable)) should equal(sample.persistable) + persisters.unpersist(persisters.persistAndWrap(sample.persistable)) should equal(sample.persistable) } } @@ -42,7 +42,7 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒ def latestVersion(persistable: AnyRef) = Try(persisters.persisters.filter(_.canPersist(persistable)).map(_.currentVersion).max).toOption private def verifyByteStringDeserialization(sample: PersistableSample, version: Int, latestVersion: Int): Unit = { - val serialized = persisters.persist(sample.persistable) + val serialized = persisters.persistAndWrap(sample.persistable) byteStringFromResource(serialized.key, version, sample.sampleId) match { case Success(binary) ⇒ persisters.unpersist(binary) should equal(sample.persistable) diff --git a/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala b/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala index 6daeb23..3f8ce13 100644 --- a/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala +++ b/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala @@ -12,7 +12,7 @@ class ScalatestTestGenerationSpec extends StaminaTestKitSpec { import TestDomain._ case class ItemPersister(override val key: String) extends Persister[Item, V1](key) { - def persist(t: Item): Persisted = Persisted(key, currentVersion, ByteString()) + def persist(t: Item): Array[Byte] = Array[Byte]() def unpersist(p: Persisted): Item = item1 } From a8925d115ad4f9c475fe08b8852e56436b530ec1 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Thu, 15 Oct 2015 21:47:01 +0200 Subject: [PATCH 02/19] Add Codec-based Serializer (mostly for legacy users) --- .../CodecBasedStaminaAkkaSerializer.scala | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala diff --git a/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala new file mode 100644 index 0000000..c2a3cf6 --- /dev/null +++ b/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala @@ -0,0 +1,39 @@ +package stamina + +import akka.serialization._ + +/** + * A custom Akka Serializer specifically designed for use with Akka Persistence. + */ +abstract class CodecBasedStaminaAkkaSerializer private[stamina] (persisters: Persisters, codec: PersistedCodec) extends Serializer { + def this(persisters: List[Persister[_, _]], codec: PersistedCodec = DefaultPersistedCodec) = this(Persisters(persisters), codec) + def this(persister: Persister[_, _], persisters: Persister[_, _]*) = this(Persisters(persister :: persisters.toList), DefaultPersistedCodec) + + /** We don't need class manifests since we're using keys to identify types. */ + val includeManifest: Boolean = false + + /** Uniquely identifies this Serializer by combining the codec with a unique number. */ + val identifier = 42 * codec.identifier + + /** + * @throws UnregisteredTypeException when the specified object is not supported by the persisters. + */ + def toBinary(obj: AnyRef): Array[Byte] = { + if (!persisters.canPersist(obj)) throw UnregisteredTypeException(obj) + + codec.writePersisted(persisters.persistAndWrap(obj)) + } + + /** + * @throws UnsupportedDataException when the persisted key and/or version is not supported. + * @throws UnrecoverableDataException when the key and version are supported but recovery throws an exception. + */ + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { + val persisted = codec.readPersisted(bytes) + val manifest = Manifest.encode(persisted.key, persisted.version) + + if (!persisters.canUnpersist(manifest)) throw UnsupportedDataException(persisted.key, persisted.version) + + persisters.unpersist(manifest, persisted.bytes.toArray) + } +} From 362b3523a08b86b538f391a3b345b37258731120 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Thu, 15 Oct 2015 22:04:29 +0200 Subject: [PATCH 03/19] Split Persisted into manifest and bytes in more places --- .../src/main/scala/stamina/Persister.scala | 9 ++------- .../scala/stamina/TestOnlyPersister.scala | 4 ++-- .../src/main/scala/stamina/json/json.scala | 20 +++++++++---------- .../testkit/ScalatestTestGenerationSpec.scala | 2 +- 4 files changed, 15 insertions(+), 20 deletions(-) diff --git a/stamina-core/src/main/scala/stamina/Persister.scala b/stamina-core/src/main/scala/stamina/Persister.scala index bbaee8d..426ac2b 100644 --- a/stamina-core/src/main/scala/stamina/Persister.scala +++ b/stamina-core/src/main/scala/stamina/Persister.scala @@ -11,16 +11,11 @@ import scala.util._ abstract class Persister[T: ClassTag, V <: Version: VersionInfo](val key: String) { lazy val currentVersion = Version.numberFor[V] + lazy val currentManifest = Manifest.encode(key, currentVersion) def persist(t: T): Array[Byte] - def unpersist(persisted: Persisted): T - def unpersist(manifest: String, persisted: Array[Byte]): T = - // TODO I should really remove that one but this is easier for the PoC - unpersist(Persisted(Manifest.key(manifest), Manifest.version(manifest), persisted)) + def unpersist(manifest: String, persisted: Array[Byte]): T def canPersist(a: AnyRef): Boolean = convertToT(a).isDefined - lazy val currentManifest = Manifest.encode(key, currentVersion) - /* To be overridden when a Persister can persist multiple versions */ - def canUnpersist(p: Persisted): Boolean = canUnpersist(Manifest.encode(p.key, p.version)) def canUnpersist(m: String): Boolean = Manifest.key(m) == key && Manifest.version(m) == currentVersion private[stamina] def convertToT(any: AnyRef): Option[T] = any match { diff --git a/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala b/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala index 63256c1..0523eb6 100644 --- a/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala +++ b/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala @@ -13,8 +13,8 @@ object TestOnlyPersister { private class JavaPersister[T <: AnyRef: ClassTag](key: String) extends Persister[T, V1](key) { def persist(t: T): Array[Byte] = toBinary(t) - def unpersist(p: Persisted): T = { - if (canUnpersist(p)) fromBinary(p.bytes.toArray).asInstanceOf[T] + def unpersist(manifest: String, p: Array[Byte]): T = { + if (canUnpersist(manifest)) fromBinary(p).asInstanceOf[T] else throw new IllegalArgumentException("") } } diff --git a/stamina-json/src/main/scala/stamina/json/json.scala b/stamina-json/src/main/scala/stamina/json/json.scala index 5aa323b..e01bad4 100644 --- a/stamina-json/src/main/scala/stamina/json/json.scala +++ b/stamina-json/src/main/scala/stamina/json/json.scala @@ -58,8 +58,8 @@ package object json { import java.nio.charset.StandardCharsets val UTF_8: String = StandardCharsets.UTF_8.name() private[json] def toJsonBytes[T](t: T)(implicit writer: RootJsonWriter[T]): Array[Byte] = writer.write(t).compactPrint.getBytes(UTF_8) - private[json] def fromJsonBytes[T](bytes: ByteString)(implicit reader: RootJsonReader[T]): T = reader.read(parseJson(bytes)) - private[json] def parseJson(bytes: ByteString): JsValue = JsonParser(ParserInput(bytes.toArray)) + private[json] def fromJsonBytes[T](bytes: Array[Byte])(implicit reader: RootJsonReader[T]): T = reader.read(parseJson(bytes)) + private[json] def parseJson(bytes: Array[Byte]): JsValue = JsonParser(ParserInput(bytes.toArray)) } package json { @@ -67,15 +67,15 @@ package json { * Simple abstract marker superclass to unify (and hide) the two internal Persister implementations. */ sealed abstract class JsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo](key: String) extends Persister[T, V](key) { - private[json] def cannotUnpersist(p: Persisted) = - s"""JsonPersister[${implicitly[ClassTag[T]].runtimeClass.getSimpleName}, V${currentVersion}](key = "${key}") cannot unpersist data with key "${p.key}" and version ${p.version}.""" + private[json] def cannotUnpersist(manifest: String) = + s"""JsonPersister[${implicitly[ClassTag[T]].runtimeClass.getSimpleName}, V${currentVersion}](key = "${key}") cannot unpersist data with manifest "$manifest".""" } private[json] class V1JsonPersister[T: RootJsonFormat: ClassTag](key: String) extends JsonPersister[T, V1](key) { def persist(t: T): Array[Byte] = toJsonBytes(t) - def unpersist(p: Persisted): T = { - if (canUnpersist(p)) fromJsonBytes[T](p.bytes) - else throw new IllegalArgumentException(cannotUnpersist(p)) + def unpersist(manifest: String, p: Array[Byte]): T = { + if (canUnpersist(manifest)) fromJsonBytes[T](p) + else throw new IllegalArgumentException(cannotUnpersist(manifest)) } } @@ -83,9 +83,9 @@ package json { override def canUnpersist(m: String): Boolean = Manifest.key(m) == key && migrator.canMigrate(Manifest.version(m)) def persist(t: T): Array[Byte] = toJsonBytes(t) - def unpersist(p: Persisted): T = { - if (canUnpersist(p)) migrator.migrate(parseJson(p.bytes), p.version).convertTo[T] - else throw new IllegalArgumentException(cannotUnpersist(p)) + def unpersist(manifest: String, p: Array[Byte]): T = { + if (canUnpersist(manifest)) migrator.migrate(parseJson(p), Manifest.version(manifest)).convertTo[T] + else throw new IllegalArgumentException(cannotUnpersist(manifest)) } } } diff --git a/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala b/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala index 3f8ce13..a699a44 100644 --- a/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala +++ b/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala @@ -13,7 +13,7 @@ class ScalatestTestGenerationSpec extends StaminaTestKitSpec { case class ItemPersister(override val key: String) extends Persister[Item, V1](key) { def persist(t: Item): Array[Byte] = Array[Byte]() - def unpersist(p: Persisted): Item = item1 + def unpersist(manifest: String, p: Array[Byte]): Item = item1 } "A spec generated by StaminaTestKit" should { From 2ca2cca716d475dff31ad9d663b2469e10718173 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Thu, 15 Oct 2015 22:59:31 +0200 Subject: [PATCH 04/19] Manifest as a case class --- .../stamina/CodecBasedStaminaAkkaSerializer.scala | 2 +- stamina-core/src/main/scala/stamina/Persister.scala | 10 +++++----- stamina-core/src/main/scala/stamina/Persisters.scala | 12 ++++++------ .../main/scala/stamina/StaminaAkkaSerializer.scala | 7 ++++--- stamina-core/src/main/scala/stamina/stamina.scala | 9 +++++---- .../src/test/scala/stamina/PersistersSpec.scala | 6 +++--- .../scala/stamina/StaminaAkkaSerializerSpec.scala | 9 ++++----- .../src/test/scala/stamina/TestOnlyPersister.scala | 2 +- stamina-json/src/main/scala/stamina/json/json.scala | 10 +++++----- .../testkit/ScalatestTestGenerationSpec.scala | 2 +- 10 files changed, 35 insertions(+), 34 deletions(-) diff --git a/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala index c2a3cf6..b2fb5ec 100644 --- a/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala +++ b/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala @@ -30,7 +30,7 @@ abstract class CodecBasedStaminaAkkaSerializer private[stamina] (persisters: Per */ def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { val persisted = codec.readPersisted(bytes) - val manifest = Manifest.encode(persisted.key, persisted.version) + val manifest = Manifest(persisted.key, persisted.version) if (!persisters.canUnpersist(manifest)) throw UnsupportedDataException(persisted.key, persisted.version) diff --git a/stamina-core/src/main/scala/stamina/Persister.scala b/stamina-core/src/main/scala/stamina/Persister.scala index 426ac2b..5e6bc4a 100644 --- a/stamina-core/src/main/scala/stamina/Persister.scala +++ b/stamina-core/src/main/scala/stamina/Persister.scala @@ -11,12 +11,12 @@ import scala.util._ abstract class Persister[T: ClassTag, V <: Version: VersionInfo](val key: String) { lazy val currentVersion = Version.numberFor[V] - lazy val currentManifest = Manifest.encode(key, currentVersion) + lazy val currentManifest = Manifest(key, currentVersion) def persist(t: T): Array[Byte] - def unpersist(manifest: String, persisted: Array[Byte]): T + def unpersist(manifest: Manifest, persisted: Array[Byte]): T def canPersist(a: AnyRef): Boolean = convertToT(a).isDefined - def canUnpersist(m: String): Boolean = Manifest.key(m) == key && Manifest.version(m) == currentVersion + def canUnpersist(m: Manifest): Boolean = m.key == key && m.version <= currentVersion private[stamina] def convertToT(any: AnyRef): Option[T] = any match { case t: T ⇒ Some(t) @@ -31,12 +31,12 @@ abstract class Persister[T: ClassTag, V <: Version: VersionInfo](val key: String ) } - private[stamina] def unpersistAny(manifest: String, persistedBytes: Array[Byte]): AnyRef = { + private[stamina] def unpersistAny(manifest: Manifest, persistedBytes: Array[Byte]): AnyRef = { Try(unpersist(manifest, persistedBytes).asInstanceOf[AnyRef]) match { case Success(anyref) ⇒ anyref case Failure(error) ⇒ // TODO simplify - val persisted = Persisted(key, Manifest.version(manifest), ByteString(persistedBytes)) + val persisted = Persisted(key, manifest.version, ByteString(persistedBytes)) throw UnrecoverableDataException(persisted, error) } } diff --git a/stamina-core/src/main/scala/stamina/Persisters.scala b/stamina-core/src/main/scala/stamina/Persisters.scala index 52c640a..9a68fca 100644 --- a/stamina-core/src/main/scala/stamina/Persisters.scala +++ b/stamina-core/src/main/scala/stamina/Persisters.scala @@ -11,10 +11,10 @@ import scala.reflect.ClassTag */ case class Persisters(persisters: List[Persister[_, _]]) { def canPersist(a: AnyRef): Boolean = persisters.exists(_.canPersist(a)) - def canUnpersist(manifest: String): Boolean = persisters.exists(_.canUnpersist(manifest)) + def canUnpersist(manifest: Manifest): Boolean = persisters.exists(_.canUnpersist(manifest)) // format: OFF - def manifest(anyref: AnyRef): String = { + def manifest(anyref: AnyRef): Manifest = { persisters.find(_.canPersist(anyref)) .map(_.currentManifest) .getOrElse(throw UnregisteredTypeException(anyref)) @@ -26,10 +26,10 @@ case class Persisters(persisters: List[Persister[_, _]]) { .getOrElse(throw UnregisteredTypeException(anyref)) } - def unpersist(manifest: String, persisted: Array[Byte]): AnyRef = { + def unpersist(manifest: Manifest, persisted: Array[Byte]): AnyRef = { persisters.find(_.canUnpersist(manifest)) .map(_.unpersistAny(manifest, persisted)) - .getOrElse(throw UnsupportedDataException(Manifest.key(manifest), Manifest.version(manifest))) + .getOrElse(throw UnsupportedDataException(manifest.key, manifest.version)) } def persistAndWrap(anyref: AnyRef): Persisted = { @@ -39,10 +39,10 @@ case class Persisters(persisters: List[Persister[_, _]]) { } def unpersist(persisted: Persisted): AnyRef = { - val manifest = Manifest.encode(persisted.key, persisted.version) + val manifest = Manifest(persisted.key, persisted.version) persisters.find(_.canUnpersist(manifest)) .map(_.unpersistAny(manifest, persisted.bytes.toArray)) - .getOrElse(throw UnsupportedDataException(Manifest.key(manifest), Manifest.version(manifest))) + .getOrElse(throw UnsupportedDataException(manifest.key, manifest.version)) } // format: ON diff --git a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala index ace0b22..0d75186 100644 --- a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala +++ b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala @@ -13,7 +13,7 @@ abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters) e val identifier = 490304 def manifest(obj: AnyRef): String = - persisters.manifest(obj) + persisters.manifest(obj).manifest /** * @throws UnregisteredTypeException when the specified object is not supported by the persisters. @@ -30,8 +30,9 @@ abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters) e */ def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { if (manifest.isEmpty) throw new IllegalArgumentException("No manifest found") - if (!persisters.canUnpersist(manifest)) throw UnsupportedDataException(Manifest.key(manifest), Manifest.version(manifest)) + val m = Manifest(manifest) + if (!persisters.canUnpersist(m)) throw UnsupportedDataException(m.key, m.version) - persisters.unpersist(manifest, bytes) + persisters.unpersist(m, bytes) } } diff --git a/stamina-core/src/main/scala/stamina/stamina.scala b/stamina-core/src/main/scala/stamina/stamina.scala index b14cfaa..ae97bbd 100644 --- a/stamina-core/src/main/scala/stamina/stamina.scala +++ b/stamina-core/src/main/scala/stamina/stamina.scala @@ -38,10 +38,11 @@ package stamina { extends RuntimeException(s"Error while trying to unpersist data with key '${persisted.key}' and version ${persisted.version}. Cause: ${error}") with NoStackTrace - // TODO this probably needs to change and move, just a PoC + case class Manifest(manifest: String) { + lazy val key: String = manifest.substring(manifest.indexOf('-') + 1) + lazy val version: Int = Integer.valueOf(manifest.substring(0, manifest.indexOf('-'))) + } object Manifest { - def encode(key: String, version: Int): String = version + "-" + key - def key(manifest: String): String = manifest.substring(manifest.indexOf('-') + 1) - def version(manifest: String): Int = Integer.valueOf(manifest.substring(0, manifest.indexOf('-'))) + def apply(key: String, version: Int): Manifest = Manifest(version + "-" + key) } } diff --git a/stamina-core/src/test/scala/stamina/PersistersSpec.scala b/stamina-core/src/test/scala/stamina/PersistersSpec.scala index 7e51577..7dad3d4 100644 --- a/stamina-core/src/test/scala/stamina/PersistersSpec.scala +++ b/stamina-core/src/test/scala/stamina/PersistersSpec.scala @@ -26,11 +26,11 @@ class PersistersSpec extends StaminaSpec { canUnpersist(cartPersister.currentManifest) should be(true) canUnpersist(cartCreatedPersister.currentManifest) should be(false) - canUnpersist(Manifest.encode("unknown", 1)) should be(false) - canUnpersist(Manifest.encode("item", 2)) should be(false) + canUnpersist(Manifest("unknown", 1)) should be(false) + canUnpersist(Manifest("item", 2)) should be(false) // works because canUnpersist only looks at the key and the version, not at the raw data - canUnpersist(Manifest.encode("item", 1)) should be(true) + canUnpersist(Manifest("item", 1)) should be(true) } "correctly implement persist() and unpersist()" in { diff --git a/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala b/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala index 1729681..65a96e1 100644 --- a/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala +++ b/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala @@ -3,7 +3,6 @@ package stamina class StaminaAkkaSerializerSpec extends StaminaSpec { import TestDomain._ import TestOnlyPersister._ - import DefaultPersistedCodec._ val itemPersister = persister[Item]("item") val cartPersister = persister[Cart]("cart") @@ -26,22 +25,22 @@ class StaminaAkkaSerializerSpec extends StaminaSpec { } "throw an UnregisteredTypeException when serializing an unregistered type" in { - a[UnregisteredTypeException] should be thrownBy toBinary("a raw String is not supported", Manifest.encode("foo", 32)) + a[UnregisteredTypeException] should be thrownBy toBinary("a raw String is not supported", Manifest("foo", 32)) } "throw an UnsupportedDataException when deserializing data with an unknown key" in { an[UnsupportedDataException] should - be thrownBy fromBinary(writePersisted(Persisted("unknown", 1, ByteString("..."))), Manifest.encode("unknown", 1)) + be thrownBy fromBinary(ByteString("...").toArray, Manifest("unknown", 1).manifest) } "throw an UnsupportedDataException when deserializing data with an unsupported version" in { an[UnsupportedDataException] should - be thrownBy fromBinary(writePersisted(Persisted("item", 2, ByteString("..."))), Manifest.encode("item", 2)) + be thrownBy fromBinary(ByteString("...").toArray, Manifest("item", 2).manifest) } "throw an UnrecoverableDataException when an exception occurs while deserializing" in { an[UnrecoverableDataException] should - be thrownBy fromBinary(writePersisted(Persisted("item", 1, ByteString("not an item"))), Manifest.encode("item", 1)) + be thrownBy fromBinary(ByteString("not an item").toArray, Manifest("item", 1).manifest) } } } diff --git a/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala b/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala index 0523eb6..0986eb3 100644 --- a/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala +++ b/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala @@ -13,7 +13,7 @@ object TestOnlyPersister { private class JavaPersister[T <: AnyRef: ClassTag](key: String) extends Persister[T, V1](key) { def persist(t: T): Array[Byte] = toBinary(t) - def unpersist(manifest: String, p: Array[Byte]): T = { + def unpersist(manifest: Manifest, p: Array[Byte]): T = { if (canUnpersist(manifest)) fromBinary(p).asInstanceOf[T] else throw new IllegalArgumentException("") } diff --git a/stamina-json/src/main/scala/stamina/json/json.scala b/stamina-json/src/main/scala/stamina/json/json.scala index e01bad4..b99c98a 100644 --- a/stamina-json/src/main/scala/stamina/json/json.scala +++ b/stamina-json/src/main/scala/stamina/json/json.scala @@ -67,24 +67,24 @@ package json { * Simple abstract marker superclass to unify (and hide) the two internal Persister implementations. */ sealed abstract class JsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo](key: String) extends Persister[T, V](key) { - private[json] def cannotUnpersist(manifest: String) = + private[json] def cannotUnpersist(manifest: Manifest) = s"""JsonPersister[${implicitly[ClassTag[T]].runtimeClass.getSimpleName}, V${currentVersion}](key = "${key}") cannot unpersist data with manifest "$manifest".""" } private[json] class V1JsonPersister[T: RootJsonFormat: ClassTag](key: String) extends JsonPersister[T, V1](key) { def persist(t: T): Array[Byte] = toJsonBytes(t) - def unpersist(manifest: String, p: Array[Byte]): T = { + def unpersist(manifest: Manifest, p: Array[Byte]): T = { if (canUnpersist(manifest)) fromJsonBytes[T](p) else throw new IllegalArgumentException(cannotUnpersist(manifest)) } } private[json] class VnJsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo: MigratableVersion](key: String, migrator: JsonMigrator[V]) extends JsonPersister[T, V](key) { - override def canUnpersist(m: String): Boolean = Manifest.key(m) == key && migrator.canMigrate(Manifest.version(m)) + override def canUnpersist(m: Manifest): Boolean = m.key == key && migrator.canMigrate(m.version) def persist(t: T): Array[Byte] = toJsonBytes(t) - def unpersist(manifest: String, p: Array[Byte]): T = { - if (canUnpersist(manifest)) migrator.migrate(parseJson(p), Manifest.version(manifest)).convertTo[T] + def unpersist(manifest: Manifest, p: Array[Byte]): T = { + if (canUnpersist(manifest)) migrator.migrate(parseJson(p), manifest.version).convertTo[T] else throw new IllegalArgumentException(cannotUnpersist(manifest)) } } diff --git a/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala b/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala index a699a44..bfc7866 100644 --- a/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala +++ b/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala @@ -13,7 +13,7 @@ class ScalatestTestGenerationSpec extends StaminaTestKitSpec { case class ItemPersister(override val key: String) extends Persister[Item, V1](key) { def persist(t: Item): Array[Byte] = Array[Byte]() - def unpersist(manifest: String, p: Array[Byte]): Item = item1 + def unpersist(manifest: Manifest, p: Array[Byte]): Item = item1 } "A spec generated by StaminaTestKit" should { From c340b1a2be5d3435bfd62a9f052136c952ced28b Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Thu, 15 Oct 2015 23:40:52 +0200 Subject: [PATCH 05/19] Made method naming a bit more consistent (hopefully) --- .../stamina/CodecBasedStaminaAkkaSerializer.scala | 2 +- .../src/main/scala/stamina/Persisters.scala | 14 +++++++------- .../main/scala/stamina/StaminaAkkaSerializer.scala | 2 +- .../src/test/scala/stamina/PersistersSpec.scala | 4 ++-- .../scala/stamina/testkit/StaminaTestKit.scala | 10 ++++------ 5 files changed, 15 insertions(+), 17 deletions(-) diff --git a/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala index b2fb5ec..af31221 100644 --- a/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala +++ b/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala @@ -21,7 +21,7 @@ abstract class CodecBasedStaminaAkkaSerializer private[stamina] (persisters: Per def toBinary(obj: AnyRef): Array[Byte] = { if (!persisters.canPersist(obj)) throw UnregisteredTypeException(obj) - codec.writePersisted(persisters.persistAndWrap(obj)) + codec.writePersisted(persisters.persist(obj)) } /** diff --git a/stamina-core/src/main/scala/stamina/Persisters.scala b/stamina-core/src/main/scala/stamina/Persisters.scala index 9a68fca..d836cec 100644 --- a/stamina-core/src/main/scala/stamina/Persisters.scala +++ b/stamina-core/src/main/scala/stamina/Persisters.scala @@ -20,24 +20,24 @@ case class Persisters(persisters: List[Persister[_, _]]) { .getOrElse(throw UnregisteredTypeException(anyref)) } - def persist(anyref: AnyRef): Array[Byte] = { + def bytes(anyref: AnyRef): Array[Byte] = { persisters.find(_.canPersist(anyref)) .map(_.persistAny(anyref)) .getOrElse(throw UnregisteredTypeException(anyref)) } + def persist(anyref: AnyRef): Persisted = { + persisters.find(_.canPersist(anyref)) + .map(p => Persisted(p.key, p.currentVersion, p.persistAny(anyref))) + .getOrElse(throw UnregisteredTypeException(anyref)) + } + def unpersist(manifest: Manifest, persisted: Array[Byte]): AnyRef = { persisters.find(_.canUnpersist(manifest)) .map(_.unpersistAny(manifest, persisted)) .getOrElse(throw UnsupportedDataException(manifest.key, manifest.version)) } - def persistAndWrap(anyref: AnyRef): Persisted = { - persisters.find(_.canPersist(anyref)) - .map(p => Persisted(p.key, p.currentVersion, p.persistAny(anyref))) - .getOrElse(throw UnregisteredTypeException(anyref)) - } - def unpersist(persisted: Persisted): AnyRef = { val manifest = Manifest(persisted.key, persisted.version) persisters.find(_.canUnpersist(manifest)) diff --git a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala index 0d75186..4d8da96 100644 --- a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala +++ b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala @@ -21,7 +21,7 @@ abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters) e def toBinary(obj: AnyRef): Array[Byte] = { if (!persisters.canPersist(obj)) throw UnregisteredTypeException(obj) - persisters.persist(obj) + persisters.bytes(obj) } /** diff --git a/stamina-core/src/test/scala/stamina/PersistersSpec.scala b/stamina-core/src/test/scala/stamina/PersistersSpec.scala index 7dad3d4..df15c5f 100644 --- a/stamina-core/src/test/scala/stamina/PersistersSpec.scala +++ b/stamina-core/src/test/scala/stamina/PersistersSpec.scala @@ -34,8 +34,8 @@ class PersistersSpec extends StaminaSpec { } "correctly implement persist() and unpersist()" in { - unpersist(persistAndWrap(item1)) should equal(item1) - unpersist(persistAndWrap(cart)) should equal(cart) + unpersist(persist(item1)) should equal(item1) + unpersist(persist(cart)) should equal(cart) } "throw an UnregisteredTypeException when persisting an unregistered type" in { diff --git a/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala b/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala index d5a1f38..33c46be 100644 --- a/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala +++ b/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala @@ -26,7 +26,7 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒ private def generateRoundtripTestFor(sample: PersistableSample) = { s"persist and unpersist $sample" in { - persisters.unpersist(persisters.persistAndWrap(sample.persistable)) should equal(sample.persistable) + persisters.unpersist(persisters.persist(sample.persistable)) should equal(sample.persistable) } } @@ -42,10 +42,10 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒ def latestVersion(persistable: AnyRef) = Try(persisters.persisters.filter(_.canPersist(persistable)).map(_.currentVersion).max).toOption private def verifyByteStringDeserialization(sample: PersistableSample, version: Int, latestVersion: Int): Unit = { - val serialized = persisters.persistAndWrap(sample.persistable) + val serialized = persisters.persist(sample.persistable) byteStringFromResource(serialized.key, version, sample.sampleId) match { case Success(binary) ⇒ - persisters.unpersist(binary) should equal(sample.persistable) + persisters.unpersist(Manifest(serialized.key, version), binary) should equal(sample.persistable) case Failure(_: java.io.FileNotFoundException) if version == latestVersion ⇒ val writtenToPath = saveByteArrayToTargetSerializationDirectory(serialized.bytes.toArray, serialized.key, version, sample.sampleId) fail(s"You appear to have added a new serialization sample to the stamina persisters' test.\n" + @@ -67,7 +67,7 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒ } } - private def byteStringFromResource(key: String, version: Int, sampleId: String): Try[Persisted] = { + private def byteStringFromResource(key: String, version: Int, sampleId: String): Try[Array[Byte]] = { import scala.io.Source val resourceName = s"/$serializedObjectsPackage/${filename(key, version, sampleId)}" @@ -75,8 +75,6 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒ .map(Success(_)).getOrElse(Failure(new java.io.FileNotFoundException(resourceName))) .map(Source.fromInputStream(_).mkString) .flatMap(base64.Decode(_)) - .map(akka.util.ByteString(_)) - .map(Persisted(key, version, _)) } private def saveByteArrayToTargetSerializationDirectory(bytes: Array[Byte], key: String, version: Int, sampleId: String) = { From 111ba6c63649c16500834e4009d81af0da4c202e Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Mon, 19 Oct 2015 20:52:06 +0200 Subject: [PATCH 06/19] Clarify scaladoc --- .../scala/stamina/CodecBasedStaminaAkkaSerializer.scala | 7 ++++++- .../src/main/scala/stamina/StaminaAkkaSerializer.scala | 2 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala index af31221..ac7dfc7 100644 --- a/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala +++ b/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala @@ -3,7 +3,12 @@ package stamina import akka.serialization._ /** - * A custom Akka Serializer specifically designed for use with Akka Persistence. + * A custom Akka Serializer encoding key and version along with the serialized object. + * + * This is particularly useful when there is no separate field for metadata, such as when + * dealing with pre-akka-2.3 persistence. + * + * Wrapping/unwrapping the metadata around the serialized object is done by the Codec. */ abstract class CodecBasedStaminaAkkaSerializer private[stamina] (persisters: Persisters, codec: PersistedCodec) extends Serializer { def this(persisters: List[Persister[_, _]], codec: PersistedCodec = DefaultPersistedCodec) = this(Persisters(persisters), codec) diff --git a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala index 4d8da96..1d43d34 100644 --- a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala +++ b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala @@ -4,6 +4,8 @@ import akka.serialization._ /** * A custom Akka Serializer specifically designed for use with Akka Persistence. + * + * Key and version information is encoded in the manifest. */ abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters) extends SerializerWithStringManifest { def this(persisters: List[Persister[_, _]]) = this(Persisters(persisters)) From 1855834a18f795aa377b7f5b03c411f555163e4f Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Mon, 19 Oct 2015 20:57:18 +0200 Subject: [PATCH 07/19] Remove intermediate Persisted when throwing UnrecoverableDataException --- stamina-core/src/main/scala/stamina/Persister.scala | 5 +---- stamina-core/src/main/scala/stamina/stamina.scala | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/stamina-core/src/main/scala/stamina/Persister.scala b/stamina-core/src/main/scala/stamina/Persister.scala index 5e6bc4a..170be15 100644 --- a/stamina-core/src/main/scala/stamina/Persister.scala +++ b/stamina-core/src/main/scala/stamina/Persister.scala @@ -34,10 +34,7 @@ abstract class Persister[T: ClassTag, V <: Version: VersionInfo](val key: String private[stamina] def unpersistAny(manifest: Manifest, persistedBytes: Array[Byte]): AnyRef = { Try(unpersist(manifest, persistedBytes).asInstanceOf[AnyRef]) match { case Success(anyref) ⇒ anyref - case Failure(error) ⇒ - // TODO simplify - val persisted = Persisted(key, manifest.version, ByteString(persistedBytes)) - throw UnrecoverableDataException(persisted, error) + case Failure(error) ⇒ throw UnrecoverableDataException(manifest, error) } } } diff --git a/stamina-core/src/main/scala/stamina/stamina.scala b/stamina-core/src/main/scala/stamina/stamina.scala index ae97bbd..ba02abc 100644 --- a/stamina-core/src/main/scala/stamina/stamina.scala +++ b/stamina-core/src/main/scala/stamina/stamina.scala @@ -34,8 +34,8 @@ package stamina { extends RuntimeException(s"No unpersister registered for key: '$key' and version: $version") with NoStackTrace - case class UnrecoverableDataException(persisted: Persisted, error: Throwable) - extends RuntimeException(s"Error while trying to unpersist data with key '${persisted.key}' and version ${persisted.version}. Cause: ${error}") + case class UnrecoverableDataException(manifest: Manifest, error: Throwable) + extends RuntimeException(s"Error while trying to unpersist data with key '${manifest.key}' and version ${manifest.version}. Cause: ${error}") with NoStackTrace case class Manifest(manifest: String) { From de541448bfb5777a3de597eb02d820f383620ff3 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Sat, 24 Oct 2015 22:43:44 +0200 Subject: [PATCH 08/19] Make Persisters API a bit more balanced --- .../CodecBasedStaminaAkkaSerializer.scala | 5 ++-- .../src/main/scala/stamina/Persisted.scala | 7 +++-- .../main/scala/stamina/PersistedCodec.scala | 2 +- .../src/main/scala/stamina/Persisters.scala | 28 ++++++------------- .../scala/stamina/StaminaAkkaSerializer.scala | 4 +-- 5 files changed, 19 insertions(+), 27 deletions(-) diff --git a/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala index ac7dfc7..a70ec37 100644 --- a/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala +++ b/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala @@ -35,10 +35,9 @@ abstract class CodecBasedStaminaAkkaSerializer private[stamina] (persisters: Per */ def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { val persisted = codec.readPersisted(bytes) - val manifest = Manifest(persisted.key, persisted.version) - if (!persisters.canUnpersist(manifest)) throw UnsupportedDataException(persisted.key, persisted.version) + if (!persisters.canUnpersist(persisted.manifest)) throw UnsupportedDataException(persisted.key, persisted.version) - persisters.unpersist(manifest, persisted.bytes.toArray) + persisters.unpersist(persisted) } } diff --git a/stamina-core/src/main/scala/stamina/Persisted.scala b/stamina-core/src/main/scala/stamina/Persisted.scala index 5eac7a7..05a6593 100644 --- a/stamina-core/src/main/scala/stamina/Persisted.scala +++ b/stamina-core/src/main/scala/stamina/Persisted.scala @@ -4,8 +4,11 @@ package stamina * A simple container holding a persistence key, a version number, * and the associated serialized bytes. */ -case class Persisted(key: String, version: Int, bytes: ByteString) +case class Persisted(key: String, version: Int, bytes: Array[Byte]) { + lazy val manifest = Manifest(key, version) +} object Persisted { - def apply(key: String, version: Int, bytes: Array[Byte]): Persisted = apply(key, version, ByteString(bytes)) + def apply(manifest: Manifest, bytes: Array[Byte]): Persisted = apply(manifest.key, manifest.version, bytes) + def apply(key: String, version: Int, bytes: ByteString): Persisted = apply(key, version, bytes.toArray) } diff --git a/stamina-core/src/main/scala/stamina/PersistedCodec.scala b/stamina-core/src/main/scala/stamina/PersistedCodec.scala index 7ec3ffe..958bde2 100644 --- a/stamina-core/src/main/scala/stamina/PersistedCodec.scala +++ b/stamina-core/src/main/scala/stamina/PersistedCodec.scala @@ -33,7 +33,7 @@ object DefaultPersistedCodec extends PersistedCodec { putInt(keyBytes.length). putBytes(keyBytes). putInt(persisted.version). - append(persisted.bytes). + append(ByteString(persisted.bytes)). result. toArray } diff --git a/stamina-core/src/main/scala/stamina/Persisters.scala b/stamina-core/src/main/scala/stamina/Persisters.scala index d836cec..3ace540 100644 --- a/stamina-core/src/main/scala/stamina/Persisters.scala +++ b/stamina-core/src/main/scala/stamina/Persisters.scala @@ -14,28 +14,18 @@ case class Persisters(persisters: List[Persister[_, _]]) { def canUnpersist(manifest: Manifest): Boolean = persisters.exists(_.canUnpersist(manifest)) // format: OFF - def manifest(anyref: AnyRef): Manifest = { - persisters.find(_.canPersist(anyref)) - .map(_.currentManifest) - .getOrElse(throw UnregisteredTypeException(anyref)) - } + private def persister[T <: AnyRef](anyref: T): Persister[T, _] = + persisters + .find(_.canPersist(anyref)) + .map(_.asInstanceOf[Persister[T, _]]) + .getOrElse(throw UnregisteredTypeException(anyref)) - def bytes(anyref: AnyRef): Array[Byte] = { - persisters.find(_.canPersist(anyref)) - .map(_.persistAny(anyref)) - .getOrElse(throw UnregisteredTypeException(anyref)) - } + def manifest(anyref: AnyRef): Manifest = + persister(anyref).currentManifest def persist(anyref: AnyRef): Persisted = { - persisters.find(_.canPersist(anyref)) - .map(p => Persisted(p.key, p.currentVersion, p.persistAny(anyref))) - .getOrElse(throw UnregisteredTypeException(anyref)) - } - - def unpersist(manifest: Manifest, persisted: Array[Byte]): AnyRef = { - persisters.find(_.canUnpersist(manifest)) - .map(_.unpersistAny(manifest, persisted)) - .getOrElse(throw UnsupportedDataException(manifest.key, manifest.version)) + val p = persister(anyref) + Persisted(p.currentManifest, p.persistAny(anyref)) } def unpersist(persisted: Persisted): AnyRef = { diff --git a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala index 1d43d34..6d9abf3 100644 --- a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala +++ b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala @@ -23,7 +23,7 @@ abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters) e def toBinary(obj: AnyRef): Array[Byte] = { if (!persisters.canPersist(obj)) throw UnregisteredTypeException(obj) - persisters.bytes(obj) + persisters.persist(obj).bytes } /** @@ -35,6 +35,6 @@ abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters) e val m = Manifest(manifest) if (!persisters.canUnpersist(m)) throw UnsupportedDataException(m.key, m.version) - persisters.unpersist(m, bytes) + persisters.unpersist(Persisted(m, bytes)) } } From 6bf76382495c31ee9327060e1769d55a8efc2817 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Sun, 25 Oct 2015 10:19:31 +0100 Subject: [PATCH 09/19] Fix compilation error --- .../src/main/scala/stamina/testkit/StaminaTestKit.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala b/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala index 33c46be..03ad14f 100644 --- a/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala +++ b/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala @@ -45,7 +45,7 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒ val serialized = persisters.persist(sample.persistable) byteStringFromResource(serialized.key, version, sample.sampleId) match { case Success(binary) ⇒ - persisters.unpersist(Manifest(serialized.key, version), binary) should equal(sample.persistable) + persisters.unpersist(Persisted(Manifest(serialized.key, version), binary)) should equal(sample.persistable) case Failure(_: java.io.FileNotFoundException) if version == latestVersion ⇒ val writtenToPath = saveByteArrayToTargetSerializationDirectory(serialized.bytes.toArray, serialized.key, version, sample.sampleId) fail(s"You appear to have added a new serialization sample to the stamina persisters' test.\n" + From 914992ff2085f4ae9eadc97a76848807b5704907 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Sun, 25 Oct 2015 12:18:47 +0100 Subject: [PATCH 10/19] Move codec-related code to its own package As it's not needed for the common use case going forward --- stamina-core/src/main/scala/stamina/Persisters.scala | 6 +++--- .../{ => codec}/CodecBasedStaminaAkkaSerializer.scala | 1 + .../main/scala/stamina/{ => codec}/PersistedCodec.scala | 1 + stamina-core/src/test/scala/stamina/PersistersSpec.scala | 6 +++--- .../test/scala/stamina/StaminaAkkaSerializerSpec.scala | 8 ++++---- 5 files changed, 12 insertions(+), 10 deletions(-) rename stamina-core/src/main/scala/stamina/{ => codec}/CodecBasedStaminaAkkaSerializer.scala (99%) rename stamina-core/src/main/scala/stamina/{ => codec}/PersistedCodec.scala (98%) diff --git a/stamina-core/src/main/scala/stamina/Persisters.scala b/stamina-core/src/main/scala/stamina/Persisters.scala index 3ace540..af75ab4 100644 --- a/stamina-core/src/main/scala/stamina/Persisters.scala +++ b/stamina-core/src/main/scala/stamina/Persisters.scala @@ -28,10 +28,10 @@ case class Persisters(persisters: List[Persister[_, _]]) { Persisted(p.currentManifest, p.persistAny(anyref)) } - def unpersist(persisted: Persisted): AnyRef = { - val manifest = Manifest(persisted.key, persisted.version) + def unpersist(persisted: Persisted): AnyRef = unpersist(persisted.bytes, persisted.manifest) + def unpersist(payload: Array[Byte], manifest: Manifest): AnyRef = { persisters.find(_.canUnpersist(manifest)) - .map(_.unpersistAny(manifest, persisted.bytes.toArray)) + .map(_.unpersistAny(manifest, payload)) .getOrElse(throw UnsupportedDataException(manifest.key, manifest.version)) } // format: ON diff --git a/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/codec/CodecBasedStaminaAkkaSerializer.scala similarity index 99% rename from stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala rename to stamina-core/src/main/scala/stamina/codec/CodecBasedStaminaAkkaSerializer.scala index a70ec37..870875d 100644 --- a/stamina-core/src/main/scala/stamina/CodecBasedStaminaAkkaSerializer.scala +++ b/stamina-core/src/main/scala/stamina/codec/CodecBasedStaminaAkkaSerializer.scala @@ -1,4 +1,5 @@ package stamina +package codec import akka.serialization._ diff --git a/stamina-core/src/main/scala/stamina/PersistedCodec.scala b/stamina-core/src/main/scala/stamina/codec/PersistedCodec.scala similarity index 98% rename from stamina-core/src/main/scala/stamina/PersistedCodec.scala rename to stamina-core/src/main/scala/stamina/codec/PersistedCodec.scala index 958bde2..b48cac5 100644 --- a/stamina-core/src/main/scala/stamina/PersistedCodec.scala +++ b/stamina-core/src/main/scala/stamina/codec/PersistedCodec.scala @@ -1,4 +1,5 @@ package stamina +package codec /** * The encoding used to translate an instance of Persisted diff --git a/stamina-core/src/test/scala/stamina/PersistersSpec.scala b/stamina-core/src/test/scala/stamina/PersistersSpec.scala index df15c5f..72ccb26 100644 --- a/stamina-core/src/test/scala/stamina/PersistersSpec.scala +++ b/stamina-core/src/test/scala/stamina/PersistersSpec.scala @@ -44,17 +44,17 @@ class PersistersSpec extends StaminaSpec { "throw an UnsupportedDataException when unpersisting data with an unknown key" in { an[UnsupportedDataException] should - be thrownBy unpersist(Persisted("unknown", 1, ByteString("..."))) + be thrownBy unpersist(Array[Byte](), Manifest("unknown", 1)) } "throw an UnsupportedDataException when deserializing data with an unsupported version" in { an[UnsupportedDataException] should - be thrownBy unpersist(Persisted("item", 2, ByteString("..."))) + be thrownBy unpersist(Array[Byte](), Manifest("item", 2)) } "throw an UnrecoverableDataException when an exception occurs while deserializing" in { an[UnrecoverableDataException] should - be thrownBy unpersist(Persisted("item", 1, ByteString("not an item"))) + be thrownBy unpersist(ByteString("not an item").toArray, itemPersister.currentManifest) } } } diff --git a/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala b/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala index 65a96e1..5c1ea66 100644 --- a/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala +++ b/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala @@ -25,22 +25,22 @@ class StaminaAkkaSerializerSpec extends StaminaSpec { } "throw an UnregisteredTypeException when serializing an unregistered type" in { - a[UnregisteredTypeException] should be thrownBy toBinary("a raw String is not supported", Manifest("foo", 32)) + a[UnregisteredTypeException] should be thrownBy toBinary(ByteString("a raw String is not supported").toArray) } "throw an UnsupportedDataException when deserializing data with an unknown key" in { an[UnsupportedDataException] should - be thrownBy fromBinary(ByteString("...").toArray, Manifest("unknown", 1).manifest) + be thrownBy fromBinary(Array[Byte](), Manifest("unknown", 1).manifest) } "throw an UnsupportedDataException when deserializing data with an unsupported version" in { an[UnsupportedDataException] should - be thrownBy fromBinary(ByteString("...").toArray, Manifest("item", 2).manifest) + be thrownBy fromBinary(Array[Byte](), Manifest("item", 2).manifest) } "throw an UnrecoverableDataException when an exception occurs while deserializing" in { an[UnrecoverableDataException] should - be thrownBy fromBinary(ByteString("not an item").toArray, Manifest("item", 1).manifest) + be thrownBy fromBinary(ByteString("not an item").toArray, itemPersister.currentManifest.manifest) } } } From 0290363d0a50d509f3908ebd133992d4b6d8707e Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Fri, 30 Oct 2015 10:44:22 +0100 Subject: [PATCH 11/19] A naive eventadapter to use stamina through EA instead of Serialization --- project/Build.scala | 3 ++- project/Dependencies.scala | 11 +++++---- .../eventadapters/StaminaEventAdapter.scala | 24 +++++++++++++++++++ 3 files changed, 32 insertions(+), 6 deletions(-) create mode 100644 stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala diff --git a/project/Build.scala b/project/Build.scala index b8a447b..f26fd08 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -40,7 +40,8 @@ object Build extends Build { .settings(libSettings: _*) .settings(libraryDependencies ++= compile( - akkaActor + akkaActor, + akkaPersistence ) ++ test( scalatest diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ca35740..8bbf959 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -2,11 +2,12 @@ import sbt._ import sbt.Keys._ object Dependencies { - val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.4.0" - val sprayJson = "io.spray" %% "spray-json" % "1.3.2" - val jsonLenses = "net.virtual-void" %% "json-lenses" % "0.6.1" - val scalatest = "org.scalatest" %% "scalatest" % "2.2.5" - val base64 = "me.lessis" %% "base64" % "0.2.0" + val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.4.0" + val akkaPersistence = "com.typesafe.akka" %% "akka-persistence" % "2.4.0" + val sprayJson = "io.spray" %% "spray-json" % "1.3.2" + val jsonLenses = "net.virtual-void" %% "json-lenses" % "0.6.1" + val scalatest = "org.scalatest" %% "scalatest" % "2.2.5" + val base64 = "me.lessis" %% "base64" % "0.2.0" // Only used by the tests val sprayJsonShapeless = "com.github.fommil" %% "spray-json-shapeless" % "1.1.0" diff --git a/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala b/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala new file mode 100644 index 0000000..615ea38 --- /dev/null +++ b/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala @@ -0,0 +1,24 @@ +package stamina +package eventadapters + +import akka.persistence.journal._ + +/** + * EventAdapter that uses Stamina to convert events to and from Array[Byte]. + * + * When used like this, there is little value in using an EventAdapter instead of Serializer. + * We plan to generalize this to allow persisters for other things like Array[Byte], though. + */ +class StaminaEventAdapter(persisters: Persisters) extends EventAdapter { + def this(persisters: List[Persister[_, _]]) = this(Persisters(persisters)) + def this(persister: Persister[_, _], persisters: Persister[_, _]*) = this(Persisters(persister :: persisters.toList)) + + def manifest(event: Any) = + persisters.manifest(event.asInstanceOf[AnyRef]).manifest + + def fromJournal(event: Any, manifest: String) = + EventSeq(persisters.unpersist(event.asInstanceOf[Array[Byte]], Manifest(manifest))) + + def toJournal(event: Any) = + persisters.persist(event.asInstanceOf[AnyRef]) +} From 508481afc2ee9094030d693ed60a2f9232d786db Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Fri, 30 Oct 2015 14:21:33 +0100 Subject: [PATCH 12/19] Make Persister and Persisters generic in the 'target' type --- .../src/main/scala/stamina/Persisted.scala | 6 +++--- .../src/main/scala/stamina/Persister.scala | 12 +++++------ .../src/main/scala/stamina/Persisters.scala | 20 +++++++++---------- .../scala/stamina/StaminaAkkaSerializer.scala | 8 ++++---- .../CodecBasedStaminaAkkaSerializer.scala | 6 +++--- .../scala/stamina/codec/PersistedCodec.scala | 14 ++++++------- .../eventadapters/StaminaEventAdapter.scala | 13 +++++------- .../scala/stamina/TestOnlyPersister.scala | 4 ++-- .../src/main/scala/stamina/json/json.scala | 2 +- .../stamina/testkit/StaminaTestKit.scala | 4 ++-- .../testkit/ScalatestTestGenerationSpec.scala | 2 +- 11 files changed, 44 insertions(+), 47 deletions(-) diff --git a/stamina-core/src/main/scala/stamina/Persisted.scala b/stamina-core/src/main/scala/stamina/Persisted.scala index 05a6593..f73b47a 100644 --- a/stamina-core/src/main/scala/stamina/Persisted.scala +++ b/stamina-core/src/main/scala/stamina/Persisted.scala @@ -4,11 +4,11 @@ package stamina * A simple container holding a persistence key, a version number, * and the associated serialized bytes. */ -case class Persisted(key: String, version: Int, bytes: Array[Byte]) { +case class Persisted[P <: AnyRef](key: String, version: Int, persisted: P) { lazy val manifest = Manifest(key, version) } object Persisted { - def apply(manifest: Manifest, bytes: Array[Byte]): Persisted = apply(manifest.key, manifest.version, bytes) - def apply(key: String, version: Int, bytes: ByteString): Persisted = apply(key, version, bytes.toArray) + def apply[P <: AnyRef](manifest: Manifest, persisted: P): Persisted[P] = apply(manifest.key, manifest.version, persisted) + def apply(key: String, version: Int, bytes: ByteString): Persisted[Array[Byte]] = apply[Array[Byte]](key, version, bytes.toArray) } diff --git a/stamina-core/src/main/scala/stamina/Persister.scala b/stamina-core/src/main/scala/stamina/Persister.scala index 170be15..4d3d9e9 100644 --- a/stamina-core/src/main/scala/stamina/Persister.scala +++ b/stamina-core/src/main/scala/stamina/Persister.scala @@ -8,12 +8,12 @@ import scala.util._ * at version V and unpersisting persisted instances of T for all versions up * to and including version V. */ -abstract class Persister[T: ClassTag, V <: Version: VersionInfo](val key: String) { +abstract class Persister[T: ClassTag, P <: AnyRef, V <: Version: VersionInfo](val key: String) { lazy val currentVersion = Version.numberFor[V] lazy val currentManifest = Manifest(key, currentVersion) - def persist(t: T): Array[Byte] - def unpersist(manifest: Manifest, persisted: Array[Byte]): T + def persist(t: T): P + def unpersist(manifest: Manifest, persisted: P): T def canPersist(a: AnyRef): Boolean = convertToT(a).isDefined def canUnpersist(m: Manifest): Boolean = m.key == key && m.version <= currentVersion @@ -23,7 +23,7 @@ abstract class Persister[T: ClassTag, V <: Version: VersionInfo](val key: String case _ ⇒ None } - private[stamina] def persistAny(any: AnyRef): Array[Byte] = { + private[stamina] def persistAny(any: AnyRef): P = { convertToT(any).map(persist(_)).getOrElse( throw new IllegalArgumentException( s"persistAny() was called on Persister[${implicitly[ClassTag[T]].runtimeClass}] with an instance of ${any.getClass}." @@ -31,8 +31,8 @@ abstract class Persister[T: ClassTag, V <: Version: VersionInfo](val key: String ) } - private[stamina] def unpersistAny(manifest: Manifest, persistedBytes: Array[Byte]): AnyRef = { - Try(unpersist(manifest, persistedBytes).asInstanceOf[AnyRef]) match { + private[stamina] def unpersistAny(manifest: Manifest, persisted: AnyRef): AnyRef = { + Try(unpersist(manifest, persisted.asInstanceOf[P]).asInstanceOf[AnyRef]) match { case Success(anyref) ⇒ anyref case Failure(error) ⇒ throw UnrecoverableDataException(manifest, error) } diff --git a/stamina-core/src/main/scala/stamina/Persisters.scala b/stamina-core/src/main/scala/stamina/Persisters.scala index af75ab4..66fd00c 100644 --- a/stamina-core/src/main/scala/stamina/Persisters.scala +++ b/stamina-core/src/main/scala/stamina/Persisters.scala @@ -9,37 +9,37 @@ import scala.reflect.ClassTag * one single entry-point for subclasses of StaminaAkkaSerializer * */ -case class Persisters(persisters: List[Persister[_, _]]) { +case class Persisters[P <: AnyRef](persisters: List[Persister[_, P, _]]) { def canPersist(a: AnyRef): Boolean = persisters.exists(_.canPersist(a)) def canUnpersist(manifest: Manifest): Boolean = persisters.exists(_.canUnpersist(manifest)) // format: OFF - private def persister[T <: AnyRef](anyref: T): Persister[T, _] = + private def persister[T <: AnyRef](anyref: T): Persister[T, P, _] = persisters .find(_.canPersist(anyref)) - .map(_.asInstanceOf[Persister[T, _]]) + .map(_.asInstanceOf[Persister[T, P, _]]) .getOrElse(throw UnregisteredTypeException(anyref)) def manifest(anyref: AnyRef): Manifest = persister(anyref).currentManifest - def persist(anyref: AnyRef): Persisted = { + def persist(anyref: AnyRef): Persisted[P] = { val p = persister(anyref) Persisted(p.currentManifest, p.persistAny(anyref)) } - def unpersist(persisted: Persisted): AnyRef = unpersist(persisted.bytes, persisted.manifest) - def unpersist(payload: Array[Byte], manifest: Manifest): AnyRef = { + def unpersist(persisted: Persisted[P]): AnyRef = unpersist(persisted.persisted, persisted.manifest) + def unpersist(persisted: AnyRef, manifest: Manifest): AnyRef = { persisters.find(_.canUnpersist(manifest)) - .map(_.unpersistAny(manifest, payload)) + .map(_.unpersistAny(manifest, persisted)) .getOrElse(throw UnsupportedDataException(manifest.key, manifest.version)) } // format: ON - def ++(other: Persisters): Persisters = Persisters(persisters ++ other.persisters) + def ++(other: Persisters[P]): Persisters[P] = Persisters(persisters ++ other.persisters) } object Persisters { - def apply[T: ClassTag, V <: Version: VersionInfo](persister: Persister[T, V]): Persisters = apply(List(persister)) - def apply(first: Persister[_, _], rest: Persister[_, _]*): Persisters = apply(first :: rest.toList) + def apply[T: ClassTag, P <: AnyRef, V <: Version: VersionInfo](persister: Persister[T, P, V]): Persisters[P] = apply(List(persister)) + def apply[P <: AnyRef](first: Persister[_, P, _], rest: Persister[_, P, _]*): Persisters[P] = apply(first :: rest.toList) } diff --git a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala index 6d9abf3..de762cf 100644 --- a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala +++ b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala @@ -7,9 +7,9 @@ import akka.serialization._ * * Key and version information is encoded in the manifest. */ -abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters) extends SerializerWithStringManifest { - def this(persisters: List[Persister[_, _]]) = this(Persisters(persisters)) - def this(persister: Persister[_, _], persisters: Persister[_, _]*) = this(Persisters(persister :: persisters.toList)) +abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters[Array[Byte]]) extends SerializerWithStringManifest { + def this(persisters: List[Persister[_, Array[Byte], _]]) = this(Persisters(persisters)) + def this(persister: Persister[_, Array[Byte], _], persisters: Persister[_, Array[Byte], _]*) = this(Persisters(persister :: persisters.toList)) /** Uniquely identifies this Serializer. */ val identifier = 490304 @@ -23,7 +23,7 @@ abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters) e def toBinary(obj: AnyRef): Array[Byte] = { if (!persisters.canPersist(obj)) throw UnregisteredTypeException(obj) - persisters.persist(obj).bytes + persisters.persist(obj).persisted } /** diff --git a/stamina-core/src/main/scala/stamina/codec/CodecBasedStaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/codec/CodecBasedStaminaAkkaSerializer.scala index 870875d..4603085 100644 --- a/stamina-core/src/main/scala/stamina/codec/CodecBasedStaminaAkkaSerializer.scala +++ b/stamina-core/src/main/scala/stamina/codec/CodecBasedStaminaAkkaSerializer.scala @@ -11,9 +11,9 @@ import akka.serialization._ * * Wrapping/unwrapping the metadata around the serialized object is done by the Codec. */ -abstract class CodecBasedStaminaAkkaSerializer private[stamina] (persisters: Persisters, codec: PersistedCodec) extends Serializer { - def this(persisters: List[Persister[_, _]], codec: PersistedCodec = DefaultPersistedCodec) = this(Persisters(persisters), codec) - def this(persister: Persister[_, _], persisters: Persister[_, _]*) = this(Persisters(persister :: persisters.toList), DefaultPersistedCodec) +abstract class CodecBasedStaminaAkkaSerializer private[stamina] (persisters: Persisters[Array[Byte]], codec: PersistedCodec[Array[Byte]]) extends Serializer { + def this(persisters: List[Persister[_, Array[Byte], _]], codec: PersistedCodec[Array[Byte]] = DefaultPersistedCodec) = this(Persisters(persisters), codec) + def this(persister: Persister[_, Array[Byte], _], persisters: Persister[_, Array[Byte], _]*) = this(Persisters(persister :: persisters.toList), DefaultPersistedCodec) /** We don't need class manifests since we're using keys to identify types. */ val includeManifest: Boolean = false diff --git a/stamina-core/src/main/scala/stamina/codec/PersistedCodec.scala b/stamina-core/src/main/scala/stamina/codec/PersistedCodec.scala index b48cac5..a7ef46e 100644 --- a/stamina-core/src/main/scala/stamina/codec/PersistedCodec.scala +++ b/stamina-core/src/main/scala/stamina/codec/PersistedCodec.scala @@ -5,10 +5,10 @@ package codec * The encoding used to translate an instance of Persisted * to a byte array and back. */ -trait PersistedCodec { +trait PersistedCodec[P <: AnyRef] { def identifier: Int - def writePersisted(persisted: Persisted): Array[Byte] - def readPersisted(bytes: Array[Byte]): Persisted + def writePersisted(persisted: Persisted[P]): Array[Byte] + def readPersisted(bytes: Array[Byte]): Persisted[P] } /** @@ -20,13 +20,13 @@ trait PersistedCodec { * - persisted data (n bytes) * */ -object DefaultPersistedCodec extends PersistedCodec { +object DefaultPersistedCodec extends PersistedCodec[Array[Byte]] { implicit val byteOrder = java.nio.ByteOrder.LITTLE_ENDIAN import java.nio.charset.StandardCharsets._ def identifier = 490303 - def writePersisted(persisted: Persisted): Array[Byte] = { + def writePersisted(persisted: Persisted[Array[Byte]]): Array[Byte] = { val keyBytes = persisted.key.getBytes(UTF_8) ByteString. @@ -34,12 +34,12 @@ object DefaultPersistedCodec extends PersistedCodec { putInt(keyBytes.length). putBytes(keyBytes). putInt(persisted.version). - append(ByteString(persisted.bytes)). + append(ByteString(persisted.persisted)). result. toArray } - def readPersisted(byteArray: Array[Byte]): Persisted = { + def readPersisted(byteArray: Array[Byte]): Persisted[Array[Byte]] = { val bytes = ByteString(byteArray) val keyLength = bytes.take(4).iterator.getInt val (keyBytes, rest) = bytes.drop(4).splitAt(keyLength) diff --git a/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala b/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala index 615ea38..0326e52 100644 --- a/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala +++ b/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala @@ -4,20 +4,17 @@ package eventadapters import akka.persistence.journal._ /** - * EventAdapter that uses Stamina to convert events to and from Array[Byte]. - * - * When used like this, there is little value in using an EventAdapter instead of Serializer. - * We plan to generalize this to allow persisters for other things like Array[Byte], though. + * EventAdapter that uses Stamina to convert events. */ -class StaminaEventAdapter(persisters: Persisters) extends EventAdapter { - def this(persisters: List[Persister[_, _]]) = this(Persisters(persisters)) - def this(persister: Persister[_, _], persisters: Persister[_, _]*) = this(Persisters(persister :: persisters.toList)) +class StaminaEventAdapter[P <: AnyRef](persisters: Persisters[P]) extends EventAdapter { + def this(persisters: List[Persister[_, P, _]]) = this(Persisters(persisters)) + def this(persister: Persister[_, P, _], persisters: Persister[_, P, _]*) = this(Persisters(persister :: persisters.toList)) def manifest(event: Any) = persisters.manifest(event.asInstanceOf[AnyRef]).manifest def fromJournal(event: Any, manifest: String) = - EventSeq(persisters.unpersist(event.asInstanceOf[Array[Byte]], Manifest(manifest))) + EventSeq(persisters.unpersist(event.asInstanceOf[AnyRef], Manifest(manifest))) def toJournal(event: Any) = persisters.persist(event.asInstanceOf[AnyRef]) diff --git a/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala b/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala index 0986eb3..ff6d204 100644 --- a/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala +++ b/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala @@ -9,9 +9,9 @@ object TestOnlyPersister { private val javaSerializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem]) import javaSerializer._ - def persister[T <: AnyRef: ClassTag](key: String): Persister[T, V1] = new JavaPersister[T](key) + def persister[T <: AnyRef: ClassTag](key: String): Persister[T, Array[Byte], V1] = new JavaPersister[T](key) - private class JavaPersister[T <: AnyRef: ClassTag](key: String) extends Persister[T, V1](key) { + private class JavaPersister[T <: AnyRef: ClassTag](key: String) extends Persister[T, Array[Byte], V1](key) { def persist(t: T): Array[Byte] = toBinary(t) def unpersist(manifest: Manifest, p: Array[Byte]): T = { if (canUnpersist(manifest)) fromBinary(p).asInstanceOf[T] diff --git a/stamina-json/src/main/scala/stamina/json/json.scala b/stamina-json/src/main/scala/stamina/json/json.scala index b99c98a..ed5ce0b 100644 --- a/stamina-json/src/main/scala/stamina/json/json.scala +++ b/stamina-json/src/main/scala/stamina/json/json.scala @@ -66,7 +66,7 @@ package json { /** * Simple abstract marker superclass to unify (and hide) the two internal Persister implementations. */ - sealed abstract class JsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo](key: String) extends Persister[T, V](key) { + sealed abstract class JsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo](key: String) extends Persister[T, Array[Byte], V](key) { private[json] def cannotUnpersist(manifest: Manifest) = s"""JsonPersister[${implicitly[ClassTag[T]].runtimeClass.getSimpleName}, V${currentVersion}](key = "${key}") cannot unpersist data with manifest "$manifest".""" } diff --git a/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala b/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala index 03ad14f..5e48e57 100644 --- a/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala +++ b/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala @@ -15,7 +15,7 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒ def sample(sampleId: String, persistable: AnyRef) = new PersistableSample(sampleId, persistable, Some(sampleId)) def sample(sampleId: String, persistable: AnyRef, description: String) = new PersistableSample(sampleId, persistable, Some(description)) - implicit class TestablePersisters(persisters: Persisters) extends org.scalatest.Matchers { + implicit class TestablePersisters(persisters: Persisters[Array[Byte]]) extends org.scalatest.Matchers { def generateTestsFor(samples: PersistableSample*): Unit = { samples.foreach { sample ⇒ generateRoundtripTestFor(sample) @@ -47,7 +47,7 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒ case Success(binary) ⇒ persisters.unpersist(Persisted(Manifest(serialized.key, version), binary)) should equal(sample.persistable) case Failure(_: java.io.FileNotFoundException) if version == latestVersion ⇒ - val writtenToPath = saveByteArrayToTargetSerializationDirectory(serialized.bytes.toArray, serialized.key, version, sample.sampleId) + val writtenToPath = saveByteArrayToTargetSerializationDirectory(serialized.persisted.toArray, serialized.key, version, sample.sampleId) fail(s"You appear to have added a new serialization sample to the stamina persisters' test.\n" + "A serialized version of this sample must be stored as a project resource for future reference, to ensure future versions of the software can still correctly deserialize serialized objects in this format.\n" + "Please copy the generated serialized data into the project test resources:\n" + diff --git a/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala b/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala index 346330e..8904472 100644 --- a/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala +++ b/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala @@ -11,7 +11,7 @@ class ScalatestTestGenerationSpec extends StaminaTestKitSpec { import TestDomain._ - case class ItemPersister(override val key: String) extends Persister[Item, V1](key) { + case class ItemPersister(override val key: String) extends Persister[Item, Array[Byte], V1](key) { def persist(t: Item): Array[Byte] = Array[Byte]() def unpersist(manifest: Manifest, p: Array[Byte]): Item = item1 } From 2f760647ead1430fbffdbed1ff4f8a1720d62fe6 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Fri, 30 Oct 2015 22:21:16 +0100 Subject: [PATCH 13/19] Make JsonPersister produce JsValue instead of Array[Byte] And add conversion to convert to a Persisters[Array[Byte]] anyway. --- .../eventadapters/StaminaEventAdapter.scala | 6 ++++- .../src/main/scala/stamina/json/json.scala | 26 ++++++++++++------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala b/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala index 0326e52..7d46223 100644 --- a/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala +++ b/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala @@ -4,7 +4,11 @@ package eventadapters import akka.persistence.journal._ /** - * EventAdapter that uses Stamina to convert events. + * EventAdapter that uses Stamina to convert events and perform schema + * evolution when reading from the journal.. + * + * Remember to configure akka-serialization to correctly serialize objects + * of type P (unless it's treated specially by your journal plugin). */ class StaminaEventAdapter[P <: AnyRef](persisters: Persisters[P]) extends EventAdapter { def this(persisters: List[Persister[_, P, _]]) = this(Persisters(persisters)) diff --git a/stamina-json/src/main/scala/stamina/json/json.scala b/stamina-json/src/main/scala/stamina/json/json.scala index ed5ce0b..891cd30 100644 --- a/stamina-json/src/main/scala/stamina/json/json.scala +++ b/stamina-json/src/main/scala/stamina/json/json.scala @@ -55,10 +55,18 @@ package object json { */ def persister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo: MigratableVersion](key: String, migrator: JsonMigrator[V]): JsonPersister[T, V] = new VnJsonPersister[T, V](key, migrator) + def toByteArrayPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo](persister: Persister[T, JsValue, V]): Persister[T, Array[Byte], V] = new Persister[T, Array[Byte], V](persister.key) { + def persist(t: T): Array[Byte] = persister.persist(t).compactPrint.getBytes(UTF_8) + def unpersist(manifest: Manifest, persisted: Array[Byte]) = persister.unpersist(manifest, JsonParser(ParserInput(persisted))) + } + + def toByteArrayPersisters[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo](persisters: List[Persister[T, JsValue, V]]): Persisters[Array[Byte]] = + new Persisters[Array[Byte]](persisters.map(toByteArrayPersister(_))) + import java.nio.charset.StandardCharsets val UTF_8: String = StandardCharsets.UTF_8.name() - private[json] def toJsonBytes[T](t: T)(implicit writer: RootJsonWriter[T]): Array[Byte] = writer.write(t).compactPrint.getBytes(UTF_8) - private[json] def fromJsonBytes[T](bytes: Array[Byte])(implicit reader: RootJsonReader[T]): T = reader.read(parseJson(bytes)) + private[json] def toJson[T](t: T)(implicit writer: RootJsonWriter[T]): JsValue = writer.write(t) + private[json] def fromJson[T](persisted: JsValue)(implicit reader: RootJsonReader[T]): T = reader.read(persisted) private[json] def parseJson(bytes: Array[Byte]): JsValue = JsonParser(ParserInput(bytes.toArray)) } @@ -66,15 +74,15 @@ package json { /** * Simple abstract marker superclass to unify (and hide) the two internal Persister implementations. */ - sealed abstract class JsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo](key: String) extends Persister[T, Array[Byte], V](key) { + sealed abstract class JsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo](key: String) extends Persister[T, JsValue, V](key) { private[json] def cannotUnpersist(manifest: Manifest) = s"""JsonPersister[${implicitly[ClassTag[T]].runtimeClass.getSimpleName}, V${currentVersion}](key = "${key}") cannot unpersist data with manifest "$manifest".""" } private[json] class V1JsonPersister[T: RootJsonFormat: ClassTag](key: String) extends JsonPersister[T, V1](key) { - def persist(t: T): Array[Byte] = toJsonBytes(t) - def unpersist(manifest: Manifest, p: Array[Byte]): T = { - if (canUnpersist(manifest)) fromJsonBytes[T](p) + def persist(t: T): JsValue = toJson(t) + def unpersist(manifest: Manifest, p: JsValue): T = { + if (canUnpersist(manifest)) fromJson[T](p) else throw new IllegalArgumentException(cannotUnpersist(manifest)) } } @@ -82,9 +90,9 @@ package json { private[json] class VnJsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo: MigratableVersion](key: String, migrator: JsonMigrator[V]) extends JsonPersister[T, V](key) { override def canUnpersist(m: Manifest): Boolean = m.key == key && migrator.canMigrate(m.version) - def persist(t: T): Array[Byte] = toJsonBytes(t) - def unpersist(manifest: Manifest, p: Array[Byte]): T = { - if (canUnpersist(manifest)) migrator.migrate(parseJson(p), manifest.version).convertTo[T] + def persist(t: T): JsValue = toJson(t) + def unpersist(manifest: Manifest, p: JsValue): T = { + if (canUnpersist(manifest)) migrator.migrate(p, manifest.version).convertTo[T] else throw new IllegalArgumentException(cannotUnpersist(manifest)) } } From e4a61ef41450de1fc472bb0bb0af027721013120 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Sat, 31 Oct 2015 12:36:42 +0100 Subject: [PATCH 14/19] Add test for a spray-json-based Persisters[Array[Byte]] --- .../json/ByteArrayPersistersSpec.scala | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 stamina-json/src/test/scala/stamina/json/ByteArrayPersistersSpec.scala diff --git a/stamina-json/src/test/scala/stamina/json/ByteArrayPersistersSpec.scala b/stamina-json/src/test/scala/stamina/json/ByteArrayPersistersSpec.scala new file mode 100644 index 0000000..edd62a9 --- /dev/null +++ b/stamina-json/src/test/scala/stamina/json/ByteArrayPersistersSpec.scala @@ -0,0 +1,23 @@ +package stamina +package json + +class ByteArrayPersisterSpec extends StaminaJsonSpec { + import JsonTestDomain._ + import spray.json.lenses.JsonLenses._ + import fommil.sjs.FamilyFormats._ + + val jsonPersister = persister[CartCreatedV2, V2]("cart-created", + from[V1].to[V2](_.update('cart / 'items / * / 'price ! set[Int](1000))) + ) + + val persisters = toByteArrayPersisters(List(jsonPersister)) + + "The conversion to Persisters[Array[Byte]]" should { + "produce a persister that implements a roundtrip" in { + import persisters._ + val serialized: Persisted[Array[Byte]] = persist(v2CartCreated) + serialized.manifest should equal(jsonPersister.currentManifest) + unpersist(serialized) should equal(v2CartCreated) + } + } +} From fc4aa58a0a5677681608188f38e3daccd2025a85 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Sat, 31 Oct 2015 22:36:58 +0100 Subject: [PATCH 15/19] EventAdapter.toJournal should not include manifest --- .../main/scala/stamina/eventadapters/StaminaEventAdapter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala b/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala index 7d46223..45ad154 100644 --- a/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala +++ b/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala @@ -21,5 +21,5 @@ class StaminaEventAdapter[P <: AnyRef](persisters: Persisters[P]) extends EventA EventSeq(persisters.unpersist(event.asInstanceOf[AnyRef], Manifest(manifest))) def toJournal(event: Any) = - persisters.persist(event.asInstanceOf[AnyRef]) + persisters.persist(event.asInstanceOf[AnyRef]).persisted } From ed469aa3cf4604f16ed6a973af1f9318f2e5619a Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Sat, 31 Oct 2015 22:46:23 +0100 Subject: [PATCH 16/19] Don't require a JsonFormat when we already have the Persister --- stamina-json/src/main/scala/stamina/json/json.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stamina-json/src/main/scala/stamina/json/json.scala b/stamina-json/src/main/scala/stamina/json/json.scala index 891cd30..bfce141 100644 --- a/stamina-json/src/main/scala/stamina/json/json.scala +++ b/stamina-json/src/main/scala/stamina/json/json.scala @@ -55,12 +55,12 @@ package object json { */ def persister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo: MigratableVersion](key: String, migrator: JsonMigrator[V]): JsonPersister[T, V] = new VnJsonPersister[T, V](key, migrator) - def toByteArrayPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo](persister: Persister[T, JsValue, V]): Persister[T, Array[Byte], V] = new Persister[T, Array[Byte], V](persister.key) { + def toByteArrayPersister[T: ClassTag, V <: Version: VersionInfo](persister: Persister[T, JsValue, V]): Persister[T, Array[Byte], V] = new Persister[T, Array[Byte], V](persister.key) { def persist(t: T): Array[Byte] = persister.persist(t).compactPrint.getBytes(UTF_8) def unpersist(manifest: Manifest, persisted: Array[Byte]) = persister.unpersist(manifest, JsonParser(ParserInput(persisted))) } - def toByteArrayPersisters[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo](persisters: List[Persister[T, JsValue, V]]): Persisters[Array[Byte]] = + def toByteArrayPersisters[T: ClassTag, V <: Version: VersionInfo](persisters: List[Persister[T, JsValue, V]]): Persisters[Array[Byte]] = new Persisters[Array[Byte]](persisters.map(toByteArrayPersister(_))) import java.nio.charset.StandardCharsets From b91372041b5d270edaa5057c5bd4cd2aad94652f Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Sun, 1 Nov 2015 12:30:42 +0100 Subject: [PATCH 17/19] Add a stamina-testkit-based test for spray-json --- project/Build.scala | 1 + .../serialization/cart-created-v1-default | 1 + .../serialization/checkout-started-v1-default | 1 + .../scala/stamina/json/JsonPersisterSpec.scala | 18 +++++++++++++++++- .../scala/stamina/json/JsonTestDomain.scala | 7 ++++++- .../testkit/ScalatestTestGenerationSpec.scala | 13 +++++++------ 6 files changed, 33 insertions(+), 8 deletions(-) create mode 100644 stamina-json/src/test/resources/serialization/cart-created-v1-default create mode 100644 stamina-json/src/test/resources/serialization/checkout-started-v1-default diff --git a/project/Build.scala b/project/Build.scala index b8a447b..885365f 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -49,6 +49,7 @@ object Build extends Build { lazy val json = Project("stamina-json", file("stamina-json")) .dependsOn(core) + .dependsOn(testkit % "test") .settings(libSettings: _*) .settings(libraryDependencies ++= compile( diff --git a/stamina-json/src/test/resources/serialization/cart-created-v1-default b/stamina-json/src/test/resources/serialization/cart-created-v1-default new file mode 100644 index 0000000..19ad6be --- /dev/null +++ b/stamina-json/src/test/resources/serialization/cart-created-v1-default @@ -0,0 +1 @@ +eyJjYXJ0Ijp7ImlkIjoxLCJpdGVtcyI6W3siaWQiOjEsIm5hbWUiOiJXb25rYSBCYXIiLCJwcmljZSI6NTAwfSx7ImlkIjoyLCJuYW1lIjoiRXZlcmxhc3RpbmcgR29ic3RvcHBlciIsInByaWNlIjo0ODl9XX0sInRpbWVzdGFtcCI6MTQ0NjM3NzE5OTk0M30= \ No newline at end of file diff --git a/stamina-json/src/test/resources/serialization/checkout-started-v1-default b/stamina-json/src/test/resources/serialization/checkout-started-v1-default new file mode 100644 index 0000000..4b8db02 --- /dev/null +++ b/stamina-json/src/test/resources/serialization/checkout-started-v1-default @@ -0,0 +1 @@ +eyJwYXltZW50TWV0aG9kIjoibWFlc3RybyJ9 \ No newline at end of file diff --git a/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala b/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala index a3c45e7..24c945f 100644 --- a/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala +++ b/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala @@ -1,7 +1,10 @@ package stamina package json -class JsonPersisterSpec extends StaminaJsonSpec { +import stamina.testkit._ + +class JsonPersisterSpec extends StaminaJsonSpec + with StaminaTestKit { import JsonTestDomain._ import spray.json.lenses.JsonLenses._ import fommil.sjs.FamilyFormats._ @@ -56,4 +59,17 @@ class JsonPersisterSpec extends StaminaJsonSpec { v2Unpersisted.timestamp should (be > 0L and be < System.currentTimeMillis) } } + + "a persister based on stamina-json and spray-json-shapeless" should { + import fommil.sjs.FamilyFormats._ + + val persisters = Persisters(List( + persister[CartCreatedV3]("cart-created"), + persister[CheckoutStarted]("checkout-started") + )) + + persisters.generateTestsFor( + sample(v3CartCreated), + sample(checkoutStarted)) + } } diff --git a/stamina-json/src/test/scala/stamina/json/JsonTestDomain.scala b/stamina-json/src/test/scala/stamina/json/JsonTestDomain.scala index a38c0ed..3dde6cd 100644 --- a/stamina-json/src/test/scala/stamina/json/JsonTestDomain.scala +++ b/stamina-json/src/test/scala/stamina/json/JsonTestDomain.scala @@ -41,5 +41,10 @@ object JsonTestDomain { val v3Item1 = ItemV3(1, "Wonka Bar", 500) val v3Item2 = ItemV3(2, "Everlasting Gobstopper", 489) val v3Cart = CartV3(1, List(v3Item1, v3Item2)) - val v3CartCreated = CartCreatedV3(v3Cart, System.currentTimeMillis) + val v3CartCreated = CartCreatedV3(v3Cart, 1446377199943l) + + // Others + + case class CheckoutStarted(paymentMethod: String) + val checkoutStarted = CheckoutStarted("maestro") } diff --git a/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala b/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala index e720141..15f4b4e 100644 --- a/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala +++ b/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala @@ -11,14 +11,15 @@ class ScalatestTestGenerationSpec extends StaminaTestKitSpec { import TestDomain._ - case class ItemPersister(override val key: String) extends Persister[Item, V1](key) { - def persist(t: Item): Persisted = Persisted(key, currentVersion, ByteString()) - def unpersist(p: Persisted): Item = item1 - } - "A spec generated by StaminaTestKit" should { + case class ItemPersister(override val key: String) extends Persister[Item, V1](key) { + def persist(t: Item): Persisted = Persisted(key, currentVersion, ByteString()) + def unpersist(p: Persisted): Item = item1 + } + + val persisters = Persisters(ItemPersister("item1")) + val spec = new StaminaTestKit with WordSpecLike { - val persisters = Persisters(ItemPersister("item1")) "TestDomainSerialization" should { persisters.generateTestsFor( sample(item1), From 22569cc478e14367bb55b0d551126e08cfeb0d50 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Sun, 1 Nov 2015 14:57:52 +0100 Subject: [PATCH 18/19] Correctly convert Persister[_, JsValue, _] to Persister[_, Array[Byte], _] For example for testing. --- stamina-core/src/main/scala/stamina/Persister.scala | 8 ++++++++ stamina-json/src/main/scala/stamina/json/json.scala | 6 ++---- .../src/test/scala/stamina/json/JsonPersisterSpec.scala | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/stamina-core/src/main/scala/stamina/Persister.scala b/stamina-core/src/main/scala/stamina/Persister.scala index 4d3d9e9..ae00bc0 100644 --- a/stamina-core/src/main/scala/stamina/Persister.scala +++ b/stamina-core/src/main/scala/stamina/Persister.scala @@ -18,6 +18,14 @@ abstract class Persister[T: ClassTag, P <: AnyRef, V <: Version: VersionInfo](va def canPersist(a: AnyRef): Boolean = convertToT(a).isDefined def canUnpersist(m: Manifest): Boolean = m.key == key && m.version <= currentVersion + def translate[U <: AnyRef](p: P => U, u: U => P): Persister[T, U, V] = { + val original = this + new Persister[T, U, V](key) { + override def persist(t: T): U = p(original.persist(t)) + override def unpersist(manifest: Manifest, persisted: U): T = original.unpersist(manifest, u(persisted)) + } + } + private[stamina] def convertToT(any: AnyRef): Option[T] = any match { case t: T ⇒ Some(t) case _ ⇒ None diff --git a/stamina-json/src/main/scala/stamina/json/json.scala b/stamina-json/src/main/scala/stamina/json/json.scala index bfce141..a384b89 100644 --- a/stamina-json/src/main/scala/stamina/json/json.scala +++ b/stamina-json/src/main/scala/stamina/json/json.scala @@ -55,10 +55,8 @@ package object json { */ def persister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo: MigratableVersion](key: String, migrator: JsonMigrator[V]): JsonPersister[T, V] = new VnJsonPersister[T, V](key, migrator) - def toByteArrayPersister[T: ClassTag, V <: Version: VersionInfo](persister: Persister[T, JsValue, V]): Persister[T, Array[Byte], V] = new Persister[T, Array[Byte], V](persister.key) { - def persist(t: T): Array[Byte] = persister.persist(t).compactPrint.getBytes(UTF_8) - def unpersist(manifest: Manifest, persisted: Array[Byte]) = persister.unpersist(manifest, JsonParser(ParserInput(persisted))) - } + def toByteArrayPersister[T: ClassTag, V <: Version: VersionInfo](persister: Persister[T, JsValue, V]): Persister[T, Array[Byte], V] = + persister.translate(_.compactPrint.getBytes(UTF_8), b => JsonParser(ParserInput(b))) def toByteArrayPersisters[T: ClassTag, V <: Version: VersionInfo](persisters: List[Persister[T, JsValue, V]]): Persisters[Array[Byte]] = new Persisters[Array[Byte]](persisters.map(toByteArrayPersister(_))) diff --git a/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala b/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala index 43c6413..f1f90ae 100644 --- a/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala +++ b/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala @@ -66,7 +66,7 @@ class JsonPersisterSpec extends StaminaJsonSpec val persisters = Persisters(List( persister[CartCreatedV3]("cart-created"), persister[CheckoutStarted]("checkout-started") - )) + ).map(toByteArrayPersister(_))) persisters.generateTestsFor( sample(v3CartCreated), From 1dd0265e3a36f8dc64be3290e7e55c95916c20e6 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Mon, 2 Nov 2015 12:53:49 +0100 Subject: [PATCH 19/19] Add test for Persister.translate --- .../src/test/scala/stamina/PersisterSpec.scala | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 stamina-core/src/test/scala/stamina/PersisterSpec.scala diff --git a/stamina-core/src/test/scala/stamina/PersisterSpec.scala b/stamina-core/src/test/scala/stamina/PersisterSpec.scala new file mode 100644 index 0000000..4205ef0 --- /dev/null +++ b/stamina-core/src/test/scala/stamina/PersisterSpec.scala @@ -0,0 +1,18 @@ +package stamina + +class PersisterSpec extends StaminaSpec { + import TestOnlyPersister._ + import TestDomain._ + + "A Persister" should { + "correctly report whether it can persist a given class, even after translation" in { + val cartPersister = persister[CartCreated]("cartCreated") + cartPersister.canPersist(cartCreated) should be(true) + cartPersister.canPersist(item1) should be(false) + + val translatedCartPersister = cartPersister.translate(_.toString, (s: String) ⇒ s.getBytes) + translatedCartPersister.canPersist(cartCreated) should be(true) + translatedCartPersister.canPersist(item1) should be(false) + } + } +}