diff --git a/streamingpro-spark-3.3.0-adaptor/src/main/java/org/apache/spark/sql/jdbc/UpsertUtils.scala b/streamingpro-spark-3.3.0-adaptor/src/main/java/org/apache/spark/sql/jdbc/UpsertUtils.scala index 55a6543fd..fc24579d2 100644 --- a/streamingpro-spark-3.3.0-adaptor/src/main/java/org/apache/spark/sql/jdbc/UpsertUtils.scala +++ b/streamingpro-spark-3.3.0-adaptor/src/main/java/org/apache/spark/sql/jdbc/UpsertUtils.scala @@ -114,7 +114,7 @@ object UpsertUtils extends Logging { if (row.isNullAt(i)) { stmt.setNull(idx + 1, nullTypes(i)) } else { - uschema.fields(i).dataType match { + uschema.fields.filter(_.name.equalsIgnoreCase(f.name)).last.dataType match { case IntegerType => stmt.setInt(idx + 1, row.getInt(i)) case LongType => stmt.setLong(idx + 1, row.getLong(i)) case DoubleType => stmt.setDouble(idx + 1, row.getDouble(i)) @@ -260,7 +260,10 @@ object MysqlUpsertBuilder extends UpsertBuilder with Logging { object OracleUpsertBuilder extends UpsertBuilder with Logging { def generateStatement(table: String, idF: Seq[StructField], schema: StructType): (String, StructType) = { // generate merge into statement with placeholders. - val onClause = idF.map(f => s"${f.name} = ?").mkString(" AND ") + // B A C => A B C Sort the id field + val onClause = idF.sortBy(_.name).map(f => s"${f.name} = ?").mkString(" AND ") + // Because the condition comes first, we need to redefine the schema order. The schema defines the sort + val onClauseFields = schema.filter(s => idF.map(_.name).contains(s.name)).sortBy(_.name) // Column names are not quoted. Because quoting makes them case sensitive, // this could result in "invalid identifier" error in Oracle. val updateFields = schema.filterNot(s => idF.map(_.name).contains(s.name)) @@ -276,7 +279,7 @@ object OracleUpsertBuilder extends UpsertBuilder with Logging { """.stripMargin logInfo(s"Using sql $statement") // Number of placeholders is doubled, so as the schema - (statement, StructType(schema ++ schema)) + (statement, StructType(onClauseFields ++ updateFields ++ schema)) } override def upsertStatement(conn: Connection, table: String, dialect: JdbcDialect, idField: Option[Seq[StructField]],