From 5d376b3a9be084d26aebdb18413d1f3c56c72042 Mon Sep 17 00:00:00 2001
From: 汪细勖(WangXixu)-顺丰科技技术集团
Date: Tue, 23 Sep 2025 16:39:46 +0800
Subject: [PATCH 1/2] [GLUTEN] Close SQL files to avoid resource leak

---
 .../workload/tpch/run_tpch/tpch_parquet.scala | 32 +++++++++++--------
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git a/tools/workload/tpch/run_tpch/tpch_parquet.scala b/tools/workload/tpch/run_tpch/tpch_parquet.scala
index 53987633c2af..6141ae41b83c 100644
--- a/tools/workload/tpch/run_tpch/tpch_parquet.scala
+++ b/tools/workload/tpch/run_tpch/tpch_parquet.scala
@@ -16,6 +16,7 @@
  */
 import org.apache.spark.sql.execution.debug._
 import scala.io.Source
+import scala.collection.mutable
 import java.io.File
 import java.util.Arrays
 import sys.process._
@@ -24,14 +25,6 @@ import sys.process._
 // Configurations:
 var parquet_file_path = "/PATH/TO/TPCH_PARQUET_PATH"
 var gluten_root = "/PATH/TO/GLUTEN"
-def time[R](block: => R): R = {
-  val t0 = System.nanoTime()
-  val result = block // call-by-name
-  val t1 = System.nanoTime()
-  println("Elapsed time: " + (t1 - t0)/1000000000.0 + " seconds")
-  result
-}
-
 //Read TPC-H Table from DWRF files
 val lineitem = spark.read.format("parquet").load("file://" + parquet_file_path + "/lineitem")
 val part = spark.read.format("parquet").load("file://" + parquet_file_path + "/part")
@@ -77,11 +70,24 @@ val sorted = fileLists.sortBy {
 }}
 
 // Main program to run TPC-H testing
+val resultMap = mutable.Map[String, Double]()
 for (t <- sorted) {
-  println(t)
-  val fileContents = Source.fromFile(t).getLines.filter(!_.startsWith("--")).mkString(" ")
-  println(fileContents)
-  time{spark.sql(fileContents).collectAsList()}
-  //spark.sql(fileContents).explain
+  val fileContents = {
+    val src = Source.fromFile(t)
+    try {
+      src.getLines().filter(!_.startsWith("--")).mkString(" ")
+    } finally {
+      src.close()
+    }
+  }
+
+  val t0 = System.nanoTime()
+  spark.sql(fileContents).collectAsList()
+  val t1 = System.nanoTime()
+  resultMap += (t.getName -> (t1 - t0) / 1e9)
   Thread.sleep(2000)
 }
+
+resultMap.foreach { case (file, time) =>
+  println(s"$file\t$time s")
+}
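Note: the try/finally block introduced above is one way to guarantee the Source is closed even when reading or query parsing throws. On Scala 2.13+, scala.util.Using expresses the same close-on-exit guarantee more concisely; a minimal sketch under that assumption (the readQuery helper name is illustrative, not part of the patch):

import scala.io.Source
import scala.util.Using

// Hypothetical helper: reads a SQL file, drops "--" comment lines, and
// closes the underlying Source whether or not reading throws.
def readQuery(path: String): String =
  Using.resource(Source.fromFile(path)) { src =>
    src.getLines().filter(!_.startsWith("--")).mkString(" ")
  }

Unlike Using.apply, which wraps the result in a Try, Using.resource rethrows the original exception after closing the Source, matching the try/finally semantics in the patch.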
From c4b9967f3ab8f998c67f3c34d0ba4367cc72a7e4 Mon Sep 17 00:00:00 2001
From: 汪细勖(WangXixu)-顺丰科技技术集团
Date: Thu, 9 Oct 2025 09:55:20 +0800
Subject: [PATCH 2/2] refactor code

---
 .../workload/tpch/run_tpch/tpch_parquet.scala | 22 +++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/tools/workload/tpch/run_tpch/tpch_parquet.scala b/tools/workload/tpch/run_tpch/tpch_parquet.scala
index 6141ae41b83c..2bb5004e454c 100644
--- a/tools/workload/tpch/run_tpch/tpch_parquet.scala
+++ b/tools/workload/tpch/run_tpch/tpch_parquet.scala
@@ -16,7 +16,6 @@
  */
 import org.apache.spark.sql.execution.debug._
 import scala.io.Source
-import scala.collection.mutable
 import java.io.File
 import java.util.Arrays
 import sys.process._
@@ -25,6 +24,14 @@ import sys.process._
 var parquet_file_path = "/PATH/TO/TPCH_PARQUET_PATH"
 var gluten_root = "/PATH/TO/GLUTEN"
 
+def time[R](block: => R): R = {
+  val t0 = System.nanoTime()
+  val result = block // call-by-name
+  val t1 = System.nanoTime()
+  println("Elapsed time: " + (t1 - t0)/1000000000.0 + " seconds")
+  result
+}
+
 //Read TPC-H Table from DWRF files
 val lineitem = spark.read.format("parquet").load("file://" + parquet_file_path + "/lineitem")
 val part = spark.read.format("parquet").load("file://" + parquet_file_path + "/part")
@@ -70,7 +77,6 @@ val sorted = fileLists.sortBy {
 }}
 
 // Main program to run TPC-H testing
-val resultMap = mutable.Map[String, Double]()
 for (t <- sorted) {
   val fileContents = {
     val src = Source.fromFile(t)
@@ -80,14 +86,8 @@ for (t <- sorted) {
       src.close()
     }
   }
-
-  val t0 = System.nanoTime()
-  spark.sql(fileContents).collectAsList()
-  val t1 = System.nanoTime()
-  resultMap += (t.getName -> (t1 - t0) / 1e9)
+  println(t)
+  println(fileContents)
+  time{spark.sql(fileContents).collectAsList()}
   Thread.sleep(2000)
 }
-
-resultMap.foreach { case (file, time) =>
-  println(s"$file\t$time s")
-}
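Note: the time helper restored by this patch takes its argument by name (block: => R), so the block is evaluated inside the helper, between the two nanoTime calls, rather than at the call site. A minimal standalone sketch of the same pattern, with a trivial usage example added for illustration:

// Call-by-name timing helper, as restored above: nanoTime brackets
// the evaluation of the block, and its result is passed through.
def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) / 1e9 + " seconds")
  result
}

// Usage: prints roughly "Elapsed time: 0.5 seconds".
time { Thread.sleep(500) }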