Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,20 @@ cudf.exchange=false
cudf.exchange.server.port=8083
cudf.memory_resource=async

# Turn on to use intra-node exchange optimization.
# NOTE: In cudf exchange 20260212 branch, this is needed for UCX to use nvlink.
cudf.intra_node_exchange=true

# Use 100M rows per chunk for cudf partitioned output.
# NOTE: This is not yet propagated to the worker properly because only a fixed set of query configs are supported.
# https://github.com/prestodb/presto/blob/a62672886152c8c6b61cf301d246f217d850e357/presto-native-execution/presto_cpp/main/PrestoToVeloxQueryConfig.cpp#L106-L224
# As a result, this needs to be hardcoded right now.
cudf.partitioned_output_batch_rows=100000000

# Enable cudf rebatching before aggregations.
cudf.concat_optimization_enabled=true
cudf.batch_size_min_threshold=100000000


async-data-cache-enabled=false
cudf.jit_expression_enabled=false
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ services:
{% if workers|length > 1 and not single_container %}
# Separate GPU workers - runs each worker in a separate container, pinned to a specific GPU
{% for gpu_id in workers %}
{% set numa_node = gpu_id // 4 %}
presto-native-worker-gpu-{{ gpu_id }}:
<<: *gpu_worker_base
container_name: presto-native-worker-gpu-{{ gpu_id }}
environment:
LAUNCHER: "numactl --cpunodebind={{ numa_node }} --membind={{ numa_node }}"
NVIDIA_VISIBLE_DEVICES: all
PROFILE: ${PROFILE}
PROFILE_ARGS: ${PROFILE_ARGS}
Expand All @@ -87,7 +89,9 @@ services:
container_name: presto-native-worker-gpu
{%- if workers %}
command: ["bash", "/opt/presto_profiling_wrapper.sh"{% for gpu_id in workers %}, "{{ gpu_id }}"{% endfor %}]
privileged: true
environment:
LAUNCHER: 'numactl --cpunodebind=$$(( $$gpu_id / 4 )) --membind=$$(( $$gpu_id / 4 ))'
NVIDIA_VISIBLE_DEVICES: all
PROFILE: ${PROFILE}
PROFILE_ARGS: ${PROFILE_ARGS}
Expand Down Expand Up @@ -118,6 +122,7 @@ services:
CUDA_VISIBLE_DEVICES: 0
{%- endif %}
volumes:
- /sys/devices/system/node:/sys/devices/system/node
{%- if workers %}
# Mount all etc directories for workers {{ workers|join(', ') }}
{%- for gpu_id in workers %}
Expand Down
2 changes: 1 addition & 1 deletion presto/docker/launch_presto_servers.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ else
# Launch workers in parallel, each pinned to a different GPU
# The GPU IDs are passed as command-line arguments
for gpu_id in "$@"; do
CUDA_VISIBLE_DEVICES=$gpu_id presto_server --etc-dir="/opt/presto-server/etc${gpu_id}" &
eval "CUDA_VISIBLE_DEVICES=$gpu_id ${LAUNCHER:-} presto_server --etc-dir='/opt/presto-server/etc${gpu_id}'" &
done
fi

Expand Down
2 changes: 1 addition & 1 deletion presto/docker/native_build.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM presto/prestissimo-dependency:centos9

RUN rpm --import https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64/7fa2af80.pub && \
dnf config-manager --add-repo "https://developer.download.nvidia.com/devtools/repos/rhel$(source /etc/os-release; echo ${VERSION_ID%%.*})/$(rpm --eval '%{_arch}' | sed s/aarch/arm/)/" && \
dnf install -y nsight-systems-cli-2025.5.1
dnf install -y nsight-systems-cli-2025.5.1 numactl

ARG GPU=ON
ARG BUILD_TYPE=release
Expand Down
4 changes: 2 additions & 2 deletions presto/scripts/generate_presto_config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ function duplicate_worker_configs() {
# make cudf.exchange=true if we are running multiple workers
sed -i "s+cudf.exchange=false+cudf.exchange=true+g" ${worker_native_config}
# make join-distribution-type=PARTITIONED if we are running multiple workers
# (ucx exchange does not currently support BROADCAST partition type)
sed -i "s+join-distribution-type=.*+join-distribution-type=PARTITIONED+g" ${coord_native_config}
# (ucx exchange supports PARTITIONED and BROADCAST partition types)
# sed -i "s+join-distribution-type=.*+join-distribution-type=PARTITIONED+g" ${coord_native_config}
fi

# Each worker node needs to have it's own http-server port. This isn't used, but
Expand Down
15 changes: 15 additions & 0 deletions presto/scripts/run_benchmark.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ OPTIONS:
-b, --benchmark-type Type of benchmark to run. Only "tpch" and "tpcds" are currently supported.
-q, --queries Set of benchmark queries to run. This should be a comma separate list of query numbers.
By default, all benchmark queries are run.
--queries-file Path to a custom JSON file containing query definitions. When specified, queries are loaded
from this file instead of the default queries_best.json.
-h, --hostname Hostname of the Presto coordinator.
--port Port number of the Presto coordinator.
-u, --user User who queries will be executed as.
Expand Down Expand Up @@ -71,6 +73,15 @@ parse_args() {
exit 1
fi
;;
--queries-file)
if [[ -n $2 ]]; then
QUERIES_FILE=$2
shift 2
else
echo "Error: --queries-file requires a value"
exit 1
fi
;;
-h|--hostname)
if [[ -n $2 ]]; then
HOST_NAME=$2
Expand Down Expand Up @@ -188,6 +199,10 @@ if [[ -n ${QUERIES} ]]; then
PYTEST_ARGS+=("--queries ${QUERIES}")
fi

if [[ -n ${QUERIES_FILE} ]]; then
PYTEST_ARGS+=("--queries-file ${QUERIES_FILE}")
fi

if [[ -n ${HOST_NAME} ]]; then
PYTEST_ARGS+=("--hostname ${HOST_NAME}")
fi
Expand Down
15 changes: 15 additions & 0 deletions presto/scripts/run_integ_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ OPTIONS:
-b, --benchmark-type Type of benchmark to run tests for. Only "tpch" and "tpcds" are currently supported.
-q, --queries Set of benchmark queries to run. This should be a comma separate list of query numbers.
By default, all benchmark queries are run.
--queries-file Path to a custom JSON file containing query definitions. When specified, queries
are loaded from this file instead of the default queries_best.json.
-k, --keep-tables If this argument is specified, created benchmark tables will not be dropped.
-H, --hostname Hostname of the Presto coordinator.
-p, --port Port number of the Presto coordinator.
Expand Down Expand Up @@ -86,6 +88,15 @@ parse_args() {
exit 1
fi
;;
--queries-file)
if [[ -n $2 ]]; then
QUERIES_FILE=$2
shift 2
else
echo "Error: --queries-file requires a value"
exit 1
fi
;;
-k|--keep-tables)
KEEP_TABLES=true
shift
Expand Down Expand Up @@ -204,6 +215,10 @@ if [[ -n ${QUERIES} ]]; then
PYTEST_ARGS+=("--queries ${QUERIES}")
fi

if [[ -n ${QUERIES_FILE} ]]; then
PYTEST_ARGS+=("--queries-file ${QUERIES_FILE}")
fi

if [[ -n ${HOST_NAME} ]]; then
PYTEST_ARGS+=("--hostname ${HOST_NAME}")
fi
Expand Down
6 changes: 4 additions & 2 deletions presto/testing/common/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

@pytest.fixture(scope="module")
def tpch_queries(request, presto_cursor):
queries = test_utils.get_queries(request.node.obj.BENCHMARK_TYPE)
queries_file = request.config.getoption("--queries-file")
queries = test_utils.get_queries(request.node.obj.BENCHMARK_TYPE, queries_file)

# The "fraction" portion of Q11 is a value that depends on scale factor
# (it should be 0.0001 / scale_factor), whereas our query is currently hard-coded as 0.0001.
Expand All @@ -23,4 +24,5 @@ def tpch_queries(request, presto_cursor):

@pytest.fixture(scope="module")
def tpcds_queries(request):
return test_utils.get_queries(request.node.obj.BENCHMARK_TYPE)
queries_file = request.config.getoption("--queries-file")
return test_utils.get_queries(request.node.obj.BENCHMARK_TYPE, queries_file)
24 changes: 24 additions & 0 deletions presto/testing/common/queries/tpch/queries_best.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"Q1": "SELECT l_returnflag, l_linestatus, sum(l_quantity) AS sum_qty, sum(l_extendedprice) AS sum_base_price, sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge, avg(l_quantity) AS avg_qty, avg(l_extendedprice) AS avg_price, avg(l_discount) AS avg_disc, count(*) AS count_order FROM lineitem WHERE l_shipdate <= CAST('1998-09-02' AS date) GROUP BY l_returnflag, l_linestatus ORDER BY l_returnflag, l_linestatus",
"Q2": "SELECT s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment FROM part, supplier, partsupp, nation, region WHERE p_partkey = ps_partkey AND s_suppkey = ps_suppkey AND p_size = 15 AND p_type LIKE '%BRASS' AND s_nationkey = n_nationkey AND n_regionkey = r_regionkey AND r_name = 'EUROPE' AND ps_supplycost = ( SELECT min(ps_supplycost) FROM partsupp, supplier, nation, region WHERE p_partkey = ps_partkey AND s_suppkey = ps_suppkey AND s_nationkey = n_nationkey AND n_regionkey = r_regionkey AND r_name = 'EUROPE') ORDER BY s_acctbal DESC, n_name, s_name, p_partkey LIMIT 100",
"Q3": "SELECT l_orderkey, sum(l_extendedprice * (1 - l_discount)) AS revenue, o_orderdate, o_shippriority FROM customer, orders, lineitem WHERE c_mktsegment = 'BUILDING' AND c_custkey = o_custkey AND l_orderkey = o_orderkey AND o_orderdate < CAST('1995-03-15' AS date) AND l_shipdate > CAST('1995-03-15' AS date) GROUP BY l_orderkey, o_orderdate, o_shippriority ORDER BY revenue DESC, o_orderdate LIMIT 10",
"Q4": "SELECT o_orderpriority, count(*) AS order_count FROM orders WHERE o_orderdate >= CAST('1993-07-01' AS date) AND o_orderdate < CAST('1993-10-01' AS date) AND EXISTS ( SELECT * FROM lineitem WHERE l_orderkey = o_orderkey AND l_commitdate < l_receiptdate) GROUP BY o_orderpriority ORDER BY o_orderpriority",
"Q5": "SELECT n_name, sum(l_extendedprice * (1 - l_discount)) AS revenue FROM customer, orders, lineitem, supplier, nation, region WHERE c_custkey = o_custkey AND l_orderkey = o_orderkey AND l_suppkey = s_suppkey AND c_nationkey = s_nationkey AND s_nationkey = n_nationkey AND n_regionkey = r_regionkey AND r_name = 'ASIA' AND o_orderdate >= CAST('1994-01-01' AS date) AND o_orderdate < CAST('1995-01-01' AS date) GROUP BY n_name ORDER BY revenue DESC",
"Q6": "SELECT sum(l_extendedprice * l_discount) AS revenue FROM lineitem WHERE l_shipdate >= CAST('1994-01-01' AS date) AND l_shipdate < CAST('1995-01-01' AS date) AND l_discount BETWEEN 0.05 AND 0.07 AND l_quantity < 24",
"Q7": "SELECT supp_nation, cust_nation, l_year, sum(volume) AS revenue FROM ( SELECT n1.n_name AS supp_nation, n2.n_name AS cust_nation, extract(year FROM l_shipdate) AS l_year, l_extendedprice * (1 - l_discount) AS volume FROM supplier, lineitem, orders, customer, nation n1, nation n2 WHERE s_suppkey = l_suppkey AND o_orderkey = l_orderkey AND c_custkey = o_custkey AND s_nationkey = n1.n_nationkey AND c_nationkey = n2.n_nationkey AND ((n1.n_name = 'FRANCE' AND n2.n_name = 'GERMANY') OR (n1.n_name = 'GERMANY' AND n2.n_name = 'FRANCE')) AND l_shipdate BETWEEN CAST('1995-01-01' AS date) AND CAST('1996-12-31' AS date)) AS shipping GROUP BY supp_nation, cust_nation, l_year ORDER BY supp_nation, cust_nation, l_year",
"Q8": "SELECT o_year, sum( CASE WHEN nation = 'BRAZIL' THEN volume ELSE 0 END) / sum(volume) AS mkt_share FROM ( SELECT extract(year FROM o_orderdate) AS o_year, l_extendedprice * (1 - l_discount) AS volume, n2.n_name AS nation FROM part, supplier, lineitem, orders, customer, nation n1, nation n2, region WHERE p_partkey = l_partkey AND s_suppkey = l_suppkey AND l_orderkey = o_orderkey AND o_custkey = c_custkey AND c_nationkey = n1.n_nationkey AND n1.n_regionkey = r_regionkey AND r_name = 'AMERICA' AND s_nationkey = n2.n_nationkey AND o_orderdate BETWEEN CAST('1995-01-01' AS date) AND CAST('1996-12-31' AS date) AND p_type = 'ECONOMY ANODIZED STEEL') AS all_nations GROUP BY o_year ORDER BY o_year",
"Q9": "SELECT nation, o_year, sum(amount) AS sum_profit FROM ( SELECT n_name AS nation, extract(year FROM o_orderdate) AS o_year, l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity AS amount FROM part, supplier, lineitem, partsupp, orders, nation WHERE s_suppkey = l_suppkey AND ps_suppkey = l_suppkey AND ps_partkey = l_partkey AND p_partkey = l_partkey AND o_orderkey = l_orderkey AND s_nationkey = n_nationkey AND p_name LIKE '%green%') AS profit GROUP BY nation, o_year ORDER BY nation, o_year DESC",
"Q10": "SELECT c_custkey, c_name, sum(l_extendedprice * (1 - l_discount)) AS revenue, c_acctbal, n_name, c_address, c_phone, c_comment FROM customer, orders, lineitem, nation WHERE c_custkey = o_custkey AND l_orderkey = o_orderkey AND o_orderdate >= CAST('1993-10-01' AS date) AND o_orderdate < CAST('1994-01-01' AS date) AND l_returnflag = 'R' AND c_nationkey = n_nationkey GROUP BY c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment ORDER BY revenue DESC LIMIT 20",
"Q11": "SELECT ps_partkey, sum(ps_supplycost * ps_availqty) AS value FROM partsupp, supplier, nation WHERE ps_suppkey = s_suppkey AND s_nationkey = n_nationkey AND n_name = 'GERMANY' GROUP BY ps_partkey HAVING sum(ps_supplycost * ps_availqty) > ( SELECT sum(ps_supplycost * ps_availqty) * {SF_FRACTION} FROM partsupp, supplier, nation WHERE ps_suppkey = s_suppkey AND s_nationkey = n_nationkey AND n_name = 'GERMANY') ORDER BY value DESC",
"Q12": "SELECT l_shipmode, sum( CASE WHEN o_orderpriority = '1-URGENT' OR o_orderpriority = '2-HIGH' THEN 1 ELSE 0 END) AS high_line_count, sum( CASE WHEN o_orderpriority <> '1-URGENT' AND o_orderpriority <> '2-HIGH' THEN 1 ELSE 0 END) AS low_line_count FROM orders, lineitem WHERE o_orderkey = l_orderkey AND l_shipmode IN ('MAIL', 'SHIP') AND l_commitdate < l_receiptdate AND l_shipdate < l_commitdate AND l_receiptdate >= CAST('1994-01-01' AS date) AND l_receiptdate < CAST('1995-01-01' AS date) GROUP BY l_shipmode ORDER BY l_shipmode",
"Q13": "SELECT c_count, count(*) AS custdist FROM ( SELECT c_custkey, count(o_orderkey) FROM customer LEFT OUTER JOIN orders ON c_custkey = o_custkey AND o_comment NOT LIKE '%special%requests%' GROUP BY c_custkey) AS c_orders (c_custkey, c_count) GROUP BY c_count ORDER BY custdist DESC, c_count DESC",
"Q14": "SELECT 100.00 * sum( CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0 END) / sum(l_extendedprice * (1 - l_discount)) AS promo_revenue FROM lineitem, part WHERE l_partkey = p_partkey AND l_shipdate >= date '1995-09-01' AND l_shipdate < CAST('1995-10-01' AS date)",
"Q15": "WITH revenue AS ( SELECT l_suppkey AS supplier_no, sum(l_extendedprice * (1 - l_discount)) AS total_revenue FROM lineitem WHERE l_shipdate >= CAST('1996-01-01' AS date) AND l_shipdate < CAST('1996-04-01' AS date) GROUP BY supplier_no ) SELECT s_suppkey, s_name, s_address, s_phone, total_revenue FROM supplier, revenue WHERE s_suppkey = supplier_no AND total_revenue = ( SELECT max(total_revenue) FROM revenue) ORDER BY s_suppkey",
"Q16": "SELECT p_brand, p_type, p_size, count(DISTINCT ps_suppkey) AS supplier_cnt FROM partsupp, part WHERE p_partkey = ps_partkey AND p_brand <> 'Brand#45' AND p_type NOT LIKE 'MEDIUM POLISHED%' AND p_size IN (49, 14, 23, 45, 19, 3, 36, 9) AND ps_suppkey NOT IN ( SELECT s_suppkey FROM supplier WHERE s_comment LIKE '%Customer%Complaints%') GROUP BY p_brand, p_type, p_size ORDER BY supplier_cnt DESC, p_brand, p_type, p_size",
"Q17": "WITH filtered AS (SELECT p.p_partkey, l.l_quantity, l.l_extendedprice FROM part p JOIN lineitem l ON p.p_partkey = l.l_partkey WHERE p.p_brand = 'Brand#23' AND p.p_container = 'MED BOX'), avg_qty AS (SELECT p_partkey, 0.2 * avg(l_quantity) AS avg_quantity FROM filtered GROUP BY p_partkey) SELECT sum(l_extendedprice) / 7.0 AS avg_yearly FROM filtered JOIN avg_qty USING (p_partkey) WHERE l_quantity < avg_quantity",
"Q18": "SELECT c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, sum(l_quantity) FROM customer, orders, lineitem WHERE o_orderkey IN ( SELECT l_orderkey FROM lineitem GROUP BY l_orderkey HAVING sum(l_quantity) > 300) AND c_custkey = o_custkey AND o_orderkey = l_orderkey GROUP BY c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice ORDER BY o_totalprice DESC, o_orderdate LIMIT 100",
"Q19": "SELECT sum(l_extendedprice * (1 - l_discount)) AS revenue FROM lineitem, part WHERE (p_partkey = l_partkey AND p_brand = 'Brand#12' AND p_container IN ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') AND l_quantity >= 1 AND l_quantity <= 1 + 10 AND p_size BETWEEN 1 AND 5 AND l_shipmode IN ('AIR', 'AIR REG') AND l_shipinstruct = 'DELIVER IN PERSON') OR (p_partkey = l_partkey AND p_brand = 'Brand#23' AND p_container IN ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') AND l_quantity >= 10 AND l_quantity <= 10 + 10 AND p_size BETWEEN 1 AND 10 AND l_shipmode IN ('AIR', 'AIR REG') AND l_shipinstruct = 'DELIVER IN PERSON') OR (p_partkey = l_partkey AND p_brand = 'Brand#34' AND p_container IN ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') AND l_quantity >= 20 AND l_quantity <= 20 + 10 AND p_size BETWEEN 1 AND 15 AND l_shipmode IN ('AIR', 'AIR REG') AND l_shipinstruct = 'DELIVER IN PERSON')",
"Q20": "SELECT s_name, s_address FROM supplier, nation WHERE s_suppkey IN ( SELECT ps_suppkey FROM partsupp WHERE ps_partkey IN ( SELECT p_partkey FROM part WHERE p_name LIKE 'forest%') AND ps_availqty > ( SELECT 0.5 * sum(l_quantity) FROM lineitem WHERE l_partkey = ps_partkey AND l_suppkey = ps_suppkey AND l_shipdate >= CAST('1994-01-01' AS date) AND l_shipdate < CAST('1995-01-01' AS date))) AND s_nationkey = n_nationkey AND n_name = 'CANADA' ORDER BY s_name",
"Q21": "WITH multi_line_orders AS (SELECT l_orderkey FROM lineitem GROUP BY l_orderkey HAVING count(*) > 1), late_lines AS (SELECT l.l_orderkey, l.l_suppkey FROM multi_line_orders m JOIN lineitem l ON m.l_orderkey = l.l_orderkey WHERE l.l_receiptdate > l.l_commitdate), single_late_orders AS (SELECT l_orderkey FROM late_lines GROUP BY l_orderkey HAVING count(*) = 1) SELECT s_name, count(*) AS numwait FROM single_late_orders slo JOIN late_lines ll ON slo.l_orderkey = ll.l_orderkey JOIN supplier ON ll.l_suppkey = s_suppkey JOIN nation ON s_nationkey = n_nationkey AND n_name = 'SAUDI ARABIA' JOIN orders ON slo.l_orderkey = o_orderkey AND o_orderstatus = 'F' GROUP BY s_name ORDER BY numwait DESC, s_name LIMIT 100",
"Q22": "SELECT cntrycode, count(*) AS numcust, sum(c_acctbal) AS totacctbal FROM ( SELECT substring(c_phone FROM 1 FOR 2) AS cntrycode, c_acctbal FROM customer WHERE substring(c_phone FROM 1 FOR 2) IN ('13', '31', '23', '29', '30', '18', '17') AND c_acctbal > ( SELECT avg(c_acctbal) FROM customer WHERE c_acctbal > 0.00 AND substring(c_phone FROM 1 FOR 2) IN ('13', '31', '23', '29', '30', '18', '17')) AND NOT EXISTS ( SELECT * FROM orders WHERE o_custkey = c_custkey)) AS custsale GROUP BY cntrycode ORDER BY cntrycode"
}
Loading