diff --git a/pom.xml b/pom.xml index b0a9ce2..3ddfba7 100644 --- a/pom.xml +++ b/pom.xml @@ -59,7 +59,7 @@ - + ${scala.version} -target:jvm-${java.compile.version} @@ -132,6 +132,12 @@ + + maven + Maven Main Repo + http://repo1.maven.org/maven2/ + + scala-tools.org-snapshots Scala-Tools Maven2 Repository @@ -160,9 +166,9 @@ - org.apache.thrift + org.apache.cassandra.deps libthrift - 917130 + 0.5.0 @@ -173,8 +179,8 @@ org.apache.cassandra - cassandra - 0.6.1 + cassandra-all + 0.7.0 diff --git a/src/main/scala/com/shorrockin/cascal/jmx/CascalStatistics.scala b/src/main/scala/com/shorrockin/cascal/jmx/CascalStatistics.scala index 28e0db5..99f404d 100755 --- a/src/main/scala/com/shorrockin/cascal/jmx/CascalStatistics.scala +++ b/src/main/scala/com/shorrockin/cascal/jmx/CascalStatistics.scala @@ -34,7 +34,7 @@ object CascalStatistics extends CascalStatistics$MBean { mbeanServer.registerMBean(this, objectName) } - + /** * retrieves the stats for the specified host, creating and registering them if they don't * exist. @@ -60,7 +60,7 @@ object CascalStatistics extends CascalStatistics$MBean { } def register(pool:SessionPool) = pools = pool :: pools - def unregister(pool:SessionPool) = pools = pools - pool + def unregister(pool:SessionPool) = pools = pools.filterNot(_ == pool) def creation(host:Host) = get(host).creation def creationError(host:Host) = get(host).creationError @@ -94,7 +94,7 @@ class HostStatistics(host:Host) extends HostStatisticsMBean { def getTotalUsageTime() = usageTime def getNumberOfCreationFailures() = createFails def getNumberOfUsageExceptions() = usageErrors - def getNumberOfSessionsCreated() = created + def getNumberOfSessionsCreated() = created } trait HostStatisticsMBean { @@ -109,4 +109,4 @@ trait HostStatisticsMBean { trait CascalStatistics$MBean extends HostStatisticsMBean { def getNumberOfActiveConnections():Int def getNumberOfIdleConnections():Int -} \ No newline at end of file +} diff --git a/src/main/scala/com/shorrockin/cascal/model/Column.scala b/src/main/scala/com/shorrockin/cascal/model/Column.scala index d6aac38..40e3a83 100644 --- a/src/main/scala/com/shorrockin/cascal/model/Column.scala +++ b/src/main/scala/com/shorrockin/cascal/model/Column.scala @@ -1,5 +1,6 @@ package com.shorrockin.cascal.model +import java.nio.ByteBuffer import java.util.Date import org.apache.cassandra.thrift.{ColumnPath, ColumnOrSuperColumn} import org.apache.cassandra.thrift.{Column => CassColumn} @@ -15,14 +16,14 @@ import com.shorrockin.cascal.utils.Utils.now * @author Chris Shorrock * @param Owner the type of object which owns this column */ -case class Column[Owner](val name:Array[Byte], - val value:Array[Byte], +case class Column[Owner](val name:ByteBuffer, + val value:ByteBuffer, val time:Long, val owner:Owner) extends Gettable[Column[Owner]] { - def this(name:Array[Byte], value:Array[Byte], owner:Owner) = this(name, value, now, owner) - def this(name:Array[Byte], owner:Owner) = this(name, null, now, owner) - def this(name:Array[Byte], value:Array[Byte], date:Date, owner:Owner) = this(name, value, date.getTime, owner) + def this(name:ByteBuffer, value:ByteBuffer, owner:Owner) = this(name, value, now, owner) + def this(name:ByteBuffer, owner:Owner) = this(name, null, now, owner) + def this(name:ByteBuffer, value:ByteBuffer, date:Date, owner:Owner) = this(name, value, date.getTime, owner) val partial = (value == null) @@ -44,7 +45,7 @@ case class Column[Owner](val name:Array[Byte], case key:StandardKey => cosc.setColumn(new CassColumn(name, value, time)) case sup:SuperColumn => val list = Conversions.toJavaList(new CassColumn(name, value, time) :: Nil) - cosc.setSuper_column(new CassSuperColumn(sup.value, list)) + cosc.setSuper_column(new CassSuperColumn(sup.value, list)) } } @@ -53,7 +54,7 @@ case class Column[Owner](val name:Array[Byte], * copy method to create a new instance of this column with a new value and * the same other values. */ - def \(newValue:Array[Byte]) = new Column[Owner](name, newValue, time, owner) + def \(newValue:ByteBuffer) = new Column[Owner](name, newValue, time, owner) /** @@ -68,15 +69,16 @@ case class Column[Owner](val name:Array[Byte], */ def convertGetResult(colOrSuperCol:ColumnOrSuperColumn):Column[Owner] = { val col = colOrSuperCol.getColumn - Column(col.getName, col.getValue, col.getTimestamp, owner) + Column(ByteBuffer.wrap(col.getName), ByteBuffer.wrap(col.getValue), col.getTimestamp, owner) } - private def stringIfPossible(a:Array[Byte]):String = { - if (a.length <= 4) return "Array (" + a.mkString(", ") + ")" - if (a.length > 1000) return a.toString - try { Conversions.string(a) } catch { case _ => a.toString } + private def stringIfPossible(a:ByteBuffer):String = { + if (a == null) return "NULL" + if (a.array.length <= 4) return "Array (" + a.array.mkString(", ") + ")" + if (a.array.length > 1000) return a.array.toString + try { Conversions.string(a) } catch { case _ => a.array.toString } } override def toString():String = "%s \\ Column(name = %s, value = %s, time = %s)".format( owner.toString, stringIfPossible(name), stringIfPossible(value), time) -} \ No newline at end of file +} diff --git a/src/main/scala/com/shorrockin/cascal/model/ColumnContainer.scala b/src/main/scala/com/shorrockin/cascal/model/ColumnContainer.scala index 1efc6d6..224087b 100644 --- a/src/main/scala/com/shorrockin/cascal/model/ColumnContainer.scala +++ b/src/main/scala/com/shorrockin/cascal/model/ColumnContainer.scala @@ -1,5 +1,6 @@ package com.shorrockin.cascal.model +import java.nio.ByteBuffer import org.apache.cassandra.thrift.{ColumnParent, ColumnPath, ColumnOrSuperColumn} /** @@ -13,7 +14,7 @@ import org.apache.cassandra.thrift.{ColumnParent, ColumnPath, ColumnOrSuperColum * @param ListType when listed, what type of object does it return. */ trait ColumnContainer[ColumnType, ListType] { - def \(value:Array[Byte]):ColumnType + def \(value:ByteBuffer):ColumnType val family:ColumnFamily[_] val key:Key[_, _] diff --git a/src/main/scala/com/shorrockin/cascal/model/Keyspace.scala b/src/main/scala/com/shorrockin/cascal/model/Keyspace.scala index 96f869b..79edd71 100644 --- a/src/main/scala/com/shorrockin/cascal/model/Keyspace.scala +++ b/src/main/scala/com/shorrockin/cascal/model/Keyspace.scala @@ -1,4 +1,5 @@ package com.shorrockin.cascal.model +import java.nio.ByteBuffer /** * provides the high level abstraction for the keyspace. can be thought @@ -15,4 +16,4 @@ case class Keyspace(val value:String) extends StringValue { def \(value:String):StandardColumnFamily = new StandardColumnFamily(value, this) def \\(value:String):SuperColumnFamily = new SuperColumnFamily(value, this) override def toString = "Keyspace(value = %s)".format(value) -} \ No newline at end of file +} diff --git a/src/main/scala/com/shorrockin/cascal/model/PathComponent.scala b/src/main/scala/com/shorrockin/cascal/model/PathComponent.scala index a69ad0c..cee171a 100755 --- a/src/main/scala/com/shorrockin/cascal/model/PathComponent.scala +++ b/src/main/scala/com/shorrockin/cascal/model/PathComponent.scala @@ -1,4 +1,5 @@ package com.shorrockin.cascal.model +import java.nio.ByteBuffer /** * categorization of a cassandra path component. @@ -13,10 +14,10 @@ trait PathComponent[ValueType] { val value:ValueType } * categorization of a path component who's value is a byte * @author Chris Shorrock */ -trait ByteValue extends PathComponent[Array[Byte]] +trait ByteValue extends PathComponent[ByteBuffer] /** * categorization of path component who's value is a string * @author Chris Shorrock */ -trait StringValue extends PathComponent[String] \ No newline at end of file +trait StringValue extends PathComponent[String] diff --git a/src/main/scala/com/shorrockin/cascal/model/StandardColumnContainer.scala b/src/main/scala/com/shorrockin/cascal/model/StandardColumnContainer.scala index 69e8942..5bfa5a4 100644 --- a/src/main/scala/com/shorrockin/cascal/model/StandardColumnContainer.scala +++ b/src/main/scala/com/shorrockin/cascal/model/StandardColumnContainer.scala @@ -1,12 +1,14 @@ package com.shorrockin.cascal.model +import java.nio.ByteBuffer + /** * a type of column container which holds standard columns. * * @author Chris Shorrock */ trait StandardColumnContainer[ColumnType, SliceType] extends ColumnContainer[ColumnType, SliceType] { - def \(name:Array[Byte]):ColumnType - def \(name:Array[Byte], value:Array[Byte]):ColumnType - def \(name:Array[Byte], value:Array[Byte], time:Long):ColumnType + def \(name:ByteBuffer):ColumnType + def \(name:ByteBuffer, value:ByteBuffer):ColumnType + def \(name:ByteBuffer, value:ByteBuffer, time:Long):ColumnType } diff --git a/src/main/scala/com/shorrockin/cascal/model/StandardKey.scala b/src/main/scala/com/shorrockin/cascal/model/StandardKey.scala index 9433d6c..50f4e1e 100644 --- a/src/main/scala/com/shorrockin/cascal/model/StandardKey.scala +++ b/src/main/scala/com/shorrockin/cascal/model/StandardKey.scala @@ -1,5 +1,6 @@ package com.shorrockin.cascal.model +import java.nio.ByteBuffer import org.apache.cassandra.thrift.{ColumnOrSuperColumn} /** @@ -12,16 +13,16 @@ import org.apache.cassandra.thrift.{ColumnOrSuperColumn} case class StandardKey(val value:String, val family:StandardColumnFamily) extends Key[Column[StandardKey], Seq[Column[StandardKey]]] with StandardColumnContainer[Column[StandardKey], Seq[Column[StandardKey]]] { - def \(name:Array[Byte]) = new Column(name, this) - def \(name:Array[Byte], value:Array[Byte]) = new Column(name, value, this) - def \(name:Array[Byte], value:Array[Byte], time:Long) = new Column(name, value, time, this) + def \(name:ByteBuffer) = new Column(name, this) + def \(name:ByteBuffer, value:ByteBuffer) = new Column(name, value, this) + def \(name:ByteBuffer, value:ByteBuffer, time:Long) = new Column(name, value, time, this) def convertListResult(results:Seq[ColumnOrSuperColumn]):Seq[Column[StandardKey]] = { results.map { (result) => val column = result.getColumn - \(column.getName, column.getValue, column.getTimestamp) + \(ByteBuffer.wrap(column.getName), ByteBuffer.wrap(column.getValue), column.getTimestamp) } } override def toString = "%s \\ StandardKey(value = %s)".format(family.toString, value) -} \ No newline at end of file +} diff --git a/src/main/scala/com/shorrockin/cascal/model/SuperColumn.scala b/src/main/scala/com/shorrockin/cascal/model/SuperColumn.scala index 0abe098..2ab2a49 100644 --- a/src/main/scala/com/shorrockin/cascal/model/SuperColumn.scala +++ b/src/main/scala/com/shorrockin/cascal/model/SuperColumn.scala @@ -1,5 +1,6 @@ package com.shorrockin.cascal.model +import java.nio.ByteBuffer import org.apache.cassandra.thrift.{ColumnPath, ColumnParent, ColumnOrSuperColumn} import com.shorrockin.cascal.utils.Conversions @@ -10,11 +11,11 @@ import com.shorrockin.cascal.utils.Conversions * * @author Chris Shorrock */ -case class SuperColumn(val value:Array[Byte], val key:SuperKey) extends Gettable[Seq[Column[SuperColumn]]]() +case class SuperColumn(val value:ByteBuffer, val key:SuperKey) extends Gettable[Seq[Column[SuperColumn]]]() with StandardColumnContainer[Column[SuperColumn], Seq[Column[SuperColumn]]] { - def \(name:Array[Byte]) = new Column(name, this) - def \(name:Array[Byte], value:Array[Byte]) = new Column(name, value, this) - def \(name:Array[Byte], value:Array[Byte], time:Long) = new Column(name, value, time, this) + def \(name:ByteBuffer) = new Column(name, this) + def \(name:ByteBuffer, value:ByteBuffer) = new Column(name, value, this) + def \(name:ByteBuffer, value:ByteBuffer, time:Long) = new Column(name, value, time, this) val family = key.family val keyspace = family.keyspace @@ -22,7 +23,7 @@ case class SuperColumn(val value:Array[Byte], val key:SuperKey) extends Gettable lazy val columnParent = new ColumnParent(family.value).setSuper_column(value) lazy val columnPath = new ColumnPath(family.value).setSuper_column(value) - def ::(other:SuperColumn):List[SuperColumn] = other :: this :: Nil + def ::(other:SuperColumn):List[SuperColumn] = other :: this :: Nil private def convertList[T](v:java.util.List[T]):List[T] = { scala.collection.JavaConversions.asBuffer(v).toList @@ -34,7 +35,7 @@ case class SuperColumn(val value:Array[Byte], val key:SuperKey) extends Gettable */ def convertGetResult(colOrSuperCol:ColumnOrSuperColumn):Seq[Column[SuperColumn]] = { val superCol = colOrSuperCol.getSuper_column - convertList(superCol.getColumns).map { (column) => \(column.getName, column.getValue, column.getTimestamp) } + convertList(superCol.getColumns).map { (column) => \(ByteBuffer.wrap(column.getName), ByteBuffer.wrap(column.getValue), column.getTimestamp) } } @@ -45,16 +46,16 @@ case class SuperColumn(val value:Array[Byte], val key:SuperKey) extends Gettable def convertListResult(results:Seq[ColumnOrSuperColumn]):Seq[Column[SuperColumn]] = { results.map { (result) => val column = result.getColumn - \(column.getName, column.getValue, column.getTimestamp) + \(ByteBuffer.wrap(column.getName), ByteBuffer.wrap(column.getValue), column.getTimestamp) } } - private def stringIfPossible(a:Array[Byte]):String = { - if (a.length <= 4) return "Array (" + a.mkString(", ") + ")" - if (a.length > 1000) return a.toString - try { Conversions.string(a) } catch { case _ => a.toString } + private def stringIfPossible(a:ByteBuffer):String = { + if (a.array.length <= 4) return "Array (" + a.array.mkString(", ") + ")" + if (a.array.length > 1000) return a.array.toString + try { Conversions.string(a) } catch { case _ => a.array.toString } } override def toString():String = "%s \\ SuperColumn(value = %s)".format( key.toString, stringIfPossible(value)) -} \ No newline at end of file +} diff --git a/src/main/scala/com/shorrockin/cascal/model/SuperKey.scala b/src/main/scala/com/shorrockin/cascal/model/SuperKey.scala index 56a0b77..711fdf6 100644 --- a/src/main/scala/com/shorrockin/cascal/model/SuperKey.scala +++ b/src/main/scala/com/shorrockin/cascal/model/SuperKey.scala @@ -1,10 +1,11 @@ package com.shorrockin.cascal.model import org.apache.cassandra.thrift.{ColumnOrSuperColumn} +import java.nio.ByteBuffer case class SuperKey(val value:String, val family:SuperColumnFamily) extends Key[SuperColumn, Seq[(SuperColumn, Seq[Column[SuperColumn]])]] { - def \(value:Array[Byte]) = new SuperColumn(value, this) + def \(value:ByteBuffer) = new SuperColumn(value, this) /** * converts a list of super columns to the specified return type @@ -12,9 +13,9 @@ case class SuperKey(val value:String, val family:SuperColumnFamily) extends Key[ def convertListResult(results:Seq[ColumnOrSuperColumn]):Seq[(SuperColumn, Seq[Column[SuperColumn]])] = { results.map { (result) => val nativeSuperCol = result.getSuper_column - val superColumn = this \ nativeSuperCol.getName + val superColumn = this \ ByteBuffer.wrap(nativeSuperCol.getName) val columns = convertList(nativeSuperCol.getColumns).map { (column) => - superColumn \ (column.getName, column.getValue, column.getTimestamp) + superColumn \ (ByteBuffer.wrap(column.getName), ByteBuffer.wrap(column.getValue), column.getTimestamp) } (superColumn -> columns) } @@ -25,4 +26,4 @@ case class SuperKey(val value:String, val family:SuperColumnFamily) extends Key[ } override def toString = "%s \\ SuperKey(value = %s)".format(family.toString, value) -} \ No newline at end of file +} diff --git a/src/main/scala/com/shorrockin/cascal/serialization/Converter.scala b/src/main/scala/com/shorrockin/cascal/serialization/Converter.scala index 60722bf..4c9f2b3 100755 --- a/src/main/scala/com/shorrockin/cascal/serialization/Converter.scala +++ b/src/main/scala/com/shorrockin/cascal/serialization/Converter.scala @@ -1,9 +1,10 @@ package com.shorrockin.cascal.serialization +import java.nio.ByteBuffer import reflect.Manifest import java.lang.annotation.Annotation import java.lang.reflect.{Field, Method} -import java.util.{Arrays, Date, UUID} +import java.util.{Date, UUID} import annotations.{Columns, Optional} import annotations.{Key => AKey, SuperColumn => ASuperColumn, Value => AValue} import annotations.{Keyspace => AKeySpace, Super => ASuper, Family => AFamily} @@ -25,7 +26,7 @@ object Converter extends Converter(Serializer.Default) with Logging { class Converter(serializers:Map[Class[_], Serializer[_]]) { private var reflectionCache = Map[Class[_], ReflectionInformation]() - + /** * converts all the column sequences in the provided map (which is returned from a list @@ -99,7 +100,7 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) { * Given a class type, a Method that returns that type, and a source object (Cascal ORM object), * return the appropriate serialized byte array. Does not support Option. */ - private def getFieldSerialized[T](fieldType:Class[_], fieldGetter:Method, obj:T):Array[Byte] = { + private def getFieldSerialized[T](fieldType:Class[_], fieldGetter:Method, obj:T):ByteBuffer = { // Couldn't figure out how to case match classes on a class obj with type erasure if (fieldType == classOf[String]) Conversions.bytes(fieldGetter.invoke(obj).asInstanceOf[String]) else if (fieldType == classOf[UUID]) Conversions.bytes(fieldGetter.invoke(obj).asInstanceOf[UUID]) @@ -109,8 +110,7 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) { else if (fieldType == classOf[Float]) Conversions.bytes(fieldGetter.invoke(obj).asInstanceOf[Float]) else if (fieldType == classOf[Double]) Conversions.bytes(fieldGetter.invoke(obj).asInstanceOf[Double]) else if (fieldType == classOf[Date]) Conversions.bytes(fieldGetter.invoke(obj).asInstanceOf[Date]) - else throw new IllegalStateException( - "Type %s of getter %s is unknown".format(fieldGetter.getName, fieldType.toString)) + else throw new IllegalStateException("Type %s of getter %s is unknown".format(fieldGetter.getName, fieldType.toString)) } /** @@ -118,7 +118,7 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) { * return null if calling the method returns None, or otherwise the appropriate * serialized byte array. */ - private def getOptionFieldSerialized[T](fieldGetter:Method, obj:T):Array[Byte] = { + private def getOptionFieldSerialized[T](fieldGetter:Method, obj:T):ByteBuffer = { val opt = fieldGetter.invoke(obj).asInstanceOf[Option[_]] opt match { case None => null @@ -139,8 +139,8 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) { * Given an object of type T using the Cascal Annotations returns a list of columns * complete with name/value. Uses the serializers to convert values in columns to their * appropriate byte array. - */ - def unapply[T](obj:T)(implicit manifest:Manifest[T]):Seq[Column[_]] = { + */ + def unapply[T](obj:T)(implicit manifest:Manifest[T]):List[Column[_]] = { val info = Converter.this.info(manifest.erasure) val key:String = info.fieldGettersAndColumnNames.filter(tup => tup._2._2 match { @@ -148,7 +148,7 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) { case _ => false }).head._1.invoke(obj).asInstanceOf[String] - var superCol:Array[Byte] = null + var superCol:ByteBuffer = null if (info.isSuper) { val superTup = info.fieldGettersAndColumnNames.filter(tup => tup._2._2 match { case a:ASuperColumn => true @@ -159,11 +159,11 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) { superCol = getFieldSerialized(superType, superGetter, obj) } - info.fieldGettersAndColumnNames.map((tup) => { + info.fieldGettersAndColumnNames.foldLeft(List[Column[_]]()) { (acc, tup) => val fieldGetter = tup._1 var optField = false val fieldType = tup._2._2 match { - case a:Optional => + case a:Optional => optField = true a.as case _ => tup._2._1 @@ -174,17 +174,17 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) { case _ => null } - val value:Array[Byte] = optField match { + val value:ByteBuffer = optField match { case false => getFieldSerialized(fieldType, fieldGetter, obj) case true => getOptionFieldSerialized(fieldGetter, obj) } - if (columnName == null || value == null) null + if (columnName == null || value == null) acc else info.isSuper match { - case true => info.family.asInstanceOf[SuperColumnFamily] \ key \ superCol \ (Conversions.bytes(columnName), value) - case false => info.family.asInstanceOf[StandardColumnFamily] \ key \ (Conversions.bytes(columnName), value) + case true => (info.family.asInstanceOf[SuperColumnFamily] \ key \ superCol \ (Conversions.bytes(columnName), value)) :: acc + case false => (info.family.asInstanceOf[StandardColumnFamily] \ key \ (Conversions.bytes(columnName), value)) :: acc } - }).filter(_!=null) + } } /** @@ -205,11 +205,11 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) { /** - * returns the column with the specified name, or + * returns the column with the specified name, or */ private def find(name:String, columns:Seq[Column[_]]):Option[Column[_]] = { val nameBytes = Conversions.bytes(name) - columns.find { (c) => Arrays.equals(nameBytes, c.name) } + columns.find { (c) => nameBytes.equals(c.name) } } @@ -217,7 +217,7 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) { * converts the specified byte array to the specified type using the installed * serializers. */ - private def bytesToObject[A](ofType:Class[A], bytes:Array[Byte]):A = { + private def bytesToObject[A](ofType:Class[A], bytes:ByteBuffer):A = { serializers.get(ofType) match { case None => throw new IllegalArgumentException("unable to find serializer for type: " + ofType) case Some(s) => @@ -272,7 +272,7 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) { case class ReflectionInformation(val cls:Class[_]) { val keyspace = { extract(cls, classOf[AKeySpace]) match { - case None => throw new IllegalArgumentException("all mapped classes must contain @Keyspace annotation") + case None => throw new IllegalArgumentException("all mapped classes must contain @Keyspace annotation; not found in " + cls) case Some(v) => Keyspace(v.value()) } } @@ -325,7 +325,7 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) { cls.getDeclaredFields.foreach { field => val annotations = field.getDeclaredAnnotations if (annotations.length > 0) annotations(0) match { - case a:AKey => out = (field -> a) :: out + case a:AKey => out = (field -> a) :: out case a:Optional => out = (field -> a) :: out case a:ASuperColumn => out = (field -> a) :: out case a:AValue => out = (field -> a) :: out @@ -350,11 +350,11 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) { * returns all the fields matching the specified annotation */ def fields[A <: Annotation](cls:Class[A]):Seq[(Field, Annotation)] = fields.filter { (tup) => cls.equals(tup._2.getClass) } - + private def extract[A <: Annotation](cls:Class[_], annot:Class[A]):Option[A] = { val value = cls.getAnnotation(annot).asInstanceOf[A] if (null == value) None else Some(value) } } -} \ No newline at end of file +} diff --git a/src/main/scala/com/shorrockin/cascal/serialization/Serializer.scala b/src/main/scala/com/shorrockin/cascal/serialization/Serializer.scala index 9df9925..01f22f2 100755 --- a/src/main/scala/com/shorrockin/cascal/serialization/Serializer.scala +++ b/src/main/scala/com/shorrockin/cascal/serialization/Serializer.scala @@ -4,7 +4,7 @@ import com.shorrockin.cascal.utils.{UUID => UUIDUtils} import java.util.UUID import java.util.Date import java.nio.charset.Charset -import java.nio.ByteBuffer +import java.nio.{ByteBuffer,CharBuffer} object Serializer { @@ -31,10 +31,10 @@ object Serializer { */ trait Serializer[A] { /** converts this object to a byte array for entry into cassandra */ - def toBytes(obj:A):Array[Byte] + def toBytes(obj:A):ByteBuffer /** converts the specified byte array into an object */ - def fromBytes(bytes:Array[Byte]):A + def fromBytes(bytes:ByteBuffer):A /** converts the specified value to a string */ def toString(obj:A):String @@ -45,15 +45,17 @@ trait Serializer[A] { object StringSerializer extends Serializer[String] { val utf8 = Charset.forName("UTF-8") + val decoder = utf8.newDecoder + val encoder = utf8.newEncoder - def toBytes(str:String) = str.getBytes(utf8) - def fromBytes(bytes:Array[Byte]) = new String(bytes, utf8) + def toBytes(str:String) = encoder.encode(CharBuffer.wrap(str.toCharArray)) + def fromBytes(bytes:ByteBuffer) = decoder.decode(bytes).toString def toString(str:String) = str def fromString(str:String) = str } object UUIDSerializer extends Serializer[UUID] { - def fromBytes(bytes:Array[Byte]) = UUIDUtils(bytes) + def fromBytes(bytes:ByteBuffer) = UUIDUtils(bytes.array) def toString(uuid:UUID) = uuid.toString def fromString(str:String) = UUID.fromString(str) @@ -65,7 +67,7 @@ object UUIDSerializer extends Serializer[UUID] { (0 until 8).foreach { (i) => buffer(i) = (msb >>> 8 * (7 - i)).asInstanceOf[Byte] } (8 until 16).foreach { (i) => buffer(i) = (lsb >>> 8 * (7 - i)).asInstanceOf[Byte] } - buffer + ByteBuffer.wrap(buffer) } } @@ -73,8 +75,8 @@ object UUIDSerializer extends Serializer[UUID] { object IntSerializer extends Serializer[Int] { val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE - def toBytes(i:Int) = ByteBuffer.wrap(new Array[Byte](bytesPerInt)).putInt(i).array() - def fromBytes(bytes:Array[Byte]) = ByteBuffer.wrap(bytes).getInt() + def toBytes(i:Int) = ByteBuffer.allocate(bytesPerInt).putInt(i) + def fromBytes(bytes:ByteBuffer) = bytes.getInt def toString(obj:Int) = obj.toString def fromString(str:String) = str.toInt } @@ -82,15 +84,15 @@ object IntSerializer extends Serializer[Int] { object LongSerializer extends Serializer[Long] { val bytesPerLong = java.lang.Long.SIZE / java.lang.Byte.SIZE - def toBytes(l:Long) = ByteBuffer.wrap(new Array[Byte](bytesPerLong)).putLong(l).array() - def fromBytes(bytes:Array[Byte]) = ByteBuffer.wrap(bytes).getLong() + def toBytes(l:Long) = ByteBuffer.allocate(bytesPerLong).putLong(l) + def fromBytes(bytes:ByteBuffer) = bytes.getLong() def toString(obj:Long) = obj.toString def fromString(str:String) = str.toLong } object BooleanSerializer extends Serializer[Boolean] { def toBytes(b:Boolean) = StringSerializer.toBytes(b.toString) - def fromBytes(bytes:Array[Byte]) = StringSerializer.fromBytes(bytes).toBoolean + def fromBytes(bytes:ByteBuffer) = StringSerializer.fromBytes(bytes).toBoolean def toString(obj:Boolean) = obj.toString def fromString(str:String) = str.toBoolean } @@ -98,8 +100,8 @@ object BooleanSerializer extends Serializer[Boolean] { object FloatSerializer extends Serializer[Float] { val bytesPerFloat = java.lang.Float.SIZE / java.lang.Byte.SIZE - def toBytes(f:Float) = ByteBuffer.wrap(new Array[Byte](bytesPerFloat)).putFloat(f).array() - def fromBytes(bytes:Array[Byte]) = ByteBuffer.wrap(bytes).getFloat() + def toBytes(f:Float) = ByteBuffer.allocate(bytesPerFloat).putFloat(f) + def fromBytes(bytes:ByteBuffer) = bytes.getFloat() def toString(obj:Float) = obj.toString def fromString(str:String) = str.toFloat } @@ -107,15 +109,15 @@ object FloatSerializer extends Serializer[Float] { object DoubleSerializer extends Serializer[Double] { val bytesPerDouble = java.lang.Double.SIZE / java.lang.Byte.SIZE - def toBytes(d:Double) = ByteBuffer.wrap(new Array[Byte](bytesPerDouble)).putDouble(d).array() - def fromBytes(bytes:Array[Byte]) = ByteBuffer.wrap(bytes).getDouble + def toBytes(d:Double) = ByteBuffer.allocate(bytesPerDouble).putDouble(d) + def fromBytes(bytes:ByteBuffer) = bytes.getDouble def toString(obj:Double) = obj.toString def fromString(str:String) = str.toDouble } object DateSerializer extends Serializer[Date] { def toBytes(date:Date) = LongSerializer.toBytes(date.getTime) - def fromBytes(bytes:Array[Byte]) = new Date(LongSerializer.fromBytes(bytes).longValue) + def fromBytes(bytes:ByteBuffer) = new Date(LongSerializer.fromBytes(bytes).longValue) def toString(obj:Date) = obj.getTime.toString def fromString(str:String) = new Date(str.toLong.longValue) -} \ No newline at end of file +} diff --git a/src/main/scala/com/shorrockin/cascal/session/Consistency.scala b/src/main/scala/com/shorrockin/cascal/session/Consistency.scala index 37ec905..569f0ac 100644 --- a/src/main/scala/com/shorrockin/cascal/session/Consistency.scala +++ b/src/main/scala/com/shorrockin/cascal/session/Consistency.scala @@ -8,14 +8,6 @@ import org.apache.cassandra.thrift.ConsistencyLevel * @author Chris Shorrock */ object Consistency { - - /** - * WRITE: Ensure nothing. A write happens asynchronously in background - * - * READ: Not supported, because it doesn't make sense - */ - val Zero = new Consistency { def thriftValue = ConsistencyLevel.ZERO } - /** * WRITE: Ensure that the write has been written to at least 1 node, * including hinted recipients. diff --git a/src/main/scala/com/shorrockin/cascal/session/KeyRange.scala b/src/main/scala/com/shorrockin/cascal/session/KeyRange.scala index ee03a69..110eb0f 100644 --- a/src/main/scala/com/shorrockin/cascal/session/KeyRange.scala +++ b/src/main/scala/com/shorrockin/cascal/session/KeyRange.scala @@ -1,6 +1,7 @@ package com.shorrockin.cascal.session import org.apache.cassandra.thrift.{KeyRange => CassKeyRange} +import java.nio.charset.Charset /** * a key range is used when you list by keys to specified the start and end @@ -10,11 +11,19 @@ import org.apache.cassandra.thrift.{KeyRange => CassKeyRange} * * @author Chris Shorrock */ -case class KeyRange(start:String, end:String, limit:Int) { +object KeyRange { + val utf8 = Charset.forName("UTF-8") +} + +trait CassandraKeyRange { + lazy val cassandraRange:CassKeyRange = null +} + +case class KeyRange(start:String, end:String, limit:Int) extends CassKeyRange { lazy val cassandraRange = { val range = new CassKeyRange(limit) - range.setStart_key(start) - range.setEnd_key(end) + range.setStart_key(KeyRange.utf8.encode(start)) + range.setEnd_key(KeyRange.utf8.encode(end)) range } } @@ -28,11 +37,11 @@ case class KeyRange(start:String, end:String, limit:Int) { * * @author Chris Shorrock */ -case class TokenRange(tokenStart:String, tokenEnd:String, tokenLimit:Int) extends KeyRange(tokenStart, tokenEnd, tokenLimit) { +case class TokenRange(tokenStart:String, tokenEnd:String, tokenLimit:Int) extends CassandraKeyRange { override lazy val cassandraRange = { - val range = new CassKeyRange(limit) - range.setStart_token(start) - range.setEnd_token(end) + val range = new CassKeyRange(tokenLimit) + range.setStart_token(tokenStart) + range.setEnd_token(tokenEnd) range } -} \ No newline at end of file +} diff --git a/src/main/scala/com/shorrockin/cascal/session/Predicate.scala b/src/main/scala/com/shorrockin/cascal/session/Predicate.scala index e46a479..d4e8270 100755 --- a/src/main/scala/com/shorrockin/cascal/session/Predicate.scala +++ b/src/main/scala/com/shorrockin/cascal/session/Predicate.scala @@ -1,5 +1,6 @@ package com.shorrockin.cascal.session +import java.nio.ByteBuffer import org.apache.cassandra.thrift.{SliceRange, SlicePredicate} import com.shorrockin.cascal.utils.Conversions @@ -20,7 +21,7 @@ trait Predicate { * * @author Chris Shorrock */ -case class ColumnPredicate(values:Seq[Array[Byte]]) extends Predicate { +case class ColumnPredicate(values:Seq[ByteBuffer]) extends Predicate { val slicePredicate = new SlicePredicate() slicePredicate.setColumn_names(Conversions.toJavaList(values)) } @@ -28,9 +29,9 @@ case class ColumnPredicate(values:Seq[Array[Byte]]) extends Predicate { object RangePredicate { def apply(limit:Int) = new RangePredicate(None, None, Order.Ascending, Some(limit)) def apply(order:Order, limit:Int) = new RangePredicate(None, None, order, Some(limit)) - def apply(start:Array[Byte], end:Array[Byte]) = new RangePredicate(Some(start), Some(end), Order.Ascending, None) - def apply(start:Array[Byte], end:Array[Byte], limit:Int) = new RangePredicate(Some(start), Some(end), Order.Ascending, Some(limit)) - def apply(start:Option[Array[Byte]], end:Option[Array[Byte]], order:Order, limit:Option[Int]) = new RangePredicate(start, end, order, limit) + def apply(start:ByteBuffer, end:ByteBuffer) = new RangePredicate(Some(start), Some(end), Order.Ascending, None) + def apply(start:ByteBuffer, end:ByteBuffer, limit:Int) = new RangePredicate(Some(start), Some(end), Order.Ascending, Some(limit)) + def apply(start:Option[ByteBuffer], end:Option[ByteBuffer], order:Order, limit:Option[Int]) = new RangePredicate(start, end, order, limit) } /** @@ -38,10 +39,10 @@ object RangePredicate { * * @author Chris Shorrock */ -class RangePredicate(start:Option[Array[Byte]], end:Option[Array[Byte]], order:Order, limit:Option[Int]) extends Predicate { - val emptyBytes = new Array[Byte](0) +class RangePredicate(start:Option[ByteBuffer], end:Option[ByteBuffer], order:Order, limit:Option[Int]) extends Predicate { + val emptyBytes = ByteBuffer.wrap(new Array[Byte](0)) - def optBytesToBytes(opt:Option[Array[Byte]]) = opt match { + def optBytesToBytes(opt:Option[ByteBuffer]) = opt match { case None => emptyBytes case Some(array) => array } @@ -55,4 +56,4 @@ class RangePredicate(start:Option[Array[Byte]], end:Option[Array[Byte]], order:O slicePredicate.setSlice_range(new SliceRange(optBytesToBytes(start), optBytesToBytes(end), order.reversed, limitVal)) } -case object EmptyPredicate extends RangePredicate(None, None, Order.Ascending, None) \ No newline at end of file +case object EmptyPredicate extends RangePredicate(None, None, Order.Ascending, None) diff --git a/src/main/scala/com/shorrockin/cascal/session/Session.scala b/src/main/scala/com/shorrockin/cascal/session/Session.scala index 0490c7c..6cf1838 100644 --- a/src/main/scala/com/shorrockin/cascal/session/Session.scala +++ b/src/main/scala/com/shorrockin/cascal/session/Session.scala @@ -1,19 +1,21 @@ package com.shorrockin.cascal.session -import org.apache.thrift.protocol.TBinaryProtocol +import scala.collection.mutable +import collection.immutable.HashSet +import java.util.concurrent.atomic.AtomicLong +import java.util.{Map => JMap, List => JList, HashMap, ArrayList} +import java.nio.ByteBuffer +import org.apache.thrift.protocol.TBinaryProtocol +import org.apache.thrift.transport.{TFramedTransport, TSocket} import org.apache.cassandra.thrift.{Mutation, Cassandra, NotFoundException, ConsistencyLevel} -import java.util.{Map => JMap, List => JList, HashMap, ArrayList} +import org.apache.cassandra.thrift.{Column => CassColumn} +import org.apache.cassandra.thrift.{SuperColumn => CassSuperColumn} +import com.shorrockin.cascal.model._ import com.shorrockin.cascal.utils.Conversions._ import com.shorrockin.cascal.utils.Utils.now -import com.shorrockin.cascal.model._ -import org.apache.thrift.transport.{TFramedTransport, TSocket} -import collection.immutable.HashSet - -import java.util.concurrent.atomic.AtomicLong - /** * a cascal session is the entry point for interacting with the * cassandra system through various path elements. @@ -67,43 +69,52 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans /** * return the current cluster name of the cassandra instance */ - lazy val clusterName = client.get_string_property("cluster name") - - - /** - * returns the configuration file of the connected cassandra instance - */ - lazy val configFile = client.get_string_property("config file") + lazy val clusterName = client.describe_cluster_name() /** * returns the version of the cassandra instance */ - lazy val version = client.get_string_property("version") + lazy val version = client.describe_version() /** * returns all the keyspaces from the cassandra instance */ - lazy val keyspaces: Seq[String] = Buffer(client.get_string_list_property("keyspaces")) + lazy val keyspaces: Seq[String] = Buffer(client.describe_keyspaces.map { _.name }) /** * returns the descriptors for all keyspaces */ lazy val keyspaceDescriptors: Set[Tuple3[String, String, String]] = { var keyspaceDesc: Set[Tuple3[String, String, String]] = new HashSet[Tuple3[String, String, String]] - convertSet(client.describe_keyspaces) foreach { + client.describe_keyspaces foreach { space => - val familyMap = client.describe_keyspace(space) - familyMap.keySet foreach { + val familyMap = space.cf_defs + familyMap foreach { family => - keyspaceDesc = keyspaceDesc + ((space, family, familyMap.get(family).get("Type"))) + keyspaceDesc = keyspaceDesc + ((space.name, family.name, family.column_type)) () } } keyspaceDesc } + /** + * Cassandra 0.7 requires you to set the keyspace before CRUD operations. We cache the last + * set keyspace and change it in the client if the new operation differs. + */ + private var currentKeyspace:String = null + + def verifyKeyspace(keyspace:String) = { + if (keyspace == null || keyspace.length == 0) + throw new IllegalArgumentException("Keyspace cannot be null") + if (currentKeyspace == null || !keyspace.equals(currentKeyspace)) { + client.set_keyspace(keyspace) + currentKeyspace = keyspace + } + } + def verifyInsert[E](col: Column[E]) { var famType = if (col.owner.isInstanceOf[SuperColumn]) "Super" else "Standard" if (!keyspaceDescriptors.contains(col.keyspace.value, col.family.value, famType)) { @@ -131,7 +142,7 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans */ def get[ResultType](col: Gettable[ResultType], consistency: Consistency): Option[ResultType] = detect { try { - val result = client.get(col.keyspace.value, col.key.value, col.columnPath, consistency) + val result = client.get(ByteBuffer.wrap(col.key.value.getBytes("UTF-8")), col.columnPath, consistency) Some(col.convertGetResult(result)) } catch { case nfe: NotFoundException => None @@ -150,7 +161,9 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans */ def insert[E](col: Column[E], consistency: Consistency) = detect { verifyInsert(col) - client.insert(col.keyspace.value, col.key.value, col.columnPath, col.value, col.time, consistency) + verifyKeyspace(col.keyspace.value) + val cassCol = new CassColumn(col.name, col.value, col.time) + client.insert(col.key.value, col.key.columnParent, cassCol, consistency) col } @@ -164,11 +177,15 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans /** * counts the number of columns in the specified column container */ - def count(container: ColumnContainer[_, _], consistency: Consistency): Int = detect { - client.get_count(container.keyspace.value, container.key.value, container.columnParent, consistency) + def count(container: ColumnContainer[_, _], predicate:Predicate, consistency: Consistency): Int = detect { + verifyKeyspace(container.keyspace.value) + client.get_count(container.key.value, container.columnParent, predicate.slicePredicate, consistency) } - + /** + * counts the number of columns in the specified column container + */ + def count(container: ColumnContainer[_, _], consistency: Consistency): Int = count(container, EmptyPredicate, consistency) /** * performs count on the specified column container */ @@ -179,8 +196,9 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans * removes the specified column container */ def remove(container: ColumnContainer[_, _], consistency: Consistency): Unit = detect { + verifyKeyspace(container.keyspace.value) verifyRemove(container) - client.remove(container.keyspace.value, container.key.value, container.columnPath, now, consistency) + client.remove(container.key.value, container.columnPath, now, consistency) } @@ -194,7 +212,8 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans * removes the specified column container */ def remove(column: Column[_], consistency: Consistency): Unit = detect { - client.remove(column.keyspace.value, column.key.value, column.columnPath, now, consistency) + verifyKeyspace(column.keyspace.value) + client.remove(column.key.value, column.columnPath, now, consistency) } @@ -209,7 +228,8 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans * to determine which columns to return. */ def list[ResultType](container: ColumnContainer[_, ResultType], predicate: Predicate, consistency: Consistency): ResultType = detect { - val results = client.get_slice(container.keyspace.value, container.key.value, container.columnParent, predicate.slicePredicate, consistency) + verifyKeyspace(container.keyspace.value) + val results = client.get_slice(container.key.value, container.columnParent, predicate.slicePredicate, consistency) container.convertListResult(convertList(results)) } @@ -233,16 +253,23 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans * same super column name (existing to separate key values), and regardless of column * container type - belong to the same column family. If they are not, the first key * in the sequence what is used in this query.
- * NOTE (to be clear): If containers is a + * NOTE (to be clear): If containers is a */ def list[ColumnType, ResultType](containers: Seq[ColumnContainer[ColumnType, ResultType]], predicate: Predicate, consistency: Consistency): Seq[(ColumnContainer[ColumnType, ResultType], ResultType)] = { if (containers.size > 0) detect { val firstContainer = containers(0) val keyspace = firstContainer.keyspace - val keyStrings = containers.map {_.key.value} - val results = client.multiget_slice(keyspace.value, keyStrings, firstContainer.columnParent, predicate.slicePredicate, consistency) + val keyStrings = containers.map {container => ByteBuffer.wrap(container.key.value.getBytes("UTF-8"))} + verifyKeyspace(keyspace.value) + val results = client.multiget_slice(keyStrings, firstContainer.columnParent, predicate.slicePredicate, consistency) + + val containersByKey = containers.foldLeft( + mutable.Map[String, ColumnContainer[ColumnType, ResultType]]())((acc, container) => { + acc += (container.key.value -> container) + }) - def locate(key: String) = (containers.find {_.key.value.equals(key)}).get + // def locate(key: String) = (containers.find {_.key.value.equals(key)}).get + def locate(key: String) = containersByKey(key) convertMap(results).map { (tuple) => val key = locate(tuple._1) @@ -273,7 +300,8 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans * of tokens. This list call is only available when using an order-preserving partition. */ def list[ColumnType, ListType](family: ColumnFamily[Key[ColumnType, ListType]], range: KeyRange, predicate: Predicate, consistency: Consistency): Map[Key[ColumnType, ListType], ListType] = detect { - val results = client.get_range_slices(family.keyspace.value, family.columnParent, predicate.slicePredicate, range.cassandraRange, consistency) + verifyKeyspace(family.keyspace.value) + val results = client.get_range_slices(family.columnParent, predicate.slicePredicate, range.cassandraRange, consistency) var map = Map[Key[ColumnType, ListType], ListType]() convertList(results).foreach { (keyslice) => @@ -307,8 +335,9 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans */ def batch(ops: Seq[Operation], consistency: Consistency): Unit = { if (ops.size > 0) detect { - val keyToFamilyMutations = new HashMap[String, JMap[String, JList[Mutation]]]() + val keyToFamilyMutations = new HashMap[ByteBuffer, JMap[String, JList[Mutation]]]() val keyspace = ops(0).keyspace + verifyKeyspace(keyspace.value) def getOrElse[A, B](map: JMap[A, B], key: A, f: => B): B = { if (map.containsKey(key)) { @@ -323,25 +352,61 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans ops.foreach { (op) => verifyOperation(op) - val familyToMutations = getOrElse(keyToFamilyMutations, op.key.value, new HashMap[String, JList[Mutation]]()) + val familyToMutations = getOrElse(keyToFamilyMutations, ByteBuffer.wrap(op.key.value.getBytes("UTF-8")), new HashMap[String, JList[Mutation]]()) val mutationList = getOrElse(familyToMutations, op.family.value, new ArrayList[Mutation]()) mutationList.add(op.mutation) } // TODO may need to flatten duplicate super columns? - client.batch_mutate(keyspace.value, keyToFamilyMutations, consistency) + client.batch_mutate(keyToFamilyMutations, consistency) } else { throw new IllegalArgumentException("cannot perform batch operation on 0 length operation sequence") } } - /** * performs the list of operations in batch using the default consistency */ def batch(ops: Seq[Operation]): Unit = batch(ops, defaultConsistency) + /** + * Performs the specified seq of operations in batch. Assumes all operations belong + * to the same keyspace. If they do not then the first keyspace in the first operation + * is used. Will retry in the face of a Cassandra-related timeout exception. + * TODO: Reforumulate where we keep splitting up the batch size into halves. + */ + def batchWithRetry(ops: Seq[Operation], consistency: Consistency): Unit = timeoutTry { + batch(ops, consistency) + } + + + /** + * Performs the list of operations in batch using the default consistency, retrying in + * the face of a Cassandra-related timeout exception. + * TODO: Reforumulate where we keep splitting up the batch size into halves. + */ + def batchWithRetry(ops: Seq[Operation]): Unit = timeoutTry { batch(ops) } + + + /** + * Given a code block, keep retrying it (up to maxTries) in the event of a + * Cassandra-related timeout exception. + */ + private def timeoutTry(f: =>Unit, maxTries:Int=5, timeoutWaitMsec:Int=200):Unit = { + var tries = maxTries + assert(maxTries > 0) + while(tries > 0) { + try { f; return } catch { + case e:java.net.SocketTimeoutException => tries -= 1; if (tries <= 0) throw e + case e:java.util.concurrent.TimeoutException => tries -= 1; if (tries <= 0) throw e + case e:org.apache.cassandra.thrift.UnavailableException => tries -= 1; if (tries <= 0) throw e + case e:Exception => throw e + } + Thread.sleep(timeoutWaitMsec) + } + } + /** * implicitly coverts a consistency value to an int */ @@ -359,15 +424,15 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans } private def Buffer[T](v:java.util.List[T]) = { - scala.collection.JavaConversions.asBuffer(v) + scala.collection.JavaConversions.asBuffer(v) } implicit private def convertList[T](v:java.util.List[T]):List[T] = { - scala.collection.JavaConversions.asBuffer(v).toList + scala.collection.JavaConversions.asBuffer(v).toList } implicit private def convertMap[K,V](v:java.util.Map[K,V]): scala.collection.mutable.Map[K,V] = { - scala.collection.JavaConversions.asMap(v) + scala.collection.JavaConversions.asMap(v) } implicit private def convertSet[T](s:java.util.Set[T]):scala.collection.mutable.Set[T] = { diff --git a/src/main/scala/com/shorrockin/cascal/session/SessionPool.scala b/src/main/scala/com/shorrockin/cascal/session/SessionPool.scala index f44e7a9..0cfc578 100644 --- a/src/main/scala/com/shorrockin/cascal/session/SessionPool.scala +++ b/src/main/scala/com/shorrockin/cascal/session/SessionPool.scala @@ -60,7 +60,7 @@ class SessionPool(val hosts:Seq[Host], val params:PoolParams, consistency:Consis /** - * returns the number of active session connections. + * returns the number of active session connections. */ def active = pool.getNumActive @@ -99,7 +99,7 @@ class SessionPool(val hosts:Seq[Host], val params:PoolParams, consistency:Consis /** * retrieves a session. Once the caller has finished with the * session it must be returned to the pool. failure to do so - * will result your pool shedding a tear. + * will result your pool shedding a tear. */ def checkout:Session = pool.borrowObject.asInstanceOf[Session] @@ -159,8 +159,6 @@ class SessionPool(val hosts:Seq[Host], val params:PoolParams, consistency:Consis def clusterName:String = borrow { _.clusterName } - def configFile:String = borrow { _.configFile } - def version:String = borrow { _.version } def keyspaces:Seq[String] = borrow { _.keyspaces } @@ -206,6 +204,10 @@ class SessionPool(val hosts:Seq[Host], val params:PoolParams, consistency:Consis def batch(ops:Seq[Operation], consistency:Consistency):Unit = borrow { _.batch(ops, consistency) } def batch(ops:Seq[Operation]):Unit = borrow { _.batch(ops) } + + def batchWithRetry(ops: Seq[Operation], consistency: Consistency): Unit = borrow { _.batchWithRetry(ops, consistency) } + + def batchWithRetry(ops: Seq[Operation]): Unit = borrow { _.batchWithRetry(ops) } } @@ -272,4 +274,4 @@ case class PoolParams(maxActive:Int, GenericObjectPool.DEFAULT_TEST_WHILE_IDLE, GenericObjectPool.DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS, true) -} \ No newline at end of file +} diff --git a/src/main/scala/com/shorrockin/cascal/session/SessionTemplate.scala b/src/main/scala/com/shorrockin/cascal/session/SessionTemplate.scala index 55775d1..d4a5baa 100755 --- a/src/main/scala/com/shorrockin/cascal/session/SessionTemplate.scala +++ b/src/main/scala/com/shorrockin/cascal/session/SessionTemplate.scala @@ -10,19 +10,13 @@ import com.shorrockin.cascal.model._ * @author Chris Shorrock */ trait SessionTemplate { - + /** * return the current cluster name of the cassandra instance */ def clusterName:String - /** - * returns the configuration file of the connected cassandra instance - */ - def configFile:String - - /** * returns the version of the cassandra instance */ @@ -169,4 +163,4 @@ trait SessionTemplate { * performs the list of operations in batch using the default consistency */ def batch(ops:Seq[Operation]):Unit -} \ No newline at end of file +} diff --git a/src/main/scala/com/shorrockin/cascal/testing/CassandraTestPool.scala b/src/main/scala/com/shorrockin/cascal/testing/CassandraTestPool.scala index e182497..c59c50d 100644 --- a/src/main/scala/com/shorrockin/cascal/testing/CassandraTestPool.scala +++ b/src/main/scala/com/shorrockin/cascal/testing/CassandraTestPool.scala @@ -1,3 +1,5 @@ +// TODO Need to update this for Cassandra 0.7 + package com.shorrockin.cascal.testing import org.apache.cassandra.thrift.CassandraDaemon @@ -37,7 +39,7 @@ object EmbeddedTestCassandra extends Logging { log.debug("creating cassandra instance at: " + homeDirectory.getCanonicalPath) log.debug("copying cassandra configuration files to root directory") - + val fileSep = System.getProperty("file.separator") val storageFile = new File(homeDirectory, "storage-conf.xml") val logFile = new File(homeDirectory, "log4j.properties") @@ -49,7 +51,7 @@ object EmbeddedTestCassandra extends Logging { log.debug("creating data file and log location directories") DatabaseDescriptor.getAllDataFileLocations.foreach { (file) => new File(file).mkdirs } - new File(DatabaseDescriptor.getLogFileLocation).mkdirs + // new File(DatabaseDescriptor.getLogFileLocation).mkdirs val daemon = new CassandraDaemonThread daemon.start diff --git a/src/main/scala/com/shorrockin/cascal/utils/Conversions.scala b/src/main/scala/com/shorrockin/cascal/utils/Conversions.scala index f8d4110..0655d78 100644 --- a/src/main/scala/com/shorrockin/cascal/utils/Conversions.scala +++ b/src/main/scala/com/shorrockin/cascal/utils/Conversions.scala @@ -4,6 +4,7 @@ import java.nio.charset.Charset import com.shorrockin.cascal.model.{Column, Keyspace} import java.util.{Date, UUID => JavaUUID} import com.shorrockin.cascal.serialization._ +import java.nio.ByteBuffer /** * some implicits to assist with common conversions @@ -13,36 +14,36 @@ object Conversions { implicit def keyspace(str:String) = new Keyspace(str) - implicit def bytes(date:Date):Array[Byte] = DateSerializer.toBytes(date) - implicit def date(bytes:Array[Byte]):Date = DateSerializer.fromBytes(bytes) + implicit def bytes(date:Date):ByteBuffer = DateSerializer.toBytes(date) + implicit def date(bytes:ByteBuffer):Date = DateSerializer.fromBytes(bytes) implicit def string(date:Date):String = DateSerializer.toString(date) - implicit def bytes(b:Boolean):Array[Byte] = BooleanSerializer.toBytes(b) - implicit def boolean(bytes:Array[Byte]):Boolean = BooleanSerializer.fromBytes(bytes) + implicit def bytes(b:Boolean):ByteBuffer = BooleanSerializer.toBytes(b) + implicit def boolean(bytes:ByteBuffer):Boolean = BooleanSerializer.fromBytes(bytes) implicit def string(b:Boolean):String = BooleanSerializer.toString(b) - implicit def bytes(b:Float):Array[Byte] = FloatSerializer.toBytes(b) - implicit def float(bytes:Array[Byte]):Float = FloatSerializer.fromBytes(bytes) + implicit def bytes(b:Float):ByteBuffer = FloatSerializer.toBytes(b) + implicit def float(bytes:ByteBuffer):Float = FloatSerializer.fromBytes(bytes) implicit def string(b:Float):String = FloatSerializer.toString(b) - implicit def bytes(b:Double):Array[Byte] = DoubleSerializer.toBytes(b) - implicit def double(bytes:Array[Byte]):Double = DoubleSerializer.fromBytes(bytes) + implicit def bytes(b:Double):ByteBuffer = DoubleSerializer.toBytes(b) + implicit def double(bytes:ByteBuffer):Double = DoubleSerializer.fromBytes(bytes) implicit def string(b:Double):String = DoubleSerializer.toString(b) - implicit def bytes(l:Long):Array[Byte] = LongSerializer.toBytes(l) - implicit def long(bytes:Array[Byte]):Long = LongSerializer.fromBytes(bytes) + implicit def bytes(l:Long):ByteBuffer = LongSerializer.toBytes(l) + implicit def long(bytes:ByteBuffer):Long = LongSerializer.fromBytes(bytes) implicit def string(l:Long):String = LongSerializer.toString(l) - implicit def bytes(i:Int):Array[Byte] = IntSerializer.toBytes(i) - implicit def int(bytes:Array[Byte]):Int = IntSerializer.fromBytes(bytes) + implicit def bytes(i:Int):ByteBuffer = IntSerializer.toBytes(i) + implicit def int(bytes:ByteBuffer):Int = IntSerializer.fromBytes(bytes) implicit def string(i:Int) = IntSerializer.toString(i) - implicit def bytes(str:String):Array[Byte] = StringSerializer.toBytes(str) - implicit def string(bytes:Array[Byte]):String = StringSerializer.fromBytes(bytes) + implicit def bytes(str:String):ByteBuffer = StringSerializer.toBytes(str) + implicit def string(bytes:ByteBuffer):String = StringSerializer.fromBytes(bytes) implicit def string(source:JavaUUID) = UUIDSerializer.toString(source) implicit def uuid(source:String) = UUIDSerializer.fromString(source) - implicit def bytes(source:JavaUUID):Array[Byte] = UUIDSerializer.toBytes(source) + implicit def bytes(source:JavaUUID):ByteBuffer = UUIDSerializer.toBytes(source) implicit def string(col:Column[_]):String = { "%s -> %s (time: %s)".format(Conversions.string(col.name), @@ -52,4 +53,4 @@ object Conversions { implicit def toSeqBytes(values:Seq[String]) = values.map { (s) => Conversions.bytes(s) } implicit def toJavaList[T](l: Seq[T]):java.util.List[T] = l.foldLeft(new java.util.ArrayList[T](l.size)){(al, e) => al.add(e); al} -} \ No newline at end of file +}