Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ dependency-reduced-pom.xml
apache-hive-3.1.3-bin-gdp-*.tar.gz
hive_build_dist/
.devcontainer/.devpod-internal/

.qodo
11 changes: 10 additions & 1 deletion common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,9 @@ public static enum ConfVars {
// Metastore stuff. Be sure to update HiveConf.metaVars when you add something here!
METASTOREDBTYPE("hive.metastore.db.type", "DERBY", new StringSet("DERBY", "ORACLE", "MYSQL", "MSSQL", "POSTGRES"),
"Type of database used by the metastore. Information schema & JDBCStorageHandler depend on it."),
METASTORE_CLIENT_FACTORY_CLASS("hive.metastore.client.factory.class",
"org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClientFactory",
"The name of the factory class that produces objects implementing the IMetaStoreClient interface."),
/**
* @deprecated Use MetastoreConf.WAREHOUSE
*/
Expand Down Expand Up @@ -4420,7 +4423,13 @@ public static enum ConfVars {
"This parameter enables a number of optimizations when running on blobstores:\n" +
"(1) If hive.blobstore.use.blobstore.as.scratchdir is false, force the last Hive job to write to the blobstore.\n" +
"This is a performance optimization that forces the final FileSinkOperator to write to the blobstore.\n" +
"See HIVE-15121 for details.");
"See HIVE-15121 for details."),

HIVE_BLOBSTORE_USE_OUTPUTCOMMITTER("hive.blobstore.use.output-committer", false, "Whether to " +
"use a custom PathOutputCommitter to commit data. For all the URIs specified in " +
"hive.blobstore.supported.schemes, Hive will honor the config " +
"mapreduce.outputcommitter.factory.scheme.[uri-scheme]. This overrides the behavior " +
"described in hive.blobstore.optimizations.enabled. See HIVE-16295 for details.");

public final String varname;
public final String altName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class PerfLogger {
public static final String LOAD_HASHTABLE = "LoadHashtable";
public static final String TEZ_GET_SESSION = "TezGetSession";
public static final String SAVE_TO_RESULTS_CACHE = "saveToResultsCache";
public static final String FILE_MOVES = "FileMoves";

public static final String SPARK_SUBMIT_TO_RUNNING = "SparkSubmitToRunning";
public static final String SPARK_BUILD_PLAN = "SparkBuildPlan";
Expand Down
21 changes: 21 additions & 0 deletions hcatalog/core/pom.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<!--

Check failure on line 1 in hcatalog/core/pom.xml

View check run for this annotation

OX Security / ox-security/scan

hadoop-common@2.7.0 • 9 direct + 29 indirect CVEs • Multiple Public Exploits • EPSS Medium

• org.apache.hadoop:hadoop-common@3.9.1 (major upgrade) resolves 30 of 38 CVEs in org.apache.hadoop:hadoop-common@2.7.0

Check warning on line 1 in hcatalog/core/pom.xml

View check run for this annotation

OX Security / ox-security/scan

jackson-core@2.12.7 • 2 direct CVEs • EPSS Low

• com.fasterxml.jackson.core:jackson-core@2.15.0 (minor upgrade) resolves 2 of 2 CVEs in com.fasterxml.jackson.core:jackson-core@2.12.7

Check warning on line 1 in hcatalog/core/pom.xml

View check run for this annotation

OX Security / ox-security/scan

aws-java-sdk-core@1.12.472 • 1 indirect CVE • EPSS Low • No Public Exploit

• software.amazon.ion:ion-java@1.10.5 (minor upgrade) resolves 1 of 1 CVE in software.amazon.ion:ion-java@1.0.2
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
Expand Down Expand Up @@ -100,6 +100,21 @@
<artifactId>hadoop-annotations</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- The v1 SDK is used at compilation time for adapter classes in
org.apache.hadoop.fs.s3a.adapter. It is not needed at runtime
unless a non-standard v1 credential provider is declared. -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<version>1.12.472</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bundle</artifactId>
<version>2.20.109</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-archives</artifactId>
Expand Down Expand Up @@ -256,6 +271,12 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-mapreduce</artifactId>
<version>0.9.1</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
Expand Down Expand Up @@ -757,4 +758,13 @@ public static void assertNotNull(Object t, String msg, Logger logger) {
throw new IllegalArgumentException(msg);
}
}

/**
 * Returns whether the given path points at an S3-backed filesystem.
 *
 * <p>Checks the path's URI scheme (e.g. {@code s3}, {@code s3a}, {@code s3n})
 * rather than the raw string: a relative path such as {@code s3data/file}
 * starts with "s3" but is not an S3 location.
 *
 * @param path the path to inspect; must not be null
 * @return true if the path's URI scheme starts with "s3", false otherwise
 */
public static boolean isS3(Path path) {
  String scheme = path.toUri().getScheme();
  // Scheme is null for scheme-less (relative/local) paths.
  return scheme != null && scheme.startsWith("s3");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@
*/
package org.apache.hive.hcatalog.common;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
Expand All @@ -30,6 +37,7 @@

import javax.security.auth.login.LoginException;

import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
Expand All @@ -39,6 +47,7 @@
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.annotation.NoReconnect;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -90,7 +99,8 @@ private int getThreadId() {
}

/**
 * Creates a fresh, non-cached metastore client.
 *
 * @param hiveConf configuration used to construct the client
 * @return a new IMetaStoreClient, not managed by this cache
 * @throws MetaException if the client cannot be created
 */
public static IMetaStoreClient getNonCachedHiveMetastoreClient(HiveConf hiveConf) throws MetaException {
  // Goes through the pluggable factory path in HiveUtils (replacing the old
  // direct RetryingMetaStoreClient.getProxy call); "true" allows an embedded
  // metastore, as was the default for HCat callers.
  return HiveUtils.createMetaStoreClient(hiveConf, true, new ConcurrentHashMap<>());
}

public HiveClientCache(HiveConf hiveConf) {
Expand Down Expand Up @@ -275,7 +285,24 @@ public IMetaStoreClient get(final HiveConf hiveConf) throws MetaException, IOExc
cacheableHiveMetaStoreClient.acquire();
}
}
return cacheableHiveMetaStoreClient;
return (IMetaStoreClient)cacheableHiveMetaStoreClient;
}

/**
 * Collects the distinct interfaces represented by the given classes: an
 * interface is included as-is, while a concrete class contributes the
 * interfaces it directly implements.
 *
 * @param classes classes and/or interfaces to inspect
 * @return the union of interfaces, deduplicated, in first-seen order
 */
private static Class<?>[] getAllInterfaces(Class<?>... classes) {
  ImmutableSet.Builder<Class<?>> builder = ImmutableSet.builder();
  for (Class<?> element : classes) {
    if (element.isInterface()) {
      builder.add(element);
    } else {
      // NOTE(review): only directly implemented interfaces are picked up;
      // interfaces inherited via superclasses are not walked.
      builder.addAll(Arrays.asList(element.getInterfaces()));
    }
  }
  return builder.build().toArray(new Class<?>[0]);
}

/**
Expand All @@ -289,17 +316,12 @@ public IMetaStoreClient get(final HiveConf hiveConf) throws MetaException, IOExc
private ICacheableMetaStoreClient getOrCreate(final HiveClientCacheKey cacheKey)
throws IOException, MetaException, LoginException {
try {
return hiveCache.get(cacheKey, new Callable<ICacheableMetaStoreClient>() {
@Override
public ICacheableMetaStoreClient call() throws MetaException {
// This is called from HCat, so always allow embedded metastore (as was the default).
return
(ICacheableMetaStoreClient) RetryingMetaStoreClient.getProxy(cacheKey.getHiveConf(),
new Class<?>[]{HiveConf.class, Integer.class, Boolean.class},
new Object[]{cacheKey.getHiveConf(), timeout, true},
CacheableHiveMetaStoreClient.class.getName());
}
});
return (ICacheableMetaStoreClient)this.hiveCache.get(cacheKey, new Callable<ICacheableMetaStoreClient>() {
public ICacheableMetaStoreClient call() throws MetaException {
IMetaStoreClient metaStoreClient = HiveClientCache.getNonCachedHiveMetastoreClient(cacheKey.getHiveConf());
return (ICacheableMetaStoreClient) Proxy.newProxyInstance(HiveClientCache.class.getClassLoader(), HiveClientCache.getAllInterfaces(IMetaStoreClient.class, CacheableHiveMetaStoreClient.class), new CacheableHiveMetaStoreClient(metaStoreClient));
}
});
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof IOException) {
Expand Down Expand Up @@ -368,7 +390,7 @@ public String toString() {
}

@InterfaceAudience.Private
public interface ICacheableMetaStoreClient extends IMetaStoreClient {
public interface ICacheableMetaStoreClient extends Closeable {
@NoReconnect
void acquire();

Expand Down Expand Up @@ -398,15 +420,18 @@ public interface ICacheableMetaStoreClient extends IMetaStoreClient {
/**
* Add # of current users on HiveMetaStoreClient, so that the client can be cleaned when no one is using it.
*/
static class CacheableHiveMetaStoreClient extends HiveMetaStoreClient implements ICacheableMetaStoreClient {
static class CacheableHiveMetaStoreClient implements InvocationHandler, ICacheableMetaStoreClient {

private final AtomicInteger users = new AtomicInteger(0);

private static final ImmutableSet<Method> CLOSE_METHODS;
private static final String CLOSE_METHOD_NAME = "close";
private volatile boolean expiredFromCache = false;
private final IMetaStoreClient base;
private boolean isClosed = false;

CacheableHiveMetaStoreClient(final HiveConf conf, final Integer timeout, Boolean allowEmbedded)
throws MetaException {
super(conf, null, allowEmbedded);
CacheableHiveMetaStoreClient(IMetaStoreClient base) {
this.base = base;
}

/**
Expand Down Expand Up @@ -468,7 +493,7 @@ public AtomicInteger getUsers() {
public boolean isOpen() {
try {
// Look for an unlikely database name and see if either MetaException or TException is thrown
super.getDatabases("NonExistentDatabaseUsedForHealthCheck");
this.base.getDatabases("NonExistentDatabaseUsedForHealthCheck");
} catch (TException e) {
return false;
}
Expand Down Expand Up @@ -507,7 +532,7 @@ public synchronized void tearDownIfUnused() {
public void tearDown() {
try {
if (!isClosed) {
super.close();
this.base.close();
}
isClosed = true;
} catch (Exception e) {
Expand Down Expand Up @@ -538,5 +563,28 @@ protected void finalize() throws Throwable {
super.finalize();
}
}

/**
 * Dispatches calls made on the dynamic proxy.
 *
 * <p>Methods declared on a type this handler is assignable to (the
 * lifecycle/bookkeeping methods such as acquire/release) are invoked on the
 * wrapper itself; any declaration of {@code close()} is routed through the
 * wrapper's reference-counted close; everything else is forwarded to the
 * wrapped base client.
 */
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  try {
    if (method.getDeclaringClass().isAssignableFrom(this.getClass())) {
      return method.invoke(this, args);
    } else if (CLOSE_METHODS.contains(method)) {
      this.close();
      return null;
    } else {
      return method.invoke(this.base, args);
    }
  } catch (InvocationTargetException e) {
    // Unwrap so callers see the underlying client's original exception type.
    throw e.getCause();
  }
}

// Pre-resolve every declaration of close() (AutoCloseable, Closeable,
// IMetaStoreClient, ICacheableMetaStoreClient) so invoke() can recognize any
// flavor of close() and route it through the reference-counted close()
// instead of closing the underlying client directly.
static {
try {
CLOSE_METHODS = ImmutableSet.of(AutoCloseable.class.getMethod("close"), Closeable.class.getMethod("close"), IMetaStoreClient.class.getMethod("close"), ICacheableMetaStoreClient.class.getMethod("close"));
} catch (NoSuchMethodException e) {
// Should never happen for these well-known methods; fail fast if it does.
throw new RuntimeException(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.hive.hcatalog.common.ErrorType;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.s3.commit.magic.MagicS3GuardCommitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -157,6 +158,11 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOExceptio
HCatMapRedUtil.createTaskAttemptContext(context);
configureDynamicStorageHandler(currTaskContext, dynamicPartValues);
localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext.getConfiguration());
boolean isMagic = false;

if (localJobInfo.getLocation().startsWith("s3")) {
isMagic = true;
}

// Setup serDe.
AbstractSerDe currSerDe =
Expand All @@ -182,9 +188,11 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOExceptio
// but may become an issue for cases when the method is used to perform
// other setup tasks.

Path outputLoc = new Path(localJobInfo.getLocation());

// Get Output Committer
org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter =
currTaskContext.getJobConf().getOutputCommitter();
org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = isMagic ?
new MagicS3GuardCommitter(outputLoc, currTaskContext) : currTaskContext.getJobConf().getOutputCommitter();

// Create currJobContext the latest so it gets all the config changes
org.apache.hadoop.mapred.JobContext currJobContext =
Expand All @@ -199,10 +207,17 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOExceptio
currTaskContext.getTaskAttemptID(), currTaskContext.getProgressible());

// Set temp location.
currTaskContext.getConfiguration().set(
"mapred.work.output.dir",
new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext)
.getWorkPath().toString());
if (isMagic) {
currTaskContext.getConfiguration().set(
"mapred.work.output.dir",
outputLoc.toString());
}
else {
currTaskContext.getConfiguration().set(
"mapred.work.output.dir",
new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext)
.getWorkPath().toString());
}

// Set up task.
baseOutputCommitter.setupTask(currTaskContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.s3.commit.magic.MagicS3GuardCommitter;
import org.apache.thrift.TException;

import java.io.IOException;
Expand Down Expand Up @@ -142,10 +143,18 @@ public void checkOutputSpecs(JobContext context) throws IOException, Interrupted
/**
 * Returns the output committer for this task attempt: an S3-aware container
 * wrapping the magic committer when the job writes to an S3 location,
 * otherwise the classic FileOutputCommitterContainer.
 *
 * @param context the task attempt context
 * @return the committer responsible for this task's output
 * @throws IOException if committer construction fails
 * @throws InterruptedException if interrupted while setting up
 */
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
  //this needs to be manually set, under normal circumstances MR Task does this
  setWorkOutputPath(context);
  // Decide the committer from the actual output location; this replaces the
  // "if (true/*HCatUtil.isS3A(...)*/)" debug left-over that unconditionally
  // forced the S3 magic committer onto every job, including local/HDFS ones.
  String outputDir = context.getConfiguration().get("mapred.output.dir");
  if (outputDir != null && HCatUtil.isS3(new Path(outputDir))) {
    return new S3OutputCommitterContainer(context,
        HCatBaseOutputFormat.getJobInfo(context.getConfiguration()).isDynamicPartitioningUsed() ?
            null :
            new MagicS3GuardCommitter(new Path(outputDir), context));
  } else {
    return new FileOutputCommitterContainer(context,
        HCatBaseOutputFormat.getJobInfo(context.getConfiguration()).isDynamicPartitioningUsed() ?
            null :
            new JobConf(context.getConfiguration()).getOutputCommitter());
  }
}

/**
Expand Down Expand Up @@ -246,8 +255,15 @@ static void setWorkOutputPath(TaskAttemptContext context) throws IOException {
String outputPath = context.getConfiguration().get("mapred.output.dir");
//we need to do this to get the task path and set it for mapred implementation
//since it can't be done automatically because of mapreduce->mapred abstraction
if (outputPath != null)
context.getConfiguration().set("mapred.work.output.dir",
new FileOutputCommitter(new Path(outputPath), context).getWorkPath().toString());
if (outputPath != null) {
if (HCatUtil.isS3(new Path(outputPath))) {
context.getConfiguration().set("mapred.work.output.dir",
new MagicS3GuardCommitter(new Path(outputPath), context).getWorkPath().toString());
}
else {
context.getConfiguration().set("mapred.work.output.dir",
new FileOutputCommitter(new Path(outputPath), context).getWorkPath().toString());
}
}
}
}
Loading
Loading