diff --git a/project/Build.scala b/project/Build.scala
index d371ec7..2982676 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -41,7 +41,8 @@ object Build extends Build {
.settings(libSettings: _*)
.settings(libraryDependencies ++=
compile(
- akkaActor
+ akkaActor,
+ akkaPersistence
) ++
test(
scalatest
@@ -50,6 +51,7 @@ object Build extends Build {
lazy val json = Project("stamina-json", file("stamina-json"))
.dependsOn(core)
+ .dependsOn(testkit % "test")
.settings(libSettings: _*)
.settings(libraryDependencies ++=
compile(
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 1e5a245..086fadf 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -2,11 +2,12 @@ import sbt._
import sbt.Keys._
object Dependencies {
- val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.4.2"
- val sprayJson = "io.spray" %% "spray-json" % "1.3.2"
- val jsonLenses = "net.virtual-void" %% "json-lenses" % "0.6.1"
- val scalatest = "org.scalatest" %% "scalatest" % "2.2.6"
- val base64 = "me.lessis" %% "base64" % "0.2.0"
+ val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.4.4"
+ val akkaPersistence = "com.typesafe.akka" %% "akka-persistence" % "2.4.4"
+ val sprayJson = "io.spray" %% "spray-json" % "1.3.2"
+ val jsonLenses = "net.virtual-void" %% "json-lenses" % "0.6.1"
+ val scalatest = "org.scalatest" %% "scalatest" % "2.2.5"
+ val base64 = "me.lessis" %% "base64" % "0.2.0"
// Only used by the tests
val sprayJsonShapeless = "com.github.fommil" %% "spray-json-shapeless" % "1.1.0"
diff --git a/stamina-core/src/main/scala/stamina/Persisted.scala b/stamina-core/src/main/scala/stamina/Persisted.scala
index 5eac7a7..f73b47a 100644
--- a/stamina-core/src/main/scala/stamina/Persisted.scala
+++ b/stamina-core/src/main/scala/stamina/Persisted.scala
@@ -4,8 +4,11 @@ package stamina
* A simple container holding a persistence key, a version number,
* and the associated serialized bytes.
*/
-case class Persisted(key: String, version: Int, bytes: ByteString)
+case class Persisted[P <: AnyRef](key: String, version: Int, persisted: P) {
+ lazy val manifest = Manifest(key, version)
+}
object Persisted {
- def apply(key: String, version: Int, bytes: Array[Byte]): Persisted = apply(key, version, ByteString(bytes))
+ def apply[P <: AnyRef](manifest: Manifest, persisted: P): Persisted[P] = apply(manifest.key, manifest.version, persisted)
+ def apply(key: String, version: Int, bytes: ByteString): Persisted[Array[Byte]] = apply[Array[Byte]](key, version, bytes.toArray)
}
diff --git a/stamina-core/src/main/scala/stamina/Persister.scala b/stamina-core/src/main/scala/stamina/Persister.scala
index 64fee54..e5d027a 100644
--- a/stamina-core/src/main/scala/stamina/Persister.scala
+++ b/stamina-core/src/main/scala/stamina/Persister.scala
@@ -8,21 +8,30 @@ import scala.util._
* at version V and unpersisting persisted instances of T for all versions up
* to and including version V.
*/
-abstract class Persister[T: ClassTag, V <: Version: VersionInfo](val key: String) {
+abstract class Persister[T: ClassTag, P <: AnyRef, V <: Version: VersionInfo](val key: String) {
lazy val currentVersion = Version.numberFor[V]
- def persist(t: T): Persisted
- def unpersist(persisted: Persisted): T
+ lazy val currentManifest = Manifest(key, currentVersion)
+ def persist(t: T): P
+ def unpersist(manifest: Manifest, persisted: P): T
def canPersist(a: AnyRef): Boolean = convertToT(a).isDefined
- def canUnpersist(p: Persisted): Boolean = p.key == key && p.version <= currentVersion
+ def canUnpersist(m: Manifest): Boolean = m.key == key && m.version <= currentVersion
+
+ def translate[U <: AnyRef](p: P ⇒ U, u: U ⇒ P): Persister[T, U, V] = {
+ val original = this
+ new Persister[T, U, V](key) {
+ override def persist(t: T): U = p(original.persist(t))
+ override def unpersist(manifest: Manifest, persisted: U): T = original.unpersist(manifest, u(persisted))
+ }
+ }
private[stamina] def convertToT(any: AnyRef): Option[T] = any match {
case t: T ⇒ Some(t)
case _ ⇒ None
}
- private[stamina] def persistAny(any: AnyRef): Persisted = {
+ private[stamina] def persistAny(any: AnyRef): P = {
convertToT(any).map(persist(_)).getOrElse(
throw new IllegalArgumentException(
s"persistAny() was called on Persister[${implicitly[ClassTag[T]].runtimeClass}] with an instance of ${any.getClass}."
@@ -30,10 +39,10 @@ abstract class Persister[T: ClassTag, V <: Version: VersionInfo](val key: String
)
}
- private[stamina] def unpersistAny(persisted: Persisted): AnyRef = {
- Try(unpersist(persisted).asInstanceOf[AnyRef]) match {
+ private[stamina] def unpersistAny(manifest: Manifest, persisted: AnyRef): AnyRef = {
+ Try(unpersist(manifest, persisted.asInstanceOf[P]).asInstanceOf[AnyRef]) match {
case Success(anyref) ⇒ anyref
- case Failure(error) ⇒ throw UnrecoverableDataException(persisted, error)
+ case Failure(error) ⇒ throw UnrecoverableDataException(manifest, error)
}
}
diff --git a/stamina-core/src/main/scala/stamina/Persisters.scala b/stamina-core/src/main/scala/stamina/Persisters.scala
index f724eb9..422e9e6 100644
--- a/stamina-core/src/main/scala/stamina/Persisters.scala
+++ b/stamina-core/src/main/scala/stamina/Persisters.scala
@@ -9,27 +9,36 @@ import scala.reflect.ClassTag
* one single entry-point for subclasses of StaminaAkkaSerializer
*
*/
-case class Persisters(persisters: List[Persister[_, _]]) {
+case class Persisters[P <: AnyRef](persisters: List[Persister[_, P, _]]) {
requireNoOverlappingTags()
def canPersist(a: AnyRef): Boolean = persisters.exists(_.canPersist(a))
- def canUnpersist(p: Persisted): Boolean = persisters.exists(_.canUnpersist(p))
+ def canUnpersist(manifest: Manifest): Boolean = persisters.exists(_.canUnpersist(manifest))
// format: OFF
- def persist(anyref: AnyRef): Persisted = {
- persisters.find(_.canPersist(anyref))
- .map(_.persistAny(anyref))
- .getOrElse(throw UnregisteredTypeException(anyref))
+ private def persister[T <: AnyRef](anyref: T): Persister[T, P, _] =
+ persisters
+ .find(_.canPersist(anyref))
+ .map(_.asInstanceOf[Persister[T, P, _]])
+ .getOrElse(throw UnregisteredTypeException(anyref))
+
+ def manifest(anyref: AnyRef): Manifest =
+ persister(anyref).currentManifest
+
+ def persist(anyref: AnyRef): Persisted[P] = {
+ val p = persister(anyref)
+ Persisted(p.currentManifest, p.persistAny(anyref))
}
- def unpersist(persisted: Persisted): AnyRef = {
- persisters.find(_.canUnpersist(persisted))
- .map(_.unpersistAny(persisted))
- .getOrElse(throw UnsupportedDataException(persisted))
+ def unpersist(persisted: Persisted[P]): AnyRef = unpersist(persisted.persisted, persisted.manifest)
+ def unpersist(persisted: AnyRef, manifest: Manifest): AnyRef = {
+ persisters.find(_.canUnpersist(manifest))
+ .map(_.unpersistAny(manifest, persisted))
+ .getOrElse(throw UnsupportedDataException(manifest.key, manifest.version))
}
// format: ON
- def ++(other: Persisters): Persisters = Persisters(persisters ++ other.persisters)
+ def ++(other: Persisters[P]): Persisters[P] = Persisters(persisters ++ other.persisters)
private def requireNoOverlappingTags() = {
val overlappingTags = persisters.groupBy(_.tag).filter(_._2.length > 1).mapValues(_.map(_.key))
@@ -40,6 +49,6 @@ case class Persisters(persisters: List[Persister[_, _]]) {
}
object Persisters {
- def apply[T: ClassTag, V <: Version: VersionInfo](persister: Persister[T, V]): Persisters = apply(List(persister))
- def apply(first: Persister[_, _], rest: Persister[_, _]*): Persisters = apply(first :: rest.toList)
+ def apply[T: ClassTag, P <: AnyRef, V <: Version: VersionInfo](persister: Persister[T, P, V]): Persisters[P] = apply(List(persister))
+ def apply[P <: AnyRef](first: Persister[_, P, _], rest: Persister[_, P, _]*): Persisters[P] = apply(first :: rest.toList)
}
diff --git a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala
index 50ad105..de762cf 100644
--- a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala
+++ b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala
@@ -4,16 +4,18 @@ import akka.serialization._
/**
* A custom Akka Serializer specifically designed for use with Akka Persistence.
+ *
+ * Key and version information is encoded in the manifest.
*/
-abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters, codec: PersistedCodec) extends Serializer {
- def this(persisters: List[Persister[_, _]], codec: PersistedCodec = DefaultPersistedCodec) = this(Persisters(persisters), codec)
- def this(persister: Persister[_, _], persisters: Persister[_, _]*) = this(Persisters(persister :: persisters.toList), DefaultPersistedCodec)
+abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters[Array[Byte]]) extends SerializerWithStringManifest {
+ def this(persisters: List[Persister[_, Array[Byte], _]]) = this(Persisters(persisters))
+ def this(persister: Persister[_, Array[Byte], _], persisters: Persister[_, Array[Byte], _]*) = this(Persisters(persister :: persisters.toList))
- /** We don't need class manifests since we're using keys to identify types. */
- val includeManifest: Boolean = false
+ /** Uniquely identifies this Serializer. */
+ val identifier = 490304
- /** Uniquely identifies this Serializer by combining the codec with a unique number. */
- val identifier = 42 * codec.identifier
+ def manifest(obj: AnyRef): String =
+ persisters.manifest(obj).manifest
/**
* @throws UnregisteredTypeException when the specified object is not supported by the persisters.
@@ -21,18 +23,18 @@ abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters, c
def toBinary(obj: AnyRef): Array[Byte] = {
if (!persisters.canPersist(obj)) throw UnregisteredTypeException(obj)
- codec.writePersisted(persisters.persist(obj))
+ persisters.persist(obj).persisted
}
/**
* @throws UnsupportedDataException when the persisted key and/or version is not supported.
* @throws UnrecoverableDataException when the key and version are supported but recovery throws an exception.
*/
- def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
- val persisted = codec.readPersisted(bytes)
+ def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
+ if (manifest.isEmpty) throw new IllegalArgumentException("No manifest found")
+ val m = Manifest(manifest)
+ if (!persisters.canUnpersist(m)) throw UnsupportedDataException(m.key, m.version)
- if (!persisters.canUnpersist(persisted)) throw UnsupportedDataException(persisted)
-
- persisters.unpersist(persisted)
+ persisters.unpersist(Persisted(m, bytes))
}
}
diff --git a/stamina-core/src/main/scala/stamina/codec/CodecBasedStaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/codec/CodecBasedStaminaAkkaSerializer.scala
new file mode 100644
index 0000000..4603085
--- /dev/null
+++ b/stamina-core/src/main/scala/stamina/codec/CodecBasedStaminaAkkaSerializer.scala
@@ -0,0 +1,44 @@
+package stamina
+package codec
+
+import akka.serialization._
+
+/**
+ * A custom Akka Serializer encoding key and version along with the serialized object.
+ *
+ * This is particularly useful when there is no separate field for metadata, such as when
+ * dealing with pre-akka-2.3 persistence.
+ *
+ * Wrapping/unwrapping the metadata around the serialized object is done by the Codec.
+ */
+abstract class CodecBasedStaminaAkkaSerializer private[stamina] (persisters: Persisters[Array[Byte]], codec: PersistedCodec[Array[Byte]]) extends Serializer {
+ def this(persisters: List[Persister[_, Array[Byte], _]], codec: PersistedCodec[Array[Byte]] = DefaultPersistedCodec) = this(Persisters(persisters), codec)
+ def this(persister: Persister[_, Array[Byte], _], persisters: Persister[_, Array[Byte], _]*) = this(Persisters(persister :: persisters.toList), DefaultPersistedCodec)
+
+ /** We don't need class manifests since we're using keys to identify types. */
+ val includeManifest: Boolean = false
+
+ /** Uniquely identifies this Serializer by combining the codec with a unique number. */
+ val identifier = 42 * codec.identifier
+
+ /**
+ * @throws UnregisteredTypeException when the specified object is not supported by the persisters.
+ */
+ def toBinary(obj: AnyRef): Array[Byte] = {
+ if (!persisters.canPersist(obj)) throw UnregisteredTypeException(obj)
+
+ codec.writePersisted(persisters.persist(obj))
+ }
+
+ /**
+ * @throws UnsupportedDataException when the persisted key and/or version is not supported.
+ * @throws UnrecoverableDataException when the key and version are supported but recovery throws an exception.
+ */
+ def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
+ val persisted = codec.readPersisted(bytes)
+
+ if (!persisters.canUnpersist(persisted.manifest)) throw UnsupportedDataException(persisted.key, persisted.version)
+
+ persisters.unpersist(persisted)
+ }
+}
diff --git a/stamina-core/src/main/scala/stamina/PersistedCodec.scala b/stamina-core/src/main/scala/stamina/codec/PersistedCodec.scala
similarity index 69%
rename from stamina-core/src/main/scala/stamina/PersistedCodec.scala
rename to stamina-core/src/main/scala/stamina/codec/PersistedCodec.scala
index 7ec3ffe..a7ef46e 100644
--- a/stamina-core/src/main/scala/stamina/PersistedCodec.scala
+++ b/stamina-core/src/main/scala/stamina/codec/PersistedCodec.scala
@@ -1,13 +1,14 @@
package stamina
+package codec
/**
* The encoding used to translate an instance of Persisted
* to a byte array and back.
*/
-trait PersistedCodec {
+trait PersistedCodec[P <: AnyRef] {
def identifier: Int
- def writePersisted(persisted: Persisted): Array[Byte]
- def readPersisted(bytes: Array[Byte]): Persisted
+ def writePersisted(persisted: Persisted[P]): Array[Byte]
+ def readPersisted(bytes: Array[Byte]): Persisted[P]
}
/**
@@ -19,13 +20,13 @@ trait PersistedCodec {
* - persisted data (n bytes)
*
*/
-object DefaultPersistedCodec extends PersistedCodec {
+object DefaultPersistedCodec extends PersistedCodec[Array[Byte]] {
implicit val byteOrder = java.nio.ByteOrder.LITTLE_ENDIAN
import java.nio.charset.StandardCharsets._
def identifier = 490303
- def writePersisted(persisted: Persisted): Array[Byte] = {
+ def writePersisted(persisted: Persisted[Array[Byte]]): Array[Byte] = {
val keyBytes = persisted.key.getBytes(UTF_8)
ByteString.
@@ -33,12 +34,12 @@ object DefaultPersistedCodec extends PersistedCodec {
putInt(keyBytes.length).
putBytes(keyBytes).
putInt(persisted.version).
- append(persisted.bytes).
+ append(ByteString(persisted.persisted)).
result.
toArray
}
- def readPersisted(byteArray: Array[Byte]): Persisted = {
+ def readPersisted(byteArray: Array[Byte]): Persisted[Array[Byte]] = {
val bytes = ByteString(byteArray)
val keyLength = bytes.take(4).iterator.getInt
val (keyBytes, rest) = bytes.drop(4).splitAt(keyLength)
diff --git a/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala b/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala
new file mode 100644
index 0000000..45ad154
--- /dev/null
+++ b/stamina-core/src/main/scala/stamina/eventadapters/StaminaEventAdapter.scala
@@ -0,0 +1,25 @@
+package stamina
+package eventadapters
+
+import akka.persistence.journal._
+
+/**
+ * EventAdapter that uses Stamina to convert events and perform schema
+ * evolution when reading from the journal..
+ *
+ * Remember to configure akka-serialization to correctly serialize objects
+ * of type P (unless it's treated specially by your journal plugin).
+ */
+class StaminaEventAdapter[P <: AnyRef](persisters: Persisters[P]) extends EventAdapter {
+ def this(persisters: List[Persister[_, P, _]]) = this(Persisters(persisters))
+ def this(persister: Persister[_, P, _], persisters: Persister[_, P, _]*) = this(Persisters(persister :: persisters.toList))
+
+ def manifest(event: Any) =
+ persisters.manifest(event.asInstanceOf[AnyRef]).manifest
+
+ def fromJournal(event: Any, manifest: String) =
+ EventSeq(persisters.unpersist(event.asInstanceOf[AnyRef], Manifest(manifest)))
+
+ def toJournal(event: Any) =
+ persisters.persist(event.asInstanceOf[AnyRef]).persisted
+}
diff --git a/stamina-core/src/main/scala/stamina/stamina.scala b/stamina-core/src/main/scala/stamina/stamina.scala
index bfd0d2d..ba02abc 100644
--- a/stamina-core/src/main/scala/stamina/stamina.scala
+++ b/stamina-core/src/main/scala/stamina/stamina.scala
@@ -30,11 +30,19 @@ package stamina {
extends RuntimeException(s"No persister registered for class: ${obj.getClass}")
with NoStackTrace
- case class UnsupportedDataException(persisted: Persisted)
- extends RuntimeException(s"No unpersister registered for key: '${persisted.key}' and version: ${persisted.version}")
+ case class UnsupportedDataException(key: String, version: Int)
+ extends RuntimeException(s"No unpersister registered for key: '$key' and version: $version")
with NoStackTrace
- case class UnrecoverableDataException(persisted: Persisted, error: Throwable)
- extends RuntimeException(s"Error while trying to unpersist data with key '${persisted.key}' and version ${persisted.version}. Cause: ${error}")
+ case class UnrecoverableDataException(manifest: Manifest, error: Throwable)
+ extends RuntimeException(s"Error while trying to unpersist data with key '${manifest.key}' and version ${manifest.version}. Cause: ${error}")
with NoStackTrace
+
+ case class Manifest(manifest: String) {
+ lazy val key: String = manifest.substring(manifest.indexOf('-') + 1)
+ lazy val version: Int = Integer.valueOf(manifest.substring(0, manifest.indexOf('-')))
+ }
+ object Manifest {
+ def apply(key: String, version: Int): Manifest = Manifest(version + "-" + key)
+ }
}
diff --git a/stamina-core/src/test/scala/stamina/PersisterSpec.scala b/stamina-core/src/test/scala/stamina/PersisterSpec.scala
new file mode 100644
index 0000000..4205ef0
--- /dev/null
+++ b/stamina-core/src/test/scala/stamina/PersisterSpec.scala
@@ -0,0 +1,18 @@
+package stamina
+
+class PersisterSpec extends StaminaSpec {
+ import TestOnlyPersister._
+ import TestDomain._
+
+ "A Persister" should {
+ "correctly report whether it can persist a given class, even after translation" in {
+ val cartPersister = persister[CartCreated]("cartCreated")
+ cartPersister.canPersist(cartCreated) should be(true)
+ cartPersister.canPersist(item1) should be(false)
+
+ val translatedCartPersister = cartPersister.translate(_.toString, (s: String) ⇒ s.getBytes)
+ translatedCartPersister.canPersist(cartCreated) should be(true)
+ translatedCartPersister.canPersist(item1) should be(false)
+ }
+ }
+}
diff --git a/stamina-core/src/test/scala/stamina/PersistersSpec.scala b/stamina-core/src/test/scala/stamina/PersistersSpec.scala
index 97f6935..a24bcb6 100644
--- a/stamina-core/src/test/scala/stamina/PersistersSpec.scala
+++ b/stamina-core/src/test/scala/stamina/PersistersSpec.scala
@@ -22,15 +22,12 @@ class PersistersSpec extends StaminaSpec {
}
"correctly implement canUnpersist()" in {
- canUnpersist(itemPersister.persist(item1)) should be(true)
- canUnpersist(cartPersister.persist(cart)) should be(true)
+ canUnpersist(itemPersister.currentManifest) should be(true)
+ canUnpersist(cartPersister.currentManifest) should be(true)
- canUnpersist(cartCreatedPersister.persist(cartCreated)) should be(false)
- canUnpersist(Persisted("unknown", 1, ByteString("..."))) should be(false)
- canUnpersist(Persisted("item", 2, ByteString("..."))) should be(false)
-
- // works because canUnpersist only looks at the key and the version, not at the raw data
- canUnpersist(Persisted("item", 1, ByteString("Not an item at all!"))) should be(true)
+ canUnpersist(cartCreatedPersister.currentManifest) should be(false)
+ canUnpersist(Manifest("unknown", 1)) should be(false)
+ canUnpersist(Manifest("item", 2)) should be(false)
}
"correctly implement persist() and unpersist()" in {
@@ -44,17 +41,17 @@ class PersistersSpec extends StaminaSpec {
"throw an UnsupportedDataException when unpersisting data with an unknown key" in {
an[UnsupportedDataException] should
- be thrownBy unpersist(Persisted("unknown", 1, ByteString("...")))
+ be thrownBy unpersist(Array[Byte](), Manifest("unknown", 1))
}
"throw an UnsupportedDataException when deserializing data with an unsupported version" in {
an[UnsupportedDataException] should
- be thrownBy unpersist(Persisted("item", 2, ByteString("...")))
+ be thrownBy unpersist(Array[Byte](), Manifest("item", 2))
}
"throw an UnrecoverableDataException when an exception occurs while deserializing" in {
an[UnrecoverableDataException] should
- be thrownBy unpersist(Persisted("item", 1, ByteString("not an item")))
+ be thrownBy unpersist(ByteString("not an item").toArray, itemPersister.currentManifest)
}
}
}
diff --git a/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala b/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala
index cffa695..5c1ea66 100644
--- a/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala
+++ b/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala
@@ -3,14 +3,13 @@ package stamina
class StaminaAkkaSerializerSpec extends StaminaSpec {
import TestDomain._
import TestOnlyPersister._
- import DefaultPersistedCodec._
val itemPersister = persister[Item]("item")
val cartPersister = persister[Cart]("cart")
val cartCreatedPersister = persister[CartCreated]("cart-created")
class MyAkkaSerializer1a extends StaminaAkkaSerializer(List(itemPersister, cartPersister, cartCreatedPersister))
- class MyAkkaSerializer1b extends StaminaAkkaSerializer(List(itemPersister, cartPersister, cartCreatedPersister), DefaultPersistedCodec)
+ class MyAkkaSerializer1b extends StaminaAkkaSerializer(List(itemPersister, cartPersister, cartCreatedPersister))
class MyAkkaSerializer2 extends StaminaAkkaSerializer(itemPersister, cartPersister, cartCreatedPersister)
val serializer = new MyAkkaSerializer1a
@@ -19,29 +18,29 @@ class StaminaAkkaSerializerSpec extends StaminaSpec {
"The StaminaAkkaSerializer" should {
"correctly serialize and deserialize the current version of the domain" in {
- fromBinary(toBinary(item1)) should equal(item1)
- fromBinary(toBinary(item2)) should equal(item2)
- fromBinary(toBinary(cart)) should equal(cart)
- fromBinary(toBinary(cartCreated)) should equal(cartCreated)
+ fromBinary(toBinary(item1), manifest(item1)) should equal(item1)
+ fromBinary(toBinary(item2), manifest(item2)) should equal(item2)
+ fromBinary(toBinary(cart), manifest(cart)) should equal(cart)
+ fromBinary(toBinary(cartCreated), manifest(cartCreated)) should equal(cartCreated)
}
"throw an UnregisteredTypeException when serializing an unregistered type" in {
- a[UnregisteredTypeException] should be thrownBy toBinary("a raw String is not supported")
+ a[UnregisteredTypeException] should be thrownBy toBinary(ByteString("a raw String is not supported").toArray)
}
"throw an UnsupportedDataException when deserializing data with an unknown key" in {
an[UnsupportedDataException] should
- be thrownBy fromBinary(writePersisted(Persisted("unknown", 1, ByteString("..."))))
+ be thrownBy fromBinary(Array[Byte](), Manifest("unknown", 1).manifest)
}
"throw an UnsupportedDataException when deserializing data with an unsupported version" in {
an[UnsupportedDataException] should
- be thrownBy fromBinary(writePersisted(Persisted("item", 2, ByteString("..."))))
+ be thrownBy fromBinary(Array[Byte](), Manifest("item", 2).manifest)
}
"throw an UnrecoverableDataException when an exception occurs while deserializing" in {
an[UnrecoverableDataException] should
- be thrownBy fromBinary(writePersisted(Persisted("item", 1, ByteString("not an item"))))
+ be thrownBy fromBinary(ByteString("not an item").toArray, itemPersister.currentManifest.manifest)
}
}
}
diff --git a/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala b/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala
index bc6c737..ff6d204 100644
--- a/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala
+++ b/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala
@@ -9,12 +9,12 @@ object TestOnlyPersister {
private val javaSerializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem])
import javaSerializer._
- def persister[T <: AnyRef: ClassTag](key: String): Persister[T, V1] = new JavaPersister[T](key)
+ def persister[T <: AnyRef: ClassTag](key: String): Persister[T, Array[Byte], V1] = new JavaPersister[T](key)
- private class JavaPersister[T <: AnyRef: ClassTag](key: String) extends Persister[T, V1](key) {
- def persist(t: T): Persisted = Persisted(key, currentVersion, toBinary(t))
- def unpersist(p: Persisted): T = {
- if (canUnpersist(p)) fromBinary(p.bytes.toArray).asInstanceOf[T]
+ private class JavaPersister[T <: AnyRef: ClassTag](key: String) extends Persister[T, Array[Byte], V1](key) {
+ def persist(t: T): Array[Byte] = toBinary(t)
+ def unpersist(manifest: Manifest, p: Array[Byte]): T = {
+ if (canUnpersist(manifest)) fromBinary(p).asInstanceOf[T]
else throw new IllegalArgumentException("")
}
}
diff --git a/stamina-json/src/main/scala/stamina/json/json.scala b/stamina-json/src/main/scala/stamina/json/json.scala
index 6a72d44..c1bb171 100644
--- a/stamina-json/src/main/scala/stamina/json/json.scala
+++ b/stamina-json/src/main/scala/stamina/json/json.scala
@@ -55,35 +55,43 @@ package object json {
*/
def persister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo: MigratableVersion](key: String, migrator: JsonMigrator[V]): JsonPersister[T, V] = new VnJsonPersister[T, V](key, migrator)
- private[json] def toJsonBytes[T](t: T)(implicit writer: RootJsonWriter[T]): ByteString = ByteString(writer.write(t).compactPrint)
- private[json] def fromJsonBytes[T](bytes: ByteString)(implicit reader: RootJsonReader[T]): T = reader.read(parseJson(bytes))
- private[json] def parseJson(bytes: ByteString): JsValue = JsonParser(ParserInput(bytes.toArray))
+ def toByteArrayPersister[T: ClassTag, V <: Version: VersionInfo](persister: Persister[T, JsValue, V]): Persister[T, Array[Byte], V] =
+ persister.translate(_.compactPrint.getBytes(UTF_8), b ⇒ JsonParser(ParserInput(b)))
+
+ def toByteArrayPersisters[T: ClassTag, V <: Version: VersionInfo](persisters: List[Persister[T, JsValue, V]]): Persisters[Array[Byte]] =
+ new Persisters[Array[Byte]](persisters.map(toByteArrayPersister(_)))
+
+ import java.nio.charset.StandardCharsets
+ val UTF_8: String = StandardCharsets.UTF_8.name()
+ private[json] def toJson[T](t: T)(implicit writer: RootJsonWriter[T]): JsValue = writer.write(t)
+ private[json] def fromJson[T](persisted: JsValue)(implicit reader: RootJsonReader[T]): T = reader.read(persisted)
+ private[json] def parseJson(bytes: Array[Byte]): JsValue = JsonParser(ParserInput(bytes.toArray))
}
package json {
/**
* Simple abstract marker superclass to unify (and hide) the two internal Persister implementations.
*/
- sealed abstract class JsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo](key: String) extends Persister[T, V](key) {
- private[json] def cannotUnpersist(p: Persisted) =
- s"""JsonPersister[${implicitly[ClassTag[T]].runtimeClass.getSimpleName}, V${currentVersion}](key = "${key}") cannot unpersist data with key "${p.key}" and version ${p.version}."""
+ sealed abstract class JsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo](key: String) extends Persister[T, JsValue, V](key) {
+ private[json] def cannotUnpersist(manifest: Manifest) =
+ s"""JsonPersister[${implicitly[ClassTag[T]].runtimeClass.getSimpleName}, V${currentVersion}](key = "${key}") cannot unpersist data with manifest "$manifest"."""
}
private[json] class V1JsonPersister[T: RootJsonFormat: ClassTag](key: String) extends JsonPersister[T, V1](key) {
- def persist(t: T): Persisted = Persisted(key, currentVersion, toJsonBytes(t))
- def unpersist(p: Persisted): T = {
- if (canUnpersist(p)) fromJsonBytes[T](p.bytes)
- else throw new IllegalArgumentException(cannotUnpersist(p))
+ def persist(t: T): JsValue = toJson(t)
+ def unpersist(manifest: Manifest, p: JsValue): T = {
+ if (canUnpersist(manifest)) fromJson[T](p)
+ else throw new IllegalArgumentException(cannotUnpersist(manifest))
}
}
private[json] class VnJsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo: MigratableVersion](key: String, migrator: JsonMigrator[V]) extends JsonPersister[T, V](key) {
- override def canUnpersist(p: Persisted): Boolean = p.key == key && migrator.canMigrate(p.version)
+ override def canUnpersist(m: Manifest): Boolean = m.key == key && migrator.canMigrate(m.version)
- def persist(t: T): Persisted = Persisted(key, currentVersion, toJsonBytes(t))
- def unpersist(p: Persisted): T = {
- if (canUnpersist(p)) migrator.migrate(parseJson(p.bytes), p.version).convertTo[T]
- else throw new IllegalArgumentException(cannotUnpersist(p))
+ def persist(t: T): JsValue = toJson(t)
+ def unpersist(manifest: Manifest, p: JsValue): T = {
+ if (canUnpersist(manifest)) migrator.migrate(p, manifest.version).convertTo[T]
+ else throw new IllegalArgumentException(cannotUnpersist(manifest))
}
}
}
diff --git a/stamina-json/src/test/resources/serialization/cart-created-v1-default b/stamina-json/src/test/resources/serialization/cart-created-v1-default
new file mode 100644
index 0000000..19ad6be
--- /dev/null
+++ b/stamina-json/src/test/resources/serialization/cart-created-v1-default
@@ -0,0 +1 @@
+eyJjYXJ0Ijp7ImlkIjoxLCJpdGVtcyI6W3siaWQiOjEsIm5hbWUiOiJXb25rYSBCYXIiLCJwcmljZSI6NTAwfSx7ImlkIjoyLCJuYW1lIjoiRXZlcmxhc3RpbmcgR29ic3RvcHBlciIsInByaWNlIjo0ODl9XX0sInRpbWVzdGFtcCI6MTQ0NjM3NzE5OTk0M30=
\ No newline at end of file
diff --git a/stamina-json/src/test/resources/serialization/checkout-started-v1-default b/stamina-json/src/test/resources/serialization/checkout-started-v1-default
new file mode 100644
index 0000000..4b8db02
--- /dev/null
+++ b/stamina-json/src/test/resources/serialization/checkout-started-v1-default
@@ -0,0 +1 @@
+eyJwYXltZW50TWV0aG9kIjoibWFlc3RybyJ9
\ No newline at end of file
diff --git a/stamina-json/src/test/scala/stamina/json/ByteArrayPersistersSpec.scala b/stamina-json/src/test/scala/stamina/json/ByteArrayPersistersSpec.scala
new file mode 100644
index 0000000..edd62a9
--- /dev/null
+++ b/stamina-json/src/test/scala/stamina/json/ByteArrayPersistersSpec.scala
@@ -0,0 +1,23 @@
+package stamina
+package json
+
+class ByteArrayPersisterSpec extends StaminaJsonSpec {
+ import JsonTestDomain._
+ import spray.json.lenses.JsonLenses._
+ import fommil.sjs.FamilyFormats._
+
+ val jsonPersister = persister[CartCreatedV2, V2]("cart-created",
+ from[V1].to[V2](_.update('cart / 'items / * / 'price ! set[Int](1000)))
+ )
+
+ val persisters = toByteArrayPersisters(List(jsonPersister))
+
+ "The conversion to Persisters[Array[Byte]]" should {
+ "produce a persister that implements a roundtrip" in {
+ import persisters._
+ val serialized: Persisted[Array[Byte]] = persist(v2CartCreated)
+ serialized.manifest should equal(jsonPersister.currentManifest)
+ unpersist(serialized) should equal(v2CartCreated)
+ }
+ }
+}
diff --git a/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala b/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala
index a643018..aa6f57b 100644
--- a/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala
+++ b/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala
@@ -1,7 +1,10 @@
package stamina
package json
-class JsonPersisterSpec extends StaminaJsonSpec {
+import stamina.testkit._
+
+class JsonPersisterSpec extends StaminaJsonSpec
+ with StaminaTestKit {
import JsonTestDomain._
import spray.json.lenses.JsonLenses._
import fommil.sjs.FamilyFormats._
@@ -23,19 +26,19 @@ class JsonPersisterSpec extends StaminaJsonSpec {
"V1 persisters produced by SprayJsonPersister" should {
"correctly persist and unpersist domain events " in {
import v1CartCreatedPersister._
- unpersist(persist(v1CartCreated)) should equal(v1CartCreated)
+ unpersist(currentManifest, persist(v1CartCreated)) should equal(v1CartCreated)
}
}
"V2 persisters with migrators produced by SprayJsonPersister" should {
"correctly persist and unpersist domain events " in {
import v2CartCreatedPersister._
- unpersist(persist(v2CartCreated)) should equal(v2CartCreated)
+ unpersist(currentManifest, persist(v2CartCreated)) should equal(v2CartCreated)
}
"correctly migrate and unpersist V1 domain events" in {
val v1Persisted = v1CartCreatedPersister.persist(v1CartCreated)
- val v2Unpersisted = v2CartCreatedPersister.unpersist(v1Persisted)
+ val v2Unpersisted = v2CartCreatedPersister.unpersist(v1CartCreatedPersister.currentManifest, v1Persisted)
v2Unpersisted.cart.items.map(_.price).toSet should equal(Set(1000))
}
@@ -44,18 +47,31 @@ class JsonPersisterSpec extends StaminaJsonSpec {
"V3 persisters with migrators produced by SprayJsonPersister" should {
"correctly persist and unpersist domain events " in {
import v3CartCreatedPersister._
- unpersist(persist(v3CartCreated)) should equal(v3CartCreated)
+ unpersist(currentManifest, persist(v3CartCreated)) should equal(v3CartCreated)
}
"correctly migrate and unpersist V1 domain events" in {
val v1Persisted = v1CartCreatedPersister.persist(v1CartCreated)
val v2Persisted = v2CartCreatedPersister.persist(v2CartCreated)
- val v1Unpersisted = v3CartCreatedPersister.unpersist(v1Persisted)
- val v2Unpersisted = v3CartCreatedPersister.unpersist(v2Persisted)
+ val v1Unpersisted = v3CartCreatedPersister.unpersist(v1CartCreatedPersister.currentManifest, v1Persisted)
+ val v2Unpersisted = v3CartCreatedPersister.unpersist(v2CartCreatedPersister.currentManifest, v2Persisted)
v1Unpersisted.cart.items.map(_.price).toSet should equal(Set(1000))
v2Unpersisted.timestamp should (be > 0L and be < System.currentTimeMillis)
}
}
+
+ "a persister based on stamina-json and spray-json-shapeless" should {
+ import fommil.sjs.FamilyFormats._
+
+ val persisters = Persisters(List(
+ persister[CartCreatedV3]("cart-created"),
+ persister[CheckoutStarted]("checkout-started")
+ ).map(toByteArrayPersister(_)))
+
+ persisters.generateTestsFor(
+ sample(v3CartCreated),
+ sample(checkoutStarted))
+ }
}
diff --git a/stamina-json/src/test/scala/stamina/json/JsonTestDomain.scala b/stamina-json/src/test/scala/stamina/json/JsonTestDomain.scala
index a38c0ed..3dde6cd 100644
--- a/stamina-json/src/test/scala/stamina/json/JsonTestDomain.scala
+++ b/stamina-json/src/test/scala/stamina/json/JsonTestDomain.scala
@@ -41,5 +41,10 @@ object JsonTestDomain {
val v3Item1 = ItemV3(1, "Wonka Bar", 500)
val v3Item2 = ItemV3(2, "Everlasting Gobstopper", 489)
val v3Cart = CartV3(1, List(v3Item1, v3Item2))
- val v3CartCreated = CartCreatedV3(v3Cart, System.currentTimeMillis)
+ val v3CartCreated = CartCreatedV3(v3Cart, 1446377199943l)
+
+ // Others
+
+ case class CheckoutStarted(paymentMethod: String)
+ val checkoutStarted = CheckoutStarted("maestro")
}
diff --git a/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala b/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala
index 7036aec..5e48e57 100644
--- a/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala
+++ b/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala
@@ -15,7 +15,7 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒
def sample(sampleId: String, persistable: AnyRef) = new PersistableSample(sampleId, persistable, Some(sampleId))
def sample(sampleId: String, persistable: AnyRef, description: String) = new PersistableSample(sampleId, persistable, Some(description))
- implicit class TestablePersisters(persisters: Persisters) extends org.scalatest.Matchers {
+ implicit class TestablePersisters(persisters: Persisters[Array[Byte]]) extends org.scalatest.Matchers {
def generateTestsFor(samples: PersistableSample*): Unit = {
samples.foreach { sample ⇒
generateRoundtripTestFor(sample)
@@ -45,9 +45,9 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒
val serialized = persisters.persist(sample.persistable)
byteStringFromResource(serialized.key, version, sample.sampleId) match {
case Success(binary) ⇒
- persisters.unpersist(binary) should equal(sample.persistable)
+ persisters.unpersist(Persisted(Manifest(serialized.key, version), binary)) should equal(sample.persistable)
case Failure(_: java.io.FileNotFoundException) if version == latestVersion ⇒
- val writtenToPath = saveByteArrayToTargetSerializationDirectory(serialized.bytes.toArray, serialized.key, version, sample.sampleId)
+ val writtenToPath = saveByteArrayToTargetSerializationDirectory(serialized.persisted.toArray, serialized.key, version, sample.sampleId)
fail(s"You appear to have added a new serialization sample to the stamina persisters' test.\n" +
"A serialized version of this sample must be stored as a project resource for future reference, to ensure future versions of the software can still correctly deserialize serialized objects in this format.\n" +
"Please copy the generated serialized data into the project test resources:\n" +
@@ -67,7 +67,7 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒
}
}
- private def byteStringFromResource(key: String, version: Int, sampleId: String): Try[Persisted] = {
+ private def byteStringFromResource(key: String, version: Int, sampleId: String): Try[Array[Byte]] = {
import scala.io.Source
val resourceName = s"/$serializedObjectsPackage/${filename(key, version, sampleId)}"
@@ -75,8 +75,6 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒
.map(Success(_)).getOrElse(Failure(new java.io.FileNotFoundException(resourceName)))
.map(Source.fromInputStream(_).mkString)
.flatMap(base64.Decode(_))
- .map(akka.util.ByteString(_))
- .map(Persisted(key, version, _))
}
private def saveByteArrayToTargetSerializationDirectory(bytes: Array[Byte], key: String, version: Int, sampleId: String) = {
diff --git a/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala b/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala
index 7996c07..8740392 100644
--- a/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala
+++ b/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala
@@ -11,14 +11,15 @@ class ScalatestTestGenerationSpec extends StaminaTestKitSpec {
import TestDomain._
- case class ItemPersister(override val key: String) extends Persister[Item, V1](key) {
- def persist(t: Item): Persisted = Persisted(key, currentVersion, ByteString())
- def unpersist(p: Persisted): Item = item1
- }
-
"A spec generated by StaminaTestKit" should {
+ case class ItemPersister(override val key: String) extends Persister[Item, Array[Byte], V1](key) {
+ def persist(t: Item): Array[Byte] = Array[Byte]()
+ def unpersist(manifest: Manifest, p: Array[Byte]): Item = item1
+ }
+
+ val persisters = Persisters(ItemPersister("item1"))
+
val spec = new StaminaTestKit with WordSpecLike {
- val persisters = Persisters(ItemPersister("item1"))
"TestDomainSerialization" should {
persisters.generateTestsFor(
sample(item1),