diff --git a/modules/spark-ext/pom.xml b/modules/spark-ext/pom.xml
index 9fc038f5f..383b4a83e 100644
--- a/modules/spark-ext/pom.xml
+++ b/modules/spark-ext/pom.xml
@@ -36,7 +36,7 @@
         2.12.16
         3.2.12
-        3.2.2
+        3.5.3
         2.12.7
diff --git a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
index 156d4fa25..d0030a8c2 100644
--- a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
+++ b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
@@ -52,8 +52,8 @@ private[optimization] object DateExpressions extends SupportedExpressions {
         case Month(date) ⇒
             checkChild(date)
 
-        case ParseToDate(left, format, child) ⇒
-            checkChild(left) && (format.isEmpty || checkChild(format.get)) && checkChild(child)
+        case ParseToDate(left, format, _, _) ⇒
+            checkChild(left) && (format.isEmpty || checkChild(format.get))
 
         case Quarter(date) ⇒
             checkChild(date)
@@ -101,7 +101,7 @@ private[optimization] object DateExpressions extends SupportedExpressions {
         case Month(date) ⇒
             Some(s"MINUTE(${childToString(date)})")
 
-        case ParseToDate(left, formatOption, _) ⇒
+        case ParseToDate(left, formatOption, _, _) ⇒
             formatOption match {
                 case Some(format) ⇒
                     Some(s"PARSEDATETIME(${childToString(left)}, ${childToString(format)})")
diff --git a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
index 99386ac4b..757e680d9 100644
--- a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
+++ b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
@@ -106,7 +106,7 @@ private[optimization] object MathExpressions extends SupportedExpressions {
         case Rand(child, _) ⇒
             checkChild(child)
 
-        case Round(child, scale) ⇒
+        case Round(child, scale, _) ⇒
             checkChild(child) && checkChild(scale)
 
         case Signum(child) ⇒
@@ -230,7 +230,7 @@ private[optimization] object MathExpressions extends SupportedExpressions {
         case Rand(child, _) ⇒
             Some(s"RAND(${childToString(child)})")
 
-        case Round(child, scale) ⇒
+        case Round(child, scale, _) ⇒
            Some(s"ROUND(${childToString(child)}, ${childToString(scale)})")
 
         case Signum(child) ⇒
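The arity changes in DateExpressions and MathExpressions track the upgraded Catalyst constructors: Round gained a trailing ANSI flag in newer Spark, and ParseToDate now carries two extra trailing fields while its old RuntimeReplaceable `child` is gone, which is why the `checkChild(child)` conjunct is dropped. A minimal sketch of the version-tolerant matching, assuming Spark 3.5.x on the classpath and using `checkChild` as a stand-in for the extension's real validation callback:

    import org.apache.spark.sql.catalyst.expressions.{Expression, ParseToDate, Round}

    object ArityExample {
        def isSupported(expr: Expression, checkChild: Expression => Boolean): Boolean = expr match {
            // Round(child, scale, ansiEnabled): the trailing ANSI flag does not
            // affect the generated SQL, so it is matched with a wildcard.
            case Round(child, scale, _) =>
                checkChild(child) && checkChild(scale)

            // ParseToDate's two trailing fields (apparently the time-zone id and
            // ANSI flag in 3.5) are likewise ignored; only the value and the
            // optional format string participate in SQL generation.
            case ParseToDate(left, format, _, _) =>
                checkChild(left) && format.forall(checkChild)

            case _ => false
        }
    }

Note that `format.forall(checkChild)` is just a shorter spelling of the `format.isEmpty || checkChild(format.get)` used in the rules above.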
diff --git a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
index 66cfc7149..6fb781e82 100644
--- a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
+++ b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
@@ -18,7 +18,7 @@ package org.apache.ignite.spark.impl.optimization
 
 import org.apache.ignite.IgniteException
-import org.apache.spark.sql.catalyst.expressions.{Coalesce, EqualTo, Expression, Greatest, If, IfNull, IsNotNull, IsNull, Least, Literal, NullIf, Nvl2}
+import org.apache.spark.sql.catalyst.expressions.{Coalesce, EqualTo, Expression, Greatest, If, IsNotNull, IsNull, Least, Literal, NullIf, Nvl2}
 
 /**
  * Object to support some built-in expressions like `nvl2` or `coalesce`.
  */
@@ -32,9 +32,6 @@ private[optimization] object SystemExpressions extends SupportedExpressions {
         case Greatest(children) ⇒
             children.forall(checkChild)
 
-        case IfNull(left, right, _) ⇒
-            checkChild(left) && checkChild(right)
-
         case Least(children) ⇒
             children.forall(checkChild)
@@ -78,9 +75,6 @@ private[optimization] object SystemExpressions extends SupportedExpressions {
         case Greatest(children) ⇒
             Some(s"GREATEST(${children.map(childToString(_)).mkString(", ")})")
 
-        case IfNull(left, right, _) ⇒
-            Some(s"IFNULL(${childToString(left)}, ${childToString(right)})")
-
         case Least(children) ⇒
             Some(s"LEAST(${children.map(childToString(_)).mkString(", ")})")
diff --git a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
index c41937a3c..34cd13a74 100644
--- a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
+++ b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
@@ -17,7 +17,6 @@
 package org.apache.ignite.spark
 
-import org.apache.commons.lang.StringUtils.equalsIgnoreCase
 import org.apache.ignite.cache.CacheMode
 import org.apache.ignite.cluster.ClusterNode
 import org.apache.ignite.configuration.CacheConfiguration
@@ -136,7 +135,7 @@ package object impl {
      * @return `True` if column is key.
      */
     def isKeyColumn(table: GridQueryTypeDescriptor, column: String): Boolean =
-        contains(allKeyFields(table), column) || equalsIgnoreCase(table.keyFieldName, column)
+        contains(allKeyFields(table), column) || column.equalsIgnoreCase(table.keyFieldName)
 
     /**
      * @param table Table.
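The IfNull cases in SystemExpressions disappear because the dedicated IfNull node is no longer importable from Catalyst in the upgraded Spark; since ifnull(a, b) has long been a thin wrapper that rewrites to Coalesce(a, b) during analysis, the existing Coalesce case should continue to cover those queries.

Dropping the commons-lang import in package.scala also changes the key-column check's behavior at null: the removed StringUtils.equalsIgnoreCase tolerated null on either side (treating two nulls as equal), while the JDK's String.equalsIgnoreCase returns false for a null argument but throws on a null receiver. The rewritten line therefore assumes `column` is always non-null at this call site; a hypothetical null-tolerant helper (not part of the patch, names illustrative) would look like:

    object KeyColumnCheck {
        // Guard the receiver: String.equalsIgnoreCase handles a null argument
        // (returning false) but would throw NullPointerException on a null receiver.
        def keyFieldMatches(column: String, keyFieldName: String): Boolean =
            column != null && column.equalsIgnoreCase(keyFieldName)
    }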
diff --git a/modules/spark-ext/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala b/modules/spark-ext/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
index cddf56c30..8ca4d3654 100644
--- a/modules/spark-ext/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
+++ b/modules/spark-ext/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.SparkSession.Builder
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.encoders._
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
 import org.apache.spark.sql.execution._
@@ -101,12 +102,12 @@ class IgniteSparkSession private(
     /** @inheritdoc */
     override def emptyDataset[T: Encoder]: Dataset[T] = {
         val encoder = implicitly[Encoder[T]]
-        new Dataset(self, LocalRelation(encoder.schema.toAttributes), encoder)
+        new Dataset(self, LocalRelation(DataTypeUtils.toAttributes(encoder.schema)), encoder)
     }
 
     /** @inheritdoc */
     override def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
-        Dataset.ofRows(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
+        Dataset.ofRows(self, LocalRelation.fromExternalRows(DataTypeUtils.toAttributes(schema), rows.asScala))
     }
 
     /** @inheritdoc */
@@ -141,7 +142,7 @@ class IgniteSparkSession private(
     /** @inheritdoc */
     override def createDataset[T: Encoder](data: Seq[T]): Dataset[T] = {
         val enc = encoderFor[T]
-        val attributes = enc.schema.toAttributes
+        val attributes = DataTypeUtils.toAttributes(enc.schema)
         val encoded = data.map(d => enc.createSerializer().apply(d))
         val plan = new LocalRelation(attributes, encoded)
         Dataset[T](self, plan)
@@ -179,7 +180,7 @@ class IgniteSparkSession private(
     /** @inheritdoc */
     override private[sql] def applySchemaToPythonRDD(rdd: RDD[Array[Any]], schema: StructType) = {
         val rowRdd = rdd.map(r => python.EvaluatePython.makeFromJava(schema).asInstanceOf[InternalRow])
-        Dataset.ofRows(self, LogicalRDD(schema.toAttributes, rowRdd)(self))
+        Dataset.ofRows(self, LogicalRDD(DataTypeUtils.toAttributes(schema), rowRdd)(self))
     }
 
     /** @inheritdoc */
@@ -198,11 +199,13 @@ class IgniteSparkSession private(
     /** @inheritdoc */
     override def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
+        val catalystRows = {
-            val encoder = RowEncoder(schema).createSerializer()
+            val encoder = Encoders.row(schema).asInstanceOf[ExpressionEncoder[Row]].createSerializer()
             rowRDD.map(encoder.apply)
         }
-        val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
+        val logicalPlan = LogicalRDD(DataTypeUtils.toAttributes(schema), catalystRows)(self)
+
         Dataset.ofRows(self, logicalPlan)
     }
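All of the IgniteSparkSession changes track two API moves in newer Spark: StructType.toAttributes is no longer available as a method and appears to live on as DataTypeUtils.toAttributes in org.apache.spark.sql.catalyst.types (moved in the 3.4 line), and the RowEncoder(schema) shorthand is superseded by the public Encoders.row(schema) introduced in Spark 3.5, which has to be downcast to ExpressionEncoder[Row] before a serializer can be created. A self-contained sketch of both replacements, assuming Spark 3.5.x:

    import org.apache.spark.sql.{Encoders, Row}
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.types.DataTypeUtils
    import org.apache.spark.sql.types.StructType

    object EncoderMigration {
        // Replaces the pre-3.4 `schema.toAttributes`.
        def attributesFor(schema: StructType): Seq[AttributeReference] =
            DataTypeUtils.toAttributes(schema)

        // Replaces the pre-3.5 `RowEncoder(schema).createSerializer()`.
        def serializerFor(schema: StructType): Row => InternalRow = {
            // Encoders.row exposes only the opaque Encoder[Row] interface; the
            // downcast relies on it currently being backed by an ExpressionEncoder.
            val enc = Encoders.row(schema).asInstanceOf[ExpressionEncoder[Row]]
            enc.createSerializer()
        }
    }

The downcast mirrors what the patch does inline in createDataFrame; it works today because Encoders.row is backed by ExpressionEncoder, but that is an implementation detail rather than a documented contract.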