Skip to content

Commit 282dfce

Browse files
committed
Added aggregate expr and kernel
1 parent ce949f1 commit 282dfce

File tree

14 files changed

+1828
-21
lines changed

14 files changed

+1828
-21
lines changed

.github/actions/setup/action.yml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,10 @@ runs:
3131

3232
- name: Setup CUDA
3333
run: |
34-
wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2404/x86_64/cuda-keyring_1.1-1_all.deb
35-
dpkg -i cuda-keyring_1.1-1_all.deb
36-
apt-get update
37-
apt-get install -y cuda-toolkit-13-0
38-
apt-get install -y cuda-drivers
34+
# Re-enable once the CUDA keyring certificate issue is fixed
35+
#wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2404/x86_64/cuda-keyring_1.1-1_all.deb
36+
#dpkg -i cuda-keyring_1.1-1_all.deb
37+
#apt-get update
38+
#apt-get install -y cuda-toolkit-13-0
39+
#apt-get install -y cuda-drivers
3940
shell: bash

tachyon/compute/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,7 @@ criterion.workspace = true
3131
[[bench]]
3232
name = "evaluate_bench"
3333
harness = false
34+
35+
[[bench]]
36+
name = "aggregate_bench"
37+
harness = false
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
use std::hint::black_box;
2+
use std::mem::size_of;
3+
4+
use compute::bit_vector::BitVector;
5+
use compute::column::{Column, VecArray};
6+
use compute::data_type::DataType;
7+
use compute::error::ErrorMode;
8+
use compute::evaluate::{Device, evaluate};
9+
use compute::expr::Expr;
10+
use compute::operator::Operator;
11+
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
12+
use rand::Rng;
13+
use tokio::runtime::Builder;
14+
15+
fn single_thread_runtime() -> tokio::runtime::Runtime {
16+
Builder::new_current_thread().enable_all().build().unwrap()
17+
}
18+
19+
fn random_i32(len: usize) -> Vec<i32> {
20+
let mut rng = rand::rng();
21+
(0..len).map(|_| rng.random_range(i32::MIN..i32::MAX)).collect::<Vec<i32>>()
22+
}
23+
24+
fn random_f64(len: usize) -> Vec<f64> {
25+
let mut rng = rand::rng();
26+
(0..len).map(|_| rng.random_range(-1_000_000.0..1_000_000.0)).collect::<Vec<f64>>()
27+
}
28+
29+
fn make_i32_column(name: &str, len: usize) -> Column<u64> {
30+
use std::sync::Arc;
31+
let values = Arc::new(VecArray { data: random_i32(len), datatype: DataType::I32 });
32+
Column::new(name, values, Some(BitVector::<u64>::new_all_valid(len)))
33+
}
34+
35+
fn make_f64_column(name: &str, len: usize) -> Column<u64> {
36+
use std::sync::Arc;
37+
let values = Arc::new(VecArray { data: random_f64(len), datatype: DataType::F64 });
38+
Column::new(name, values, Some(BitVector::<u64>::new_all_valid(len)))
39+
}
40+
41+
fn bench_aggregate_i32(c: &mut Criterion, rt: &tokio::runtime::Runtime, len: usize) {
42+
let col_a = make_i32_column("a", len);
43+
let ops = [Operator::Min, Operator::Max, Operator::Sum, Operator::Count];
44+
let bytes = (len * size_of::<i32>()) as u64;
45+
46+
let mut group_rows = c.benchmark_group("aggregate_i32_rows");
47+
group_rows.throughput(Throughput::Elements(len as u64));
48+
for op in ops {
49+
let expr = Expr::aggregate(op, Expr::col("a"), false);
50+
let bench_id = BenchmarkId::new(format!("{:?}", op), len);
51+
group_rows.bench_with_input(bench_id, &op, |bch, &_op| {
52+
bch.iter(|| {
53+
let out = rt.block_on(async {
54+
evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()]))
55+
.await
56+
});
57+
black_box(out).unwrap();
58+
});
59+
});
60+
}
61+
group_rows.finish();
62+
63+
let mut group_bytes = c.benchmark_group("aggregate_i32_bytes");
64+
group_bytes.throughput(Throughput::Bytes(bytes));
65+
for op in ops {
66+
let expr = Expr::aggregate(op, Expr::col("a"), false);
67+
let bench_id = BenchmarkId::new(format!("{:?}", op), len);
68+
group_bytes.bench_with_input(bench_id, &op, |bch, &_op| {
69+
bch.iter(|| {
70+
let out = rt.block_on(async {
71+
evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()]))
72+
.await
73+
});
74+
black_box(out).unwrap();
75+
});
76+
});
77+
}
78+
group_bytes.finish();
79+
}
80+
81+
fn bench_aggregate_f64(c: &mut Criterion, rt: &tokio::runtime::Runtime, len: usize) {
82+
let col_a = make_f64_column("a", len);
83+
let expr = Expr::aggregate(Operator::Avg, Expr::col("a"), false);
84+
let bytes = (len * size_of::<f64>()) as u64;
85+
86+
let mut group_rows = c.benchmark_group("aggregate_f64_rows");
87+
group_rows.throughput(Throughput::Elements(len as u64));
88+
let bench_id_rows = BenchmarkId::new("Avg", len);
89+
group_rows.bench_with_input(bench_id_rows, &len, |bch, &_len| {
90+
bch.iter(|| {
91+
let out = rt.block_on(async {
92+
evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()])).await
93+
});
94+
black_box(out).unwrap();
95+
});
96+
});
97+
group_rows.finish();
98+
99+
let mut group_bytes = c.benchmark_group("aggregate_f64_bytes");
100+
group_bytes.throughput(Throughput::Bytes(bytes));
101+
let bench_id_bytes = BenchmarkId::new("Avg", len);
102+
group_bytes.bench_with_input(bench_id_bytes, &len, |bch, &_len| {
103+
bch.iter(|| {
104+
let out = rt.block_on(async {
105+
evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()])).await
106+
});
107+
black_box(out).unwrap();
108+
});
109+
});
110+
group_bytes.finish();
111+
}
112+
113+
/// Criterion entry point: run every aggregate benchmark over 1M-row columns.
fn bench_all_aggregate(c: &mut Criterion) {
    const LEN: usize = 1_000_000;
    let runtime = single_thread_runtime();
    bench_aggregate_i32(c, &runtime, LEN);
    bench_aggregate_f64(c, &runtime, LEN);
}
119+
120+
criterion_group!(benches, bench_all_aggregate);
121+
criterion_main!(benches);

tachyon/compute/src/codegen.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,12 @@ impl CodeGen for Expr {
232232
});
233233
var
234234
}
235+
Expr::Aggregate { op, .. } => {
236+
return Err(TypeError::Unsupported(format!(
237+
"Aggregate {:?} is not supported in codegen yet",
238+
op
239+
)));
240+
}
235241
};
236242

237243
Ok(var)

tachyon/compute/src/evaluate.rs

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::column::Column;
1616
use crate::data_type::DataType;
1717
use crate::error::ErrorMode;
1818
use crate::expr::{Expr, SchemaContext};
19+
use crate::operator::Operator;
1920

2021
#[derive(Debug, Clone, PartialEq, Eq)]
2122
pub enum Device {
@@ -40,15 +41,27 @@ async fn evaluate_gpu<B: BitBlock>(
4041
.collect();
4142

4243
let schema_context = SchemaContext::new().with_columns(&column_map).with_error_mode(error_mode);
44+
match expr {
45+
Expr::Aggregate { op, arg, distinct } => {
46+
evaluate_gpu_aggregate::<B>(*op, arg.as_ref(), *distinct, &schema_context, columns)
47+
.await
48+
}
49+
_ => evaluate_gpu_row::<B>(expr, &schema_context, columns).await,
50+
}
51+
}
52+
53+
async fn evaluate_gpu_row<B: BitBlock>(
54+
expr: &Expr, schema_context: &SchemaContext, columns: &[Column<B>],
55+
) -> Result<Vec<Column<B>>, Box<dyn Error>> {
4356
let mut code_block = CodeBlock::default();
44-
expr.to_nvrtc::<B>(&schema_context, &mut code_block)?;
57+
expr.to_nvrtc::<B>(schema_context, &mut code_block)?;
4558

4659
let size = columns[0].len();
4760
let input_cols =
4861
columns.iter().map(|col| col.to_gpu_column()).collect::<Result<Vec<_>, _>>()?;
4962

5063
let mut output_cols = Vec::<gpu_column::Column>::new();
51-
let result_type = expr.infer_type(&schema_context)?;
64+
let result_type = expr.infer_type(schema_context)?;
5265

5366
let gpu_col = gpu_column::Column::new_uninitialized::<B>(
5467
size * result_type.native_size(),
@@ -68,3 +81,80 @@ async fn evaluate_gpu<B: BitBlock>(
6881

6982
Ok(result_cols)
7083
}
84+
85+
async fn evaluate_gpu_aggregate<B: BitBlock>(
86+
op: Operator, arg: &Expr, distinct: bool, schema_context: &SchemaContext, columns: &[Column<B>],
87+
) -> Result<Vec<Column<B>>, Box<dyn Error>> {
88+
if distinct {
89+
return Err("DISTINCT aggregates are not supported yet".into());
90+
}
91+
92+
let (col_idx, col_type) = match arg {
93+
Expr::Column(col_name) => schema_context
94+
.lookup(col_name)
95+
.copied()
96+
.ok_or_else(|| format!("unknown column: {}", col_name))?,
97+
_ => return Err("Aggregate argument must be a column reference".into()),
98+
};
99+
100+
let result_type =
101+
Expr::Aggregate { op, arg: Box::new(arg.clone()), distinct }.infer_type(schema_context)?;
102+
let code = build_aggregate_nvrtc_code::<B>(
103+
op,
104+
col_idx,
105+
col_type,
106+
result_type,
107+
schema_context.error_mode() == ErrorMode::Ansi,
108+
)?;
109+
let input_cols =
110+
columns.iter().map(|col| col.to_gpu_column()).collect::<Result<Vec<_>, _>>()?;
111+
112+
let mut output_cols = Vec::<gpu_column::Column>::new();
113+
let size = 1usize;
114+
let gpu_col = gpu_column::Column::new_uninitialized::<B>(
115+
size * result_type.native_size(),
116+
size.div_ceil(B::BITS),
117+
size,
118+
)?;
119+
output_cols.push(gpu_col);
120+
121+
cuda_launcher::launch_aggregate::<B>(&code, &input_cols, &output_cols).await?;
122+
123+
let result_cols = output_cols
124+
.into_iter()
125+
.map(|col| -> Result<_, Box<dyn Error>> {
126+
Column::from_gpu_column(&col, "r0", result_type)
127+
})
128+
.collect::<Result<Vec<_>, _>>()?;
129+
130+
Ok(result_cols)
131+
}
132+
133+
fn build_aggregate_nvrtc_code<B: BitBlock>(
134+
op: Operator, col_idx: u16, col_type: DataType, result_type: DataType, ansi_error_mode: bool,
135+
) -> Result<String, Box<dyn Error>> {
136+
let input_kernel_type = col_type.kernel_type();
137+
let output_kernel_type = result_type.kernel_type();
138+
let bits_type = B::C_TYPE;
139+
140+
let code = match op {
141+
Operator::Min => format!(
142+
"\taggregate_codegen::min<TypeKind::{input_kernel_type}, TypeKind::{output_kernel_type}, {bits_type}>(ctx, input, output, num_rows, {col_idx});\n"
143+
),
144+
Operator::Max => format!(
145+
"\taggregate_codegen::max<TypeKind::{input_kernel_type}, TypeKind::{output_kernel_type}, {bits_type}>(ctx, input, output, num_rows, {col_idx});\n"
146+
),
147+
Operator::Sum => format!(
148+
"\taggregate_codegen::sum<{ansi_error_mode}, TypeKind::{input_kernel_type}, TypeKind::{output_kernel_type}, {bits_type}>(ctx, input, output, num_rows, {col_idx});\n"
149+
),
150+
Operator::Avg => format!(
151+
"\taggregate_codegen::avg<{ansi_error_mode}, TypeKind::{input_kernel_type}, TypeKind::{output_kernel_type}, {bits_type}>(ctx, input, output, num_rows, {col_idx});\n"
152+
),
153+
Operator::Count => format!(
154+
"\taggregate_codegen::count<TypeKind::{input_kernel_type}, {bits_type}>(ctx, input, output, num_rows, {col_idx});\n"
155+
),
156+
_ => return Err(format!("Unsupported aggregate operator: {:?}", op).into()),
157+
};
158+
159+
Ok(code)
160+
}

tachyon/compute/src/expr.rs

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use half::{bf16, f16};
1212

1313
use crate::data_type::DataType;
1414
use crate::error::ErrorMode;
15-
use crate::operator::Operator;
15+
pub use crate::operator::Operator;
1616

1717
#[derive(Debug, Clone, PartialEq)]
1818
pub enum Literal {
@@ -47,6 +47,8 @@ pub enum Expr {
4747
Call { name: String, args: Vec<Expr> },
4848

4949
Cast { expr: Box<Expr>, to: DataType },
50+
51+
Aggregate { op: Operator, arg: Box<Expr>, distinct: bool },
5052
}
5153

5254
impl Expr {
@@ -126,6 +128,10 @@ impl Expr {
126128
Expr::Cast { expr: Box::new(self), to }
127129
}
128130

131+
pub fn aggregate(op: Operator, arg: Expr, distinct: bool) -> Self {
132+
Expr::Aggregate { op, arg: Box::new(arg), distinct }
133+
}
134+
129135
pub fn children(&self) -> Vec<&Expr> {
130136
match self {
131137
Expr::Column(_) | Expr::Literal(_) => vec![],
@@ -134,6 +140,7 @@ impl Expr {
134140
Expr::Nary { args, .. } => args.iter().map(|x| x.as_ref()).collect(),
135141
Expr::Call { args, .. } => args.iter().collect(),
136142
Expr::Cast { expr, .. } => vec![expr.as_ref()],
143+
Expr::Aggregate { arg, .. } => vec![arg.as_ref()],
137144
}
138145
}
139146
}
@@ -313,6 +320,31 @@ impl Expr {
313320
let _ = expr.infer_type(schema)?;
314321
Ok(*to)
315322
}
323+
324+
Expr::Aggregate { op, arg, .. } => {
325+
let t = arg.infer_type(schema)?;
326+
match op {
327+
Operator::Count => Ok(DataType::U64),
328+
Operator::Sum | Operator::Avg | Operator::Min | Operator::Max => match t {
329+
DataType::I8
330+
| DataType::I16
331+
| DataType::I32
332+
| DataType::I64
333+
| DataType::U8
334+
| DataType::U16
335+
| DataType::U32
336+
| DataType::U64
337+
| DataType::BF16
338+
| DataType::F16
339+
| DataType::F32
340+
| DataType::F64 => Ok(t),
341+
_ => Err(TypeError::Unsupported(format!("aggregate {:?} on {:?}", op, t))),
342+
},
343+
_ => {
344+
Err(TypeError::Unsupported(format!("not an aggregate operator: {:?}", op)))
345+
}
346+
}
347+
}
316348
}
317349
}
318350

@@ -413,7 +445,17 @@ impl Expr {
413445
Ok(Expr::Call { name: name.clone(), args: simplified_args })
414446
}
415447

416-
_ => Ok(self.clone()),
448+
Expr::Cast { expr, to } => {
449+
let simplified_expr = expr.simplify(schema)?;
450+
Ok(Expr::Cast { expr: Box::new(simplified_expr), to: *to })
451+
}
452+
453+
Expr::Aggregate { op, arg, distinct } => {
454+
let simplified_arg = arg.simplify(schema)?;
455+
Ok(Expr::Aggregate { op: *op, arg: Box::new(simplified_arg), distinct: *distinct })
456+
}
457+
458+
Expr::Column(_) | Expr::Literal(_) => Ok(self.clone()),
417459
}
418460
}
419461
}
@@ -495,6 +537,13 @@ impl fmt::Display for Expr {
495537
Expr::Nary { op, args } => write!(f, "{}({:?})", op, args),
496538
Expr::Call { name, args } => write!(f, "{}({:?})", name, args),
497539
Expr::Cast { expr, to } => write!(f, "cast({} as {:?})", expr, to),
540+
Expr::Aggregate { op, arg, distinct } => {
541+
if *distinct {
542+
write!(f, "{}(DISTINCT {})", op, arg)
543+
} else {
544+
write!(f, "{}({})", op, arg)
545+
}
546+
}
498547
}
499548
}
500549
}

0 commit comments

Comments
 (0)