diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index a7fb7e6ac0..9cfc780f2b 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -1556,4 +1556,102 @@ public void testAddAndRemoveServerTags() throws Exception { "Server tag PERMANENT_OFFLINE not exists for server 2, the current " + "server tag of this server is TEMPORARY_OFFLINE."); } + + @Test + public void testCreateTableWithInvalidAggFunctionDataType() throws Exception { + TablePath tablePath = + TablePath.of( + DEFAULT_TABLE_PATH.getDatabaseName(), + "test_invalid_data_type_for_aggfunction"); + Map propertiesAggregate = new HashMap<>(); + propertiesAggregate.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation"); + + Schema schema1 = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("and_value", DataTypes.STRING(), AggFunctions.BOOL_AND()) + .primaryKey("id") + .build(); + TableDescriptor t1 = + TableDescriptor.builder() + .schema(schema1) + .comment("aggregate merge engine table") + .properties(propertiesAggregate) + .build(); + assertThatThrownBy(() -> admin.createTable(tablePath, t1, false).get()) + .cause() + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining( + "Data type for bool_and column must be 'BooleanType' but was 'STRING'"); + + Schema schema2 = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("count", DataTypes.STRING(), AggFunctions.SUM()) + .primaryKey("id") + .build(); + TableDescriptor t2 = + TableDescriptor.builder() + .schema(schema2) + .comment("aggregate merge engine table") + .properties(propertiesAggregate) + .build(); + assertThatThrownBy(() -> admin.createTable(tablePath, t2, false).get()) + .cause() + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining("Data type for sum 
column must be part of [NUMERIC]"); + + Schema schema3 = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("max_value", DataTypes.BOOLEAN(), AggFunctions.MAX()) + .primaryKey("id") + .build(); + TableDescriptor t3 = + TableDescriptor.builder() + .schema(schema3) + .comment("aggregate merge engine table") + .properties(propertiesAggregate) + .build(); + assertThatThrownBy(() -> admin.createTable(tablePath, t3, false).get()) + .cause() + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining( + "Data type for max column must be part of [CHARACTER_STRING, NUMERIC, DATETIME]"); + + Schema schema4 = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("list_agg_value", DataTypes.BOOLEAN(), AggFunctions.LISTAGG()) + .primaryKey("id") + .build(); + TableDescriptor t4 = + TableDescriptor.builder() + .schema(schema4) + .comment("aggregate merge engine table") + .properties(propertiesAggregate) + .build(); + assertThatThrownBy(() -> admin.createTable(tablePath, t4, false).get()) + .cause() + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining( + "Data type for listagg column must be part of [CHARACTER_STRING]"); + + Schema schema5 = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column( + "first", + DataTypes.ARRAY(DataTypes.BIGINT()), + AggFunctions.FIRST_VALUE()) + .primaryKey("id") + .build(); + TableDescriptor t5 = + TableDescriptor.builder() + .schema(schema5) + .comment("aggregate merge engine table") + .properties(propertiesAggregate) + .build(); + admin.createTable(tablePath, t5, false).get(); + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java index 996b3d3e3c..c4a81db475 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java @@ -18,6 +18,8 @@ package org.apache.fluss.metadata; import 
org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.types.DataType; import javax.annotation.Nullable; @@ -104,18 +106,31 @@ public boolean hasParameters() { } /** - * Validates all parameters of this aggregation function. + * Validates all parameters and data type of this aggregation function. * *

<p>This method checks that: * *

* - * @throws IllegalArgumentException if any parameter is invalid + * @param fieldType the field data type + * @throws IllegalArgumentException if any parameter is invalid or data type is invalid */ - public void validate() { + public void validate(DataType fieldType) { + validateParameters(); + validateDataType(fieldType); + } + + @VisibleForTesting + void validateDataType(DataType fieldType) { + type.validateDataType(fieldType); + } + + @VisibleForTesting + void validateParameters() { for (Map.Entry entry : parameters.entrySet()) { type.validateParameter(entry.getKey(), entry.getValue()); } diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java index 9cf148ad2e..cd2a0f27a2 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java @@ -18,11 +18,17 @@ package org.apache.fluss.metadata; import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.types.BooleanType; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypeFamily; +import java.util.Arrays; import java.util.Collections; import java.util.Locale; import java.util.Set; +import static org.apache.fluss.utils.Preconditions.checkArgument; + /** * Aggregation function type for aggregate merge engine. * @@ -111,6 +117,60 @@ public void validateParameter(String parameterName, String parameterValue) { } } + /** + * Validates a data type for this aggregation function. + * + * @param fieldType the field data type + * @throws IllegalArgumentException if the data type is invalid + */ + public void validateDataType(DataType fieldType) { + switch (this) { + // The bool_and and bool_or don't have specific DataFamily, validate them by + // dataType directly. 
+ case BOOL_AND: + case BOOL_OR: + checkArgument( + fieldType instanceof BooleanType, + "Data type for %s column must be 'BooleanType' but was '%s'.", + toString(), + fieldType); + break; + default: + DataTypeFamily[] dataTypeFamilies = getSupportedDataFamilies(); + checkArgument( + fieldType.isAnyOf(dataTypeFamilies), + "Data type for %s column must be part of %s but was '%s'.", + toString(), + Arrays.deepToString(dataTypeFamilies), + fieldType); + break; + } + } + + private DataTypeFamily[] getSupportedDataFamilies() { + switch (this) { + case SUM: + case PRODUCT: + return new DataTypeFamily[] {DataTypeFamily.NUMERIC}; + case MAX: + case MIN: + return new DataTypeFamily[] { + DataTypeFamily.CHARACTER_STRING, DataTypeFamily.NUMERIC, DataTypeFamily.DATETIME + }; + case LAST_VALUE: + case LAST_VALUE_IGNORE_NULLS: + case FIRST_VALUE: + case FIRST_VALUE_IGNORE_NULLS: + return DataTypeFamily.values(); + case LISTAGG: + case STRING_AGG: + return new DataTypeFamily[] {DataTypeFamily.CHARACTER_STRING}; + default: + throw new IllegalArgumentException( + String.format("%s doesn't support any data type", this)); + } + } + /** * Converts a string to an AggFunctionType enum value. 
* diff --git a/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java b/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java index 430d47d904..9568f9467d 100644 --- a/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java @@ -321,7 +321,7 @@ void testPartitionedTable() { @Test void testInvalidListaggParameterEmptyDelimiter() { // LISTAGG with empty delimiter - should fail - assertThatThrownBy(() -> AggFunctions.LISTAGG("").validate()) + assertThatThrownBy(() -> AggFunctions.LISTAGG("").validateParameters()) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("must be a non-empty string"); } @@ -332,7 +332,8 @@ void testInvalidListaggParameterUnknownParameter() { Map params = new HashMap<>(); params.put("unknown_param", "value"); - assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.LISTAGG, params).validate()) + assertThatThrownBy( + () -> AggFunctions.of(AggFunctionType.LISTAGG, params).validateParameters()) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("unknown_param") .hasMessageContaining("not supported"); @@ -344,7 +345,7 @@ void testInvalidSumFunctionWithParameters() { Map params = new HashMap<>(); params.put("some_param", "value"); - assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.SUM, params).validate()) + assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.SUM, params).validateParameters()) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("some_param") .hasMessageContaining("not supported"); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 45065ca415..f63057326e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -296,11 +296,12 @@ private static void checkMergeEngine( /** * Validates aggregation function parameters in the schema. * - *

<p>This method delegates to {@link AggFunction#validate()} to ensure all parameters are valid - * according to the function's requirements. + *

This method delegates to {@link AggFunction#validate(DataType)} to ensure all parameters + * and data type are valid according to the function's requirements. * * @param schema the schema to validate - * @throws InvalidConfigException if any aggregation function has invalid parameters + * @throws InvalidConfigException if any aggregation function has invalid parameters or data + * types */ private static void validateAggregationFunctionParameters(Schema schema) { // Get primary key columns for early exit @@ -317,9 +318,9 @@ private static void validateAggregationFunctionParameters(Schema schema) { continue; } - // Validate aggregation function parameters + // Validate aggregation function parameters and data type try { - aggFunctionOpt.get().validate(); + aggFunctionOpt.get().validate(column.getDataType()); } catch (IllegalArgumentException e) { throw new InvalidConfigException( String.format( diff --git a/website/docs/table-design/merge-engines/aggregation.md b/website/docs/table-design/merge-engines/aggregation.md index c416c8a399..c483a7b1d0 100644 --- a/website/docs/table-design/merge-engines/aggregation.md +++ b/website/docs/table-design/merge-engines/aggregation.md @@ -258,7 +258,7 @@ TableDescriptor.builder() Computes the product of values across multiple rows. -- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL +- **Supported Data Types**: `TINYINT`, `SMALLINT`, `INT`, `BIGINT`, `FLOAT`, `DOUBLE`, `DECIMAL` - **Behavior**: Multiplies incoming values with the accumulator - **Null Handling**: Null values are ignored @@ -314,7 +314,7 @@ TableDescriptor.builder() Identifies and retains the maximum value. 
-- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ +- **Supported Data Types**: `CHAR`, `STRING`, `TINYINT`, `SMALLINT`, `INT`, `BIGINT`, `FLOAT`, `DOUBLE`, `DECIMAL`, `DATE`, `TIME`, `TIMESTAMP`, `TIMESTAMP_LTZ` - **Behavior**: Keeps the larger value between accumulator and incoming value - **Null Handling**: Null values are ignored @@ -372,7 +372,7 @@ TableDescriptor.builder() Identifies and retains the minimum value. -- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ +- **Supported Data Types**: `CHAR`, `STRING`, `TINYINT`, `SMALLINT`, `INT`, `BIGINT`, `FLOAT`, `DOUBLE`, `DECIMAL`, `DATE`, `TIME`, `TIMESTAMP`, `TIMESTAMP_LTZ` - **Behavior**: Keeps the smaller value between accumulator and incoming value - **Null Handling**: Null values are ignored @@ -695,7 +695,7 @@ TableDescriptor.builder() Concatenates multiple string values into a single string with a delimiter. -- **Supported Data Types**: STRING, CHAR +- **Supported Data Types**: `STRING`, `CHAR` - **Behavior**: Concatenates values using the specified delimiter - **Null Handling**: Null values are skipped - **Delimiter**: Specify delimiter directly in the aggregation function (default is comma `,`) @@ -757,7 +757,7 @@ TableDescriptor.builder() Alias for `listagg`. Concatenates multiple string values into a single string with a delimiter. -- **Supported Data Types**: STRING, CHAR +- **Supported Data Types**: `STRING`, `CHAR` - **Behavior**: Same as `listagg` - concatenates values using the specified delimiter - **Null Handling**: Null values are skipped - **Delimiter**: Specify delimiter directly in the aggregation function (default is comma `,`) @@ -824,7 +824,7 @@ TableDescriptor.builder() Evaluates whether all boolean values in a set are true (logical AND). 
-- **Supported Data Types**: BOOLEAN +- **Supported Data Types**: `BOOLEAN` - **Behavior**: Returns true only if all values are true - **Null Handling**: Null values are ignored @@ -881,7 +881,7 @@ TableDescriptor.builder() Checks if at least one boolean value in a set is true (logical OR). -- **Supported Data Types**: BOOLEAN +- **Supported Data Types**: `BOOLEAN` - **Behavior**: Returns true if any value is true - **Null Handling**: Null values are ignored