newFiles, PathFilter deletePathFilter,
+ boolean isNeedRecycle, boolean isManaged, Hive hive) throws HiveException {
+ try {
+
+ FileSystem destFs = destf.getFileSystem(conf);
+ // check if srcf contains nested sub-directories
+ FileStatus[] srcs;
+ FileSystem srcFs;
+ try {
+ srcFs = srcf.getFileSystem(conf);
+ srcs = srcFs.globStatus(srcf);
+ } catch (IOException e) {
+ throw new HiveException("Getting globStatus " + srcf.toString(), e);
+ }
+ if (srcs == null) {
+ LOG.info("No sources specified to move: " + srcf);
+ return;
+ }
+
+ if (oldPath != null) {
+ deleteOldPathForReplace(destf, oldPath, conf, purge, deletePathFilter, isNeedRecycle, hive);
+ }
+
+ // first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates
+ // destf
+ boolean destfExist = FileUtils.mkdir(destFs, destf, conf);
+ if(!destfExist) {
+ throw new IOException("Directory " + destf.toString()
+ + " does not exist and could not be created.");
+ }
+
+ // Two cases:
+ // 1. srcs has only a src directory; if we rename the src directory to destf, we also need to
+ // copy/move each file under the source directory, to avoid deleting the destination
+ // directory if it is the root of an HDFS encryption zone.
+ // 2. srcs must be a list of files -- ensured by LoadSemanticAnalyzer
+ // in both cases, we move the file under destf
+ if (srcs.length == 1 && srcs[0].isDirectory()) {
+ if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal, isManaged)) {
+ throw new IOException("Error moving: " + srcf + " into: " + destf);
+ }
+
+ // Add file paths of the files that will be moved to the destination if the caller needs it
+ if (null != newFiles) {
+ listNewFilesRecursively(destFs, destf, newFiles);
+ }
+ } else {
+ // it's either a file or a glob
+ for (FileStatus src : srcs) {
+ Path destFile = new Path(destf, src.getPath().getName());
+ if (!moveFile(conf, src.getPath(), destFile, true, isSrcLocal, isManaged)) {
+ throw new IOException("Error moving: " + srcf + " into: " + destf);
+ }
+
+ // Add file paths of the files that will be moved to the destination if the caller needs it
+ if (null != newFiles) {
+ newFiles.add(destFile);
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new HiveException(e.getMessage(), e);
+ }
+ }
+
+ public void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, boolean purge,
+ PathFilter pathFilter, boolean isNeedRecycle, Hive hive) throws HiveException {
+ Utilities.FILE_OP_LOGGER.debug("Deleting old paths for replace in " + destPath
+ + " and old path " + oldPath);
+ boolean isOldPathUnderDestf = false;
+ try {
+ FileSystem oldFs = oldPath.getFileSystem(conf);
+ FileSystem destFs = destPath.getFileSystem(conf);
+ // if oldPath is destf or its subdir, it should definitely be deleted, otherwise its
+ // existing content might result in incorrect (extra) data.
+ // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
+ // not the destf or its subdir?
+ isOldPathUnderDestf = Hive.isSubDir(oldPath, destPath, oldFs, destFs, false);
+ if (isOldPathUnderDestf) {
+ cleanUpOneDirectoryForReplace(oldPath, oldFs, pathFilter, conf, purge, isNeedRecycle, hive);
+ }
+ } catch (IOException e) {
+ if (isOldPathUnderDestf) {
+ // if oldPath is a subdir of destf but it could not be cleaned
+ throw new HiveException("Directory " + oldPath.toString()
+ + " could not be cleaned up.", e);
+ } else {
+ //swallow the exception since it won't affect the final result
+ LOG.warn("Directory " + oldPath.toString() + " cannot be cleaned: " + e, e);
+ }
+ }
+ }
+
+ private void cleanUpOneDirectoryForReplace(Path path, FileSystem fs, PathFilter pathFilter,
+ HiveConf conf, boolean purge, boolean isNeedRecycle,
+ Hive hive) throws IOException, HiveException {
+ if (isNeedRecycle && conf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
+ recycleDirToCmPath(path, purge, hive);
+ }
+ FileStatus[] statuses = fs.listStatus(path, pathFilter);
+ if (statuses == null || statuses.length == 0) {
+ return;
+ }
+ if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
+ String s = "Deleting files under " + path + " for replace: ";
+ for (FileStatus file : statuses) {
+ s += file.getPath().getName() + ", ";
+ }
+ Utilities.FILE_OP_LOGGER.trace(s);
+ }
+
+ if (!Hive.trashFiles(fs, statuses, conf, purge)) {
+ throw new HiveException("Old path " + path + " has not been cleaned up.");
+ }
+ }
+
+ /**
+ * Recycles the files recursively from the input path to the cmroot directory either by copying or moving it.
+ *
+ * @param dataPath Path of the data files to be recycled to cmroot
+ * @param isPurge
+ * When set to true, files which need to be recycled are not moved to Trash
+ */
+ public void recycleDirToCmPath(Path dataPath, boolean isPurge, Hive hive) throws HiveException {
+ try {
+ CmRecycleRequest request = new CmRecycleRequest(dataPath.toString(), isPurge);
+ hive.getMSC().recycleDirToCmPath(request);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
+ /**
+ *
+ * Moves a file from one {@link Path} to another. If {@code isRenameAllowed} is true then the
+ * {@link FileSystem#rename(Path, Path)} method is used to move the file. If its false then the data is copied, if
+ * {@code isSrcLocal} is true then the {@link FileSystem#copyFromLocalFile(Path, Path)} method is used, else
+ * {@link FileUtils#copy(FileSystem, Path, FileSystem, Path, boolean, boolean, HiveConf)} is used.
+ *
+ *
+ *
+ * If the destination file already exists, then {@code _copy_[counter]} is appended to the file name, where counter
+ * is an integer starting from 1.
+ *
+ *
+ * @param conf the {@link HiveConf} to use if copying data
+ * @param sourceFs the {@link FileSystem} where the source file exists
+ * @param sourcePath the {@link Path} to move
+ * @param destFs the {@link FileSystem} to move the file to
+ * @param destDirPath the {@link Path} to move the file to
+ * @param isSrcLocal if the source file is on the local filesystem
+ * @param isOverwrite if true, then overwrite destination file if exist else make a duplicate copy
+ * @param isRenameAllowed true if the data should be renamed and not copied, false otherwise
+ *
+ * @return the {@link Path} the source file was moved to
+ *
+ * @throws IOException if there was an issue moving the file
+ */
+ private static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath, FileSystem destFs,
+ Path destDirPath, boolean isSrcLocal, boolean isOverwrite,
+ boolean isRenameAllowed, int taskId) throws IOException {
+
+ // Strip off the file type, if any so we don't make:
+ // 000000_0.gz -> 000000_0.gz_copy_1
+ final String fullname = sourcePath.getName();
+ final String name;
+ if (taskId == -1) { // non-acid
+ name = FilenameUtils.getBaseName(sourcePath.getName());
+ } else { // acid
+ name = getPathName(taskId);
+ }
+ final String type = FilenameUtils.getExtension(sourcePath.getName());
+
+ // In case of ACID, the file is ORC so the extension is not relevant and should not be inherited.
+ Path destFilePath = new Path(destDirPath, taskId == -1 ? fullname : name);
+
+ /*
+ * The below loop may perform badly when the destination file already exists and it has too many _copy_
+ * files as well. A desired approach was to call listFiles() and get a complete list of files from
+ * the destination, and check whether the file exists or not on that list. However, millions of files
+ * could live on the destination directory, and on concurrent situations, this can cause OOM problems.
+ *
+ * I'll leave the below loop for now until a better approach is found.
+ */
+ for (int counter = 1; destFs.exists(destFilePath); counter++) {
+ if (isOverwrite) {
+ destFs.delete(destFilePath, false);
+ break;
+ }
+ destFilePath = new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) +
+ ((taskId == -1 && !type.isEmpty()) ? "." + type : ""));
+ }
+
+ if (isRenameAllowed) {
+ destFs.rename(sourcePath, destFilePath);
+ } else if (isSrcLocal) {
+ destFs.copyFromLocalFile(sourcePath, destFilePath);
+ } else {
+ FileUtils.copy(sourceFs, sourcePath, destFs, destFilePath,
+ true, // delete source
+ false, // overwrite destination
+ conf);
+ }
+ return destFilePath;
+ }
+
+ /**
+ * If moving across different FileSystems or different encryption zones, need to do a File copy instead of rename.
+ * TODO- consider if need to do this for different file authority.
+ * @throws HiveException
+ */
+ static private boolean needToCopy(Path srcf, Path destf, FileSystem srcFs,
+ FileSystem destFs, String configuredOwner, boolean isManaged) throws HiveException {
+ //Check if different FileSystems
+ if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
+ return true;
+ }
+
+ if (isManaged && !configuredOwner.isEmpty() && srcFs instanceof DistributedFileSystem) {
+ // Need some extra checks
+ // Get the running owner
+ FileStatus srcs;
+
+ try {
+ srcs = srcFs.getFileStatus(srcf);
+ String runningUser = UserGroupInformation.getLoginUser().getShortUserName();
+ boolean isOwned = FileUtils.isOwnerOfFileHierarchy(srcFs, srcs, configuredOwner, false);
+ if (configuredOwner.equals(runningUser)) {
+ // Check if owner has write permission, else it will have to copy
+ if (!(isOwned &&
+ FileUtils.isActionPermittedForFileHierarchy(
+ srcFs, srcs, configuredOwner, FsAction.WRITE, false))) {
+ return true;
+ }
+ } else {
+ // If the configured owner does not own the file, throw
+ if (!isOwned) {
+ throw new HiveException("Load Data failed for " + srcf + " as the file is not owned by "
+ + configuredOwner + " and load data is also not ran as " + configuredOwner);
+ } else {
+ return true;
+ }
+ }
+ } catch (IOException e) {
+ throw new HiveException("Could not fetch FileStatus for source file");
+ } catch (HiveException e) {
+ throw new HiveException(e);
+ } catch (Exception e) {
+ throw new HiveException(" Failed in looking up Permissions on file + " + srcf);
+ }
+ }
+
+ //Check if different encryption zones
+ HadoopShims.HdfsEncryptionShim srcHdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(srcFs);
+ HadoopShims.HdfsEncryptionShim destHdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(destFs);
+ try {
+ return srcHdfsEncryptionShim != null
+ && destHdfsEncryptionShim != null
+ && (srcHdfsEncryptionShim.isPathEncrypted(srcf) || destHdfsEncryptionShim.isPathEncrypted(destf))
+ && !srcHdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf, destHdfsEncryptionShim);
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ }
+
+ static private HiveException handlePoolException(ExecutorService pool, Exception e) {
+ HiveException he = null;
+
+ if (e instanceof HiveException) {
+ he = (HiveException) e;
+ if (he.getCanonicalErrorMsg() != ErrorMsg.GENERIC_ERROR) {
+ if (he.getCanonicalErrorMsg() == ErrorMsg.UNRESOLVED_RT_EXCEPTION) {
+ LOG.error("Failed to move: {}", he.getMessage());
+ } else {
+ LOG.error("Failed to move: {}", he.getRemoteErrorMsg());
+ }
+ }
+ } else {
+ LOG.error("Failed to move: {}", e.getMessage());
+ he = new HiveException(e.getCause());
+ }
+ pool.shutdownNow();
+ return he;
+ }
+
+ // List the new files in destination path which gets copied from source.
+ private static void listNewFilesRecursively(final FileSystem destFs, Path dest,
+ List newFiles) throws HiveException {
+ try {
+ for (FileStatus fileStatus : destFs.listStatus(dest, FileUtils.HIDDEN_FILES_PATH_FILTER)) {
+ if (fileStatus.isDirectory()) {
+ // If it is a sub-directory, then recursively list the files.
+ listNewFilesRecursively(destFs, fileStatus.getPath(), newFiles);
+ } else {
+ newFiles.add(fileStatus.getPath());
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to get source file statuses", e);
+ throw new HiveException(e.getMessage(), e);
+ }
+ }
+
+ private static String getPathName(int taskId) {
+ return Utilities.replaceTaskId("000000", taskId) + "_0";
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
index 09cbf32f9c98..683db2a25080 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -73,6 +74,7 @@ public static void close() {
private final List udfs;
private Reporter reporter;
+ private TaskAttemptID taskAttemptID;
protected MapredContext(boolean isMap, JobConf jobConf) {
this.isMap = isMap;
@@ -105,6 +107,14 @@ public void setReporter(Reporter reporter) {
this.reporter = reporter;
}
+ public TaskAttemptID getTaskAttemptID() {
+ return this.taskAttemptID;
+ }
+
+ public void setTaskAttemptID(TaskAttemptID taskAttemptID) {
+ this.taskAttemptID = taskAttemptID;
+ }
+
private void registerCloseable(Closeable closeable) {
udfs.add(closeable);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 2bb3ec4a5a59..89a3165ec6e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -85,123 +85,6 @@ public MoveTask() {
super();
}
- private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir)
- throws HiveException {
- try {
- String mesg = "Moving data to " + (isDfsDir ? "" : "local ") + "directory "
- + targetPath.toString();
- String mesg_detail = " from " + sourcePath.toString();
- console.printInfo(mesg, mesg_detail);
-
- FileSystem fs = sourcePath.getFileSystem(conf);
- if (isDfsDir) {
- moveFileInDfs (sourcePath, targetPath, conf);
- } else {
- // This is a local file
- FileSystem dstFs = FileSystem.getLocal(conf);
- moveFileFromDfsToLocal(sourcePath, targetPath, fs, dstFs);
- }
- } catch (Exception e) {
- throw new HiveException("Unable to move source " + sourcePath + " to destination "
- + targetPath, e);
- }
- }
-
- private void moveFileInDfs (Path sourcePath, Path targetPath, HiveConf conf)
- throws HiveException, IOException {
-
- final FileSystem srcFs, tgtFs;
- try {
- tgtFs = targetPath.getFileSystem(conf);
- } catch (IOException e) {
- LOG.error("Failed to get dest fs", e);
- throw new HiveException(e.getMessage(), e);
- }
- try {
- srcFs = sourcePath.getFileSystem(conf);
- } catch (IOException e) {
- LOG.error("Failed to get src fs", e);
- throw new HiveException(e.getMessage(), e);
- }
-
- // if source exists, rename. Otherwise, create a empty directory
- if (srcFs.exists(sourcePath)) {
- Path deletePath = null;
- // If it multiple level of folder are there fs.rename is failing so first
- // create the targetpath.getParent() if it not exist
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) {
- deletePath = createTargetPath(targetPath, tgtFs);
- }
- Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false);
- // Set isManaged to false as this is not load data operation for which it is needed.
- if (!Hive.moveFile(conf, sourcePath, targetPath, true, false, false)) {
- try {
- if (deletePath != null) {
- tgtFs.delete(deletePath, true);
- }
- } catch (IOException e) {
- LOG.info("Unable to delete the path created for facilitating rename: {}",
- deletePath);
- }
- throw new HiveException("Unable to rename: " + sourcePath
- + " to: " + targetPath);
- }
- } else if (!tgtFs.mkdirs(targetPath)) {
- throw new HiveException("Unable to make directory: " + targetPath);
- }
- }
-
- private void moveFileFromDfsToLocal(Path sourcePath, Path targetPath, FileSystem fs,
- FileSystem dstFs) throws HiveException, IOException {
- // RawLocalFileSystem seems not able to get the right permissions for a local file, it
- // always returns hdfs default permission (00666). So we can not overwrite a directory
- // by deleting and recreating the directory and restoring its permissions. We should
- // delete all its files and subdirectories instead.
- if (dstFs.exists(targetPath)) {
- if (dstFs.isDirectory(targetPath)) {
- FileStatus[] destFiles = dstFs.listStatus(targetPath);
- for (FileStatus destFile : destFiles) {
- if (!dstFs.delete(destFile.getPath(), true)) {
- throw new IOException("Unable to clean the destination directory: " + targetPath);
- }
- }
- } else {
- throw new HiveException("Target " + targetPath + " is not a local directory.");
- }
- } else {
- if (!FileUtils.mkdir(dstFs, targetPath, conf)) {
- throw new HiveException("Failed to create local target directory " + targetPath);
- }
- }
-
- if (fs.exists(sourcePath)) {
- FileStatus[] srcs = fs.listStatus(sourcePath, FileUtils.HIDDEN_FILES_PATH_FILTER);
- for (FileStatus status : srcs) {
- fs.copyToLocalFile(status.getPath(), targetPath);
- }
- }
- }
-
- private Path createTargetPath(Path targetPath, FileSystem fs) throws IOException {
- Path deletePath = null;
- Path mkDirPath = targetPath.getParent();
- if (mkDirPath != null && !fs.exists(mkDirPath)) {
- Path actualPath = mkDirPath;
- // targetPath path is /x/y/z/1/2/3 here /x/y/z is present in the file system
- // create the structure till /x/y/z/1/2 to work rename for multilevel directory
- // and if rename fails delete the path /x/y/z/1
- // If targetPath have multilevel directories like /x/y/z/1/2/3 , /x/y/z/1/2/4
- // the renaming of the directories are not atomic the execution will happen one
- // by one
- while (actualPath != null && !fs.exists(actualPath)) {
- deletePath = actualPath;
- actualPath = actualPath.getParent();
- }
- fs.mkdirs(mkDirPath);
- }
- return deletePath;
- }
-
// Release all the locks acquired for this object
// This becomes important for multi-table inserts when one branch may take much more
// time than the others. It is better to release the lock for this particular insert.
@@ -283,6 +166,15 @@ public int execute(DriverContext driverContext) {
}
Hive db = getHive();
+
+ DataCommitter dataCommitter;
+ if (work.getPathOutputCommitterWork() != null) {
+ dataCommitter = new PathOutputCommitterDataCommitter(work.getPathOutputCommitterWork()
+ .getJobContext(), work.getPathOutputCommitterWork().createPathOutputCommitter());
+ } else {
+ dataCommitter = new HiveDataCommitter(work);
+ }
+
// Do any hive related operations like moving tables and files
// to appropriate locations
LoadFileDesc lfd = work.getLoadFileWork();
@@ -307,7 +199,7 @@ public int execute(DriverContext driverContext) {
}
}
else {
- moveFile(sourcePath, targetPath, lfd.getIsDfsDir());
+ dataCommitter.moveFile(sourcePath, targetPath, lfd.getIsDfsDir(), conf, console);
}
}
}
@@ -328,7 +220,7 @@ public int execute(DriverContext driverContext) {
destFs.mkdirs(destPath.getParent());
}
Utilities.FILE_OP_LOGGER.debug("MoveTask moving (multi-file) " + srcPath + " to " + destPath);
- moveFile(srcPath, destPath, isDfsDir);
+ dataCommitter.moveFile(srcPath, destPath, isDfsDir, conf, console);
} else {
if (!destFs.exists(destPath)) {
destFs.mkdirs(destPath);
@@ -340,7 +232,7 @@ public int execute(DriverContext driverContext) {
Path childSrc = child.getPath();
Path childDest = new Path(destPath, filePrefix + childSrc.getName());
Utilities.FILE_OP_LOGGER.debug("MoveTask moving (multi-file) " + childSrc + " to " + childDest);
- moveFile(childSrc, childDest, isDfsDir);
+ dataCommitter.moveFile(childSrc, childDest, isDfsDir, conf, console);
}
} else {
Utilities.FILE_OP_LOGGER.debug("MoveTask skipping empty directory (multi-file) " + srcPath);
@@ -373,7 +265,7 @@ public int execute(DriverContext driverContext) {
}
db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(),
work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(),
- tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite());
+ tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite(), dataCommitter);
if (work.getOutputs() != null) {
DDLTask.addIfAbsentByName(new WriteEntity(table,
getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
@@ -387,9 +279,9 @@ public int execute(DriverContext driverContext) {
// deal with dynamic partitions
DynamicPartitionCtx dpCtx = tbd.getDPCtx();
if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
- dc = handleDynParts(db, table, tbd, ti, dpCtx);
+ dc = handleDynParts(db, table, tbd, ti, dpCtx, dataCommitter);
} else { // static partitions
- dc = handleStaticParts(db, table, tbd, ti);
+ dc = handleStaticParts(db, table, tbd, ti, dataCommitter);
}
}
if (dc != null) {
@@ -461,7 +353,8 @@ public void logMessage(LoadTableDesc tbd) {
}
private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd,
- TaskInformation ti) throws HiveException, IOException, InvalidOperationException {
+ TaskInformation ti, DataCommitter dataCommitter) throws HiveException, IOException,
+ InvalidOperationException {
List partVals = MetaStoreUtils.getPvals(table.getPartCols(), tbd.getPartitionSpec());
db.validatePartitionNameCharacters(partVals);
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
@@ -475,7 +368,7 @@ private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd,
work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
!tbd.isMmTable(),
hasFollowingStatsTask(),
- tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite());
+ tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite(), dataCommitter);
Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
// See the comment inside updatePartitionBucketSortColumns.
@@ -494,7 +387,7 @@ private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd,
}
private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd,
- TaskInformation ti, DynamicPartitionCtx dpCtx) throws HiveException,
+ TaskInformation ti, DynamicPartitionCtx dpCtx, DataCommitter dataCommitter) throws HiveException,
IOException, InvalidOperationException {
DataContainer dc;
List> dps = Utilities.getFullDPSpecs(conf, dpCtx);
@@ -522,7 +415,8 @@ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd,
tbd.getStmtId(),
hasFollowingStatsTask(),
work.getLoadTableWork().getWriteType(),
- tbd.isInsertOverwrite());
+ tbd.isInsertOverwrite(),
+ dataCommitter);
// publish DP columns to its subscribers
if (dps != null && dps.size() > 0) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterDataCommitter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterDataCommitter.java
new file mode 100644
index 000000000000..fa424eade8df
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterDataCommitter.java
@@ -0,0 +1,59 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A {@link DataCommitter} that commits Hive data using a {@link PathOutputCommitter}.
+ */
+class PathOutputCommitterDataCommitter implements DataCommitter {
+
+ private final JobContext jobContext;
+ private final PathOutputCommitter pathOutputCommitter;
+
+ PathOutputCommitterDataCommitter(JobContext jobContext,
+ PathOutputCommitter pathOutputCommitter) {
+ this.jobContext = jobContext;
+ this.pathOutputCommitter = pathOutputCommitter;
+ }
+
+ @Override
+ public void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir, HiveConf conf,
+ SessionState.LogHelper console) throws HiveException {
+ commitJob();
+ }
+
+ @Override
+ public void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs, boolean isSrcLocal,
+ boolean isAcidIUD, boolean isOverwrite, List newFiles,
+ boolean isBucketed, boolean isFullAcidTable,
+ boolean isManaged) throws HiveException {
+ commitJob();
+ }
+
+ @Override
+ public void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
+ boolean isSrcLocal, boolean purge, List newFiles,
+ PathFilter deletePathFilter, boolean isNeedRecycle, boolean isManaged,
+ Hive hive) throws HiveException {
+ commitJob();
+ }
+
+ private void commitJob() throws HiveException {
+ try {
+ this.pathOutputCommitter.commitJob(this.jobContext);
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterSetupTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterSetupTask.java
new file mode 100644
index 000000000000..dc1c62bf3dff
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterSetupTask.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.plan.PathOutputCommitterWork;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PathOutputCommitterSetupTask extends Task {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PathOutputCommitterSetupTask.class);
+
+ private static final long serialVersionUID = -8867710739987754989L;
+
+ @Override
+ protected int execute(DriverContext driverContext) {
+ try {
+ LOG.info("Running setupJob for Path Output Committer " +
+ work.getPathOutputCommitterClass().getName());
+ work.createPathOutputCommitter().setupJob(getWork().getJobContext());
+ } catch (Exception e) {
+ LOG.error("Failed run setupJob for Path Output Committer " +
+ work.getPathOutputCommitterClass().getName(), e);
+ setException(e);
+ return 1;
+ }
+ return 0;
+ }
+
+ @Override
+ public StageType getType() {
+ return null;
+ }
+
+ @Override
+ public String getName() {
+ return null;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 3a107b7e8128..51562f3c4240 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
+import org.apache.hadoop.hive.ql.plan.PathOutputCommitterWork;
import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
import org.apache.hadoop.hive.ql.plan.CopyWork;
@@ -113,6 +114,7 @@ public TaskTuple(Class workClass, Class extends Task> taskClass) {
taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class));
taskvec.add(new TaskTuple(ExportWork.class, ExportTask.class));
taskvec.add(new TaskTuple(ReplTxnWork.class, ReplTxnTask.class));
+ taskvec.add(new TaskTuple<>(PathOutputCommitterWork.class, PathOutputCommitterSetupTask.class));
}
private static ThreadLocal tid = new ThreadLocal() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index d0038cd3d87b..d24a5357109e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
@@ -73,6 +74,8 @@ public class ExecMapper extends MapReduceBase implements Mapper {
@Override
public void configure(JobConf job) {
+ TaskAttemptID taskAttemptID = TaskAttemptID.forName(job.get("mapred.task.id"));
+
execContext = new ExecMapperContext(job);
Utilities.tryLoggingClassPaths(job, l4j);
setDone(false);
@@ -100,6 +103,7 @@ public void configure(JobConf job) {
execContext.setLocalWork(localWork);
MapredContext.init(true, new JobConf(jc));
+ MapredContext.get().setTaskAttemptID(taskAttemptID);
mo.passExecContext(execContext);
mo.initializeLocalWork(jc);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
index 7ce1544839f1..5169c6bfa9ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
@@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -86,6 +87,8 @@ public class ExecReducer extends MapReduceBase implements Reducer {
@Override
public void configure(JobConf job) {
+ TaskAttemptID taskAttemptID = TaskAttemptID.forName(job.get("mapred.task.id"));
+
rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
ObjectInspector keyObjectInspector;
@@ -127,6 +130,7 @@ public void configure(JobConf job) {
}
MapredContext.init(false, new JobConf(jc));
+ MapredContext.get().setTaskAttemptID(taskAttemptID);
// initialize reduce operator tree
try {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index 7cd853f87810..8a85a074422f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -22,8 +22,6 @@
import java.util.Iterator;
import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.AbstractMapOperator;
import org.apache.hadoop.hive.ql.exec.MapOperator;
@@ -42,6 +40,10 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -65,6 +67,8 @@ public void init(JobConf job, OutputCollector output, Reporter repo
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
super.init(job, output, reporter);
+ TaskAttemptID taskAttemptID = TaskAttemptID.forName(job.get("mapred.task.id"));
+
try {
jc = job;
execContext = new ExecMapperContext(jc);
@@ -88,6 +92,7 @@ public void init(JobConf job, OutputCollector output, Reporter repo
execContext.setLocalWork(localWork);
MapredContext.init(true, new JobConf(jc));
+ MapredContext.get().setTaskAttemptID(taskAttemptID);
MapredContext.get().setReporter(reporter);
mo.passExecContext(execContext);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 6a7e1dfa59eb..7a129c84334b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -24,8 +24,6 @@
import java.util.Iterator;
import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
@@ -57,11 +55,16 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.Preconditions;
+
/**
* Clone from ExecReducer, it is the bridge between the spark framework and
* the Hive operator pipeline at execution time. It's main responsibilities are:
@@ -122,6 +125,8 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) throws
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
super.init(job, output, reporter);
+ TaskAttemptID taskAttemptID = TaskAttemptID.forName(job.get("mapred.task.id"));
+
rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
ObjectInspector keyObjectInspector;
@@ -218,6 +223,8 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) throws
Utilities.reduceFieldNameList, ois);
}
}
+ MapredContext.init(false, new JobConf(jc));
+ MapredContext.get().setTaskAttemptID(taskAttemptID);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 4d2e1a4e9adf..e9d877df1ceb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -164,9 +164,11 @@
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
+import org.apache.hadoop.hive.ql.exec.DataCommitter;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.FunctionTask;
import org.apache.hadoop.hive.ql.exec.FunctionUtils;
+import org.apache.hadoop.hive.ql.exec.HiveDataCommitter;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -195,6 +197,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.TxnIdUtils;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -438,6 +441,7 @@ public static void closeCurrent() {
*/
private Hive(HiveConf c, boolean doRegisterAllFns) throws HiveException {
conf = c;
+
if (doRegisterAllFns) {
registerAllFunctionsOnce();
}
@@ -1725,7 +1729,7 @@ public Database getDatabaseCurrent() throws HiveException {
public Partition loadPartition(Path loadPath, Table tbl, Map partSpec,
LoadFileType loadFileType, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long writeId,
- int stmtId, boolean isInsertOverwrite) throws HiveException {
+ int stmtId, boolean isInsertOverwrite, DataCommitter dataCommitter) throws HiveException {
Path tblDataLocationPath = tbl.getDataLocation();
boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
@@ -1814,11 +1818,12 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par
boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
boolean needRecycle = !tbl.isTemporary()
&& ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName()));
- replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal,
- isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle, isManaged);
+ dataCommitter.replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal,
+ isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle, isManaged,
+ this);
} else {
FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
- copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
+ dataCommitter.copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
(loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles,
tbl.getNumBuckets() > 0, isFullAcidTable, isManaged);
}
@@ -2197,7 +2202,7 @@ public Map