-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathrun_clickbench.py
More file actions
executable file
·192 lines (163 loc) · 7.58 KB
/
run_clickbench.py
File metadata and controls
executable file
·192 lines (163 loc) · 7.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#!/usr/bin/env python3
import argparse
import subprocess
import time
import platform
import os
# This script runs ClickBench queries using DataFusion datafusion-cli
# and writes the results to a CSV file.
#
# Arguments
# --output-dir: Directory to write results
# --datafusion-binary: Path to the datafusion-cli binary
#
# Example usage:
# python run_clickbench.py --output-dir results --datafusion-binary datafusion-cli-45.0.0
#
# Note:
# if there are already results in the output directory for the specified datafusion-cli
# binary, this script will exit without running the queries again.
def main():
parser = argparse.ArgumentParser(description="Run ClickBench queries with DataFusion.")
parser.add_argument('--output-dir', help='Directory to write output files', default="results")
parser.add_argument('--datafusion-binary', help='Path to datafusion-cli binary', default='datafusion-cli')
parser.add_argument('--git-revision', help='Git revision of the DataFusion repository')
parser.add_argument('--git-revision-timestamp', help='Date of the git revision')
args = parser.parse_args()
output_dir = args.output_dir
if not os.path.exists(output_dir):
os.makedirs(output_dir)
print(f"Output will be written to: {output_dir}")
# Check if results already exist for this datafusion binary
existing_file = check_existing_results(output_dir, args.datafusion_binary, args.git_revision)
if existing_file:
print(f"Results already found for {args.datafusion_binary} in {existing_file}")
return
script_start_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
results = []
# note these queries are from the DataFusion ClickBench benchmark
# `cp -R ~/Software/datafusion/benchmarks/queries/clickbench queries/`
for query in range(0, 43):
query = f'q{query}'
results.extend(run_clickbench_query(query, args, script_start_timestamp))
# Now write the output to a csv file in the output directory using the csv module
output_file = os.path.join(output_dir, 'results.csv')
print(f"Writing results to {output_file}")
file_exists = os.path.isfile(output_file)
with open(output_file, 'a') as f:
# write a header row only if the file does not exist
columns = results[0].keys()
if not file_exists:
f.write(','.join(columns))
f.write('\n')
# write the results in the same order
for result in results:
f.write(','.join(str(result[col]) for col in columns))
f.write('\n')
# runs the specified ClickBench query using DataFusion
# and returns a list of results.
# example query names: q2, q3
# results is a list of dictionaries
def run_clickbench_query(query_name, args, script_start_timestamp):
print(f"Running Query: {query_name}")
query_directory = os.path.join(os.path.dirname(__file__), 'queries')
query_file = os.path.join(query_directory, 'clickbench', 'queries', f'{query_name}.sql')
num_runs = 5
# Execute the command, timing how long it takes and then writing the result to the output
# prepare a temporary script file in a temporary directory
try:
# read query_file into a string
with open(query_file, 'r') as f:
query_content = f.read()
# Create a temporary script file to set the configuration
# from https://github.com/ClickHouse/ClickBench/blob/main/datafusion/create_partitioned.sql
temp_dir = os.path.join(os.path.dirname(__file__), 'tmp')
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
temp_script = os.path.join(temp_dir, 'script.sql')
with open(temp_script, 'w') as f:
f.write("""
CREATE EXTERNAL TABLE hits
STORED AS PARQUET
LOCATION 'data/hits_partitioned/'
OPTIONS ('binary_as_string' 'true');
""")
# write the query multiple times to gather multiple results
for i in range(0, num_runs):
f.write(f"{query_content}")
# Now execute the command with the temporary script
# and time how long it takes to run the whole thing
command = f"{args.datafusion_binary} -f {temp_script}"
#print(f"Executing command: {command}")
start_time = time.time()
result = subprocess.run(command, shell=True, capture_output=True, text=True, check=True)
end_time = time.time()
elapsed_time = end_time - start_time
# TODO: figure out a way to check for errors running the benchmark
# if "Error" in result.stdout or "Error" in result.stderr:
# print("An error occurred during query execution.")
# print(result.stdout)
# print(result.stderr)
# return []
print(f"Total execution took {elapsed_time} seconds.")
# find all lines like this and extract the numeric value:
# Elapsed 0.023 seconds.
timings = []
for line in result.stdout.splitlines():
if "Elapsed" in line:
parts = line.split()
if len(parts) >= 3:
try:
timing = float(parts[1])
timings.append(timing)
except ValueError:
print(f"Could not convert timing to float: {parts[1]}")
#print("Timings for each run:")
for i, timing in enumerate(timings):
print(f"Run {i + 1}: {timing}")
results = []
for i, timing in enumerate(timings):
results.append({
"benchmark_name": "clickbench_partitioned",
"query_name": query_name,
"query_type": "query" if i != 0 else "table_creation",
"execution_time": timing,
"run_timestamp": script_start_timestamp,
"git_revision": args.git_revision if args.git_revision is not None else "",
"git_revision_timestamp": args.git_revision_timestamp if args.git_revision_timestamp is not None else "",
"num_cores": os.cpu_count(),
#"cpu_model": platform.processor(),
#"os": platform.system(),
#"os_version": platform.version(),
})
return results
except subprocess.CalledProcessError as e:
print(f"Error executing query: {e.stderr}")
# Check if results already exist for this datafusion binary
# If the results exist, return the path to the existing results file
# Otherwise, return None
def check_existing_results(output_dir, datafusion_binary, git_revision):
import csv
import glob
# Get all CSV files in the output directory
csv_files = glob.glob(os.path.join(output_dir, "results.csv"))
for csv_file in csv_files:
try:
with open(csv_file, 'r') as f:
reader = csv.DictReader(f)
# skip header
next(reader, None)
# check all existing rows for the git revision
for row in reader:
# Check if this file contains results for our git revision or binary
if git_revision and row.get('git_revision') == git_revision:
return os.path.basename(csv_file)
# If no git revision provided, check if the binary path matches
# We can't directly check binary path from CSV, so we'll be more conservative
# and only match on git_revision if provided
except (IOError, csv.Error):
# Skip files that can't be read
continue
return None
if __name__ == "__main__":
main()