
Commit fdc8f62: Added aggregate expr and kernel
1 parent ce949f1 · commit fdc8f62
14 files changed · 1851 additions & 45 deletions


.github/actions/setup/action.yml

Lines changed: 29 additions & 29 deletions
@@ -1,39 +1,39 @@

Apart from whitespace-only re-indentation of the action body, the hunk's only content changes switch the CUDA apt repository from ubuntu2404 to debian13 and bump cuda-toolkit-13-0 to cuda-toolkit-13-1. The file after the change:

```yaml
name: Setup Rust Env
description: "Checkout repo, install protobuf, and make build script executable"
runs:
  using: "composite"
  steps:
    - uses: actions/checkout@v4

    - name: Update apt
      run: apt-get update
      shell: bash

    - name: Install protobuf
      run: apt-get install -y protobuf-compiler libprotobuf-dev
      shell: bash

    - name: Install Rust components
      run: rustup component add rustfmt clippy
      shell: bash

    - name: Install CPP components
      run: apt install -y clang-format
      shell: bash

    - name: Cache rust packages
      if: ${{ !env.ACT }}
      uses: swatinem/rust-cache@v2.7.7

    - name: Make build script executable
      run: chmod +x ./bolt.sh
      shell: bash

    - name: Setup CUDA
      run: |
        wget https://developer.download.nvidia.com/compute/cuda/repos/debian13/x86_64/cuda-keyring_1.1-1_all.deb
        dpkg -i cuda-keyring_1.1-1_all.deb
        apt-get update
        apt-get install -y cuda-toolkit-13-1
        apt-get install -y cuda-drivers
      shell: bash
```

tachyon/compute/Cargo.toml

Lines changed: 4 additions & 0 deletions

```diff
@@ -31,3 +31,7 @@ criterion.workspace = true
 [[bench]]
 name = "evaluate_bench"
 harness = false
+
+[[bench]]
+name = "aggregate_bench"
+harness = false
```
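With `harness = false`, Criterion supplies its own `main`, so the new benchmark should be runnable from the `tachyon/compute` package with `cargo bench --bench aggregate_bench`.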
tachyon/compute/benches/aggregate_bench.rs

Lines changed: 121 additions & 0 deletions

@@ -0,0 +1,121 @@

```rust
use std::hint::black_box;
use std::mem::size_of;

use compute::bit_vector::BitVector;
use compute::column::{Column, VecArray};
use compute::data_type::DataType;
use compute::error::ErrorMode;
use compute::evaluate::{Device, evaluate};
use compute::expr::Expr;
use compute::operator::Operator;
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use rand::Rng;
use tokio::runtime::Builder;

fn single_thread_runtime() -> tokio::runtime::Runtime {
    Builder::new_current_thread().enable_all().build().unwrap()
}

fn random_i32(len: usize) -> Vec<i32> {
    let mut rng = rand::rng();
    (0..len).map(|_| rng.random_range(i32::MIN..i32::MAX)).collect::<Vec<i32>>()
}

fn random_f64(len: usize) -> Vec<f64> {
    let mut rng = rand::rng();
    (0..len).map(|_| rng.random_range(-1_000_000.0..1_000_000.0)).collect::<Vec<f64>>()
}

fn make_i32_column(name: &str, len: usize) -> Column<u64> {
    use std::sync::Arc;
    let values = Arc::new(VecArray { data: random_i32(len), datatype: DataType::I32 });
    Column::new(name, values, Some(BitVector::<u64>::new_all_valid(len)))
}

fn make_f64_column(name: &str, len: usize) -> Column<u64> {
    use std::sync::Arc;
    let values = Arc::new(VecArray { data: random_f64(len), datatype: DataType::F64 });
    Column::new(name, values, Some(BitVector::<u64>::new_all_valid(len)))
}

fn bench_aggregate_i32(c: &mut Criterion, rt: &tokio::runtime::Runtime, len: usize) {
    let col_a = make_i32_column("a", len);
    let ops = [Operator::Min, Operator::Max, Operator::Sum, Operator::Count];
    let bytes = (len * size_of::<i32>()) as u64;

    let mut group_rows = c.benchmark_group("aggregate_i32_rows");
    group_rows.throughput(Throughput::Elements(len as u64));
    for op in ops {
        let expr = Expr::aggregate(op, Expr::col("a"), false);
        let bench_id = BenchmarkId::new(format!("{:?}", op), len);
        group_rows.bench_with_input(bench_id, &op, |bch, &_op| {
            bch.iter(|| {
                let out = rt.block_on(async {
                    evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()]))
                        .await
                });
                black_box(out).unwrap();
            });
        });
    }
    group_rows.finish();

    let mut group_bytes = c.benchmark_group("aggregate_i32_bytes");
    group_bytes.throughput(Throughput::Bytes(bytes));
    for op in ops {
        let expr = Expr::aggregate(op, Expr::col("a"), false);
        let bench_id = BenchmarkId::new(format!("{:?}", op), len);
        group_bytes.bench_with_input(bench_id, &op, |bch, &_op| {
            bch.iter(|| {
                let out = rt.block_on(async {
                    evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()]))
                        .await
                });
                black_box(out).unwrap();
            });
        });
    }
    group_bytes.finish();
}

fn bench_aggregate_f64(c: &mut Criterion, rt: &tokio::runtime::Runtime, len: usize) {
    let col_a = make_f64_column("a", len);
    let expr = Expr::aggregate(Operator::Avg, Expr::col("a"), false);
    let bytes = (len * size_of::<f64>()) as u64;

    let mut group_rows = c.benchmark_group("aggregate_f64_rows");
    group_rows.throughput(Throughput::Elements(len as u64));
    let bench_id_rows = BenchmarkId::new("Avg", len);
    group_rows.bench_with_input(bench_id_rows, &len, |bch, &_len| {
        bch.iter(|| {
            let out = rt.block_on(async {
                evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()])).await
            });
            black_box(out).unwrap();
        });
    });
    group_rows.finish();

    let mut group_bytes = c.benchmark_group("aggregate_f64_bytes");
    group_bytes.throughput(Throughput::Bytes(bytes));
    let bench_id_bytes = BenchmarkId::new("Avg", len);
    group_bytes.bench_with_input(bench_id_bytes, &len, |bch, &_len| {
        bch.iter(|| {
            let out = rt.block_on(async {
                evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()])).await
            });
            black_box(out).unwrap();
        });
    });
    group_bytes.finish();
}

fn bench_all_aggregate(c: &mut Criterion) {
    let rt = single_thread_runtime();
    let len = 1_000_000;
    bench_aggregate_i32(c, &rt, len);
    bench_aggregate_f64(c, &rt, len);
}

criterion_group!(benches, bench_all_aggregate);
criterion_main!(benches);
```
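The benchmark only measures throughput; for orientation, here is a minimal sketch (not part of the commit) of a single aggregate call using the same helpers. It assumes a CUDA-capable device and that `evaluate` resolves to one single-row result column, as the `evaluate_gpu_aggregate` implementation further down suggests:

```rust
// Hypothetical one-off call mirroring what each bench iteration does.
// Helper fns are the ones defined in the benchmark above.
fn sum_one_column() -> Result<(), Box<dyn std::error::Error>> {
    let rt = single_thread_runtime();
    let col = make_i32_column("a", 1024);
    let expr = Expr::aggregate(Operator::Sum, Expr::col("a"), false);
    let out = rt.block_on(evaluate(Device::GPU, ErrorMode::Tachyon, &expr, &[col]))?;
    assert_eq!(out.len(), 1);    // one result column ("r0")
    assert_eq!(out[0].len(), 1); // holding a single aggregated row
    Ok(())
}
```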

tachyon/compute/src/codegen.rs

Lines changed: 6 additions & 0 deletions

```diff
@@ -232,6 +232,12 @@ impl CodeGen for Expr {
             });
             var
         }
+        Expr::Aggregate { op, .. } => {
+            return Err(TypeError::Unsupported(format!(
+                "Aggregate {:?} is not supported in codegen yet",
+                op
+            )));
+        }
     };

     Ok(var)
```
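The new arm makes the row-wise code generator reject `Expr::Aggregate` outright; as the `evaluate.rs` change below shows, aggregates take a dedicated `evaluate_gpu_aggregate` path rather than the per-row NVRTC pipeline.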

tachyon/compute/src/evaluate.rs

Lines changed: 92 additions & 2 deletions

```diff
@@ -16,6 +16,7 @@ use crate::column::Column;
 use crate::data_type::DataType;
 use crate::error::ErrorMode;
 use crate::expr::{Expr, SchemaContext};
+use crate::operator::Operator;

 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum Device {
@@ -40,15 +41,27 @@
         .collect();

     let schema_context = SchemaContext::new().with_columns(&column_map).with_error_mode(error_mode);
+    match expr {
+        Expr::Aggregate { op, arg, distinct } => {
+            evaluate_gpu_aggregate::<B>(*op, arg.as_ref(), *distinct, &schema_context, columns)
+                .await
+        }
+        _ => evaluate_gpu_row::<B>(expr, &schema_context, columns).await,
+    }
+}
+
+async fn evaluate_gpu_row<B: BitBlock>(
+    expr: &Expr, schema_context: &SchemaContext, columns: &[Column<B>],
+) -> Result<Vec<Column<B>>, Box<dyn Error>> {
     let mut code_block = CodeBlock::default();
-    expr.to_nvrtc::<B>(&schema_context, &mut code_block)?;
+    expr.to_nvrtc::<B>(schema_context, &mut code_block)?;

     let size = columns[0].len();
     let input_cols =
         columns.iter().map(|col| col.to_gpu_column()).collect::<Result<Vec<_>, _>>()?;

     let mut output_cols = Vec::<gpu_column::Column>::new();
-    let result_type = expr.infer_type(&schema_context)?;
+    let result_type = expr.infer_type(schema_context)?;

     let gpu_col = gpu_column::Column::new_uninitialized::<B>(
         size * result_type.native_size(),
@@ -68,3 +81,80 @@

     Ok(result_cols)
 }
+
+async fn evaluate_gpu_aggregate<B: BitBlock>(
+    op: Operator, arg: &Expr, distinct: bool, schema_context: &SchemaContext, columns: &[Column<B>],
+) -> Result<Vec<Column<B>>, Box<dyn Error>> {
+    if distinct {
+        return Err("DISTINCT aggregates are not supported yet".into());
+    }
+
+    let (col_idx, col_type) = match arg {
+        Expr::Column(col_name) => schema_context
+            .lookup(col_name)
+            .copied()
+            .ok_or_else(|| format!("unknown column: {}", col_name))?,
+        _ => return Err("Aggregate argument must be a column reference".into()),
+    };
+
+    let result_type =
+        Expr::Aggregate { op, arg: Box::new(arg.clone()), distinct }.infer_type(schema_context)?;
+    let code = build_aggregate_nvrtc_code::<B>(
+        op,
+        col_idx,
+        col_type,
+        result_type,
+        schema_context.error_mode() == ErrorMode::Ansi,
+    )?;
+    let input_cols =
+        columns.iter().map(|col| col.to_gpu_column()).collect::<Result<Vec<_>, _>>()?;
+
+    let mut output_cols = Vec::<gpu_column::Column>::new();
+    let size = 1usize;
+    let gpu_col = gpu_column::Column::new_uninitialized::<B>(
+        size * result_type.native_size(),
+        size.div_ceil(B::BITS),
+        size,
+    )?;
+    output_cols.push(gpu_col);
+
+    cuda_launcher::launch_aggregate::<B>(&code, &input_cols, &output_cols).await?;
+
+    let result_cols = output_cols
+        .into_iter()
+        .map(|col| -> Result<_, Box<dyn Error>> {
+            Column::from_gpu_column(&col, "r0", result_type)
+        })
+        .collect::<Result<Vec<_>, _>>()?;
+
+    Ok(result_cols)
+}
+
+fn build_aggregate_nvrtc_code<B: BitBlock>(
+    op: Operator, col_idx: u16, col_type: DataType, result_type: DataType, ansi_error_mode: bool,
+) -> Result<String, Box<dyn Error>> {
+    let input_kernel_type = col_type.kernel_type();
+    let output_kernel_type = result_type.kernel_type();
+    let bits_type = B::C_TYPE;
+
+    let code = match op {
+        Operator::Min => format!(
+            "\taggregate_codegen::min<TypeKind::{input_kernel_type}, TypeKind::{output_kernel_type}, {bits_type}>(ctx, input, output, num_rows, {col_idx});\n"
+        ),
+        Operator::Max => format!(
+            "\taggregate_codegen::max<TypeKind::{input_kernel_type}, TypeKind::{output_kernel_type}, {bits_type}>(ctx, input, output, num_rows, {col_idx});\n"
+        ),
+        Operator::Sum => format!(
+            "\taggregate_codegen::sum<{ansi_error_mode}, TypeKind::{input_kernel_type}, TypeKind::{output_kernel_type}, {bits_type}>(ctx, input, output, num_rows, {col_idx});\n"
+        ),
+        Operator::Avg => format!(
+            "\taggregate_codegen::avg<{ansi_error_mode}, TypeKind::{input_kernel_type}, TypeKind::{output_kernel_type}, {bits_type}>(ctx, input, output, num_rows, {col_idx});\n"
+        ),
+        Operator::Count => format!(
+            "\taggregate_codegen::count<TypeKind::{input_kernel_type}, {bits_type}>(ctx, input, output, num_rows, {col_idx});\n"
+        ),
+        _ => return Err(format!("Unsupported aggregate operator: {:?}", op).into()),
+    };
+
+    Ok(code)
+}
```
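To make the generated source concrete, here is a hedged sketch of a unit test for `build_aggregate_nvrtc_code`. The `TypeKind` spellings (`I32`, `I64`), the `uint64_t` value of `u64::C_TYPE`, and `i64` as the inferred result type of `SUM` over `i32` are assumptions; none of those strings appear in this diff:

```rust
#[cfg(test)]
mod aggregate_codegen_tests {
    use super::*;

    #[test]
    fn sum_emits_a_single_kernel_call() {
        let code = build_aggregate_nvrtc_code::<u64>(
            Operator::Sum,
            0,             // col_idx: aggregate over input column 0
            DataType::I32, // input column type
            DataType::I64, // assumed result type inferred for SUM(i32)
            false,         // non-ANSI (ErrorMode::Tachyon) overflow handling
        )
        .unwrap();
        // Expected string is hypothetical: TypeKind names and the u64
        // C_TYPE spelling are assumed, not taken from this commit.
        assert_eq!(
            code,
            "\taggregate_codegen::sum<false, TypeKind::I32, TypeKind::I64, uint64_t>(ctx, input, output, num_rows, 0);\n"
        );
    }
}
```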
