1 change: 1 addition & 0 deletions .gitignore
@@ -7,3 +7,4 @@ metastore_db
 derby.log
 python/spark
 **/.cache
+logs
17 changes: 12 additions & 5 deletions build.sbt
@@ -20,6 +20,8 @@ import scalariform.formatter.preferences._
 import com.typesafe.sbt.SbtScalariform
 import BuildUtil._

+licenses += ("Apache-2.0", url("http://apache.org/licenses/LICENSE-2.0"))
+
 lazy val formattingPreferences = {
   import scalariform.formatter.preferences._
   FormattingPreferences().
@@ -28,10 +30,16 @@ lazy val formattingPreferences = {
     setPreference(SpacesAroundMultiImports, true)
 }

+val _scalaVersion: String = sys.props.getOrElse("scala.version", default = "2.12.8")
+val _sparkVersion: String = sys.props.getOrElse("spark.version", default = "2.4.3")
+
 lazy val compilationSettings = scalariformSettings ++ Seq(
-  version := "0.6.0-SNAPSHOT",
-  organization := "com.twosigma",
-  scalaVersion := "2.12.8",
+  name := "sparklyr-flint_" +
+    _sparkVersion.substring(0, _sparkVersion.lastIndexOf(".")) + "_" +
+    _scalaVersion.substring(0, _scalaVersion.lastIndexOf(".")),
+  version := sys.props.getOrElse("version", default = "0.6.0-SNAPSHOT"),
+  organization := "org.sparklyr",
+  scalaVersion := _scalaVersion,
   assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false),
   javacOptions ++= Seq("-source", "1.7", "-target", "1.7"),
   compileOrder in Compile := CompileOrder.JavaThenScala,
Expand All @@ -58,7 +66,7 @@ lazy val versions = new {
val commons_math = "3.5"
val joda_time = "2.9.4"
val httpclient = "4.3.2" // Note that newer versions need to be configured
val spark = sys.props.getOrElse("spark.version", default = "2.4.3")
val spark = _sparkVersion
val scalatest = "3.0.8"
val scalacheck = "1.12.6"
val grizzled_slf4j = "1.3.0"
@@ -121,5 +129,4 @@ addCommandAlias(
   "; set test in Test := {}; assembly"
 )

-publishTo := sonatypePublishTo.value
 crossPaths := false
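A quick worked example of the `name` logic above, using the version values that scripts/build_spark_3.0_artifacts.sh (later in this diff) passes in:

```scala
// Trimming each version string at its last '.' keeps the major.minor prefix.
val sparkVer = "3.0.0-preview2"
val scalaVer = "2.12.10"
val artifactName = "sparklyr-flint_" +
  sparkVer.substring(0, sparkVer.lastIndexOf(".")) + "_" +
  scalaVer.substring(0, scalaVer.lastIndexOf("."))
// artifactName == "sparklyr-flint_3.0_2.12"
```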
4 changes: 1 addition & 3 deletions project/plugins.sbt
@@ -10,6 +10,4 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")

 addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2")

-addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.3")
-
-addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0")
+addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.6")
5 changes: 5 additions & 0 deletions scripts/build_spark_3.0_artifacts.sh
@@ -0,0 +1,5 @@
+#!/usr/bin/env bash
+
+set -e
+
+sbt -Dversion='0.6.1-SNAPSHOT' -Dscala.version='2.12.10' -Dspark.version='3.0.0-preview2' assemblyNoTest
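For context, `assemblyNoTest` appears to be the `addCommandAlias` whose body is shown in the build.sbt hunk above (`; set test in Test := {}; assembly`), i.e. build the assembly jar with tests disabled; the alias name itself is not visible in this diff.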
28 changes: 0 additions & 28 deletions sonatype.sbt

This file was deleted.

24 changes: 24 additions & 0 deletions src/main/scala/com/twosigma/flint/annotation/SparklyrApi.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.twosigma.flint.annotation;
+
+/**
+ * API for sparklyr.flint
+ */
+public @interface SparklyrApi {
+    String message() default "";
+
+    String until() default "";
+}
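Note: the annotation declares no explicit `@Retention` or `@Target`, so it defaults to class-file retention (not readable via reflection at runtime) and can be attached to any declaration, which is sufficient for a purely documentary marker like this.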
@@ -173,7 +173,7 @@ trait LinearRegression {
   def calculateRegressionParametersTStat(): DenseVector[Double] = {
     val standardErrors = estimateRegressionParametersStandardErrors()
     val parameters = estimateRegressionParameters()
-    parameters :/ standardErrors
+    parameters /:/ standardErrors
   }

   /**
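The `:/` → `/:/` rewrites here (and the `:*` → `*:*` rewrites below) track Breeze's renaming of its element-wise operators; Spark 3.0 upgrades Breeze to 1.0, which drops the old spellings. A minimal sketch of the new operators:

```scala
import breeze.linalg.DenseVector

val num = DenseVector(2.0, 9.0, 8.0)
val den = DenseVector(1.0, 3.0, 4.0)

num /:/ den // element-wise division:       DenseVector(2.0, 3.0, 2.0)
num *:* den // element-wise multiplication: DenseVector(2.0, 27.0, 32.0)
```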
@@ -234,7 +234,7 @@ case class LinearRegressionModel(input: RDD[WeightedLabeledPoint], intercept: Bo
   def getK: Int = beta.length
   def calculateMeanOfY(): Double = srwsl / n
   def calculateWeightedMeanOfY(): Double = srwsl / ssrw
-  def calculateMeanOfX(): DenseVector[Double] = swx :/ n.toDouble
+  def calculateMeanOfX(): DenseVector[Double] = swx /:/ n.toDouble
   def calculateGramianMatrix(): DenseMatrix[Double] = xx
   def calculateEigenvaluesOfGramianMatrix(): DenseVector[Double] = eigenvalues
   def calculateCenteredTSS(): Double = centeredTSS
@@ -267,7 +267,7 @@ case class LinearRegressionModel(input: RDD[WeightedLabeledPoint], intercept: Bo
    *
    * @return the MacKinnon and White's (1985) heteroskedasticity robust estimator.
    */
-  def calculateHC1(): DenseMatrix[Double] = hc0 :* (1.0 * getN / (getN - getK))
+  def calculateHC1(): DenseMatrix[Double] = hc0 *:* (1.0 * getN / (getN - getK))

   /**
    * Calculate the White's (1980) heteroskedasticity robust standard errors which is defined as
@@ -423,7 +423,7 @@ case class LinearRegressionModel(input: RDD[WeightedLabeledPoint], intercept: Bo
       } else {
         v.features * (v.features.t * beta - v.label)
       }
-      xe = xe :* v.weight
+      xe = xe *:* v.weight
       U += xe.asDenseMatrix.t * xe.asDenseMatrix
     }, combOp = (U1, U2) => { U1 += U2 }
   )
@@ -80,6 +80,6 @@ object OLSMultipleLinearRegression {
         U1._9 + U2._9
       )
     )
-    LinearRegressionModel(input, intercept, n, (xx + xx.t) :/ 2.0, xy, swx, srwsl, ssrw, wsl, sw, lw)
+    LinearRegressionModel(input, intercept, n, (xx + xx.t) /:/ 2.0, xy, swx, srwsl, ssrw, wsl, sw, lw)
   }
 }
@@ -195,7 +195,7 @@ class OLSRegressionSummarizer(
       Math.sqrt(errorVariance * betaVar)
     }
     val stdErrs = vectorOfStdErrs.toArray
-    val vectorOfTStat = vectorOfBeta :/ vectorOfStdErrs
+    val vectorOfTStat = vectorOfBeta /:/ vectorOfStdErrs
     val tStat = vectorOfTStat.toArray

     val (intercept,
@@ -153,7 +153,7 @@ object TimeSeriesRDD {
   ): TimeSeriesRDD = {
     requireSchema(schema)

-    val timeType = TimeType.get(SQLContext.getOrCreate(sc).sparkSession)
+    val timeType = TimeType.get(SparkSession.builder.config(sc.getConf).getOrCreate)
     val timeIndex = schema.fieldIndex(timeColumnName)
     val rdd = sc.parallelize(
       rows.map { row => (timeType.internalToNanos(row.getLong(timeIndex)), row) }, numSlices
@@ -446,7 +446,7 @@ object TimeSeriesRDD {
     timeUnit: TimeUnit,
     timeColumn: String
   ): TimeSeriesRDD = {
-    val sqlContext = SQLContext.getOrCreate(sc)
+    val sqlContext = SparkSession.builder.config(sc.getConf).getOrCreate
     val df = sqlContext.read.parquet(paths: _*)

     val prunedDf = columns.map { columnNames =>
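`SQLContext.getOrCreate(sc)` was removed in Spark 3.0, hence the switch to the `SparkSession` builder in both hunks above. A minimal sketch of the replacement pattern, assuming only an existing `SparkContext`:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

// Re-use the session already bound to this JVM if one exists;
// otherwise build one from the SparkContext's configuration.
def sessionFor(sc: SparkContext): SparkSession =
  SparkSession.builder.config(sc.getConf).getOrCreate()
```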
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.twosigma.flint.timeseries
+
+import java.util.concurrent.TimeUnit
+
+import com.twosigma.flint.annotation.SparklyrApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
+@SparklyrApi
+class TimeSeriesRDDBuilder(
+  isSorted: Boolean,
+  timeUnit: TimeUnit,
+  timeColumn: String
+) {
+  def fromRDD(
+    rdd: RDD[Row],
+    schema: StructType
+  ): TimeSeriesRDD = {
+    TimeSeriesRDD.fromRDD(rdd, schema)(isSorted, timeUnit, timeColumn)
+  }
+
+  def fromDF(
+    dataFrame: DataFrame
+  ): TimeSeriesRDD = {
+    TimeSeriesRDD.fromDF(dataFrame)(isSorted, timeUnit, timeColumn)
+  }
+}
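The builder flattens `TimeSeriesRDD.fromRDD`/`fromDF`'s curried parameter lists into a plain constructor plus simple methods, presumably because sparklyr's JVM bridge is easiest to drive through ordinary constructors and method calls. A hypothetical call site (the DataFrame `prices` is an illustrative stand-in; sparklyr would invoke this over its bridge rather than from Scala directly):

```scala
import java.util.concurrent.TimeUnit
import com.twosigma.flint.timeseries.TimeSeriesRDDBuilder

// `prices` stands in for any DataFrame with a sorted "time" column.
val builder = new TimeSeriesRDDBuilder(
  isSorted = true,
  timeUnit = TimeUnit.NANOSECONDS,
  timeColumn = "time"
)
val ts = builder.fromDF(prices)
```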
@@ -65,6 +65,8 @@ object PartitionPreservingOperation {
     case _: InputAdapter => true
     case _: GenerateExec => true
     case _: SerializeFromObjectExec => true
+    case _: ColumnarToRowExec => true
+    case _: RowToColumnarExec => true
     case _ => false
   }
 }
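`ColumnarToRowExec` and `RowToColumnarExec` are the Spark 3.0 plan nodes that convert between columnar batches and rows; they change only the in-partition data representation, not the partitioning, so treating them as partition preserving looks safe. The updated spec below relies on exactly this plan shape.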
@@ -106,17 +106,31 @@ class PartitionPreservingOperationSpec extends FlintSuite with FlintTestData {
     assert(leafExecutedPlan(data).isInstanceOf[ExternalRDDScanExec[_]])
     data.cache()
     data.count()
-    // first node is WholeStageCodegen
-    assert(executedPlan(data).children.head.isInstanceOf[InMemoryTableScanExec])
+    if (data.sparkSession.version < "3.0.0") {
+      // first node is WholeStageCodegen
+      assert(executedPlan(data).children.head.isInstanceOf[InMemoryTableScanExec])
+    } else {
+      val plan = executedPlan(data)
+      assert(plan.children.head.isInstanceOf[ColumnarToRowExec])
+      assert(plan.children.head.children.head.isInstanceOf[InputAdapter])
+      assert(plan.children.head.children.head.children.head.isInstanceOf[InMemoryTableScanExec])
+    }
     assert(leafExecutedPlan(data).isInstanceOf[InMemoryTableScanExec])
     data.unpersist()

     val orderedData = data.orderBy("time")
     orderedData.cache()
     orderedData.count()
-    // first node is WholeStageCodegen
-    assert(executedPlan(orderedData).children.head.isInstanceOf[InMemoryTableScanExec])
-    assert(leafExecutedPlan(orderedData).isInstanceOf[InMemoryTableScanExec])
+    if (data.sparkSession.version < "3.0.0") {
+      // first node is WholeStageCodegen
+      assert(executedPlan(orderedData).children.head.isInstanceOf[InMemoryTableScanExec])
+      assert(leafExecutedPlan(orderedData).isInstanceOf[InMemoryTableScanExec])
+    } else {
+      val plan = executedPlan(orderedData)
+      assert(plan.children.head.isInstanceOf[ColumnarToRowExec])
+      assert(plan.children.head.children.head.isInstanceOf[InputAdapter])
+      assert(plan.children.head.children.head.children.head.isInstanceOf[InMemoryTableScanExec])
+    }
     orderedData.unpersist()
   }