Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 51 additions & 116 deletions datafusion/functions/src/crypto/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,17 @@

//! "crypto" DataFusion functions

use arrow::array::{
Array, ArrayRef, BinaryArray, BinaryArrayType, BinaryViewArray, GenericBinaryArray,
OffsetSizeTrait,
};
use arrow::array::{AsArray, GenericStringArray, StringViewArray};
use arrow::array::{Array, ArrayRef, BinaryArray, BinaryArrayType};
use arrow::array::{AsArray, StringViewArray};
use arrow::datatypes::DataType;
use blake2::{Blake2b512, Blake2s256, Digest};
use blake3::Hasher as Blake3;
use datafusion_common::cast::as_binary_array;

use arrow::compute::StringArrayType;
use datafusion_common::internal_err;
use datafusion_common::{
exec_err, internal_err, plan_err, utils::take_function_args, DataFusionError, Result,
ScalarValue,
exec_err, plan_err, utils::take_function_args, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::ColumnarValue;
use md5::Md5;
Expand Down Expand Up @@ -84,18 +81,7 @@ define_digest_function!(
"computes blake3 hash digest of the given input"
);

// Hashes an optional byte input with the digest type named by `$METHOD` and
// wraps the resulting bytes in `ScalarValue::Binary`. A `None` input is not
// hashed and yields `ScalarValue::Binary(None)`.
macro_rules! digest_to_scalar {
($METHOD: ident, $INPUT:expr) => {{
ScalarValue::Binary($INPUT.as_ref().map(|v| {
let mut digest = $METHOD::default();
digest.update(v);
// NOTE(review): presumably silences a deprecation warning raised by
// `as_slice` on the hasher's output type; confirm which call needs it.
#[allow(deprecated)]
digest.finalize().as_slice().to_vec()
}))
}};
}

#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum DigestAlgorithm {
Md5,
Sha224,
Expand All @@ -107,23 +93,6 @@ pub enum DigestAlgorithm {
Blake3,
}

/// Computes a binary hash of the first argument using the algorithm named by
/// the second argument, and returns the result as a [`ColumnarValue`].
///
/// The arguments are unpacked with `take_function_args`, so exactly two are
/// expected. The algorithm must be a scalar string that parses into a
/// [`DigestAlgorithm`]; any other scalar yields an execution error, and an
/// array-valued algorithm is rejected because choosing the method per row is
/// not yet supported. The data argument is forwarded to `digest_process`.
///
/// Standard algorithms are md5, sha224, sha256, sha384 and sha512.
pub fn digest(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let [data, digest_algorithm] = take_function_args("digest", args)?;
let digest_algorithm = match digest_algorithm {
ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
Some(Some(method)) => method.parse::<DigestAlgorithm>(),
_ => exec_err!("Unsupported data type {scalar:?} for function digest"),
},
ColumnarValue::Array(_) => {
internal_err!("Digest using dynamically decided method is not yet supported")
}
}?;
digest_process(data, digest_algorithm)
}

impl FromStr for DigestAlgorithm {
type Err = DataFusionError;
fn from_str(name: &str) -> Result<DigestAlgorithm> {
Expand Down Expand Up @@ -183,7 +152,7 @@ pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
ColumnarValue::Scalar(ScalarValue::Binary(opt)) => {
ColumnarValue::Scalar(ScalarValue::Utf8View(opt.map(hex_encode::<_>)))
}
_ => return exec_err!("Impossibly got invalid results from digest"),
_ => return internal_err!("Impossibly got invalid results from digest"),
})
}

Expand All @@ -198,25 +167,7 @@ fn hex_encode<T: AsRef<[u8]>>(data: T) -> String {
}
s
}
/// Resolves the return type of a hashing function from its input type.
///
/// Any string type (`Utf8`, `LargeUtf8`, `Utf8View`) or binary type (`Binary`,
/// `LargeBinary`, `BinaryView`) maps to `DataType::Binary`; `Null` maps to
/// `Null`.
///
/// # Errors
///
/// Returns a planning error mentioning `name` for every other input type.
pub fn utf8_or_binary_to_binary_type(
    arg_type: &DataType,
    name: &str,
) -> Result<DataType> {
    match arg_type {
        // A null input produces a null output of the same (null) type.
        DataType::Null => Ok(DataType::Null),
        DataType::Utf8
        | DataType::LargeUtf8
        | DataType::Utf8View
        | DataType::Binary
        | DataType::LargeBinary
        | DataType::BinaryView => Ok(DataType::Binary),
        _ => plan_err!(
            "The {name:?} function can only accept strings or binary arrays."
        ),
    }
}

macro_rules! digest_to_array {
($METHOD:ident, $INPUT:expr) => {{
let binary_array: BinaryArray = $INPUT
Expand All @@ -232,9 +183,21 @@ macro_rules! digest_to_array {
Arc::new(binary_array)
}};
}

// Scalar counterpart of the array hashing macro: feeds the optional bytes in
// `$INPUT` through a default-constructed `$METHOD` hasher and returns the
// digest as `ScalarValue::Binary`. When `$INPUT` is `None`, the result is
// `ScalarValue::Binary(None)` and no hashing takes place.
macro_rules! digest_to_scalar {
($METHOD: ident, $INPUT:expr) => {{
ScalarValue::Binary($INPUT.as_ref().map(|v| {
let mut digest = $METHOD::default();
digest.update(v);
// NOTE(review): the lint suppression presumably targets a deprecated
// `as_slice` on the finalized output; verify against the hasher crates.
#[allow(deprecated)]
digest.finalize().as_slice().to_vec()
}))
}};
}

impl DigestAlgorithm {
/// digest an optional string to its hash value, null values are returned as is
pub fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
ColumnarValue::Scalar(match self {
Self::Md5 => digest_to_scalar!(Md5, value),
Self::Sha224 => digest_to_scalar!(Sha224, value),
Expand All @@ -251,49 +214,7 @@ impl DigestAlgorithm {
})
}

/// Digests every element of a binary array and returns the hashes as an array.
///
/// `T` is the offset width used to downcast `Binary`/`LargeBinary` inputs via
/// `as_binary::<T>()`; it is not consulted for `BinaryView` inputs.
/// NOTE(review): callers presumably must pass `i32` for `Binary` and `i64`
/// for `LargeBinary`; confirm the downcast behaviour on a mismatch.
///
/// # Errors
///
/// Returns an execution error when `value` is not a `Binary`, `LargeBinary`
/// or `BinaryView` array.
pub fn digest_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
where
    T: OffsetSizeTrait,
{
    let array = match value.data_type() {
        DataType::Binary | DataType::LargeBinary => {
            let v = value.as_binary::<T>();
            self.digest_binary_array_impl::<&GenericBinaryArray<T>>(v)
        }
        DataType::BinaryView => {
            let v = value.as_binary_view();
            self.digest_binary_array_impl::<&BinaryViewArray>(v)
        }
        other => {
            // Name this function rather than the UTF-8 sibling so the error
            // points at the code path that actually rejected the input.
            return exec_err!("unsupported type for digest_binary_array: {other:?}")
        }
    };
    Ok(ColumnarValue::Array(array))
}

/// Hashes each element of a string array and wraps the hashes in a
/// [`ColumnarValue::Array`].
///
/// `T` is the offset width used to downcast `Utf8`/`LargeUtf8` inputs via
/// `as_string::<T>()`; `Utf8View` inputs are downcast with `as_string_view()`
/// and do not use `T`.
///
/// # Errors
///
/// Returns an execution error when `value` is not one of the three string
/// array types.
pub fn digest_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
where
    T: OffsetSizeTrait,
{
    let hashed = match value.data_type() {
        DataType::Utf8View => {
            self.digest_utf8_array_impl::<&StringViewArray>(value.as_string_view())
        }
        DataType::Utf8 | DataType::LargeUtf8 => {
            self.digest_utf8_array_impl::<&GenericStringArray<T>>(value.as_string::<T>())
        }
        other => {
            return exec_err!("unsupported type for digest_utf_array: {other:?}")
        }
    };
    Ok(ColumnarValue::Array(hashed))
}

pub fn digest_utf8_array_impl<'a, StringArrType>(
fn digest_utf8_array_impl<'a, StringArrType>(
self,
input_value: StringArrType,
) -> ArrayRef
Expand Down Expand Up @@ -324,7 +245,7 @@ impl DigestAlgorithm {
}
}

pub fn digest_binary_array_impl<'a, BinaryArrType>(
fn digest_binary_array_impl<'a, BinaryArrType>(
self,
input_value: BinaryArrType,
) -> ArrayRef
Expand Down Expand Up @@ -355,26 +276,40 @@ impl DigestAlgorithm {
}
}
}

pub fn digest_process(
value: &ColumnarValue,
digest_algorithm: DigestAlgorithm,
) -> Result<ColumnarValue> {
match value {
ColumnarValue::Array(a) => match a.data_type() {
DataType::Utf8View => digest_algorithm.digest_utf8_array::<i32>(a.as_ref()),
DataType::Utf8 => digest_algorithm.digest_utf8_array::<i32>(a.as_ref()),
DataType::LargeUtf8 => digest_algorithm.digest_utf8_array::<i64>(a.as_ref()),
DataType::Binary => digest_algorithm.digest_binary_array::<i32>(a.as_ref()),
DataType::LargeBinary => {
digest_algorithm.digest_binary_array::<i64>(a.as_ref())
}
DataType::BinaryView => {
digest_algorithm.digest_binary_array::<i32>(a.as_ref())
}
other => exec_err!(
"Unsupported data type {other:?} for function {digest_algorithm}"
),
},
ColumnarValue::Array(a) => {
let output = match a.data_type() {
DataType::Utf8View => {
digest_algorithm.digest_utf8_array_impl(a.as_string_view())
}
DataType::Utf8 => {
digest_algorithm.digest_utf8_array_impl(a.as_string::<i32>())
}
DataType::LargeUtf8 => {
digest_algorithm.digest_utf8_array_impl(a.as_string::<i64>())
}
DataType::Binary => {
digest_algorithm.digest_binary_array_impl(a.as_binary::<i32>())
}
DataType::LargeBinary => {
digest_algorithm.digest_binary_array_impl(a.as_binary::<i64>())
}
DataType::BinaryView => {
digest_algorithm.digest_binary_array_impl(a.as_binary_view())
}
other => {
return exec_err!(
"Unsupported data type {other:?} for function {digest_algorithm}"
)
}
};
Ok(ColumnarValue::Array(output))
}
ColumnarValue::Scalar(scalar) => {
match scalar {
ScalarValue::Utf8View(a)
Expand Down
42 changes: 32 additions & 10 deletions datafusion/functions/src/crypto/digest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
// specific language governing permissions and limitations
// under the License.

//! "crypto" DataFusion functions
use super::basic::{digest, utf8_or_binary_to_binary_type};
use crate::crypto::basic::{digest_process, DigestAlgorithm};

use arrow::datatypes::DataType;
use datafusion_common::{
exec_err, not_impl_err,
types::{logical_binary, logical_string},
utils::take_function_args,
Result,
};
use datafusion_expr::{
Expand All @@ -36,16 +38,16 @@ use std::any::Any;
syntax_example = "digest(expression, algorithm)",
sql_example = r#"```sql
> select digest('foo', 'sha256');
+------------------------------------------+
| digest(Utf8("foo"), Utf8("sha256")) |
+------------------------------------------+
| <binary_hash_result> |
+------------------------------------------+
+------------------------------------------------------------------+
| digest(Utf8("foo"),Utf8("sha256")) |
+------------------------------------------------------------------+
| 2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae |
+------------------------------------------------------------------+
```"#,
standard_argument(name = "expression", prefix = "String"),
argument(
name = "algorithm",
description = "String expression specifying algorithm to use. Must be one of:
description = "String expression specifying algorithm to use. Must be one of:
- md5
- sha224
- sha256
Expand All @@ -60,6 +62,7 @@ use std::any::Any;
pub struct DigestFunc {
signature: Signature,
}

impl Default for DigestFunc {
fn default() -> Self {
Self::new()
Expand All @@ -85,6 +88,7 @@ impl DigestFunc {
}
}
}

impl ScalarUDFImpl for DigestFunc {
fn as_any(&self) -> &dyn Any {
self
Expand All @@ -98,9 +102,10 @@ impl ScalarUDFImpl for DigestFunc {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
utf8_or_binary_to_binary_type(&arg_types[0], self.name())
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Binary)
}

/// Evaluates the function by passing the call's argument values (`args.args`)
/// to `digest` and returning its result unchanged.
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
digest(&args.args)
}
Expand All @@ -109,3 +114,20 @@ impl ScalarUDFImpl for DigestFunc {
self.doc()
}
}

/// Digest computes a binary hash of the given data, accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`].
/// Second argument is the algorithm to use.
/// Standard algorithms are md5, sha224, sha256, sha384 and sha512.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs list 'sha1' as a supported algorithm, but DigestAlgorithm doesn't include SHA-1 and no implementation is provided here—consider removing 'sha1' from this list (also applies to other docs if present).

🤖 Was this useful? React with 👍 or 👎

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:documentation; feedback:The Augment AI reviewer is correct! There is no support for SHA1 and since it is a weak algorithm it should not be advertised in the docs. Prevents confusion in the reader.

fn digest(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    let [data, digest_algorithm] = take_function_args("digest", args)?;

    // The algorithm has to be a single value for the whole batch; choosing it
    // per row from an array is not implemented.
    let ColumnarValue::Scalar(scalar) = digest_algorithm else {
        return not_impl_err!(
            "Digest using dynamically decided method is not yet supported"
        );
    };

    // Only a non-null string scalar can name an algorithm.
    let Some(Some(method)) = scalar.try_as_str() else {
        return exec_err!("Unsupported data type {scalar:?} for function digest");
    };

    let algorithm = method.parse::<DigestAlgorithm>()?;
    digest_process(data, algorithm)
}
Loading
Loading