Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@ temp/
Thumbs.db
ehthumbs.db
Desktop.ini
worktrees/
222 changes: 138 additions & 84 deletions src/operators/fractional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,40 @@ impl Operator for FractionalOperator {
));
}

// Evaluate the first argument to determine bucketing key logic
// Evaluate the first argument to determine bucketing key logic.
// If the first arg is an Array literal, treat all args as buckets (no explicit seed).
// If the first arg is an expression that evaluates to a String, use it as seed.
// If the first arg is an expression that evaluates to null/non-string, return an error
// so the flag engine falls back to the defaultVariant.
let evaluated_first = evaluator.evaluate(&args[0], context)?;
let (bucket_key, start_index) = if let Value::String(s) = &evaluated_first {
// Explicit bucketing key provided
(s.clone(), 1)
} else {
// Fallback: use flagKey + targetingKey from context data
let data = context.root().data().clone();
let targeting_key = data
.get("targetingKey")
.and_then(|v| v.as_str())
.unwrap_or("");
let flag_key = data
.get("$flagd")
.and_then(|v| v.get("flagKey"))
.and_then(|v| v.as_str())
.unwrap_or("");
(format!("{}{}", flag_key, targeting_key), 0)
let (bucket_key, start_index) = match (&args[0], &evaluated_first) {
(_, Value::String(s)) => (s.clone(), 1),
(Value::Array(_), _) => {
// First arg is explicitly an array — no seed provided, use flagKey+targetingKey
let root = context.root();
let data = root.data();
let targeting_key = data
.get("targetingKey")
.and_then(|v| v.as_str())
.unwrap_or("");
let flag_key = data
.get("$flagd")
.and_then(|v| v.get("flagKey"))
.and_then(|v| v.as_str())
.unwrap_or("");
(format!("{}{}", flag_key, targeting_key), 0)
}
_ => {
// Expression resolved to null or non-string — no valid seed; signal fallback
return Err(DataLogicError::Custom(
"fractional: bucket key expression resolved to a non-string value".into(),
));
}
};

// Parse bucket definitions from remaining arguments
// Parse bucket definitions from remaining arguments.
// Each inner element (name and weight) is evaluated through JSON Logic so that
// nested expressions like {"if": [...]} and {"var": "..."} are resolved first.
let mut bucket_values: Vec<Value> = Vec::new();

if start_index == 1 && args.len() == 2 {
Expand All @@ -66,13 +79,14 @@ impl Operator for FractionalOperator {
for arg in &args[start_index..] {
let evaluated = evaluator.evaluate(arg, context)?;
if let Some(bucket_def) = evaluated.as_array() {
// Each bucket is [name, weight] or [name] (weight=1)
// Each bucket is [name, weight] or [name] (weight=1).
// Evaluate each inner element so nested JSON Logic is resolved.
if bucket_def.len() >= 2 {
bucket_values.push(bucket_def[0].clone());
bucket_values.push(bucket_def[1].clone());
bucket_values.push(evaluator.evaluate(&bucket_def[0], context)?);
bucket_values.push(evaluator.evaluate(&bucket_def[1], context)?);
} else if bucket_def.len() == 1 {
// Shorthand: [name] implies weight of 1
bucket_values.push(bucket_def[0].clone());
bucket_values.push(evaluator.evaluate(&bucket_def[0], context)?);
bucket_values.push(Value::Number(1.into()));
}
} else {
Expand All @@ -85,7 +99,7 @@ impl Operator for FractionalOperator {
}

match fractional(&bucket_key, &bucket_values) {
Ok(bucket_name) => Ok(Value::String(bucket_name)),
Ok(value) => Ok(value),
Err(e) => Err(DataLogicError::Custom(e)),
}
}
Expand All @@ -97,6 +111,13 @@ impl Operator for FractionalOperator {
/// a list of bucket definitions with integer weights. It uses consistent hashing
/// to always assign the same bucket key to the same bucket.
///
/// Bucket names may be any JSON scalar (string, boolean, number). The name is
/// serialised to its JSON representation for display/return, while the hash is
/// computed on the bucket key string only.
///
/// Negative weights are clamped to zero (the bucket still participates in the
/// name list but never receives any traffic).
///
/// # Algorithm
///
/// Uses high-resolution integer arithmetic instead of float-based percentage
Expand All @@ -107,59 +128,64 @@ impl Operator for FractionalOperator {
/// bucket = (u64(hash) * u64(totalWeight)) >> 32
/// ```
///
/// Weights must be non-negative integers summing to at most `i32::MAX`
/// (2,147,483,647).
/// Weights must sum to at most `i32::MAX` (2,147,483,647).
///
/// # Arguments
/// * `bucket_key` - The key to use for bucket assignment (e.g., user ID)
/// * `buckets` - Array of [name, weight, name, weight, ...] values
///
/// # Returns
/// The name of the selected bucket, or an error if the input is invalid
///
/// # Example
/// ```json
/// {"fractional": ["user123", ["control", 50, "treatment", 50]]}
/// ```
/// This will consistently assign "user123" to either "control" or "treatment"
/// based on its hash value.
pub fn fractional(bucket_key: &str, buckets: &[Value]) -> Result<String, String> {
/// The value of the selected bucket as a `serde_json::Value`, or an error if
/// the input is invalid.
pub fn fractional(bucket_key: &str, buckets: &[Value]) -> Result<Value, String> {
if buckets.is_empty() {
return Err("Fractional operator requires at least one bucket".to_string());
}

// Parse bucket definitions: [name1, weight1, name2, weight2, ...]
let mut bucket_defs: Vec<(String, u64)> = Vec::new();
let mut bucket_defs: Vec<(Value, u64)> = Vec::new();
let mut total_weight: u64 = 0;

let mut i = 0;
while i < buckets.len() {
// Get bucket name
let name = match &buckets[i] {
Value::String(s) => s.clone(),
_ => return Err(format!("Bucket name at index {} must be a string", i)),
};

i += 1;
for (i, chunk) in buckets.chunks(2).enumerate() {
// Accept any scalar JSON value as a bucket name.
let name_value = chunk[0].clone();
if matches!(name_value, Value::Object(_) | Value::Array(_)) {
return Err(format!(
"Bucket name at index {} must be a scalar value (string, boolean, or number), got a complex type",
i * 2
));
}

// Get bucket weight
if i >= buckets.len() {
return Err(format!("Missing weight for bucket '{}'", name));
if chunk.len() < 2 {
return Err(format!("Missing weight for bucket at index {}", i * 2));
}

let weight = match &buckets[i] {
Value::Number(n) => n.as_u64().ok_or_else(|| {
format!("Weight for bucket '{}' must be a positive integer", name)
})?,
_ => return Err(format!("Weight for bucket '{}' must be a number", name)),
// Get bucket weight — negative weights are clamped to zero.
let weight: u64 = match &chunk[1] {
Value::Number(n) => {
if let Some(u) = n.as_u64() {
u
} else if let Some(signed) = n.as_i64() {
// Negative integer — clamp to zero
signed.max(0) as u64
} else {
// Float — round down, clamp to zero
n.as_f64().unwrap_or(0.0).max(0.0) as u64
}
}
_ => {
return Err(format!(
"Weight for bucket at index {} must be a number",
i * 2 + 1
))
}
};

total_weight = total_weight
.checked_add(weight)
.ok_or_else(|| "Total weight overflow".to_string())?;

bucket_defs.push((name, weight));
i += 1;
bucket_defs.push((name_value, weight));
}

if bucket_defs.is_empty() {
Expand All @@ -183,22 +209,25 @@ pub fn fractional(bucket_key: &str, buckets: &[Value]) -> Result<String, String>
let hash: u32 = murmurhash3_x86_32(bucket_key.as_bytes(), 0);

// Map the 32-bit hash uniformly into [0, totalWeight) using integer arithmetic.
// This replaces the previous float-based approach (abs(hash)/i32::MAX * 100)
// with higher resolution and no floating-point imprecision.
let bucket_value: u64 = (hash as u64 * total_weight) >> 32;

// Find which bucket this value falls into by accumulating weights
// Find which bucket this value falls into by accumulating weights.
// Buckets with zero weight are skipped (their cumulative weight never advances).
let mut cumulative_weight: u64 = 0;
for (name, weight) in &bucket_defs {
for (value, weight) in &bucket_defs {
cumulative_weight += weight;
if bucket_value < cumulative_weight {
return Ok(name.clone());
return Ok(value.clone());
}
}

// Unreachable for valid inputs: bucket_value < total_weight is always true
// since (hash * total_weight) >> 32 < total_weight. Fall back defensively.
Ok(bucket_defs.last().unwrap().0.clone())
// Unreachable for valid inputs, but fall back to last non-zero bucket defensively.
Ok(bucket_defs
.iter()
.rev()
.find(|(_, w)| *w > 0)
.map(|(v, _)| v.clone())
.unwrap_or(Value::Null))
}

#[cfg(test)]
Expand All @@ -223,13 +252,10 @@ mod tests {
let mut seen_control = false;
let mut seen_treatment = false;
for i in 0..100 {
match fractional(&format!("user-{}", i), &buckets)
.unwrap()
.as_str()
{
"control" => seen_control = true,
"treatment" => seen_treatment = true,
other => panic!("Unexpected bucket: {}", other),
match fractional(&format!("user-{}", i), &buckets).unwrap() {
Value::String(s) if s == "control" => seen_control = true,
Value::String(s) if s == "treatment" => seen_treatment = true,
other => panic!("Unexpected bucket: {:?}", other),
}
}
assert!(seen_control, "control bucket must be reachable");
Expand All @@ -243,13 +269,10 @@ mod tests {
let mut small_count = 0u32;
let mut large_count = 0u32;
for i in 0..1000 {
match fractional(&format!("user-{}", i), &buckets)
.unwrap()
.as_str()
{
"small" => small_count += 1,
"large" => large_count += 1,
other => panic!("Unexpected bucket: {}", other),
match fractional(&format!("user-{}", i), &buckets).unwrap() {
Value::String(s) if s == "small" => small_count += 1,
Value::String(s) if s == "large" => large_count += 1,
other => panic!("Unexpected bucket: {:?}", other),
}
}
// 90/10 split — large should dominate
Expand All @@ -275,9 +298,9 @@ mod tests {
let mut counts = std::collections::HashMap::new();
for i in 0..3000 {
let r = fractional(&format!("u-{}", i), &buckets).unwrap();
*counts.entry(r).or_insert(0u32) += 1;
*counts.entry(r.to_string()).or_insert(0u32) += 1;
}
for bucket in ["a", "b", "c"] {
for bucket in ["\"a\"", "\"b\"", "\"c\""] {
let c = counts.get(bucket).copied().unwrap_or(0);
// Each should get roughly 1/3; allow generous tolerance
assert!(
Expand Down Expand Up @@ -329,7 +352,8 @@ mod tests {

#[test]
fn test_fractional_invalid_name_type() {
let buckets = vec![json!(123), json!(50)];
// Complex types (objects/arrays) are rejected as bucket names
let buckets = vec![json!({"key": "value"}), json!(50)];
assert!(fractional("user-123", &buckets).is_err());
}

Expand All @@ -342,14 +366,13 @@ mod tests {
#[test]
fn test_fractional_single_bucket() {
let buckets = vec![json!("only"), json!(100)];
assert_eq!(fractional("any-key", &buckets).unwrap(), "only");
assert_eq!(fractional("another-key", &buckets).unwrap(), "only");
assert_eq!(fractional("any-key", &buckets).unwrap(), json!("only"));
assert_eq!(fractional("another-key", &buckets).unwrap(), json!("only"));
}

#[test]
fn test_fractional_integer_arithmetic_matches_spec() {
// Verify the formula: bucket_value = (hash as u64 * totalWeight) >> 32
// for a known hash, so the algorithm is pinned against regression.
let key = "test-key";
let hash = murmurhash3_x86_32(key.as_bytes(), 0);
let total_weight: u64 = 100;
Expand All @@ -359,13 +382,44 @@ mod tests {
let result = fractional(key, &buckets).unwrap();

let expected = if expected_bucket_value < 50 {
"low"
json!("low")
} else {
"high"
json!("high")
};
assert_eq!(
result, expected,
"bucket assignment must match the spec formula"
);
}

#[test]
fn test_fractional_boolean_bucket_names() {
// Boolean values are valid bucket names — used when fractional is a condition
let buckets = vec![json!(false), json!(0u64), json!(true), json!(100u64)];
let result = fractional("any-key", &buckets).unwrap();
assert_eq!(
result,
json!(true),
"100% weight on true must always select true"
);
}

#[test]
fn test_fractional_negative_weight_clamped_to_zero() {
// Negative weight is clamped to 0; the other bucket gets 100% of traffic
let buckets = vec![
json!("one"),
Value::Number(serde_json::Number::from(-50i64)),
json!("two"),
json!(100),
];
let result = fractional("any-key", &buckets).unwrap();
assert_eq!(result, json!("two"), "negative weight must be clamped to 0");
}

#[test]
fn test_fractional_all_zero_weights_error() {
let buckets = vec![json!("one"), json!(0), json!("two"), json!(0)];
assert!(fractional("any-key", &buckets).is_err());
}
}
2 changes: 1 addition & 1 deletion testbed
6 changes: 3 additions & 3 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1616,8 +1616,8 @@ fn test_fractional_distribution_uniformity() {
// 100k sequential keys — dense sample, fast, and tight enough for 10% tolerance.
for i in 0u64..100_000 {
let key = format!("{}", i);
let bucket_str = fractional(&key, &bucket_defs).unwrap();
let bucket_idx: usize = bucket_str.parse().unwrap();
let bucket_val = fractional(&key, &bucket_defs).unwrap();
let bucket_idx: usize = bucket_val.as_str().unwrap().parse().unwrap();
hits[bucket_idx] += 1;
}

Expand Down Expand Up @@ -1656,7 +1656,7 @@ fn test_fractional_boundary_hashes_do_not_panic() {
let result = fractional(key, &buckets);
assert!(result.is_ok(), "key={key:?} should not error: {:?}", result);
assert!(
["a", "b", "c", "d"].contains(&result.unwrap().as_str()),
["a", "b", "c", "d"].contains(&result.unwrap().as_str().unwrap()),
"key={key:?} must map to a valid bucket"
);
}
Expand Down
Loading