Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

package org.apache.wayang.api.sql.calcite.converter;

import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexNode;

import org.apache.wayang.api.sql.calcite.converter.functions.ProjectMapFuncImpl;
import org.apache.wayang.api.sql.calcite.rel.WayangProject;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.basic.function.ProjectionDescriptor;
import org.apache.wayang.basic.operators.MapOperator;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.types.BasicDataUnitType;

import java.util.List;

Expand All @@ -39,14 +40,15 @@ public class WayangProjectVisitor extends WayangRelNodeVisitor<WayangProject> {
Operator visit(final WayangProject wayangRelNode) {
final Operator childOp = wayangRelConverter.convert(wayangRelNode.getInput(0));

/* Quick check */
final List<RexNode> projects = ((Project) wayangRelNode).getProjects();
final List<RexNode> projects = wayangRelNode.getProjects();

// TODO: create a map with specific dataset type
final MapOperator<Record, Record> projection = new MapOperator<>(
final ProjectionDescriptor<Record, Record> projectionDescriptor = new ProjectionDescriptor<>(
new ProjectMapFuncImpl(projects),
Record.class,
Record.class);
wayangRelNode.getRowType().getFieldNames(),
BasicDataUnitType.createBasic(Record.class),
BasicDataUnitType.createBasic(Record.class));

final MapOperator<Record, Record> projection = new MapOperator<>(projectionDescriptor);

childOp.connectTo(0, projection, 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import org.apache.wayang.api.sql.sources.fs.JavaCSVTableSource;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.jdbc.operators.JdbcTableSource;
import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate;
import org.apache.wayang.postgres.operators.PostgresTableSource;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.basic.operators.TableSource;

import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -49,9 +52,7 @@ Operator visit(final WayangTableScan wayangRelNode) {

if (tableSource.equals("postgres")) {
return new PostgresTableSource(tableName, columnNames.toArray(String[]::new));
}

if (tableSource.equals("fs")) {
} else if (tableSource.equals("fs")) {
final ModelParser modelParser;
try {
modelParser = this.wayangRelConverter.getConfiguration() == null
Expand All @@ -72,7 +73,18 @@ Operator visit(final WayangTableScan wayangRelNode) {
final char separator = modelParser.getSchemaDelimiter(tableSource);

return new JavaCSVTableSource<>(url, DataSetType.createDefault(Record.class), fieldTypes, separator);
} else if (wayangRelNode.getTable().getQualifiedName().size() == 1) {
// we assume that it is coming from a test environement or in memory db.

return new JdbcTableSource(wayangRelNode.getTable().getQualifiedName().get(0), wayangRelNode.getRowType().getFieldNames().toArray(String[]::new)) {

@Override
public JdbcPlatformTemplate getPlatform() {
throw new UnsupportedOperationException("Unimplemented method 'getPlatform'");
}
};
} else
throw new RuntimeException("Source not supported");
throw new RuntimeException(
"Source not supported, got: " + tableSource + ", expected either postgres or filesystem (fs).");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ public default Node fromRexNode(final RexNode node) {
* @return a serializable function of +, -, * or /
* @throws UnsupportedOperationException on unrecognized {@link SqlKind}
*/
public SerializableFunction<List<Object>, Object> deriveOperation(SqlKind kind);
public SerializableFunction<List<Object>, Object> deriveOperation(final SqlKind kind);
}

interface Node extends Serializable {
public Object evaluate(final Record rec);
}

class Call implements Node {
final class Call implements Node {
private final List<Node> operands;
final SerializableFunction<List<Object>, Object> operation;

Expand All @@ -83,7 +83,7 @@ public Object evaluate(final Record rec) {
}
}

class Literal implements Node {
final class Literal implements Node {
final Serializable value;

Literal(final RexLiteral literal) {
Expand All @@ -109,7 +109,7 @@ public Object evaluate(final Record rec) {
}
}

class InputRef implements Node {
final class InputRef implements Node {
private final int key;

InputRef(final RexInputRef inputRef) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,11 @@ public RelNode optimize(RelNode node, RelTraitSet requiredTraitSet, RuleSet rule
);
}

public WayangPlan convert(RelNode relNode) {
public static WayangPlan convert(RelNode relNode) {
return convert(relNode, new ArrayList<>());
}

public WayangPlan convert(RelNode relNode, Collection<Record> collector) {
public static WayangPlan convert(RelNode relNode, Collection<Record> collector) {

LocalCallbackSink<Record> sink = LocalCallbackSink.createCollectingSink(collector, Record.class);

Expand All @@ -225,8 +225,7 @@ public WayangPlan convert(RelNode relNode, Collection<Record> collector) {
return new WayangPlan(sink);
}

public WayangPlan convertWithConfig(RelNode relNode, Configuration configuration, Collection<Record> collector) {

public static WayangPlan convertWithConfig(RelNode relNode, Configuration configuration, Collection<Record> collector) {
LocalCallbackSink<Record> sink = LocalCallbackSink.createCollectingSink(collector, Record.class);

Operator op = new WayangRelConverter(configuration).convert(relNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.schema.Table;
import org.apache.wayang.api.sql.calcite.convention.WayangConvention;
import org.apache.wayang.api.sql.calcite.utils.ModelParser;

import java.util.List;

public class WayangTableScan extends TableScan implements WayangRel {

//TODO: fields are never queried, why?
private final int[] fields;

public WayangTableScan(RelOptCluster cluster,
Expand Down Expand Up @@ -83,11 +83,15 @@ public String toString() {
}

public String getQualifiedName() {
return table.getQualifiedName().get(1);
return table.getQualifiedName().size() == 1
? table.getQualifiedName().get(0)
: table.getQualifiedName().get(1);
}

public String getTableName() {
return table.getQualifiedName().get(1);
return table.getQualifiedName().size() == 1
? table.getQualifiedName().get(0)
: table.getQualifiedName().get(1);
}

public List<String> getColumnNames() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public static void main(final String[] args) throws Exception {
PrintUtils.print("After translating logical intermediate plan", wayangRel);

final Collection<Record> collector = new ArrayList<>();
final WayangPlan wayangPlan = optimizer.convertWithConfig(wayangRel, configuration, collector);
final WayangPlan wayangPlan = Optimizer.convertWithConfig(wayangRel, configuration, collector);
collector.add(new Record(wayangRel.getRowType().getFieldNames().toArray()));
context.execute(getJobName(), wayangPlan);

Expand All @@ -182,7 +182,6 @@ public static void main(final String[] args) throws Exception {
}

public Collection<Record> executeSql(final String sql) throws SqlParseException {

final Properties configProperties = Optimizer.ConfigProperties.getDefaults();
final RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl();

Expand Down Expand Up @@ -216,7 +215,7 @@ public Collection<Record> executeSql(final String sql) throws SqlParseException
PrintUtils.print("After translating logical intermediate plan", wayangRel);

final Collection<Record> collector = new ArrayList<>();
final WayangPlan wayangPlan = optimizer.convert(wayangRel, collector);
final WayangPlan wayangPlan = Optimizer.convert(wayangRel, collector);

this.execute(getJobName(), wayangPlan);

Expand Down
Loading
Loading