diff --git a/.gitignore b/.gitignore index adfc0294..a3fe89e8 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ metastore_db derby.log python/spark **/.cache +logs diff --git a/build.sbt b/build.sbt index 4e1b2c1b..2e5a2299 100644 --- a/build.sbt +++ b/build.sbt @@ -20,6 +20,8 @@ import scalariform.formatter.preferences._ import com.typesafe.sbt.SbtScalariform import BuildUtil._ +licenses += ("Apache-2.0", url("http://apache.org/licenses/LICENSE-2.0")) + lazy val formattingPreferences = { import scalariform.formatter.preferences._ FormattingPreferences(). @@ -28,10 +30,16 @@ lazy val formattingPreferences = { setPreference(SpacesAroundMultiImports, true) } +val _scalaVersion: String = sys.props.getOrElse("scala.version", default = "2.12.8") +val _sparkVersion: String = sys.props.getOrElse("spark.version", default = "2.4.3") + lazy val compilationSettings = scalariformSettings ++ Seq( - version := "0.6.0-SNAPSHOT", - organization := "com.twosigma", - scalaVersion := "2.12.8", + name := "sparklyr-flint_" + + _sparkVersion.substring(0, _sparkVersion.lastIndexOf(".")) + "_" + + _scalaVersion.substring(0, _scalaVersion.lastIndexOf(".")), + version := sys.props.getOrElse("version", default = "0.6.0-SNAPSHOT"), + organization := "org.sparklyr", + scalaVersion := _scalaVersion, assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false), javacOptions ++= Seq("-source", "1.7", "-target", "1.7"), compileOrder in Compile := CompileOrder.JavaThenScala, @@ -58,7 +66,7 @@ lazy val versions = new { val commons_math = "3.5" val joda_time = "2.9.4" val httpclient = "4.3.2" // Note that newer versions need to be configured - val spark = sys.props.getOrElse("spark.version", default = "2.4.3") + val spark = _sparkVersion val scalatest = "3.0.8" val scalacheck = "1.12.6" val grizzled_slf4j = "1.3.0" @@ -121,5 +129,4 @@ addCommandAlias( "; set test in Test := {}; assembly" ) -publishTo := sonatypePublishTo.value crossPaths := false diff --git a/project/plugins.sbt b/project/plugins.sbt index 95d6d9e5..1feaf8d8 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -10,6 +10,4 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0") addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2") -addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.3") - -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0") +addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.6") diff --git a/scripts/build_spark_3.0_artifacts.sh b/scripts/build_spark_3.0_artifacts.sh new file mode 100755 index 00000000..98eabc94 --- /dev/null +++ b/scripts/build_spark_3.0_artifacts.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +set -e + +sbt -Dversion='0.6.1-SNAPSHOT' -Dscala.version='2.12.10' -Dspark.version='3.0.0-preview2' assemblyNoTest diff --git a/sonatype.sbt b/sonatype.sbt deleted file mode 100644 index 68558322..00000000 --- a/sonatype.sbt +++ /dev/null @@ -1,28 +0,0 @@ -sonatypeProfileName := "com.twosigma" - -publishMavenStyle := true - -licenses := Seq("APL2" -> url("http://www.apache.org/licenses/LICENSE-2.0.txt")) - -import xerial.sbt.Sonatype._ -sonatypeProjectHosting := Some(GitHubHosting("twosigma", "flint", "ljin@twosigma.com")) - -homepage := Some(url("https://github.com/twosigma/flint")) - -scmInfo := Some( - ScmInfo( - url("https://github.com/twosigma/flint"), - "scm:git@github.com:twosigma/flint.git" - ) -) - -developers := List( - Developer( - id = "icexelloss", - name = "Li Jin", - email = "ice.xelloss@gmail.com", - url = url("https://github.com/icexelloss") - ) -) - -publishArtifact in Test := false diff --git a/src/main/scala/com/twosigma/flint/annotation/SparklyrApi.java b/src/main/scala/com/twosigma/flint/annotation/SparklyrApi.java new file mode 100644 index 00000000..4e28413e --- /dev/null +++ b/src/main/scala/com/twosigma/flint/annotation/SparklyrApi.java @@ -0,0 +1,24 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twosigma.flint.annotation; + +/** + * API for sparklyr.flint + */ +public @interface SparklyrApi { + String message() default ""; + + String until() default ""; +} diff --git a/src/main/scala/com/twosigma/flint/math/stats/regression/LinearRegressionModel.scala b/src/main/scala/com/twosigma/flint/math/stats/regression/LinearRegressionModel.scala index 44947a9b..f641844d 100644 --- a/src/main/scala/com/twosigma/flint/math/stats/regression/LinearRegressionModel.scala +++ b/src/main/scala/com/twosigma/flint/math/stats/regression/LinearRegressionModel.scala @@ -173,7 +173,7 @@ trait LinearRegression { def calculateRegressionParametersTStat(): DenseVector[Double] = { val standardErrors = estimateRegressionParametersStandardErrors() val parameters = estimateRegressionParameters() - parameters :/ standardErrors + parameters /:/ standardErrors } /** @@ -234,7 +234,7 @@ case class LinearRegressionModel(input: RDD[WeightedLabeledPoint], intercept: Bo def getK: Int = beta.length def calculateMeanOfY(): Double = srwsl / n def calculateWeightedMeanOfY(): Double = srwsl / ssrw - def calculateMeanOfX(): DenseVector[Double] = swx :/ n.toDouble + def calculateMeanOfX(): DenseVector[Double] = swx /:/ n.toDouble def calculateGramianMatrix(): DenseMatrix[Double] = xx def calculateEigenvaluesOfGramianMatrix(): DenseVector[Double] = eigenvalues def calculateCenteredTSS(): Double = centeredTSS @@ -267,7 +267,7 @@ case class LinearRegressionModel(input: RDD[WeightedLabeledPoint], intercept: Bo * * @return the MacKinnon and White's (1985) heteroskedasticity robust estimator. */ - def calculateHC1(): DenseMatrix[Double] = hc0 :* (1.0 * getN / (getN - getK)) + def calculateHC1(): DenseMatrix[Double] = hc0 *:* (1.0 * getN / (getN - getK)) /** * Calculate the White's (1980) heteroskedasticity robust standard errors which is defined as @@ -423,7 +423,7 @@ case class LinearRegressionModel(input: RDD[WeightedLabeledPoint], intercept: Bo } else { v.features * (v.features.t * beta - v.label) } - xe = xe :* v.weight + xe = xe *:* v.weight U += xe.asDenseMatrix.t * xe.asDenseMatrix }, combOp = (U1, U2) => { U1 += U2 } ) diff --git a/src/main/scala/com/twosigma/flint/math/stats/regression/OLSMultipleLinearRegression.scala b/src/main/scala/com/twosigma/flint/math/stats/regression/OLSMultipleLinearRegression.scala index 137dba9e..d1f57e6f 100644 --- a/src/main/scala/com/twosigma/flint/math/stats/regression/OLSMultipleLinearRegression.scala +++ b/src/main/scala/com/twosigma/flint/math/stats/regression/OLSMultipleLinearRegression.scala @@ -80,6 +80,6 @@ object OLSMultipleLinearRegression { U1._9 + U2._9 ) ) - LinearRegressionModel(input, intercept, n, (xx + xx.t) :/ 2.0, xy, swx, srwsl, ssrw, wsl, sw, lw) + LinearRegressionModel(input, intercept, n, (xx + xx.t) /:/ 2.0, xy, swx, srwsl, ssrw, wsl, sw, lw) } } diff --git a/src/main/scala/com/twosigma/flint/rdd/function/summarize/summarizer/subtractable/OLSRegressionSummarizer.scala b/src/main/scala/com/twosigma/flint/rdd/function/summarize/summarizer/subtractable/OLSRegressionSummarizer.scala index 6103c029..ce994bc1 100644 --- a/src/main/scala/com/twosigma/flint/rdd/function/summarize/summarizer/subtractable/OLSRegressionSummarizer.scala +++ b/src/main/scala/com/twosigma/flint/rdd/function/summarize/summarizer/subtractable/OLSRegressionSummarizer.scala @@ -195,7 +195,7 @@ class OLSRegressionSummarizer( Math.sqrt(errorVariance * betaVar) } val stdErrs = vectorOfStdErrs.toArray - val vectorOfTStat = vectorOfBeta :/ vectorOfStdErrs + val vectorOfTStat = vectorOfBeta /:/ vectorOfStdErrs val tStat = vectorOfTStat.toArray val (intercept, diff --git a/src/main/scala/com/twosigma/flint/timeseries/TimeSeriesRDD.scala b/src/main/scala/com/twosigma/flint/timeseries/TimeSeriesRDD.scala index a2041ca4..9b2f327c 100644 --- a/src/main/scala/com/twosigma/flint/timeseries/TimeSeriesRDD.scala +++ b/src/main/scala/com/twosigma/flint/timeseries/TimeSeriesRDD.scala @@ -153,7 +153,7 @@ object TimeSeriesRDD { ): TimeSeriesRDD = { requireSchema(schema) - val timeType = TimeType.get(SQLContext.getOrCreate(sc).sparkSession) + val timeType = TimeType.get(SparkSession.builder.config(sc.getConf).getOrCreate) val timeIndex = schema.fieldIndex(timeColumnName) val rdd = sc.parallelize( rows.map { row => (timeType.internalToNanos(row.getLong(timeIndex)), row) }, numSlices @@ -446,7 +446,7 @@ object TimeSeriesRDD { timeUnit: TimeUnit, timeColumn: String ): TimeSeriesRDD = { - val sqlContext = SQLContext.getOrCreate(sc) + val sqlContext = SparkSession.builder.config(sc.getConf).getOrCreate val df = sqlContext.read.parquet(paths: _*) val prunedDf = columns.map { columnNames => diff --git a/src/main/scala/com/twosigma/flint/timeseries/TimeSeriesRDDBuilder.scala b/src/main/scala/com/twosigma/flint/timeseries/TimeSeriesRDDBuilder.scala new file mode 100644 index 00000000..1d8b5cbe --- /dev/null +++ b/src/main/scala/com/twosigma/flint/timeseries/TimeSeriesRDDBuilder.scala @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twosigma.flint.timeseries + +import java.util.concurrent.TimeUnit + +import com.twosigma.flint.annotation.SparklyrApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +@SparklyrApi +class TimeSeriesRDDBuilder( + isSorted: Boolean, + timeUnit: TimeUnit, + timeColumn: String +) { + def fromRDD( + rdd: RDD[Row], + schema: StructType + ): TimeSeriesRDD = { + TimeSeriesRDD.fromRDD(rdd, schema)(isSorted, timeUnit, timeColumn) + } + + def fromDF( + dataFrame: DataFrame + ): TimeSeriesRDD = { + TimeSeriesRDD.fromDF(dataFrame)(isSorted, timeUnit, timeColumn) + } +} diff --git a/src/main/scala/org/apache/spark/sql/PartitionPreservingOperation.scala b/src/main/scala/org/apache/spark/sql/PartitionPreservingOperation.scala index a7efc97f..8d347442 100644 --- a/src/main/scala/org/apache/spark/sql/PartitionPreservingOperation.scala +++ b/src/main/scala/org/apache/spark/sql/PartitionPreservingOperation.scala @@ -65,6 +65,8 @@ object PartitionPreservingOperation { case _: InputAdapter => true case _: GenerateExec => true case _: SerializeFromObjectExec => true + case _: ColumnarToRowExec => true + case _: RowToColumnarExec => true case _ => false } } diff --git a/src/test/scala/org/apache/spark/sql/PartitionPreservingOperationSpec.scala b/src/test/scala/org/apache/spark/sql/PartitionPreservingOperationSpec.scala index f40903ee..baae7b71 100644 --- a/src/test/scala/org/apache/spark/sql/PartitionPreservingOperationSpec.scala +++ b/src/test/scala/org/apache/spark/sql/PartitionPreservingOperationSpec.scala @@ -106,17 +106,31 @@ class PartitionPreservingOperationSpec extends FlintSuite with FlintTestData { assert(leafExecutedPlan(data).isInstanceOf[ExternalRDDScanExec[_]]) data.cache() data.count() - // first node is WholeStageCodegen - assert(executedPlan(data).children.head.isInstanceOf[InMemoryTableScanExec]) + if (data.sparkSession.version < "3.0.0") { + // first node is WholeStageCodegen + assert(executedPlan(data).children.head.isInstanceOf[InMemoryTableScanExec]) + } else { + val plan = executedPlan(data) + assert(plan.children.head.isInstanceOf[ColumnarToRowExec]) + assert(plan.children.head.children.head.isInstanceOf[InputAdapter]) + assert(plan.children.head.children.head.children.head.isInstanceOf[InMemoryTableScanExec]) + } assert(leafExecutedPlan(data).isInstanceOf[InMemoryTableScanExec]) data.unpersist() val orderedData = data.orderBy("time") orderedData.cache() orderedData.count() - // first node is WholeStageCodegen - assert(executedPlan(orderedData).children.head.isInstanceOf[InMemoryTableScanExec]) - assert(leafExecutedPlan(orderedData).isInstanceOf[InMemoryTableScanExec]) + if (data.sparkSession.version < "3.0.0") { + // first node is WholeStageCodegen + assert(executedPlan(orderedData).children.head.isInstanceOf[InMemoryTableScanExec]) + assert(leafExecutedPlan(orderedData).isInstanceOf[InMemoryTableScanExec]) + } else { + val plan = executedPlan(orderedData) + assert(plan.children.head.isInstanceOf[ColumnarToRowExec]) + assert(plan.children.head.children.head.isInstanceOf[InputAdapter]) + assert(plan.children.head.children.head.children.head.isInstanceOf[InMemoryTableScanExec]) + } orderedData.unpersist() }