From 6ef15596ea02dda783430b313a8a9ed3ea72a779 Mon Sep 17 00:00:00 2001
From: Dat Chu
Date: Sun, 2 Apr 2017 22:15:14 -0500
Subject: [PATCH 1/2] CSV can read input with time column with a different name
 and a different format

---
 .../flint/timeseries/TimeSeriesRDD.scala           |  9 +++++----
 .../csv/PriceWithHeaderDateColumn.csv              | 13 +++++++++++++
 .../com/twosigma/flint/timeseries/CSVSpec.scala    | 17 ++++++++++++++++-
 3 files changed, 34 insertions(+), 5 deletions(-)
 create mode 100644 src/test/resources/timeseries/csv/PriceWithHeaderDateColumn.csv

diff --git a/src/main/scala/com/twosigma/flint/timeseries/TimeSeriesRDD.scala b/src/main/scala/com/twosigma/flint/timeseries/TimeSeriesRDD.scala
index 39d8dd90..8d066630 100644
--- a/src/main/scala/com/twosigma/flint/timeseries/TimeSeriesRDD.scala
+++ b/src/main/scala/com/twosigma/flint/timeseries/TimeSeriesRDD.scala
@@ -92,7 +92,8 @@ object TimeSeriesRDD {
 
   private[flint] def convertDfTimestamps(
     dataFrame: DataFrame,
-    timeUnit: TimeUnit
+    timeUnit: TimeUnit,
+    timeColumn: String = timeColumnName
   ): DataFrame = {
     if (timeUnit == NANOSECONDS) {
       dataFrame
@@ -100,7 +101,7 @@ object TimeSeriesRDD {
       val converter: Long => Long = TimeUnit.NANOSECONDS.convert(_, timeUnit)
       val udfConverter = udf(converter)
 
-      dataFrame.withColumn(timeColumnName, udfConverter(col(timeColumnName)))
+      dataFrame.withColumn(timeColumnName, udfConverter(col(timeColumn)))
     }
   }
 
@@ -246,7 +247,7 @@ object TimeSeriesRDD {
     val df = dataFrame.withColumnRenamed(timeColumn, timeColumnName)
     requireSchema(df.schema)
 
-    val convertedDf = convertDfTimestamps(dataFrame, timeUnit)
+    val convertedDf = convertDfTimestamps(dataFrame, timeUnit, timeColumn)
     // we want to keep time column first, but no code should rely on that
     val timeFirstDf = if (convertedDf.schema.fieldIndex(timeColumnName) == 0) {
       convertedDf
@@ -270,7 +271,7 @@ object TimeSeriesRDD {
   ): TimeSeriesRDD = {
     val df = dataFrame.withColumnRenamed(timeColumn, timeColumnName)
     requireSchema(df.schema)
-    val convertedDf = convertDfTimestamps(df, timeUnit)
+    val convertedDf = convertDfTimestamps(df, timeUnit, timeColumn)
 
     val partitionInfo = PartitionInfo(rangeSplits, deps)
     TimeSeriesRDD.fromSortedDfWithPartInfo(convertedDf, Some(partitionInfo))
diff --git a/src/test/resources/timeseries/csv/PriceWithHeaderDateColumn.csv b/src/test/resources/timeseries/csv/PriceWithHeaderDateColumn.csv
new file mode 100644
index 00000000..16c83d69
--- /dev/null
+++ b/src/test/resources/timeseries/csv/PriceWithHeaderDateColumn.csv
@@ -0,0 +1,13 @@
+date,id,price,info
+2017/01/01,7,0.5,test
+2017/01/01,3,1,test
+2017/01/02,3,1.5,test
+2017/01/02,7,2,test
+2017/01/03,3,2.5,test
+2017/01/03,7,3,test
+2017/01/04,3,3.5,test
+2017/01/04,7,4,test
+2017/01/05,3,4.5,test
+2017/01/05,7,5,test
+2017/01/08,3,5.5,test
+2017/01/08,7,6,test
diff --git a/src/test/scala/com/twosigma/flint/timeseries/CSVSpec.scala b/src/test/scala/com/twosigma/flint/timeseries/CSVSpec.scala
index 58b9abda..48d98972 100644
--- a/src/test/scala/com/twosigma/flint/timeseries/CSVSpec.scala
+++ b/src/test/scala/com/twosigma/flint/timeseries/CSVSpec.scala
@@ -16,11 +16,12 @@
 
 package com.twosigma.flint.timeseries
 
+import java.time.LocalDate
 import java.util.TimeZone
+import java.util.concurrent.TimeUnit
 
 import com.twosigma.flint.timeseries.row.Schema
 import org.scalatest.FlatSpec
-
 import org.apache.spark.sql.types._
 import com.twosigma.flint.SharedSparkContext
 import com.twosigma.flint.SpecUtils
@@ -117,4 +118,18 @@ class CSVSpec extends FlatSpec with SharedSparkContext {
       assert(first.getAs[Long]("time") == format.parse("2008-01-02 00:00:00.000").getTime * 1000000L)
     }
   }
+
+  it should "correct read unsorted CSV with header, time column not called time in a specific format" in {
+    SpecUtils.withResource("/timeseries/csv/PriceWithHeaderDateColumn.csv") { source =>
+      val timeseriesRdd = CSV.from(sqlContext, "file://" + source, sorted = false, timeColumnName = "date",
+        dateFormat = "yyyy/MM/dd", header = true)
+      val first = timeseriesRdd.first()
+
+      val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S")
+      assert(first.getAs[Long]("time") == TimeUnit.MILLISECONDS.toNanos(
+        format.parse("2017-01-01 00:00:00.000").getTime
+      ))
+    }
+
+  }
 }

From b131f0e8e0cdf6fbb9ff6e88602b47cfae5bbc26 Mon Sep 17 00:00:00 2001
From: Dat Chu
Date: Sun, 2 Apr 2017 22:19:18 -0500
Subject: [PATCH 2/2] Fix typo

---
 src/test/scala/com/twosigma/flint/timeseries/CSVSpec.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/test/scala/com/twosigma/flint/timeseries/CSVSpec.scala b/src/test/scala/com/twosigma/flint/timeseries/CSVSpec.scala
index 48d98972..e788cd25 100644
--- a/src/test/scala/com/twosigma/flint/timeseries/CSVSpec.scala
+++ b/src/test/scala/com/twosigma/flint/timeseries/CSVSpec.scala
@@ -119,7 +119,7 @@ class CSVSpec extends FlatSpec with SharedSparkContext {
     }
   }
 
-  it should "correct read unsorted CSV with header, time column not called time in a specific format" in {
+  it should "correctly read CSV with header, time column not called 'time' and in a specific format" in {
    SpecUtils.withResource("/timeseries/csv/PriceWithHeaderDateColumn.csv") { source =>
      val timeseriesRdd = CSV.from(sqlContext, "file://" + source, sorted = false, timeColumnName = "date",
        dateFormat = "yyyy/MM/dd", header = true)