Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,60 @@ private List<TxnWrite.Fragment> getTxnWriteFragment(int index, ClientState state
return fragments;
}

public List<RowKey> getRowKeys(ClientState state, QueryOptions options)
{
List<RowKey> rowKeys = new ArrayList<>();
List<PartitionUpdate> updates = getTxnUpdate(state, options);
for (PartitionUpdate update : updates)
{
DecoratedKey key = update.partitionKey();
List<Clustering<?>> clusteringColumns = txnClusterings(options, state);
for (Clustering<?> clustering : clusteringColumns)
rowKeys.add(new RowKey(key, clustering));
}
return rowKeys;
}

public static class RowKey
{
public final DecoratedKey key;
public final Clustering<?> clustering;

public RowKey(DecoratedKey key, Clustering<?> clustering)
{
this.key = key;
this.clustering = clustering;
}

public DecoratedKey partitionKey()
{
return key;
}

public Clustering<?> clustering()
{
return clustering;
}

@Override
public boolean equals(Object other)
{
if (other == this)
return true;
if (!(other instanceof RowKey))
return false;

RowKey that = (RowKey) other;
return this.partitionKey().equals(that.partitionKey()) && this.clustering().equals(that.clustering());
}

@Override
public int hashCode()
{
return partitionKey().hashCode() * 31 + clustering().hashCode();
}
}

final void addUpdates(UpdatesCollector collector,
List<ByteBuffer> keys,
ClientState state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import org.apache.cassandra.cql3.transactions.RowDataReference;
import org.apache.cassandra.cql3.transactions.SelectReferenceSource;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Columns;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.SinglePartitionReadQuery;
import org.apache.cassandra.db.filter.DataLimits;
Expand Down Expand Up @@ -104,6 +106,7 @@
import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
import org.apache.cassandra.cql3.statements.ModificationStatement.RowKey;
import static org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.AUTO_READ;
import static org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.RETURNING;
import static org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.USER;
Expand All @@ -130,6 +133,7 @@ public class TransactionStatement implements CQLStatement.CompositeCQLStatement,
public static final String SELECT_REFS_NEED_COLUMN_MESSAGE = "SELECT references must specify a column.";
public static final String TRANSACTIONS_DISABLED_MESSAGE = "Accord transactions are disabled. (See accord.enabled in cassandra.yaml)";
public static final String ILLEGAL_RANGE_QUERY_MESSAGE = "Range queries are not allowed for reads within a transaction; %s %s";
public static final String DUPLICATE_KEYS_IN_SAME_TRANSACTION_MESSAGE = "Transaction contains update to the same key";
public static final String UNSUPPORTED_MIGRATION = "Transaction Statement is unsupported when migrating away from Accord or before migration to Accord is complete for a range";
public static final String NO_PARTITION_IN_CLAUSE_WITH_LIMIT = "Partition key is present in IN clause and there is a LIMIT... this is currently not supported; %s statement %s";
public static final String WRITE_TXN_EMPTY_WITH_IGNORED_READS = "Write txn produced no mutation, and its reads do not return to the caller; ignoring...";
Expand Down Expand Up @@ -556,6 +560,36 @@ public ResultMessage execute(QueryState state, QueryOptions options, Dispatcher.
checkTrue(returningSelect.select.getLimit(options) == DataLimits.NO_LIMIT, NO_PARTITION_IN_CLAUSE_WITH_LIMIT, "SELECT", returningSelect.select.source);
}

// check that within a transaction we don't have multiple updates to the same primary key, column pair
HashMap<RowKey, Columns> seenRegularColumns = new HashMap<>();
HashMap<DecoratedKey, Columns> seenStaticColumns = new HashMap<>();

for (ModificationStatement statement : updates)
{
List<RowKey> rowKeys = statement.getRowKeys(state.getClientState(), options);
Columns regularColumns = statement.updatedColumns().columns(false);
Columns staticColumns = statement.updatedColumns().columns(true);

for (RowKey rowKey : rowKeys)
{
Columns existingRegularColumns = seenRegularColumns.putIfAbsent(rowKey, regularColumns);
if (existingRegularColumns != null)
{
for (ColumnMetadata column : regularColumns)
checkFalse(existingRegularColumns.contains(column), DUPLICATE_KEYS_IN_SAME_TRANSACTION_MESSAGE);
seenRegularColumns.put(rowKey, existingRegularColumns.mergeTo(regularColumns));
}

Columns existingStaticColumns = seenStaticColumns.putIfAbsent(rowKey.partitionKey(), staticColumns);
if (existingStaticColumns != null)
{
for (ColumnMetadata column : staticColumns)
checkFalse(existingStaticColumns.contains(column), DUPLICATE_KEYS_IN_SAME_TRANSACTION_MESSAGE);
seenStaticColumns.put(rowKey.partitionKey(), existingStaticColumns.mergeTo(staticColumns));
}
}
}

Txn txn = createTxn(state.getClientState(), options);
if (txn == null)
return new ResultMessage.Void();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,81 @@ public void testRejectTransactionStatement() throws Throwable
});
}

@Test
public void testRejectTransactionWithUpdatesToSamePrimaryKeySameColumns() throws Exception
{
test(cluster -> {
try
{
cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES (?, ?, ?)"), ConsistencyLevel.ALL, 1, 1, 2);
String txn = "BEGIN TRANSACTION\n" +
" UPDATE " + qualifiedAccordTableName + " SET v = 2 WHERE k = 1 AND c = 1;\n" +
" UPDATE " + qualifiedAccordTableName + " SET v = 10 WHERE k = 1 AND c = 1;\n" +
"COMMIT TRANSACTION";

cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL);
fail("Expected exception");
}
catch (Throwable t)
{
assertEquals(InvalidRequestException.class.getName(), t.getClass().getName());
assertEquals(TransactionStatement.DUPLICATE_KEYS_IN_SAME_TRANSACTION_MESSAGE, t.getMessage());
}
});
}

@Test
public void testRejectTransactionWithUpdatesToSamePrimaryKeyWithInClause() throws Exception
{
test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v int, r int, j int, primary key (k, c, v)) WITH " + transactionalMode.asCqlParam(), cluster -> {
try
{
cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, c, v, r, j) VALUES (?, ?, ?, ?, ?)"), ConsistencyLevel.ALL, 1, 1, 1, 3, 5);
cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, c, v, r, j) VALUES (?, ?, ?, ?, ?)"), ConsistencyLevel.ALL, 2, 2, 2, 3, 5);
String txn = "BEGIN TRANSACTION\n" +
" UPDATE " + qualifiedAccordTableName + " SET j = 5 WHERE k = 1 AND c = 1 AND v IN (1, 2);\n" +
" UPDATE " + qualifiedAccordTableName + " SET j = 3 WHERE k = 1 AND c = 1 AND v = 2;\n" +
"COMMIT TRANSACTION";

cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL);
fail("Expected exception");
}
catch (Throwable t)
{
assertEquals(InvalidRequestException.class.getName(), t.getClass().getName());
assertEquals(TransactionStatement.DUPLICATE_KEYS_IN_SAME_TRANSACTION_MESSAGE, t.getMessage());
}
});
}

@Test
public void testAcceptTransactionWithUpdatesToSamePrimaryKeyButDifferentColumns() throws Exception
{
test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v int, r int, primary key (k, c)) WITH " + transactionalMode.asCqlParam(), cluster -> {
cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, c, v, r) VALUES (?, ?, ?, ?)"), ConsistencyLevel.ALL, 1, 1, 2, 3);
String txn = "BEGIN TRANSACTION\n" +
" UPDATE " + qualifiedAccordTableName + " SET v = 2 WHERE k = 1 AND c = 1;\n" +
" UPDATE " + qualifiedAccordTableName + " SET r = 10 WHERE k = 1 AND c = 1;\n" +
"COMMIT TRANSACTION";

cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL);
});
}

@Test
public void testAcceptTransactionWithUpdatesToSamePrimaryKeyDisjointColumns() throws Exception
{
test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v int, r int, j int, primary key (k, c)) WITH " + transactionalMode.asCqlParam(), cluster -> {
cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, c, v, r, j) VALUES (?, ?, ?, ?, ?)"), ConsistencyLevel.ALL, 1, 1, 2, 3, 5);
String txn = "BEGIN TRANSACTION\n" +
" UPDATE " + qualifiedAccordTableName + " SET r = 2, j = 5 WHERE k = 1 AND c = 1;\n" +
" UPDATE " + qualifiedAccordTableName + " SET v = 10 WHERE k = 1 AND c = 1;\n" +
"COMMIT TRANSACTION";

cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL);
});
}

@Test
public void testCounterCreateTableTransactionalModeFails() throws Exception
{
Expand Down