diff --git a/src/main/scala/com/twosigma/flint/rdd/Conversion.scala b/src/main/scala/com/twosigma/flint/rdd/Conversion.scala
index a9835aad..40d83e3e 100644
--- a/src/main/scala/com/twosigma/flint/rdd/Conversion.scala
+++ b/src/main/scala/com/twosigma/flint/rdd/Conversion.scala
@@ -108,7 +108,7 @@ object Conversion {
       )
       val iter = PeekableIterator(
         OrderedIterator(
-          PartitionsIterator(rdd, thisDep.parents, context)
+          PartitionsIterator(rdd, thisDep.parents, context, preservesPartitionsOrdering = true)
         ).filterByRange(thisDep.range)
       )
       new InterruptibleIterator[(K, V)](context, iter)
diff --git a/src/main/scala/com/twosigma/flint/rdd/RangeDependency.scala b/src/main/scala/com/twosigma/flint/rdd/RangeDependency.scala
index ccae79c1..17ef9ad2 100644
--- a/src/main/scala/com/twosigma/flint/rdd/RangeDependency.scala
+++ b/src/main/scala/com/twosigma/flint/rdd/RangeDependency.scala
@@ -23,7 +23,6 @@ import scala.reflect.ClassTag
  * :: DeveloperApi ::
  */
 private[rdd] object RangeDependency {
-
   /**
    * Normalize the ranges of partitions from a sorted [[org.apache.spark.rdd.RDD]].
    *
@@ -39,19 +38,9 @@ private[rdd] object RangeDependency {
     normalizationStrategy: PartitionNormalizationStrategy = HeavyKeysNormalizationStrategy
   )(implicit ord: Ordering[K]): Seq[RangeDependency[K, P]] = {
     require(headers.nonEmpty, "Need at least one partition")
+    import OrderedPartitionHeaderUtils.HeaderOrdering
 
-    val sortedHeaders = headers.sortBy(_.partition.index).toArray
-    // Assume partitions are sorted, i.e. the keys of ith partition are less or equal than those of (i + 1)th partition.
-    sortedHeaders.reduceOption {
-      (h1, h2) =>
-        if (ord.lteq(h1.firstKey, h2.firstKey)) {
-          h2
-        } else {
-          sys.error(s"Partitions are not sorted. " +
-            s"The partition ${h1.partition.index} has the first key ${h1.firstKey} and " +
-            s"the partition ${h2.partition.index} has the first key ${h2.firstKey}.")
-        }
-    }
+    val sortedHeaders = headers.sorted.toArray
 
     val (nonNormalizedPartitions, nonNormalizedRanges) = sortedHeaders.zipWithIndex.map {
       case (hdr, idx) =>
@@ -102,6 +91,32 @@ private[rdd] case class OrderedPartitionHeader[K, P <: Partition](
   secondKey: Option[K]
 )
 
+private[rdd] object OrderedPartitionHeaderUtils {
+  implicit class HeaderOrdering[K, P <: Partition](header: OrderedPartitionHeader[K, P])(implicit ord: Ordering[K])
+    extends Ordered[OrderedPartitionHeader[K, P]] {
+    import scala.math.Ordered.orderingToOrdered
+    def compare(that: OrderedPartitionHeader[K, P]): Int = (header, that) match {
+      // The first keys differ: order by first key
+      case (OrderedPartitionHeader(_, thisFirst, _), OrderedPartitionHeader(_, thatFirst, _))
+        if thisFirst != thatFirst
+        => thisFirst.compare(thatFirst)
+      // First keys are equal: the header with a Some second key is larger than the one with None
+      case (OrderedPartitionHeader(_, _, Some(_)), OrderedPartitionHeader(_, _, None)) => 1
+      case (OrderedPartitionHeader(_, _, None), OrderedPartitionHeader(_, _, Some(_))) => -1
+      // Both partitions contain only the same single key: compare by partition index to preserve
+      // previous behaviour
+      case (OrderedPartitionHeader(thisPartition, _, None), OrderedPartitionHeader(thatPartition, _, None))
+        => thisPartition.index.compare(thatPartition.index)
+      case (OrderedPartitionHeader(thisPartition, thisFirst, Some(thisSecond)),
+        OrderedPartitionHeader(thatPartition, _, Some(thatSecond))) =>
+        sys.error(s"Partitions are not sorted properly: two partitions both have first key " +
+          s"$thisFirst and non-empty second keys $thisSecond and $thatSecond. " +
+          s"The partitions were: $thisPartition and $thatPartition.")
+    }
+  }
+}
+
+
 /**
  * :: DeveloperApi ::
  * Base class for range dependency.
@@ -214,7 +229,9 @@ private[rdd] object HeavyKeysNormalizationStrategy extends PartitionNormalizatio
     implicit ord: Ordering[K]
   ): Seq[CloseOpen[K]] = {
-    val sortedHeaders = headers.sortBy(_.partition.index)
+    import OrderedPartitionHeaderUtils.HeaderOrdering
+
+    val sortedHeaders = headers.sorted
     val partitionBoundaries = sortedHeaders.head.firstKey +: sortedHeaders.tail.map {
       header => header.secondKey.getOrElse(header.firstKey)
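The `headers.sorted` calls introduced above compile because the `HeaderOrdering` implicit class extends `Ordered`, which supplies an implicit `OrderedPartitionHeader => Comparable[...]`; `scala.math.Ordering.ordered` then derives the `Ordering` that `Seq#sorted` requires. A minimal, self-contained sketch of the same enrichment pattern, with an illustrative `Header` type standing in for the Flint types:

final case class Header(partitionIndex: Int, firstKey: Int, secondKey: Option[Int])

object HeaderOrderingSketch {
  // The implicit class makes every Header usable as Ordered[Header], from
  // which Seq#sorted derives its Ordering via scala.math.Ordering.ordered.
  implicit class RichHeader(h: Header) extends Ordered[Header] {
    def compare(that: Header): Int =
      if (h.firstKey != that.firstKey) h.firstKey.compare(that.firstKey)
      else (h.secondKey, that.secondKey) match {
        case (Some(_), None) => 1 // a second key means more than one distinct key
        case (None, Some(_)) => -1
        case (None, None)    => h.partitionIndex.compare(that.partitionIndex)
        case (Some(l), Some(r)) =>
          sys.error(s"not sorted: equal first keys with second keys $l and $r")
      }
  }

  def main(args: Array[String]): Unit = {
    val headers = Seq(Header(0, 100, Some(130)), Header(1, 0, Some(30)))
    // Ordered by first key, so the partition holding the smaller keys comes first.
    println(headers.sorted.map(_.partitionIndex)) // List(1, 0)
  }
}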
diff --git a/src/test/resources/timeseries/parquet/small.parquet/_SUCCESS b/src/test/resources/timeseries/parquet/small.parquet/_SUCCESS
new file mode 100644
index 00000000..e69de29b
diff --git a/src/test/resources/timeseries/parquet/small.parquet/part-00000-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet b/src/test/resources/timeseries/parquet/small.parquet/part-00000-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet
new file mode 100644
index 00000000..53d2419a
Binary files /dev/null and b/src/test/resources/timeseries/parquet/small.parquet/part-00000-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet differ
diff --git a/src/test/resources/timeseries/parquet/small.parquet/part-00001-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet b/src/test/resources/timeseries/parquet/small.parquet/part-00001-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet
new file mode 100644
index 00000000..f550bf8a
Binary files /dev/null and b/src/test/resources/timeseries/parquet/small.parquet/part-00001-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet differ
diff --git a/src/test/resources/timeseries/parquet/small.parquet/part-00002-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet b/src/test/resources/timeseries/parquet/small.parquet/part-00002-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet
new file mode 100644
index 00000000..9d893970
Binary files /dev/null and b/src/test/resources/timeseries/parquet/small.parquet/part-00002-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet differ
diff --git a/src/test/resources/timeseries/parquet/small.parquet/part-00003-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet b/src/test/resources/timeseries/parquet/small.parquet/part-00003-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet
new file mode 100644
index 00000000..5ea0c56c
Binary files /dev/null and b/src/test/resources/timeseries/parquet/small.parquet/part-00003-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet differ
diff --git a/src/test/resources/timeseries/parquet/small.parquet/part-00004-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet b/src/test/resources/timeseries/parquet/small.parquet/part-00004-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet
new file mode 100644
index 00000000..892ba21c
Binary files /dev/null and b/src/test/resources/timeseries/parquet/small.parquet/part-00004-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet differ
diff --git a/src/test/scala/com/twosigma/flint/rdd/ConversionSpec.scala b/src/test/scala/com/twosigma/flint/rdd/ConversionSpec.scala
index 2fdc53dc..c9282246 100644
--- a/src/test/scala/com/twosigma/flint/rdd/ConversionSpec.scala
+++ b/src/test/scala/com/twosigma/flint/rdd/ConversionSpec.scala
@@ -164,6 +164,22 @@ class ConversionSpec extends FlatSpec with SharedSparkContext with Timeouts {
           assert(false, "Should not completed as the job has been killed.")
         case Failure(_) =>
       }
+  }
+
+  "fromSortedRDD" should "sort partitions and have increasing partition indexes" in {
+    // Create an RDD with data sorted within partitions, but with the partitions themselves not sorted.
+    // The data is:
+    //   Partition 0: 100, 130, 160, 199
+    //   Partition 1: 0, 30, 60, 99
+    val data = Seq(100, 130, 160, 199, 0, 30, 60, 99)
+    val kvData = data.zip(data)
+    val rdd = sc.makeRDD(kvData, 2)
+
+    val orderedRdd = Conversion.fromSortedRDD(rdd)
+    assert(orderedRdd.partitions(0).index == 0)
+    assert(orderedRdd.partitions(1).index == 1)
+    assert(orderedRdd.collect().toSeq == kvData.sorted)
   }
 }
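For reference, the partition layout that this new test sets up can be inspected with `glom()`, which collects each partition into a single array. A quick sketch, assuming a live `SparkContext` named `sc` as in the suite:

// Print the per-partition contents of the deliberately mis-ordered RDD:
// each partition is internally sorted, but partition 0 holds the larger keys.
val data = Seq(100, 130, 160, 199, 0, 30, 60, 99)
val rdd = sc.makeRDD(data.zip(data), 2)
rdd.glom().collect().foreach(part => println(part.map(_._1).mkString(", ")))
// prints: 100, 130, 160, 199
//         0, 30, 60, 99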
diff --git a/src/test/scala/com/twosigma/flint/rdd/RangeDependencySpec.scala b/src/test/scala/com/twosigma/flint/rdd/RangeDependencySpec.scala
index 6e2ff0db..908e1953 100644
--- a/src/test/scala/com/twosigma/flint/rdd/RangeDependencySpec.scala
+++ b/src/test/scala/com/twosigma/flint/rdd/RangeDependencySpec.scala
@@ -17,8 +17,9 @@
 package com.twosigma.flint.rdd
 
 import org.scalatest.FlatSpec
+import org.scalatest.prop.TableDrivenPropertyChecks
 
-class RangeDependencySpec extends FlatSpec {
+class RangeDependencySpec extends FlatSpec with TableDrivenPropertyChecks {
   // partition 0: [1, 1, 2, ..., 4]
   // partition 1: [4, ..., 4]
   // partition 2: [4, 4, 5, ..., 7]
@@ -141,4 +142,65 @@
     assert(RangeDependency(2, CloseOpen(8, Some(14)), List(Split(3), Split(4))) == dep(2))
     assert(RangeDependency(3, CloseOpen(14, None), List(Split(4))) == dep(3))
   }
+
+  private def makeHeader(
+    firstKey: Int,
+    secondKey: Option[Int],
+    partitionNumber: Int = 0): OrderedPartitionHeader[Int, OrderedRDDPartition] =
+    OrderedPartitionHeader(OrderedRDDPartition(partitionNumber), firstKey, secondKey)
+
+  import OrderedPartitionHeaderUtils.HeaderOrdering
+  import org.scalatest.Matchers.{an, thrownBy}
+
+  "HeaderOrdering" should "throw an error if the first keys are equal and both second keys are Some" in {
+    val a = makeHeader(1, Some(2))
+    val b = makeHeader(1, Some(2))
+    an[Exception] shouldBe thrownBy {
+      a.compare(b)
+    }
+  }
+
+  val secondKeyExamples = Table(
+    ("left", "right"),
+    (Some(1), Some(2)),
+    (Some(2), Some(1)),
+    (None, Some(1)),
+    (Some(1), None),
+    (None, None))
+
+  forAll(secondKeyExamples) { (leftSecondKey, rightSecondKey) =>
+    it should s"say a < b if a.firstKey < b.firstKey with second keys ($leftSecondKey, $rightSecondKey)" in {
+      val left = makeHeader(1, leftSecondKey)
+      val right = makeHeader(2, rightSecondKey)
+      assert(left.compare(right) < 0)
+    }
+  }
+
+  forAll(secondKeyExamples) { (leftSecondKey, rightSecondKey) =>
+    it should s"say a > b if a.firstKey > b.firstKey with second keys ($leftSecondKey, $rightSecondKey)" in {
+      val left = makeHeader(2, leftSecondKey)
+      val right = makeHeader(1, rightSecondKey)
+      assert(left.compare(right) > 0)
+    }
+  }
+
+  it should "say the header with a Some second key is larger if the first keys are equal" in {
+    val smaller = makeHeader(1, None)
+    val larger = makeHeader(1, Some(2))
+
+    assert(smaller.compare(larger) < 0)
+    assert(larger.compare(smaller) > 0)
+  }
+
+  it should "order headers by partition number if the first keys are equal and both second keys are None" in {
+    val a = makeHeader(1, None, 1)
+    val b = makeHeader(1, None, 2)
+    assert(a.compare(b) < 0)
+    assert(b.compare(a) > 0)
+  }
 }
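The table-driven style above registers one test per row. The same `Table`/`forAll` mechanism can also assert ordering laws such as antisymmetry; the following is a sketch only (a hypothetical `HeaderOrderingLawsSpec` reusing the `rdd` package types from the diff), not part of the change:

package com.twosigma.flint.rdd

import org.scalatest.FlatSpec
import org.scalatest.prop.TableDrivenPropertyChecks

class HeaderOrderingLawsSpec extends FlatSpec with TableDrivenPropertyChecks {
  import OrderedPartitionHeaderUtils.HeaderOrdering

  private def makeHeader(firstKey: Int, secondKey: Option[Int]) =
    OrderedPartitionHeader(OrderedRDDPartition(0), firstKey, secondKey)

  private val secondKeys = Table(("left", "right"), (None, None), (None, Some(1)), (Some(1), None))

  // sign(a compare b) should always be the negation of sign(b compare a).
  "HeaderOrdering" should "be antisymmetric for distinct first keys" in {
    forAll(secondKeys) { (left, right) =>
      val a = makeHeader(1, left)
      val b = makeHeader(2, right)
      assert(Integer.signum(a.compare(b)) == -Integer.signum(b.compare(a)))
    }
  }
}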
diff --git a/src/test/scala/com/twosigma/flint/timeseries/TimeSeriesRDDSpec.scala b/src/test/scala/com/twosigma/flint/timeseries/TimeSeriesRDDSpec.scala
index 0eeeb990..7566c652 100644
--- a/src/test/scala/com/twosigma/flint/timeseries/TimeSeriesRDDSpec.scala
+++ b/src/test/scala/com/twosigma/flint/timeseries/TimeSeriesRDDSpec.scala
@@ -636,6 +636,25 @@ class TimeSeriesRDDSpec extends TimeSeriesSuite {
     }
   }
 
+  // Parquet files no longer load with their partitions in the order they were written; see:
+  // https://issues.apache.org/jira/browse/SPARK-20144
+  // This tests that data loads correctly despite this. The parquet file in question was generated
+  // in the Spark shell with the following code:
+  //   import org.apache.spark.sql.types._
+  //   import org.apache.spark.sql._
+  //
+  //   val rdd = sc.makeRDD(Seq.range(0L, 10L).map(Row(_)), 5)
+  //   val schema = StructType(Array(StructField("time", LongType)))
+  //   val df = spark.sqlContext.createDataFrame(rdd, schema)
+  //   df.write.parquet("/small.parquet")
+  it should "load from parquet" taggedAs (Slow) in {
+    withResource("/timeseries/parquet/small.parquet") { source =>
+      val tsRdd = TimeSeriesRDD.fromParquet(sc, source)(true, TimeUnit.NANOSECONDS)
+      val loadedData = tsRdd.collect().map(_.getLong(0))
+      assert(loadedData.toSeq == Seq.range(0, 10))
+    }
+  }
+
   // This test is temporarily tagged as "Slow" so that scalatest runner could exclude this test optionally.
   it should "not modify original rows during conversions/modifications" taggedAs (Slow) ignore {
     withResource("/timeseries/parquet/PriceWithHeader.parquet") { source =>
@@ -652,5 +671,4 @@ class TimeSeriesRDDSpec extends TimeSeriesSuite {
       assert(rows.deep == finalRows.deep)
     }
   }
-
 }
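The SPARK-20144 behaviour that this test guards against can be observed directly. A sketch, assuming a `SparkSession` named `spark` and the fixture path from the test resources; this is illustrative, not part of the change:

// Read the five-file parquet fixture back and print each partition.
// The contents of each partition stay sorted, but the order in which the
// partitions come back is not guaranteed to match the order they were written.
val df = spark.read.parquet("src/test/resources/timeseries/parquet/small.parquet")
df.rdd.glom().collect().foreach { part =>
  println(part.map(_.getLong(0)).mkString(", "))
}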