diff --git a/.github/actions/setup/action.yml b/.github/actions/setup/action.yml index 7682856..325edb0 100644 --- a/.github/actions/setup/action.yml +++ b/.github/actions/setup/action.yml @@ -1,39 +1,39 @@ name: Setup Rust Env description: "Checkout repo, install protobuf, and make build script executable" runs: - using: "composite" - steps: - - uses: actions/checkout@v4 + using: "composite" + steps: + - uses: actions/checkout@v4 - - name: Update apt - run: apt-get update - shell: bash + - name: Update apt + run: apt-get update + shell: bash - - name: Install protobuf - run: apt-get install -y protobuf-compiler libprotobuf-dev - shell: bash + - name: Install protobuf + run: apt-get install -y protobuf-compiler libprotobuf-dev + shell: bash - - name: Install Rust components - run: rustup component add rustfmt clippy - shell: bash + - name: Install Rust components + run: rustup component add rustfmt clippy + shell: bash - - name: Install CPP components - run: apt install -y clang-format - shell: bash + - name: Install CPP components + run: apt-get install -y clang-format + shell: bash - - name: Cache rust packages - if: ${{ !env.ACT }} - uses: swatinem/rust-cache@v2.7.7 + - name: Cache rust packages + if: ${{ !env.ACT }} + uses: swatinem/rust-cache@v2.7.7 - - name: Make build script executable - run: chmod +x ./bolt.sh - shell: bash + - name: Make build script executable + run: chmod +x ./bolt.sh + shell: bash - - name: Setup CUDA - run: | - wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2404/x86_64/cuda-keyring_1.1-1_all.deb - dpkg -i cuda-keyring_1.1-1_all.deb - apt-get update - apt-get install -y cuda-toolkit-13-0 - apt-get install -y cuda-drivers - shell: bash + - name: Setup CUDA + run: | + wget https://developer.download.nvidia.com/compute/cuda/repos/debian13/x86_64/cuda-keyring_1.1-1_all.deb + dpkg -i cuda-keyring_1.1-1_all.deb + apt-get update + apt-get install -y cuda-toolkit-13-1 + apt-get install -y cuda-drivers + shell: bash diff --git 
a/tachyon/compute/Cargo.toml b/tachyon/compute/Cargo.toml index a6c17eb..eff255d 100644 --- a/tachyon/compute/Cargo.toml +++ b/tachyon/compute/Cargo.toml @@ -31,3 +31,7 @@ criterion.workspace = true [[bench]] name = "evaluate_bench" harness = false + +[[bench]] +name = "aggregate_bench" +harness = false diff --git a/tachyon/compute/benches/aggregate_bench.rs b/tachyon/compute/benches/aggregate_bench.rs new file mode 100644 index 0000000..c4682db --- /dev/null +++ b/tachyon/compute/benches/aggregate_bench.rs @@ -0,0 +1,121 @@ +use std::hint::black_box; +use std::mem::size_of; + +use compute::bit_vector::BitVector; +use compute::column::{Column, VecArray}; +use compute::data_type::DataType; +use compute::error::ErrorMode; +use compute::evaluate::{Device, evaluate}; +use compute::expr::Expr; +use compute::operator::Operator; +use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; +use rand::Rng; +use tokio::runtime::Builder; + +fn single_thread_runtime() -> tokio::runtime::Runtime { + Builder::new_current_thread().enable_all().build().unwrap() +} + +fn random_i32(len: usize) -> Vec { + let mut rng = rand::rng(); + (0..len).map(|_| rng.random_range(i32::MIN..i32::MAX)).collect::>() +} + +fn random_f64(len: usize) -> Vec { + let mut rng = rand::rng(); + (0..len).map(|_| rng.random_range(-1_000_000.0..1_000_000.0)).collect::>() +} + +fn make_i32_column(name: &str, len: usize) -> Column { + use std::sync::Arc; + let values = Arc::new(VecArray { data: random_i32(len), datatype: DataType::I32 }); + Column::new(name, values, Some(BitVector::::new_all_valid(len))) +} + +fn make_f64_column(name: &str, len: usize) -> Column { + use std::sync::Arc; + let values = Arc::new(VecArray { data: random_f64(len), datatype: DataType::F64 }); + Column::new(name, values, Some(BitVector::::new_all_valid(len))) +} + +fn bench_aggregate_i32(c: &mut Criterion, rt: &tokio::runtime::Runtime, len: usize) { + let col_a = make_i32_column("a", len); + let ops = 
[Operator::Min, Operator::Max, Operator::Sum, Operator::Count]; + let bytes = (len * size_of::()) as u64; + + let mut group_rows = c.benchmark_group("aggregate_i32_rows"); + group_rows.throughput(Throughput::Elements(len as u64)); + for op in ops { + let expr = Expr::aggregate(op, Expr::col("a"), false); + let bench_id = BenchmarkId::new(format!("{:?}", op), len); + group_rows.bench_with_input(bench_id, &op, |bch, &_op| { + bch.iter(|| { + let out = rt.block_on(async { + evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()])) + .await + }); + black_box(out).unwrap(); + }); + }); + } + group_rows.finish(); + + let mut group_bytes = c.benchmark_group("aggregate_i32_bytes"); + group_bytes.throughput(Throughput::Bytes(bytes)); + for op in ops { + let expr = Expr::aggregate(op, Expr::col("a"), false); + let bench_id = BenchmarkId::new(format!("{:?}", op), len); + group_bytes.bench_with_input(bench_id, &op, |bch, &_op| { + bch.iter(|| { + let out = rt.block_on(async { + evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()])) + .await + }); + black_box(out).unwrap(); + }); + }); + } + group_bytes.finish(); +} + +fn bench_aggregate_f64(c: &mut Criterion, rt: &tokio::runtime::Runtime, len: usize) { + let col_a = make_f64_column("a", len); + let expr = Expr::aggregate(Operator::Avg, Expr::col("a"), false); + let bytes = (len * size_of::()) as u64; + + let mut group_rows = c.benchmark_group("aggregate_f64_rows"); + group_rows.throughput(Throughput::Elements(len as u64)); + let bench_id_rows = BenchmarkId::new("Avg", len); + group_rows.bench_with_input(bench_id_rows, &len, |bch, &_len| { + bch.iter(|| { + let out = rt.block_on(async { + evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()])).await + }); + black_box(out).unwrap(); + }); + }); + group_rows.finish(); + + let mut group_bytes = c.benchmark_group("aggregate_f64_bytes"); + group_bytes.throughput(Throughput::Bytes(bytes)); + let bench_id_bytes = 
BenchmarkId::new("Avg", len); + group_bytes.bench_with_input(bench_id_bytes, &len, |bch, &_len| { + bch.iter(|| { + let out = rt.block_on(async { + evaluate(Device::GPU, ErrorMode::Tachyon, &expr, black_box(&[col_a.clone()])).await + }); + black_box(out).unwrap(); + }); + }); + group_bytes.finish(); +} + +fn bench_all_aggregate(c: &mut Criterion) { + let rt = single_thread_runtime(); + let len = 1_000_000; + bench_aggregate_i32(c, &rt, len); + bench_aggregate_f64(c, &rt, len); +} + +criterion_group!(benches, bench_all_aggregate); +criterion_main!(benches); diff --git a/tachyon/compute/src/codegen.rs b/tachyon/compute/src/codegen.rs index f479293..9bcfc38 100644 --- a/tachyon/compute/src/codegen.rs +++ b/tachyon/compute/src/codegen.rs @@ -232,6 +232,12 @@ impl CodeGen for Expr { }); var } + Expr::Aggregate { op, .. } => { + return Err(TypeError::Unsupported(format!( + "Aggregate {:?} is not supported in codegen yet", + op + ))); + } }; Ok(var) diff --git a/tachyon/compute/src/evaluate.rs b/tachyon/compute/src/evaluate.rs index 7ee2bea..e97e278 100644 --- a/tachyon/compute/src/evaluate.rs +++ b/tachyon/compute/src/evaluate.rs @@ -16,6 +16,7 @@ use crate::column::Column; use crate::data_type::DataType; use crate::error::ErrorMode; use crate::expr::{Expr, SchemaContext}; +use crate::operator::Operator; #[derive(Debug, Clone, PartialEq, Eq)] pub enum Device { @@ -40,15 +41,27 @@ async fn evaluate_gpu( .collect(); let schema_context = SchemaContext::new().with_columns(&column_map).with_error_mode(error_mode); + match expr { + Expr::Aggregate { op, arg, distinct } => { + evaluate_gpu_aggregate::(*op, arg.as_ref(), *distinct, &schema_context, columns) + .await + } + _ => evaluate_gpu_row::(expr, &schema_context, columns).await, + } +} + +async fn evaluate_gpu_row( + expr: &Expr, schema_context: &SchemaContext, columns: &[Column], +) -> Result>, Box> { let mut code_block = CodeBlock::default(); - expr.to_nvrtc::(&schema_context, &mut code_block)?; + 
expr.to_nvrtc::(schema_context, &mut code_block)?; let size = columns[0].len(); let input_cols = columns.iter().map(|col| col.to_gpu_column()).collect::, _>>()?; let mut output_cols = Vec::::new(); - let result_type = expr.infer_type(&schema_context)?; + let result_type = expr.infer_type(schema_context)?; let gpu_col = gpu_column::Column::new_uninitialized::( size * result_type.native_size(), @@ -68,3 +81,80 @@ async fn evaluate_gpu( Ok(result_cols) } + +async fn evaluate_gpu_aggregate( + op: Operator, arg: &Expr, distinct: bool, schema_context: &SchemaContext, columns: &[Column], +) -> Result>, Box> { + if distinct { + return Err("DISTINCT aggregates are not supported yet".into()); + } + + let (col_idx, col_type) = match arg { + Expr::Column(col_name) => schema_context + .lookup(col_name) + .copied() + .ok_or_else(|| format!("unknown column: {}", col_name))?, + _ => return Err("Aggregate argument must be a column reference".into()), + }; + + let result_type = + Expr::Aggregate { op, arg: Box::new(arg.clone()), distinct }.infer_type(schema_context)?; + let code = build_aggregate_nvrtc_code::( + op, + col_idx, + col_type, + result_type, + schema_context.error_mode() == ErrorMode::Ansi, + )?; + let input_cols = + columns.iter().map(|col| col.to_gpu_column()).collect::, _>>()?; + + let mut output_cols = Vec::::new(); + let size = 1usize; + let gpu_col = gpu_column::Column::new_uninitialized::( + size * result_type.native_size(), + size.div_ceil(B::BITS), + size, + )?; + output_cols.push(gpu_col); + + cuda_launcher::launch_aggregate::(&code, &input_cols, &output_cols).await?; + + let result_cols = output_cols + .into_iter() + .map(|col| -> Result<_, Box> { + Column::from_gpu_column(&col, "r0", result_type) + }) + .collect::, _>>()?; + + Ok(result_cols) +} + +fn build_aggregate_nvrtc_code( + op: Operator, col_idx: u16, col_type: DataType, result_type: DataType, ansi_error_mode: bool, +) -> Result> { + let input_kernel_type = col_type.kernel_type(); + let 
output_kernel_type = result_type.kernel_type(); + let bits_type = B::C_TYPE; + + let code = match op { + Operator::Min => format!( + "\taggregate_codegen::min(ctx, input, output, num_rows, {col_idx});\n" + ), + Operator::Max => format!( + "\taggregate_codegen::max(ctx, input, output, num_rows, {col_idx});\n" + ), + Operator::Sum => format!( + "\taggregate_codegen::sum<{ansi_error_mode}, TypeKind::{input_kernel_type}, TypeKind::{output_kernel_type}, {bits_type}>(ctx, input, output, num_rows, {col_idx});\n" + ), + Operator::Avg => format!( + "\taggregate_codegen::avg<{ansi_error_mode}, TypeKind::{input_kernel_type}, TypeKind::{output_kernel_type}, {bits_type}>(ctx, input, output, num_rows, {col_idx});\n" + ), + Operator::Count => format!( + "\taggregate_codegen::count(ctx, input, output, num_rows, {col_idx});\n" + ), + _ => return Err(format!("Unsupported aggregate operator: {:?}", op).into()), + }; + + Ok(code) +} diff --git a/tachyon/compute/src/expr.rs b/tachyon/compute/src/expr.rs index d373996..9d1a55e 100644 --- a/tachyon/compute/src/expr.rs +++ b/tachyon/compute/src/expr.rs @@ -12,7 +12,7 @@ use half::{bf16, f16}; use crate::data_type::DataType; use crate::error::ErrorMode; -use crate::operator::Operator; +pub use crate::operator::Operator; #[derive(Debug, Clone, PartialEq)] pub enum Literal { @@ -47,6 +47,8 @@ pub enum Expr { Call { name: String, args: Vec }, Cast { expr: Box, to: DataType }, + + Aggregate { op: Operator, arg: Box, distinct: bool }, } impl Expr { @@ -126,6 +128,10 @@ impl Expr { Expr::Cast { expr: Box::new(self), to } } + pub fn aggregate(op: Operator, arg: Expr, distinct: bool) -> Self { + Expr::Aggregate { op, arg: Box::new(arg), distinct } + } + pub fn children(&self) -> Vec<&Expr> { match self { Expr::Column(_) | Expr::Literal(_) => vec![], @@ -134,6 +140,7 @@ impl Expr { Expr::Nary { args, .. } => args.iter().map(|x| x.as_ref()).collect(), Expr::Call { args, .. } => args.iter().collect(), Expr::Cast { expr, .. 
} => vec![expr.as_ref()], + Expr::Aggregate { arg, .. } => vec![arg.as_ref()], } } } @@ -313,6 +320,31 @@ impl Expr { let _ = expr.infer_type(schema)?; Ok(*to) } + + Expr::Aggregate { op, arg, .. } => { + let t = arg.infer_type(schema)?; + match op { + Operator::Count => Ok(DataType::U64), + Operator::Sum | Operator::Avg | Operator::Min | Operator::Max => match t { + DataType::I8 + | DataType::I16 + | DataType::I32 + | DataType::I64 + | DataType::U8 + | DataType::U16 + | DataType::U32 + | DataType::U64 + | DataType::BF16 + | DataType::F16 + | DataType::F32 + | DataType::F64 => Ok(t), + _ => Err(TypeError::Unsupported(format!("aggregate {:?} on {:?}", op, t))), + }, + _ => { + Err(TypeError::Unsupported(format!("not an aggregate operator: {:?}", op))) + } + } + } } } @@ -413,7 +445,17 @@ impl Expr { Ok(Expr::Call { name: name.clone(), args: simplified_args }) } - _ => Ok(self.clone()), + Expr::Cast { expr, to } => { + let simplified_expr = expr.simplify(schema)?; + Ok(Expr::Cast { expr: Box::new(simplified_expr), to: *to }) + } + + Expr::Aggregate { op, arg, distinct } => { + let simplified_arg = arg.simplify(schema)?; + Ok(Expr::Aggregate { op: *op, arg: Box::new(simplified_arg), distinct: *distinct }) + } + + Expr::Column(_) | Expr::Literal(_) => Ok(self.clone()), } } } @@ -495,6 +537,13 @@ impl fmt::Display for Expr { Expr::Nary { op, args } => write!(f, "{}({:?})", op, args), Expr::Call { name, args } => write!(f, "{}({:?})", name, args), Expr::Cast { expr, to } => write!(f, "cast({} as {:?})", expr, to), + Expr::Aggregate { op, arg, distinct } => { + if *distinct { + write!(f, "{}(DISTINCT {})", op, arg) + } else { + write!(f, "{}({})", op, arg) + } + } } } } diff --git a/tachyon/compute/src/operator.rs b/tachyon/compute/src/operator.rs index 3f5ce91..cc43186 100644 --- a/tachyon/compute/src/operator.rs +++ b/tachyon/compute/src/operator.rs @@ -23,6 +23,12 @@ pub enum Operator { Not, Cast, Call, + // Aggregate Operators + Sum, + Count, + Avg, + Min, + Max, } 
impl Operator { @@ -43,6 +49,14 @@ impl Operator { | Operator::Or ) } + + #[allow(dead_code)] + pub(crate) fn is_aggregate(&self) -> bool { + matches!( + self, + Operator::Sum | Operator::Count | Operator::Avg | Operator::Min | Operator::Max + ) + } } impl From<&str> for Operator { @@ -63,6 +77,11 @@ impl From<&str> for Operator { "neg" => Operator::Neg, "!" | "not" => Operator::Not, "cast" => Operator::Cast, + "sum" => Operator::Sum, + "count" => Operator::Count, + "avg" => Operator::Avg, + "min" => Operator::Min, + "max" => Operator::Max, _ => Operator::Call, } } @@ -87,6 +106,11 @@ impl Operator { Operator::Not => "!", Operator::Cast => "cast", Operator::Call => "call", + Operator::Sum => "sum", + Operator::Count => "count", + Operator::Avg => "avg", + Operator::Min => "min", + Operator::Max => "max", } } } diff --git a/tachyon/compute/src/parser.rs b/tachyon/compute/src/parser.rs index b875523..26cb171 100644 --- a/tachyon/compute/src/parser.rs +++ b/tachyon/compute/src/parser.rs @@ -375,7 +375,29 @@ impl Parser { } } - _ => Ok(Expr::call(op, args)), + _ => { + let agg_op = match op.to_lowercase().as_str() { + "sum" => Some(Operator::Sum), + "count" => Some(Operator::Count), + "avg" => Some(Operator::Avg), + "min" => Some(Operator::Min), + "max" => Some(Operator::Max), + _ => None, + }; + + if let Some(op) = agg_op { + if args.len() != 1 { + return Err(ParseError::WrongArity { + op: format!("{:?}", op), + expected: 1, + got: args.len(), + }); + } + Ok(Expr::Aggregate { op, arg: Box::new(args.remove(0)), distinct: false }) + } else { + Ok(Expr::call(op, args)) + } + } } } diff --git a/tachyon/compute/tests/aggregate_tests.rs b/tachyon/compute/tests/aggregate_tests.rs new file mode 100644 index 0000000..d87d937 --- /dev/null +++ b/tachyon/compute/tests/aggregate_tests.rs @@ -0,0 +1,820 @@ +mod test_utils; +use compute::data_type::DataType; +use compute::error::ErrorMode; +use compute::operator::Operator; +use half::f16; + +use crate::test_utils::CastTo; + 
+macro_rules! test_eval_aggregate_matrix { + ( + $operator:expr, + $error_mode:expr, + $size_min:expr, + $size_max:expr, + [ + $( + ( $test_name:ident, $native_type:ident, $data_type:expr ) + ),* $(,)? + ] + ) => { + $( + test_eval_aggregate_fn!( + $test_name, + $operator, + $error_mode, + $native_type, + $data_type, + $size_min, + $size_max, + ); + )* + }; +} + +trait FromUsize { + fn from_usize(v: usize) -> Self; +} + +macro_rules! impl_from_usize { + ($($t:ty),* $(,)?) => { + $( + impl FromUsize for $t { + fn from_usize(v: usize) -> Self { + v as $t + } + } + )* + }; +} + +impl_from_usize!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64); + +impl FromUsize for f16 { + fn from_usize(v: usize) -> Self { + f16::from_f32(v as f32) + } +} + +trait AggAddOps: Sized { + fn add_tachyon(self, rhs: Self) -> Self; + fn add_ansi(self, rhs: Self) -> Option; +} + +macro_rules! impl_agg_add_int { + ($($t:ty),* $(,)?) => { + $( + impl AggAddOps for $t { + fn add_tachyon(self, rhs: Self) -> Self { + self.wrapping_add(rhs) + } + + fn add_ansi(self, rhs: Self) -> Option { + self.checked_add(rhs) + } + } + )* + }; +} + +impl_agg_add_int!(i8, i16, i32, i64, u8, u16, u32, u64); + +impl AggAddOps for f16 { + fn add_tachyon(self, rhs: Self) -> Self { + self + rhs + } + + fn add_ansi(self, rhs: Self) -> Option { + Some(self + rhs) + } +} + +impl AggAddOps for f32 { + fn add_tachyon(self, rhs: Self) -> Self { + self + rhs + } + + fn add_ansi(self, rhs: Self) -> Option { + Some(self + rhs) + } +} + +impl AggAddOps for f64 { + fn add_tachyon(self, rhs: Self) -> Self { + self + rhs + } + + fn add_ansi(self, rhs: Self) -> Option { + Some(self + rhs) + } +} + +macro_rules! 
random_aggregate_vec { + ($size:expr, i8) => { + random_vec!($size, i8, -64i8, 64i8) + }; + ($size:expr, i16) => { + random_vec!($size, i16, -1024i16, 1024i16) + }; + ($size:expr, i32) => { + random_vec!($size, i32, -10_000i32, 10_000i32) + }; + ($size:expr, i64) => { + random_vec!($size, i64, -100_000i64, 100_000i64) + }; + ($size:expr, u8) => { + random_vec!($size, u8, 0u8, 128u8) + }; + ($size:expr, u16) => { + random_vec!($size, u16, 0u16, 1024u16) + }; + ($size:expr, u32) => { + random_vec!($size, u32, 0u32, 10_000u32) + }; + ($size:expr, u64) => { + random_vec!($size, u64, 0u64, 100_000u64) + }; + ($size:expr, f16) => {{ + use rand::Rng; + let mut rng = rand::rng(); + (0..$size) + .map(|_| f16::from_f32(rng.random_range(-1000.0f32..1000.0f32))) + .collect::>() + }}; + ($size:expr, f32) => { + random_vec!($size, f32, -1000.0f32, 1000.0f32) + }; + ($size:expr, f64) => { + random_vec!($size, f64, -1000.0f64, 1000.0f64) + }; +} + +macro_rules! test_eval_aggregate_fn { + ( + $test_name:ident, + $operator:expr, + $error_mode:expr, + $native_type:ident, + $data_type:expr, + $size_min:expr, + $size_max:expr, + ) => { + #[cfg(feature = "gpu")] + #[tokio::test] + async fn $test_name() { + use compute::data_type::DataType; + use compute::error::ErrorMode; + use compute::evaluate::{Device, evaluate}; + use compute::expr::Expr; + use compute::operator::Operator; + + use crate::create_column; + use crate::test_utils::init_tracing; + + init_tracing(); + let size = random_num!($size_min, $size_max); + let a_vec: Vec<$native_type> = random_aggregate_vec!(size, $native_type); + let a_bit_vec = random_bit_vec!(size, u64); + let col_a = create_column!(a_vec, Some(a_bit_vec.clone()), "a", $data_type); + + let expr = Expr::aggregate($operator, Expr::col("a"), false); + let result = evaluate(Device::GPU, $error_mode, &expr, &[col_a]).await; + + let valid_values: Vec<$native_type> = a_vec + .iter() + .enumerate() + .filter_map(|(i, v)| if a_bit_vec.is_valid(i) { Some(*v) } else { 
None }) + .collect(); + let error_mode = $error_mode; + + match $operator { + Operator::Count => { + assert!( + result.is_ok(), + "aggregate {:?} for {:?} failed: {:?}", + $operator, + $data_type, + result.as_ref().err() + ); + let result = result.unwrap(); + let out_bits = result[0].null_bits_as_slice().unwrap(); + let output = result[0].data_as_slice::().unwrap(); + assert_eq!(output.len(), 1); + assert!(out_bits.is_valid(0)); + assert_eq!(output[0], valid_values.len() as u64); + } + Operator::Min => { + assert!( + result.is_ok(), + "aggregate {:?} for {:?} failed: {:?}", + $operator, + $data_type, + result.as_ref().err() + ); + let result = result.unwrap(); + let out_bits = result[0].null_bits_as_slice().unwrap(); + let output = result[0].data_as_slice::<$native_type>().unwrap(); + assert_eq!(output.len(), 1); + if valid_values.is_empty() { + assert!(out_bits.is_null(0)); + } else { + let expected = valid_values + .iter() + .copied() + .reduce(|a, b| if a < b { a } else { b }) + .unwrap(); + if $data_type.is_float() { + let expected_f64: f64 = expected.cast(); + let actual_f64: f64 = output[0].cast(); + let eps: f64 = if $data_type == DataType::F16 { + 1e-2_f64.max(expected_f64.abs() * 1e-2) + } else { + 1e-6_f64.max(expected_f64.abs() * 1e-6) + }; + assert!((expected_f64 - actual_f64).abs() <= eps); + } else { + assert_eq!(output[0], expected); + } + } + } + Operator::Max => { + assert!( + result.is_ok(), + "aggregate {:?} for {:?} failed: {:?}", + $operator, + $data_type, + result.as_ref().err() + ); + let result = result.unwrap(); + let out_bits = result[0].null_bits_as_slice().unwrap(); + let output = result[0].data_as_slice::<$native_type>().unwrap(); + assert_eq!(output.len(), 1); + if valid_values.is_empty() { + assert!(out_bits.is_null(0)); + } else { + let expected = valid_values + .iter() + .copied() + .reduce(|a, b| if a > b { a } else { b }) + .unwrap(); + if $data_type.is_float() { + let expected_f64: f64 = expected.cast(); + let actual_f64: f64 = 
output[0].cast(); + let eps: f64 = if $data_type == DataType::F16 { + 1e-2_f64.max(expected_f64.abs() * 1e-2) + } else { + 1e-6_f64.max(expected_f64.abs() * 1e-6) + }; + assert!((expected_f64 - actual_f64).abs() <= eps); + } else { + assert_eq!(output[0], expected); + } + } + } + Operator::Sum => { + let mut overflowed = false; + let expected = valid_values.iter().copied().fold( + <$native_type as Default>::default(), + |acc, v| match error_mode { + ErrorMode::Ansi => match acc.add_ansi(v) { + Some(next) => next, + None => { + overflowed = true; + acc.add_tachyon(v) + } + }, + ErrorMode::Tachyon => acc.add_tachyon(v), + }, + ); + + if overflowed { + assert!(result.is_err()); + } else { + assert!( + result.is_ok(), + "aggregate {:?} for {:?} failed: {:?}", + $operator, + $data_type, + result.as_ref().err() + ); + let result = result.unwrap(); + let out_bits = result[0].null_bits_as_slice().unwrap(); + let output = result[0].data_as_slice::<$native_type>().unwrap(); + assert_eq!(output.len(), 1); + if valid_values.is_empty() { + assert!(out_bits.is_null(0)); + } else { + if $data_type.is_float() { + let expected_f64: f64 = expected.cast(); + let actual_f64: f64 = output[0].cast(); + let sum_abs: f64 = valid_values + .iter() + .map(|v| { + let x: f64 = (*v).cast(); + x.abs() + }) + .sum(); + let eps: f64 = if $data_type == DataType::F16 { + 1e-1_f64.max(sum_abs * 5e-2) + } else { + 1e-2_f64.max(sum_abs * 1e-4) + }; + assert!((expected_f64 - actual_f64).abs() <= eps); + } else { + assert_eq!(output[0], expected); + } + } + } + } + Operator::Avg => { + let mut overflowed = false; + let sum = valid_values.iter().copied().fold( + <$native_type as Default>::default(), + |acc, v| match error_mode { + ErrorMode::Ansi => match acc.add_ansi(v) { + Some(next) => next, + None => { + overflowed = true; + acc.add_tachyon(v) + } + }, + ErrorMode::Tachyon => acc.add_tachyon(v), + }, + ); + + if overflowed { + assert!(result.is_err()); + } else { + assert!( + result.is_ok(), + 
"aggregate {:?} for {:?} failed: {:?}", + $operator, + $data_type, + result.as_ref().err() + ); + let result = result.unwrap(); + let out_bits = result[0].null_bits_as_slice().unwrap(); + let output = result[0].data_as_slice::<$native_type>().unwrap(); + assert_eq!(output.len(), 1); + if valid_values.is_empty() { + assert!(out_bits.is_null(0)); + } else { + let cnt = <$native_type as FromUsize>::from_usize(valid_values.len()); + let expected = sum / cnt; + if $data_type.is_float() { + let expected_f64: f64 = expected.cast(); + let actual_f64: f64 = output[0].cast(); + let sum_abs: f64 = valid_values + .iter() + .map(|v| { + let x: f64 = (*v).cast(); + x.abs() + }) + .sum(); + let mean_abs = sum_abs / valid_values.len() as f64; + let eps: f64 = if $data_type == DataType::F16 { + 1e-2_f64.max(mean_abs * 5e-2) + } else { + 1e-3_f64.max(mean_abs * 1e-4) + }; + assert!((expected_f64 - actual_f64).abs() <= eps); + } else { + assert_eq!(output[0], expected); + } + } + } + } + _ => unreachable!("unsupported aggregate operator"), + } + } + }; +} + +test_eval_aggregate_matrix!( + Operator::Min, + ErrorMode::Tachyon, + 256, + 4096, + [ + (test_agg_min_i8, i8, DataType::I8), + (test_agg_min_i16, i16, DataType::I16), + (test_agg_min_i32, i32, DataType::I32), + (test_agg_min_i64, i64, DataType::I64), + (test_agg_min_u8, u8, DataType::U8), + (test_agg_min_u16, u16, DataType::U16), + (test_agg_min_u32, u32, DataType::U32), + (test_agg_min_u64, u64, DataType::U64), + (test_agg_min_f16, f16, DataType::F16), + (test_agg_min_f32, f32, DataType::F32), + (test_agg_min_f64, f64, DataType::F64), + ] +); + +test_eval_aggregate_matrix!( + Operator::Max, + ErrorMode::Tachyon, + 256, + 4096, + [ + (test_agg_max_i8, i8, DataType::I8), + (test_agg_max_i16, i16, DataType::I16), + (test_agg_max_i32, i32, DataType::I32), + (test_agg_max_i64, i64, DataType::I64), + (test_agg_max_u8, u8, DataType::U8), + (test_agg_max_u16, u16, DataType::U16), + (test_agg_max_u32, u32, DataType::U32), + 
(test_agg_max_u64, u64, DataType::U64), + (test_agg_max_f16, f16, DataType::F16), + (test_agg_max_f32, f32, DataType::F32), + (test_agg_max_f64, f64, DataType::F64), + ] +); + +test_eval_aggregate_matrix!( + Operator::Sum, + ErrorMode::Tachyon, + 256, + 4096, + [ + (test_agg_sum_i8, i8, DataType::I8), + (test_agg_sum_i16, i16, DataType::I16), + (test_agg_sum_i32, i32, DataType::I32), + (test_agg_sum_i64, i64, DataType::I64), + (test_agg_sum_u8, u8, DataType::U8), + (test_agg_sum_u16, u16, DataType::U16), + (test_agg_sum_u32, u32, DataType::U32), + (test_agg_sum_u64, u64, DataType::U64), + (test_agg_sum_f16, f16, DataType::F16), + (test_agg_sum_f32, f32, DataType::F32), + (test_agg_sum_f64, f64, DataType::F64), + ] +); + +test_eval_aggregate_matrix!( + Operator::Avg, + ErrorMode::Tachyon, + 256, + 4096, + [ + (test_agg_avg_i8, i8, DataType::I8), + (test_agg_avg_i16, i16, DataType::I16), + (test_agg_avg_i32, i32, DataType::I32), + (test_agg_avg_i64, i64, DataType::I64), + (test_agg_avg_u8, u8, DataType::U8), + (test_agg_avg_u16, u16, DataType::U16), + (test_agg_avg_u32, u32, DataType::U32), + (test_agg_avg_u64, u64, DataType::U64), + (test_agg_avg_f16, f16, DataType::F16), + (test_agg_avg_f32, f32, DataType::F32), + (test_agg_avg_f64, f64, DataType::F64), + ] +); + +#[cfg(feature = "gpu")] +#[tokio::test] +async fn test_aggregate_sum_ansi_i16_overflow_returns_error() { + use compute::bit_vector::BitVector; + use compute::data_type::DataType; + use compute::error::ErrorMode; + use compute::evaluate::{Device, evaluate}; + use compute::expr::Expr; + use compute::operator::Operator; + + use crate::create_column; + + let a_vec: Vec = vec![30_000, 30_000]; + let a_bits = BitVector::::new_all_valid(a_vec.len()); + let col_a = create_column!(a_vec, Some(a_bits), "a", DataType::I16); + let expr = Expr::aggregate(Operator::Sum, Expr::col("a"), false); + let result = evaluate(Device::GPU, ErrorMode::Ansi, &expr, &[col_a]).await; + assert!(result.is_err()); +} + +#[cfg(feature = 
"gpu")] +#[tokio::test] +async fn test_aggregate_avg_ansi_i16_overflow_returns_error() { + use compute::bit_vector::BitVector; + use compute::data_type::DataType; + use compute::error::ErrorMode; + use compute::evaluate::{Device, evaluate}; + use compute::expr::Expr; + use compute::operator::Operator; + + use crate::create_column; + + let a_vec: Vec = vec![30_000, 30_000, 30_000]; + let a_bits = BitVector::::new_all_valid(a_vec.len()); + let col_a = create_column!(a_vec, Some(a_bits), "a", DataType::I16); + let expr = Expr::aggregate(Operator::Avg, Expr::col("a"), false); + let result = evaluate(Device::GPU, ErrorMode::Ansi, &expr, &[col_a]).await; + assert!(result.is_err()); +} + +#[cfg(feature = "gpu")] +#[tokio::test] +async fn test_aggregate_sum_ansi_i16_no_overflow_ok() { + use compute::bit_vector::BitVector; + use compute::data_type::DataType; + use compute::error::ErrorMode; + use compute::evaluate::{Device, evaluate}; + use compute::expr::Expr; + use compute::operator::Operator; + + use crate::create_column; + + let a_vec: Vec = vec![100, -10, 20]; + let a_bits = BitVector::::new_all_valid(a_vec.len()); + let col_a = create_column!(a_vec, Some(a_bits), "a", DataType::I16); + let expr = Expr::aggregate(Operator::Sum, Expr::col("a"), false); + let result = evaluate(Device::GPU, ErrorMode::Ansi, &expr, &[col_a]).await.unwrap(); + let out = result[0].data_as_slice::().unwrap(); + assert_eq!(out[0], 110); +} + +test_eval_aggregate_matrix!( + Operator::Count, + ErrorMode::Tachyon, + 256, + 4096, + [ + (test_agg_count_i8, i8, DataType::I8), + (test_agg_count_i16, i16, DataType::I16), + (test_agg_count_i32, i32, DataType::I32), + (test_agg_count_i64, i64, DataType::I64), + (test_agg_count_u8, u8, DataType::U8), + (test_agg_count_u16, u16, DataType::U16), + (test_agg_count_u32, u32, DataType::U32), + (test_agg_count_u64, u64, DataType::U64), + (test_agg_count_f16, f16, DataType::F16), + (test_agg_count_f32, f32, DataType::F32), + (test_agg_count_f64, f64, 
DataType::F64), + ] +); + +#[cfg(feature = "gpu")] +#[tokio::test] +async fn test_aggregate_min_i32() { + use compute::bit_vector::BitVector; + use compute::data_type::DataType; + use compute::error::ErrorMode; + use compute::evaluate::{Device, evaluate}; + use compute::expr::Expr; + use compute::operator::Operator; + + use crate::create_column; + use crate::test_utils::init_tracing; + init_tracing(); + + let a_vec: Vec = vec![11, -7, 3, 42, 0]; + let a_bit_vec = BitVector::::new_all_valid(a_vec.len()); + let col_a = create_column!(a_vec, Some(a_bit_vec), "a", DataType::I32); + + let expr = Expr::aggregate(Operator::Min, Expr::col("a"), false); + let result = evaluate(Device::GPU, ErrorMode::Tachyon, &expr, &[col_a]).await.unwrap(); + + let output = result[0].data_as_slice::().unwrap(); + assert_eq!(output.len(), 1); + assert_eq!(output[0], -7); +} + +#[cfg(feature = "gpu")] +#[tokio::test] +async fn test_aggregate_sum_i32() { + use compute::bit_vector::BitVector; + use compute::data_type::DataType; + use compute::error::ErrorMode; + use compute::evaluate::{Device, evaluate}; + use compute::expr::Expr; + use compute::operator::Operator; + + use crate::create_column; + use crate::test_utils::init_tracing; + init_tracing(); + + let a_vec: Vec = vec![10, 20, -5, 1]; + let a_bit_vec = BitVector::::new_all_valid(a_vec.len()); + let col_a = create_column!(a_vec, Some(a_bit_vec), "a", DataType::I32); + + let expr = Expr::aggregate(Operator::Sum, Expr::col("a"), false); + let result = evaluate(Device::GPU, ErrorMode::Tachyon, &expr, &[col_a]).await.unwrap(); + + let output = result[0].data_as_slice::().unwrap(); + assert_eq!(output.len(), 1); + assert_eq!(output[0], 26); +} + +#[cfg(feature = "gpu")] +#[tokio::test] +async fn test_aggregate_max_i32() { + use compute::bit_vector::BitVector; + use compute::data_type::DataType; + use compute::error::ErrorMode; + use compute::evaluate::{Device, evaluate}; + use compute::expr::Expr; + use compute::operator::Operator; + + use 
crate::create_column; + use crate::test_utils::init_tracing; + init_tracing(); + + let a_vec: Vec = vec![10, 20, -5, 1]; + let a_bit_vec = BitVector::::new_all_valid(a_vec.len()); + let col_a = create_column!(a_vec, Some(a_bit_vec), "a", DataType::I32); + + let expr = Expr::aggregate(Operator::Max, Expr::col("a"), false); + let result = evaluate(Device::GPU, ErrorMode::Tachyon, &expr, &[col_a]).await.unwrap(); + + let output = result[0].data_as_slice::().unwrap(); + assert_eq!(output.len(), 1); + assert_eq!(output[0], 20); +} + +#[cfg(feature = "gpu")] +#[tokio::test] +async fn test_aggregate_avg_f64() { + use compute::bit_vector::BitVector; + use compute::data_type::DataType; + use compute::error::ErrorMode; + use compute::evaluate::{Device, evaluate}; + use compute::expr::Expr; + use compute::operator::Operator; + + use crate::create_column; + use crate::test_utils::init_tracing; + init_tracing(); + + let a_vec: Vec = vec![10.0, 20.0, 30.0, 40.0]; + let a_bit_vec = BitVector::::new_all_valid(a_vec.len()); + let col_a = create_column!(a_vec, Some(a_bit_vec), "a", DataType::F64); + + let expr = Expr::aggregate(Operator::Avg, Expr::col("a"), false); + let result = evaluate(Device::GPU, ErrorMode::Tachyon, &expr, &[col_a]).await.unwrap(); + + let output = result[0].data_as_slice::().unwrap(); + assert_eq!(output.len(), 1); + assert!((output[0] - 25.0).abs() < 1e-9); +} + +#[cfg(feature = "gpu")] +#[tokio::test] +async fn test_aggregate_count_i32_with_nulls() { + use compute::bit_vector::BitVector; + use compute::data_type::DataType; + use compute::error::ErrorMode; + use compute::evaluate::{Device, evaluate}; + use compute::expr::Expr; + use compute::operator::Operator; + + use crate::create_column; + use crate::test_utils::init_tracing; + init_tracing(); + + let a_vec: Vec = vec![1, 2, 3, 4, 5]; + let mut a_bit_vec = BitVector::::new_all_valid(a_vec.len()); + a_bit_vec.set_null(1); + a_bit_vec.set_null(4); + + let col_a = create_column!(a_vec, Some(a_bit_vec), "a", 
DataType::I32); + + let expr = Expr::aggregate(Operator::Count, Expr::col("a"), false); + let result = evaluate(Device::GPU, ErrorMode::Tachyon, &expr, &[col_a]).await.unwrap(); + + let output = result[0].data_as_slice::().unwrap(); + assert_eq!(output.len(), 1); + assert_eq!(output[0], 3); +} + +#[cfg(feature = "gpu")] +#[tokio::test] +async fn test_aggregate_max_i32_with_nulls() { + use compute::bit_vector::BitVector; + use compute::data_type::DataType; + use compute::error::ErrorMode; + use compute::evaluate::{Device, evaluate}; + use compute::expr::Expr; + use compute::operator::Operator; + + use crate::create_column; + use crate::test_utils::init_tracing; + init_tracing(); + + let a_vec: Vec = vec![1, 100, 2, 99, 3]; + let mut a_bit_vec = BitVector::::new_all_valid(a_vec.len()); + a_bit_vec.set_null(1); + a_bit_vec.set_null(3); + let col_a = create_column!(a_vec, Some(a_bit_vec), "a", DataType::I32); + + let expr = Expr::aggregate(Operator::Max, Expr::col("a"), false); + let result = evaluate(Device::GPU, ErrorMode::Tachyon, &expr, &[col_a]).await.unwrap(); + + let output = result[0].data_as_slice::().unwrap(); + assert_eq!(output.len(), 1); + assert_eq!(output[0], 3); +} + +#[cfg(feature = "gpu")] +#[tokio::test] +async fn test_aggregate_avg_f64_with_nulls() { + use compute::bit_vector::BitVector; + use compute::data_type::DataType; + use compute::error::ErrorMode; + use compute::evaluate::{Device, evaluate}; + use compute::expr::Expr; + use compute::operator::Operator; + + use crate::create_column; + use crate::test_utils::init_tracing; + init_tracing(); + + let a_vec: Vec = vec![10.0, 20.0, 30.0, 40.0]; + let mut a_bit_vec = BitVector::::new_all_valid(a_vec.len()); + a_bit_vec.set_null(1); + a_bit_vec.set_null(3); + let col_a = create_column!(a_vec, Some(a_bit_vec), "a", DataType::F64); + + let expr = Expr::aggregate(Operator::Avg, Expr::col("a"), false); + let result = evaluate(Device::GPU, ErrorMode::Tachyon, &expr, &[col_a]).await.unwrap(); + + let 
output = result[0].data_as_slice::().unwrap(); + assert_eq!(output.len(), 1); + assert!((output[0] - 20.0).abs() < 1e-9); +} + +#[cfg(feature = "gpu")] +#[tokio::test] +async fn test_aggregate_min_all_null_returns_null() { + use compute::bit_vector::BitVector; + use compute::data_type::DataType; + use compute::error::ErrorMode; + use compute::evaluate::{Device, evaluate}; + use compute::expr::Expr; + use compute::operator::Operator; + + use crate::create_column; + use crate::test_utils::init_tracing; + init_tracing(); + + let a_vec: Vec = vec![7, 8, 9]; + let a_bit_vec = BitVector::::new_all_null(a_vec.len()); + let col_a = create_column!(a_vec, Some(a_bit_vec), "a", DataType::I32); + + let expr = Expr::aggregate(Operator::Min, Expr::col("a"), false); + let result = evaluate(Device::GPU, ErrorMode::Tachyon, &expr, &[col_a]).await.unwrap(); + + let bit_vec = result[0].null_bits_as_slice().unwrap(); + assert!(bit_vec.is_null(0)); +} + +#[cfg(feature = "gpu")] +#[tokio::test] +async fn test_aggregate_count_all_null_returns_zero() { + use compute::bit_vector::BitVector; + use compute::data_type::DataType; + use compute::error::ErrorMode; + use compute::evaluate::{Device, evaluate}; + use compute::expr::Expr; + use compute::operator::Operator; + + use crate::create_column; + use crate::test_utils::init_tracing; + init_tracing(); + + let a_vec: Vec = vec![7, 8, 9]; + let a_bit_vec = BitVector::::new_all_null(a_vec.len()); + let col_a = create_column!(a_vec, Some(a_bit_vec), "a", DataType::I32); + + let expr = Expr::aggregate(Operator::Count, Expr::col("a"), false); + let result = evaluate(Device::GPU, ErrorMode::Tachyon, &expr, &[col_a]).await.unwrap(); + + let output = result[0].data_as_slice::().unwrap(); + assert_eq!(output.len(), 1); + assert_eq!(output[0], 0); + let bit_vec = result[0].null_bits_as_slice().unwrap(); + assert!(bit_vec.is_valid(0)); +} + +#[cfg(feature = "gpu")] +#[tokio::test] +async fn test_aggregate_min_f32_ignores_nan_when_non_nan_exists() { + use 
compute::bit_vector::BitVector; + use compute::data_type::DataType; + use compute::error::ErrorMode; + use compute::evaluate::{Device, evaluate}; + use compute::expr::Expr; + use compute::operator::Operator; + + use crate::create_column; + use crate::test_utils::init_tracing; + init_tracing(); + + let a_vec: Vec = vec![f32::NAN, 4.0, -3.0, f32::NAN, 2.0]; + let a_bit_vec = BitVector::::new_all_valid(a_vec.len()); + let col_a = create_column!(a_vec, Some(a_bit_vec), "a", DataType::F32); + + let expr = Expr::aggregate(Operator::Min, Expr::col("a"), false); + let result = evaluate(Device::GPU, ErrorMode::Tachyon, &expr, &[col_a]).await.unwrap(); + + let output = result[0].data_as_slice::().unwrap(); + assert_eq!(output.len(), 1); + assert_eq!(output[0], -3.0); +} diff --git a/tachyon/compute/tests/parser_tests.rs b/tachyon/compute/tests/parser_tests.rs index 32fd4ff..6735c08 100644 --- a/tachyon/compute/tests/parser_tests.rs +++ b/tachyon/compute/tests/parser_tests.rs @@ -90,6 +90,35 @@ test_parser_matrix!( [(test_parse_sqrt, "(sqrt, i0)", "sqrt"), (test_parse_upper, "(upper, i0)", "upper"),] ); +macro_rules! test_parse_aggregate { + ( + [ + $( + ( $test_name:ident, $expr:expr, $op:expr ) + ),* $(,)? + ] + ) => { + $( + #[test] + fn $test_name() { + let result = parse_scheme_expr($expr).unwrap(); + match result { + Expr::Aggregate { op, .. 
} => assert_eq!(op, $op), + _ => panic!("Expected Aggregate expression, got {:?}", result), + } + } + )* + }; +} + +test_parse_aggregate!([ + (test_parse_sum, "(sum, i0)", Operator::Sum), + (test_parse_count, "(count, i0)", Operator::Count), + (test_parse_avg, "(avg, i0)", Operator::Avg), + (test_parse_min, "(min, i0)", Operator::Min), + (test_parse_max, "(max, i0)", Operator::Max), +]); + #[test] fn test_nested() { let expr = parse_scheme_expr("(*, (+ , i0, 1), 2.5)").unwrap(); diff --git a/tachyon/gpu/src/cuda_launcher.rs b/tachyon/gpu/src/cuda_launcher.rs index 0b7debc..1f84d0d 100644 --- a/tachyon/gpu/src/cuda_launcher.rs +++ b/tachyon/gpu/src/cuda_launcher.rs @@ -21,6 +21,30 @@ use crate::ffi::memory::device_memory::DeviceMemory; use crate::ffi::nvrtc::*; use crate::kernel_cache::get_or_compile_kernel; +const EVAL_KERNEL_HEADERS_FINGERPRINT: &str = concat!( + include_str!("ffi/kernels/types.cuh"), + include_str!("ffi/kernels/column.cuh"), + include_str!("ffi/kernels/context.cuh"), + include_str!("ffi/kernels/math.cuh"), + include_str!("ffi/kernels/limits.cuh"), + include_str!("ffi/kernels/utils.cuh"), + include_str!("ffi/kernels/bitVector.cuh"), + include_str!("ffi/kernels/error.h"), +); + +const AGGREGATE_KERNEL_HEADERS_FINGERPRINT: &str = concat!( + include_str!("ffi/kernels/types.cuh"), + include_str!("ffi/kernels/column.cuh"), + include_str!("ffi/kernels/context.cuh"), + include_str!("ffi/kernels/math.cuh"), + include_str!("ffi/kernels/aggregate.cuh"), + include_str!("ffi/kernels/aggregate_codegen.cuh"), + include_str!("ffi/kernels/limits.cuh"), + include_str!("ffi/kernels/utils.cuh"), + include_str!("ffi/kernels/bitVector.cuh"), + include_str!("ffi/kernels/error.h"), +); + #[inline(always)] fn compose_kernel_source(kernel_name: &str, code: &str) -> String { let kernel_source = formatdoc! 
{r#" @@ -38,12 +62,36 @@ fn compose_kernel_source(kernel_name: &str, code: &str) -> String { kernel_source } +#[inline(always)] +fn compose_aggregate_kernel_source(kernel_name: &str, code: &str) -> String { + let kernel_source = formatdoc! {r#" + #include "types.cuh" + #include "column.cuh" + #include "context.cuh" + #include "math.cuh" + #include "aggregate.cuh" + #include "aggregate_codegen.cuh" + extern "C" __global__ void {kernel_name}(Context* ctx, Column* input, Column* output, size_t num_rows) {{ + if (blockIdx.x != 0) return; + + {code} + }} + "#}; + kernel_source +} + pub fn kernel_name(code: &str) -> String { + scoped_kernel_name("kernel", code, EVAL_KERNEL_HEADERS_FINGERPRINT) +} + +fn scoped_kernel_name(scope: &str, code: &str, headers_fingerprint: &str) -> String { let mut hasher = Sha256::new(); + hasher.update(scope.as_bytes()); hasher.update(code.as_bytes()); + hasher.update(headers_fingerprint.as_bytes()); let result = hasher.finalize(); - format!("kernel_{:x}", result) + format!("{}_{}", scope, result.iter().map(|b| format!("{:02x}", b)).collect::()) } fn build_or_load_kernel( @@ -104,6 +152,23 @@ async fn launch_kernel( Ok(()) } +async fn launch_aggregate_kernel( + kernel: *const std::ffi::c_void, context_ptr: u64, input_ptr: u64, output_ptr: u64, size: usize, +) -> GpuResult<()> { + let mut args = [ + &context_ptr as *const u64 as *mut std::ffi::c_void, + &input_ptr as *const u64 as *mut std::ffi::c_void, + &output_ptr as *const u64 as *mut std::ffi::c_void, + &size as *const usize as *mut std::ffi::c_void, + ]; + let block_size = 256u32; + debug!("Launching aggregate kernel with grid size {} and block size {}", 1, block_size); + + cuda::launch_kernel(kernel, 1, block_size, args.as_mut_ptr()).await?; + cuda::synchronize()?; + Ok(()) +} + pub async fn launch(code: &str, input: &[Column], output: &[Column]) -> GpuResult<()> { cuda::init_cuda()?; let device = cuda::get_device(0)?; @@ -139,6 +204,44 @@ pub async fn launch(code: &str, input: 
&[Column], output: &[Column]) - Ok(()) } +pub async fn launch_aggregate( + code: &str, input: &[Column], output: &[Column], +) -> GpuResult<()> { + cuda::init_cuda()?; + let device = cuda::get_device(0)?; + + let kernel_name = + scoped_kernel_name("aggregate_kernel", code, AGGREGATE_KERNEL_HEADERS_FINGERPRINT); + let kernel_source = compose_aggregate_kernel_source(&kernel_name, code); + debug!("{:#}", kernel_source); + let kernel = build_or_load_kernel(&kernel_name, &kernel_source, device)?; + + let input_ffi: Vec> = input.iter().map(|col| col.as_ffi_column()).collect(); + let output_ffi: Vec> = output.iter().map(|col| col.as_ffi_column()).collect(); + + let host_ctx = ContextFFI { error_code: 0 }; + let device_ctx = DeviceMemory::from_slice(&[host_ctx])?; + let dm_input = DeviceMemory::from_slice(&input_ffi)?; + let dm_output = DeviceMemory::from_slice(&output_ffi)?; + if !input.is_empty() { + launch_aggregate_kernel( + kernel, + device_ctx.device_ptr() as u64, + dm_input.device_ptr() as u64, + dm_output.device_ptr() as u64, + input[0].num_rows, + ) + .await?; + } + cuda::last_error()?; + + let host_ctx = device_ctx.to_vec::().unwrap(); + + (host_ctx[0].error_code as KernelErrorCode).check_with_context("Kernel Error")?; + + Ok(()) +} + #[repr(C)] #[derive(Clone, Copy, Debug)] pub struct ContextFFI { diff --git a/tachyon/gpu/src/ffi/cuda_runtime.rs b/tachyon/gpu/src/ffi/cuda_runtime.rs index 21a651e..916919e 100644 --- a/tachyon/gpu/src/ffi/cuda_runtime.rs +++ b/tachyon/gpu/src/ffi/cuda_runtime.rs @@ -4,8 +4,9 @@ * This source code is licensed under the Apache License, Version 2.0, * as found in the LICENSE file in the root directory of this source tree. 
*/ -use std::ffi::c_void; +use std::ffi::{CString, c_void}; use std::ptr; +use std::sync::{Mutex, OnceLock}; use tracing::debug; @@ -66,7 +67,15 @@ unsafe extern "C" { } pub mod cuda { + use std::collections::HashSet; + use super::*; + + fn initialized_context_devices() -> &'static Mutex> { + static DEVICES: OnceLock>> = OnceLock::new(); + DEVICES.get_or_init(|| Mutex::new(HashSet::new())) + } + #[inline] pub fn init_cuda() -> CudaResult<()> { unsafe { cuInit(0) }.check_with_context("cuInit") @@ -127,8 +136,19 @@ pub mod cuda { #[inline] pub fn create_context(device: i32) -> CudaResult<()> { + { + let devices = initialized_context_devices().lock().unwrap(); + if devices.contains(&device) { + return Ok(()); + } + } + let mut context: *mut std::ffi::c_void = ptr::null_mut(); - unsafe { cuCtxCreate_v2(&mut context, 0, device) }.check_with_context("cuCtxCreate_v2") + unsafe { cuCtxCreate_v2(&mut context, 0, device) }.check_with_context("cuCtxCreate_v2")?; + + let mut devices = initialized_context_devices().lock().unwrap(); + devices.insert(device); + Ok(()) } #[inline] @@ -141,14 +161,10 @@ pub mod cuda { pub fn module_get_function( function: &mut *mut c_void, module: *mut std::ffi::c_void, name: &str, ) -> CudaResult<()> { - unsafe { - cuModuleGetFunction( - function as *mut *mut c_void, - module, - name.as_ptr() as *const std::ffi::c_char, - ) - } - .check_with_context("cuModuleGetFunction") + let c_name = + CString::new(name).expect("kernel function name must not contain interior NUL bytes"); + unsafe { cuModuleGetFunction(function as *mut *mut c_void, module, c_name.as_ptr()) } + .check_with_context("cuModuleGetFunction") } #[inline] diff --git a/tachyon/gpu/src/ffi/kernels/aggregate.cuh b/tachyon/gpu/src/ffi/kernels/aggregate.cuh new file mode 100644 index 0000000..6700e20 --- /dev/null +++ b/tachyon/gpu/src/ffi/kernels/aggregate.cuh @@ -0,0 +1,454 @@ +/* + * Copyright (c) NeoCraft Technologies. 
+ * + * This source code is licensed under the Apache License, Version 2.0, + * as found in the LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include "column.cuh" +#include "math.cuh" +#include "types.cuh" + +namespace aggregate { + +constexpr int AGGREGATE_BLOCK_SIZE = 256; +constexpr int WARP_SIZE = 32; +constexpr int MAX_WARPS_PER_BLOCK = AGGREGATE_BLOCK_SIZE / WARP_SIZE; + +template +__device__ __forceinline__ T shfl_down_value(T value, int offset, + unsigned mask = 0xffffffffu) { + if constexpr (sizeof(T) == 8) { + union { + T value; + uint64_t bits; + } payload; + payload.value = value; + payload.bits = __shfl_down_sync(mask, payload.bits, offset); + return payload.value; + } else if constexpr (sizeof(T) == 4) { + union { + T value; + uint32_t bits; + } payload; + payload.value = value; + payload.bits = __shfl_down_sync(mask, payload.bits, offset); + return payload.value; + } else if constexpr (sizeof(T) == 2) { + union { + T value; + uint16_t bits; + } payload; + payload.value = value; + uint32_t widened = static_cast(payload.bits); + widened = __shfl_down_sync(mask, widened, offset); + payload.bits = static_cast(widened); + return payload.value; + } else if constexpr (sizeof(T) == 1) { + union { + T value; + uint8_t bits; + } payload; + payload.value = value; + uint32_t widened = static_cast(payload.bits); + widened = __shfl_down_sync(mask, widened, offset); + payload.bits = static_cast(widened); + return payload.value; + } else { + return value; + } +} + +template <> +__device__ __forceinline__ float16 shfl_down_value(float16 value, + int offset, + unsigned mask) { + uint32_t widened = static_cast(__half_as_ushort(value)); + widened = __shfl_down_sync(mask, widened, offset); + return __ushort_as_half(static_cast(widened)); +} + +template <> +__device__ __forceinline__ bfloat16 shfl_down_value(bfloat16 value, + int offset, + unsigned mask) { + uint32_t widened = static_cast(__bfloat16_as_ushort(value)); + widened = 
__shfl_down_sync(mask, widened, offset); + return __ushort_as_bfloat16(static_cast(widened)); +} + +template +__device__ __forceinline__ void +combine_min(bool &lhs_valid, typename T::NativeType &lhs_value, bool rhs_valid, + typename T::NativeType rhs_value) { + if (!rhs_valid) { + return; + } + if (!lhs_valid) { + lhs_valid = true; + lhs_value = rhs_value; + return; + } + + if constexpr (T::is_floating) { + const bool lhs_nan = cuda_utils::is_nan(lhs_value); + const bool rhs_nan = cuda_utils::is_nan(rhs_value); + if (lhs_nan && !rhs_nan) { + lhs_value = rhs_value; + } else if (!lhs_nan && !rhs_nan && rhs_value < lhs_value) { + lhs_value = rhs_value; + } + } else if (rhs_value < lhs_value) { + lhs_value = rhs_value; + } +} + +template +__device__ __forceinline__ void +combine_max(bool &lhs_valid, typename T::NativeType &lhs_value, bool rhs_valid, + typename T::NativeType rhs_value) { + if (!rhs_valid) { + return; + } + if (!lhs_valid) { + lhs_valid = true; + lhs_value = rhs_value; + return; + } + + if constexpr (T::is_floating) { + const bool lhs_nan = cuda_utils::is_nan(lhs_value); + const bool rhs_nan = cuda_utils::is_nan(rhs_value); + if (lhs_nan && !rhs_nan) { + lhs_value = rhs_value; + } else if (!lhs_nan && !rhs_nan && rhs_value > lhs_value) { + lhs_value = rhs_value; + } + } else if (rhs_value > lhs_value) { + lhs_value = rhs_value; + } +} + +template +__device__ __forceinline__ void +block_reduce_min(bool &value_valid, typename T::NativeType &value) { + const int lane = threadIdx.x & (WARP_SIZE - 1); + const int warp_id = threadIdx.x / WARP_SIZE; + const int num_warps = (blockDim.x + WARP_SIZE - 1) / WARP_SIZE; + const unsigned warp_mask = __activemask(); + + for (int offset = WARP_SIZE / 2; offset > 0; offset >>= 1) { + const bool other_valid = + __shfl_down_sync(warp_mask, static_cast(value_valid), + offset) != 0; + const auto other_value = shfl_down_value(value, offset, warp_mask); + combine_min(value_valid, value, other_valid, other_value); + } + + 
__shared__ bool shared_valid[MAX_WARPS_PER_BLOCK]; + __shared__ typename T::NativeType shared_value[MAX_WARPS_PER_BLOCK]; + + if (lane == 0) { + shared_valid[warp_id] = value_valid; + shared_value[warp_id] = value; + } + __syncthreads(); + + if (warp_id == 0) { + bool final_valid = (lane < num_warps) ? shared_valid[lane] : false; + typename T::NativeType final_value = + (lane < num_warps) ? shared_value[lane] : T::zero(); + + for (int offset = WARP_SIZE / 2; offset > 0; offset >>= 1) { + const bool other_valid = + __shfl_down_sync(warp_mask, static_cast(final_valid), + offset) != 0; + const auto other_value = shfl_down_value(final_value, offset, warp_mask); + combine_min(final_valid, final_value, other_valid, other_value); + } + + if (lane == 0) { + shared_valid[0] = final_valid; + shared_value[0] = final_value; + } + } + __syncthreads(); + + value_valid = shared_valid[0]; + value = shared_value[0]; +} + +template +__device__ __forceinline__ void +block_reduce_max(bool &value_valid, typename T::NativeType &value) { + const int lane = threadIdx.x & (WARP_SIZE - 1); + const int warp_id = threadIdx.x / WARP_SIZE; + const int num_warps = (blockDim.x + WARP_SIZE - 1) / WARP_SIZE; + const unsigned warp_mask = __activemask(); + + for (int offset = WARP_SIZE / 2; offset > 0; offset >>= 1) { + const bool other_valid = + __shfl_down_sync(warp_mask, static_cast(value_valid), + offset) != 0; + const auto other_value = shfl_down_value(value, offset, warp_mask); + combine_max(value_valid, value, other_valid, other_value); + } + + __shared__ bool shared_valid[MAX_WARPS_PER_BLOCK]; + __shared__ typename T::NativeType shared_value[MAX_WARPS_PER_BLOCK]; + + if (lane == 0) { + shared_valid[warp_id] = value_valid; + shared_value[warp_id] = value; + } + __syncthreads(); + + if (warp_id == 0) { + bool final_valid = (lane < num_warps) ? shared_valid[lane] : false; + typename T::NativeType final_value = + (lane < num_warps) ? 
shared_value[lane] : T::zero(); + + for (int offset = WARP_SIZE / 2; offset > 0; offset >>= 1) { + const bool other_valid = + __shfl_down_sync(warp_mask, static_cast(final_valid), + offset) != 0; + const auto other_value = shfl_down_value(final_value, offset, warp_mask); + combine_max(final_valid, final_value, other_valid, other_value); + } + + if (lane == 0) { + shared_valid[0] = final_valid; + shared_value[0] = final_value; + } + } + __syncthreads(); + + value_valid = shared_valid[0]; + value = shared_value[0]; +} + +template __device__ __forceinline__ NativeT zero_value() { + return static_cast(0); +} + +template <> __device__ __forceinline__ float16 zero_value() { + return __float2half(0.0f); +} + +template <> __device__ __forceinline__ bfloat16 zero_value() { + return __float2bfloat16(0.0f); +} + +template +__device__ __forceinline__ bool +add_checked(C *ctx, bool &lhs_valid, typename T::NativeType &lhs_value, + bool rhs_valid, typename T::NativeType rhs_value) { + if (!rhs_valid) { + return true; + } + if (!lhs_valid) { + lhs_valid = true; + lhs_value = rhs_value; + return true; + } + + if constexpr (ErrorMode && T::is_integral) { + T lhs; + lhs.valid = true; + lhs.value = lhs_value; + T rhs; + rhs.valid = true; + rhs.value = rhs_value; + T out = math::add(ctx, lhs, rhs); + if (!out.valid) { + lhs_valid = false; + return false; + } + lhs_value = out.value; + return true; + } else { + lhs_value += rhs_value; + return true; + } +} + +template +__device__ __forceinline__ void +block_reduce_sum_count(C *ctx, bool &sum_valid, typename T::NativeType &sum, + uint64_t &count) { + const int lane = threadIdx.x & (WARP_SIZE - 1); + const int warp_id = threadIdx.x / WARP_SIZE; + const int num_warps = (blockDim.x + WARP_SIZE - 1) / WARP_SIZE; + const unsigned warp_mask = __activemask(); + + for (int offset = WARP_SIZE / 2; offset > 0; offset >>= 1) { + const bool other_valid = + __shfl_down_sync(warp_mask, static_cast(sum_valid), + offset) != 0; + const auto other_sum = 
shfl_down_value(sum, offset, warp_mask); + add_checked(ctx, sum_valid, sum, other_valid, other_sum); + count += __shfl_down_sync(warp_mask, count, offset); + } + + __shared__ bool shared_valid[MAX_WARPS_PER_BLOCK]; + __shared__ typename T::NativeType shared_sum[MAX_WARPS_PER_BLOCK]; + __shared__ uint64_t shared_count[MAX_WARPS_PER_BLOCK]; + + if (lane == 0) { + shared_valid[warp_id] = sum_valid; + shared_sum[warp_id] = sum; + shared_count[warp_id] = count; + } + __syncthreads(); + + if (warp_id == 0) { + bool final_valid = (lane < num_warps) ? shared_valid[lane] : false; + typename T::NativeType final_sum = + (lane < num_warps) ? shared_sum[lane] + : zero_value(); + uint64_t final_count = (lane < num_warps) ? shared_count[lane] : 0; + + for (int offset = WARP_SIZE / 2; offset > 0; offset >>= 1) { + const bool other_valid = + __shfl_down_sync(warp_mask, static_cast(final_valid), + offset) != 0; + const auto other_sum = shfl_down_value(final_sum, offset, warp_mask); + add_checked(ctx, final_valid, final_sum, other_valid, + other_sum); + final_count += __shfl_down_sync(warp_mask, final_count, offset); + } + + if (lane == 0) { + shared_valid[0] = final_valid; + shared_sum[0] = final_sum; + shared_count[0] = final_count; + } + } + __syncthreads(); + + sum_valid = shared_valid[0]; + sum = shared_sum[0]; + count = shared_count[0]; +} + +template +__device__ __forceinline__ kind_to_wrapper_t min(const Column &col, + size_t num_rows) { + using T = kind_to_wrapper_t; + bool local_valid = false; + typename T::NativeType local_value = T::zero(); + + for (size_t i = threadIdx.x; i < num_rows; i += blockDim.x) { + const auto v = col.load(i); + combine_min(local_valid, local_value, v.valid, v.value); + } + + block_reduce_min(local_valid, local_value); + + T result; + result.valid = local_valid; + result.value = local_value; + return result; +} + +template +__device__ __forceinline__ kind_to_wrapper_t max(const Column &col, + size_t num_rows) { + using T = kind_to_wrapper_t; + 
bool local_valid = false; + typename T::NativeType local_value = T::zero(); + + for (size_t i = threadIdx.x; i < num_rows; i += blockDim.x) { + const auto v = col.load(i); + combine_max(local_valid, local_value, v.valid, v.value); + } + + block_reduce_max(local_valid, local_value); + + T result; + result.valid = local_valid; + result.value = local_value; + return result; +} + +template +__device__ __forceinline__ kind_to_wrapper_t sum(C *ctx, const Column &col, + size_t num_rows) { + using T = kind_to_wrapper_t; + typename T::NativeType local_sum = zero_value(); + bool local_valid = false; + uint64_t local_count = 0; + + for (size_t i = threadIdx.x; i < num_rows; i += blockDim.x) { + const auto v = col.load(i); + if (v.valid) { + add_checked(ctx, local_valid, local_sum, true, v.value); + ++local_count; + } + } + + block_reduce_sum_count(ctx, local_valid, local_sum, + local_count); + + T result; + result.valid = (local_count > 0) && local_valid; + result.value = local_sum; + return result; +} + +template +__device__ __forceinline__ UInt64 count(const Column &col, size_t num_rows) { + uint64_t local_count = 0; + + for (size_t i = threadIdx.x; i < num_rows; i += blockDim.x) { + if (col.template is_valid(i)) { + ++local_count; + } + } + + bool sum_valid = true; + uint64_t sum = local_count; + uint64_t count_dummy = 0; + block_reduce_sum_count(static_cast(nullptr), + sum_valid, sum, count_dummy); + + UInt64 result; + result.valid = true; + result.value = sum; + return result; +} + +template +__device__ __forceinline__ kind_to_wrapper_t avg(C *ctx, const Column &col, + size_t num_rows) { + using T = kind_to_wrapper_t; + typename T::NativeType local_sum = zero_value(); + bool local_valid = false; + uint64_t local_count = 0; + + for (size_t i = threadIdx.x; i < num_rows; i += blockDim.x) { + const auto v = col.load(i); + if (v.valid) { + add_checked(ctx, local_valid, local_sum, true, v.value); + ++local_count; + } + } + + block_reduce_sum_count(ctx, local_valid, local_sum, 
+ local_count); + + T result; + if (local_count == 0 || !local_valid) { + result.valid = false; + result.value = zero_value(); + return result; + } + + result.valid = true; + result.value = local_sum / static_cast(local_count); + return result; +} + +} // namespace aggregate diff --git a/tachyon/gpu/src/ffi/kernels/aggregate_codegen.cuh b/tachyon/gpu/src/ffi/kernels/aggregate_codegen.cuh new file mode 100644 index 0000000..4ce3277 --- /dev/null +++ b/tachyon/gpu/src/ffi/kernels/aggregate_codegen.cuh @@ -0,0 +1,68 @@ +/* + * Copyright (c) NeoCraft Technologies. + * + * This source code is licensed under the Apache License, Version 2.0, + * as found in the LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include "aggregate.cuh" +#include "column.cuh" +#include "types.cuh" + +namespace aggregate_codegen { + +template +__device__ __forceinline__ void min(C *ctx, Column *input, Column *output, + size_t num_rows, uint16_t col_idx) { + (void)ctx; + auto agg = aggregate::min(input[col_idx], num_rows); + if (threadIdx.x == 0 && blockIdx.x == 0) { + output[0].store(0, agg); + } +} + +template +__device__ __forceinline__ void max(C *ctx, Column *input, Column *output, + size_t num_rows, uint16_t col_idx) { + (void)ctx; + auto agg = aggregate::max(input[col_idx], num_rows); + if (threadIdx.x == 0 && blockIdx.x == 0) { + output[0].store(0, agg); + } +} + +template +__device__ __forceinline__ void sum(C *ctx, Column *input, Column *output, + size_t num_rows, uint16_t col_idx) { + auto agg = + aggregate::sum(ctx, input[col_idx], num_rows); + if (threadIdx.x == 0 && blockIdx.x == 0) { + output[0].store(0, agg); + } +} + +template +__device__ __forceinline__ void avg(C *ctx, Column *input, Column *output, + size_t num_rows, uint16_t col_idx) { + auto agg = + aggregate::avg(ctx, input[col_idx], num_rows); + if (threadIdx.x == 0 && blockIdx.x == 0) { + output[0].store(0, agg); + } +} + +template +__device__ __forceinline__ void count(C *ctx, Column 
*input, Column *output, + size_t num_rows, uint16_t col_idx) { + (void)ctx; + auto agg = aggregate::count(input[col_idx], num_rows); + if (threadIdx.x == 0 && blockIdx.x == 0) { + output[0].store(0, agg); + } +} + +} // namespace aggregate_codegen