Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions wayang-api/wayang-api-python/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,25 @@
<scope>compile</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>${python.worker.tests.skip}</skipTests>
</configuration>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>python-worker-tests</id>
<properties>
<python.worker.tests.skip>false</python.worker.tests.skip>
</properties>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.wayang.basic.operators;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.CollectionType;
import org.apache.commons.lang3.Validate;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
* Utility methods that convert between Java objects and the on-disk representation used by {@link ObjectFileSink}.
*/
public final class ObjectFileSerialization {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);

static {
OBJECT_MAPPER.findAndRegisterModules();
}

private ObjectFileSerialization() {
}

/**
* Serialize a chunk of objects using the provided {@link ObjectFileSerializationMode}.
*
* @param chunk buffer that contains the objects to serialize
* @param validLength number of valid entries inside {@code chunk}
* @param mode the serialization mode
* @return serialized bytes
* @throws IOException if serialization fails
*/
public static byte[] serializeChunk(Object[] chunk, int validLength, ObjectFileSerializationMode mode) throws IOException {
Validate.notNull(mode, "Serialization mode must be provided.");
switch (mode) {
case JSON:
return serializeJson(chunk, validLength);
case LEGACY_JAVA_SERIALIZATION:
return serializeLegacy(chunk, validLength);
default:
throw new IllegalArgumentException("Unknown serialization mode: " + mode);
}
}

/**
* Deserialize a chunk of objects.
*
* @param payload the serialized data
* @param mode the serialization mode
* @param elementType the expected element type
* @return list of deserialized objects (never {@code null})
* @throws IOException if deserialization fails
* @throws ClassNotFoundException if a class cannot be resolved in legacy mode
*/
public static List<Object> deserializeChunk(byte[] payload,
ObjectFileSerializationMode mode,
Class<?> elementType) throws IOException, ClassNotFoundException {
Validate.notNull(mode, "Serialization mode must be provided.");
switch (mode) {
case JSON:
return deserializeJson(payload, elementType);
case LEGACY_JAVA_SERIALIZATION:
return deserializeLegacy(payload);
default:
throw new IllegalArgumentException("Unknown serialization mode: " + mode);
}
}

private static byte[] serializeLegacy(Object[] chunk, int validLength) throws IOException {
Object[] payload = Arrays.copyOf(chunk, validLength);
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(payload);
oos.flush();
return bos.toByteArray();
}
}

private static List<Object> deserializeLegacy(byte[] payload) throws IOException, ClassNotFoundException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(payload);
ObjectInputStream ois = new ObjectInputStream(bis)) {
Object tmp = ois.readObject();
if (tmp == null) {
return Collections.emptyList();
}
if (tmp instanceof Collection) {
return new ArrayList<>((Collection<?>) tmp);
}
if (tmp.getClass().isArray()) {
return Arrays.asList((Object[]) tmp);
}
return new ArrayList<>(Collections.singletonList(tmp));
}
}

private static byte[] serializeJson(Object[] chunk, int validLength) throws IOException {
Object[] payload = Arrays.copyOf(chunk, validLength);
return OBJECT_MAPPER.writeValueAsBytes(payload);
}

private static List<Object> deserializeJson(byte[] payload, Class<?> elementType) throws IOException {
Validate.notNull(elementType, "Element type must be provided for JSON deserialization.");
CollectionType type = OBJECT_MAPPER.getTypeFactory()
.constructCollectionType(List.class, elementType);
List<?> list = OBJECT_MAPPER.readValue(payload, type);
if (list == null) {
return Collections.emptyList();
}
return new ArrayList<>(list);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.wayang.basic.operators;

/**
* Supported serialization modes for {@link ObjectFileSource} and {@link ObjectFileSink}.
*/
public enum ObjectFileSerializationMode {

/**
* Legacy Java serialization using {@link java.io.ObjectOutputStream}/{@link java.io.ObjectInputStream}.
* This mode is deprecated and will be removed in a future release.
*/
@Deprecated
LEGACY_JAVA_SERIALIZATION,

/**
* JSON-based serialization that avoids Java serialization gadget chains.
*/
JSON
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.wayang.basic.operators;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.core.function.TransformationDescriptor;
import org.apache.wayang.core.optimizer.costs.DefaultLoadEstimator;
import org.apache.wayang.core.optimizer.costs.NestableLoadProfileEstimator;
Expand All @@ -36,6 +39,12 @@ public class ObjectFileSink<T> extends UnarySink<T> {

protected final Class<T> tClass;

private static final AtomicBoolean LEGACY_WARNING_EMITTED = new AtomicBoolean(false);

private final Logger logger = LogManager.getLogger(this.getClass());

private ObjectFileSerializationMode serializationMode = ObjectFileSerializationMode.JSON;

/**
* Creates a new instance.
*
Expand Down Expand Up @@ -69,5 +78,35 @@ public ObjectFileSink(ObjectFileSink<T> that) {
super(that);
this.textFileUrl = that.textFileUrl;
this.tClass = that.tClass;
this.serializationMode = that.getSerializationMode();
}

public ObjectFileSerializationMode getSerializationMode() {
if (this.serializationMode == ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION
&& LEGACY_WARNING_EMITTED.compareAndSet(false, true)) {
this.logger.warn("ObjectFileSink is using deprecated legacy Java serialization. "
+ "Please switch to the JSON serialization mode via ObjectFileSink#useJsonSerialization().");
}
return this.serializationMode;
}

public ObjectFileSink<T> withSerializationMode(ObjectFileSerializationMode serializationMode) {
this.serializationMode = Objects.requireNonNull(serializationMode, "serializationMode");
return this;
}

/**
* Configure this sink to use the deprecated legacy Java serialization.
*/
@Deprecated
public ObjectFileSink<T> useLegacySerialization() {
return this.withSerializationMode(ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION);
}

/**
* Configure this sink to use the JSON-based serialization.
*/
public ObjectFileSink<T> useJsonSerialization() {
return this.withSerializationMode(ObjectFileSerializationMode.JSON);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package org.apache.wayang.basic.operators;

import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.Validate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -43,6 +45,10 @@ public class ObjectFileSource<T> extends UnarySource<T> {

private final Class<T> tClass;

private static final AtomicBoolean LEGACY_WARNING_EMITTED = new AtomicBoolean(false);

private ObjectFileSerializationMode serializationMode = ObjectFileSerializationMode.JSON;

public ObjectFileSource(String inputUrl, DataSetType<T> type) {
super(type);
this.inputUrl = inputUrl;
Expand All @@ -64,6 +70,7 @@ public ObjectFileSource(ObjectFileSource that) {
super(that);
this.inputUrl = that.getInputUrl();
this.tClass = that.getTypeClass();
this.serializationMode = that.getSerializationMode();
}

public String getInputUrl() {
Expand All @@ -74,6 +81,35 @@ public Class<T> getTypeClass(){
return this.tClass;
}

public ObjectFileSerializationMode getSerializationMode() {
if (this.serializationMode == ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION
&& LEGACY_WARNING_EMITTED.compareAndSet(false, true)) {
this.logger.warn("ObjectFileSource is using deprecated legacy Java serialization. "
+ "Please switch to the JSON serialization mode via ObjectFileSource#useJsonSerialization().");
}
return this.serializationMode;
}

public ObjectFileSource<T> withSerializationMode(ObjectFileSerializationMode serializationMode) {
this.serializationMode = Objects.requireNonNull(serializationMode, "serializationMode");
return this;
}

/**
* Configure this source to use the deprecated legacy Java serialization.
*/
@Deprecated
public ObjectFileSource<T> useLegacySerialization() {
return this.withSerializationMode(ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION);
}

/**
* Configure this source to use the JSON-based serialization.
*/
public ObjectFileSource<T> useJsonSerialization() {
return this.withSerializationMode(ObjectFileSerializationMode.JSON);
}

@Override
public Optional<org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator> createCardinalityEstimator(
final int outputIndex,
Expand Down
Loading
Loading