diff --git a/wayang-api/wayang-api-python/pom.xml b/wayang-api/wayang-api-python/pom.xml
index 2b45dbe4e..ccf202bfb 100644
--- a/wayang-api/wayang-api-python/pom.xml
+++ b/wayang-api/wayang-api-python/pom.xml
@@ -75,4 +75,25 @@
compile
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ ${python.worker.tests.skip}
+
+
+
+
+
+
+
+ python-worker-tests
+
+ false
+
+
+
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSerialization.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSerialization.java
new file mode 100644
index 000000000..a96848a5d
--- /dev/null
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSerialization.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.basic.operators;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import org.apache.commons.lang3.Validate;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Utility methods that convert between Java objects and the on-disk representation used by {@link ObjectFileSink}.
+ */
+public final class ObjectFileSerialization {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+
+ static {
+ OBJECT_MAPPER.findAndRegisterModules();
+ }
+
+ private ObjectFileSerialization() {
+ }
+
+ /**
+ * Serialize a chunk of objects using the provided {@link ObjectFileSerializationMode}.
+ *
+ * @param chunk buffer that contains the objects to serialize
+ * @param validLength number of valid entries inside {@code chunk}
+ * @param mode the serialization mode
+ * @return serialized bytes
+ * @throws IOException if serialization fails
+ */
+ public static byte[] serializeChunk(Object[] chunk, int validLength, ObjectFileSerializationMode mode) throws IOException {
+ Validate.notNull(mode, "Serialization mode must be provided.");
+ switch (mode) {
+ case JSON:
+ return serializeJson(chunk, validLength);
+ case LEGACY_JAVA_SERIALIZATION:
+ return serializeLegacy(chunk, validLength);
+ default:
+ throw new IllegalArgumentException("Unknown serialization mode: " + mode);
+ }
+ }
+
+ /**
+ * Deserialize a chunk of objects.
+ *
+ * @param payload the serialized data
+ * @param mode the serialization mode
+ * @param elementType the expected element type
+ * @return list of deserialized objects (never {@code null})
+ * @throws IOException if deserialization fails
+ * @throws ClassNotFoundException if a class cannot be resolved in legacy mode
+ */
+ public static List deserializeChunk(byte[] payload,
+ ObjectFileSerializationMode mode,
+ Class> elementType) throws IOException, ClassNotFoundException {
+ Validate.notNull(mode, "Serialization mode must be provided.");
+ switch (mode) {
+ case JSON:
+ return deserializeJson(payload, elementType);
+ case LEGACY_JAVA_SERIALIZATION:
+ return deserializeLegacy(payload);
+ default:
+ throw new IllegalArgumentException("Unknown serialization mode: " + mode);
+ }
+ }
+
+ private static byte[] serializeLegacy(Object[] chunk, int validLength) throws IOException {
+ Object[] payload = Arrays.copyOf(chunk, validLength);
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+ oos.writeObject(payload);
+ oos.flush();
+ return bos.toByteArray();
+ }
+ }
+
+ private static List deserializeLegacy(byte[] payload) throws IOException, ClassNotFoundException {
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(payload);
+ ObjectInputStream ois = new ObjectInputStream(bis)) {
+ Object tmp = ois.readObject();
+ if (tmp == null) {
+ return Collections.emptyList();
+ }
+ if (tmp instanceof Collection) {
+ return new ArrayList<>((Collection>) tmp);
+ }
+ if (tmp.getClass().isArray()) {
+ return Arrays.asList((Object[]) tmp);
+ }
+ return new ArrayList<>(Collections.singletonList(tmp));
+ }
+ }
+
+ private static byte[] serializeJson(Object[] chunk, int validLength) throws IOException {
+ Object[] payload = Arrays.copyOf(chunk, validLength);
+ return OBJECT_MAPPER.writeValueAsBytes(payload);
+ }
+
+ private static List deserializeJson(byte[] payload, Class> elementType) throws IOException {
+ Validate.notNull(elementType, "Element type must be provided for JSON deserialization.");
+ CollectionType type = OBJECT_MAPPER.getTypeFactory()
+ .constructCollectionType(List.class, elementType);
+ List> list = OBJECT_MAPPER.readValue(payload, type);
+ if (list == null) {
+ return Collections.emptyList();
+ }
+ return new ArrayList<>(list);
+ }
+}
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSerializationMode.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSerializationMode.java
new file mode 100644
index 000000000..3865a1f2a
--- /dev/null
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSerializationMode.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.basic.operators;
+
+/**
+ * Supported serialization modes for {@link ObjectFileSource} and {@link ObjectFileSink}.
+ */
+public enum ObjectFileSerializationMode {
+
+ /**
+ * Legacy Java serialization using {@link java.io.ObjectOutputStream}/{@link java.io.ObjectInputStream}.
+ * This mode is deprecated and will be removed in a future release.
+ */
+ @Deprecated
+ LEGACY_JAVA_SERIALIZATION,
+
+ /**
+ * JSON-based serialization that avoids Java serialization gadget chains.
+ */
+ JSON
+}
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSink.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSink.java
index 00c6454bf..c9d445ea3 100644
--- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSink.java
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSink.java
@@ -19,6 +19,9 @@
package org.apache.wayang.basic.operators;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.apache.wayang.core.function.TransformationDescriptor;
import org.apache.wayang.core.optimizer.costs.DefaultLoadEstimator;
import org.apache.wayang.core.optimizer.costs.NestableLoadProfileEstimator;
@@ -36,6 +39,12 @@ public class ObjectFileSink extends UnarySink {
protected final Class tClass;
+ private static final AtomicBoolean LEGACY_WARNING_EMITTED = new AtomicBoolean(false);
+
+ private final Logger logger = LogManager.getLogger(this.getClass());
+
+ private ObjectFileSerializationMode serializationMode = ObjectFileSerializationMode.JSON;
+
/**
* Creates a new instance.
*
@@ -69,5 +78,35 @@ public ObjectFileSink(ObjectFileSink that) {
super(that);
this.textFileUrl = that.textFileUrl;
this.tClass = that.tClass;
+ this.serializationMode = that.getSerializationMode();
+ }
+
+ public ObjectFileSerializationMode getSerializationMode() {
+ if (this.serializationMode == ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION
+ && LEGACY_WARNING_EMITTED.compareAndSet(false, true)) {
+ this.logger.warn("ObjectFileSink is using deprecated legacy Java serialization. "
+ + "Please switch to the JSON serialization mode via ObjectFileSink#useJsonSerialization().");
+ }
+ return this.serializationMode;
+ }
+
+ public ObjectFileSink withSerializationMode(ObjectFileSerializationMode serializationMode) {
+ this.serializationMode = Objects.requireNonNull(serializationMode, "serializationMode");
+ return this;
+ }
+
+ /**
+ * Configure this sink to use the deprecated legacy Java serialization.
+ */
+ @Deprecated
+ public ObjectFileSink useLegacySerialization() {
+ return this.withSerializationMode(ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION);
+ }
+
+ /**
+ * Configure this sink to use the JSON-based serialization.
+ */
+ public ObjectFileSink useJsonSerialization() {
+ return this.withSerializationMode(ObjectFileSerializationMode.JSON);
}
}
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java
index 7ca341087..6fdccce50 100644
--- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java
@@ -18,9 +18,11 @@
package org.apache.wayang.basic.operators;
+import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalLong;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.Validate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -43,6 +45,10 @@ public class ObjectFileSource extends UnarySource {
private final Class tClass;
+ private static final AtomicBoolean LEGACY_WARNING_EMITTED = new AtomicBoolean(false);
+
+ private ObjectFileSerializationMode serializationMode = ObjectFileSerializationMode.JSON;
+
public ObjectFileSource(String inputUrl, DataSetType type) {
super(type);
this.inputUrl = inputUrl;
@@ -64,6 +70,7 @@ public ObjectFileSource(ObjectFileSource that) {
super(that);
this.inputUrl = that.getInputUrl();
this.tClass = that.getTypeClass();
+ this.serializationMode = that.getSerializationMode();
}
public String getInputUrl() {
@@ -74,6 +81,35 @@ public Class getTypeClass(){
return this.tClass;
}
+ public ObjectFileSerializationMode getSerializationMode() {
+ if (this.serializationMode == ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION
+ && LEGACY_WARNING_EMITTED.compareAndSet(false, true)) {
+ this.logger.warn("ObjectFileSource is using deprecated legacy Java serialization. "
+ + "Please switch to the JSON serialization mode via ObjectFileSource#useJsonSerialization().");
+ }
+ return this.serializationMode;
+ }
+
+ public ObjectFileSource withSerializationMode(ObjectFileSerializationMode serializationMode) {
+ this.serializationMode = Objects.requireNonNull(serializationMode, "serializationMode");
+ return this;
+ }
+
+ /**
+ * Configure this source to use the deprecated legacy Java serialization.
+ */
+ @Deprecated
+ public ObjectFileSource useLegacySerialization() {
+ return this.withSerializationMode(ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION);
+ }
+
+ /**
+ * Configure this source to use the JSON-based serialization.
+ */
+ public ObjectFileSource useJsonSerialization() {
+ return this.withSerializationMode(ObjectFileSerializationMode.JSON);
+ }
+
@Override
public Optional createCardinalityEstimator(
final int outputIndex,
diff --git a/wayang-commons/wayang-basic/src/test/java/org/apache/wayang/basic/operators/ObjectFileSerializationTest.java b/wayang-commons/wayang-basic/src/test/java/org/apache/wayang/basic/operators/ObjectFileSerializationTest.java
new file mode 100644
index 000000000..0cc3a830d
--- /dev/null
+++ b/wayang-commons/wayang-basic/src/test/java/org/apache/wayang/basic/operators/ObjectFileSerializationTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.basic.operators;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+public class ObjectFileSerializationTest {
+
+ @Test
+ public void jsonRoundtrip() throws Exception {
+ Object[] chunk = new Object[]{"alpha", "beta", "gamma"};
+ byte[] payload = ObjectFileSerialization.serializeChunk(
+ chunk,
+ chunk.length,
+ ObjectFileSerializationMode.JSON);
+
+ List result = ObjectFileSerialization.deserializeChunk(
+ payload,
+ ObjectFileSerializationMode.JSON,
+ String.class);
+
+ Assert.assertEquals(3, result.size());
+ Assert.assertEquals("alpha", result.get(0));
+ Assert.assertEquals("beta", result.get(1));
+ Assert.assertEquals("gamma", result.get(2));
+ }
+
+ @Test
+ public void legacyRoundtrip() throws Exception {
+ SerializablePayload payload = new SerializablePayload("data", 42);
+ Object[] chunk = new Object[]{payload};
+
+ byte[] bytes = ObjectFileSerialization.serializeChunk(
+ chunk,
+ chunk.length,
+ ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION);
+
+ List result = ObjectFileSerialization.deserializeChunk(
+ bytes,
+ ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION,
+ SerializablePayload.class);
+
+ Assert.assertEquals(1, result.size());
+ SerializablePayload deserialized = (SerializablePayload) result.get(0);
+ Assert.assertEquals("data", deserialized.text);
+ Assert.assertEquals(42, deserialized.value);
+ }
+
+ private static class SerializablePayload implements Serializable {
+ final String text;
+ final int value;
+
+ private SerializablePayload(String text, int value) {
+ this.text = text;
+ this.value = value;
+ }
+ }
+}
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/compiler/WayangFileOutputFormat.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/compiler/WayangFileOutputFormat.java
index e2d8eb52c..d717b234c 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/compiler/WayangFileOutputFormat.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/compiler/WayangFileOutputFormat.java
@@ -32,15 +32,15 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.wayang.basic.operators.ObjectFileSerialization;
+import org.apache.wayang.basic.operators.ObjectFileSerializationMode;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.FilterOutputStream;
import java.io.IOException;
-import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.URI;
@@ -125,6 +125,7 @@ public static void initDefaultsFromConfiguration(Configuration configuration) {
private transient DataOutputViewStreamWrapper outView;
+ private ObjectFileSerializationMode serializationMode = ObjectFileSerializationMode.JSON;
// --------------------------------------------------------------------------------------------
public WayangFileOutputFormat() {}
@@ -149,6 +150,13 @@ public Path getOutputFilePath() {
return this.outputFilePath;
}
+ public void setSerializationMode(ObjectFileSerializationMode serializationMode) {
+ if (serializationMode == null) {
+ throw new NullPointerException("serializationMode");
+ }
+ this.serializationMode = serializationMode;
+ }
+
public void setWriteMode(FileSystem.WriteMode mode) {
if (mode == null) {
@@ -258,15 +266,10 @@ public void open(int taskNumber, int numTasks) throws IOException {
@Override
public void writeRecord(IT record) throws IOException {
//this.blockBasedOutput.startRecord();
- try{
- ByteArrayOutputStream b = new ByteArrayOutputStream();
- ObjectOutputStream objStream = new ObjectOutputStream(b);
- objStream.writeObject(record);
- BytesWritable bytesWritable = new BytesWritable(b.toByteArray());
- writer.append(NullWritable.get(), bytesWritable);
- }catch (Exception e){
- e.printStackTrace();
- }
+ Object[] chunk = new Object[]{record};
+ byte[] payload = ObjectFileSerialization.serializeChunk(chunk, chunk.length, this.serializationMode);
+ BytesWritable bytesWritable = new BytesWritable(payload);
+ writer.append(NullWritable.get(), bytesWritable);
}
protected String getDirectoryFileName(int taskNumber) {
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java
index ccff8fa6a..faf5ccd08 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.core.fs.FileSystem;
import org.apache.wayang.basic.channels.FileChannel;
+import org.apache.wayang.basic.operators.ObjectFileSerializationMode;
import org.apache.wayang.basic.operators.ObjectFileSink;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
@@ -80,8 +81,11 @@ public Tuple, Collection> eval
//TODO: remove the set parallelism 1
DataSetChannel.Instance input = (DataSetChannel.Instance) inputs[0];
+ ObjectFileSerializationMode serializationMode = this.getSerializationMode();
+ WayangFileOutputFormat outputFormat = new WayangFileOutputFormat<>(targetPath);
+ outputFormat.setSerializationMode(serializationMode);
final DataSink tDataSink = input.provideDataSet()
- .write(new WayangFileOutputFormat(targetPath), targetPath, FileSystem.WriteMode.OVERWRITE)
+ .write(outputFormat, targetPath, FileSystem.WriteMode.OVERWRITE)
.setParallelism(flinkExecutor.fee.getParallelism());
@@ -116,4 +120,5 @@ public boolean containsAction() {
@Override public boolean isConversion() {
return true;
}
+
}
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java
index 4b650b74b..c784d4ca0 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java
@@ -27,6 +27,8 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.wayang.basic.channels.FileChannel;
import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.basic.operators.ObjectFileSerialization;
+import org.apache.wayang.basic.operators.ObjectFileSerializationMode;
import org.apache.wayang.basic.operators.ObjectFileSource;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
@@ -41,8 +43,6 @@
import org.apache.wayang.flink.execution.FlinkExecutor;
import org.apache.wayang.flink.platform.FlinkPlatform;
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -86,6 +86,8 @@ public Tuple, Collection> eval
path = this.getInputUrl();
}
DataSetChannel.Instance output = (DataSetChannel.Instance) outputs[0];
+ ObjectFileSerializationMode serializationMode = this.getSerializationMode();
+ final Class typeClass = this.getTypeClass();
HadoopInputFormat _file = HadoopInputs.readSequenceFile(NullWritable.class, BytesWritable.class, path);
final DataSet dataSet =
@@ -95,8 +97,9 @@ public Tuple, Collection> eval
.flatMap(new FlatMapFunction, Tuple2>() {
@Override
public void flatMap(org.apache.flink.api.java.tuple.Tuple2 value, Collector out) throws Exception {
- Object tmp = new ObjectInputStream(new ByteArrayInputStream(value.f1.getBytes())).readObject();
- for(Object element: (Iterable)tmp){
+ byte[] payload = value.f1.copyBytes();
+ List chunk = ObjectFileSerialization.deserializeChunk(payload, serializationMode, typeClass);
+ for (Object element : chunk) {
out.collect((Tuple2) element);
}
}
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java
index 8fbd47003..c95562578 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java
@@ -18,13 +18,14 @@
package org.apache.wayang.java.operators;
-import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.wayang.basic.channels.FileChannel;
+import org.apache.wayang.basic.operators.ObjectFileSerialization;
+import org.apache.wayang.basic.operators.ObjectFileSerializationMode;
import org.apache.wayang.basic.operators.ObjectFileSink;
import org.apache.wayang.basic.operators.TextFileSink;
import org.apache.wayang.core.api.exception.WayangException;
@@ -45,7 +46,6 @@
import org.apache.logging.log4j.LogManager;
import java.io.IOException;
-import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collection;
@@ -94,21 +94,14 @@ public Tuple, Collection> eval
final SequenceFile.Writer.Option fileOption = SequenceFile.Writer.file(new Path(path));
final SequenceFile.Writer.Option keyClassOption = SequenceFile.Writer.keyClass(NullWritable.class);
final SequenceFile.Writer.Option valueClassOption = SequenceFile.Writer.valueClass(BytesWritable.class);
+ final ObjectFileSerializationMode serializationMode = this.getSerializationMode();
try (SequenceFile.Writer writer = SequenceFile.createWriter(new Configuration(true), fileOption, keyClassOption, valueClassOption)) {
// Chunk the stream of data quanta and write the chunks into the sequence file.
StreamChunker streamChunker = new StreamChunker(10, (chunk, size) -> {
- if (chunk.length != size) {
- System.out.println("heer");
- System.out.println(chunk.length);
- System.out.println(size);
- chunk = Arrays.copyOfRange(chunk, 0, size);
- }
try {
- final ByteArrayOutputStream bos = new ByteArrayOutputStream();
- final ObjectOutputStream oos = new ObjectOutputStream(bos);
- oos.writeObject(chunk);
- BytesWritable bytesWritable = new BytesWritable(bos.toByteArray());
+ byte[] payload = ObjectFileSerialization.serializeChunk(chunk, size, serializationMode);
+ BytesWritable bytesWritable = new BytesWritable(payload);
writer.append(NullWritable.get(), bytesWritable);
} catch (IOException e) {
throw new UncheckedIOException("Writing or serialization failed.", e);
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java
index 4da3b00e8..85f3c123b 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java
@@ -26,6 +26,8 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.wayang.basic.channels.FileChannel;
+import org.apache.wayang.basic.operators.ObjectFileSerialization;
+import org.apache.wayang.basic.operators.ObjectFileSerializationMode;
import org.apache.wayang.basic.operators.ObjectFileSource;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
@@ -43,11 +45,9 @@
import org.apache.wayang.java.platform.JavaPlatform;
import org.apache.logging.log4j.LogManager;
-import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -93,7 +93,8 @@ public Tuple, Collection> eval
}
try {
final String actualInputPath = FileSystems.findActualSingleInputPath(path);
- sequenceFileIterator = new SequenceFileIterator<>(actualInputPath);
+ ObjectFileSerializationMode serializationMode = this.getSerializationMode();
+ sequenceFileIterator = new SequenceFileIterator<>(actualInputPath, serializationMode, this.getTypeClass());
Stream> sequenceFileStream =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(sequenceFileIterator, 0), false);
((StreamChannel.Instance) outputs[0]).accept(sequenceFileStream);
@@ -136,38 +137,38 @@ private static class SequenceFileIterator implements Iterator, AutoCloseab
private Object[] nextElements;
- private ArrayList nextElements_cole;
-
private int nextIndex;
- SequenceFileIterator(String path) throws IOException {
+ private final ObjectFileSerializationMode serializationMode;
+
+ private final Class typeClass;
+
+ SequenceFileIterator(String path,
+ ObjectFileSerializationMode serializationMode,
+ Class typeClass) throws IOException {
final SequenceFile.Reader.Option fileOption = SequenceFile.Reader.file(new Path(path));
this.sequenceFileReader = new SequenceFile.Reader(new Configuration(true), fileOption);
Validate.isTrue(this.sequenceFileReader.getKeyClass().equals(NullWritable.class));
Validate.isTrue(this.sequenceFileReader.getValueClass().equals(BytesWritable.class));
+ this.serializationMode = serializationMode;
+ this.typeClass = typeClass;
this.tryAdvance();
}
private void tryAdvance() {
if (this.nextElements != null && ++this.nextIndex < this.nextElements.length) return;
- if (this.nextElements_cole != null && ++this.nextIndex < this.nextElements_cole.size()) return;
try {
if (!this.sequenceFileReader.next(this.nullWritable, this.bytesWritable)) {
this.nextElements = null;
return;
}
- Object tmp = new ObjectInputStream(new ByteArrayInputStream(this.bytesWritable.getBytes())).readObject();
- if(tmp instanceof Collection) {
- this.nextElements = null;
- this.nextElements_cole = (ArrayList) tmp;
- }else if(tmp instanceof Object[]){
- this.nextElements = (Object[]) tmp;
- this.nextElements_cole = null;
- }else {
- this.nextElements = new Object[1];
- this.nextElements[0] = tmp;
-
+ byte[] payload = Arrays.copyOf(this.bytesWritable.getBytes(), this.bytesWritable.getLength());
+ List chunk = ObjectFileSerialization.deserializeChunk(payload, this.serializationMode, this.typeClass);
+ if (chunk == null || chunk.isEmpty()) {
+ this.tryAdvance();
+ return;
}
+ this.nextElements = chunk.toArray();
this.nextIndex = 0;
} catch (IOException | ClassNotFoundException e) {
this.nextElements = null;
@@ -178,22 +179,14 @@ private void tryAdvance() {
@Override
public boolean hasNext() {
- return this.nextElements != null || this.nextElements_cole != null;
+ return this.nextElements != null;
}
@Override
public T next() {
Validate.isTrue(this.hasNext());
@SuppressWarnings("unchecked")
- final T result;
- if(this.nextElements_cole != null){
- result = (T) this.nextElements_cole.get(this.nextIndex);
- }else if (this.nextElements != null) {
- result = (T) this.nextElements[this.nextIndex];
- }else{
- result = null;
- }
-
+ final T result = (T) this.nextElements[this.nextIndex];
this.tryAdvance();
return result;
}
diff --git a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaObjectFileSourceTest.java b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaObjectFileSourceTest.java
index f3cd42f5b..df14677b9 100644
--- a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaObjectFileSourceTest.java
+++ b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaObjectFileSourceTest.java
@@ -48,6 +48,7 @@ void testReading() throws IOException {
final URL inputUrl = this.getClass().getResource("/0-to-10000.input");
JavaObjectFileSource source = new JavaObjectFileSource<>(
inputUrl.toString(), DataSetType.createDefault(Integer.class));
+ source.useLegacySerialization();
// Execute.
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java
index 1afc9b660..ed27bdcd7 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java
@@ -18,7 +18,13 @@
package org.apache.wayang.spark.operators;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
import org.apache.wayang.basic.channels.FileChannel;
+import org.apache.wayang.basic.operators.ObjectFileSerializationMode;
+import org.apache.wayang.basic.operators.ObjectFileSerialization;
import org.apache.wayang.basic.operators.ObjectFileSink;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
@@ -33,9 +39,14 @@
import org.apache.wayang.spark.execution.SparkExecutor;
import org.apache.wayang.spark.platform.SparkPlatform;
import org.apache.logging.log4j.LogManager;
+import scala.Tuple2;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
/**
@@ -75,10 +86,15 @@ public Tuple, Collection> eval
}
RddChannel.Instance input = (RddChannel.Instance) inputs[0];
-
- input.provideRdd()
- .coalesce(1) // TODO: Remove. This only hotfixes the issue that JavaObjectFileSource reads only a single file.
- .saveAsObjectFile(targetPath);
+ ObjectFileSerializationMode serializationMode = this.getSerializationMode();
+
+ final int chunkSize = 10;
+ JavaPairRDD serializedRdd = input.provideRdd()
+ .mapPartitionsToPair(iterator -> encodePartition(iterator, serializationMode, chunkSize));
+ serializedRdd.saveAsNewAPIHadoopFile(targetPath,
+ NullWritable.class,
+ BytesWritable.class,
+ SequenceFileOutputFormat.class);
LogManager.getLogger(this.getClass()).info("Writing dataset to {}.", targetPath);
return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
@@ -109,4 +125,34 @@ public boolean containsAction() {
return true;
}
+ private static Tuple2 encodeBuffer(Object[] buffer,
+ int validLength,
+ ObjectFileSerializationMode mode) {
+ try {
+ byte[] payload = ObjectFileSerialization.serializeChunk(buffer, validLength, mode);
+
+ return new Tuple2<>(NullWritable.get(), new BytesWritable(payload));
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to serialize Spark object file chunk.", e);
+ }
+ }
+
+ private static Iterator> encodePartition(Iterator> iterator,
+ ObjectFileSerializationMode mode,
+ int chunkSize) {
+ List> serialized = new ArrayList<>();
+ Object[] buffer = new Object[chunkSize];
+ int index = 0;
+ while (iterator.hasNext()) {
+ buffer[index++] = iterator.next();
+ if (index >= buffer.length) {
+ serialized.add(encodeBuffer(buffer, index, mode));
+ index = 0;
+ }
+ }
+ if (index > 0) {
+ serialized.add(encodeBuffer(buffer, index, mode));
+ }
+ return serialized.iterator();
+ }
}
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java
index a084bcd44..836c4ca3c 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java
@@ -18,8 +18,13 @@
package org.apache.wayang.spark.operators;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.wayang.basic.channels.FileChannel;
+import org.apache.wayang.basic.operators.ObjectFileSerialization;
+import org.apache.wayang.basic.operators.ObjectFileSerializationMode;
import org.apache.wayang.basic.operators.ObjectFileSource;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
@@ -37,6 +42,9 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -79,7 +87,18 @@ public Tuple, Collection> eval
RddChannel.Instance output = (RddChannel.Instance) outputs[0];
final String actualInputPath = FileSystems.findActualSingleInputPath(sourcePath);
- final JavaRDD rdd = sparkExecutor.sc.objectFile(actualInputPath);
+ final ObjectFileSerializationMode serializationMode = this.getSerializationMode();
+ final JavaPairRDD rawRdd =
+ sparkExecutor.sc.sequenceFile(actualInputPath, NullWritable.class, BytesWritable.class);
+ final Class typeClass = this.getTypeClass();
+ final JavaRDD rdd = rawRdd.flatMap(tuple -> {
+ byte[] payload = Arrays.copyOf(tuple._2.getBytes(), tuple._2.getLength());
+ try {
+ return ObjectFileSerialization.deserializeChunk(payload, serializationMode, typeClass).iterator();
+ } catch (IOException | ClassNotFoundException e) {
+ throw new UncheckedIOException(new IOException("Failed to deserialize Spark object file chunk.", e));
+ }
+ });
this.name(rdd);
output.accept(rdd, sparkExecutor);
diff --git a/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkObjectFileSourceTest.java b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkObjectFileSourceTest.java
index 55c66d811..88f0d5b8d 100644
--- a/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkObjectFileSourceTest.java
+++ b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkObjectFileSourceTest.java
@@ -49,6 +49,7 @@ void testWritingDoesNotFail() throws IOException {
final URL inputUrl = this.getClass().getResource("/0-to-10000.input");
SparkObjectFileSource source = new SparkObjectFileSource<>(
inputUrl.toString(), DataSetType.createDefault(Integer.class));
+ source.useLegacySerialization();
// Set up the ChannelInstances.
final ChannelInstance[] inputs = new ChannelInstance[]{};