Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 29 additions & 29 deletions .github/actions/setup/action.yml
Original file line number Diff line number Diff line change
@@ -1,39 +1,39 @@
name: Setup Rust Env
description: "Checkout repo, install protobuf, and make build script executable"
runs:
using: "composite"
steps:
- uses: actions/checkout@v4
using: "composite"
steps:
- uses: actions/checkout@v4

- name: Update apt
run: apt-get update
shell: bash
- name: Update apt
run: apt-get update
shell: bash

- name: Install protobuf
run: apt-get install -y protobuf-compiler libprotobuf-dev
shell: bash
- name: Install protobuf
run: apt-get install -y protobuf-compiler libprotobuf-dev
shell: bash

- name: Install Rust components
run: rustup component add rustfmt clippy
shell: bash
- name: Install Rust components
run: rustup component add rustfmt clippy
shell: bash

- name: Install CPP components
run: apt install -y clang-format
shell: bash
- name: Install CPP components
run: apt install -y clang-format
shell: bash

- name: Cache rust packages
if: ${{ !env.ACT }}
uses: swatinem/rust-cache@v2.7.7
- name: Cache rust packages
if: ${{ !env.ACT }}
uses: swatinem/rust-cache@v2.7.7

- name: Make build script executable
run: chmod +x ./bolt.sh
shell: bash
- name: Make build script executable
run: chmod +x ./bolt.sh
shell: bash

- name: Setup CUDA
run: |
wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2404/x86_64/cuda-keyring_1.1-1_all.deb
dpkg -i cuda-keyring_1.1-1_all.deb
apt-get update
apt-get install -y cuda-toolkit-13-0
apt-get install -y cuda-drivers
shell: bash
- name: Setup CUDA
run: |
wget https://developer.download.nvidia.com/compute/cuda/repos/debian13/x86_64/cuda-keyring_1.1-1_all.deb
dpkg -i cuda-keyring_1.1-1_all.deb
apt-get update
apt-get install -y cuda-toolkit-13-1
apt-get install -y cuda-drivers
shell: bash
4 changes: 4 additions & 0 deletions tachyon/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,7 @@ criterion.workspace = true
[[bench]]
name = "evaluate_bench"
harness = false

[[bench]]
name = "aggregate_bench"
harness = false
121 changes: 121 additions & 0 deletions tachyon/compute/benches/aggregate_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use std::hint::black_box;
use std::mem::size_of;

use compute::bit_vector::BitVector;
use compute::column::{Column, VecArray};
use compute::data_type::DataType;
use compute::error::ErrorMode;
use compute::evaluate::{Device, evaluate};
use compute::expr::Expr;
use compute::operator::Operator;
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use rand::Rng;
use tokio::runtime::Builder;

/// Build a current-thread Tokio runtime so async evaluation runs on the
/// benchmarking thread itself, keeping timings free of worker scheduling noise.
fn single_thread_runtime() -> tokio::runtime::Runtime {
    let mut builder = Builder::new_current_thread();
    builder.enable_all();
    builder.build().unwrap()
}

/// Generate `len` uniformly distributed random `i32` values spanning the full
/// `i32` domain.
///
/// Uses an inclusive range: the previous exclusive bound
/// (`i32::MIN..i32::MAX`) silently excluded `i32::MAX` from the sample space.
fn random_i32(len: usize) -> Vec<i32> {
    let mut rng = rand::rng();
    (0..len).map(|_| rng.random_range(i32::MIN..=i32::MAX)).collect::<Vec<i32>>()
}

/// Generate `len` random `f64` samples drawn uniformly from
/// [-1_000_000.0, 1_000_000.0).
fn random_f64(len: usize) -> Vec<f64> {
    let mut rng = rand::rng();
    let mut samples = Vec::with_capacity(len);
    for _ in 0..len {
        samples.push(rng.random_range(-1_000_000.0..1_000_000.0));
    }
    samples
}

/// Build a `Column` named `name` holding `len` random `i32` values with every
/// row marked valid.
fn make_i32_column(name: &str, len: usize) -> Column<u64> {
    use std::sync::Arc;
    let array = VecArray { data: random_i32(len), datatype: DataType::I32 };
    let validity = BitVector::<u64>::new_all_valid(len);
    Column::new(name, Arc::new(array), Some(validity))
}

/// Build a `Column` named `name` holding `len` random `f64` values with every
/// row marked valid.
fn make_f64_column(name: &str, len: usize) -> Column<u64> {
    use std::sync::Arc;
    let array = VecArray { data: random_f64(len), datatype: DataType::F64 };
    let validity = BitVector::<u64>::new_all_valid(len);
    Column::new(name, Arc::new(array), Some(validity))
}

fn bench_aggregate_i32(c: &mut Criterion, rt: &tokio::runtime::Runtime, len: usize) {
let col_a = make_i32_column("a", len);
let ops = [Operator::Min, Operator::Max, Operator::Sum, Operator::Count];
let bytes = (len * size_of::<i32>()) as u64;

let mut group_rows = c.benchmark_group("aggregate_i32_rows");
group_rows.throughput(Throughput::Elements(len as u64));
for op in ops {
let expr = Expr::aggregate(op, Expr::col("a"), false);
let bench_id = BenchmarkId::new(format!("{:?}", op), len);
group_rows.bench_with_input(bench_id, &op, |bch, &_op| {
bch.iter(|| {
let out = rt.block_on(async {
evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()]))
.await
});
black_box(out).unwrap();
});
});
}
group_rows.finish();

let mut group_bytes = c.benchmark_group("aggregate_i32_bytes");
group_bytes.throughput(Throughput::Bytes(bytes));
for op in ops {
let expr = Expr::aggregate(op, Expr::col("a"), false);
let bench_id = BenchmarkId::new(format!("{:?}", op), len);
group_bytes.bench_with_input(bench_id, &op, |bch, &_op| {
bch.iter(|| {
let out = rt.block_on(async {
evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()]))
.await
});
black_box(out).unwrap();
});
});
}
group_bytes.finish();
}

fn bench_aggregate_f64(c: &mut Criterion, rt: &tokio::runtime::Runtime, len: usize) {
let col_a = make_f64_column("a", len);
let expr = Expr::aggregate(Operator::Avg, Expr::col("a"), false);
let bytes = (len * size_of::<f64>()) as u64;

let mut group_rows = c.benchmark_group("aggregate_f64_rows");
group_rows.throughput(Throughput::Elements(len as u64));
let bench_id_rows = BenchmarkId::new("Avg", len);
group_rows.bench_with_input(bench_id_rows, &len, |bch, &_len| {
bch.iter(|| {
let out = rt.block_on(async {
evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()])).await
});
black_box(out).unwrap();
});
});
group_rows.finish();

let mut group_bytes = c.benchmark_group("aggregate_f64_bytes");
group_bytes.throughput(Throughput::Bytes(bytes));
let bench_id_bytes = BenchmarkId::new("Avg", len);
group_bytes.bench_with_input(bench_id_bytes, &len, |bch, &_len| {
bch.iter(|| {
let out = rt.block_on(async {
evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()])).await
});
black_box(out).unwrap();
});
});
group_bytes.finish();
}

/// Criterion entry point: runs every aggregate benchmark over one million rows
/// on a single-threaded Tokio runtime.
fn bench_all_aggregate(c: &mut Criterion) {
    const LEN: usize = 1_000_000;
    let runtime = single_thread_runtime();
    bench_aggregate_i32(c, &runtime, LEN);
    bench_aggregate_f64(c, &runtime, LEN);
}

criterion_group!(benches, bench_all_aggregate);
criterion_main!(benches);
6 changes: 6 additions & 0 deletions tachyon/compute/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ impl CodeGen for Expr {
});
var
}
Expr::Aggregate { op, .. } => {
return Err(TypeError::Unsupported(format!(
"Aggregate {:?} is not supported in codegen yet",
op
)));
}
};

Ok(var)
Expand Down
94 changes: 92 additions & 2 deletions tachyon/compute/src/evaluate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::column::Column;
use crate::data_type::DataType;
use crate::error::ErrorMode;
use crate::expr::{Expr, SchemaContext};
use crate::operator::Operator;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Device {
Expand All @@ -40,15 +41,27 @@ async fn evaluate_gpu<B: BitBlock>(
.collect();

let schema_context = SchemaContext::new().with_columns(&column_map).with_error_mode(error_mode);
match expr {
Expr::Aggregate { op, arg, distinct } => {
evaluate_gpu_aggregate::<B>(*op, arg.as_ref(), *distinct, &schema_context, columns)
.await
}
_ => evaluate_gpu_row::<B>(expr, &schema_context, columns).await,
}
}

async fn evaluate_gpu_row<B: BitBlock>(
expr: &Expr, schema_context: &SchemaContext, columns: &[Column<B>],
) -> Result<Vec<Column<B>>, Box<dyn Error>> {
let mut code_block = CodeBlock::default();
expr.to_nvrtc::<B>(&schema_context, &mut code_block)?;
expr.to_nvrtc::<B>(schema_context, &mut code_block)?;

let size = columns[0].len();
let input_cols =
columns.iter().map(|col| col.to_gpu_column()).collect::<Result<Vec<_>, _>>()?;

let mut output_cols = Vec::<gpu_column::Column>::new();
let result_type = expr.infer_type(&schema_context)?;
let result_type = expr.infer_type(schema_context)?;

let gpu_col = gpu_column::Column::new_uninitialized::<B>(
size * result_type.native_size(),
Expand All @@ -68,3 +81,80 @@ async fn evaluate_gpu<B: BitBlock>(

Ok(result_cols)
}

/// Evaluate a single aggregate expression (`op` applied to `arg`) on the GPU,
/// returning one one-row result column.
///
/// Only plain (non-DISTINCT) aggregates whose argument is a direct column
/// reference are supported; anything else returns an error. The output column
/// is named "r0".
async fn evaluate_gpu_aggregate<B: BitBlock>(
    op: Operator, arg: &Expr, distinct: bool, schema_context: &SchemaContext, columns: &[Column<B>],
) -> Result<Vec<Column<B>>, Box<dyn Error>> {
    if distinct {
        return Err("DISTINCT aggregates are not supported yet".into());
    }

    // The aggregate argument must name an input column directly; resolve it to
    // its (index, data type) pair via the schema context.
    let (col_idx, col_type) = match arg {
        Expr::Column(col_name) => schema_context
            .lookup(col_name)
            .copied()
            .ok_or_else(|| format!("unknown column: {}", col_name))?,
        _ => return Err("Aggregate argument must be a column reference".into()),
    };

    // Rebuild the full Aggregate expression so the regular type-inference path
    // determines the output type (which may differ from the input column type).
    let result_type =
        Expr::Aggregate { op, arg: Box::new(arg.clone()), distinct }.infer_type(schema_context)?;
    let code = build_aggregate_nvrtc_code::<B>(
        op,
        col_idx,
        col_type,
        result_type,
        schema_context.error_mode() == ErrorMode::Ansi,
    )?;
    // Upload every input column, not just the aggregated one; the generated
    // kernel indexes into the full input set by col_idx.
    let input_cols =
        columns.iter().map(|col| col.to_gpu_column()).collect::<Result<Vec<_>, _>>()?;

    // An aggregate reduces to exactly one row: allocate a one-element output
    // buffer (one value slot plus a single validity block of B::BITS bits).
    let mut output_cols = Vec::<gpu_column::Column>::new();
    let size = 1usize;
    let gpu_col = gpu_column::Column::new_uninitialized::<B>(
        size * result_type.native_size(),
        size.div_ceil(B::BITS),
        size,
    )?;
    output_cols.push(gpu_col);

    cuda_launcher::launch_aggregate::<B>(&code, &input_cols, &output_cols).await?;

    // Copy the device result back to a host column. NOTE(review): "r0"
    // presumably mirrors the naming used by the row-wise path — confirm
    // against callers that consume result column names.
    let result_cols = output_cols
        .into_iter()
        .map(|col| -> Result<_, Box<dyn Error>> {
            Column::from_gpu_column(&col, "r0", result_type)
        })
        .collect::<Result<Vec<_>, _>>()?;

    Ok(result_cols)
}

/// Render the NVRTC kernel-body line that dispatches to the matching
/// `aggregate_codegen` device function for `op`.
///
/// Sum and Avg carry an extra leading template parameter for ANSI error mode;
/// Count omits the output element type. Operators without an aggregate
/// implementation produce an error.
fn build_aggregate_nvrtc_code<B: BitBlock>(
    op: Operator, col_idx: u16, col_type: DataType, result_type: DataType, ansi_error_mode: bool,
) -> Result<String, Box<dyn Error>> {
    let in_ty = col_type.kernel_type();
    let out_ty = result_type.kernel_type();
    let bits = B::C_TYPE;

    // Every supported operator shares the same call shape; only the function
    // name and template argument list vary.
    let (func, template_args) = match op {
        Operator::Min => ("min", format!("TypeKind::{in_ty}, TypeKind::{out_ty}, {bits}")),
        Operator::Max => ("max", format!("TypeKind::{in_ty}, TypeKind::{out_ty}, {bits}")),
        Operator::Sum => {
            ("sum", format!("{ansi_error_mode}, TypeKind::{in_ty}, TypeKind::{out_ty}, {bits}"))
        }
        Operator::Avg => {
            ("avg", format!("{ansi_error_mode}, TypeKind::{in_ty}, TypeKind::{out_ty}, {bits}"))
        }
        Operator::Count => ("count", format!("TypeKind::{in_ty}, {bits}")),
        _ => return Err(format!("Unsupported aggregate operator: {:?}", op).into()),
    };

    Ok(format!(
        "\taggregate_codegen::{func}<{template_args}>(ctx, input, output, num_rows, {col_idx});\n"
    ))
}
Loading
Loading