diff --git a/plugin/trino-hudi/pom.xml b/plugin/trino-hudi/pom.xml
index d9bfda845beb..04fcbbb94d7f 100644
--- a/plugin/trino-hudi/pom.xml
+++ b/plugin/trino-hudi/pom.xml
@@ -132,88 +132,32 @@
<groupId>org.apache.hudi</groupId>
- <artifactId>hudi-common</artifactId>
+ <artifactId>hudi-trino-bundle</artifactId>
<version>${dep.hudi.version}</version>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
- <groupId>org.osgi</groupId>
- <artifactId>org.osgi.core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.orc</groupId>
- <artifactId>orc-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>fluent-hc</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.rocksdb</groupId>
- <artifactId>rocksdbjni</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.esotericsoftware</groupId>
- <artifactId>kryo-shaded</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-jdbc</artifactId>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-common</artifactId>
- <groupId>com.github.ben-manes.caffeine</groupId>
- <artifactId>caffeine</artifactId>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-hadoop-mr-bundle</artifactId>
- <groupId>org.lz4</groupId>
- <artifactId>lz4-java</artifactId>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hudi</groupId>
- <artifactId>hudi-hadoop-mr</artifactId>
- <version>${dep.hudi.version}</version>
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiBackgroundSplitLoader.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiBackgroundSplitLoader.java
new file mode 100644
index 000000000000..edd5189d0431
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiBackgroundSplitLoader.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import javax.inject.Qualifier;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Retention(RUNTIME)
+@Target({FIELD, PARAMETER, METHOD})
+@Qualifier
+public @interface ForHudiBackgroundSplitLoader
+{
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiSplitSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiSplitSource.java
new file mode 100644
index 000000000000..608c25554b55
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiSplitSource.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import javax.inject.Qualifier;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Retention(RUNTIME)
+@Target({FIELD, PARAMETER, METHOD})
+@Qualifier
+public @interface ForHudiSplitSource
+{
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java
index ea323818bdf6..39d396be4ed7 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java
@@ -17,11 +17,11 @@
import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
+import io.airlift.configuration.LegacyConfig;
import io.airlift.units.DataSize;
import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
-import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
@@ -39,13 +39,15 @@ public class HudiConfig
private List<String> columnsToHide = ImmutableList.of();
private boolean metadataEnabled;
private boolean shouldUseParquetColumnNames = true;
- private int minPartitionBatchSize = 10;
- private int maxPartitionBatchSize = 100;
private boolean sizeBasedSplitWeightsEnabled = true;
private DataSize standardSplitWeightSize = DataSize.of(128, MEGABYTE);
private double minimumAssignedSplitWeight = 0.05;
private int maxSplitsPerSecond = Integer.MAX_VALUE;
private int maxOutstandingSplits = 1000;
+ private int splitLoaderParallelism = 2;
+ private int splitGeneratorParallelism = 16;
+ private long perTransactionMetastoreCacheMaximumSize = 2000;
+ private String fileSystemViewSpillableDirectory = "/tmp";
public List<String> getColumnsToHide()
{
@@ -90,36 +92,6 @@ public boolean getUseParquetColumnNames()
return this.shouldUseParquetColumnNames;
}
- @Config("hudi.min-partition-batch-size")
- @ConfigDescription("Minimum number of partitions returned in a single batch.")
- public HudiConfig setMinPartitionBatchSize(int minPartitionBatchSize)
- {
- this.minPartitionBatchSize = minPartitionBatchSize;
- return this;
- }
-
- @Min(1)
- @Max(100)
- public int getMinPartitionBatchSize()
- {
- return minPartitionBatchSize;
- }
-
- @Config("hudi.max-partition-batch-size")
- @ConfigDescription("Maximum number of partitions returned in a single batch.")
- public HudiConfig setMaxPartitionBatchSize(int maxPartitionBatchSize)
- {
- this.maxPartitionBatchSize = maxPartitionBatchSize;
- return this;
- }
-
- @Min(1)
- @Max(1000)
- public int getMaxPartitionBatchSize()
- {
- return maxPartitionBatchSize;
- }
-
@Config("hudi.size-based-split-weights-enabled")
@ConfigDescription("Unlike uniform splitting, size-based splitting ensures that each batch of splits has enough data to process. " +
"By default, it is enabled to improve performance.")
@@ -191,4 +163,60 @@ public HudiConfig setMaxOutstandingSplits(int maxOutstandingSplits)
this.maxOutstandingSplits = maxOutstandingSplits;
return this;
}
+
+ @Min(1)
+ public int getSplitGeneratorParallelism()
+ {
+ return splitGeneratorParallelism;
+ }
+
+ @Config("hudi.split-generator-parallelism")
+ @ConfigDescription("Number of threads to generate splits from partitions.")
+ public HudiConfig setSplitGeneratorParallelism(int splitGeneratorParallelism)
+ {
+ this.splitGeneratorParallelism = splitGeneratorParallelism;
+ return this;
+ }
+
+ @Min(1)
+ public int getSplitLoaderParallelism()
+ {
+ return splitLoaderParallelism;
+ }
+
+ @Config("hudi.split-loader-parallelism")
+ @ConfigDescription("Number of threads to run background split loader. "
+ + "A single background split loader is needed per query.")
+ public HudiConfig setSplitLoaderParallelism(int splitLoaderParallelism)
+ {
+ this.splitLoaderParallelism = splitLoaderParallelism;
+ return this;
+ }
+
+ @Min(1)
+ public long getPerTransactionMetastoreCacheMaximumSize()
+ {
+ return perTransactionMetastoreCacheMaximumSize;
+ }
+
+ @LegacyConfig("hive.per-transaction-metastore-cache-maximum-size")
+ @Config("hudi.per-transaction-metastore-cache-maximum-size")
+ public HudiConfig setPerTransactionMetastoreCacheMaximumSize(long perTransactionMetastoreCacheMaximumSize)
+ {
+ this.perTransactionMetastoreCacheMaximumSize = perTransactionMetastoreCacheMaximumSize;
+ return this;
+ }
+
+ public String getFileSystemViewSpillableDirectory()
+ {
+ return fileSystemViewSpillableDirectory;
+ }
+
+ @Config("huhi.fs-view-spillable-dir")
+ @ConfigDescription("Path on storage to use when the file system view is held in a spillable map.")
+ public HudiConfig setFileSystemViewSpillableDirectory(String fileSystemViewSpillableDirectory)
+ {
+ this.fileSystemViewSpillableDirectory = fileSystemViewSpillableDirectory;
+ return this;
+ }
}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
index 4d5686823665..552216d30adb 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
@@ -28,7 +28,8 @@ public enum HudiErrorCode
HUDI_MISSING_DATA(3, EXTERNAL),
HUDI_CANNOT_OPEN_SPLIT(4, EXTERNAL),
HUDI_UNSUPPORTED_FILE_FORMAT(5, EXTERNAL),
- HUDI_CURSOR_ERROR(6, EXTERNAL);
+ HUDI_CURSOR_ERROR(6, EXTERNAL),
+ HUDI_NO_VALID_COMMIT(7, EXTERNAL);
private final ErrorCode errorCode;
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiFile.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiFile.java
new file mode 100644
index 000000000000..f6f96686332f
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiFile.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class HudiFile
+{
+ private final String path;
+ private final long start;
+ private final long length;
+ private final long fileSize;
+ private final long fileModifiedTime;
+
+ @JsonCreator
+ public HudiFile(
+ @JsonProperty("path") String path,
+ @JsonProperty("start") long start,
+ @JsonProperty("length") long length,
+ @JsonProperty("fileSize") long fileSize,
+ @JsonProperty("fileModifiedTime") long fileModifiedTime)
+ {
+ checkArgument(start >= 0, "start must be non-negative");
+ checkArgument(length >= 0, "length must be non-negative");
+ checkArgument(start + length <= fileSize, "fileSize must be at least start + length");
+
+ this.path = requireNonNull(path, "path is null");
+ this.start = start;
+ this.length = length;
+ this.fileSize = fileSize;
+ this.fileModifiedTime = fileModifiedTime;
+ }
+
+ @JsonProperty
+ public String getPath()
+ {
+ return path;
+ }
+
+ @JsonProperty
+ public long getStart()
+ {
+ return start;
+ }
+
+ @JsonProperty
+ public long getLength()
+ {
+ return length;
+ }
+
+ @JsonProperty
+ public long getFileSize()
+ {
+ return fileSize;
+ }
+
+ @JsonProperty
+ public long getFileModifiedTime()
+ {
+ return fileModifiedTime;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HudiFile hudiFile = (HudiFile) o;
+ return start == hudiFile.start &&
+ length == hudiFile.length &&
+ fileSize == hudiFile.fileSize &&
+ fileModifiedTime == hudiFile.fileModifiedTime &&
+ Objects.equals(path, hudiFile.path);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(path, start, length, fileSize, fileModifiedTime);
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .addValue(path)
+ .addValue(start)
+ .addValue(length)
+ .addValue(fileSize)
+ .addValue(fileModifiedTime)
+ .toString();
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiFileSkippingManager.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiFileSkippingManager.java
new file mode 100644
index 000000000000..1ac881fa1fe7
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiFileSkippingManager.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.google.common.collect.ImmutableList;
+import io.airlift.log.Logger;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import io.trino.parquet.predicate.TupleDomainParquetPredicate;
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.Range;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.predicate.ValueSet;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.VarcharType;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.ColumnIndexID;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static io.trino.parquet.predicate.PredicateUtils.isStatisticsOverflow;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DateType.DATE;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
+import static io.trino.spi.type.SmallintType.SMALLINT;
+import static io.trino.spi.type.TinyintType.TINYINT;
+import static java.lang.Float.floatToRawIntBits;
+import static java.util.Objects.requireNonNull;
+
+public class HudiFileSkippingManager
+{
+ private static final Logger log = Logger.get(HudiFileSkippingManager.class);
+
+ private final Optional<String> specifiedQueryInstant;
+ private final HoodieTableMetaClient metaClient;
+ private final HoodieTableMetadata metadataTable;
+
+ private final Map<String, List<FileSlice>> allInputFileSlices;
+
+ public HudiFileSkippingManager(
+ List<String> partitions,
+ String spillableDir,
+ HoodieEngineContext engineContext,
+ HoodieTableMetaClient metaClient,
+ Optional<String> specifiedQueryInstant)
+ {
+ requireNonNull(partitions, "partitions is null");
+ requireNonNull(spillableDir, "spillableDir is null");
+ requireNonNull(engineContext, "engineContext is null");
+ this.specifiedQueryInstant = requireNonNull(specifiedQueryInstant, "specifiedQueryInstant is null");
+ this.metaClient = requireNonNull(metaClient, "metaClient is null");
+
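+ // The Hudi metadata table backs both partition listing and the column-stats lookups used for file skipping.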
+ HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build();
+ this.metadataTable = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePathV2().toString(), spillableDir, true);
+ this.allInputFileSlices = prepareAllInputFileSlices(partitions, engineContext, metadataConfig, spillableDir);
+ }
+
+ private Map<String, List<FileSlice>> prepareAllInputFileSlices(
+ List<String> partitions,
+ HoodieEngineContext engineContext,
+ HoodieMetadataConfig metadataConfig,
+ String spillableDir)
+ {
+ long startTime = System.currentTimeMillis();
+ // build system view.
+ SyncableFileSystemView fileSystemView = FileSystemViewManager
+ .createViewManager(engineContext,
+ metadataConfig,
+ FileSystemViewStorageConfig.newBuilder().withBaseStoreDir(spillableDir).build(),
+ HoodieCommonConfig.newBuilder().build(),
+ () -> metadataTable)
+ .getFileSystemView(metaClient);
+ Optional<String> queryInstant = specifiedQueryInstant.isPresent() ?
+ specifiedQueryInstant : metaClient.getActiveTimeline().lastInstant().toJavaOptional().map(HoodieInstant::getTimestamp);
+
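+ // Resolve the latest file slices per partition in parallel, as of the specified instant or the latest completed one.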
+ Map<String, List<FileSlice>> allInputFileSlices = engineContext
+ .mapToPair(
+ partitions,
+ partitionPath -> Pair.of(
+ partitionPath,
+ getLatestFileSlices(partitionPath, fileSystemView, queryInstant)),
+ partitions.size());
+
+ long duration = System.currentTimeMillis() - startTime;
+ log.debug("prepare query files for table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration);
+ return allInputFileSlices;
+ }
+
+ private List<FileSlice> getLatestFileSlices(
+ String partitionPath,
+ SyncableFileSystemView fileSystemView,
+ Optional<String> queryInstant)
+ {
+ return queryInstant
+ .map(instant ->
+ fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, queryInstant.get()))
+ .orElse(fileSystemView.getLatestFileSlices(partitionPath))
+ .collect(Collectors.toList());
+ }
+
+ public Map<String, List<FileSlice>> listQueryFiles(TupleDomain<ColumnHandle> tupleDomain)
+ {
+ // do file skipping by MetadataTable
+ Map<String, List<FileSlice>> candidateFileSlices = allInputFileSlices;
+ try {
+ if (!tupleDomain.isAll()) {
+ candidateFileSlices = lookupCandidateFilesInMetadataTable(candidateFileSlices, tupleDomain);
+ }
+ }
+ catch (Exception e) {
+ // Data skipping is best-effort: log the failure and fall back to scanning all candidate files.
+ log.warn(e, "Failed to apply data skipping for table %s, falling back to scanning all files", metaClient.getBasePathV2());
+ }
+ if (log.isDebugEnabled()) {
+ int candidateFileSize = candidateFileSlices.values().stream().mapToInt(List::size).sum();
+ int totalFiles = allInputFileSlices.values().stream().mapToInt(List::size).sum();
+ double skippingPercent = totalFiles == 0 ? 0.0d : (totalFiles - candidateFileSize) / (totalFiles + 0.0d);
+ log.debug("Total files: %s; candidate files after data skipping: %s; skipping percent %s",
+ totalFiles,
+ candidateFileSize,
+ skippingPercent);
+ }
+ return candidateFileSlices;
+ }
+
+ private Map<String, List<FileSlice>> lookupCandidateFilesInMetadataTable(
+ Map<String, List<FileSlice>> inputFileSlices,
+ TupleDomain<ColumnHandle> tupleDomain)
+ {
+ // split regular column predicates
+ TupleDomain<HiveColumnHandle> regularTupleDomain = HudiPredicates.from(tupleDomain).getRegularColumnPredicates();
+ TupleDomain<String> regularColumnPredicates = regularTupleDomain.transformKeys(HiveColumnHandle::getName);
+ if (regularColumnPredicates.isAll() || regularColumnPredicates.getDomains().isEmpty()) {
+ return inputFileSlices;
+ }
+ List<String> regularColumns = regularColumnPredicates
+ .getDomains().get().keySet().stream().collect(Collectors.toList());
+ // get filter columns
+ List<String> encodedTargetColumnNames = regularColumns
+ .stream()
+ .map(col -> new ColumnIndexID(col).asBase64EncodedString()).collect(Collectors.toList());
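+ // Read column stats for the filter columns from the metadata table's column_stats partition, grouped by file name.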
+ Map<String, List<HoodieMetadataColumnStats>> statsByFileName = metadataTable.getRecordsByKeyPrefixes(
+ encodedTargetColumnNames,
+ HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, true)
+ .collectAsList()
+ .stream()
+ .filter(f -> f.getData().getColumnStatMetadata().isPresent())
+ .map(f -> f.getData().getColumnStatMetadata().get())
+ .collect(Collectors.groupingBy(HoodieMetadataColumnStats::getFileName));
+
+ // prune files.
+ return inputFileSlices
+ .entrySet()
+ .stream()
+ .collect(Collectors
+ .toMap(Map.Entry::getKey, entry -> entry
+ .getValue()
+ .stream()
+ .filter(fileSlice -> pruneFiles(fileSlice, statsByFileName, regularColumnPredicates, regularColumns))
+ .collect(Collectors.toList())));
+ }
+
+ private boolean pruneFiles(
+ FileSlice fileSlice,
+ Map<String, List<HoodieMetadataColumnStats>> statsByFileName,
+ TupleDomain<String> regularColumnPredicates,
+ List<String> regularColumns)
+ {
+ String fileSliceName = fileSlice.getBaseFile().map(BaseFile::getFileName).orElse("");
+ // no stats found
+ if (!statsByFileName.containsKey(fileSliceName)) {
+ return true;
+ }
+ List<HoodieMetadataColumnStats> stats = statsByFileName.get(fileSliceName);
+ return evaluateStatisticPredicate(regularColumnPredicates, stats, regularColumns);
+ }
+
+ private boolean evaluateStatisticPredicate(
+ TupleDomain<String> regularColumnPredicates,
+ List<HoodieMetadataColumnStats> stats,
+ List<String> regularColumns)
+ {
+ if (regularColumnPredicates.isNone() || regularColumnPredicates.getDomains().isEmpty()) {
+ return true;
+ }
+ for (String regularColumn : regularColumns) {
+ Domain columnPredicate = regularColumnPredicates.getDomains().get().get(regularColumn);
+ Optional<HoodieMetadataColumnStats> currentColumnStats = stats
+ .stream().filter(s -> s.getColumnName().equals(regularColumn)).findFirst();
+ // a column without stats cannot be pruned, so only evaluate the predicate when stats are present
+ if (currentColumnStats.isPresent()) {
+ Domain domain = getDomain(regularColumn, columnPredicate.getType(), currentColumnStats.get());
+ if (columnPredicate.intersect(domain).isNone()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private static Domain getDomain(String colName, Type type, HoodieMetadataColumnStats statistics)
+ {
+ if (statistics == null) {
+ return Domain.all(type);
+ }
+ boolean hasNullValue = statistics.getNullCount() != 0L;
+ boolean hasNonNullValue = statistics.getValueCount() - statistics.getNullCount() > 0;
+ if (!hasNonNullValue || statistics.getMaxValue() == null || statistics.getMinValue() == null) {
+ return Domain.create(ValueSet.all(type), hasNullValue);
+ }
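+ // The metadata table wraps min/max values in single-field Avro records; unwrap them before building the domain.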
+ if (!(statistics.getMinValue() instanceof GenericRecord) ||
+ !(statistics.getMaxValue() instanceof GenericRecord)) {
+ return Domain.all(type);
+ }
+ return getDomain(
+ colName,
+ type,
+ ((GenericRecord) statistics.getMinValue()).get(0),
+ ((GenericRecord) statistics.getMaxValue()).get(0),
+ hasNullValue);
+ }
+
+ /**
+ * Get a domain for the inclusive range defined by {@code minimum} and {@code maximum}.
+ */
+ private static Domain getDomain(String colName, Type type, Object minimum, Object maximum, boolean hasNullValue)
+ {
+ try {
+ if (type.equals(BOOLEAN)) {
+ boolean hasTrueValue = (boolean) minimum || (boolean) maximum;
+ boolean hasFalseValue = !(boolean) minimum || !(boolean) maximum;
+ if (hasTrueValue && hasFalseValue) {
+ return Domain.all(type);
+ }
+ if (hasTrueValue) {
+ return Domain.create(ValueSet.of(type, true), hasNullValue);
+ }
+ if (hasFalseValue) {
+ return Domain.create(ValueSet.of(type, false), hasNullValue);
+ }
+ // No other case, since all null case is handled earlier.
+ }
+
+ if ((type.equals(BIGINT) || type.equals(TINYINT) || type.equals(SMALLINT) || type.equals(INTEGER) || type.equals(DATE))) {
+ long minValue = TupleDomainParquetPredicate.asLong(minimum);
+ long maxValue = TupleDomainParquetPredicate.asLong(maximum);
+ if (isStatisticsOverflow(type, minValue, maxValue)) {
+ return Domain.create(ValueSet.all(type), hasNullValue);
+ }
+ return ofMinMax(type, minValue, maxValue, hasNullValue);
+ }
+
+ if (type.equals(REAL)) {
+ Float minValue = (Float) minimum;
+ Float maxValue = (Float) maximum;
+ if (minValue.isNaN() || maxValue.isNaN()) {
+ return Domain.create(ValueSet.all(type), hasNullValue);
+ }
+ return ofMinMax(type, (long) floatToRawIntBits(minValue), (long) floatToRawIntBits(maxValue), hasNullValue);
+ }
+
+ if (type.equals(DOUBLE)) {
+ Double minValue = (Double) minimum;
+ Double maxValue = (Double) maximum;
+ if (minValue.isNaN() || maxValue.isNaN()) {
+ return Domain.create(ValueSet.all(type), hasNullValue);
+ }
+ return ofMinMax(type, minValue, maxValue, hasNullValue);
+ }
+
+ if (type instanceof VarcharType) {
+ Slice min = Slices.utf8Slice((String) minimum);
+ Slice max = Slices.utf8Slice((String) maximum);
+ return ofMinMax(type, min, max, hasNullValue);
+ }
+ return Domain.create(ValueSet.all(type), hasNullValue);
+ }
+ catch (Exception e) {
+ log.warn("failed to create Domain for column: %s which type is: %s", colName, type.toString());
+ return Domain.create(ValueSet.all(type), hasNullValue);
+ }
+ }
+
+ private static Domain ofMinMax(Type type, Object min, Object max, boolean hasNullValue)
+ {
+ Range range = Range.range(type, min, true, max, true);
+ ValueSet vs = ValueSet.ofRanges(ImmutableList.of(range));
+ return Domain.create(vs, hasNullValue);
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
index bd2d83fa762b..3e6ec0ce36c0 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
@@ -39,7 +39,6 @@
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.model.HoodieTableType;
import java.util.Collection;
import java.util.Collections;
@@ -53,6 +52,7 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.hive.HiveTimestampPrecision.NANOSECONDS;
+import static io.trino.plugin.hive.metastore.MetastoreUtil.getHiveSchema;
import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
@@ -60,6 +60,7 @@
import static io.trino.plugin.hudi.HudiSessionProperties.getColumnsToHide;
import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY;
import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY;
+import static io.trino.plugin.hudi.HudiUtil.getTableType;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
@@ -111,9 +112,10 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName
tableName.getSchemaName(),
tableName.getTableName(),
table.get().getStorage().getLocation(),
- HoodieTableType.COPY_ON_WRITE,
+ getTableType(table.get().getStorage().getStorageFormat().getInputFormat()),
TupleDomain.all(),
- TupleDomain.all());
+ TupleDomain.all(),
+ getHiveSchema(table.get()));
}
@Override
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadataFactory.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadataFactory.java
index bb1a4832bc9e..8c9feadc4e6e 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadataFactory.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadataFactory.java
@@ -14,8 +14,8 @@
package io.trino.plugin.hudi;
import io.trino.hdfs.HdfsEnvironment;
-import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
+import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;
@@ -23,6 +23,7 @@
import java.util.Optional;
+import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore;
import static java.util.Objects.requireNonNull;
public class HudiMetadataFactory
@@ -30,18 +31,23 @@ public class HudiMetadataFactory
private final HiveMetastoreFactory metastoreFactory;
private final HdfsEnvironment hdfsEnvironment;
private final TypeManager typeManager;
+ private final long perTransactionMetastoreCacheMaximumSize;
@Inject
- public HudiMetadataFactory(HiveMetastoreFactory metastoreFactory, HdfsEnvironment hdfsEnvironment, TypeManager typeManager)
+ public HudiMetadataFactory(HiveMetastoreFactory metastoreFactory, HdfsEnvironment hdfsEnvironment, TypeManager typeManager, HudiConfig hudiConfig)
{
this.metastoreFactory = requireNonNull(metastoreFactory, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.perTransactionMetastoreCacheMaximumSize = hudiConfig.getPerTransactionMetastoreCacheMaximumSize();
}
public HudiMetadata create(ConnectorIdentity identity)
{
- HiveMetastore metastore = metastoreFactory.createMetastore(Optional.of(identity));
- return new HudiMetadata(metastore, hdfsEnvironment, typeManager);
+ // create per-transaction cache over hive metastore interface
+ CachingHiveMetastore cachingHiveMetastore = memoizeMetastore(
+ metastoreFactory.createMetastore(Optional.of(identity)),
+ perTransactionMetastoreCacheMaximumSize);
+ return new HudiMetadata(cachingHiveMetastore, hdfsEnvironment, typeManager);
}
}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java
index 52bd1f7028c2..f946358e68db 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java
@@ -35,12 +35,15 @@
import javax.inject.Singleton;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static java.util.concurrent.Executors.newCachedThreadPool;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.weakref.jmx.guice.ExportBinder.newExporter;
public class HudiModule
@@ -65,6 +68,7 @@ public void configure(Binder binder)
configBinder(binder).bindConfig(ParquetReaderConfig.class);
configBinder(binder).bindConfig(ParquetWriterConfig.class);
+ binder.bind(HudiPartitionManager.class).in(Scopes.SINGLETON);
binder.bind(HudiMetadataFactory.class).in(Scopes.SINGLETON);
binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
@@ -76,7 +80,27 @@ public void configure(Binder binder)
@Provides
public ExecutorService createExecutorService()
{
- return newCachedThreadPool(daemonThreadsNamed("hudi-split-manager-%d"));
+ return newCachedThreadPool(daemonThreadsNamed("hudi-split-manager-%s"));
+ }
+
+ @ForHudiSplitSource
+ @Singleton
+ @Provides
+ public ScheduledExecutorService createSplitLoaderExecutor(HudiConfig hudiConfig)
+ {
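+ // Sized by hudi.split-loader-parallelism; each query schedules a single background split loader on this shared pool.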
+ return newScheduledThreadPool(
+ hudiConfig.getSplitLoaderParallelism(),
+ daemonThreadsNamed("hudi-split-loader-%s"));
+ }
+
+ @ForHudiBackgroundSplitLoader
+ @Singleton
+ @Provides
+ public ExecutorService createSplitGeneratorExecutor(HudiConfig hudiConfig)
+ {
+ return newFixedThreadPool(
+ hudiConfig.getSplitGeneratorParallelism(),
+ daemonThreadsNamed("hudi-split-generator-%s"));
}
@Singleton
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSource.java
index 45db0165c471..9c2edec73309 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSource.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSource.java
@@ -18,7 +18,6 @@
import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.connector.ConnectorPageSource;
-import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.List;
@@ -51,7 +50,7 @@ public HudiPageSource(
List<HiveColumnHandle> columnHandles,
Map<String, Block> partitionBlocks,
ConnectorPageSource dataPageSource,
- Path path,
+ String path,
long fileSize,
long fileModifiedTime)
{
@@ -76,7 +75,7 @@ else if (column.getName().equals(PARTITION_COLUMN_NAME)) {
delegateIndexes[outputIndex] = -1;
}
else if (column.getName().equals(PATH_COLUMN_NAME)) {
- prefilledBlocks[outputIndex] = nativeValueToBlock(PATH_TYPE, utf8Slice(path.toString()));
+ prefilledBlocks[outputIndex] = nativeValueToBlock(PATH_TYPE, utf8Slice(path));
delegateIndexes[outputIndex] = -1;
}
else if (column.getName().equals(FILE_SIZE_COLUMN_NAME)) {
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
index 6f68eb58ab1e..cc57c9ff7d16 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
@@ -17,6 +17,7 @@
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
+import io.trino.hdfs.HdfsEnvironment;
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetDataSourceId;
@@ -42,8 +43,12 @@
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.DynamicFilter;
+import io.trino.spi.connector.RecordCursor;
+import io.trino.spi.connector.RecordPageSource;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.Decimals;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockMissingException;
@@ -72,6 +77,7 @@
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
@@ -90,7 +96,9 @@
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CURSOR_ERROR;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_INVALID_PARTITION_VALUE;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_MISSING_DATA;
+import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNKNOWN_TABLE_TYPE;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNSUPPORTED_FILE_FORMAT;
+import static io.trino.plugin.hudi.HudiRecordCursor.createRealtimeRecordCursor;
import static io.trino.plugin.hudi.HudiSessionProperties.isParquetOptimizedReaderEnabled;
import static io.trino.plugin.hudi.HudiSessionProperties.shouldUseParquetColumnNames;
import static io.trino.plugin.hudi.HudiUtil.getHudiFileFormat;
@@ -117,6 +125,8 @@
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
public class HudiPageSourceProvider
implements ConnectorPageSourceProvider
@@ -125,18 +135,25 @@ public class HudiPageSourceProvider
private final FileFormatDataSourceStats dataSourceStats;
private final ParquetReaderOptions options;
private final DateTimeZone timeZone;
+ private final HdfsEnvironment hdfsEnvironment;
+ private final TypeManager typeManager;
+
private static final int DOMAIN_COMPACTION_THRESHOLD = 1000;
@Inject
public HudiPageSourceProvider(
TrinoFileSystemFactory fileSystemFactory,
FileFormatDataSourceStats dataSourceStats,
- ParquetReaderConfig parquetReaderConfig)
+ ParquetReaderConfig parquetReaderConfig,
+ HdfsEnvironment hdfsEnvironment,
+ TypeManager typeManager)
{
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.dataSourceStats = requireNonNull(dataSourceStats, "dataSourceStats is null");
this.options = requireNonNull(parquetReaderConfig, "parquetReaderConfig is null").toParquetReaderOptions();
this.timeZone = DateTimeZone.forID(TimeZone.getDefault().getID());
+ this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
}
@Override
@@ -149,12 +166,7 @@ public ConnectorPageSource createPageSource(
DynamicFilter dynamicFilter)
{
HudiSplit split = (HudiSplit) connectorSplit;
- Path path = new Path(split.getPath());
- HoodieFileFormat hudiFileFormat = getHudiFileFormat(path.toString());
- if (!HoodieFileFormat.PARQUET.equals(hudiFileFormat)) {
- throw new TrinoException(HUDI_UNSUPPORTED_FILE_FORMAT, format("File format %s not supported", hudiFileFormat));
- }
-
+ HudiTableHandle tableHandle = (HudiTableHandle) connectorTable;
List<HiveColumnHandle> hiveColumns = columns.stream()
.map(HiveColumnHandle.class::cast)
.collect(toList());
@@ -163,24 +175,51 @@ public ConnectorPageSource createPageSource(
List<HiveColumnHandle> regularColumns = hiveColumns.stream()
.filter(columnHandle -> !columnHandle.isPartitionKey() && !columnHandle.isHidden())
.collect(Collectors.toList());
- TrinoFileSystem fileSystem = fileSystemFactory.create(session);
- TrinoInputFile inputFile = fileSystem.newInputFile(path.toString(), split.getFileSize());
- ConnectorPageSource dataPageSource = createPageSource(session, regularColumns, split, inputFile, dataSourceStats, options, timeZone);
- return new HudiPageSource(
- toPartitionName(split.getPartitionKeys()),
- hiveColumns,
- convertPartitionValues(hiveColumns, split.getPartitionKeys()), // create blocks for partition values
- dataPageSource,
- path,
- split.getFileSize(),
- split.getFileModifiedTime());
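+ // Copy-on-write splits read the parquet base file directly; merge-on-read splits go through the Hudi realtime record reader, which merges the base file with its log files.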
+ if (COPY_ON_WRITE.equals(tableHandle.getTableType())) {
+ HudiFile baseFile = split.getBaseFile().orElseThrow(() -> new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Split without base file is invalid"));
+ HoodieFileFormat hudiFileFormat = getHudiFileFormat(baseFile.getPath());
+ if (!HoodieFileFormat.PARQUET.equals(hudiFileFormat)) {
+ throw new TrinoException(HUDI_UNSUPPORTED_FILE_FORMAT, format("File format %s not supported", hudiFileFormat));
+ }
+ TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+ TrinoInputFile inputFile = fileSystem.newInputFile(baseFile.getPath(), baseFile.getFileSize());
+
+ return new HudiPageSource(
+ toPartitionName(split.getPartitionKeys()),
+ hiveColumns,
+ convertPartitionValues(hiveColumns, split.getPartitionKeys()), // create blocks for partition values
+ createPageSource(session, regularColumns, split, baseFile, inputFile, dataSourceStats, options, timeZone),
+ baseFile.getPath(),
+ baseFile.getFileSize(),
+ baseFile.getFileModifiedTime());
+ }
+ else if (MERGE_ON_READ.equals(tableHandle.getTableType())) {
+ RecordCursor recordCursor = createRealtimeRecordCursor(hdfsEnvironment, session, split, tableHandle, regularColumns);
+ List<Type> types = regularColumns.stream()
+ .map(column -> column.getHiveType().getType(typeManager))
+ .collect(toImmutableList());
+ HudiFile hudiFile = HudiUtil.getHudiBaseFile(split);
+
+ return new HudiPageSource(
+ toPartitionName(split.getPartitionKeys()),
+ hiveColumns,
+ convertPartitionValues(hiveColumns, split.getPartitionKeys()), // create blocks for partition values
+ new RecordPageSource(types, recordCursor),
+ hudiFile.getPath(),
+ hudiFile.getFileSize(),
+ hudiFile.getFileModifiedTime());
+ }
+ else {
+ throw new TrinoException(HUDI_UNKNOWN_TABLE_TYPE, "Could not create page source for table type " + tableHandle.getTableType());
+ }
}
private static ConnectorPageSource createPageSource(
ConnectorSession session,
List<HiveColumnHandle> columns,
HudiSplit hudiSplit,
+ HudiFile baseFile,
TrinoInputFile inputFile,
FileFormatDataSourceStats dataSourceStats,
ParquetReaderOptions options,
@@ -188,9 +227,9 @@ private static ConnectorPageSource createPageSource(
{
ParquetDataSource dataSource = null;
boolean useColumnNames = shouldUseParquetColumnNames(session);
- Path path = new Path(hudiSplit.getPath());
- long start = hudiSplit.getStart();
- long length = hudiSplit.getLength();
+ Path path = new Path(baseFile.getPath());
+ long start = baseFile.getStart();
+ long length = baseFile.getLength();
try {
dataSource = new TrinoParquetDataSource(inputFile, options, dataSourceStats);
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPartitionManager.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPartitionManager.java
new file mode 100644
index 000000000000..bfcf09501773
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPartitionManager.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.google.common.collect.ImmutableList;
+import io.airlift.log.Logger;
+import io.airlift.slice.Slice;
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hive.HivePartitionManager;
+import io.trino.plugin.hive.metastore.Column;
+import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.plugin.hive.metastore.Table;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.TableNotFoundException;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.predicate.ValueSet;
+import io.trino.spi.type.StandardTypes;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import javax.inject.Inject;
+
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verify;
+import static io.airlift.slice.Slices.utf8Slice;
+import static io.trino.plugin.hive.metastore.MetastoreUtil.computePartitionKeyFilter;
+import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
+import static io.trino.plugin.hudi.HudiErrorCode.HUDI_INVALID_PARTITION_VALUE;
+import static io.trino.plugin.hudi.HudiSessionProperties.isHudiMetadataEnabled;
+import static java.lang.Double.doubleToRawLongBits;
+import static java.lang.Double.parseDouble;
+import static java.lang.Float.floatToRawIntBits;
+import static java.lang.Float.parseFloat;
+import static java.lang.Long.parseLong;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class HudiPartitionManager
+{
+ private static final Logger log = Logger.get(HudiPartitionManager.class);
+ private static final Pattern HIVE_PARTITION_NAME_PATTERN = Pattern.compile("([^/]+)=([^/]+)");
+
+ private final TypeManager typeManager;
+
+ @Inject
+ public HudiPartitionManager(TypeManager typeManager)
+ {
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ }
+
+ public List<String> getEffectivePartitions(HudiTableHandle tableHandle, HiveMetastore metastore, ConnectorSession session, HoodieTableMetaClient metaClient)
+ {
+ Optional<Table> table = metastore.getTable(tableHandle.getSchemaName(), tableHandle.getTableName());
+ verify(table.isPresent());
+ List<Column> partitionColumns = table.get().getPartitionColumns();
+ if (partitionColumns.isEmpty()) {
+ return ImmutableList.of("");
+ }
+
+ boolean metaTableEnabled = isHudiMetadataEnabled(session);
+
+ return metaTableEnabled ?
+ prunePartitionByMetaDataTable(tableHandle, table.get(), metaClient, partitionColumns) :
+ prunePartitionByMetaStore(tableHandle, metastore, table.get(), partitionColumns);
+ }
+
+ private List<String> prunePartitionByMetaStore(HudiTableHandle tableHandle, HiveMetastore metastore, Table table, List<Column> partitionColumns)
+ {
+ List<HiveColumnHandle> partitionColumnHandles = getPartitionKeyColumnHandles(table, typeManager);
+
+ return metastore.getPartitionNamesByFilter(
+ tableHandle.getSchemaName(),
+ tableHandle.getTableName(),
+ partitionColumns.stream().map(Column::getName).collect(Collectors.toList()),
+ computePartitionKeyFilter(partitionColumnHandles, tableHandle.getPartitionPredicates()))
+ .orElseThrow(() -> new TableNotFoundException(tableHandle.getSchemaTableName()));
+ }
+
+ private List<String> prunePartitionByMetaDataTable(
+ HudiTableHandle tableHandle,
+ Table table,
+ HoodieTableMetaClient metaClient,
+ List<Column> partitionColumns)
+ {
+ // non-partition table
+ if (partitionColumns.isEmpty()) {
+ return ImmutableList.of("");
+ }
+ Configuration conf = metaClient.getHadoopConf();
+ HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(conf);
+ HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build();
+
+ // Load all the partition path from the basePath
+ List<String> allPartitions = FSUtils.getAllPartitionPaths(
+ engineContext,
+ metadataConfig,
+ metaClient.getBasePathV2().toString());
+
+ // Extract partition columns predicate
+ TupleDomain<String> partitionPredicate = tableHandle.getPartitionPredicates().transformKeys(columnHandle -> {
+ if (columnHandle.getColumnType() != HiveColumnHandle.ColumnType.PARTITION_KEY) {
+ return null;
+ }
+ return columnHandle.getName();
+ });
+
+ if (partitionPredicate.isAll()) {
+ return allPartitions;
+ }
+
+ if (partitionPredicate.isNone()) {
+ return ImmutableList.of("");
+ }
+
+ List<HiveColumnHandle> partitionColumnHandles = getPartitionKeyColumnHandles(table, typeManager);
+
+ List<String> matchedPartitionPaths = prunePartitions(
+ partitionPredicate,
+ partitionColumnHandles,
+ getPartitions(
+ partitionColumns.stream().map(f -> f.getName()).collect(Collectors.toList()),
+ allPartitions));
+ log.debug(format("Total partition size is %s, after partition prune size is %s.",
+ allPartitions.size(), matchedPartitionPaths.size()));
+ return matchedPartitionPaths;
+ }
+
+ /**
+ * Returns, for each partition path, a map from partition key to partition value.
+ * For example, with partition keys [p1, p2, p3] and hive-style partition paths
+ * p1=val1/p2=val2/p3=val3 and p1=val4/p2=val5/p3=val6, the result is
+ * {p1=val1/p2=val2/p3=val3 -> {p1 -> val1, p2 -> val2, p3 -> val3},
+ * p1=val4/p2=val5/p3=val6 -> {p1 -> val4, p2 -> val5, p3 -> val6}}.
+ *
+ * @param partitionKey the partition key list
+ * @param partitionPaths the partition path list
+ */
+ public static Map<String, Map<String, String>> getPartitions(List<String> partitionKey, List<String> partitionPaths)
+ {
+ Map<String, Map<String, String>> result = new HashMap<>();
+ if (partitionPaths.isEmpty() || partitionKey.isEmpty()) {
+ return result;
+ }
+ // try to infer hive style
+ boolean hiveStylePartition = HIVE_PARTITION_NAME_PATTERN.matcher(partitionPaths.get(0).split(Path.SEPARATOR)[0]).matches();
+ for (String partitionPath : partitionPaths) {
+ String[] pathParts = partitionPath.split(Path.SEPARATOR);
+ Map<String, String> partitionMapping = new LinkedHashMap<>();
+ if (hiveStylePartition) {
+ Arrays.stream(pathParts).forEach(p -> {
+ String[] keyValue = p.split("=");
+ if (keyValue.length == 2) {
+ partitionMapping.put(keyValue[0], keyValue[1]);
+ }
+ });
+ }
+ else {
+ for (int i = 0; i < partitionKey.size(); i++) {
+ partitionMapping.put(partitionKey.get(i), pathParts[i]);
+ }
+ }
+ result.put(partitionPath, partitionMapping);
+ }
+ return result;
+ }
+
+ public static List<String> extractPartitionValues(String partitionName, Optional<List<String>> partitionColumnNames)
+ {
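+ // Hive-style partition names look like "key1=val1/key2=val2"; otherwise values are taken positionally from the path segments.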
+ boolean hiveStylePartition = HIVE_PARTITION_NAME_PATTERN.matcher(partitionName).matches();
+ if (!hiveStylePartition) {
+ if (!partitionColumnNames.isPresent() || partitionColumnNames.get().size() == 1) {
+ return ImmutableList.of(partitionName);
+ }
+ else {
+ String[] partitionValues = partitionName.split(Path.SEPARATOR);
+ checkArgument(
+ partitionValues.length == partitionColumnNames.get().size(),
+ "Invalid partition spec: {partitionName: %s, partitionColumnNames: %s}",
+ partitionName,
+ partitionColumnNames.get());
+ return Arrays.asList(partitionValues);
+ }
+ }
+
+ return HivePartitionManager.extractPartitionValues(partitionName);
+ }
+
+ private List<String> prunePartitions(
+ TupleDomain<String> partitionPredicate,
+ List<HiveColumnHandle> partitionColumnHandles,
+ Map<String, Map<String, String>> candidatePartitionPaths)
+ {
+ return candidatePartitionPaths.entrySet().stream().filter(f -> {
+ Map<String, String> partitionMapping = f.getValue();
+ return partitionMapping
+ .entrySet()
+ .stream()
+ .allMatch(p -> evaluatePartitionPredicate(partitionPredicate, partitionColumnHandles, p.getValue(), p.getKey()));
+ }).map(entry -> entry.getKey()).collect(Collectors.toList());
+ }
+
+ private boolean evaluatePartitionPredicate(
+ TupleDomain<String> partitionPredicate,
+ List<HiveColumnHandle> partitionColumnHandles,
+ String partitionPathValue,
+ String partitionName)
+ {
+ Optional<HiveColumnHandle> columnHandleOpt = partitionColumnHandles.stream().filter(f -> f.getName().equals(partitionName)).findFirst();
+ if (columnHandleOpt.isPresent()) {
+ Domain domain = getDomain(columnHandleOpt.get(), partitionPathValue);
+ if (partitionPredicate.getDomains().isEmpty()) {
+ return true;
+ }
+ Domain columnPredicate = partitionPredicate.getDomains().get().get(partitionName);
+ // no predicate on current partitionName
+ if (columnPredicate == null) {
+ return true;
+ }
+
+ // A null partition value is written under a default partition name, so such partitions are always kept.
+ if (partitionPathValue.equals("default")) {
+ return true;
+ }
+ return !columnPredicate.intersect(domain).isNone();
+ }
+ else {
+ // Should not happen
+ throw new IllegalArgumentException(format("Mismatched partition information found,"
+ + " partition: %s from Hudi metadataTable is not included by the partitions from HMS: %s",
+ partitionName, partitionColumnHandles.stream().map(HiveColumnHandle::getName).collect(Collectors.joining(","))));
+ }
+ }
+
+ private Domain getDomain(HiveColumnHandle columnHandle, String partitionValue)
+ {
+ Type type = columnHandle.getHiveType().getType(typeManager);
+ if (partitionValue == null) {
+ return Domain.onlyNull(type);
+ }
+ try {
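+ // Convert the raw partition string to the native value representation expected for the column type.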
+ switch (columnHandle.getHiveType().getTypeSignature().getBase()) {
+ case StandardTypes.TINYINT, StandardTypes.SMALLINT, StandardTypes.INTEGER, StandardTypes.BIGINT -> {
+ Long intValue = parseLong(partitionValue);
+ return Domain.create(ValueSet.of(type, intValue), false);
+ }
+ case StandardTypes.REAL -> {
+ Long realValue = (long) floatToRawIntBits(parseFloat(partitionValue));
+ return Domain.create(ValueSet.of(type, realValue), false);
+ }
+ case StandardTypes.DOUBLE -> {
+ Long doubleValue = doubleToRawLongBits(parseDouble(partitionValue));
+ return Domain.create(ValueSet.of(type, doubleValue), false);
+ }
+ case StandardTypes.VARCHAR, StandardTypes.VARBINARY -> {
+ Slice sliceValue = utf8Slice(partitionValue);
+ return Domain.create(ValueSet.of(type, sliceValue), false);
+ }
+ case StandardTypes.DATE -> {
+ Long dateValue = LocalDate.parse(partitionValue, java.time.format.DateTimeFormatter.ISO_LOCAL_DATE).toEpochDay();
+ return Domain.create(ValueSet.of(type, dateValue), false);
+ }
+ case StandardTypes.TIMESTAMP -> {
+ Long timestampValue = Timestamp.valueOf(partitionValue).getTime();
+ return Domain.create(ValueSet.of(type, timestampValue), false);
+ }
+ case StandardTypes.BOOLEAN -> {
+ Boolean booleanValue = Boolean.valueOf(partitionValue);
+ return Domain.create(ValueSet.of(type, booleanValue), false);
+ }
+ default -> throw new TrinoException(HUDI_INVALID_PARTITION_VALUE, format(
+ "partition data type '%s' is unsupported for partition key: %s",
+ columnHandle.getHiveType(),
+ columnHandle.getName()));
+ }
+ }
+ catch (IllegalArgumentException e) {
+ throw new TrinoException(HUDI_INVALID_PARTITION_VALUE, format(
+ "Invalid partition value '%s' for %s partition key: %s",
+ partitionValue,
+ type.getDisplayName(),
+ columnHandle.getName()));
+ }
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordCursor.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordCursor.java
new file mode 100644
index 000000000000..8993a7885dcf
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordCursor.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import io.airlift.compress.lzo.LzoCodec;
+import io.airlift.compress.lzo.LzopCodec;
+import io.trino.hdfs.HdfsContext;
+import io.trino.hdfs.HdfsEnvironment;
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hive.util.HiveUtil;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.RecordCursor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Function;
+
+import static com.google.common.base.MoreObjects.firstNonNull;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Lists.newArrayList;
+import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT;
+import static io.trino.plugin.hudi.HudiUtil.getHudiBaseFile;
+import static io.trino.plugin.hudi.query.HiveHudiRecordCursor.createRecordCursor;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.serde2.ColumnProjectionUtils.READ_ALL_COLUMNS;
+import static org.apache.hadoop.hive.serde2.ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR;
+import static org.apache.hadoop.hive.serde2.ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR;
+
+class HudiRecordCursor
+{
+ private HudiRecordCursor() {}
+
+ public static RecordCursor createRealtimeRecordCursor(
+ HdfsEnvironment hdfsEnvironment,
+ ConnectorSession session,
+ HudiSplit split,
+ HudiTableHandle tableHandle,
+ List<HiveColumnHandle> dataColumns)
+ {
+ requireNonNull(session, "session is null");
+ checkArgument(dataColumns.stream().allMatch(HudiRecordCursor::isRegularColumn), "dataColumns contains non regular column");
+ HudiFile baseFile = getHudiBaseFile(split);
+ Path path = new Path(baseFile.getPath());
+ Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session), path);
+
+ return hdfsEnvironment.doAs(session.getIdentity(), () -> {
+ RecordReader<?, ?> recordReader = createRecordReader(configuration, tableHandle.getSchema(), split, dataColumns, tableHandle.getBasePath());
+ @SuppressWarnings("unchecked") RecordReader<?, ? extends Writable> reader = (RecordReader<?, ? extends Writable>) recordReader;
+ return createRecordCursor(configuration, path, reader, baseFile.getLength(), tableHandle.getSchema(), dataColumns);
+ });
+ }
+
+ private static RecordReader<?, ?> createRecordReader(
+ Configuration configuration,
+ Properties schema,
+ HudiSplit split,
+ List<HiveColumnHandle> dataColumns,
+ String basePath)
+ {
+ // update configuration
+ JobConf jobConf = new JobConf(configuration);
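+ // restrict the reader to the projected columns instead of reading every column in the file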
+ jobConf.setBoolean(READ_ALL_COLUMNS, false);
+ jobConf.set(READ_COLUMN_IDS_CONF_STR, join(dataColumns, HiveColumnHandle::getBaseHiveColumnIndex));
+ jobConf.set(READ_COLUMN_NAMES_CONF_STR, join(dataColumns, HiveColumnHandle::getName));
+ schema.stringPropertyNames()
+ .forEach(name -> jobConf.set(name, schema.getProperty(name)));
+ refineCompressionCodecs(jobConf);
+
+ // create input format
+ String inputFormatName = HiveUtil.getInputFormatName(schema);
+ InputFormat<?, ?> inputFormat = createInputFormat(jobConf, inputFormatName);
+
+ // create record reader for split
+ try {
+ HudiFile baseFile = getHudiBaseFile(split);
+ Path path = new Path(baseFile.getPath());
+ FileSplit fileSplit = new FileSplit(path, baseFile.getStart(), baseFile.getLength(), (String[]) null);
+ List<HoodieLogFile> logFiles = split.getLogFiles().stream().map(file -> new HoodieLogFile(file.getPath())).collect(toList());
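+ // combine the base file with its log files into a realtime split so the reader merges log records into the base data at read time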
+ FileSplit hudiSplit = new HoodieRealtimeFileSplit(fileSplit, basePath, logFiles, split.getCommitTime(), false, Option.empty());
+ return inputFormat.getRecordReader(hudiSplit, jobConf, Reporter.NULL);
+ }
+ catch (IOException e) {
+ String msg = format("Error opening Hudi split %s using %s: %s",
+ split,
+ inputFormatName,
+ firstNonNull(e.getMessage(), e.getClass().getName()));
+ throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, msg, e);
+ }
+ }
+
+ private static InputFormat<?, ?> createInputFormat(Configuration conf, String inputFormat)
+ {
+ try {
+ Class<?> clazz = conf.getClassByName(inputFormat);
+ @SuppressWarnings("unchecked") Class<? extends InputFormat<?, ?>> cls =
+ (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
+ return ReflectionUtils.newInstance(cls, conf);
+ }
+ catch (ClassNotFoundException | RuntimeException e) {
+ throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Unable to create input format " + inputFormat, e);
+ }
+ }
+
+ private static void refineCompressionCodecs(Configuration conf)
+ {
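+ // prepend the pure-Java airlift LZO and LZOP codecs so they take precedence over Hadoop's native implementations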
+ List<String> codecs = newArrayList(Splitter.on(",").trimResults().omitEmptyStrings()
+ .split(conf.get("io.compression.codecs", "")));
+ if (!codecs.contains(LzoCodec.class.getName())) {
+ codecs.add(0, LzoCodec.class.getName());
+ }
+ if (!codecs.contains(LzopCodec.class.getName())) {
+ codecs.add(0, LzopCodec.class.getName());
+ }
+ conf.set("io.compression.codecs", String.join(",", codecs));
+ }
+
+ private static String join(List<HiveColumnHandle> list, Function<HiveColumnHandle, Object> extractor)
+ {
+ return Joiner.on(',').join(list.stream().map(extractor).iterator());
+ }
+
+ private static boolean isRegularColumn(HiveColumnHandle column)
+ {
+ return column.getColumnType() == HiveColumnHandle.ColumnType.REGULAR;
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java
index 92a1d90a6d73..92c8271de91f 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java
@@ -33,6 +33,7 @@
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.doubleProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
+import static io.trino.spi.session.PropertyMetadata.stringProperty;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
@@ -44,11 +45,13 @@ public class HudiSessionProperties
private static final String METADATA_ENABLED = "metadata_enabled";
private static final String USE_PARQUET_COLUMN_NAMES = "use_parquet_column_names";
private static final String PARQUET_OPTIMIZED_READER_ENABLED = "parquet_optimized_reader_enabled";
- private static final String MIN_PARTITION_BATCH_SIZE = "min_partition_batch_size";
- private static final String MAX_PARTITION_BATCH_SIZE = "max_partition_batch_size";
private static final String SIZE_BASED_SPLIT_WEIGHTS_ENABLED = "size_based_split_weights_enabled";
private static final String STANDARD_SPLIT_WEIGHT_SIZE = "standard_split_weight_size";
private static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight";
+ private static final String MAX_SPLITS_PER_SECOND = "max_splits_per_second";
+ private static final String MAX_OUTSTANDING_SPLITS = "max_outstanding_splits";
+ private static final String SPLIT_GENERATOR_PARALLELISM = "split_generator_parallelism";
+ private static final String FILE_SYSTEM_VIEW_SPILLABLE_DIR = "fs_view_spillable_dir";
private final List<PropertyMetadata<?>> sessionProperties;
@@ -82,16 +85,6 @@ public HudiSessionProperties(HudiConfig hudiConfig, ParquetReaderConfig parquetR
"Use optimized Parquet reader",
parquetReaderConfig.isOptimizedReaderEnabled(),
false),
- integerProperty(
- MIN_PARTITION_BATCH_SIZE,
- "Minimum number of partitions returned in a single batch.",
- hudiConfig.getMinPartitionBatchSize(),
- false),
- integerProperty(
- MAX_PARTITION_BATCH_SIZE,
- "Maximum number of partitions returned in a single batch.",
- hudiConfig.getMaxPartitionBatchSize(),
- false),
booleanProperty(
SIZE_BASED_SPLIT_WEIGHTS_ENABLED,
format("If enabled, size-based splitting ensures that each batch of splits has enough data to process as defined by %s", STANDARD_SPLIT_WEIGHT_SIZE),
@@ -111,6 +104,26 @@ public HudiSessionProperties(HudiConfig hudiConfig, ParquetReaderConfig parquetR
throw new TrinoException(INVALID_SESSION_PROPERTY, format("%s must be > 0 and <= 1.0: %s", MINIMUM_ASSIGNED_SPLIT_WEIGHT, value));
}
},
+ false),
+ integerProperty(
+ MAX_SPLITS_PER_SECOND,
+ "Rate at which splits are enqueued for processing. The queue will throttle if this rate limit is breached.",
+ hudiConfig.getMaxSplitsPerSecond(),
+ false),
+ integerProperty(
+ MAX_OUTSTANDING_SPLITS,
+ "Maximum outstanding splits in a batch enqueued for processing.",
+ hudiConfig.getMaxOutstandingSplits(),
+ false),
+ integerProperty(
+ SPLIT_GENERATOR_PARALLELISM,
+ "Number of threads to generate splits from partitions",
+ hudiConfig.getSplitGeneratorParallelism(),
+ false),
+ stringProperty(
+ FILE_SYSTEM_VIEW_SPILLABLE_DIR,
+ "Path on local storage to use, when file system view is held in a spillable map.",
+ hudiConfig.getFileSystemViewSpillableDirectory(),
false));
}
@@ -141,16 +154,6 @@ public static boolean isParquetOptimizedReaderEnabled(ConnectorSession session)
return session.getProperty(PARQUET_OPTIMIZED_READER_ENABLED, Boolean.class);
}
- public static int getMinPartitionBatchSize(ConnectorSession session)
- {
- return session.getProperty(MIN_PARTITION_BATCH_SIZE, Integer.class);
- }
-
- public static int getMaxPartitionBatchSize(ConnectorSession session)
- {
- return session.getProperty(MAX_PARTITION_BATCH_SIZE, Integer.class);
- }
-
public static boolean isSizeBasedSplitWeightsEnabled(ConnectorSession session)
{
return session.getProperty(SIZE_BASED_SPLIT_WEIGHTS_ENABLED, Boolean.class);
@@ -165,4 +168,24 @@ public static double getMinimumAssignedSplitWeight(ConnectorSession session)
{
return session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, Double.class);
}
+
+ public static int getMaxSplitsPerSecond(ConnectorSession session)
+ {
+ return session.getProperty(MAX_SPLITS_PER_SECOND, Integer.class);
+ }
+
+ public static int getMaxOutstandingSplits(ConnectorSession session)
+ {
+ return session.getProperty(MAX_OUTSTANDING_SPLITS, Integer.class);
+ }
+
+ public static int getSplitGeneratorParallelism(ConnectorSession session)
+ {
+ return session.getProperty(SPLIT_GENERATOR_PARALLELISM, Integer.class);
+ }
+
+ public static String getFileSystemViewSpillableDir(ConnectorSession session)
+ {
+ return session.getProperty(FILE_SYSTEM_VIEW_SPILLABLE_DIR, String.class);
+ }
}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java
index 0081e35fa7d1..d5282399decb 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java
@@ -23,51 +23,46 @@
import io.trino.spi.SplitWeight;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.predicate.TupleDomain;
+import org.openjdk.jol.info.ClassLayout;
import java.util.List;
+import java.util.Optional;
import static com.google.common.base.MoreObjects.toStringHelper;
-import static com.google.common.base.Preconditions.checkArgument;
+import static io.airlift.slice.SizeOf.estimatedSizeOf;
+import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;
public class HudiSplit
implements ConnectorSplit
{
- private final String path;
- private final long start;
- private final long length;
- private final long fileSize;
- private final long fileModifiedTime;
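+ // shallow heap size of this class, used when estimating the retained memory of a split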
+ private static final int INSTANCE_SIZE = toIntExact(ClassLayout.parseClass(HudiSplit.class).instanceSize());
+
private final List<HostAddress> addresses;
private final TupleDomain<HiveColumnHandle> predicate;
private final List<HivePartitionKey> partitionKeys;
private final SplitWeight splitWeight;
+ private final Optional<HudiFile> baseFile;
+ private final List<HudiFile> logFiles;
+ private final String commitTime;
@JsonCreator
public HudiSplit(
- @JsonProperty("path") String path,
- @JsonProperty("start") long start,
- @JsonProperty("length") long length,
- @JsonProperty("fileSize") long fileSize,
- @JsonProperty("fileModifiedTime") long fileModifiedTime,
@JsonProperty("addresses") List addresses,
@JsonProperty("predicate") TupleDomain predicate,
@JsonProperty("partitionKeys") List partitionKeys,
- @JsonProperty("splitWeight") SplitWeight splitWeight)
+ @JsonProperty("splitWeight") SplitWeight splitWeight,
+ @JsonProperty("baseFile") Optional baseFile,
+ @JsonProperty("logFiles") List logFiles,
+ @JsonProperty("commitTime") String commitTime)
{
- checkArgument(start >= 0, "start must be positive");
- checkArgument(length >= 0, "length must be positive");
- checkArgument(start + length <= fileSize, "fileSize must be at least start + length");
-
- this.path = requireNonNull(path, "path is null");
- this.start = start;
- this.length = length;
- this.fileSize = fileSize;
- this.fileModifiedTime = fileModifiedTime;
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.predicate = requireNonNull(predicate, "predicate is null");
this.partitionKeys = ImmutableList.copyOf(requireNonNull(partitionKeys, "partitionKeys is null"));
this.splitWeight = requireNonNull(splitWeight, "splitWeight is null");
+ this.baseFile = requireNonNull(baseFile, "baseFile is null");
+ this.logFiles = requireNonNull(logFiles, "logFiles is null");
+ this.commitTime = requireNonNull(commitTime, "commitTime is null");
}
@Override
@@ -86,13 +81,15 @@ public List getAddresses()
@Override
public Object getInfo()
{
- return ImmutableMap.builder()
- .put("path", path)
- .put("start", start)
- .put("length", length)
- .put("fileSize", fileSize)
- .put("fileModifiedTime", fileModifiedTime)
- .buildOrThrow();
+ ImmutableMap.Builder