From 6a60a4cd330859ce4c72ef3ac6235963cff2f02f Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 26 Jan 2026 11:53:37 -0800 Subject: [PATCH] changes to TransactionStatement to check for case with same write to row & col within a transaction --- .../statements/ModificationStatement.java | 54 +++++++++++++ .../cql3/statements/TransactionStatement.java | 34 +++++++++ .../test/accord/AccordCQLTestBase.java | 75 +++++++++++++++++++ 3 files changed, 163 insertions(+) diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 118f2c1fa44..2dd955ffdde 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -1006,6 +1006,60 @@ private List getTxnWriteFragment(int index, ClientState state return fragments; } + public List getRowKeys(ClientState state, QueryOptions options) + { + List rowKeys = new ArrayList<>(); + List updates = getTxnUpdate(state, options); + for (PartitionUpdate update : updates) + { + DecoratedKey key = update.partitionKey(); + List> clusteringColumns = txnClusterings(options, state); + for (Clustering clustering : clusteringColumns) + rowKeys.add(new RowKey(key, clustering)); + } + return rowKeys; + } + + public static class RowKey + { + public final DecoratedKey key; + public final Clustering clustering; + + public RowKey(DecoratedKey key, Clustering clustering) + { + this.key = key; + this.clustering = clustering; + } + + public DecoratedKey partitionKey() + { + return key; + } + + public Clustering clustering() + { + return clustering; + } + + @Override + public boolean equals(Object other) + { + if (other == this) + return true; + if (!(other instanceof RowKey)) + return false; + + RowKey that = (RowKey) other; + return this.partitionKey().equals(that.partitionKey()) && this.clustering().equals(that.clustering()); + } + + @Override + public int hashCode() + { + return partitionKey().hashCode() * 31 + clustering().hashCode(); + } + } + final void addUpdates(UpdatesCollector collector, List keys, ClientState state, diff --git a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java index 52039e7beff..9f0e2004a99 100644 --- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java @@ -61,6 +61,8 @@ import org.apache.cassandra.cql3.transactions.RowDataReference; import org.apache.cassandra.cql3.transactions.SelectReferenceSource; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Columns; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.SinglePartitionReadCommand; import org.apache.cassandra.db.SinglePartitionReadQuery; import org.apache.cassandra.db.filter.DataLimits; @@ -104,6 +106,7 @@ import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull; import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue; import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; +import org.apache.cassandra.cql3.statements.ModificationStatement.RowKey; import static org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.AUTO_READ; import static org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.RETURNING; import static org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.USER; @@ -130,6 +133,7 @@ public class TransactionStatement implements CQLStatement.CompositeCQLStatement, public static final String SELECT_REFS_NEED_COLUMN_MESSAGE = "SELECT references must specify a column."; public static final String TRANSACTIONS_DISABLED_MESSAGE = "Accord transactions are disabled. (See accord.enabled in cassandra.yaml)"; public static final String ILLEGAL_RANGE_QUERY_MESSAGE = "Range queries are not allowed for reads within a transaction; %s %s"; + public static final String DUPLICATE_KEYS_IN_SAME_TRANSACTION_MESSAGE = "Transaction contains update to the same key"; public static final String UNSUPPORTED_MIGRATION = "Transaction Statement is unsupported when migrating away from Accord or before migration to Accord is complete for a range"; public static final String NO_PARTITION_IN_CLAUSE_WITH_LIMIT = "Partition key is present in IN clause and there is a LIMIT... this is currently not supported; %s statement %s"; public static final String WRITE_TXN_EMPTY_WITH_IGNORED_READS = "Write txn produced no mutation, and its reads do not return to the caller; ignoring..."; @@ -556,6 +560,36 @@ public ResultMessage execute(QueryState state, QueryOptions options, Dispatcher. checkTrue(returningSelect.select.getLimit(options) == DataLimits.NO_LIMIT, NO_PARTITION_IN_CLAUSE_WITH_LIMIT, "SELECT", returningSelect.select.source); } + // check that within a transaction we don't have multiple updates to the same primary key, column pair + HashMap seenRegularColumns = new HashMap<>(); + HashMap seenStaticColumns = new HashMap<>(); + + for (ModificationStatement statement : updates) + { + List rowKeys = statement.getRowKeys(state.getClientState(), options); + Columns regularColumns = statement.updatedColumns().columns(false); + Columns staticColumns = statement.updatedColumns().columns(true); + + for (RowKey rowKey : rowKeys) + { + Columns existingRegularColumns = seenRegularColumns.putIfAbsent(rowKey, regularColumns); + if (existingRegularColumns != null) + { + for (ColumnMetadata column : regularColumns) + checkFalse(existingRegularColumns.contains(column), DUPLICATE_KEYS_IN_SAME_TRANSACTION_MESSAGE); + seenRegularColumns.put(rowKey, existingRegularColumns.mergeTo(regularColumns)); + } + + Columns existingStaticColumns = seenStaticColumns.putIfAbsent(rowKey.partitionKey(), staticColumns); + if (existingStaticColumns != null) + { + for (ColumnMetadata column : staticColumns) + checkFalse(existingStaticColumns.contains(column), DUPLICATE_KEYS_IN_SAME_TRANSACTION_MESSAGE); + seenStaticColumns.put(rowKey.partitionKey(), existingStaticColumns.mergeTo(staticColumns)); + } + } + } + Txn txn = createTxn(state.getClientState(), options); if (txn == null) return new ResultMessage.Void(); diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java index dbf6d6553f6..dca81522641 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java @@ -205,6 +205,81 @@ public void testRejectTransactionStatement() throws Throwable }); } + @Test + public void testRejectTransactionWithUpdatesToSamePrimaryKeySameColumns() throws Exception + { + test(cluster -> { + try + { + cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES (?, ?, ?)"), ConsistencyLevel.ALL, 1, 1, 2); + String txn = "BEGIN TRANSACTION\n" + + " UPDATE " + qualifiedAccordTableName + " SET v = 2 WHERE k = 1 AND c = 1;\n" + + " UPDATE " + qualifiedAccordTableName + " SET v = 10 WHERE k = 1 AND c = 1;\n" + + "COMMIT TRANSACTION"; + + cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL); + fail("Expected exception"); + } + catch (Throwable t) + { + assertEquals(InvalidRequestException.class.getName(), t.getClass().getName()); + assertEquals(TransactionStatement.DUPLICATE_KEYS_IN_SAME_TRANSACTION_MESSAGE, t.getMessage()); + } + }); + } + + @Test + public void testRejectTransactionWithUpdatesToSamePrimaryKeyWithInClause() throws Exception + { + test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v int, r int, j int, primary key (k, c, v)) WITH " + transactionalMode.asCqlParam(), cluster -> { + try + { + cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, c, v, r, j) VALUES (?, ?, ?, ?, ?)"), ConsistencyLevel.ALL, 1, 1, 1, 3, 5); + cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, c, v, r, j) VALUES (?, ?, ?, ?, ?)"), ConsistencyLevel.ALL, 2, 2, 2, 3, 5); + String txn = "BEGIN TRANSACTION\n" + + " UPDATE " + qualifiedAccordTableName + " SET j = 5 WHERE k = 1 AND c = 1 AND v IN (1, 2);\n" + + " UPDATE " + qualifiedAccordTableName + " SET j = 3 WHERE k = 1 AND c = 1 AND v = 2;\n" + + "COMMIT TRANSACTION"; + + cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL); + fail("Expected exception"); + } + catch (Throwable t) + { + assertEquals(InvalidRequestException.class.getName(), t.getClass().getName()); + assertEquals(TransactionStatement.DUPLICATE_KEYS_IN_SAME_TRANSACTION_MESSAGE, t.getMessage()); + } + }); + } + + @Test + public void testAcceptTransactionWithUpdatesToSamePrimaryKeyButDifferentColumns() throws Exception + { + test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v int, r int, primary key (k, c)) WITH " + transactionalMode.asCqlParam(), cluster -> { + cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, c, v, r) VALUES (?, ?, ?, ?)"), ConsistencyLevel.ALL, 1, 1, 2, 3); + String txn = "BEGIN TRANSACTION\n" + + " UPDATE " + qualifiedAccordTableName + " SET v = 2 WHERE k = 1 AND c = 1;\n" + + " UPDATE " + qualifiedAccordTableName + " SET r = 10 WHERE k = 1 AND c = 1;\n" + + "COMMIT TRANSACTION"; + + cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL); + }); + } + + @Test + public void testAcceptTransactionWithUpdatesToSamePrimaryKeyDisjointColumns() throws Exception + { + test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v int, r int, j int, primary key (k, c)) WITH " + transactionalMode.asCqlParam(), cluster -> { + cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, c, v, r, j) VALUES (?, ?, ?, ?, ?)"), ConsistencyLevel.ALL, 1, 1, 2, 3, 5); + String txn = "BEGIN TRANSACTION\n" + + " UPDATE " + qualifiedAccordTableName + " SET r = 2, j = 5 WHERE k = 1 AND c = 1;\n" + + " UPDATE " + qualifiedAccordTableName + " SET v = 10 WHERE k = 1 AND c = 1;\n" + + "COMMIT TRANSACTION"; + + cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL); + }); + } + @Test public void testCounterCreateTableTransactionalModeFails() throws Exception {