From 3a9d49a8ce15354e0b48a7c9c4f13caf8b04daa0 Mon Sep 17 00:00:00 2001
From: =
Date: Fri, 28 Apr 2017 12:33:17 -0500
Subject: [PATCH 1/6] introduce the 'include_column_list' parameter

---
 .../scala/com/databricks/spark/redshift/Parameters.scala | 9 ++++++++-
 .../com/databricks/spark/redshift/ParametersSuite.scala  | 8 +++++---
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/redshift/Parameters.scala b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
index 875f5b75..7bdb38c8 100644
--- a/src/main/scala/com/databricks/spark/redshift/Parameters.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -38,7 +38,8 @@ private[redshift] object Parameters {
     "diststyle" -> "EVEN",
     "usestagingtable" -> "true",
     "preactions" -> ";",
-    "postactions" -> ";"
+    "postactions" -> ";",
+    "include_column_list" -> "false"
   )
 
   val VALID_TEMP_FORMATS = Set("AVRO", "CSV", "CSV GZIP")
@@ -285,5 +286,11 @@ private[redshift] object Parameters {
         new BasicSessionCredentials(accessKey, secretAccessKey, sessionToken))
     }
   }
+
+  /**
+   * If true then this library will extract the column list from the schema to
+   * include in the COPY command (e.g. `COPY "PUBLIC"."tablename" (column1 [,column2, ...]))`
+   */
+  def includeColumnList: Boolean = parameters("include_column_list").toBoolean
 }
 }

diff --git a/src/test/scala/com/databricks/spark/redshift/ParametersSuite.scala b/src/test/scala/com/databricks/spark/redshift/ParametersSuite.scala
index e4ed9d14..bdcf64b4 100644
--- a/src/test/scala/com/databricks/spark/redshift/ParametersSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/ParametersSuite.scala
@@ -28,7 +28,8 @@ class ParametersSuite extends FunSuite with Matchers {
       "tempdir" -> "s3://foo/bar",
       "dbtable" -> "test_schema.test_table",
       "url" -> "jdbc:redshift://foo/bar?user=user&password=password",
-      "forward_spark_s3_credentials" -> "true")
+      "forward_spark_s3_credentials" -> "true",
+      "include_column_list" -> "true")
 
     val mergedParams = Parameters.mergeParameters(params)
 
@@ -37,10 +38,11 @@ class ParametersSuite extends FunSuite with Matchers {
     mergedParams.jdbcUrl shouldBe params("url")
     mergedParams.table shouldBe Some(TableName("test_schema", "test_table"))
     assert(mergedParams.forwardSparkS3Credentials)
+    assert(mergedParams.includeColumnList)
 
     // Check that the defaults have been added
-    (Parameters.DEFAULT_PARAMETERS - "forward_spark_s3_credentials").foreach {
-      case (key, value) => mergedParams.parameters(key) shouldBe value
+    (Parameters.DEFAULT_PARAMETERS - "forward_spark_s3_credentials" - "include_column_list").foreach {
+      case (key, value) => println(key); println(value); mergedParams.parameters(key) shouldBe value
     }
   }

From 39245aeb8ab11c2a538aea60bd8f2ec64ead447a Mon Sep 17 00:00:00 2001
From: =
Date: Fri, 28 Apr 2017 12:34:11 -0500
Subject: [PATCH 2/6] implement 'include_column_list' in the RedshiftWriter to
 add the columns to the COPY command

---
 .../spark/redshift/RedshiftWriter.scala      | 11 +++++++++--
 .../spark/redshift/RedshiftSourceSuite.scala | 21 +++++++++++++++++++
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
index 8383231d..073c08bb 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
@@ -88,6 +88,7 @@ private[redshift] class RedshiftWriter(
    */
   private def copySql(
       sqlContext: SQLContext,
+      schema: StructType,
       params: MergedParameters,
       creds: AWSCredentialsProvider,
       manifestUrl: String): String = {
@@ -98,7 +99,13 @@ private[redshift] class RedshiftWriter(
       case "AVRO" => "AVRO 'auto'"
       case csv if csv == "CSV" || csv == "CSV GZIP" => csv + s" NULL AS '${params.nullString}'"
     }
-    s"COPY ${params.table.get} FROM '$fixedUrl' CREDENTIALS '$credsString' FORMAT AS " +
+    val columns = if (params.includeColumnList) {
+      "(" + schema.fieldNames.map(name => s""""$name"""").mkString(",") + ")"
+    } else {
+      ""
+    }
+
+    s"COPY ${params.table.get} $columns FROM '$fixedUrl' CREDENTIALS '$credsString' FORMAT AS " +
       s"${format} manifest ${params.extraCopyOptions}"
   }
@@ -140,7 +147,7 @@ private[redshift] class RedshiftWriter(
 
     manifestUrl.foreach { manifestUrl =>
       // Load the temporary data into the new file
-      val copyStatement = copySql(data.sqlContext, params, creds, manifestUrl)
+      val copyStatement = copySql(data.sqlContext, data.schema, params, creds, manifestUrl)
       log.info(copyStatement)
       try {
         jdbcWrapper.executeInterruptibly(conn.prepareStatement(copyStatement))
diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
index ac2a644a..6d320f8b 100644
--- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
@@ -435,6 +435,27 @@ class RedshiftSourceSuite
     mockRedshift.verifyThatExpectedQueriesWereIssued(expectedCommands)
   }
 
+  test("Include Column List adds the schema columns to the COPY query") {
+    val copyCommand =
+      "COPY \"PUBLIC\".\"test_table\" \\(\"testbyte\",\"testbool\",\"testdate\",\"testdouble\"" +
+        ",\"testfloat\",\"testint\",\"testlong\",\"testshort\",\"teststring\",\"testtimestamp\"\\) .*"
+    val expectedCommands =
+      Seq("CREATE TABLE IF NOT EXISTS \"PUBLIC\".\"test_table\" .*".r,
+        copyCommand.r)
+
+    val params = defaultParams ++ Map("include_column_list" -> "true")
+
+    val mockRedshift = new MockRedshift(
+      defaultParams("url"),
+      Map(TableName.parseFromEscaped(defaultParams("dbtable")).toString -> null))
+
+    val source = new DefaultSource(mockRedshift.jdbcWrapper, _ => mockS3Client)
+    source.createRelation(testSqlContext, SaveMode.Append, params, expectedDataDF)
+
+    mockRedshift.verifyThatConnectionsWereClosed()
+    mockRedshift.verifyThatExpectedQueriesWereIssued(expectedCommands)
+  }
+
   test("configuring maxlength on string columns") {
     val longStrMetadata = new MetadataBuilder().putLong("maxlength", 512).build()
     val shortStrMetadata = new MetadataBuilder().putLong("maxlength", 10).build()

From 5cc0bb7a27082b4a2eb4dbbad09c8ea1198a1d6a Mon Sep 17 00:00:00 2001
From: =
Date: Fri, 28 Apr 2017 13:39:52 -0500
Subject: [PATCH 3/6] update comment

---
 src/main/scala/com/databricks/spark/redshift/Parameters.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/scala/com/databricks/spark/redshift/Parameters.scala b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
index 7bdb38c8..94b732b3 100644
--- a/src/main/scala/com/databricks/spark/redshift/Parameters.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -289,7 +289,7 @@ private[redshift] object Parameters {
 
   /**
    * If true then this library will extract the column list from the schema to
-   * include in the COPY command (e.g. `COPY "PUBLIC"."tablename" (column1 [,column2, ...]))`
+   * include in the COPY command (e.g. `COPY "PUBLIC"."tablename" ("column1" [,"column2", ...]))`
    */
   def includeColumnList: Boolean = parameters("include_column_list").toBoolean
 }

From 9fc299a2d2fead91086afcaa5713ef30f5a9cb0b Mon Sep 17 00:00:00 2001
From: =
Date: Fri, 28 Apr 2017 13:43:09 -0500
Subject: [PATCH 4/6] remove extra space when not using column list

---
 .../scala/com/databricks/spark/redshift/RedshiftWriter.scala | 4 ++--
 .../com/databricks/spark/redshift/RedshiftSourceSuite.scala  | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
index 073c08bb..784285d8 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
@@ -100,12 +100,12 @@ private[redshift] class RedshiftWriter(
       case csv if csv == "CSV" || csv == "CSV GZIP" => csv + s" NULL AS '${params.nullString}'"
     }
     val columns = if (params.includeColumnList) {
-      "(" + schema.fieldNames.map(name => s""""$name"""").mkString(",") + ")"
+      "(" + schema.fieldNames.map(name => s""""$name"""").mkString(",") + ") "
     } else {
       ""
     }
 
-    s"COPY ${params.table.get} $columns FROM '$fixedUrl' CREDENTIALS '$credsString' FORMAT AS " +
+    s"COPY ${params.table.get} ${columns}FROM '$fixedUrl' CREDENTIALS '$credsString' FORMAT AS " +
       s"${format} manifest ${params.extraCopyOptions}"
   }
diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
index 6d320f8b..ed2da22d 100644
--- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
@@ -438,7 +438,7 @@ class RedshiftSourceSuite
   test("Include Column List adds the schema columns to the COPY query") {
     val copyCommand =
       "COPY \"PUBLIC\".\"test_table\" \\(\"testbyte\",\"testbool\",\"testdate\",\"testdouble\"" +
-        ",\"testfloat\",\"testint\",\"testlong\",\"testshort\",\"teststring\",\"testtimestamp\"\\) .*"
+        ",\"testfloat\",\"testint\",\"testlong\",\"testshort\",\"teststring\",\"testtimestamp\"\\) FROM .*"
     val expectedCommands =
       Seq("CREATE TABLE IF NOT EXISTS \"PUBLIC\".\"test_table\" .*".r,
        copyCommand.r)

From 39873d05f79b1427fdbe0336df8dd36cab654526 Mon Sep 17 00:00:00 2001
From: =
Date: Fri, 28 Apr 2017 13:44:37 -0500
Subject: [PATCH 5/6] remove println statements

---
 .../scala/com/databricks/spark/redshift/ParametersSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/test/scala/com/databricks/spark/redshift/ParametersSuite.scala b/src/test/scala/com/databricks/spark/redshift/ParametersSuite.scala
index bdcf64b4..590b5505 100644
--- a/src/test/scala/com/databricks/spark/redshift/ParametersSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/ParametersSuite.scala
@@ -42,7 +42,7 @@ class ParametersSuite extends FunSuite with Matchers {
 
     // Check that the defaults have been added
     (Parameters.DEFAULT_PARAMETERS - "forward_spark_s3_credentials" - "include_column_list").foreach {
-      case (key, value) => println(key); println(value); mergedParams.parameters(key) shouldBe value
+      case (key, value) => mergedParams.parameters(key) shouldBe value
     }
   }

From 6b8c9fbcd3a57af2da7e6e83e4c19b211fb75c10 Mon Sep 17 00:00:00 2001
From: =
Date: Fri, 28 Apr 2017 16:34:48 -0500
Subject: [PATCH 6/6] add include_column_list parameter to README

---
 README.md                                                | 10 ++++++++++
 .../com/databricks/spark/redshift/Parameters.scala       |  2 +-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 794e5c4f..1d863692 100644
--- a/README.md
+++ b/README.md
@@ -571,6 +571,16 @@ must also set a distribution key with the distkey option.
 
 Since setting usestagingtable=false risks data loss / unavailability, we have chosen to deprecate it in favor of requiring users to manually drop the destination table themselves.
 
+
+include_column_list
+No
+false
+
+If true then this library will automatically extract the columns from the schema
+and add them to the COPY command according to the Column List docs.
+(e.g. `COPY "PUBLIC"."tablename" ("column1" [,"column2", ...])`).
+
+
 
 description
 No
diff --git a/src/main/scala/com/databricks/spark/redshift/Parameters.scala b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
index 94b732b3..e897ba7c 100644
--- a/src/main/scala/com/databricks/spark/redshift/Parameters.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -289,7 +289,7 @@ private[redshift] object Parameters {
 
   /**
    * If true then this library will extract the column list from the schema to
-   * include in the COPY command (e.g. `COPY "PUBLIC"."tablename" ("column1" [,"column2", ...]))`
+   * include in the COPY command (e.g. `COPY "PUBLIC"."tablename" ("column1" [,"column2", ...])`)
    */
  def includeColumnList: Boolean = parameters("include_column_list").toBoolean
 }
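
---

Usage sketch (not part of the patch series): once these patches are applied, the new
parameter is enabled from the normal spark-redshift write path. This is a minimal,
hypothetical example -- the JDBC URL, table name, and tempdir bucket are placeholders,
and `df` stands for any DataFrame whose schema matches the target table:

    import org.apache.spark.sql.SaveMode

    // With include_column_list=true the writer emits an explicit column list,
    // e.g. COPY "PUBLIC"."tablename" ("column1","column2",...) FROM '...',
    // so fields are matched to target columns by name rather than by position.
    df.write
      .format("com.databricks.spark.redshift")
      .option("url", "jdbc:redshift://host:5439/db?user=user&password=pass")
      .option("dbtable", "PUBLIC.tablename")
      .option("tempdir", "s3n://bucket/tmp")
      .option("forward_spark_s3_credentials", "true")
      .option("include_column_list", "true")
      .mode(SaveMode.Append)
      .save()

The resulting COPY statement has the shape asserted by the new RedshiftSourceSuite test,
which is why the option is useful when the DataFrame's column order differs from the
target table's.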