diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java index 15d291bd3e..cddbeefd9f 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java @@ -37,6 +37,7 @@ public class SparkActionExecutor extends JavaActionExecutor { public static final String SPARK_MAIN_CLASS_NAME_1 = "org.apache.oozie.action.hadoop.SparkMain"; public static final String SPARK_MAIN_CLASS_NAME_2 = "org.apache.oozie.action.hadoop.SparkMain2"; + public static final String SPARK_MAIN_CLASS_NAME_LATEST = "org.apache.oozie.action.hadoop.SparkMainLatest"; public static final String TASK_USER_PRECEDENCE = "mapreduce.task.classpath.user.precedence"; // hadoop-2 public static final String TASK_USER_CLASSPATH_PRECEDENCE = "mapreduce.user.classpath.first"; // hadoop-1 @@ -143,9 +144,11 @@ Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appP public List getLauncherClasses() { List classes = new ArrayList(); try { - if("2".equals(sparkVersion)){ + if("0".equals(sparkVersion)){ + classes.add(Class.forName(SPARK_MAIN_CLASS_NAME_LATEST)); + } else if("2".equals(sparkVersion)){ classes.add(Class.forName(SPARK_MAIN_CLASS_NAME_2)); - }else { + } else { classes.add(Class.forName(SPARK_MAIN_CLASS_NAME_1)); } } catch (ClassNotFoundException e) { @@ -163,19 +166,24 @@ public List getLauncherClasses() { */ @Override protected String getDefaultShareLibName(Element actionXml) { - if("2".equals(sparkVersion)) { + if("0".equals(sparkVersion)) { + return "sparkLatest"; + } else if("2".equals(sparkVersion)) { return "spark2"; - }else{ + } else{ return "spark"; } } @Override protected String getLauncherMain(Configuration launcherConf, Element actionXml) { - if("2".equals(sparkVersion)){ + if("0".equals(sparkVersion)){ + return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SPARK_MAIN_CLASS_NAME_LATEST); + + } else if("2".equals(sparkVersion)){ return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SPARK_MAIN_CLASS_NAME_2); - }else { + } else { return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SPARK_MAIN_CLASS_NAME_1); } } diff --git a/distro/src/deb/control/postinst b/distro/src/deb/control/postinst index 0437788dfd..e6d40beab9 100755 --- a/distro/src/deb/control/postinst +++ b/distro/src/deb/control/postinst @@ -105,7 +105,7 @@ if [ "$DEPLOYMENT_MODE" == "ONLY_SHARELIB" ]; then fi DISTRO=$(ls /opt/inmobi/${OOZIE_VERSION}|grep distro|grep tar.gz|grep $OOZIE_VERSION) - sudo mkdir $OOZIE_BASE/$OOZIE_SHARELIB + sudo mkdir -p $OOZIE_BASE/$OOZIE_SHARELIB sudo tar -xvf /opt/inmobi/${OOZIE_VERSION}/$DISTRO -C $OOZIE_BASE/$OOZIE_SHARELIB cd $OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION @@ -118,6 +118,7 @@ if [ "$DEPLOYMENT_MODE" == "ONLY_SHARELIB" ]; then sudo mkdir -p $OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION/temp_sharelib/lib/hive2/ sudo mkdir -p $OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION/temp_sharelib/lib/spark/ sudo mkdir -p $OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION/temp_sharelib/lib/spark2/ + sudo mkdir -p $OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION/temp_sharelib/lib/sparkLatest/ cd $OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION sudo cp $OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION/share/lib/oozie/* $OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION/temp_sharelib/lib/oozie/ || true @@ -126,6 +127,7 @@ if [ "$DEPLOYMENT_MODE" == "ONLY_SHARELIB" ]; then sudo cp 
$OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION/share/lib/hive2/* $OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION/temp_sharelib/lib/hive2/ || true sudo cp $OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION/share/lib/spark/* $OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION/temp_sharelib/lib/spark/ || true sudo cp $OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION/share/lib/spark2/* $OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION/temp_sharelib/lib/spark2/ || true + sudo cp $OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION/share/lib/sparkLatest/* $OOZIE_BASE/$OOZIE_SHARELIB/$OOZIE_VERSION/temp_sharelib/lib/sparkLatest/ || true sudo chown -R oozie:oozie $OOZIE_CURRENT/ || true sudo chown -R oozie $OOZIE_CURRENT/ || true @@ -152,7 +154,7 @@ else sudo mv $OOZIE_BASE/$OOZIE_VERSION $OOZIE_CURRENT cd $OOZIE_CURRENT - sudo mkdir $OOZIE_CURRENT/libext + sudo mkdir -p $OOZIE_CURRENT/libext cd $OOZIE_CURRENT/libext sudo cp $HADOOP_CLIENT/*.jar . || true sudo cp $HADOOP_CLIENT/lib/*.jar . || true @@ -189,6 +191,7 @@ else sudo mkdir -p $OOZIE_CURRENT/temp_sharelib/lib/hive2/ sudo mkdir -p $OOZIE_CURRENT/temp_sharelib/lib/spark/ sudo mkdir -p $OOZIE_CURRENT/temp_sharelib/lib/spark2/ + sudo mkdir -p $OOZIE_CURRENT/temp_sharelib/lib/sparkLatest/ sudo mkdir -p $OOZIE_CURRENT/temp_sharelib/lib/distcp/ cd $OOZIE_CURRENT @@ -199,6 +202,7 @@ else sudo cp $OOZIE_CURRENT/share/lib/hive2/* $OOZIE_CURRENT/temp_sharelib/lib/hive2/ || true sudo cp $OOZIE_CURRENT/share/lib/spark/* $OOZIE_CURRENT/temp_sharelib/lib/spark/ || true sudo cp $OOZIE_CURRENT/share/lib/spark2/* $OOZIE_CURRENT/temp_sharelib/lib/spark2/ || true + sudo cp $OOZIE_CURRENT/share/lib/sparkLatest/* $OOZIE_CURRENT/temp_sharelib/lib/sparkLatest/ || true sudo cp $OOZIE_CURRENT/libext/s4*jar $OOZIE_CURRENT/temp_sharelib/lib/oozie/ || true sudo cp $OOZIE_CURRENT/libext/s4*jar $OOZIE_CURRENT/libext/ || true sudo cp $OOZIE_CURRENT/libext/falcon*jar $OOZIE_CURRENT/libext/ || true diff --git a/pom.xml b/pom.xml index aa96f6b5a5..5b6adfe994 100644 --- a/pom.xml +++ b/pom.xml @@ -102,6 +102,13 @@ 1.6.3 2.11 + + 2.3.1 + 1.6.3 + 1.6.3 + 2.11 + 3.5 + 14.0.1 @@ -294,6 +301,11 @@ oozie-sharelib-spark2 ${project.version} + + org.apache.oozie + oozie-sharelib-sparkLatest + ${project.version} + org.apache.oozie oozie-docs diff --git a/sharelib/pom.xml b/sharelib/pom.xml index fcdc68daa6..f15ebc33ce 100644 --- a/sharelib/pom.xml +++ b/sharelib/pom.xml @@ -41,6 +41,7 @@ distcp spark spark2 + sparkLatest diff --git a/sharelib/sparkLatest/pom.xml b/sharelib/sparkLatest/pom.xml new file mode 100644 index 0000000000..07c59a7b17 --- /dev/null +++ b/sharelib/sparkLatest/pom.xml @@ -0,0 +1,452 @@ + + + + 4.0.0 + + org.apache.oozie + oozie-main + 4.3.9-SNAPSHOT + ../.. 
+ + org.apache.oozie + oozie-sharelib-sparkLatest + 4.3.6-SNAPSHOT + Apache Oozie Share Lib Spark + Apache Oozie Share Lib SparkLatest + jar + + + sparkLatest + false + ${spark.version.latest} + ${spark.scala.binary.version.latest} + ${spark.streaming.kafka.version.latest} + ${spark.bagel.version.latest} + + + + + + com.google.guava + guava + ${spark.guava.version} + + + org.apache.commons + commons-lang3 + ${spark.commons.lang3.version.latest} + + + commons-lang + commons-lang + compile + + + commons-io + commons-io + compile + + + org.apache.oozie + oozie-sharelib-oozie + provided + + + org.apache.spark + spark-core_${spark.scala.binary.version} + ${spark.version} + compile + + + org.apache.hadoop + hadoop-client + + + org.spark-project.hive + hive-beeline + + + org.spark-project.hive + hive-common + + + org.spark-project.hive + hive-exec + + + org.spark-project.hive + hive-jdbc + + + org.spark-project.hive + hive-metastore + + + org.spark-project.hive + hive-serde + + + org.spark-project.hive + hive-service + + + org.spark-project.hive + hive-shims + + + + + org.apache.spark + spark-graphx_${spark.scala.binary.version} + ${spark.version} + compile + + + org.apache.spark + spark-hive_${spark.scala.binary.version} + ${spark.version} + compile + + + org.apache.hadoop + hadoop-yarn-server-web-proxy + + + org.apache.hadoop + hadoop-yarn-server-applicationhistoryservice + + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-annotations + + + org.spark-project.hive + hive-beeline + + + org.spark-project.hive + hive-common + + + org.spark-project.hive + hive-exec + + + org.spark-project.hive + hive-jdbc + + + org.spark-project.hive + hive-metastore + + + org.spark-project.hive + hive-serde + + + org.spark-project.hive + hive-service + + + org.spark-project.hive + hive-shims + + + + + org.apache.spark + spark-mllib_${spark.scala.binary.version} + ${spark.version} + compile + + + org.apache.spark + spark-repl_${spark.scala.binary.version} + ${spark.version} + compile + + + org.apache.spark + spark-sql_${spark.scala.binary.version} + ${spark.version} + compile + + + org.apache.spark + spark-streaming_${spark.scala.binary.version} + ${spark.version} + compile + + + org.mortbay.jetty + servlet-api-2.5 + + + + + org.apache.spark + spark-streaming-flume_${spark.scala.binary.version} + ${spark.version} + compile + + + org.mortbay.jetty + servlet-api-2.5 + + + + + org.apache.spark + spark-streaming-kafka_${spark.scala.binary.version} + ${spark.streaming.kafka.version} + compile + + + org.apache.spark + spark-bagel_${spark.scala.binary.version} + ${spark.bagel.version} + compile + + + + org.apache.oozie + oozie-core + provided + + + javax.servlet + servlet-api + + + + + + org.apache.hadoop + hadoop-client + provided + + + + org.apache.oozie + oozie-core + tests + test + + + + org.apache.hadoop + hadoop-minicluster + test + + + + junit + junit + test + + + + org.apache.oozie + oozie-examples + test + + + + + + + src/main/resources + true + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + gen-classpath + generate-test-resources + + build-classpath + + + compile + ${project.build.directory}/classpath + + + + create-mrapp-generated-classpath + generate-test-resources + + build-classpath + + + + ${project.build.directory}/test-classes/mrapp-generated-classpath + + + + + + org.apache.maven.plugins + maven-antrun-plugin + 1.6 + + + + + + + + + + run + + generate-test-resources + + + + + org.apache.maven.plugins + 
maven-assembly-plugin + + partial-sharelib + false + + ../../src/main/assemblies/partial-sharelib.xml + + + + + + + + + hadoop-2 + + + org.apache.spark + spark-yarn_${spark.scala.binary.version} + ${spark.version} + compile + + + org.apache.hadoop + hadoop-yarn-client + + + org.apache.hadoop + hadoop-yarn-api + + + org.apache.hadoop + hadoop-yarn-common + + + org.apache.hadoop + hadoop-yarn-server-common + + + org.apache.hadoop + hadoop-yarn-server-web-proxy + + + org.apache.hadoop + hadoop-mapreduce-client-shuffle + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + org.apache.hadoop + hadoop-mapreduce-client-common + + + org.apache.hadoop + hadoop-mapreduce-client-app + + + org.apache.hadoop + hadoop-annotations + + + org.apache.hadoop + hadoop-auth + + + org.apache.hadoop + hadoop-aws + + + org.apache.hadoop + hadoop-client + + + org.spark-project.hive + hive-beeline + + + org.spark-project.hive + hive-common + + + org.spark-project.hive + hive-exec + + + org.spark-project.hive + hive-jdbc + + + org.spark-project.hive + hive-metastore + + + org.spark-project.hive + hive-serde + + + org.spark-project.hive + hive-service + + + org.spark-project.hive + hive-shims + + + + + + + diff --git a/sharelib/sparkLatest/src/main/java/org/apache/oozie/action/hadoop/SparkMainLatest.java b/sharelib/sparkLatest/src/main/java/org/apache/oozie/action/hadoop/SparkMainLatest.java new file mode 100644 index 0000000000..60e9865ee5 --- /dev/null +++ b/sharelib/sparkLatest/src/main/java/org/apache/oozie/action/hadoop/SparkMainLatest.java @@ -0,0 +1,538 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.action.hadoop; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.jar.JarFile; +import java.util.jar.Manifest; +import java.util.regex.Pattern; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.PropertyConfigurator; +import org.apache.spark.deploy.SparkSubmit; + +public class SparkMainLatest extends LauncherMain { + private static final String MASTER_OPTION = "--master"; + private static final String MODE_OPTION = "--deploy-mode"; + private static final String JOB_NAME_OPTION = "--name"; + private static final String CLASS_NAME_OPTION = "--class"; + private static final String VERBOSE_OPTION = "--verbose"; + private static final String DRIVER_CLASSPATH_OPTION = "--driver-class-path"; + private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath="; + private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath="; + private static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions="; + private static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions="; + private static final String LOG4J_CONFIGURATION_JAVA_OPTION = "-Dlog4j.configuration="; + private static final String HIVE_SECURITY_TOKEN = "spark.yarn.security.tokens.hive.enabled"; + private static final String HBASE_SECURITY_TOKEN = "spark.yarn.security.tokens.hbase.enabled"; + private static final String CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR = "oozie.action.spark.setup.hadoop.conf.dir"; + private static final String PWD = "$PWD" + File.separator + "*"; + private static final Pattern[] PYSPARK_DEP_FILE_PATTERN = { Pattern.compile("py4\\S*src.zip"), + Pattern.compile("pyspark.zip") }; + private static final Pattern SPARK_DEFAULTS_FILE_PATTERN = Pattern.compile("spark-defaults.conf"); + private static final String SPARK_LOG4J_PROPS = "spark-log4j.properties"; + private static final Pattern[] SPARK_JOB_IDS_PATTERNS = { + Pattern.compile("Submitted application (application[0-9_]*)") }; + public static final Pattern SPARK_ASSEMBLY_JAR_PATTERN = Pattern + .compile("^spark-assembly((?:(-|_|(\\d+\\.))\\d+(?:\\.\\d+)*))*\\.jar$"); + public static final Pattern SPARK_YARN_JAR_PATTERN = Pattern + .compile("^spark-yarn((?:(-|_|(\\d+\\.))\\d+(?:\\.\\d+)*))*\\.jar$"); + private static final Pattern SPARK_VERSION_1 = Pattern.compile("^1.*"); + private static final String SPARK_YARN_JAR = "spark.yarn.jar"; + private static final String SPARK_YARN_JARS = "spark.yarn.jars"; + private String sparkYarnJar = null; + private String sparkVersion = "1.X.X"; + public static void main(String[] args) throws Exception { + run(SparkMainLatest.class, args); + } + + @Override + protected void run(String[] args) throws Exception { + boolean isPyspark = false; + Configuration actionConf = loadActionConf(); + prepareHadoopConfig(actionConf); + + setYarnTag(actionConf); + LauncherMainHadoopUtils.killChildYarnJobs(actionConf); + String logFile = setUpSparkLog4J(actionConf); + List sparkArgs = 
new ArrayList(); + + sparkArgs.add(MASTER_OPTION); + String master = actionConf.get(SparkActionExecutor.SPARK_MASTER); + sparkArgs.add(master); + + // In local mode, everything runs here in the Launcher Job. + // In yarn-client mode, the driver runs here in the Launcher Job and the + // executor in Yarn. + // In yarn-cluster mode, the driver and executor run in Yarn. + String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE); + if (sparkDeployMode != null) { + sparkArgs.add(MODE_OPTION); + sparkArgs.add(sparkDeployMode); + } + boolean yarnClusterMode = master.equals("yarn-cluster") + || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("cluster")); + boolean yarnClientMode = master.equals("yarn-client") + || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("client")); + + sparkArgs.add(JOB_NAME_OPTION); + sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_JOB_NAME)); + + String className = actionConf.get(SparkActionExecutor.SPARK_CLASS); + if (className != null) { + sparkArgs.add(CLASS_NAME_OPTION); + sparkArgs.add(className); + } + + String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR); + if(jarPath!=null && jarPath.endsWith(".py")){ + isPyspark = true; + } + boolean addedHiveSecurityToken = false; + boolean addedHBaseSecurityToken = false; + boolean addedLog4jDriverSettings = false; + boolean addedLog4jExecutorSettings = false; + StringBuilder driverClassPath = new StringBuilder(); + StringBuilder executorClassPath = new StringBuilder(); + String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS); + if (StringUtils.isNotEmpty(sparkOpts)) { + List sparkOptions = splitSparkOpts(sparkOpts); + for (int i = 0; i < sparkOptions.size(); i++) { + String opt = sparkOptions.get(i); + boolean addToSparkArgs = true; + if (yarnClusterMode || yarnClientMode) { + if (opt.startsWith(EXECUTOR_CLASSPATH)) { + appendWithPathSeparator(opt.substring(EXECUTOR_CLASSPATH.length()), executorClassPath); + addToSparkArgs = false; + } + if (opt.startsWith(DRIVER_CLASSPATH)) { + appendWithPathSeparator(opt.substring(DRIVER_CLASSPATH.length()), driverClassPath); + addToSparkArgs = false; + } + if (opt.equals(DRIVER_CLASSPATH_OPTION)) { + // we need the next element after this option + appendWithPathSeparator(sparkOptions.get(i + 1), driverClassPath); + // increase i to skip the next element. + i++; + addToSparkArgs = false; + } + } + if (opt.startsWith(HIVE_SECURITY_TOKEN)) { + addedHiveSecurityToken = true; + } + if (opt.startsWith(HBASE_SECURITY_TOKEN)) { + addedHBaseSecurityToken = true; + } + if (opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS) || opt.startsWith(DRIVER_EXTRA_JAVA_OPTIONS)) { + if(!opt.contains(LOG4J_CONFIGURATION_JAVA_OPTION)) { + opt += " " + LOG4J_CONFIGURATION_JAVA_OPTION + SPARK_LOG4J_PROPS; + }else{ + System.out.println("Warning: Spark Log4J settings are overwritten." 
+ + " Child job IDs may not be available"); + } + if(opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS)) { + addedLog4jExecutorSettings = true; + }else{ + addedLog4jDriverSettings = true; + } + } + if(addToSparkArgs) { + sparkArgs.add(opt); + } + } + } + + if ((yarnClusterMode || yarnClientMode)) { + // Include the current working directory (of executor container) + // in executor classpath, because it will contain localized + // files + appendWithPathSeparator(PWD, executorClassPath); + appendWithPathSeparator(PWD, driverClassPath); + + sparkArgs.add("--conf"); + sparkArgs.add(EXECUTOR_CLASSPATH + executorClassPath.toString()); + + sparkArgs.add("--conf"); + sparkArgs.add(DRIVER_CLASSPATH + driverClassPath.toString()); + } + + if (actionConf.get(MAPREDUCE_JOB_TAGS) != null) { + sparkArgs.add("--conf"); + sparkArgs.add("spark.yarn.tags=" + actionConf.get(MAPREDUCE_JOB_TAGS)); + } + + if (!addedHiveSecurityToken) { + sparkArgs.add("--conf"); + sparkArgs.add(HIVE_SECURITY_TOKEN + "=false"); + } + if (!addedHBaseSecurityToken) { + sparkArgs.add("--conf"); + sparkArgs.add(HBASE_SECURITY_TOKEN + "=false"); + } + if(!addedLog4jExecutorSettings) { + sparkArgs.add("--conf"); + sparkArgs.add(EXECUTOR_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SPARK_LOG4J_PROPS); + } + if(!addedLog4jDriverSettings) { + sparkArgs.add("--conf"); + sparkArgs.add(DRIVER_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SPARK_LOG4J_PROPS); + } + File defaultConfFile = getMatchingFile(SPARK_DEFAULTS_FILE_PATTERN); + if (defaultConfFile != null) { + sparkArgs.add("--properties-file"); + sparkArgs.add(SPARK_DEFAULTS_FILE_PATTERN.toString()); + } + + if ((yarnClusterMode || yarnClientMode)) { + LinkedList fixedUris = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf), jarPath); + String cachedFiles = filterSparkYarnJar(fixedUris); + if (cachedFiles != null && !cachedFiles.isEmpty()) { + sparkArgs.add("--files"); + sparkArgs.add(cachedFiles); + } + fixedUris = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf), jarPath); + String cachedArchives = StringUtils.join(fixedUris, ","); + if (cachedArchives != null && !cachedArchives.isEmpty()) { + sparkArgs.add("--archives"); + sparkArgs.add(cachedArchives); + } + setSparkYarnJarsConf(sparkArgs); + } + + if (!sparkArgs.contains(VERBOSE_OPTION)) { + sparkArgs.add(VERBOSE_OPTION); + } + + sparkArgs.add(jarPath); + for (String arg : args) { + sparkArgs.add(arg); + } + if (isPyspark){ + createPySparkLibFolder(); + } + + + ClassLoader cl = ClassLoader.getSystemClassLoader(); + + URL[] urls = ((URLClassLoader)cl).getURLs(); + + System.out.println("++++++++++all jar +++++++++++++"); + for(URL url: urls){ + System.out.println(url.getFile()); + } + + + + System.out.println("Spark Action Main class : " + SparkSubmit.class.getName()); + System.out.println(); + System.out.println("Oozie Spark action configuration"); + System.out.println("================================================================="); + System.out.println(); + for (String arg : sparkArgs) { + System.out.println(" " + arg); + } + System.out.println(); + try { + runSpark(sparkArgs.toArray(new String[sparkArgs.size()])); + } + finally { + System.out.println("\n<<< Invocation of Spark command completed <<<\n"); + writeExternalChildIDs(logFile, SPARK_JOB_IDS_PATTERNS, "Spark"); + } + } + + private void prepareHadoopConfig(Configuration actionConf) throws IOException { + // Copying oozie.action.conf.xml into hadoop configuration *-site files. 
+ if (actionConf.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR, false)) { + String actionXml = System.getProperty("oozie.action.conf.xml"); + if (actionXml != null) { + File currentDir = new File(actionXml).getParentFile(); + writeHadoopConfig(actionXml, currentDir); + } + } + } + + /** + * SparkActionExecutor sets the SPARK_HOME environment variable to the local directory. + * Spark is looking for the pyspark.zip and py4j-VERSION-src.zip files in the python/lib folder under SPARK_HOME. + * This function creates the subfolders and copies the zips from the local folder. + * @throws OozieActionConfiguratorException if the zip files are missing + * @throws IOException if there is an error during file copy + */ + private void createPySparkLibFolder() throws OozieActionConfiguratorException, IOException { + File pythonLibDir = new File("python/lib"); + if(!pythonLibDir.exists()){ + pythonLibDir.mkdirs(); + System.out.println("PySpark lib folder " + pythonLibDir.getAbsolutePath() + " folder created."); + } + + for(Pattern fileNamePattern : PYSPARK_DEP_FILE_PATTERN) { + File file = getMatchingPyFile(fileNamePattern); + File destination = new File(pythonLibDir, file.getName()); + FileUtils.copyFile(file, destination); + System.out.println("Copied " + file + " to " + destination.getAbsolutePath()); + } + } + + /** + * Searches for a file in the current directory that matches the given pattern. + * If there are multiple files matching the pattern returns one of them. + * @param fileNamePattern the pattern to look for + * @return the file if there is one + * @throws OozieActionConfiguratorException if there is are no files matching the pattern + */ + private File getMatchingPyFile(Pattern fileNamePattern) throws OozieActionConfiguratorException { + File f = getMatchingFile(fileNamePattern); + if (f != null) { + return f; + } + throw new OozieActionConfiguratorException("Missing py4j and/or pyspark zip files. Please add them to " + + "the lib folder or to the Spark sharelib."); + } + + /** + * Searches for a file in the current directory that matches the given + * pattern. If there are multiple files matching the pattern returns one of + * them. + * + * @param fileNamePattern the pattern to look for + * @return the file if there is one else it returns null + */ + private File getMatchingFile(Pattern fileNamePattern) throws OozieActionConfiguratorException { + File localDir = new File("."); + for(String fileName : localDir.list()){ + if(fileNamePattern.matcher(fileName).find()){ + return new File(fileName); + } + } + return null; + } + + private void runSpark(String[] args) throws Exception { + System.out.println("================================================================="); + System.out.println(); + System.out.println(">>> Invoking Spark class now >>>"); + System.out.println(); + System.out.flush(); + SparkSubmit.main(args); + } + + /** + * Converts the options to be Spark-compatible. + *
<ul>
+      *     <li>Parameters are separated by whitespace and can be grouped using double quotes</li>
+      *     <li>Quotes should be removed</li>
+      *     <li>Adjacent whitespace separators are treated as one</li>
+      * </ul>
+ * @param sparkOpts the options for Spark + * @return the options parsed into a list + */ + static List splitSparkOpts(String sparkOpts){ + List result = new ArrayList(); + StringBuilder currentWord = new StringBuilder(); + boolean insideQuote = false; + for (int i = 0; i < sparkOpts.length(); i++) { + char c = sparkOpts.charAt(i); + if (c == '"') { + insideQuote = !insideQuote; + } else if (Character.isWhitespace(c) && !insideQuote) { + if (currentWord.length() > 0) { + result.add(currentWord.toString()); + currentWord.setLength(0); + } + } else { + currentWord.append(c); + } + } + if(currentWord.length()>0) { + result.add(currentWord.toString()); + } + return result; + } + + public static String setUpSparkLog4J(Configuration distcpConf) throws IOException { + // Logfile to capture job IDs + String hadoopJobId = System.getProperty("oozie.launcher.job.id"); + if (hadoopJobId == null) { + throw new RuntimeException("Launcher Hadoop Job ID system,property not set"); + } + String logFile = new File("spark-oozie-" + hadoopJobId + ".log").getAbsolutePath(); + Properties hadoopProps = new Properties(); + + // Preparing log4j configuration + URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties"); + if (log4jFile != null) { + // getting hadoop log4j configuration + hadoopProps.load(log4jFile.openStream()); + } + + String logLevel = distcpConf.get("oozie.spark.log.level", "INFO"); + String rootLogLevel = distcpConf.get("oozie.action." + LauncherMapper.ROOT_LOGGER_LEVEL, "INFO"); + + hadoopProps.setProperty("log4j.rootLogger", rootLogLevel + ", A"); + hadoopProps.setProperty("log4j.logger.org.apache.spark", logLevel + ", A, jobid"); + hadoopProps.setProperty("log4j.additivity.org.apache.spark", "false"); + hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender"); + hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout"); + hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%d [%t] %-5p %c %x - %m%n"); + hadoopProps.setProperty("log4j.appender.jobid", "org.apache.log4j.FileAppender"); + hadoopProps.setProperty("log4j.appender.jobid.file", logFile); + hadoopProps.setProperty("log4j.appender.jobid.layout", "org.apache.log4j.PatternLayout"); + hadoopProps.setProperty("log4j.appender.jobid.layout.ConversionPattern", "%d [%t] %-5p %c %x - %m%n"); + hadoopProps.setProperty("log4j.logger.org.apache.hadoop.mapred", "INFO, jobid"); + hadoopProps.setProperty("log4j.logger.org.apache.hadoop.mapreduce.Job", "INFO, jobid"); + hadoopProps.setProperty("log4j.logger.org.apache.hadoop.yarn.client.api.impl.YarnClientImpl", "INFO, jobid"); + + String localProps = new File(SPARK_LOG4J_PROPS).getAbsolutePath(); + OutputStream os1 = new FileOutputStream(localProps); + try { + hadoopProps.store(os1, ""); + } + finally { + os1.close(); + } + PropertyConfigurator.configure(SPARK_LOG4J_PROPS); + return logFile; + } + + /** + * Convert URIs into the default format which Spark expects + * + * @param files + * @return + * @throws IOException + * @throws URISyntaxException + */ + private LinkedList fixFsDefaultUris(URI[] files, String jarPath) throws IOException, URISyntaxException { + if (files == null) { + return null; + } + LinkedList listUris = new LinkedList(); + FileSystem fs = FileSystem.get(new Configuration(true)); + for (int i = 0; i < files.length; i++) { + URI fileUri = files[i]; + // Spark compares URIs based on scheme, host and port. 
+ // Here we convert URIs into the default format so that Spark + // won't think those belong to different file system. + // This will avoid an extra copy of files which already exists on + // same hdfs. + if (fs.getUri().getScheme().equals(fileUri.getScheme()) + && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null) + && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1 + || fs.getUri().getPort() == fileUri.getPort())) { + URI uri = new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(), + fs.getUri().getPort(), fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment()); + // Here we skip the application jar, because + // (if uris are same,) it will get distributed multiple times + // - one time with --files and another time as application jar. + if (!uri.getRawPath().equals(new Path(jarPath).toUri().getRawPath())) { + listUris.add(uri); + } else { + System.out.println("Skipping duplicate file; URI: " + uri.toString()); + } + } + } + return listUris; + } + + /** + * Filters out the Spark yarn jar and records its version + * + * @param listUris string containing uris separated by comma + * @return + * @throws OozieActionConfiguratorException + */ + private String filterSparkYarnJar(LinkedList listUris) throws OozieActionConfiguratorException { + Iterator iterator = listUris.iterator(); + File matchedFile = null; + while (iterator.hasNext()) { + URI uri = iterator.next(); + Path p = new Path(uri); + if (SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) { + matchedFile = getMatchingFile(SPARK_YARN_JAR_PATTERN); + } + else if (SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) { + matchedFile = getMatchingFile(SPARK_ASSEMBLY_JAR_PATTERN); + } + if (matchedFile != null) { + sparkYarnJar = uri.toString(); + try { + sparkVersion = getJarVersion(matchedFile); + System.out.println("Spark Version " + sparkVersion); + } + catch (IOException io) { + System.out.println( + "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion); + } + iterator.remove(); + break; + } + } + return StringUtils.join(listUris, ","); + } + + /** + * Sets spark.yarn.jars for Spark 2.X. Sets spark.yarn.jar for Spark 1.X. + * + * @param sparkArgs + */ + private void setSparkYarnJarsConf(List sparkArgs) { + if (SPARK_VERSION_1.matcher(sparkVersion).find()) { + // In Spark 1.X.X, set spark.yarn.jar to avoid + // multiple distribution + sparkArgs.add("--conf"); + sparkArgs.add(SPARK_YARN_JAR + "=" + sparkYarnJar); + } + else { + // In Spark 2.X.X, set spark.yarn.jars + sparkArgs.add("--conf"); + sparkArgs.add(SPARK_YARN_JARS + "=" + sparkYarnJar); + } + } + + private String getJarVersion(File jarFile) throws IOException { + @SuppressWarnings("resource") + Manifest manifest = new JarFile(jarFile).getManifest(); + return manifest.getMainAttributes().getValue("Specification-Version"); + } + + private void appendWithPathSeparator(String what, StringBuilder to){ + if(to.length() > 0){ + to.append(File.pathSeparator); + } + to.append(what); + } +} \ No newline at end of file diff --git a/sharelib/sparkLatest/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java b/sharelib/sparkLatest/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java new file mode 100644 index 0000000000..f49d1bcde5 --- /dev/null +++ b/sharelib/sparkLatest/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.hadoop; + +import java.io.StringReader; +import java.text.MessageFormat; +import java.util.ArrayList; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.WorkflowAppService; +import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.XConfiguration; + +public class TestPyspark extends ActionExecutorTestCase { + + + public static String PY4J_ZIP = "py4j-0.9-src.zip"; + public static String PYSPARK_ZIP = "pyspark.zip"; + public static String PI_EXAMPLE = "pi.py"; + + + @Override + protected void setSystemProps() throws Exception { + super.setSystemProps(); + setSystemProperty("oozie.service.ActionService.executor.classes", SparkActionExecutor.class.getName()); + } + + protected String getActionXml(String sparkOpts) { + String script = "" + + "{0}" + + "{1}" + + "local[*]" + + "client" + + "PysparkExample" + + "" + PI_EXAMPLE + "" + + "" +sparkOpts +"" + + "0"+ + ""; + return MessageFormat.format(script, getJobTrackerUri(), getNameNodeUri()); + } + + public void testPyspark() throws Exception { + ArrayList listLibFiles = new ArrayList(); + + // does not have any files + // pyspark and py4j are not present in current directory. + String sparkOpts = "--conf " + TestSparkActionExecutor.SPARK_TESTING_MEMORY; + WorkflowJobBean wf = getWorkflow(listLibFiles); + testPysparkHelper(sparkOpts, wf, "FAILED/KILLED", WorkflowAction.Status.ERROR); + + // has other files; + // pyspark and py4j are not present in current directory. + sparkOpts = "--py-files other.zip,myfunctions.py --conf " + TestSparkActionExecutor.SPARK_TESTING_MEMORY; + listLibFiles.add("other.zip"); + listLibFiles.add("myfunctions.py"); + wf = getWorkflow(listLibFiles); + testPysparkHelper(sparkOpts, wf, "FAILED/KILLED", WorkflowAction.Status.ERROR); + + // does not have any files + // pyspark and py4j are present in current directory. + sparkOpts = "--conf " + TestSparkActionExecutor.SPARK_TESTING_MEMORY; + listLibFiles.clear(); + listLibFiles.add(PY4J_ZIP); + listLibFiles.add(PYSPARK_ZIP); + wf = getWorkflow(listLibFiles); + testPysparkHelper(sparkOpts, wf, "SUCCEEDED", WorkflowAction.Status.OK); + + // has some other files + // pyspark and py4j are present in current directory. 
+ sparkOpts = "--py-files other.zip,myfunctions.py --conf " + TestSparkActionExecutor.SPARK_TESTING_MEMORY; + listLibFiles.clear(); + listLibFiles.add("other.zip"); + listLibFiles.add("myfunctions.py"); + listLibFiles.add(PY4J_ZIP); + listLibFiles.add(PYSPARK_ZIP); + wf = getWorkflow(listLibFiles); + testPysparkHelper(sparkOpts, wf, "SUCCEEDED", WorkflowAction.Status.OK); + } + + private void testPysparkHelper(String sparkOpts, WorkflowJobBean wf, String externalStatus, + WorkflowAction.Status wfStatus) + throws Exception { + Context context = createContext(getActionXml(sparkOpts), wf); + final RunningJob launcherJob = submitAction(context); + waitFor(200 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return launcherJob.isComplete(); + } + }); + assertTrue(launcherJob.isSuccessful()); + SparkActionExecutor ae = new SparkActionExecutor(); + ae.check(context, context.getAction()); + assertEquals(externalStatus, context.getAction().getExternalStatus()); + ae.end(context, context.getAction()); + assertEquals(wfStatus, context.getAction().getStatus()); + } + + protected RunningJob submitAction(Context context) throws Exception { + SparkActionExecutor ae = new SparkActionExecutor(); + WorkflowAction action = context.getAction(); + ae.prepareActionDir(getFileSystem(), context); + ae.submitLauncher(getFileSystem(), context, action); + String jobId = action.getExternalId(); + String jobTracker = action.getTrackerUri(); + String consoleUrl = action.getConsoleUrl(); + assertNotNull(jobId); + assertNotNull(jobTracker); + assertNotNull(consoleUrl); + JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); + jobConf.set("mapred.job.tracker", jobTracker); + JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf); + final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); + assertNotNull(runningJob); + return runningJob; + } + + protected Context createContext(String actionXml, WorkflowJobBean wf) throws Exception { + SparkActionExecutor ae = new SparkActionExecutor(); + WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); + action.setType(ae.getType()); + action.setConf(actionXml); + return new Context(wf, action); + } + + /** + * @param listLibFiles list of files to be created in workflow lib/ + * directory + * @return + * @throws Exception + */ + protected WorkflowJobBean getWorkflow(ArrayList listLibFiles) throws Exception { + // add the example file as well + listLibFiles.add(PI_EXAMPLE); + String[] libPaths = new String[listLibFiles.size()]; + FileSystem fs = getFileSystem(); + for (int i = 0; i < listLibFiles.size(); i++) { + libPaths[i] = new Path("lib/" + listLibFiles.get(i)).toString(); + if (listLibFiles.get(i).equals(PY4J_ZIP) || listLibFiles.get(i).equals(PYSPARK_ZIP) + || listLibFiles.get(i).equals(PI_EXAMPLE)) { + IOUtils.copyStream(IOUtils.getResourceAsStream(listLibFiles.get(i), -1), + fs.create(new Path(getAppPath(), "lib/" + listLibFiles.get(i)))); + } + else { + fs.createNewFile(new Path(getAppPath(), "lib/" + listLibFiles.get(i))); + } + } + XConfiguration protoConf = new XConfiguration(); + protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); + SharelibUtils.addToDistributedCache("spark", getFileSystem(), getFsTestCaseDir(), protoConf); + WorkflowJobBean wf = createBaseWorkflow(protoConf, "spark-action"); + String defaultProtoConf = wf.getProtoActionConf(); + XConfiguration newProtoConf = new XConfiguration(new 
StringReader(defaultProtoConf)); + newProtoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, libPaths); + wf.setProtoActionConf(newProtoConf.toXmlString()); + return wf; + } +} \ No newline at end of file diff --git a/sharelib/sparkLatest/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java b/sharelib/sparkLatest/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java new file mode 100644 index 0000000000..486fd27b73 --- /dev/null +++ b/sharelib/sparkLatest/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.service.ConfigurationService; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.SparkConfigurationService; +import org.apache.oozie.service.WorkflowAppService; +import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.XConfiguration; +import org.apache.oozie.util.XmlUtils; +import org.jdom.Element; + +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Writer; + +import java.text.MessageFormat; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class TestSparkActionExecutor extends ActionExecutorTestCase { + private static final String SPARK_FILENAME = "file.txt"; + private static final String OUTPUT = "output"; + private static Pattern SPARK_OPTS_PATTERN = Pattern.compile("([^= ]+)=([^= ]+)"); + public static String SPARK_TESTING_MEMORY = "spark.testing.memory=512000000"; // 512MB + @Override + protected void setSystemProps() throws Exception { + super.setSystemProps(); + setSystemProperty("oozie.service.ActionService.executor.classes", SparkActionExecutor.class.getName()); + } + + public void testSetupMethods() throws Exception { + _testSetupMethods("local[*]", new HashMap(), "client"); + _testSetupMethods("yarn", new HashMap(), "cluster"); + _testSetupMethods("yarn", new HashMap(), "client"); + _testSetupMethods("yarn-cluster", new HashMap(), 
null); + _testSetupMethods("yarn-client", new HashMap(), null); + } + + public void testSetupMethodsWithSparkConfiguration() throws Exception { + File sparkConfDir = new File(getTestCaseConfDir(), "spark-conf"); + sparkConfDir.mkdirs(); + File sparkConf = new File(sparkConfDir, "spark-defaults.conf"); + Properties sparkConfProps = new Properties(); + sparkConfProps.setProperty("a", "A"); + sparkConfProps.setProperty("b", "B"); + FileOutputStream fos = null; + try { + fos = new FileOutputStream(sparkConf); + sparkConfProps.store(fos, ""); + } finally { + IOUtils.closeSafely(fos); + } + SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class); + scs.destroy(); + ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations", + getJobTrackerUri() + "=" + sparkConfDir.getAbsolutePath()); + scs.init(Services.get()); + + _testSetupMethods("local[*]", new HashMap(), "client"); + Map extraSparkOpts = new HashMap(2); + extraSparkOpts.put("a", "A"); + extraSparkOpts.put("b", "B"); + _testSetupMethods("yarn-cluster", extraSparkOpts, null); + _testSetupMethods("yarn-client", extraSparkOpts, null); + } + + @SuppressWarnings("unchecked") + private void _testSetupMethods(String master, Map extraSparkOpts, String mode) throws Exception { + SparkActionExecutor ae = new SparkActionExecutor(); + assertEquals(Arrays.asList(SparkMainLatest.class), ae.getLauncherClasses()); + + Element actionXml = XmlUtils.parseXml("" + + "" + getJobTrackerUri() + "" + + "" + getNameNodeUri() + "" + + "" + master + "" + + (mode != null ? "" + mode + "" : "") + + "Some Name" + + "org.apache.oozie.foo" + + "" + getNameNodeUri() + "/foo.jar" + + "--conf foo=bar" + + "0"+ + ""); + + XConfiguration protoConf = new XConfiguration(); + protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); + + WorkflowJobBean wf = createBaseWorkflow(protoConf, "spark-action"); + WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); + action.setType(ae.getType()); + + Context context = new Context(wf, action); + + Configuration conf = ae.createBaseHadoopConf(context, actionXml); + ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir()); + assertEquals(master, conf.get("oozie.spark.master")); + assertEquals(mode, conf.get("oozie.spark.mode")); + assertEquals("Some Name", conf.get("oozie.spark.name")); + assertEquals("org.apache.oozie.foo", conf.get("oozie.spark.class")); + assertEquals(getNameNodeUri() + "/foo.jar", conf.get("oozie.spark.jar")); + Map sparkOpts = new HashMap(); + sparkOpts.put("foo", "bar"); + sparkOpts.putAll(extraSparkOpts); + Matcher m = SPARK_OPTS_PATTERN.matcher(conf.get("oozie.spark.spark-opts")); + int count = 0; + while (m.find()) { + count++; + String key = m.group(1); + String val = m.group(2); + assertEquals(sparkOpts.get(key), val); + } + assertEquals(sparkOpts.size(), count); + } + + private String getActionXml() { + String script = "" + + "{0}" + + "{1}" + + "local[*]" + + "client" + + "SparkFileCopy" + + "org.apache.oozie.example.SparkFileCopy" + + "" + getAppPath() +"/lib/test.jar" + + "" + getAppPath() + "/" + SPARK_FILENAME + "" + + "" + getAppPath() + "/" + OUTPUT + "" + + "--conf " +SPARK_TESTING_MEMORY+""+ + "0"+ + ""; + return MessageFormat.format(script, getJobTrackerUri(), getNameNodeUri()); + } + + + public void testSparkAction() throws Exception { + FileSystem fs = getFileSystem(); + Path file = new Path(getAppPath(), SPARK_FILENAME); + Writer scriptWriter = new OutputStreamWriter(fs.create(file)); + 
scriptWriter.write("1,2,3"); + scriptWriter.write("\n"); + scriptWriter.write("2,3,4"); + scriptWriter.close(); + + Context context = createContext(getActionXml()); + final RunningJob launcherJob = submitAction(context); + waitFor(200 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return launcherJob.isComplete(); + } + }); + assertTrue(launcherJob.isSuccessful()); + + SparkActionExecutor ae = new SparkActionExecutor(); + ae.check(context, context.getAction()); + assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertTrue(fs.exists(new Path(getAppPath() + "/" + OUTPUT))); + ae.end(context, context.getAction()); + assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus()); + + } + + protected Context createContext(String actionXml) throws Exception { + SparkActionExecutor ae = new SparkActionExecutor(); + + File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", LauncherMainTester.class); + InputStream is = new FileInputStream(jarFile); + OutputStream os = getFileSystem().create(new Path(getAppPath(), "lib/test.jar")); + IOUtils.copyStream(is, os); + + XConfiguration protoConf = new XConfiguration(); + protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); + SharelibUtils.addToDistributedCache("spark", getFileSystem(), getFsTestCaseDir(), protoConf); + + WorkflowJobBean wf = createBaseWorkflow(protoConf, "spark-action"); + WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); + action.setType(ae.getType()); + action.setConf(actionXml); + + return new Context(wf, action); + } + + protected RunningJob submitAction(Context context) throws Exception { + SparkActionExecutor ae = new SparkActionExecutor(); + + WorkflowAction action = context.getAction(); + + ae.prepareActionDir(getFileSystem(), context); + ae.submitLauncher(getFileSystem(), context, action); + + String jobId = action.getExternalId(); + String jobTracker = action.getTrackerUri(); + String consoleUrl = action.getConsoleUrl(); + assertNotNull(jobId); + assertNotNull(jobTracker); + assertNotNull(consoleUrl); + + JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); + jobConf.set("mapred.job.tracker", jobTracker); + + JobClient jobClient = + Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf); + final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); + assertNotNull(runningJob); + return runningJob; + } + + +} \ No newline at end of file diff --git a/sharelib/sparkLatest/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java b/sharelib/sparkLatest/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java new file mode 100644 index 0000000000..91d773ae50 --- /dev/null +++ b/sharelib/sparkLatest/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.hadoop; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.regex.Pattern; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.XConfiguration; + +public class TestSparkMain extends MainTestCase { + + private static final String INPUT = "input.txt"; + private static final String OUTPUT = "output"; + + @Override + public Void call() throws Exception { + XConfiguration jobConf = new XConfiguration(); + XConfiguration.copy(createJobConf(), jobConf); + + FileSystem fs = getFileSystem(); + Path file = new Path(getFsTestCaseDir(), "input.txt"); + Writer scriptWriter = new OutputStreamWriter(fs.create(file)); + scriptWriter.write("1,2,3"); + scriptWriter.write("\n"); + scriptWriter.write("2,3,4"); + scriptWriter.close(); + + jobConf.set(JavaMain.JAVA_MAIN_CLASS, "org.apache.spark.deploy.SparkSubmit"); + + jobConf.set("mapreduce.job.tags", "" + System.currentTimeMillis()); + setSystemProperty("oozie.job.launch.time", "" + System.currentTimeMillis()); + File statsDataFile = new File(getTestCaseDir(), "statsdata.properties"); + File hadoopIdsFile = new File(getTestCaseDir(), "hadoopIds"); + File outputDataFile = new File(getTestCaseDir(), "outputdata.properties"); + + jobConf.set(SparkActionExecutor.SPARK_MASTER, "local[*]"); + jobConf.set(SparkActionExecutor.SPARK_MODE, "client"); + jobConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy"); + jobConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File"); + jobConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-memory 1042M " + + "--conf spark.executor.extraJavaOptions=\"-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp\""); + + jobConf.set(SparkActionExecutor.SPARK_JAR, getFsTestCaseDir() + "/lib/test.jar"); + + + File actionXml = new File(getTestCaseDir(), "action.xml"); + OutputStream os = new FileOutputStream(actionXml); + jobConf.writeXml(os); + os.close(); + + System.setProperty("oozie.action.conf.xml", actionXml.getAbsolutePath()); + setSystemProperty("oozie.launcher.job.id", "" + System.currentTimeMillis()); + setSystemProperty("oozie.action.stats.properties", statsDataFile.getAbsolutePath()); + setSystemProperty("oozie.action.externalChildIDs", hadoopIdsFile.getAbsolutePath()); + setSystemProperty("oozie.action.output.properties", outputDataFile.getAbsolutePath()); + + File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", LauncherMainTester.class); + InputStream is = new FileInputStream(jarFile); + os = getFileSystem().create(new Path(getFsTestCaseDir(), "lib/test.jar")); + IOUtils.copyStream(is, os); + + String input = getFsTestCaseDir() + "/" + INPUT; + String output = getFsTestCaseDir() + "/" + OUTPUT; + String[] args = {input, output}; + SparkMainLatest.main(args); + assertTrue(getFileSystem().exists(new Path(getFsTestCaseDir() + "/" + 
OUTPUT))); + return null; + } + + public void testPatterns() { + patternHelper("spark-yarn", SparkMainLatest.SPARK_YARN_JAR_PATTERN); + patternHelper("spark-assembly", SparkMainLatest.SPARK_ASSEMBLY_JAR_PATTERN); + } + + private void patternHelper(String jarName, Pattern pattern) { + ArrayList jarList = new ArrayList(); + jarList.add(jarName + "-1.2.jar"); + jarList.add(jarName + "-1.2.4.jar"); + jarList.add(jarName + "1.2.4.jar"); + jarList.add(jarName + "-1.2.4_1.2.3.4.jar"); + jarList.add(jarName + ".jar"); + + // all should pass + for (String s : jarList) { + assertTrue(pattern.matcher(s).find()); + } + + jarList.clear(); + jarList.add(jarName + "-1.2.3-sources.jar"); + jarList.add(jarName + "-sources-1.2.3.jar"); + jarList.add(jarName + "-sources.jar"); + // all should not pass + for (String s : jarList) { + assertFalse(pattern.matcher(s).find()); + } + } +} \ No newline at end of file diff --git a/sharelib/sparkLatest/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java b/sharelib/sparkLatest/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java new file mode 100644 index 0000000000..21a264c3a7 --- /dev/null +++ b/sharelib/sparkLatest/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.action.hadoop; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.*; + +import java.util.Arrays; +import java.util.List; + +@RunWith(Parameterized.class) +public class TestSparkOptionsSplitter { + + @Parameterized.Parameters + public static List params() { + return Arrays.asList(new Object[][]{ + {"--option1 value1", Arrays.asList(new String[]{"--option1", "value1"})}, + {"--option1 value1", Arrays.asList(new String[]{"--option1", "value1"})}, + {" --option1 value1 ", Arrays.asList(new String[]{"--option1", "value1"})}, + {"--conf special=value1", Arrays.asList(new String[]{"--conf", "special=value1"})}, + {"--conf special=\"value1\"", Arrays.asList(new String[]{"--conf", "special=value1"})}, + {"--conf special=\"value1 value2\"", Arrays.asList(new String[]{"--conf", "special=value1 value2"})}, + {" --conf special=\"value1 value2\" ", Arrays.asList(new String[]{"--conf", "special=value1 value2"})}, + }); + } + + private String input; + + private List output; + + public TestSparkOptionsSplitter(String input, List result) { + this.input = input; + this.output = result; + } + + @Test + public void test() { + assertThat("Error for input >>" + input + "<<", SparkMainLatest.splitSparkOpts(input), is(output)); + } +} \ No newline at end of file diff --git a/sharelib/sparkLatest/src/test/resources/pi.py b/sharelib/sparkLatest/src/test/resources/pi.py new file mode 100644 index 0000000000..e9836b29f7 --- /dev/null +++ b/sharelib/sparkLatest/src/test/resources/pi.py @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import sys +from random import random +from operator import add + +from pyspark import SparkContext + + +if __name__ == "__main__": + """ + Usage: pi [partitions] + """ + sc = SparkContext(appName="PythonPi") + partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 + n = 100000 * partitions + + def f(_): + x = random() * 2 - 1 + y = random() * 2 - 1 + return 1 if x ** 2 + y ** 2 < 1 else 0 + + count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add) + print("Pi is roughly %f" % (4.0 * count / n)) + + sc.stop() \ No newline at end of file diff --git a/sharelib/sparkLatest/src/test/resources/py4j-0.9-src.zip b/sharelib/sparkLatest/src/test/resources/py4j-0.9-src.zip new file mode 100644 index 0000000000..dace2d0fe3 Binary files /dev/null and b/sharelib/sparkLatest/src/test/resources/py4j-0.9-src.zip differ diff --git a/sharelib/sparkLatest/src/test/resources/pyspark.zip b/sharelib/sparkLatest/src/test/resources/pyspark.zip new file mode 100644 index 0000000000..9ff8bd83c2 Binary files /dev/null and b/sharelib/sparkLatest/src/test/resources/pyspark.zip differ diff --git a/src/main/assemblies/sharelib.xml b/src/main/assemblies/sharelib.xml index 49b637d03d..1085574b6b 100644 --- a/src/main/assemblies/sharelib.xml +++ b/src/main/assemblies/sharelib.xml @@ -71,6 +71,10 @@ ${basedir}/spark2/target/partial-sharelib / + + ${basedir}/sparkLatest/target/partial-sharelib + / + diff --git a/webapp/pom.xml b/webapp/pom.xml index decc72ced1..0724ec40c5 100644 --- a/webapp/pom.xml +++ b/webapp/pom.xml @@ -244,6 +244,10 @@ org.apache.oozie oozie-sharelib-spark2 + + org.apache.oozie + oozie-sharelib-sparkLatest + ${project.build.directory}/oozie-webapp-${project.version}/WEB-INF/lib true
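
Note (not part of the patch): a minimal sketch of how the spark-opts tokenizer added in SparkMainLatest behaves, matching the cases exercised by TestSparkOptionsSplitter and TestSparkMain. The demo class name is hypothetical and sits in org.apache.oozie.action.hadoop only because splitSparkOpts is package-private.

    package org.apache.oozie.action.hadoop;

    import java.util.List;

    // Hypothetical demo class, not part of the patch.
    public class SplitSparkOptsDemo {
        public static void main(String[] args) {
            // Quoted values stay together and the surrounding quotes are dropped;
            // adjacent whitespace separators are collapsed.
            List<String> opts = SparkMainLatest.splitSparkOpts(
                    "--driver-memory 1042M --conf "
                    + "spark.executor.extraJavaOptions=\"-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp\"");
            // Expected tokens: [--driver-memory, 1042M, --conf,
            //   spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp]
            for (String opt : opts) {
                System.out.println(opt);
            }
        }
    }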