diff --git a/pom.xml b/pom.xml
index 25ba182..112fe4a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,17 +7,6 @@
   <version>1.0.0</version>
   <packaging>jar</packaging>
 
-  <distributionManagement>
-    <snapshotRepository>
-      <id>ossrh</id>
-      <url>https://oss.sonatype.org/content/repositories/snapshots</url>
-    </snapshotRepository>
-    <repository>
-      <id>ossrh</id>
-      <url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
-    </repository>
-  </distributionManagement>
-
   <name>${project.groupId}:${project.artifactId}</name>
   <description>An application designed to be used as a command line utility for compacting files using the Spark framework.</description>
   <url>http://maven.apache.org</url>
@@ -122,6 +111,9 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-gpg-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
         <version>1.5</version>
         <executions>
           <execution>
@@ -140,17 +132,17 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.10</artifactId>
-      <version>1.5.2</version>
+      <version>2.2.0</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_2.10</artifactId>
-      <version>1.5.2</version>
+      <version>2.2.0</version>
     </dependency>
     <dependency>
      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
-      <version>2.6.4</version>
+      <version>2.7.2</version>
     </dependency>
     <dependency>
       <groupId>commons-cli</groupId>
@@ -172,4 +164,4 @@
     </dependency>
   </dependencies>
 
-</project>
+</project>
\ No newline at end of file
diff --git a/src/main/java/com/github/KeithSSmith/spark_compaction/Compact.java b/src/main/java/com/github/KeithSSmith/spark_compaction/Compact.java
index 40c7de3..b3f4c4a 100644
--- a/src/main/java/com/github/KeithSSmith/spark_compaction/Compact.java
+++ b/src/main/java/com/github/KeithSSmith/spark_compaction/Compact.java
@@ -17,7 +17,7 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.SQLContext;
 
 public class Compact {
@@ -180,13 +180,13 @@ public void compact(String inputPath, String outputPath) throws IOException {
       textFile.coalesce(this.splitSize).saveAsTextFile(outputPath);
     } else if (this.outputSerialization.equals(PARQUET)) {
       SQLContext sqlContext = new SQLContext(sc);
-      DataFrame parquetFile = sqlContext.read().parquet(this.concatInputPath(inputPath));
+      Dataset parquetFile = sqlContext.read().parquet(this.concatInputPath(inputPath));
       parquetFile.coalesce(this.splitSize).write().parquet(outputPath);
     } else if (this.outputSerialization.equals(AVRO)) {
       // For this to work the files must end in .avro
       // Another issue is that when using compression the compression codec extension is not being added to the file name.
       SQLContext sqlContext = new SQLContext(sc);
-      DataFrame avroFile = sqlContext.read().format("com.databricks.spark.avro").load(this.concatInputPath(inputPath));
+      Dataset avroFile = sqlContext.read().format("com.databricks.spark.avro").load(this.concatInputPath(inputPath));
       avroFile.coalesce(this.splitSize).write().format("com.databricks.spark.avro").save(outputPath);
     } else {
       System.out.println("Did not match any serialization type: text, parquet, or avro. Recieved: " +
@@ -207,12 +207,12 @@ public void compact(String[] args) throws IOException {
       textFile.coalesce(this.splitSize).saveAsTextFile(outputPath);
     } else if (this.outputSerialization.equals(PARQUET)) {
       SQLContext sqlContext = new SQLContext(sc);
-      DataFrame parquetFile = sqlContext.read().parquet(this.concatInputPath(inputPath));
+      Dataset parquetFile = sqlContext.read().parquet(this.concatInputPath(inputPath));
       parquetFile.coalesce(this.splitSize).write().parquet(outputPath);
     } else if (this.outputSerialization.equals(AVRO)) {
       // For this to work the files must end in .avro
       SQLContext sqlContext = new SQLContext(sc);
-      DataFrame avroFile = sqlContext.read().format("com.databricks.spark.avro").load(this.concatInputPath(inputPath));
+      Dataset avroFile = sqlContext.read().format("com.databricks.spark.avro").load(this.concatInputPath(inputPath));
       avroFile.coalesce(this.splitSize).write().format("com.databricks.spark.avro").save(outputPath);
     } else {
       System.out.println("Did not match any serialization type: text, parquet, or avro. Recieved: " +