Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
dd2ee86
Experiment with #26
raboof Oct 14, 2015
a8925d1
Add Codec-based Serializer (mostly for legacy users)
raboof Oct 15, 2015
362b352
Split Persisted into manifest and bytes in more places
raboof Oct 15, 2015
2ca2cca
Manifest as a case class
raboof Oct 15, 2015
c340b1a
Made method naming a bit more consistent (hopefully)
raboof Oct 15, 2015
111ba6c
Clarify scaladoc
raboof Oct 19, 2015
1855834
Remove intermediate Persisted when throwing UnrecoverableDataException
raboof Oct 19, 2015
de54144
Make Persisters API a bit more balanced
raboof Oct 24, 2015
6bf7638
Fix compilation error
raboof Oct 25, 2015
914992f
Move codec-related code to its own package
raboof Oct 25, 2015
f09579f
Merge remote-tracking branch 'origin/master' into stringManifest
raboof Oct 30, 2015
0290363
A naive eventadapter to use stamina through EA instead of Serialization
raboof Oct 30, 2015
508481a
Make Persister and Persisters generic in the 'target' type
raboof Oct 30, 2015
2f76064
Make JsonPersister produce JsValue instead of Array[Byte]
raboof Oct 30, 2015
e4a61ef
Add test for a spray-json-based Persisters[Array[Byte]]
raboof Oct 31, 2015
fc4aa58
EventAdapter.toJournal should not include manifest
raboof Oct 31, 2015
ed469aa
Don't require a JsonFormat when we already have the Persister
raboof Oct 31, 2015
b913720
Add a stamina-testkit-based test for spray-json
raboof Nov 1, 2015
ad57d7b
Merge branch 'testkitBasedStaminaJsonTest' into eventAdapters
raboof Nov 1, 2015
22569cc
Correctly convert Persister[_, JsValue, _] to Persister[_, Array[Byte…
raboof Nov 1, 2015
1dd0265
Add test for Persister.translate
raboof Nov 2, 2015
8e32e62
Merge remote-tracking branch 'origin/master' into eventAdapters
raboof May 16, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ object Build extends Build {
.settings(libSettings: _*)
.settings(libraryDependencies ++=
compile(
akkaActor
akkaActor,
akkaPersistence
) ++
test(
scalatest
Expand All @@ -50,6 +51,7 @@ object Build extends Build {

lazy val json = Project("stamina-json", file("stamina-json"))
.dependsOn(core)
.dependsOn(testkit % "test")
.settings(libSettings: _*)
.settings(libraryDependencies ++=
compile(
Expand Down
11 changes: 6 additions & 5 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ import sbt._
import sbt.Keys._

object Dependencies {
val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.4.2"
val sprayJson = "io.spray" %% "spray-json" % "1.3.2"
val jsonLenses = "net.virtual-void" %% "json-lenses" % "0.6.1"
val scalatest = "org.scalatest" %% "scalatest" % "2.2.6"
val base64 = "me.lessis" %% "base64" % "0.2.0"
val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.4.4"
val akkaPersistence = "com.typesafe.akka" %% "akka-persistence" % "2.4.4"
val sprayJson = "io.spray" %% "spray-json" % "1.3.2"
val jsonLenses = "net.virtual-void" %% "json-lenses" % "0.6.1"
val scalatest = "org.scalatest" %% "scalatest" % "2.2.5"
val base64 = "me.lessis" %% "base64" % "0.2.0"

// Only used by the tests
val sprayJsonShapeless = "com.github.fommil" %% "spray-json-shapeless" % "1.1.0"
Expand Down
7 changes: 5 additions & 2 deletions stamina-core/src/main/scala/stamina/Persisted.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ package stamina
* A simple container holding a persistence key, a version number,
* and the associated serialized bytes.
*/
case class Persisted(key: String, version: Int, bytes: ByteString)
case class Persisted[P <: AnyRef](key: String, version: Int, persisted: P) {
lazy val manifest = Manifest(key, version)
}

object Persisted {
def apply(key: String, version: Int, bytes: Array[Byte]): Persisted = apply(key, version, ByteString(bytes))
def apply[P <: AnyRef](manifest: Manifest, persisted: P): Persisted[P] = apply(manifest.key, manifest.version, persisted)
def apply(key: String, version: Int, bytes: ByteString): Persisted[Array[Byte]] = apply[Array[Byte]](key, version, bytes.toArray)
}
25 changes: 17 additions & 8 deletions stamina-core/src/main/scala/stamina/Persister.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,41 @@ import scala.util._
* at version V and unpersisting persisted instances of T for all versions up
* to and including version V.
*/
abstract class Persister[T: ClassTag, V <: Version: VersionInfo](val key: String) {
abstract class Persister[T: ClassTag, P <: AnyRef, V <: Version: VersionInfo](val key: String) {
lazy val currentVersion = Version.numberFor[V]

def persist(t: T): Persisted
def unpersist(persisted: Persisted): T
lazy val currentManifest = Manifest(key, currentVersion)
def persist(t: T): P
def unpersist(manifest: Manifest, persisted: P): T

def canPersist(a: AnyRef): Boolean = convertToT(a).isDefined
def canUnpersist(p: Persisted): Boolean = p.key == key && p.version <= currentVersion
def canUnpersist(m: Manifest): Boolean = m.key == key && m.version <= currentVersion

def translate[U <: AnyRef](p: P ⇒ U, u: U ⇒ P): Persister[T, U, V] = {
val original = this
new Persister[T, U, V](key) {
override def persist(t: T): U = p(original.persist(t))
override def unpersist(manifest: Manifest, persisted: U): T = original.unpersist(manifest, u(persisted))
}
}

private[stamina] def convertToT(any: AnyRef): Option[T] = any match {
case t: T ⇒ Some(t)
case _ ⇒ None
}

private[stamina] def persistAny(any: AnyRef): Persisted = {
private[stamina] def persistAny(any: AnyRef): P = {
convertToT(any).map(persist(_)).getOrElse(
throw new IllegalArgumentException(
s"persistAny() was called on Persister[${implicitly[ClassTag[T]].runtimeClass}] with an instance of ${any.getClass}."
)
)
}

private[stamina] def unpersistAny(persisted: Persisted): AnyRef = {
Try(unpersist(persisted).asInstanceOf[AnyRef]) match {
private[stamina] def unpersistAny(manifest: Manifest, persisted: AnyRef): AnyRef = {
Try(unpersist(manifest, persisted.asInstanceOf[P]).asInstanceOf[AnyRef]) match {
case Success(anyref) ⇒ anyref
case Failure(error) ⇒ throw UnrecoverableDataException(persisted, error)
case Failure(error) ⇒ throw UnrecoverableDataException(manifest, error)
}
}

Expand Down
35 changes: 22 additions & 13 deletions stamina-core/src/main/scala/stamina/Persisters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,36 @@ import scala.reflect.ClassTag
* one single entry-point for subclasses of <code>StaminaAkkaSerializer</code>
*
*/
case class Persisters(persisters: List[Persister[_, _]]) {
case class Persisters[P <: AnyRef](persisters: List[Persister[_, P, _]]) {
requireNoOverlappingTags()

def canPersist(a: AnyRef): Boolean = persisters.exists(_.canPersist(a))
def canUnpersist(p: Persisted): Boolean = persisters.exists(_.canUnpersist(p))
def canUnpersist(manifest: Manifest): Boolean = persisters.exists(_.canUnpersist(manifest))

// format: OFF
def persist(anyref: AnyRef): Persisted = {
persisters.find(_.canPersist(anyref))
.map(_.persistAny(anyref))
.getOrElse(throw UnregisteredTypeException(anyref))
private def persister[T <: AnyRef](anyref: T): Persister[T, P, _] =
persisters
.find(_.canPersist(anyref))
.map(_.asInstanceOf[Persister[T, P, _]])
.getOrElse(throw UnregisteredTypeException(anyref))

def manifest(anyref: AnyRef): Manifest =
persister(anyref).currentManifest

def persist(anyref: AnyRef): Persisted[P] = {
val p = persister(anyref)
Persisted(p.currentManifest, p.persistAny(anyref))
}

def unpersist(persisted: Persisted): AnyRef = {
persisters.find(_.canUnpersist(persisted))
.map(_.unpersistAny(persisted))
.getOrElse(throw UnsupportedDataException(persisted))
def unpersist(persisted: Persisted[P]): AnyRef = unpersist(persisted.persisted, persisted.manifest)
def unpersist(persisted: AnyRef, manifest: Manifest): AnyRef = {
persisters.find(_.canUnpersist(manifest))
.map(_.unpersistAny(manifest, persisted))
.getOrElse(throw UnsupportedDataException(manifest.key, manifest.version))
}
// format: ON

def ++(other: Persisters): Persisters = Persisters(persisters ++ other.persisters)
def ++(other: Persisters[P]): Persisters[P] = Persisters(persisters ++ other.persisters)

private def requireNoOverlappingTags() = {
val overlappingTags = persisters.groupBy(_.tag).filter(_._2.length > 1).mapValues(_.map(_.key))
Expand All @@ -40,6 +49,6 @@ case class Persisters(persisters: List[Persister[_, _]]) {
}

object Persisters {
def apply[T: ClassTag, V <: Version: VersionInfo](persister: Persister[T, V]): Persisters = apply(List(persister))
def apply(first: Persister[_, _], rest: Persister[_, _]*): Persisters = apply(first :: rest.toList)
def apply[T: ClassTag, P <: AnyRef, V <: Version: VersionInfo](persister: Persister[T, P, V]): Persisters[P] = apply(List(persister))
def apply[P <: AnyRef](first: Persister[_, P, _], rest: Persister[_, P, _]*): Persisters[P] = apply(first :: rest.toList)
}
28 changes: 15 additions & 13 deletions stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,37 @@ import akka.serialization._

/**
* A custom Akka Serializer specifically designed for use with Akka Persistence.
*
* Key and version information is encoded in the manifest.
*/
abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters, codec: PersistedCodec) extends Serializer {
def this(persisters: List[Persister[_, _]], codec: PersistedCodec = DefaultPersistedCodec) = this(Persisters(persisters), codec)
def this(persister: Persister[_, _], persisters: Persister[_, _]*) = this(Persisters(persister :: persisters.toList), DefaultPersistedCodec)
abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters[Array[Byte]]) extends SerializerWithStringManifest {
def this(persisters: List[Persister[_, Array[Byte], _]]) = this(Persisters(persisters))
def this(persister: Persister[_, Array[Byte], _], persisters: Persister[_, Array[Byte], _]*) = this(Persisters(persister :: persisters.toList))

/** We don't need class manifests since we're using keys to identify types. */
val includeManifest: Boolean = false
/** Uniquely identifies this Serializer. */
val identifier = 490304

/** Uniquely identifies this Serializer by combining the codec with a unique number. */
val identifier = 42 * codec.identifier
def manifest(obj: AnyRef): String =
persisters.manifest(obj).manifest

/**
* @throws UnregisteredTypeException when the specified object is not supported by the persisters.
*/
def toBinary(obj: AnyRef): Array[Byte] = {
if (!persisters.canPersist(obj)) throw UnregisteredTypeException(obj)

codec.writePersisted(persisters.persist(obj))
persisters.persist(obj).persisted
}

/**
* @throws UnsupportedDataException when the persisted key and/or version is not supported.
* @throws UnrecoverableDataException when the key and version are supported but recovery throws an exception.
*/
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val persisted = codec.readPersisted(bytes)
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
if (manifest.isEmpty) throw new IllegalArgumentException("No manifest found")
val m = Manifest(manifest)
if (!persisters.canUnpersist(m)) throw UnsupportedDataException(m.key, m.version)

if (!persisters.canUnpersist(persisted)) throw UnsupportedDataException(persisted)

persisters.unpersist(persisted)
persisters.unpersist(Persisted(m, bytes))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package stamina
package codec

import akka.serialization._

/**
* A custom Akka Serializer encoding key and version along with the serialized object.
*
* This is particularly useful when there is no separate field for metadata, such as when
* dealing with pre-akka-2.3 persistence.
*
* Wrapping/unwrapping the metadata around the serialized object is done by the Codec.
*/
abstract class CodecBasedStaminaAkkaSerializer private[stamina] (persisters: Persisters[Array[Byte]], codec: PersistedCodec[Array[Byte]]) extends Serializer {
def this(persisters: List[Persister[_, Array[Byte], _]], codec: PersistedCodec[Array[Byte]] = DefaultPersistedCodec) = this(Persisters(persisters), codec)
def this(persister: Persister[_, Array[Byte], _], persisters: Persister[_, Array[Byte], _]*) = this(Persisters(persister :: persisters.toList), DefaultPersistedCodec)

/** We don't need class manifests since we're using keys to identify types. */
val includeManifest: Boolean = false

/** Uniquely identifies this Serializer by combining the codec with a unique number. */
val identifier = 42 * codec.identifier

/**
* @throws UnregisteredTypeException when the specified object is not supported by the persisters.
*/
def toBinary(obj: AnyRef): Array[Byte] = {
if (!persisters.canPersist(obj)) throw UnregisteredTypeException(obj)

codec.writePersisted(persisters.persist(obj))
}

/**
* @throws UnsupportedDataException when the persisted key and/or version is not supported.
* @throws UnrecoverableDataException when the key and version are supported but recovery throws an exception.
*/
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val persisted = codec.readPersisted(bytes)

if (!persisters.canUnpersist(persisted.manifest)) throw UnsupportedDataException(persisted.key, persisted.version)

persisters.unpersist(persisted)
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package stamina
package codec

/**
* The encoding used to translate an instance of <code>Persisted</code>
* to a byte array and back.
*/
trait PersistedCodec {
trait PersistedCodec[P <: AnyRef] {
def identifier: Int
def writePersisted(persisted: Persisted): Array[Byte]
def readPersisted(bytes: Array[Byte]): Persisted
def writePersisted(persisted: Persisted[P]): Array[Byte]
def readPersisted(bytes: Array[Byte]): Persisted[P]
}

/**
Expand All @@ -19,26 +20,26 @@ trait PersistedCodec {
* - persisted data (n bytes)
*
*/
object DefaultPersistedCodec extends PersistedCodec {
object DefaultPersistedCodec extends PersistedCodec[Array[Byte]] {
implicit val byteOrder = java.nio.ByteOrder.LITTLE_ENDIAN
import java.nio.charset.StandardCharsets._

def identifier = 490303

def writePersisted(persisted: Persisted): Array[Byte] = {
def writePersisted(persisted: Persisted[Array[Byte]]): Array[Byte] = {
val keyBytes = persisted.key.getBytes(UTF_8)

ByteString.
newBuilder.
putInt(keyBytes.length).
putBytes(keyBytes).
putInt(persisted.version).
append(persisted.bytes).
append(ByteString(persisted.persisted)).
result.
toArray
}

def readPersisted(byteArray: Array[Byte]): Persisted = {
def readPersisted(byteArray: Array[Byte]): Persisted[Array[Byte]] = {
val bytes = ByteString(byteArray)
val keyLength = bytes.take(4).iterator.getInt
val (keyBytes, rest) = bytes.drop(4).splitAt(keyLength)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package stamina
package eventadapters

import akka.persistence.journal._

/**
* EventAdapter that uses Stamina to convert events and perform schema
* evolution when reading from the journal..
*
* Remember to configure akka-serialization to correctly serialize objects
* of type P (unless it's treated specially by your journal plugin).
*/
class StaminaEventAdapter[P <: AnyRef](persisters: Persisters[P]) extends EventAdapter {
def this(persisters: List[Persister[_, P, _]]) = this(Persisters(persisters))
def this(persister: Persister[_, P, _], persisters: Persister[_, P, _]*) = this(Persisters(persister :: persisters.toList))

def manifest(event: Any) =
persisters.manifest(event.asInstanceOf[AnyRef]).manifest

def fromJournal(event: Any, manifest: String) =
EventSeq(persisters.unpersist(event.asInstanceOf[AnyRef], Manifest(manifest)))

def toJournal(event: Any) =
persisters.persist(event.asInstanceOf[AnyRef]).persisted
}
16 changes: 12 additions & 4 deletions stamina-core/src/main/scala/stamina/stamina.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@ package stamina {
extends RuntimeException(s"No persister registered for class: ${obj.getClass}")
with NoStackTrace

case class UnsupportedDataException(persisted: Persisted)
extends RuntimeException(s"No unpersister registered for key: '${persisted.key}' and version: ${persisted.version}")
case class UnsupportedDataException(key: String, version: Int)
extends RuntimeException(s"No unpersister registered for key: '$key' and version: $version")
with NoStackTrace

case class UnrecoverableDataException(persisted: Persisted, error: Throwable)
extends RuntimeException(s"Error while trying to unpersist data with key '${persisted.key}' and version ${persisted.version}. Cause: ${error}")
case class UnrecoverableDataException(manifest: Manifest, error: Throwable)
extends RuntimeException(s"Error while trying to unpersist data with key '${manifest.key}' and version ${manifest.version}. Cause: ${error}")
with NoStackTrace

case class Manifest(manifest: String) {
lazy val key: String = manifest.substring(manifest.indexOf('-') + 1)
lazy val version: Int = Integer.valueOf(manifest.substring(0, manifest.indexOf('-')))
}
object Manifest {
def apply(key: String, version: Int): Manifest = Manifest(version + "-" + key)
}
}
18 changes: 18 additions & 0 deletions stamina-core/src/test/scala/stamina/PersisterSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package stamina

class PersisterSpec extends StaminaSpec {
import TestOnlyPersister._
import TestDomain._

"A Persister" should {
"correctly report whether it can persist a given class, even after translation" in {
val cartPersister = persister[CartCreated]("cartCreated")
cartPersister.canPersist(cartCreated) should be(true)
cartPersister.canPersist(item1) should be(false)

val translatedCartPersister = cartPersister.translate(_.toString, (s: String) ⇒ s.getBytes)
translatedCartPersister.canPersist(cartCreated) should be(true)
translatedCartPersister.canPersist(item1) should be(false)
}
}
}
Loading