Skip to content
13 changes: 7 additions & 6 deletions src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,30 +152,31 @@ public QueryOptions forStatement(int i)

private static class WithPerStatementVariables extends BatchQueryOptions
{
private final List<QueryOptions> perStatementOptions;
private final QueryOptions[] perStatementOptions;

private WithPerStatementVariables(QueryOptions wrapped, List<byte[][]> variables, List<Object> queryOrIdList)
{
super(wrapped, queryOrIdList);
this.perStatementOptions = new ArrayList<>(variables.size());
this.perStatementOptions = new QueryOptions[variables.size()];
int i = 0;
for (final byte[][] vars : variables)
perStatementOptions.add(new BatchQueryOptionsWrapper(wrapped, vars));
perStatementOptions[i++] = new BatchQueryOptionsWrapper(wrapped, vars);
}

public QueryOptions forStatement(int i)
{
return perStatementOptions.get(i);
return perStatementOptions[i];
}

@Override
public void prepareStatement(int i, ImmutableList<ColumnSpecification> boundNames)
{
if (isPreparedStatement(i))
{
QueryOptions options = perStatementOptions.get(i);
QueryOptions options = perStatementOptions[i];
options.prepare(boundNames);
options = QueryOptions.addColumnSpecifications(options, boundNames);
perStatementOptions.set(i, options);
perStatementOptions[i] = options;
}
else
{
Expand Down
15 changes: 8 additions & 7 deletions src/java/org/apache/cassandra/cql3/UpdateParameters.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class UpdateParameters
protected final long timestamp;
private final int ttl;

private final DeletionTime deletionTime;
private DeletionTime deletionTime;

// Holds data for operations that require a read-before-write. Will be null otherwise.
private final Map<DecoratedKey, Partition> prefetchedRows;
Expand All @@ -82,8 +82,6 @@ public UpdateParameters(TableMetadata metadata,
this.timestamp = timestamp;
this.ttl = ttl;

this.deletionTime = DeletionTime.build(timestamp, nowInSec);

this.prefetchedRows = prefetchedRows;

// We use MIN_VALUE internally to mean the absence of of timestamp (in Selection, in sstable stats, ...), so exclude
Expand Down Expand Up @@ -128,7 +126,7 @@ private void addPrimaryKeyLivenessInfo(LivenessInfo info)

public void addRowDeletion()
{
addRowDeletion(Row.Deletion.regular(deletionTime));
addRowDeletion(Row.Deletion.regular(deletionTime()));
}

private void addRowDeletion(Row.Deletion deletion)
Expand Down Expand Up @@ -266,11 +264,12 @@ public void addCounter(ColumnMetadata column, long increment) throws InvalidRequ

public void setComplexDeletionTime(ColumnMetadata column)
{
builder.addComplexDeletion(column, deletionTime);
builder.addComplexDeletion(column, deletionTime());
}

public void setComplexDeletionTimeForOverwrite(ColumnMetadata column)
{
DeletionTime deletionTime = deletionTime();
builder.addComplexDeletion(column, DeletionTime.build(deletionTime.markedForDeleteAt() - 1, deletionTime.localDeletionTime()));
}

Expand All @@ -283,7 +282,9 @@ public Row buildRow()

public DeletionTime deletionTime()
{
return deletionTime;
if (deletionTime == null)
deletionTime = DeletionTime.build(timestamp, nowInSec);
return deletionTime;
}

public RangeTombstone makeRangeTombstone(ClusteringComparator comparator, Clustering<?> clustering)
Expand All @@ -293,7 +294,7 @@ public RangeTombstone makeRangeTombstone(ClusteringComparator comparator, Cluste

public RangeTombstone makeRangeTombstone(Slice slice)
{
return new RangeTombstone(slice, deletionTime);
return new RangeTombstone(slice, deletionTime());
}

public byte[] nextTimeUUIDAsBytes()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ public ClusteringColumnRestrictions mergeWith(Restriction restriction, @Nullable

public NavigableSet<Clustering<?>> valuesAsClustering(QueryOptions options, ClientState state) throws InvalidRequestException
{
// fast path, a typical case when a single full restriction is used
// for example, when we specify a single clustering key (a single row) to insert/update
if (restrictions.size() == 1 && !restrictions.hasIN())
{
SingleRestriction r = restrictions.lastRestriction();
List<ClusteringElements> values = r.values(options);
return MultiCBuilder.build(comparator, values);
}
MultiCBuilder builder = new MultiCBuilder(comparator);
for (SingleRestriction r : restrictions)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
Expand All @@ -30,6 +31,7 @@

import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.ClusteringPrefix;
import org.apache.cassandra.db.MultiCBuilder;
Expand Down Expand Up @@ -191,6 +193,17 @@ public AbstractBounds<PartitionPosition> bounds(IPartitioner partitioner, QueryO
*/
private List<ByteBuffer> nonTokenRestrictionValues(QueryOptions options, ClientState state)
{
// fast path, a typical case when a single full restriction is used
// for example, when we specify a single partition key to modify
if (restrictions.size() == 1 && !restrictions.hasIN())
{
SingleRestriction r = restrictions.lastRestriction();
List<ClusteringElements> values = r.values(options);
if (values.size() == 1)
return toByteBuffers(Clustering.make(values.get(0).toArray(new ByteBuffer[comparator.size()])));
return toByteBuffers(MultiCBuilder.build(comparator, values));
}

MultiCBuilder builder = new MultiCBuilder(comparator);
for (SingleRestriction r : restrictions)
{
Expand All @@ -205,8 +218,21 @@ private List<ByteBuffer> nonTokenRestrictionValues(QueryOptions options, ClientS
return toByteBuffers(builder.build());
}

private List<ByteBuffer> toByteBuffers(ClusteringPrefix<?> clustering)
{
clustering.validate();
return Collections.singletonList(clustering.serializeAsPartitionKey());
}

private List<ByteBuffer> toByteBuffers(SortedSet<? extends ClusteringPrefix<?>> clusterings)
{
if (clusterings.size() == 1)
{
ClusteringPrefix<?> clustering = clusterings.first();
clustering.validate();
return Collections.singletonList(clustering.serializeAsPartitionKey());
}

List<ByteBuffer> l = new ArrayList<>(clusterings.size());
for (ClusteringPrefix<?> clustering : clusterings)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public int compare(ColumnMetadata column, ColumnMetadata otherColumn)
*/
private final NavigableMap<ColumnMetadata, SingleRestriction> restrictions;

private final SingleRestriction lastRestriction;


/**
* {@code true} if it contains multi-column restrictions, {@code false} otherwise.
*/
Expand Down Expand Up @@ -100,6 +103,8 @@ private RestrictionSet(NavigableMap<ColumnMetadata, SingleRestriction> restricti
boolean needsFilteringOrIndexing)
{
this.restrictions = restrictions;
// Map.lastEntry allocates an object, so we cache the value to avoid it, restrictions is immutable
this.lastRestriction = restrictions.isEmpty() ? null : restrictions.lastEntry().getValue();
this.hasMultiColumnRestrictions = hasMultiColumnRestrictions;
this.hasIn = hasIn;
this.hasSlice = hasSlice;
Expand Down Expand Up @@ -312,7 +317,7 @@ ColumnMetadata nextColumn(ColumnMetadata columnDef)
*/
SingleRestriction lastRestriction()
{
return restrictions.lastEntry().getValue();
return lastRestriction;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,22 @@ private List<ClusteringElements> bindAndGetClusteringElements(QueryOptions optio

private List<ClusteringElements> bindAndGetSingleTermClusteringElements(QueryOptions options)
{
if (values.isSingleTerm(options))
{
ByteBuffer value = bindAndGetSingle(options);
return Collections.singletonList(ClusteringElements.of(columnsExpression.columnSpecification(), value, isOnToken()));
}

List<ByteBuffer> values = bindAndGet(options);
if (values.isEmpty())
return Collections.emptyList();

if (values.size() == 1)
{
ClusteringElements value = ClusteringElements.of(columnsExpression.columnSpecification(), values.get(0), isOnToken());
return Collections.singletonList(value);
}

List<ClusteringElements> elements = new ArrayList<>(values.size());
for (int i = 0; i < values.size(); i++)
elements.add(ClusteringElements.of(columnsExpression.columnSpecification(), values.get(i), isOnToken()));
Expand All @@ -288,6 +300,13 @@ private List<ByteBuffer> bindAndGet(QueryOptions options)
return buffers;
}

private ByteBuffer bindAndGetSingle(QueryOptions options)
{
ByteBuffer buffer = values.bindAndGetSingleTermValue(options);
validate(buffer);
return buffer;
}

private List<List<ByteBuffer>> bindAndGetElements(QueryOptions options)
{
List<List<ByteBuffer>> elementsList = values.bindAndGetElements(options);
Expand Down
55 changes: 45 additions & 10 deletions src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,27 +142,62 @@ public BatchStatement(Type type, VariableSpecifications bindVariables, List<Modi
this.attrs = attrs;

boolean hasConditions = false;
MultiTableColumnsBuilder regularBuilder = new MultiTableColumnsBuilder();
RegularAndStaticColumns.Builder conditionBuilder = RegularAndStaticColumns.builder();
boolean updateRegular = false;
boolean updateStatic = false;
boolean updatesVirtualTables = false;

boolean sameTableAndColumnsNoConditions = true;
TableMetadata tableMetadata = null;
RegularAndStaticColumns regularAndStaticColumns = null;
// we check initially if it is a typical scenario:
// when many similar rows for the same table are written unconditionally
// in this case we can avoid columns info merging and builders allocation
for (ModificationStatement stmt : statements)
{
regularBuilder.addAll(stmt.metadata(), stmt.updatedColumns());
if (tableMetadata == null)
tableMetadata = stmt.metadata();
if (regularAndStaticColumns == null)
regularAndStaticColumns = stmt.updatedColumns();

if (tableMetadata != stmt.metadata()
|| regularAndStaticColumns != stmt.updatedColumns()
|| stmt.hasConditions())
{
sameTableAndColumnsNoConditions = false;
break;
}

updateRegular |= stmt.updatesRegularRows();
updatesVirtualTables |= stmt.isVirtual();
if (stmt.hasConditions())
updateStatic |= stmt.updatesStaticRow();
}

if (sameTableAndColumnsNoConditions && tableMetadata != null)
{
this.updatedColumns = Collections.singletonMap(tableMetadata.id(), regularAndStaticColumns);
this.conditionColumns = RegularAndStaticColumns.NONE;
}
else
{

MultiTableColumnsBuilder regularBuilder = new MultiTableColumnsBuilder();
RegularAndStaticColumns.Builder conditionBuilder = RegularAndStaticColumns.builder();
for (ModificationStatement stmt : statements)
{
hasConditions = true;
conditionBuilder.addAll(stmt.conditionColumns());
updateStatic |= stmt.updatesStaticRow();
regularBuilder.addAll(stmt.metadata(), stmt.updatedColumns());
updateRegular |= stmt.updatesRegularRows();
updatesVirtualTables |= stmt.isVirtual();
if (stmt.hasConditions())
{
hasConditions = true;
conditionBuilder.addAll(stmt.conditionColumns());
updateStatic |= stmt.updatesStaticRow();
}
}
}

this.updatedColumns = regularBuilder.build();
this.conditionColumns = conditionBuilder.build();
this.updatedColumns = regularBuilder.build();
this.conditionColumns = conditionBuilder.build();
}
this.updatesRegularRows = updateRegular;
this.updatesStaticRow = updateStatic;
this.hasConditions = hasConditions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,8 @@ public List<ByteBuffer> buildPartitionKeyNames(QueryOptions options, ClientState
throws InvalidRequestException
{
List<ByteBuffer> partitionKeys = restrictions.getPartitionKeys(options, state);
for (ByteBuffer key : partitionKeys)
QueryProcessor.validateKey(key);
for (int i = 0; i < partitionKeys.size(); i++)
QueryProcessor.validateKey(partitionKeys.get(i));

return partitionKeys;
}
Expand Down Expand Up @@ -567,8 +567,9 @@ public boolean requiresRead()
return isReadRequired;
}

private Map<DecoratedKey, Partition> readRequiredLists(Collection<ByteBuffer> partitionKeys,
ClusteringIndexFilter filter,
private <F> Map<DecoratedKey, Partition> readRequiredLists(Collection<ByteBuffer> partitionKeys,
java.util.function.Function<F, ClusteringIndexFilter> filterBuilder,
F filterArg,
DataLimits limits,
boolean local,
ConsistencyLevel cl,
Expand All @@ -595,7 +596,7 @@ private Map<DecoratedKey, Partition> readRequiredLists(Collection<ByteBuffer> pa
RowFilter.none(),
limits,
metadata().partitioner.decorateKey(key),
filter));
filterBuilder.apply(filterArg)));

SinglePartitionReadCommand.Group group = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE);

Expand Down Expand Up @@ -1024,7 +1025,8 @@ final void addUpdates(UpdatesCollector collector,
return;

UpdateParameters params = makeUpdateParameters(keys,
new ClusteringIndexSliceFilter(slices, false),
(slicesToFilter) -> new ClusteringIndexSliceFilter(slicesToFilter, false),
slices,
state,
options,
DataLimits.NONE,
Expand Down Expand Up @@ -1117,7 +1119,8 @@ private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
{
if (clusterings.contains(Clustering.STATIC_CLUSTERING))
return makeUpdateParameters(keys,
new ClusteringIndexSliceFilter(Slices.ALL, false),
(clusteringsToFilter) -> new ClusteringIndexSliceFilter(Slices.ALL, false),
clusterings,
state,
options,
DataLimits.cqlLimits(1),
Expand All @@ -1128,7 +1131,8 @@ private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
);

return makeUpdateParameters(keys,
new ClusteringIndexNamesFilter(clusterings, false),
(clusteringsToFilter) -> new ClusteringIndexNamesFilter(clusteringsToFilter, false),
clusterings,
state,
options,
DataLimits.NONE,
Expand All @@ -1139,8 +1143,10 @@ private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
);
}

private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
ClusteringIndexFilter filter,
private <F> UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
// filter is needed rarely, so we allocate it on demand
java.util.function.Function<F, ClusteringIndexFilter> filterBuilder,
F filterArg,
ClientState state,
QueryOptions options,
DataLimits limits,
Expand All @@ -1152,7 +1158,8 @@ private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
// Some lists operation requires reading
Map<DecoratedKey, Partition> lists =
readRequiredLists(keys,
filter,
filterBuilder,
filterArg,
limits,
local,
options.getConsistency(),
Expand Down
Loading