Skip to content
This repository was archived by the owner on Jun 14, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/test/resources/tpcds/queries/q0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- q0: inner join catalog_sales with store_sales on the sold-date key,
-- returning the first 100 customer-key pairs.
SELECT
  catalog_sales.cs_bill_customer_sk,
  store_sales.ss_customer_sk
FROM catalog_sales
JOIN store_sales
  ON catalog_sales.cs_sold_date_sk = store_sales.ss_sold_date_sk
LIMIT 100
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
== Physical Plan ==
CollectLimit 100
+- *(5) Project [cs_bill_customer_sk#1, ss_customer_sk#2]
+- *(5) SortMergeJoin [cs_sold_date_sk#3], [ss_sold_date_sk#4], Inner
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: SortMergeJoin is used here because stats are enabled.

:- *(2) Sort [cs_sold_date_sk#3 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(cs_sold_date_sk#3, 200)
: +- *(1) Project [cs_sold_date_sk#3, cs_bill_customer_sk#1]
: +- *(1) Filter isnotnull(cs_sold_date_sk#3)
: +- *(1) FileScan parquet default.catalog_sales[cs_sold_date_sk#3,cs_bill_customer_sk#1] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/catalog_sales], PartitionFilters: [], PushedFilters: [IsNotNull(cs_sold_date_sk)], ReadSchema: struct<cs_sold_date_sk:int,cs_bill_customer_sk:int>
+- *(4) Sort [ss_sold_date_sk#4 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(ss_sold_date_sk#4, 200)
+- *(3) Project [ss_sold_date_sk#4, ss_customer_sk#2]
+- *(3) Filter isnotnull(ss_sold_date_sk#4)
+- *(3) FileScan parquet default.store_sales[ss_sold_date_sk#4,ss_customer_sk#2] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_sold_date_sk)], ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int>
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
CollectLimit
WholeStageCodegen
Project [cs_bill_customer_sk,ss_customer_sk]
SortMergeJoin [cs_sold_date_sk,ss_sold_date_sk]
InputAdapter
WholeStageCodegen
Sort [cs_sold_date_sk]
InputAdapter
Exchange [cs_sold_date_sk] #1
WholeStageCodegen
Project [cs_bill_customer_sk,cs_sold_date_sk]
Filter [cs_sold_date_sk]
Scan parquet default.catalog_sales [cs_bill_customer_sk,cs_sold_date_sk] [cs_bill_customer_sk,cs_sold_date_sk]
InputAdapter
WholeStageCodegen
Sort [ss_sold_date_sk]
InputAdapter
Exchange [ss_sold_date_sk] #2
WholeStageCodegen
Project [ss_customer_sk,ss_sold_date_sk]
Filter [ss_sold_date_sk]
Scan parquet default.store_sales [ss_customer_sk,ss_sold_date_sk] [ss_customer_sk,ss_sold_date_sk]
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*
* The below license was copied from: https://github.com/FasterXML/jackson-module-scala/blob/2.10/src/main/resources/META-INF/LICENSE
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
Expand Down Expand Up @@ -38,7 +37,7 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {

// The TPCDS queries below are based on v1.4.
// TODO: Fix build pipeline for q49 and reenable q49.
// Only q0 is enabled for now; the diff's stale "q1" definition is removed so
// there is a single binding for tpcdsQueries.
val tpcdsQueries = Seq("q0")

private val tableColumns = Map(
"store_sales" ->
Expand Down Expand Up @@ -554,17 +553,35 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
""".stripMargin)
}

private val originalCBCEnabled = conf.cboEnabled
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All changes from this line onward are picked directly from the Spark codebase. Refer to https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/TPCDSBase.scala#L609

private val originalJoinReorderEnabled = conf.joinReorderEnabled

override def beforeAll(): Unit = {
  super.beforeAll()
  if (injectStats) {
    // Enable the optimizer rules that exploit data statistics so plan
    // generation can use the stats injected below.
    conf.setConf(SQLConf.CBO_ENABLED, true)
    conf.setConf(SQLConf.JOIN_REORDER_ENABLED, true)
  }
  for (name <- tableNames) {
    createTable(spark, name)
    if (injectStats) {
      // Attach SF-100 stats to simulate plan generation on actual TPC-DS data.
      val stats = TPCDSTableStats.sf100TableStats(name)
      spark.sessionState.catalog.alterTableStats(TableIdentifier(name), Some(stats))
    }
  }
}

override def afterAll(): Unit = {
  // Restore the optimizer flags flipped in beforeAll, then drop the test
  // tables so subsequent suites start from a clean catalog.
  conf.setConf(SQLConf.CBO_ENABLED, originalCBCEnabled)
  conf.setConf(SQLConf.JOIN_REORDER_ENABLED, originalJoinReorderEnabled)
  tableNames.foreach { tableName =>
    // Named arguments make the two boolean flags self-describing
    // (previously the opaque positional call `dropTable(..., true, true)`).
    spark.sessionState.catalog.dropTable(
      TableIdentifier(tableName), ignoreIfNotExists = true, purge = true)
  }
  super.afterAll()
}

protected def injectStats: Boolean = true
}
Loading