Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -664,4 +664,32 @@ abstract class IcebergSuite extends WholeStageTransformerSuite {
assert(result.head.getString(1) == "test_data")
}
}

// Verifies that AssertNotNull is correctly offloaded for Iceberg NOT NULL columns:
// (1) inserting non-null literals succeeds and the scan is offloaded,
// (2) inserting from a query with nullable source columns succeeds,
// (3) inserting NULL into a NOT NULL column raises a null-violation error.
test("assert_not_null with iceberg table") {
  withTable("iceberg_not_null") {
    spark.sql("""
      |CREATE TABLE iceberg_not_null (id BIGINT NOT NULL, name STRING NOT NULL)
      |USING iceberg
      |""".stripMargin)
    // Insert non-null values should succeed with AssertNotNull offloaded.
    spark.sql("INSERT INTO iceberg_not_null VALUES (1, 'a'), (2, 'b')")
    runQueryAndCompare("SELECT * FROM iceberg_not_null") {
      checkGlutenPlan[IcebergScanTransformer]
    }

    // Insert from a query with nullable source columns.
    spark.sql(
      "INSERT INTO iceberg_not_null SELECT id + 10, CAST(id AS STRING) FROM iceberg_not_null")
    val df = runQueryAndCompare("SELECT * FROM iceberg_not_null ORDER BY id") { _ => }
    assert(df.count() == 4)

    // Insert null into NOT NULL column should throw. Depending on how the backend
    // wraps the error, the null-violation text may appear on the exception itself
    // or on its cause.
    val e = intercept[Exception] {
      spark.sql("INSERT INTO iceberg_not_null VALUES (null, 'c')").collect()
    }
    // Throwable.getMessage may legitimately be null; normalize to "" so a missing
    // message produces an assertion failure instead of an NPE masking it.
    val message = Option(e.getMessage).getOrElse("")
    val causeMessage = Option(e.getCause).flatMap(c => Option(c.getMessage)).getOrElse("")
    assert(
      message.contains("null") || message.contains("NOT_NULL") ||
        causeMessage.contains("null"),
      s"Unexpected error for NULL insert into NOT NULL column: $e")
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
import org.apache.spark.sql.execution.ScalarSubquery

Expand Down Expand Up @@ -294,6 +295,7 @@ object ExpressionMappings {
Sig[WidthBucket](WIDTH_BUCKET),
Sig[ReplicateRows](REPLICATE_ROWS),
Sig[RaiseError](RAISE_ERROR),
Sig[AssertNotNull](ASSERT_NOT_NULL),
Sig[SparkVersion](VERSION),
// Decimal
Sig[UnscaledValue](UNSCALED_VALUE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,13 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenBitmapExpressionsQuerySuite]
enableSuite[GlutenEmptyInSuite]
enableSuite[GlutenRuntimeNullChecksV2Writes]
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("NOT NULL checks for atomic top-level fields (byName)")
.exclude("NOT NULL checks for atomic top-level fields (byPosition)")
.exclude("NOT NULL checks for nested struct fields (byName)")
.exclude("NOT NULL checks for nested struct fields (byPosition)")
.exclude("NOT NULL checks for nullable array with required element (byPosition)")
.exclude("not null checks for fields inside nullable array (byPosition)")
enableSuite[GlutenTableOptionsConstantFoldingSuite]
enableSuite[GlutenDeltaBasedMergeIntoTableSuite]
enableSuite[GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenDataSourceV2FunctionSuite]
enableSuite[GlutenDataSourceV2SQLSessionCatalogSuite]
enableSuite[GlutenDataSourceV2SQLSuiteV1Filter]
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("CreateTableAsSelect: nullable schema")
enableSuite[GlutenDataSourceV2SQLSuiteV2Filter]
enableSuite[GlutenDataSourceV2Suite]
// Rewrite the following tests in GlutenDataSourceV2Suite.
Expand Down Expand Up @@ -785,6 +787,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenFilteredScanSuite]
enableSuite[GlutenFiltersSuite]
enableSuite[GlutenInsertSuite]
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand")
// the native write staging dir is different from vanilla Spark for custom partition paths
.exclude("SPARK-35106: Throw exception when rename custom partition paths returns false")
.exclude("Stop task set if FileAlreadyExistsException was thrown")
Expand Down Expand Up @@ -1103,21 +1107,38 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenBitmapExpressionsQuerySuite]
enableSuite[GlutenEmptyInSuite]
enableSuite[GlutenRuntimeNullChecksV2Writes]
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("NOT NULL checks for atomic top-level fields (byName)")
.exclude("NOT NULL checks for atomic top-level fields (byPosition)")
.exclude("NOT NULL checks for nested struct fields (byName)")
.exclude("NOT NULL checks for nested struct fields (byPosition)")
.exclude("NOT NULL checks for nullable array with required element (byPosition)")
.exclude("not null checks for fields inside nullable array (byPosition)")
enableSuite[GlutenTableOptionsConstantFoldingSuite]
enableSuite[GlutenDeltaBasedMergeIntoTableSuite]
// Replaced by Gluten versions that handle wrapped exceptions
.excludeByPrefix("merge cardinality check with")
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("merge with NOT NULL checks")
enableSuite[GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite]
// Replaced by Gluten versions that handle wrapped exceptions
.excludeByPrefix("merge cardinality check with")
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("merge with NOT NULL checks")
enableSuite[GlutenDeltaBasedUpdateAsDeleteAndInsertTableSuite]
// FIXME: complex type result mismatch
.exclude("update nested struct fields")
.exclude("update char/varchar columns")
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("update with NOT NULL checks")
enableSuite[GlutenDeltaBasedUpdateTableSuite]
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("update with NOT NULL checks")
enableSuite[GlutenGroupBasedMergeIntoTableSuite]
// Replaced by Gluten versions that handle wrapped exceptions
.excludeByPrefix("merge cardinality check with")
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("merge with NOT NULL checks")
enableSuite[GlutenFileSourceCustomMetadataStructSuite]
enableSuite[GlutenParquetFileMetadataStructRowIndexSuite]
enableSuite[GlutenTableLocationSuite]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenDataSourceV2FunctionSuite]
enableSuite[GlutenDataSourceV2SQLSessionCatalogSuite]
enableSuite[GlutenDataSourceV2SQLSuiteV1Filter]
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("CreateTableAsSelect: nullable schema")
enableSuite[GlutenDataSourceV2SQLSuiteV2Filter]
enableSuite[GlutenDataSourceV2Suite]
// Rewrite the following tests in GlutenDataSourceV2Suite.
Expand Down Expand Up @@ -751,6 +753,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenFilteredScanSuite]
enableSuite[GlutenFiltersSuite]
enableSuite[GlutenInsertSuite]
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand")
// the native write staging dir is different from vanilla Spark for custom partition paths
.exclude("SPARK-35106: Throw exception when rename custom partition paths returns false")
.exclude("Stop task set if FileAlreadyExistsException was thrown")
Expand Down Expand Up @@ -1089,21 +1093,39 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenBitmapExpressionsQuerySuite]
enableSuite[GlutenEmptyInSuite]
enableSuite[GlutenRuntimeNullChecksV2Writes]
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("NOT NULL checks for atomic top-level fields (byName)")
.exclude("NOT NULL checks for atomic top-level fields (byPosition)")
.exclude("NOT NULL checks for nested struct fields (byName)")
.exclude("NOT NULL checks for nested struct fields (byPosition)")
.exclude("NOT NULL checks for nested structs, arrays, maps (byName)")
.exclude("NOT NULL checks for nullable array with required element (byPosition)")
.exclude("not null checks for fields inside nullable array (byPosition)")
enableSuite[GlutenTableOptionsConstantFoldingSuite]
enableSuite[GlutenDeltaBasedMergeIntoTableSuite]
// Replaced by Gluten versions that handle wrapped exceptions
.excludeByPrefix("merge cardinality check with")
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("merge with NOT NULL checks")
enableSuite[GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite]
// Replaced by Gluten versions that handle wrapped exceptions
.excludeByPrefix("merge cardinality check with")
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("merge with NOT NULL checks")
enableSuite[GlutenDeltaBasedUpdateAsDeleteAndInsertTableSuite]
// FIXME: complex type result mismatch
.exclude("update nested struct fields")
.exclude("update char/varchar columns")
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("update with NOT NULL checks")
enableSuite[GlutenDeltaBasedUpdateTableSuite]
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("update with NOT NULL checks")
enableSuite[GlutenGroupBasedMergeIntoTableSuite]
// Replaced by Gluten versions that handle wrapped exceptions
.excludeByPrefix("merge cardinality check with")
// Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException
.exclude("merge with NOT NULL checks")
enableSuite[GlutenFileSourceCustomMetadataStructSuite]
enableSuite[GlutenParquetFileMetadataStructRowIndexSuite]
enableSuite[GlutenTableLocationSuite]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ object ExpressionNames {
final val VERSION = "version"
final val AT_LEAST_N_NON_NULLS = "at_least_n_non_nulls"
final val ASSERT_TRUE = "assert_true"
final val ASSERT_NOT_NULL = "assert_not_null"
final val NULLIF = "nullif"
final val NVL = "nvl"
final val NVL2 = "nvl2"
Expand Down
Loading