From 8ed24dcccdd94d5c79ac097002e4c0aed04ca663 Mon Sep 17 00:00:00 2001 From: jm771 Date: Fri, 7 Jul 2017 15:55:08 +0100 Subject: [PATCH 1/3] Fix parquet file loading by sorting the partitions on the basis of their headers --- .../com/twosigma/flint/rdd/Conversion.scala | 2 +- .../twosigma/flint/rdd/RangeDependency.scala | 45 ++++++++---- .../timeseries/parquet/small.parquet/_SUCCESS | 0 ...868e-47d1-8525-65ba2c653e67.snappy.parquet | Bin 0 -> 386 bytes ...868e-47d1-8525-65ba2c653e67.snappy.parquet | Bin 0 -> 385 bytes ...868e-47d1-8525-65ba2c653e67.snappy.parquet | Bin 0 -> 386 bytes ...868e-47d1-8525-65ba2c653e67.snappy.parquet | Bin 0 -> 386 bytes ...868e-47d1-8525-65ba2c653e67.snappy.parquet | Bin 0 -> 386 bytes .../flint/rdd/RangeDependencySpec.scala | 64 +++++++++++++++++- .../flint/timeseries/TimeSeriesRDDSpec.scala | 20 +++++- 10 files changed, 114 insertions(+), 17 deletions(-) create mode 100644 src/test/resources/timeseries/parquet/small.parquet/_SUCCESS create mode 100644 src/test/resources/timeseries/parquet/small.parquet/part-00000-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet create mode 100644 src/test/resources/timeseries/parquet/small.parquet/part-00001-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet create mode 100644 src/test/resources/timeseries/parquet/small.parquet/part-00002-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet create mode 100644 src/test/resources/timeseries/parquet/small.parquet/part-00003-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet create mode 100644 src/test/resources/timeseries/parquet/small.parquet/part-00004-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet diff --git a/src/main/scala/com/twosigma/flint/rdd/Conversion.scala b/src/main/scala/com/twosigma/flint/rdd/Conversion.scala index a9835aad..40d83e3e 100644 --- a/src/main/scala/com/twosigma/flint/rdd/Conversion.scala +++ b/src/main/scala/com/twosigma/flint/rdd/Conversion.scala @@ -108,7 +108,7 @@ object Conversion { ) val iter = PeekableIterator( OrderedIterator( - PartitionsIterator(rdd, thisDep.parents, context) + PartitionsIterator(rdd, thisDep.parents, context, preservesPartitionsOrdering = true) ).filterByRange(thisDep.range) ) new InterruptibleIterator[(K, V)](context, iter) diff --git a/src/main/scala/com/twosigma/flint/rdd/RangeDependency.scala b/src/main/scala/com/twosigma/flint/rdd/RangeDependency.scala index ccae79c1..bf7525dd 100644 --- a/src/main/scala/com/twosigma/flint/rdd/RangeDependency.scala +++ b/src/main/scala/com/twosigma/flint/rdd/RangeDependency.scala @@ -23,7 +23,6 @@ import scala.reflect.ClassTag * :: DeveloperApi :: */ private[rdd] object RangeDependency { - /** * Normalize the ranges of partitions from a sorted [[org.apache.spark.rdd.RDD]]. * @@ -39,19 +38,9 @@ private[rdd] object RangeDependency { normalizationStrategy: PartitionNormalizationStrategy = HeavyKeysNormalizationStrategy )(implicit ord: Ordering[K]): Seq[RangeDependency[K, P]] = { require(headers.nonEmpty, "Need at least one partition") + import OrderedPartitionHeaderUtils.HeaderOrdering - val sortedHeaders = headers.sortBy(_.partition.index).toArray - // Assume partitions are sorted, i.e. the keys of ith partition are less or equal than those of (i + 1)th partition. - sortedHeaders.reduceOption { - (h1, h2) => - if (ord.lteq(h1.firstKey, h2.firstKey)) { - h2 - } else { - sys.error(s"Partitions are not sorted. 
" + - s"The partition ${h1.partition.index} has the first key ${h1.firstKey} and " + - s"the partition ${h2.partition.index} has the first key ${h2.firstKey}.") - } - } + val sortedHeaders = headers.sortBy(x => x).toArray val (nonNormalizedPartitions, nonNormalizedRanges) = sortedHeaders.zipWithIndex.map { case (hdr, idx) => @@ -102,6 +91,32 @@ private[rdd] case class OrderedPartitionHeader[K, P <: Partition]( secondKey: Option[K] ) +private[rdd] object OrderedPartitionHeaderUtils { + implicit class HeaderOrdering[K, P <: Partition](header: OrderedPartitionHeader[K, P])(implicit ord: Ordering[K]) + extends Ordered[OrderedPartitionHeader[K, P]] { + import scala.math.Ordered.orderingToOrdered + def compare(that: OrderedPartitionHeader[K, P]): Int = (header, that) match { + // First keys not equal we return the larger first key + case (OrderedPartitionHeader(_, thisFirst, _), OrderedPartitionHeader(_, thatFirst, _)) + if thisFirst != thatFirst + => thisFirst.compare(thatFirst) + // First keys are equal, the header with "Some" second key one is larger than the one with "None" + case (OrderedPartitionHeader(_, _, Some(_)), OrderedPartitionHeader(_, _, None)) => 1 + case (OrderedPartitionHeader(_, _, None), OrderedPartitionHeader(_, _, Some(_))) => -1 + // If the partitions both only contain the same single key, compare by partition index to preserve + // previous behaviour + case (OrderedPartitionHeader(thisPartition, _, None), OrderedPartitionHeader(thatPartition, _, None)) + => thisPartition.index.compare(thatPartition.index) + case (OrderedPartitionHeader(thisPartition, thisFirst, Some(thisSecond)), + OrderedPartitionHeader(thatPartition, _, Some(thatSecond))) => + sys.error(s"Error has occurred, partitions weren't sorted properly, two partitions both had first key " + + s"$thisFirst and they both had non empty seconds keys: $thisSecond and $thatSecond" + + s" partitions were: $thisPartition and $thatPartition") + } + } +} + + /** * :: DeveloperApi :: * Base class for range dependency. 
@@ -214,7 +229,9 @@ private[rdd] object HeavyKeysNormalizationStrategy extends PartitionNormalizatio implicit ord: Ordering[K] ): Seq[CloseOpen[K]] = { - val sortedHeaders = headers.sortBy(_.partition.index) + import OrderedPartitionHeaderUtils.HeaderOrdering + + val sortedHeaders = headers.sortBy(x => x) val partitionBoundaries = sortedHeaders.head.firstKey +: sortedHeaders.tail.map { header => header.secondKey.getOrElse(header.firstKey) diff --git a/src/test/resources/timeseries/parquet/small.parquet/_SUCCESS b/src/test/resources/timeseries/parquet/small.parquet/_SUCCESS new file mode 100644 index 00000000..e69de29b diff --git a/src/test/resources/timeseries/parquet/small.parquet/part-00000-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet b/src/test/resources/timeseries/parquet/small.parquet/part-00000-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..53d2419a2549bdf50414384fa267fcc59255bd41 GIT binary patch literal 386 zcmZ`#J!``-5LFxzJam?37VMMS6J>mwQ+d-%B+%rJ*2D5C;)_?RePag+vtlfgkkUfM9>(_K<(-6Nh4Ypg*0yd-uQRfnwwb-k_>i9M)jv z`JWSSHkw<@O)Cwrt1n(i0|E?qFFR(w*ODz*W9l|DEMh8`S~P6&x??32N9uT(CC-Do zDp$5DTdg70mbZ&CVi!ra*zNakd*nTy4R4D&+{(Jit1^uFjK|?k+U8n>V@P2vKu*(i hD8^ExnVQZ=3CK7}=4v!kB2iP7sR5eU?=9{3e*qbwSe5_) literal 0 HcmV?d00001 diff --git a/src/test/resources/timeseries/parquet/small.parquet/part-00001-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet b/src/test/resources/timeseries/parquet/small.parquet/part-00001-52210b44-868e-47d1-8525-65ba2c653e67.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..f550bf8a0d1d0748c293fd88ef7a7e71c116123a GIT binary patch literal 385 zcmZ`#O-sW-5Z$Ir$-$$$B#=W`uvmzTXyirxJIc&hn z^FJrvY&Ex*+g2K0S6{r41_T)Jo_EZAZzNl?#?)|GW>97OPy<6)B*5>f02e$aOVg8hlxL;kT(9E$0l{&f28?f;s4ijg09gQ{L}Sc6^9 z|CD&M(cDUIT4{J)eepsX5MaQ2-ZJyOmTbWqQ@5F65mUL;qG5}dEi0ioK!?LDa3boe zT-mB@wT4t%+z!f!omaBOcDH-oA@AWNcvIBjr>vX2D#Mu1cpP4*ZLUQ)h7`sEw%e%*zAQF*bM6gYI#0@<60v*kBd#8*ZB7JhK?{?44^B8gb=>D=NYbhVk(zfG;Hy*WhE2`=x~??PJ_BC zSGFo!ts&JGw~I1jXG*r%?sl&` z4)53Zz4txd`tC8p0Ec)SVrY{AM!t(Kz{y;Mx{Mqnd2+1p_s`8U4Le?Ve_3yN_3>7x z?=t7u0T^C6E;Ly#0fC#|7NW-e~i{;XA!3}qxTWY?ylCEfNswOowq$-nI)O7XI(t_tl=y;d~I-)L% zjjf7CYo2I}+d&!9^GdpE_xsm9a34Cso4g7>WmRWo5kzdkqTo7dGA)8BPxw@DIZu+Y gm`ahPYQCJrTt;!cRFj1gv6`z?jlir2z}kNQ7gNw!#sB~S literal 0 HcmV?d00001 diff --git a/src/test/scala/com/twosigma/flint/rdd/RangeDependencySpec.scala b/src/test/scala/com/twosigma/flint/rdd/RangeDependencySpec.scala index 6e2ff0db..908e1953 100644 --- a/src/test/scala/com/twosigma/flint/rdd/RangeDependencySpec.scala +++ b/src/test/scala/com/twosigma/flint/rdd/RangeDependencySpec.scala @@ -17,8 +17,9 @@ package com.twosigma.flint.rdd import org.scalatest.FlatSpec +import org.scalatest.prop.TableDrivenPropertyChecks -class RangeDependencySpec extends FlatSpec { +class RangeDependencySpec extends FlatSpec with TableDrivenPropertyChecks { // partition 0: [1, 1, 2, ..., 4] // partition 1: [4, ..., 4] // partition 2: [4, 4, 5, ..., 7] @@ -141,4 +142,65 @@ class RangeDependencySpec extends FlatSpec { assert(RangeDependency(2, CloseOpen(8, Some(14)), List(Split(3), Split(4))) == dep(2)) assert(RangeDependency(3, CloseOpen(14, None), List(Split(4))) == dep(3)) } + + private def makeHeader( + firstKey: Int, + secondKey: Option[Int], + partitionNumber: Int = 0): + OrderedPartitionHeader[Int, OrderedRDDPartition] = + OrderedPartitionHeader(OrderedRDDPartition(partitionNumber), firstKey, secondKey) + + import OrderedPartitionHeaderUtils.HeaderOrdering + 
import org.scalatest.Matchers.{an, thrownBy} + + "HeaderOrdering" should "throw error if first keys are both equal, and second keys are both Some" in { + val a = makeHeader(1, Some(2)) + val b = makeHeader(1, Some(2)) + an[Exception] shouldBe thrownBy { + a.compare(b) + } + } + + val secondKeyExamples = Table( + ("left", "right"), + (Some(1), Some(2)), + (Some(2), Some(1)), + (None, Some(1)), + (Some(1), None), + (None, None)) + + + forAll(secondKeyExamples) { (leftSecondKey, rightSecondKey) => { + it should s"say a < b if a.firstKey < b.firstKey with second keys ($leftSecondKey, $rightSecondKey)" in { + val left = makeHeader(1, leftSecondKey) + val right = makeHeader(2, rightSecondKey) + assert(left.compare(right) < 0) + } + } + } + + forAll(secondKeyExamples) { (leftSecondKey, rightSecondKey) => { + it should s"say a > b if a.firstKey > b.firstKey with second keys ($leftSecondKey, $rightSecondKey)" in { + import OrderedPartitionHeaderUtils.HeaderOrdering + val left = makeHeader(2, leftSecondKey) + val right = makeHeader(1, rightSecondKey) + assert(left.compare(right) > 0) + } + } + } + + it should "say the header with Some second key is larger if first keys are equal" in { + val smaller = makeHeader(1, None) + val larger = makeHeader(1, Some(2)) + + assert(smaller.compare(larger) < 0) + assert(larger.compare(smaller) > 0) + } + + it should "say headers are ordered by partition number if first keys are equal and second keys are both None" in { + val a = makeHeader(1, None, 1) + val b = makeHeader(1, None, 2) + assert(a.compare(b) < 0) + assert(b.compare(a) > 0) + } } diff --git a/src/test/scala/com/twosigma/flint/timeseries/TimeSeriesRDDSpec.scala b/src/test/scala/com/twosigma/flint/timeseries/TimeSeriesRDDSpec.scala index 0eeeb990..9469bf75 100644 --- a/src/test/scala/com/twosigma/flint/timeseries/TimeSeriesRDDSpec.scala +++ b/src/test/scala/com/twosigma/flint/timeseries/TimeSeriesRDDSpec.scala @@ -636,6 +636,25 @@ class TimeSeriesRDDSpec extends TimeSeriesSuite { } } + // Parquet files no longer load with their partitions in the other they were written, see: + // https://issues.apache.org/jira/browse/SPARK-20144 + // This tests that data loads correctly despite this, the parquet file in question was generated in spark shell + // with the following code: + // import org.apache.spark.sql.types._ + // import org.apache.spark.sql._ + // + // val rdd = sc.makeRDD(Seq.range(0L, 10L).map(Row(_)), 5) + // val schema = StructType(Array(StructField("time", LongType))) + // val df = spark.sqlContext.createDataFrame(rdd, schema) + // df.write.parquet("/small.parquet") + it should "load from parquet" taggedAs(Slow) in { + withResource("/timeseries/parquet/small.parquet") { source => + val tsRdd = TimeSeriesRDD.fromParquet(sc, source)(true, TimeUnit.NANOSECONDS); + val loadedData = tsRdd.collect().map(_.getLong(0)) + assert(loadedData.toSeq == Seq.range(0, 10)) + } + } + // This test is temporarily tagged as "Slow" so that scalatest runner could exclude this test optionally. 
it should "not modify original rows during conversions/modifications" taggedAs (Slow) ignore { withResource("/timeseries/parquet/PriceWithHeader.parquet") { source => @@ -652,5 +671,4 @@ class TimeSeriesRDDSpec extends TimeSeriesSuite { assert(rows.deep == finalRows.deep) } } - } From 63fabacd8323676577a2c60370e61b529dc3cfd5 Mon Sep 17 00:00:00 2001 From: jm771 Date: Fri, 7 Jul 2017 18:27:22 +0100 Subject: [PATCH 2/3] Correct typo --- .../scala/com/twosigma/flint/timeseries/TimeSeriesRDDSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/com/twosigma/flint/timeseries/TimeSeriesRDDSpec.scala b/src/test/scala/com/twosigma/flint/timeseries/TimeSeriesRDDSpec.scala index 9469bf75..7566c652 100644 --- a/src/test/scala/com/twosigma/flint/timeseries/TimeSeriesRDDSpec.scala +++ b/src/test/scala/com/twosigma/flint/timeseries/TimeSeriesRDDSpec.scala @@ -636,7 +636,7 @@ class TimeSeriesRDDSpec extends TimeSeriesSuite { } } - // Parquet files no longer load with their partitions in the other they were written, see: + // Parquet files no longer load with their partitions in the order they were written, see: // https://issues.apache.org/jira/browse/SPARK-20144 // This tests that data loads correctly despite this, the parquet file in question was generated in spark shell // with the following code: From 036f7e445aff761348446a1f562c3ff2bae7b88a Mon Sep 17 00:00:00 2001 From: jm771 Date: Thu, 13 Jul 2017 17:15:11 +0100 Subject: [PATCH 3/3] Add a test which proves we have our OrderedRDDPartitions in the right order --- .../com/twosigma/flint/rdd/RangeDependency.scala | 4 ++-- .../com/twosigma/flint/rdd/ConversionSpec.scala | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/twosigma/flint/rdd/RangeDependency.scala b/src/main/scala/com/twosigma/flint/rdd/RangeDependency.scala index bf7525dd..17ef9ad2 100644 --- a/src/main/scala/com/twosigma/flint/rdd/RangeDependency.scala +++ b/src/main/scala/com/twosigma/flint/rdd/RangeDependency.scala @@ -40,7 +40,7 @@ private[rdd] object RangeDependency { require(headers.nonEmpty, "Need at least one partition") import OrderedPartitionHeaderUtils.HeaderOrdering - val sortedHeaders = headers.sortBy(x => x).toArray + val sortedHeaders = headers.sorted.toArray val (nonNormalizedPartitions, nonNormalizedRanges) = sortedHeaders.zipWithIndex.map { case (hdr, idx) => @@ -231,7 +231,7 @@ private[rdd] object HeavyKeysNormalizationStrategy extends PartitionNormalizatio ): Seq[CloseOpen[K]] = { import OrderedPartitionHeaderUtils.HeaderOrdering - val sortedHeaders = headers.sortBy(x => x) + val sortedHeaders = headers.sorted val partitionBoundaries = sortedHeaders.head.firstKey +: sortedHeaders.tail.map { header => header.secondKey.getOrElse(header.firstKey) diff --git a/src/test/scala/com/twosigma/flint/rdd/ConversionSpec.scala b/src/test/scala/com/twosigma/flint/rdd/ConversionSpec.scala index 2fdc53dc..c9282246 100644 --- a/src/test/scala/com/twosigma/flint/rdd/ConversionSpec.scala +++ b/src/test/scala/com/twosigma/flint/rdd/ConversionSpec.scala @@ -164,6 +164,22 @@ class ConversionSpec extends FlatSpec with SharedSparkContext with Timeouts { assert(false, "Should not completed as the job has been killed.") case Failure(_) => } + } + + "fromSortedRDD" should "sort partitions, and have partition indexes increasing" in { + + // Create an RDD with data data sorted within partitions, but partitions not sorted + // Data is: + // Partition 0: 100, 130, 160, 199 + // Partition 1: 0, 30, 60, 99 + val 
data = Seq(100, 130, 160, 199, 0, 30, 60, 99)
+    val kvData = data.zip(data)
+    val rdd = sc.makeRDD(kvData, 2)
+
+    val orderedRdd = Conversion.fromSortedRDD(rdd)
+    assert(orderedRdd.partitions(0).index == 0)
+    assert(orderedRdd.partitions(1).index == 1)
+    assert(orderedRdd.collect().toSeq == kvData.sorted)
+  }
 }
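
For readers skimming the patches above, the core of the fix is the new ordering on partition headers: sort by first key, break ties in favour of the header that has a second key, fall back to partition index when both partitions hold only the shared key, and fail when two multi-key partitions share a first key. The standalone Scala sketch below illustrates that rule outside the patches; the Header case class, its field names and the HeaderOrderingSketch object are simplified stand-ins invented for illustration, not Flint's actual OrderedPartitionHeader API.

// Simplified sketch (assumed types) of the header ordering introduced in RangeDependency.scala.
case class Header(partitionIndex: Int, firstKey: Long, secondKey: Option[Long])

object Header {
  implicit val ordering: Ordering[Header] = new Ordering[Header] {
    def compare(a: Header, b: Header): Int =
      if (a.firstKey != b.firstKey) {
        // Different first keys: the partition that starts at the smaller key comes first.
        a.firstKey.compare(b.firstKey)
      } else {
        (a.secondKey, b.secondKey) match {
          // Equal first keys: the header that has a second key sorts after the one that does not.
          case (Some(_), None) => 1
          case (None, Some(_)) => -1
          // Both partitions hold only the shared key: keep the old behaviour and use the index.
          case (None, None) => a.partitionIndex.compare(b.partitionIndex)
          // Two partitions that both continue past the same first key cannot come from sorted data.
          case (Some(x), Some(y)) =>
            sys.error(s"Partitions ${a.partitionIndex} and ${b.partitionIndex} are not sorted: " +
              s"both start at ${a.firstKey} with second keys $x and $y")
        }
      }
  }
}

object HeaderOrderingSketch extends App {
  // Partition index 1 holds the earlier keys even though its index is higher,
  // mimicking the partition reordering described in SPARK-20144.
  val headers = Seq(
    Header(partitionIndex = 0, firstKey = 100L, secondKey = Some(130L)),
    Header(partitionIndex = 1, firstKey = 0L, secondKey = Some(30L))
  )
  println(headers.sorted.map(_.partitionIndex)) // List(1, 0)
}

Sorting headers with an ordering of this shape puts the partition that starts at the smallest key first regardless of the index Spark assigned it, which is what the fromSortedRDD test added in patch 3 asserts.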