From 669d493a8f29d39b9a3abef6ff50701e8d85afb2 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sat, 31 Jan 2026 20:19:14 -0500 Subject: [PATCH 1/2] feat: Improve YamlUtils.dumpAsMap error handling by identifying non-serializable keys that cause YAML serialization failures. --- .../beam/sdk/schemas/utils/YamlUtils.java | 111 +++++---- .../apache/beam/sdk/util/YamlUtilsTest.java | 220 ++++++++++-------- 2 files changed, 183 insertions(+), 148 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java index d87526e51e6f..8cb09f2d22cc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java @@ -23,6 +23,7 @@ import java.math.BigDecimal; import java.util.Collections; import java.util.List; +import java.util.ArrayList; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; @@ -36,24 +37,21 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; import org.checkerframework.checker.nullness.qual.Nullable; import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.error.YAMLException; public class YamlUtils { - private static final Map> YAML_VALUE_PARSERS = - ImmutableMap - .> - builder() - .put(Schema.TypeName.BYTE, Byte::valueOf) - .put(Schema.TypeName.INT16, Short::valueOf) - .put(Schema.TypeName.INT32, Integer::valueOf) - .put(Schema.TypeName.INT64, Long::valueOf) - .put(Schema.TypeName.FLOAT, Float::valueOf) - .put(Schema.TypeName.DOUBLE, Double::valueOf) - .put(Schema.TypeName.DECIMAL, BigDecimal::new) - .put(Schema.TypeName.BOOLEAN, Boolean::valueOf) - .put(Schema.TypeName.STRING, str -> str) - .put(Schema.TypeName.BYTES, str -> BaseEncoding.base64().decode(str)) - .build(); + private static final Map> YAML_VALUE_PARSERS = ImmutableMap.>builder() + .put(Schema.TypeName.BYTE, Byte::valueOf) + .put(Schema.TypeName.INT16, Short::valueOf) + .put(Schema.TypeName.INT32, Integer::valueOf) + .put(Schema.TypeName.INT64, Long::valueOf) + .put(Schema.TypeName.FLOAT, Float::valueOf) + .put(Schema.TypeName.DOUBLE, Double::valueOf) + .put(Schema.TypeName.DECIMAL, BigDecimal::new) + .put(Schema.TypeName.BOOLEAN, Boolean::valueOf) + .put(Schema.TypeName.STRING, str -> str) + .put(Schema.TypeName.BYTES, str -> BaseEncoding.base64().decode(str)) + .build(); public static Row toBeamRow(@Nullable String yamlString, Schema schema) { return toBeamRow(yamlString, schema, false); @@ -62,10 +60,9 @@ public static Row toBeamRow(@Nullable String yamlString, Schema schema) { public static Row toBeamRow( @Nullable String yamlString, Schema schema, boolean convertNamesToCamelCase) { if (yamlString == null || yamlString.isEmpty()) { - List requiredFields = - schema.getFields().stream() - .filter(field -> !field.getType().getNullable()) - .collect(Collectors.toList()); + List requiredFields = schema.getFields().stream() + .filter(field -> !field.getType().getNullable()) + .collect(Collectors.toList()); if (requiredFields.isEmpty()) { return Row.nullRow(schema); } else { @@ -114,29 +111,26 @@ public static Row toBeamRow( } if (yamlValue instanceof List) { - FieldType innerType = - Preconditions.checkNotNull( - fieldType.getCollectionElementType(), - "Cannot convert YAML type '%s` to `%s` because the YAML value is a List, but the output schema field does not define a collection type.", - yamlValue.getClass(), - fieldType); + FieldType innerType = Preconditions.checkNotNull( + fieldType.getCollectionElementType(), + "Cannot convert YAML type '%s` to `%s` because the YAML value is a List, but the output schema field does not define a collection type.", + yamlValue.getClass(), + fieldType); return ((List) yamlValue) .stream() - .map( - v -> - Preconditions.checkNotNull( - toBeamValue(field.withType(innerType), v, convertNamesToCamelCase))) - .collect(Collectors.toList()); + .map( + v -> Preconditions.checkNotNull( + toBeamValue(field.withType(innerType), v, convertNamesToCamelCase))) + .collect(Collectors.toList()); } if (yamlValue instanceof Map) { if (fieldType.getTypeName() == Schema.TypeName.ROW) { - Schema nestedSchema = - Preconditions.checkNotNull( - fieldType.getRowSchema(), - "Received a YAML '%s' type, but output schema field '%s' does not define a Row Schema", - yamlValue.getClass(), - fieldType); + Schema nestedSchema = Preconditions.checkNotNull( + fieldType.getRowSchema(), + "Received a YAML '%s' type, but output schema field '%s' does not define a Row Schema", + yamlValue.getClass(), + fieldType); return toBeamRow((Map) yamlValue, nestedSchema, convertNamesToCamelCase); } else if (fieldType.getTypeName() == Schema.TypeName.MAP) { return yamlValue; @@ -152,10 +146,9 @@ public static Row toBeamRow( public static Row toBeamRow( @Nullable Map map, Schema rowSchema, boolean toCamelCase) { if (map == null || map.isEmpty()) { - List requiredFields = - rowSchema.getFields().stream() - .filter(field -> !field.getType().getNullable()) - .collect(Collectors.toList()); + List requiredFields = rowSchema.getFields().stream() + .filter(field -> !field.getType().getNullable()) + .collect(Collectors.toList()); if (requiredFields.isEmpty()) { return Row.nullRow(rowSchema); } else { @@ -167,9 +160,8 @@ public static Row toBeamRow( } return rowSchema.getFields().stream() .map( - field -> - toBeamValue( - field, map.get(maybeGetSnakeCase(field.getName(), toCamelCase)), toCamelCase)) + field -> toBeamValue( + field, map.get(maybeGetSnakeCase(field.getName(), toCamelCase)), toCamelCase)) .collect(toRow(rowSchema)); } @@ -181,7 +173,36 @@ public static String yamlStringFromMap(@Nullable Map map) { if (map == null || map.isEmpty()) { return ""; } - return new Yaml().dumpAsMap(map); + try { + return new Yaml().dumpAsMap(map); + } catch (YAMLException e) { + List problematicKeys = findNonSerializableKeys(map); + throw new IllegalArgumentException( + String.format( + "Failed to convert configuration map to YAML. " + + "The following keys contain values that cannot be serialized: %s. " + + "Please ensure all configuration values are simple types (String, Number, Boolean) " + + "or properly structured Maps and Lists. Original error: %s", + problematicKeys, e.getMessage()), + e); + } + } + + private static List findNonSerializableKeys(Map map) { + List problematicKeys = new ArrayList<>(); + Yaml yaml = new Yaml(); + for (Map.Entry entry : map.entrySet()) { + try { + yaml.dump(entry.getValue()); + } catch (YAMLException e) { + problematicKeys.add( + String.format( + "%s (type: %s)", + entry.getKey(), + entry.getValue() != null ? entry.getValue().getClass().getName() : "null")); + } + } + return problematicKeys; } public static Map yamlStringToMap(@Nullable String yaml) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java index bf032aed7b5c..3fa169971a1f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java @@ -37,7 +37,8 @@ @RunWith(JUnit4.class) public class YamlUtilsTest { - @Rule public transient ExpectedException thrown = ExpectedException.none(); + @Rule + public transient ExpectedException thrown = ExpectedException.none(); public String makeNested(String input) { return Arrays.stream(input.split("\n")) @@ -67,12 +68,11 @@ public void testInvalidEmptyYamlWithNonEmptySchema() { @Test public void testNullableValues() { String yamlString = "nullable_string:\n" + "nullable_integer:\n" + "nullable_boolean:\n"; - Schema schema = - Schema.builder() - .addNullableStringField("nullable_string") - .addNullableInt32Field("nullable_integer") - .addNullableBooleanField("nullable_boolean") - .build(); + Schema schema = Schema.builder() + .addNullableStringField("nullable_string") + .addNullableInt32Field("nullable_integer") + .addNullableBooleanField("nullable_boolean") + .build(); assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow(yamlString, schema)); } @@ -80,12 +80,11 @@ public void testNullableValues() { @Test public void testMissingNullableValues() { String yamlString = "nullable_string:"; - Schema schema = - Schema.builder() - .addNullableStringField("nullable_string") - .addNullableInt32Field("nullable_integer") - .addNullableBooleanField("nullable_boolean") - .build(); + Schema schema = Schema.builder() + .addNullableStringField("nullable_string") + .addNullableInt32Field("nullable_integer") + .addNullableBooleanField("nullable_boolean") + .build(); assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow(yamlString, schema)); } @@ -93,8 +92,7 @@ public void testMissingNullableValues() { @Test public void testInvalidNullableValues() { String yamlString = "nullable_string:\n" + "integer:"; - Schema schema = - Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build(); + Schema schema = Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build(); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Received null value for non-nullable field \"integer\""); @@ -104,8 +102,7 @@ public void testInvalidNullableValues() { @Test public void testInvalidMissingRequiredValues() { String yamlString = "nullable_string:"; - Schema schema = - Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build(); + Schema schema = Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build(); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Received null value for non-nullable field \"integer\""); @@ -131,45 +128,42 @@ public void testInvalidTopLevelArray() { YamlUtils.toBeamRow(invalidYaml, schema); } - private static final Schema FLAT_SCHEMA = - Schema.builder() - .addByteField("byte_field") - .addInt16Field("int16_field") - .addInt32Field("int32_field") - .addInt64Field("int64_field") - .addFloatField("float_field") - .addDoubleField("double_field") - .addDecimalField("decimal_field") - .addBooleanField("boolean_field") - .addStringField("string_field") - .addByteArrayField("bytes_field") - .build(); - - private static final Row FLAT_ROW = - Row.withSchema(FLAT_SCHEMA) - .withFieldValue("byte_field", Byte.valueOf("123")) - .withFieldValue("int16_field", Short.valueOf("16")) - .withFieldValue("int32_field", 32) - .withFieldValue("int64_field", 64L) - .withFieldValue("float_field", 123.456F) - .withFieldValue("double_field", 456.789) - .withFieldValue("decimal_field", BigDecimal.valueOf(789.123)) - .withFieldValue("boolean_field", true) - .withFieldValue("string_field", "some string") - .withFieldValue("bytes_field", BaseEncoding.base64().decode("abc")) - .build(); - - private static final String FLAT_YAML = - "byte_field: 123\n" - + "int16_field: 16\n" - + "int32_field: 32\n" - + "int64_field: 64\n" - + "float_field: 123.456\n" - + "double_field: 456.789\n" - + "decimal_field: 789.123\n" - + "boolean_field: true\n" - + "string_field: some string\n" - + "bytes_field: abc"; + private static final Schema FLAT_SCHEMA = Schema.builder() + .addByteField("byte_field") + .addInt16Field("int16_field") + .addInt32Field("int32_field") + .addInt64Field("int64_field") + .addFloatField("float_field") + .addDoubleField("double_field") + .addDecimalField("decimal_field") + .addBooleanField("boolean_field") + .addStringField("string_field") + .addByteArrayField("bytes_field") + .build(); + + private static final Row FLAT_ROW = Row.withSchema(FLAT_SCHEMA) + .withFieldValue("byte_field", Byte.valueOf("123")) + .withFieldValue("int16_field", Short.valueOf("16")) + .withFieldValue("int32_field", 32) + .withFieldValue("int64_field", 64L) + .withFieldValue("float_field", 123.456F) + .withFieldValue("double_field", 456.789) + .withFieldValue("decimal_field", BigDecimal.valueOf(789.123)) + .withFieldValue("boolean_field", true) + .withFieldValue("string_field", "some string") + .withFieldValue("bytes_field", BaseEncoding.base64().decode("abc")) + .build(); + + private static final String FLAT_YAML = "byte_field: 123\n" + + "int16_field: 16\n" + + "int32_field: 32\n" + + "int64_field: 64\n" + + "float_field: 123.456\n" + + "double_field: 456.789\n" + + "decimal_field: 789.123\n" + + "boolean_field: true\n" + + "string_field: some string\n" + + "bytes_field: abc"; @Test public void testAllTypesFlat() { @@ -181,27 +175,22 @@ public void testAllTypesNested() { String nestedFlatTypes = makeNested(FLAT_YAML); String topLevelYaml = "top_string: abc\n" + "nested: \n" + nestedFlatTypes; - Schema schema = - Schema.builder().addStringField("top_string").addRowField("nested", FLAT_SCHEMA).build(); - Row expectedRow = - Row.withSchema(schema) - .withFieldValue("top_string", "abc") - .withFieldValue("nested", FLAT_ROW) - .build(); + Schema schema = Schema.builder().addStringField("top_string").addRowField("nested", FLAT_SCHEMA).build(); + Row expectedRow = Row.withSchema(schema) + .withFieldValue("top_string", "abc") + .withFieldValue("nested", FLAT_ROW) + .build(); assertEquals(expectedRow, YamlUtils.toBeamRow(topLevelYaml, schema)); } - private static final String INT_ARRAY_YAML = - "arr:\n" + " - 1\n" + " - 2\n" + " - 3\n" + " - 4\n" + " - 5\n"; + private static final String INT_ARRAY_YAML = "arr:\n" + " - 1\n" + " - 2\n" + " - 3\n" + " - 4\n" + " - 5\n"; - private static final Schema INT_ARRAY_SCHEMA = - Schema.builder().addArrayField("arr", Schema.FieldType.INT32).build(); + private static final Schema INT_ARRAY_SCHEMA = Schema.builder().addArrayField("arr", Schema.FieldType.INT32).build(); - private static final Row INT_ARRAY_ROW = - Row.withSchema(INT_ARRAY_SCHEMA) - .withFieldValue("arr", IntStream.range(1, 6).boxed().collect(Collectors.toList())) - .build(); + private static final Row INT_ARRAY_ROW = Row.withSchema(INT_ARRAY_SCHEMA) + .withFieldValue("arr", IntStream.range(1, 6).boxed().collect(Collectors.toList())) + .build(); @Test public void testArray() { @@ -213,49 +202,74 @@ public void testNestedArray() { String nestedArray = makeNested(INT_ARRAY_YAML); String yamlString = "str_field: some string\n" + "nested: \n" + nestedArray; - Schema schema = - Schema.builder() - .addStringField("str_field") - .addRowField("nested", INT_ARRAY_SCHEMA) - .build(); + Schema schema = Schema.builder() + .addStringField("str_field") + .addRowField("nested", INT_ARRAY_SCHEMA) + .build(); - Row expectedRow = - Row.withSchema(schema) - .withFieldValue("str_field", "some string") - .withFieldValue("nested", INT_ARRAY_ROW) - .build(); + Row expectedRow = Row.withSchema(schema) + .withFieldValue("str_field", "some string") + .withFieldValue("nested", INT_ARRAY_ROW) + .build(); assertEquals(expectedRow, YamlUtils.toBeamRow(yamlString, schema)); } - private static final Schema FLAT_SCHEMA_CAMEL_CASE = - Schema.builder() - .addFields( - FLAT_SCHEMA.getFields().stream() - .map( - field -> - field.withName( - CaseFormat.LOWER_UNDERSCORE.to( - CaseFormat.LOWER_CAMEL, field.getName()))) - .collect(Collectors.toList())) - .build(); - - private static final Map FLAT_MAP = - FLAT_SCHEMA.getFields().stream() - .collect( - Collectors.toMap( - Schema.Field::getName, - field -> Preconditions.checkArgumentNotNull(FLAT_ROW.getValue(field.getName())))); + private static final Schema FLAT_SCHEMA_CAMEL_CASE = Schema.builder() + .addFields( + FLAT_SCHEMA.getFields().stream() + .map( + field -> field.withName( + CaseFormat.LOWER_UNDERSCORE.to( + CaseFormat.LOWER_CAMEL, field.getName()))) + .collect(Collectors.toList())) + .build(); + + private static final Map FLAT_MAP = FLAT_SCHEMA.getFields().stream() + .collect( + Collectors.toMap( + Schema.Field::getName, + field -> Preconditions.checkArgumentNotNull(FLAT_ROW.getValue(field.getName())))); @Test public void testSnakeCaseMapToCamelCaseRow() { - Row expectedRow = - FLAT_SCHEMA.getFields().stream() - .map(field -> Preconditions.checkStateNotNull(FLAT_ROW.getValue(field.getName()))) - .collect(Row.toRow(FLAT_SCHEMA_CAMEL_CASE)); + Row expectedRow = FLAT_SCHEMA.getFields().stream() + .map(field -> Preconditions.checkStateNotNull(FLAT_ROW.getValue(field.getName()))) + .collect(Row.toRow(FLAT_SCHEMA_CAMEL_CASE)); Row convertedRow = YamlUtils.toBeamRow(FLAT_MAP, FLAT_SCHEMA_CAMEL_CASE, true); assertEquals(expectedRow, convertedRow); } + + @Test + public void testYamlStringFromMapWithNonSerializableObject() { + // Create a map with ImmutableMap.Builder which cannot be serialized + Map configWithBuilder = new java.util.HashMap<>(); + configWithBuilder.put("good_key", "valid_value"); + configWithBuilder.put( + "bad_key", + org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap + .builder()); // Not yet built! + + // We expect an IllegalArgumentException with a helpful message + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Failed to convert configuration map to YAML"); + thrown.expectMessage("bad_key"); + thrown.expectMessage("ImmutableMap$Builder"); + + YamlUtils.yamlStringFromMap(configWithBuilder); + } + + @Test + public void testYamlStringFromMapWithValidMap() { + Map config = org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.of( + "string_key", "value", + "int_key", 123, + "boolean_key", true); + + String yaml = YamlUtils.yamlStringFromMap(config); + org.junit.Assert.assertNotNull(yaml); + org.junit.Assert.assertTrue(yaml.contains("string_key")); + } } From 9f50d45572908c8f3119d776146afb4245e0c342 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sat, 31 Jan 2026 20:57:24 -0500 Subject: [PATCH 2/2] style: Reformat YamlUtilsTest for improved readability by adjusting line breaks and spacing. --- .../beam/sdk/schemas/utils/YamlUtils.java | 80 +++---- .../apache/beam/sdk/util/YamlUtilsTest.java | 196 ++++++++++-------- 2 files changed, 151 insertions(+), 125 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java index 8cb09f2d22cc..4382062b4619 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java @@ -21,9 +21,9 @@ import java.io.InputStream; import java.math.BigDecimal; +import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.ArrayList; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; @@ -40,18 +40,22 @@ import org.yaml.snakeyaml.error.YAMLException; public class YamlUtils { - private static final Map> YAML_VALUE_PARSERS = ImmutableMap.>builder() - .put(Schema.TypeName.BYTE, Byte::valueOf) - .put(Schema.TypeName.INT16, Short::valueOf) - .put(Schema.TypeName.INT32, Integer::valueOf) - .put(Schema.TypeName.INT64, Long::valueOf) - .put(Schema.TypeName.FLOAT, Float::valueOf) - .put(Schema.TypeName.DOUBLE, Double::valueOf) - .put(Schema.TypeName.DECIMAL, BigDecimal::new) - .put(Schema.TypeName.BOOLEAN, Boolean::valueOf) - .put(Schema.TypeName.STRING, str -> str) - .put(Schema.TypeName.BYTES, str -> BaseEncoding.base64().decode(str)) - .build(); + private static final Map> YAML_VALUE_PARSERS = + ImmutableMap + .> + builder() + .put(Schema.TypeName.BYTE, Byte::valueOf) + .put(Schema.TypeName.INT16, Short::valueOf) + .put(Schema.TypeName.INT32, Integer::valueOf) + .put(Schema.TypeName.INT64, Long::valueOf) + .put(Schema.TypeName.FLOAT, Float::valueOf) + .put(Schema.TypeName.DOUBLE, Double::valueOf) + .put(Schema.TypeName.DECIMAL, BigDecimal::new) + .put(Schema.TypeName.BOOLEAN, Boolean::valueOf) + .put(Schema.TypeName.STRING, str -> str) + .put(Schema.TypeName.BYTES, str -> BaseEncoding.base64().decode(str)) + .build(); public static Row toBeamRow(@Nullable String yamlString, Schema schema) { return toBeamRow(yamlString, schema, false); @@ -60,9 +64,10 @@ public static Row toBeamRow(@Nullable String yamlString, Schema schema) { public static Row toBeamRow( @Nullable String yamlString, Schema schema, boolean convertNamesToCamelCase) { if (yamlString == null || yamlString.isEmpty()) { - List requiredFields = schema.getFields().stream() - .filter(field -> !field.getType().getNullable()) - .collect(Collectors.toList()); + List requiredFields = + schema.getFields().stream() + .filter(field -> !field.getType().getNullable()) + .collect(Collectors.toList()); if (requiredFields.isEmpty()) { return Row.nullRow(schema); } else { @@ -111,26 +116,29 @@ public static Row toBeamRow( } if (yamlValue instanceof List) { - FieldType innerType = Preconditions.checkNotNull( - fieldType.getCollectionElementType(), - "Cannot convert YAML type '%s` to `%s` because the YAML value is a List, but the output schema field does not define a collection type.", - yamlValue.getClass(), - fieldType); + FieldType innerType = + Preconditions.checkNotNull( + fieldType.getCollectionElementType(), + "Cannot convert YAML type '%s` to `%s` because the YAML value is a List, but the output schema field does not define a collection type.", + yamlValue.getClass(), + fieldType); return ((List) yamlValue) .stream() - .map( - v -> Preconditions.checkNotNull( - toBeamValue(field.withType(innerType), v, convertNamesToCamelCase))) - .collect(Collectors.toList()); + .map( + v -> + Preconditions.checkNotNull( + toBeamValue(field.withType(innerType), v, convertNamesToCamelCase))) + .collect(Collectors.toList()); } if (yamlValue instanceof Map) { if (fieldType.getTypeName() == Schema.TypeName.ROW) { - Schema nestedSchema = Preconditions.checkNotNull( - fieldType.getRowSchema(), - "Received a YAML '%s' type, but output schema field '%s' does not define a Row Schema", - yamlValue.getClass(), - fieldType); + Schema nestedSchema = + Preconditions.checkNotNull( + fieldType.getRowSchema(), + "Received a YAML '%s' type, but output schema field '%s' does not define a Row Schema", + yamlValue.getClass(), + fieldType); return toBeamRow((Map) yamlValue, nestedSchema, convertNamesToCamelCase); } else if (fieldType.getTypeName() == Schema.TypeName.MAP) { return yamlValue; @@ -146,9 +154,10 @@ public static Row toBeamRow( public static Row toBeamRow( @Nullable Map map, Schema rowSchema, boolean toCamelCase) { if (map == null || map.isEmpty()) { - List requiredFields = rowSchema.getFields().stream() - .filter(field -> !field.getType().getNullable()) - .collect(Collectors.toList()); + List requiredFields = + rowSchema.getFields().stream() + .filter(field -> !field.getType().getNullable()) + .collect(Collectors.toList()); if (requiredFields.isEmpty()) { return Row.nullRow(rowSchema); } else { @@ -160,8 +169,9 @@ public static Row toBeamRow( } return rowSchema.getFields().stream() .map( - field -> toBeamValue( - field, map.get(maybeGetSnakeCase(field.getName(), toCamelCase)), toCamelCase)) + field -> + toBeamValue( + field, map.get(maybeGetSnakeCase(field.getName(), toCamelCase)), toCamelCase)) .collect(toRow(rowSchema)); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java index 3fa169971a1f..05bc73370c7d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java @@ -37,8 +37,7 @@ @RunWith(JUnit4.class) public class YamlUtilsTest { - @Rule - public transient ExpectedException thrown = ExpectedException.none(); + @Rule public transient ExpectedException thrown = ExpectedException.none(); public String makeNested(String input) { return Arrays.stream(input.split("\n")) @@ -68,11 +67,12 @@ public void testInvalidEmptyYamlWithNonEmptySchema() { @Test public void testNullableValues() { String yamlString = "nullable_string:\n" + "nullable_integer:\n" + "nullable_boolean:\n"; - Schema schema = Schema.builder() - .addNullableStringField("nullable_string") - .addNullableInt32Field("nullable_integer") - .addNullableBooleanField("nullable_boolean") - .build(); + Schema schema = + Schema.builder() + .addNullableStringField("nullable_string") + .addNullableInt32Field("nullable_integer") + .addNullableBooleanField("nullable_boolean") + .build(); assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow(yamlString, schema)); } @@ -80,11 +80,12 @@ public void testNullableValues() { @Test public void testMissingNullableValues() { String yamlString = "nullable_string:"; - Schema schema = Schema.builder() - .addNullableStringField("nullable_string") - .addNullableInt32Field("nullable_integer") - .addNullableBooleanField("nullable_boolean") - .build(); + Schema schema = + Schema.builder() + .addNullableStringField("nullable_string") + .addNullableInt32Field("nullable_integer") + .addNullableBooleanField("nullable_boolean") + .build(); assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow(yamlString, schema)); } @@ -92,7 +93,8 @@ public void testMissingNullableValues() { @Test public void testInvalidNullableValues() { String yamlString = "nullable_string:\n" + "integer:"; - Schema schema = Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build(); + Schema schema = + Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build(); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Received null value for non-nullable field \"integer\""); @@ -102,7 +104,8 @@ public void testInvalidNullableValues() { @Test public void testInvalidMissingRequiredValues() { String yamlString = "nullable_string:"; - Schema schema = Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build(); + Schema schema = + Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build(); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Received null value for non-nullable field \"integer\""); @@ -128,42 +131,45 @@ public void testInvalidTopLevelArray() { YamlUtils.toBeamRow(invalidYaml, schema); } - private static final Schema FLAT_SCHEMA = Schema.builder() - .addByteField("byte_field") - .addInt16Field("int16_field") - .addInt32Field("int32_field") - .addInt64Field("int64_field") - .addFloatField("float_field") - .addDoubleField("double_field") - .addDecimalField("decimal_field") - .addBooleanField("boolean_field") - .addStringField("string_field") - .addByteArrayField("bytes_field") - .build(); - - private static final Row FLAT_ROW = Row.withSchema(FLAT_SCHEMA) - .withFieldValue("byte_field", Byte.valueOf("123")) - .withFieldValue("int16_field", Short.valueOf("16")) - .withFieldValue("int32_field", 32) - .withFieldValue("int64_field", 64L) - .withFieldValue("float_field", 123.456F) - .withFieldValue("double_field", 456.789) - .withFieldValue("decimal_field", BigDecimal.valueOf(789.123)) - .withFieldValue("boolean_field", true) - .withFieldValue("string_field", "some string") - .withFieldValue("bytes_field", BaseEncoding.base64().decode("abc")) - .build(); - - private static final String FLAT_YAML = "byte_field: 123\n" - + "int16_field: 16\n" - + "int32_field: 32\n" - + "int64_field: 64\n" - + "float_field: 123.456\n" - + "double_field: 456.789\n" - + "decimal_field: 789.123\n" - + "boolean_field: true\n" - + "string_field: some string\n" - + "bytes_field: abc"; + private static final Schema FLAT_SCHEMA = + Schema.builder() + .addByteField("byte_field") + .addInt16Field("int16_field") + .addInt32Field("int32_field") + .addInt64Field("int64_field") + .addFloatField("float_field") + .addDoubleField("double_field") + .addDecimalField("decimal_field") + .addBooleanField("boolean_field") + .addStringField("string_field") + .addByteArrayField("bytes_field") + .build(); + + private static final Row FLAT_ROW = + Row.withSchema(FLAT_SCHEMA) + .withFieldValue("byte_field", Byte.valueOf("123")) + .withFieldValue("int16_field", Short.valueOf("16")) + .withFieldValue("int32_field", 32) + .withFieldValue("int64_field", 64L) + .withFieldValue("float_field", 123.456F) + .withFieldValue("double_field", 456.789) + .withFieldValue("decimal_field", BigDecimal.valueOf(789.123)) + .withFieldValue("boolean_field", true) + .withFieldValue("string_field", "some string") + .withFieldValue("bytes_field", BaseEncoding.base64().decode("abc")) + .build(); + + private static final String FLAT_YAML = + "byte_field: 123\n" + + "int16_field: 16\n" + + "int32_field: 32\n" + + "int64_field: 64\n" + + "float_field: 123.456\n" + + "double_field: 456.789\n" + + "decimal_field: 789.123\n" + + "boolean_field: true\n" + + "string_field: some string\n" + + "bytes_field: abc"; @Test public void testAllTypesFlat() { @@ -175,22 +181,27 @@ public void testAllTypesNested() { String nestedFlatTypes = makeNested(FLAT_YAML); String topLevelYaml = "top_string: abc\n" + "nested: \n" + nestedFlatTypes; - Schema schema = Schema.builder().addStringField("top_string").addRowField("nested", FLAT_SCHEMA).build(); - Row expectedRow = Row.withSchema(schema) - .withFieldValue("top_string", "abc") - .withFieldValue("nested", FLAT_ROW) - .build(); + Schema schema = + Schema.builder().addStringField("top_string").addRowField("nested", FLAT_SCHEMA).build(); + Row expectedRow = + Row.withSchema(schema) + .withFieldValue("top_string", "abc") + .withFieldValue("nested", FLAT_ROW) + .build(); assertEquals(expectedRow, YamlUtils.toBeamRow(topLevelYaml, schema)); } - private static final String INT_ARRAY_YAML = "arr:\n" + " - 1\n" + " - 2\n" + " - 3\n" + " - 4\n" + " - 5\n"; + private static final String INT_ARRAY_YAML = + "arr:\n" + " - 1\n" + " - 2\n" + " - 3\n" + " - 4\n" + " - 5\n"; - private static final Schema INT_ARRAY_SCHEMA = Schema.builder().addArrayField("arr", Schema.FieldType.INT32).build(); + private static final Schema INT_ARRAY_SCHEMA = + Schema.builder().addArrayField("arr", Schema.FieldType.INT32).build(); - private static final Row INT_ARRAY_ROW = Row.withSchema(INT_ARRAY_SCHEMA) - .withFieldValue("arr", IntStream.range(1, 6).boxed().collect(Collectors.toList())) - .build(); + private static final Row INT_ARRAY_ROW = + Row.withSchema(INT_ARRAY_SCHEMA) + .withFieldValue("arr", IntStream.range(1, 6).boxed().collect(Collectors.toList())) + .build(); @Test public void testArray() { @@ -202,40 +213,46 @@ public void testNestedArray() { String nestedArray = makeNested(INT_ARRAY_YAML); String yamlString = "str_field: some string\n" + "nested: \n" + nestedArray; - Schema schema = Schema.builder() - .addStringField("str_field") - .addRowField("nested", INT_ARRAY_SCHEMA) - .build(); + Schema schema = + Schema.builder() + .addStringField("str_field") + .addRowField("nested", INT_ARRAY_SCHEMA) + .build(); - Row expectedRow = Row.withSchema(schema) - .withFieldValue("str_field", "some string") - .withFieldValue("nested", INT_ARRAY_ROW) - .build(); + Row expectedRow = + Row.withSchema(schema) + .withFieldValue("str_field", "some string") + .withFieldValue("nested", INT_ARRAY_ROW) + .build(); assertEquals(expectedRow, YamlUtils.toBeamRow(yamlString, schema)); } - private static final Schema FLAT_SCHEMA_CAMEL_CASE = Schema.builder() - .addFields( - FLAT_SCHEMA.getFields().stream() - .map( - field -> field.withName( - CaseFormat.LOWER_UNDERSCORE.to( - CaseFormat.LOWER_CAMEL, field.getName()))) - .collect(Collectors.toList())) - .build(); - - private static final Map FLAT_MAP = FLAT_SCHEMA.getFields().stream() - .collect( - Collectors.toMap( - Schema.Field::getName, - field -> Preconditions.checkArgumentNotNull(FLAT_ROW.getValue(field.getName())))); + private static final Schema FLAT_SCHEMA_CAMEL_CASE = + Schema.builder() + .addFields( + FLAT_SCHEMA.getFields().stream() + .map( + field -> + field.withName( + CaseFormat.LOWER_UNDERSCORE.to( + CaseFormat.LOWER_CAMEL, field.getName()))) + .collect(Collectors.toList())) + .build(); + + private static final Map FLAT_MAP = + FLAT_SCHEMA.getFields().stream() + .collect( + Collectors.toMap( + Schema.Field::getName, + field -> Preconditions.checkArgumentNotNull(FLAT_ROW.getValue(field.getName())))); @Test public void testSnakeCaseMapToCamelCaseRow() { - Row expectedRow = FLAT_SCHEMA.getFields().stream() - .map(field -> Preconditions.checkStateNotNull(FLAT_ROW.getValue(field.getName()))) - .collect(Row.toRow(FLAT_SCHEMA_CAMEL_CASE)); + Row expectedRow = + FLAT_SCHEMA.getFields().stream() + .map(field -> Preconditions.checkStateNotNull(FLAT_ROW.getValue(field.getName()))) + .collect(Row.toRow(FLAT_SCHEMA_CAMEL_CASE)); Row convertedRow = YamlUtils.toBeamRow(FLAT_MAP, FLAT_SCHEMA_CAMEL_CASE, true); @@ -263,10 +280,9 @@ public void testYamlStringFromMapWithNonSerializableObject() { @Test public void testYamlStringFromMapWithValidMap() { - Map config = org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.of( - "string_key", "value", - "int_key", 123, - "boolean_key", true); + Map config = + org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.of( + "string_key", "value", "int_key", 123, "boolean_key", true); String yaml = YamlUtils.yamlStringFromMap(config); org.junit.Assert.assertNotNull(yaml);