From c2dc7f79e7f278de80be8cf8bfbb2a8454eec434 Mon Sep 17 00:00:00 2001 From: Benedict Elliott Smith Date: Sun, 14 Sep 2025 14:28:04 +0100 Subject: [PATCH] Accord: Improve Tracing - Introduce pattern tracing, that can intercept failed or new coordinations matching various filters - Support additional tracing event collection modes (SAMPLE and RING) patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20911 --- modules/accord | 2 +- .../db/virtual/AccordDebugKeyspace.java | 561 +++++++++-- .../metrics/AccordCoordinatorMetrics.java | 2 - .../metrics/AccordReplicaMetrics.java | 2 - .../metrics/AccordSystemMetrics.java | 4 + .../service/accord/AccordService.java | 2 + .../service/accord/AccordTracing.java | 936 +++++++++++++++--- .../cassandra/service/accord/TokenRange.java | 17 + .../service/accord/api/AccordAgent.java | 13 +- .../service/accord/api/TokenKey.java | 15 +- .../db/virtual/AccordDebugKeyspaceTest.java | 264 +++-- 11 files changed, 1525 insertions(+), 293 deletions(-) diff --git a/modules/accord b/modules/accord index 657f344eb3b3..e896c9c328d3 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 657f344eb3b3570966bf8cff7731bef6eeea98f1 +Subproject commit e896c9c328d3e8a3b7ddd85381edeaf1b4b8ccb2 diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java index 1f796bdd151e..efe8c390cfc5 100644 --- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java @@ -27,24 +27,31 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; -import java.util.Optional; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.IntFunction; import java.util.function.UnaryOperator; import java.util.stream.Collectors; import javax.annotation.Nullable; +import com.google.common.collect.ImmutableSet; + import accord.coordinate.AbstractCoordination; import accord.coordinate.Coordination; +import accord.coordinate.Coordination.CoordinationKind; import accord.coordinate.Coordinations; import accord.coordinate.PrepareRecovery; import accord.coordinate.tracking.AbstractTracker; import accord.local.cfk.CommandsForKey.TxnInfo; +import accord.primitives.Ranges; +import accord.primitives.Routable; import accord.primitives.RoutingKeys; import accord.utils.SortedListMap; +import accord.utils.TinyEnumSet; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.db.marshal.TxnIdUtf8Type; @@ -52,7 +59,6 @@ import org.slf4j.LoggerFactory; import accord.api.RoutingKey; -import accord.api.TraceEventType; import accord.coordinate.FetchData; import accord.coordinate.FetchRoute; import accord.coordinate.MaybeRecover; @@ -114,9 +120,14 @@ import org.apache.cassandra.service.accord.AccordKeyspace; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.AccordTracing; +import org.apache.cassandra.service.accord.AccordTracing.BucketMode; +import org.apache.cassandra.service.accord.AccordTracing.CoordinationKinds; +import org.apache.cassandra.service.accord.AccordTracing.TracePattern; +import org.apache.cassandra.service.accord.AccordTracing.TxnKindsAndDomains; import org.apache.cassandra.service.accord.DebugBlockedTxns; import org.apache.cassandra.service.accord.IAccordService; import org.apache.cassandra.service.accord.JournalKey; +import org.apache.cassandra.service.accord.TokenRange; import org.apache.cassandra.service.accord.api.AccordAgent; import org.apache.cassandra.service.accord.api.TokenKey; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; @@ -124,7 +135,6 @@ import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.utils.LocalizeString; -import static accord.api.TraceEventType.RECOVER; import static accord.coordinate.Infer.InvalidIf.NotKnownToBeInvalid; import static accord.local.RedundantStatus.Property.GC_BEFORE; import static accord.local.RedundantStatus.Property.LOCALLY_APPLIED; @@ -151,13 +161,14 @@ public class AccordDebugKeyspace extends VirtualKeyspace { - public static final String COORDINATIONS = "coordinations"; - public static final String EXECUTORS = "executors"; public static final String COMMANDS_FOR_KEY = "commands_for_key"; public static final String COMMANDS_FOR_KEY_UNMANAGED = "commands_for_key_unmanaged"; + public static final String CONSTANTS = "constants"; + public static final String COORDINATIONS = "coordinations"; public static final String DURABILITY_SERVICE = "durability_service"; public static final String DURABLE_BEFORE = "durable_before"; public static final String EXECUTOR_CACHE = "executor_cache"; + public static final String EXECUTORS = "executors"; public static final String JOURNAL = "journal"; public static final String MAX_CONFLICTS = "max_conflicts"; public static final String MIGRATION_STATE = "migration_state"; @@ -166,6 +177,8 @@ public class AccordDebugKeyspace extends VirtualKeyspace public static final String REJECT_BEFORE = "reject_before"; public static final String TXN = "txn"; public static final String TXN_BLOCKED_BY = "txn_blocked_by"; + public static final String TXN_PATTERN_TRACE = "txn_pattern_trace"; + public static final String TXN_PATTERN_TRACES = "txn_pattern_traces"; public static final String TXN_TRACE = "txn_trace"; public static final String TXN_TRACES = "txn_traces"; public static final String TXN_OPS = "txn_ops"; @@ -194,11 +207,42 @@ private AccordDebugKeyspace() new TxnTable(), new TxnTraceTable(), new TxnTracesTable(), - new TxnOpsTable() + new TxnPatternTraceTable(), + new TxnPatternTracesTable(), + new TxnOpsTable(), + new ConstantsTable() )); } - // TODO (desired): human readable packed key tracker (but requires loading Txn, so might be preferable to only do conditionally) + public static final class ConstantsTable extends AbstractVirtualTable + { + private final SimpleDataSet dataSet; + private ConstantsTable() + { + super(parse(VIRTUAL_ACCORD_DEBUG, CONSTANTS, + "Accord Debug Keyspace Constants", + "CREATE TABLE %s (\n" + + " kind text,\n" + + " name text,\n" + + " description text,\n" + + " PRIMARY KEY (kind, name)" + + ')', UTF8Type.instance)); + dataSet = new SimpleDataSet(metadata()); + + for (CoordinationKind coordinationKind : CoordinationKind.values()) + dataSet.row("CoordinationKind", coordinationKind.name()); + + for (TxnOpsTable.Op op : TxnOpsTable.Op.values()) + dataSet.row("Op", op.name()).column("description", op.description); + } + + @Override + public DataSet data() + { + return dataSet; + } + } + public static final class ExecutorsTable extends AbstractLazyVirtualTable { private ExecutorsTable() @@ -913,7 +957,7 @@ protected void collect(PartitionsCollector collector) * UPDATE system_accord_debug.txn_trace SET permits = N WHERE txn_id = ? AND event_type = ? * SELECT * FROM system_accord_debug.txn_traces WHERE txn_id = ? AND event_type = ? */ - public static final class TxnTraceTable extends AbstractMutableVirtualTable + public static final class TxnTraceTable extends AbstractMutableLazyVirtualTable { private TxnTraceTable() { @@ -921,63 +965,111 @@ private TxnTraceTable() "Accord Transaction Trace Configuration", "CREATE TABLE %s (\n" + " txn_id 'TxnIdUtf8Type',\n" + - " event_type text,\n" + - " permits int,\n" + - " PRIMARY KEY (txn_id, event_type)" + - ')', TxnIdUtf8Type.instance)); + " bucket_mode text,\n" + + " bucket_seen int,\n" + + " bucket_size int,\n" + + " bucket_sub_size int,\n" + + " chance float,\n" + + " current_size int,\n" + + " trace_events text,\n" + + " managed_by_pattern boolean,\n" + + " PRIMARY KEY (txn_id)" + + ')', TxnIdUtf8Type.instance), FAIL, UNSORTED); } @Override - public DataSet data() + protected void collect(PartitionsCollector collector) { AccordTracing tracing = tracing(); - SimpleDataSet dataSet = new SimpleDataSet(metadata()); - tracing.forEach(id -> true, (txnId, eventType, permits, events) -> { - dataSet.row(txnId.toString(), eventType.toString()).column("permits", permits); + tracing.forEach(id -> true, (txnId, events) -> { + collector.row(txnId.toString()) + .eagerCollect(columns -> { + columns.add("bucket_mode", events.bucketMode().name()) + .add("bucket_seen", events.bucketSeen()) + .add("bucket_size", events.bucketSize()) + .add("bucket_sub_size", events.bucketSubSize()) + .add("chance", events.chance()) + .add("current_size", events.size()) + .add("trace_events", events.traceEvents(), TO_STRING) + .add("managed_by_pattern", events.hasOwner()) + ; + }); }); - return dataSet; - } - - private AccordTracing tracing() - { - return ((AccordAgent)AccordService.instance().agent()).tracing(); } @Override - protected void applyPartitionDeletion(ColumnValues partitionKey) + protected void applyPartitionDeletion(Object[] partitionKey) { - TxnId txnId = TxnId.parse(partitionKey.value(0)); - tracing().erasePermits(txnId); + TxnId txnId = TxnId.parse((String)partitionKey[0]); + tracing().stopTracing(txnId); } @Override - protected void applyRowDeletion(ColumnValues partitionKey, ColumnValues clusteringColumns) + protected void applyRowDeletion(Object[] partitionKeys, Object[] clusteringKeys) { - TxnId txnId = TxnId.parse(partitionKey.value(0)); - tracing().erasePermits(txnId, parseEventType(clusteringColumns.value(0))); + TxnId txnId = TxnId.parse((String)partitionKeys[0]); + tracing().stopTracing(txnId); } @Override - protected void applyColumnDeletion(ColumnValues partitionKey, ColumnValues clusteringColumns, String columnName) + protected void applyRowUpdate(Object[] partitionKeys, @Nullable Object[] clusteringKeys, ColumnMetadata[] columns, Object[] values) { - TxnId txnId = TxnId.parse(partitionKey.value(0)); - TraceEventType eventType = parseEventType(clusteringColumns.value(0)); - tracing().erasePermits(txnId, eventType); - } + TxnId txnId = TxnId.parse((String)partitionKeys[0]); + CoordinationKinds newTrace = null; + BucketMode newBucketMode = null; + boolean unsetManagedByOwner = false; + int newBucketSize = -1, newBucketSubSize = -1, newBucketSeen = -1; + float newChance = Float.NaN; + for (int i = 0 ; i < columns.length ; ++i) + { + String name = columns[i].name.toString(); + switch (name) + { + default: throw new InvalidRequestException("Cannot update '" + name + '\''); + case "bucket_mode": + newBucketMode = checkBucketMode(values[i]); + break; + case "chance": + newChance = checkChance(values[i], name); + break; + case "managed_by_pattern": + if (values[i] != null && (Boolean)values[i]) + throw new InvalidRequestException("Can only unset '" + name + '\''); + unsetManagedByOwner = true; + break; + case "bucket_size": + newBucketSize = checkNonNegative(values[i], name, 0); + break; + case "bucket_sub_size": + newBucketSubSize = checkNonNegative(values[i], name, 0); + break; + case "bucket_seen": + newBucketSeen = checkNonNegative(values[i], name, 0); + break; + case "trace_events": + newTrace = tryParseCoordinationKinds(values[i]); + break; + } + } - @Override - protected void applyColumnUpdate(ColumnValues partitionKey, ColumnValues clusteringColumns, Optional columnValue) - { - TxnId txnId = TxnId.parse(partitionKey.value(0)); - TraceEventType eventType = parseEventType(clusteringColumns.value(0)); - if (columnValue.isEmpty()) tracing().erasePermits(txnId, eventType); - else tracing().setPermits(txnId, eventType, columnValue.get().value()); + if (newBucketSize == 0) + { + if (newBucketMode != null || newBucketSeen > 0 || newBucketSubSize > 0 || !Float.isNaN(newChance) || (newTrace != null && !newTrace.isEmpty())) + throw new InvalidRequestException("Setting bucket size to zero clears config; cannot set other fields."); + tracing().stopTracing(txnId); + } + else + { + if (newBucketSubSize == 0) + throw new InvalidRequestException("Cannot set bucket_sub_size to zero."); + tracing().set(txnId, newTrace, newBucketMode, newBucketSize, newBucketSubSize, newBucketSeen, newChance, unsetManagedByOwner); + } } @Override public void truncate() { - tracing().eraseAllEvents(); + tracing().eraseAllBuckets(); } } @@ -989,20 +1081,15 @@ private TxnTracesTable() "Accord Transaction Traces", "CREATE TABLE %s (\n" + " txn_id 'TxnIdUtf8Type',\n" + - " event_type text,\n" + " id_micros bigint,\n" + + " event text,\n" + " at_micros bigint,\n" + " command_store_id int,\n" + " message text,\n" + - " PRIMARY KEY (txn_id, event_type, id_micros, at_micros)" + + " PRIMARY KEY (txn_id, id_micros, event, at_micros)" + ')', TxnIdUtf8Type.instance), FAIL, UNSORTED, UNSORTED); } - private AccordTracing tracing() - { - return ((AccordAgent)AccordService.instance().agent()).tracing(); - } - @Override protected void applyPartitionDeletion(Object[] partitionKeys) { @@ -1014,21 +1101,22 @@ protected void applyPartitionDeletion(Object[] partitionKeys) protected void applyRangeTombstone(Object[] partitionKeys, Object[] starts, boolean startInclusive, Object[] ends, boolean endInclusive) { TxnId txnId = TxnId.parse((String) partitionKeys[0]); - if (!startInclusive) throw invalidRequest("May restrict deletion by at most one event_type"); - if (starts.length != 1) throw invalidRequest("Deletion restricted by lower bound on id_micros or at_micros is unsupported"); - if (ends.length == 0 || (ends.length == 1 && !endInclusive)) throw invalidRequest("Range deletion must specify one event_type"); - if (!ends[0].equals(starts[0])) throw invalidRequest("May restrict deletion by at most one event_type"); - if (ends.length > 2) throw invalidRequest("Deletion restricted by upper bound on at_micros is unsupported"); - TraceEventType eventType = parseEventType((String) starts[0]); - if (ends.length == 1) + if (starts.length > 1 || ends.length > 1) throw invalidRequest("May only delete on txn_id and id_micros"); + + long minId = Long.MIN_VALUE, maxId = Long.MAX_VALUE; + if (starts.length == 1) { - tracing().eraseEvents(txnId, eventType); + minId = ((Long)starts[0]); + if (!startInclusive && minId < Long.MAX_VALUE) + ++minId; } - else + if (ends.length == 1) { - long before = (Long)ends[1]; - tracing().eraseEventsBefore(txnId, eventType, before); + maxId = ((Long)ends[0]); + if (!endInclusive && maxId > Long.MIN_VALUE) + --maxId; } + tracing().eraseEventsBetween(txnId, minId, maxId); } @Override @@ -1040,11 +1128,11 @@ public void truncate() @Override public void collect(PartitionsCollector collector) { - tracing().forEach(id -> true, (txnId, eventType, permits, events) -> { + tracing().forEach(id -> true, (txnId, events) -> { events.forEach(e -> { if (e.messages().isEmpty()) { - collector.row(txnId.toString(), eventType.name(), e.idMicros, 0L) + collector.row(txnId.toString(), e.idMicros, e.kind.name(), 0L) .eagerCollect(columns -> { columns.add("message", ""); }); @@ -1052,7 +1140,7 @@ public void collect(PartitionsCollector collector) else { e.messages().forEach(m -> { - collector.row(txnId.toString(), eventType.name(), e.idMicros, NANOSECONDS.toMicros(m.atNanos - e.atNanos)) + collector.row(txnId.toString(), e.idMicros, e.kind.name(), NANOSECONDS.toMicros(m.atNanos - e.atNanos)) .eagerCollect(columns -> { columns.add("command_store_id", m.commandStoreId) .add("message", m.message); @@ -1064,6 +1152,226 @@ public void collect(PartitionsCollector collector) } } + /** + * Usage: + * collect N events (may be more than N messages) + * UPDATE system_accord_debug.txn_trace SET permits = N WHERE txn_id = ? AND event_type = ? + * SELECT * FROM system_accord_debug.txn_traces WHERE txn_id = ? AND event_type = ? + */ + public static final class TxnPatternTraceTable extends AbstractMutableLazyVirtualTable + { + private TxnPatternTraceTable() + { + super(parse(VIRTUAL_ACCORD_DEBUG, TXN_PATTERN_TRACE, + "Accord Transaction Pattern Trace Configuration", + "CREATE TABLE %s (\n" + + " id int,\n" + + " bucket_mode text,\n" + + " bucket_seen int,\n" + + " bucket_size int,\n" + + " chance float,\n" + + " current_size int,\n" + + " if_intersects text,\n" + + " if_kind text,\n" + + " on_failure text,\n" + + " on_new text,\n" + + " trace_bucket_mode text,\n" + + " trace_bucket_size int,\n" + + " trace_bucket_sub_size int,\n" + + " trace_events text,\n" + + " PRIMARY KEY (id)" + + ')', Int32Type.instance), FAIL, SORTED); + } + + @Override + protected void collect(PartitionsCollector collector) + { + AccordTracing tracing = tracing(); + tracing.forEachPattern((state) -> { + collector.row(state.id()) + .eagerCollect(columns -> { + columns.add("bucket_mode", state.mode().name()) + .add("bucket_size", state.bucketSize()) + .add("bucket_seen", state.bucketSeen()) + .add("chance", state.pattern().chance) + .add("current_size", state.currentSize()) + .add("if_intersects", state.pattern().intersects, TxnPatternTraceTable::toString) + .add("if_kind", state.pattern().kinds, TO_STRING) + .add("on_failure", state.pattern().traceFailures, TO_STRING) + .add("on_new", state.pattern().traceNew, TO_STRING) + .add("trace_bucket_mode", state.traceWithMode().name()) + .add("trace_bucket_size", state.traceBucketSize()) + .add("trace_bucket_sub_size", state.traceBucketSubSize()) + .add("trace_events", state.traceEvents(), TO_STRING) + ; + }); + }); + } + + @Override + protected void applyPartitionDeletion(Object[] partitionKeys) + { + int id = (Integer) partitionKeys[0]; + tracing().erasePattern(id); + } + + @Override + protected void applyRowUpdate(Object[] partitionKeys, @Nullable Object[] clusteringKeys, ColumnMetadata[] columns, Object[] values) + { + int id = (Integer)partitionKeys[0]; + Function pattern = Function.identity(); + CoordinationKinds newTraceEvents = null; + BucketMode newBucketMode = null, newTraceBucketMode = null; + int newBucketSize = -1, newTraceBucketSize = -1, newTraceBucketSubSize = -1; + int newBucketSeen = -1; + for (int i = 0 ; i < columns.length ; ++i) + { + String name = columns[i].name.toString(); + switch (name) + { + default: throw new InvalidRequestException("Cannot update '" + name + '\''); + case "bucket_mode": + newBucketMode = checkBucketMode(values[i]); + break; + case "bucket_seen": + newBucketSeen = checkNonNegative(values[i], name, 0); + break; + case "bucket_size": + newBucketSize = checkNonNegative(values[i], name, 0); + break; + case "chance": + float newChance = checkChance(values[i], name); + pattern = pattern.andThen(p -> p.withChance(newChance)); + break; + case "if_intersects": + Participants intersects = parseParticipants(values[i]); + pattern = pattern.andThen(p -> p.withIntersects(intersects)); + break; + case "if_kind": + TxnKindsAndDomains kinds = tryParseTxnKinds(values[i]); + pattern = pattern.andThen(p -> p.withKinds(kinds)); + break; + case "on_failure": + CoordinationKinds traceFailures = tryParseCoordinationKinds(values[i]); + pattern = pattern.andThen(p -> p.withTraceFailures(traceFailures)); + break; + case "on_new": + CoordinationKinds traceNew = tryParseCoordinationKinds(values[i]); + pattern = pattern.andThen(p -> p.withTraceNew(traceNew)); + break; + case "trace_bucket_mode": + newTraceBucketMode = checkBucketMode(values[i]); + break; + case "trace_bucket_size": + newTraceBucketSize = checkNonNegative(values[i], name, 0); + break; + case "trace_bucket_sub_size": + newTraceBucketSubSize = checkNonNegative(values[i], name, 0); + break; + case "trace_events": + newTraceEvents = tryParseCoordinationKinds(values[i]); + break; + } + } + + tracing().setPattern(id, pattern, newBucketMode, newBucketSeen, newBucketSize, newTraceBucketMode, newTraceBucketSize, newTraceBucketSubSize, newTraceEvents); + } + + private static String toString(Participants participants) + { + StringBuilder out = new StringBuilder(); + for (Routable r : participants) + { + if (out.length() != 0) + out.append('|'); + out.append(r); + } + return out.toString(); + } + + private static Participants parseParticipants(Object input) + { + if (input == null) + return null; + + String[] vs = ((String)input).split("\\|"); + if (vs.length == 0) + return RoutingKeys.EMPTY; + + if (!vs[0].endsWith("]")) + { + RoutingKey[] keys = new RoutingKey[vs.length]; + for (int i = 0 ; i < keys.length ; ++i) + { + try { keys[i] = TokenKey.parse(vs[i], DatabaseDescriptor.getPartitioner()); } + catch (Throwable t) { throw new InvalidRequestException("Could not parse TokenKey " + vs[0]); } + } + return RoutingKeys.of(keys); + } + else + { + TokenRange[] ranges = new TokenRange[vs.length]; + for (int i = 0 ; i < ranges.length ; ++i) + { + try { ranges[i] = TokenRange.parse(vs[0], DatabaseDescriptor.getPartitioner()); } + catch (Throwable t) { throw new InvalidRequestException("Could not parse TokenKey " + vs[0]); } + } + return Ranges.of(ranges); + } + } + + @Override + public void truncate() + { + tracing().eraseAllPatterns(); + } + } + + public static final class TxnPatternTracesTable extends AbstractMutableLazyVirtualTable + { + private TxnPatternTracesTable() + { + super(parse(VIRTUAL_ACCORD_DEBUG, TXN_PATTERN_TRACES, + "Accord Transaction Pattern Traces", + "CREATE TABLE %s (\n" + + " id int,\n" + + " txn_id 'TxnIdUtf8Type',\n" + + " PRIMARY KEY (id, txn_id)" + + ')', Int32Type.instance), FAIL, SORTED, SORTED); + } + + @Override + protected void applyPartitionDeletion(Object[] partitionKeys) + { + int id = (Integer) partitionKeys[0]; + tracing().erasePatternTraces(id); + } + + @Override + public void truncate() + { + tracing().eraseAllPatternTraces(); + } + + @Override + public void collect(PartitionsCollector collector) + { + tracing().forEachPattern(state -> { + if (state.currentSize() == 0) + { + collector.row(state.id(), "") + .eagerCollect(column -> {}); + } + else + { + for (int i = 0, size = state.currentSize(); i < size ; ++i) + collector.row(state.id(), state.get(i).toString()) + .eagerCollect(columns -> {}); + } + }); + } + } + // TODO (desired): don't report null as "null" abstract static class AbstractJournalTable extends AbstractLazyVirtualTable { @@ -1291,7 +1599,24 @@ void collect(PartitionsCollector collector, AccordService accord, JournalKey key public static final class TxnOpsTable extends AbstractMutableLazyVirtualTable { // TODO (expected): test each of these operations - enum Op { ERASE_VESTIGIAL, INVALIDATE, TRY_EXECUTE, FORCE_APPLY, FORCE_UPDATE, RECOVER, FETCH, RESET_PROGRESS_LOG } + enum Op + { + LOCALLY_ERASE_VESTIGIAL("USE WITH CAUTION: Move the command to the vestigial status, erasing its contents. This has distributed state machine implications."), + LOCALLY_INVALIDATE("USE WITH CAUTION: Move the command to the invalidated status, erasing its contents. This has distributed state machine implications."), + TRY_EXECUTE("Try to execute a stuck transaction. This is safe, and will no-op if not able to."), + FORCE_APPLY("USE WITH CAUTION: Apply the command if we have the relevant information locally."), + FORCE_UPDATE("Try to reset in-memory book-keeping related to a command."), + RECOVER("Initiate recovery for a command."), + FETCH("Initiate a fetch request for a command."), + REQUEUE_PROGRESS_LOG("Ask the progress log to queue both home and waiting states."); + + final String description; + + Op(String description) + { + this.description = description; + } + } private TxnOpsTable() { super(parse(VIRTUAL_ACCORD_DEBUG, TXN_OPS, @@ -1315,14 +1640,16 @@ protected void applyRowUpdate(Object[] partitionKeys, Object[] clusteringKeys, C { TxnId txnId = TxnId.parse((String) partitionKeys[0]); int commandStoreId = (Integer) clusteringKeys[0]; - Op op = Op.valueOf((String)values[0]); + + Op op = tryParse(values[0], true, Op.class, Op::valueOf); + switch (op) { default: throw new UnhandledEnum(op); - case ERASE_VESTIGIAL: + case LOCALLY_ERASE_VESTIGIAL: cleanup(txnId, commandStoreId, Cleanup.VESTIGIAL); break; - case INVALIDATE: + case LOCALLY_INVALIDATE: cleanup(txnId, commandStoreId, Cleanup.INVALIDATE); break; case TRY_EXECUTE: @@ -1360,7 +1687,7 @@ protected void applyRowUpdate(Object[] partitionKeys, Object[] clusteringKeys, C recover(txnId, route, result); }); break; - case RESET_PROGRESS_LOG: + case REQUEUE_PROGRESS_LOG: run(txnId, commandStoreId, safeStore -> { ((DefaultProgressLog)safeStore.progressLog()).requeue(safeStore, TxnStateKind.Waiting, txnId); ((DefaultProgressLog)safeStore.progressLog()).requeue(safeStore, TxnStateKind.Home, txnId); @@ -1381,7 +1708,7 @@ private void runWithRoute(TxnId txnId, int commandStoreId, Function, AsyncResult.Settable> consumer = apply.apply(command); if (command.route() == null) { - FetchRoute.fetchRoute(node, txnId, command.maxContactable(), LatentStoreSelector.standard(), (success, fail) -> { + FetchRoute.fetchRoute(node, txnId, command.maxParticipants(), LatentStoreSelector.standard(), (success, fail) -> { if (fail != null) result.setFailure(fail); else consumer.accept(success, result); }); @@ -1411,7 +1738,7 @@ private void recover(TxnId txnId, @Nullable Route route, AsyncResult.Settable PrepareRecovery.recover(node, node.someSequentialExecutor(), txnId, NotKnownToBeInvalid, (FullRoute) route, null, LatentStoreSelector.standard(), (success, fail) -> { if (fail != null) result.setFailure(fail); else result.setSuccess(null); - }, node.agent().trace(txnId, RECOVER)); + }); } else { @@ -1497,21 +1824,6 @@ protected void collect(PartitionsCollector collector) } } - private static TableId tableId(int commandStoreId, CommandStores commandStores) - { - AccordCommandStore commandStore = (AccordCommandStore) commandStores.forId(commandStoreId); - if (commandStore == null) - return null; - return commandStore.tableId(); - } - - private static TableMetadata tableMetadata(TableId tableId) - { - if (tableId == null) - return null; - return Schema.instance.getTableMetadata(tableId); - } - private static String printToken(RoutingKey routingKey) { TokenKey key = (TokenKey) routingKey; @@ -1539,10 +1851,9 @@ private static String toStr(StoreParticipants participants, Function'; } } + + private static BucketMode checkBucketMode(Object value) + { + try { return AccordTracing.BucketMode.valueOf(LocalizeString.toUpperCaseLocalized((String)value, Locale.ENGLISH)); } + catch (IllegalArgumentException | NullPointerException e) + { + throw new InvalidRequestException("Unknown bucket_mode '" + value + '\''); + } + + } + + private static int checkNonNegative(Object value, String field, int ifNull) + { + if (value == null) + return ifNull; + + int v = (Integer)value; + if (v < 0) + throw new InvalidRequestException("Cannot set '" + field + "' to negative value"); + return v; + } + + private static float checkChance(Object value, String field) + { + if (value == null) + return 1.0f; + + float v = (Float)value; + if (v <= 0 || v > 1.0f) + throw new InvalidRequestException("Cannot set '" + field + "' to value outside the range (0..1]"); + return v; + } + + private static > Set toStrings(TinyEnumSet set, IntFunction lookup) + { + ImmutableSet.Builder builder = ImmutableSet.builder(); + for (T t : set.iterable(lookup)) + builder.add(t.name()); + return builder.build(); + } + + private static AccordTracing tracing() + { + return ((AccordAgent)AccordService.instance().agent()).tracing(); + } + + private static > E tryParse(Object input, boolean toUpperCase, Class clazz, Function valueOf) + { + try + { + String str = (String) input; + if (toUpperCase) + str = LocalizeString.toUpperCaseLocalized(str, Locale.ENGLISH); + return valueOf.apply(str); + } + catch (IllegalArgumentException | NullPointerException e) + { + throw new InvalidRequestException("Unknown " + clazz.getName() + ": '" + input + '\''); + } + } + + private static CoordinationKinds tryParseCoordinationKinds(Object input) + { + if (input == null) + return null; + + return CoordinationKinds.parse((String) input); + } + + private static TxnKindsAndDomains tryParseTxnKinds(Object input) + { + if (input == null) + return null; + + return TxnKindsAndDomains.parse((String) input); + } } diff --git a/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java b/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java index f062e9507662..c4a37ebf2e81 100644 --- a/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java +++ b/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java @@ -198,8 +198,6 @@ public String toString() public static class Listener implements CoordinatorEventListener { - public static final Listener instance = new Listener(); - private AccordCoordinatorMetrics forTransaction(TxnId txnId) { if (txnId != null) diff --git a/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java b/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java index 551a7ee37d45..91973e4a95d7 100644 --- a/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java +++ b/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java @@ -112,8 +112,6 @@ public String toString() public static class Listener implements ReplicaEventListener { - public static final Listener instance = new Listener(); - private AccordReplicaMetrics forTransaction(TxnId txnId) { if (txnId != null) diff --git a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java index c9f813022537..11f5f970b38e 100644 --- a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java +++ b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java @@ -132,6 +132,10 @@ static class SnapshotBuilder } } + public static void touch() + { + } + private AccordSystemMetrics() { Invariants.expect(AccordService.isSetup()); diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 6742f2484369..33de4a500689 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -41,6 +41,7 @@ import accord.api.ConfigurationService.EpochReady; import accord.primitives.Txn; import org.apache.cassandra.metrics.AccordReplicaMetrics; +import org.apache.cassandra.metrics.AccordSystemMetrics; import org.apache.cassandra.service.accord.api.AccordViolationHandler; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.concurrent.AsyncFuture; @@ -307,6 +308,7 @@ public synchronized static AccordService startup(NodeId tcmId) instance = as; AccordReplicaMetrics.touch(); + AccordSystemMetrics.touch(); AccordViolationHandler.setup(); WatermarkCollector.fetchAndReportWatermarksAsync(as.configService); diff --git a/src/java/org/apache/cassandra/service/accord/AccordTracing.java b/src/java/org/apache/cassandra/service/accord/AccordTracing.java index 9de40299a0f3..8bbae998ca79 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordTracing.java +++ b/src/java/org/apache/cassandra/service/accord/AccordTracing.java @@ -18,40 +18,75 @@ package org.apache.cassandra.service.accord; -import java.util.AbstractList; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Predicate; +import javax.annotation.Nullable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.api.Tracing; -import accord.api.TraceEventType; +import accord.coordinate.Coordination; +import accord.coordinate.Coordination.CoordinationKind; +import accord.coordinate.tracking.AbstractTracker; import accord.local.CommandStore; +import accord.local.Node; +import accord.primitives.Participants; +import accord.primitives.Routable; +import accord.primitives.Txn; import accord.primitives.TxnId; +import accord.utils.Invariants; +import accord.utils.SortedListMap; +import accord.utils.TinyEnumSet; +import accord.utils.UnhandledEnum; +import org.apache.cassandra.metrics.AccordCoordinatorMetrics; +import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.NoSpamLogger; -public class AccordTracing +import static org.apache.cassandra.service.accord.AccordTracing.BucketMode.LEAKY; +import static org.apache.cassandra.service.accord.AccordTracing.BucketMode.SAMPLE; + +public class AccordTracing extends AccordCoordinatorMetrics.Listener { private static final int MAX_EVENTS = 10000; private static final Logger logger = LoggerFactory.getLogger(AccordTracing.class); private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.MINUTES); + public enum BucketMode + { + LEAKY, SAMPLE, RING; + + int position(int permits, int total) + { + switch (this) + { + default: throw UnhandledEnum.unknown(this); + case LEAKY: return Integer.MAX_VALUE; + case RING: return total % permits; + case SAMPLE: return ThreadLocalRandom.current().nextInt(total); + } + } + } + public interface ConsumeState { - void accept(TxnId txnId, TraceEventType eventType, int permits, List events); + void accept(TxnId txnId, TxnEvents state); } public static class Message @@ -74,11 +109,18 @@ public String toString() } } - public static class Event implements Tracing, Comparable + public static class TxnEvent implements Tracing, Comparable { + public final CoordinationKind kind; public final long idMicros = uniqueNowMicros(); public final long atNanos = Clock.Global.nanoTime(); final List messages = new ArrayList<>(); + int index = -1, subIndex = -1; + + public TxnEvent(CoordinationKind kind) + { + this.kind = kind; + } @Override public void trace(CommandStore commandStore, String s) @@ -91,7 +133,7 @@ public void trace(CommandStore commandStore, String s) } @Override - public int compareTo(Event that) + public int compareTo(TxnEvent that) { return Long.compareUnsigned(this.idMicros, that.idMicros); } @@ -102,37 +144,594 @@ public List messages() } } - static class TraceState extends AbstractList + private static class TxnEventsList { - int permits; - int size; - Event[] events; + TxnEvent[] events; + int size, bucketSeen; - void addInternal(Event event) + void addInternal(TxnEvent event) { - if (events == null) events = new Event[10]; + if (events == null) events = new TxnEvent[10]; else if (size == events.length) events = Arrays.copyOf(events, events.length * 2); events[size++] = event; } - void truncate(int eraseBefore) + void truncateInternal(int eraseBefore) { System.arraycopy(events, eraseBefore, events, 0, size - eraseBefore); Arrays.fill(events, size - eraseBefore, size, null); size -= eraseBefore; } - @Override - public Event get(int index) + public TxnEvent get(int index) { return events[index]; } - @Override + public boolean isEmpty() + { + return size == 0; + } + public int size() { return size; } + + void incrementSeen() + { + if (++bucketSeen < 0) + bucketSeen = Integer.MAX_VALUE; + } + } + + public static class TxnEvents extends TxnEventsList + { + private final EnumMap subLists = new EnumMap<>(CoordinationKind.class); + private CoordinationKinds traceEvents = CoordinationKinds.ALL; + private BucketMode mode = LEAKY; + private TracePatternState owner; + private int bucketSize, bucketSubSize; + private float chance = 1.0f; + + void remove(int index) + { + TxnEvent removing = get(index); + TxnEventsList subList = subLists.get(removing.kind); + if (--subList.size == 0) + { + subLists.remove(removing.kind); + } + else + { + if (removing.subIndex < subList.size) + { + TxnEvent replaceWith = subList.events[subList.size]; + subList.events[removing.subIndex] = replaceWith; + replaceWith.subIndex = removing.subIndex; + } + subList.events[subList.size] = null; + } + --size; + if (removing.index < size) + { + TxnEvent replaceWith = events[size]; + events[index] = replaceWith; + replaceWith.index = removing.index; + } + events[size] = null; + } + + TxnEvents eraseEvents(CoordinationKind kind, GlobalCount globalCount) + { + TxnEventsList subList = subLists.remove(kind); + if (subList == null) + return this; + + while (!subList.isEmpty()) + { + remove(subList.get(subList.size() - 1).index); + globalCount.decrementAndGet(); + } + + return nullIfDefunct(); + } + + TxnEvents eraseEventsBetween(long minId, long maxId, GlobalCount globalCount) + { + int i = 0; + while (i < size()) + { + TxnEvent event = get(i); + if (event.idMicros >= minId && event.idMicros <= maxId) + { + remove(i); + globalCount.decrementAndGet(); + } + else ++i; + } + return nullIfDefunct(); + } + + TxnEvents stopTracing() + { + bucketSize = bucketSubSize = 0; + return nullIfDefunct(); + } + + TxnEvents eraseEvents(GlobalCount globalCount) + { + globalCount.addAndGet(-size()); + subLists.clear(); + truncateInternal(size()); + return nullIfDefunct(); + } + + TxnEvents nullIfDefunct() + { + return isEmpty() && bucketSize == 0 ? null : this; + } + + TxnEvent trace(CoordinationKind kind, GlobalCount globalCount) + { + if (bucketSize == 0 || !traceEvents.contains(kind)) + return null; + + if (chance < 1.0f && ThreadLocalRandom.current().nextFloat() >= chance) + return null; + + TxnEventsList subList = subLists.get(kind); + if (subList != null && bucketSubSize <= subList.size) + { + subList.incrementSeen(); + int position = mode.position(bucketSubSize, subList.bucketSeen); + if (position >= bucketSubSize) + return null; + + remove(subList.get(position).index); + } + else if (bucketSize <= size) + { + incrementSeen(); + int position = mode.position(bucketSize, bucketSeen); + if (position >= bucketSize) + return null; + + remove(position); + } + else + { + if (!globalCount.admit()) + return null; + } + + return newTrace(kind, subList); + } + + TxnEvent forceTrace(CoordinationKind kind, GlobalCount globalCount) + { + // ignore all LOCAL accounting and filters, just add a new trace + if (!globalCount.admit()) + return null; + + return newTrace(kind, null); + } + + private TxnEvent newTrace(CoordinationKind kind, TxnEventsList subList) + { + if (subList == null) + subLists.put(kind, subList = new TxnEventsList()); + + TxnEvent event = new TxnEvent(kind); + event.subIndex = subList.size; + subList.addInternal(event); + event.index = size; + addInternal(event); + return event; + } + + public boolean hasOwner() + { + return owner != null; + } + + public CoordinationKinds traceEvents() + { + return traceEvents; + } + + public int bucketSize() + { + return bucketSize; + } + + public int bucketSubSize() + { + return bucketSubSize; + } + + public int bucketSeen() + { + return bucketSeen; + } + + public float chance() + { + return chance; + } + + public BucketMode bucketMode() + { + return mode; + } + + public void forEach(Consumer forEach) + { + for (int i = 0 ; i < size ; ++i) + forEach.accept(events[i]); + } + } + + enum NewOrFailure + { + NEW, FAILURE + } + + public static class CoordinationKinds extends TinyEnumSet + { + private static final int ALL_BITS = CoordinationKind.ALL.bitset(); + public static final CoordinationKinds ALL = new CoordinationKinds(false, ALL_BITS); + public static final CoordinationKinds NONE = new CoordinationKinds(false, 0); + + final boolean printAsSubtraction; + public CoordinationKinds(boolean printAsSubtraction, int bitset) + { + super(bitset); + this.printAsSubtraction = printAsSubtraction; + } + + @Override + public String toString() + { + if (bitset == ALL_BITS) + return "*"; + if (printAsSubtraction) + return '-' + toString(ALL_BITS & ~bitset); + return toString(bitset, CoordinationKind::forOrdinal); + } + + public static CoordinationKinds parse(String input) + { + input = input.trim(); + if (input.equals("*")) + return ALL; + if (input.equals("{}")) + return NONE; + + boolean subtraction = false; + if (input.length() >= 1 && input.charAt(0) == '-') + { + subtraction = true; + input = input.substring(1); + } + if (input.length() < 2 || input.charAt(0) != '{' || input.charAt(input.length() - 1) != '}') + throw new IllegalArgumentException("Invalid CoordinationKinds specification: " + input); + + int bits = 0; + for (String name : input.substring(1, input.length() - 1).split("\\s*,\\s*")) + bits |= TinyEnumSet.encode(CoordinationKind.valueOf(name)); + + if (subtraction) + bits = ALL_BITS & ~bits; + return new CoordinationKinds(subtraction, bits); + } + + private static String toString(int bitset) + { + return TinyEnumSet.toString(bitset, CoordinationKind::forOrdinal); + } + } + + public static class TxnKindsAndDomains + { + static final int ALL_KINDS = Txn.Kind.All.bitset(); + static final TxnKindsAndDomains ALL = new TxnKindsAndDomains(false, ALL_KINDS, ALL_KINDS); + static final TxnKindsAndDomains NONE = new TxnKindsAndDomains(false, 0, 0); + + final boolean printAsSubtraction; + final int keys, ranges; + public TxnKindsAndDomains(boolean printAsSubtraction, int keys, int ranges) + { + this.printAsSubtraction = printAsSubtraction; + this.keys = keys; + this.ranges = ranges; + } + + boolean matches(TxnId txnId) + { + int bits = txnId.is(Routable.Domain.Key) ? keys : ranges; + return TinyEnumSet.contains(bits, txnId.kind()); + } + + @Override + public String toString() + { + if (keys == ALL_KINDS && ranges == ALL_KINDS) + return "*"; + if (printAsSubtraction) + return '-' + toString(ALL_KINDS & ~keys, ALL_KINDS & ~ranges); + return '+' + toString(keys, ranges); + } + + public static TxnKindsAndDomains parse(String input) + { + input = input.trim(); + if (input.equals("*")) + return ALL; + if (input.equals("{}")) + return NONE; + + boolean subtraction = false; + if (input.length() >= 1 && input.charAt(0) == '-') + { + subtraction = true; + input = input.substring(1); + } + if (input.length() < 2 || input.charAt(0) != '{' || input.charAt(input.length() - 1) != '}') + throw new IllegalArgumentException("Invalid TxnKindsAndDomain specification: " + input); + + int keys = 0, ranges = 0; + for (String element : input.substring(1, input.length() - 1).split("\\s*,\\s*")) + { + if (element.length() != 2) + throw new IllegalArgumentException("Invalid TxnKindsAndDomain element: " + element); + + int kinds; + if (element.charAt(1) == '*') kinds = ALL_KINDS; + else + { + Txn.Kind kind = Txn.Kind.forShortName(element.charAt(1)); + if (kind == null) throw new IllegalArgumentException("Unknown Txn.Kind: " + element.charAt(1)); + kinds = TinyEnumSet.encode(kind); + } + + switch (element.charAt(0)) + { + default: throw new IllegalArgumentException("Invalid TxnKindsAndDomain element: " + element); + case '*': keys |= kinds; ranges |= kinds; break; + case 'K': keys |= kinds; break; + case 'R': ranges |= kinds; break; + } + } + + if (subtraction) + { + keys = ALL_KINDS & ~keys; + ranges = ALL_KINDS & ~ranges; + } + return new TxnKindsAndDomains(subtraction, keys, ranges); + } + + private static String toString(int keys, int ranges) + { + StringBuilder out = new StringBuilder("{"); + if (keys != 0) + { + if (keys == ALL_KINDS) out.append("K*"); + else TinyEnumSet.append(keys, Txn.Kind::forOrdinal, k -> "K" + k.shortName(), out); + } + + if (ranges != 0) + { + if (keys != 0) out.append(','); + if (ranges == ALL_KINDS) out.append("R*"); + else TinyEnumSet.append(ranges, Txn.Kind::forOrdinal, k -> "R" + k.shortName(), out); + } + out.append('}'); + return out.toString(); + } + } + + public static class TracePattern + { + private static final TracePattern EMPTY = new TracePattern(null, null, null, null, 1.0f); + + public final TxnKindsAndDomains kinds; + public final Participants intersects; + public final CoordinationKinds traceNew; + public final CoordinationKinds traceFailures; + public final float chance; + + public TracePattern(TxnKindsAndDomains kinds, @Nullable Participants intersects, CoordinationKinds traceNew, CoordinationKinds traceFailures, float chance) + { + this.kinds = kinds; + this.intersects = intersects; + this.traceNew = traceNew; + this.traceFailures = traceFailures; + this.chance = chance; + } + + public TracePattern withKinds(TxnKindsAndDomains kinds) + { + return new TracePattern(kinds, intersects, traceNew, traceFailures, chance); + } + + public TracePattern withIntersects(Participants intersects) + { + return new TracePattern(kinds, intersects, traceNew, traceFailures, chance); + } + + public TracePattern withTraceNew(CoordinationKinds traceNew) + { + return new TracePattern(kinds, intersects, traceNew, traceFailures, chance); + } + + public TracePattern withTraceFailures(CoordinationKinds traceFailures) + { + return new TracePattern(kinds, intersects, traceNew, traceFailures, chance); + } + + public TracePattern withChance(float chance) + { + return new TracePattern(kinds, intersects, traceNew, traceFailures, chance); + } + + boolean matches(TxnId txnId, @Nullable Participants participants, CoordinationKind kind, NewOrFailure newOrFailure) + { + if (kinds != null && !kinds.matches(txnId)) + return false; + + TinyEnumSet testKind = newOrFailure == NewOrFailure.NEW ? traceNew : traceFailures; + if (testKind == null || !testKind.contains(kind)) + return false; + + if (intersects != null && (participants == null || !intersects.intersects(participants))) + return false; + + return chance >= 1.0f || ThreadLocalRandom.current().nextFloat() <= chance; + } + } + + public class TracePatternState + { + final int id; + + private volatile TracePattern pattern; + private BucketMode bucketMode = SAMPLE; + private int bucketSize; + private int bucketSeen; + private BucketMode traceBucketMode = SAMPLE; + private int traceBucketSize, traceBucketSubSize; + private CoordinationKinds traceEvents = new CoordinationKinds(false, 0); + + private final List txnIds = new ArrayList<>(); + + public TracePatternState(int id) + { + this.pattern = TracePattern.EMPTY; + this.id = id; + } + + public int id() { return id; } + public TracePattern pattern() { return pattern; } + public int bucketSize() { return bucketSize; } + public BucketMode mode() { return bucketMode; } + public int bucketSeen() { return bucketSeen; } + public BucketMode traceWithMode() { return traceBucketMode; } + public int traceBucketSize() { return traceBucketSize; } + public int traceBucketSubSize() { return traceBucketSubSize; } + public CoordinationKinds traceEvents() { return traceEvents; } + + public int currentSize() + { + return txnIds.size(); + } + + public TxnId get(int index) + { + return txnIds.get(index); + } + + TxnEvents maybeAdd(TxnId txnId, @Nullable Participants participants, CoordinationKind kind, NewOrFailure newOrFailure) + { + if (!pattern.matches(txnId, participants, kind, newOrFailure)) + return null; + + return maybeAdd(txnId); + } + + private synchronized TxnEvents maybeAdd(TxnId txnId) + { + if (bucketSize == 0) + return null; + + if (++bucketSeen < 0) + bucketSeen = Integer.MAX_VALUE; + + if (bucketSize > txnIds.size()) + { + TxnEvents added = trace(txnId); + if (added != null) + txnIds.add(txnId); + return added; + } + + int position = bucketMode.position(bucketSize, bucketSeen); + + if (position >= bucketSize) + return null; + + TxnEvents added = trace(txnId); + if (added == null) + return null; + + untrace(txnIds.get(position)); + txnIds.set(position, txnId); + return added; + } + + private synchronized void untrace(TxnId txnId) + { + txnIdMap.compute(txnId, (ignore, cur) -> { + if (cur == null || cur.owner == this) + return null; + return cur; + }); + } + + private synchronized TxnEvents trace(TxnId txnId) + { + TxnEvents events = new TxnEvents(); + events.mode = traceBucketMode; + events.bucketSize = traceBucketSize; + events.bucketSubSize = traceBucketSubSize; + events.owner = this; + if (null == txnIdMap.putIfAbsent(txnId, events)) + return events; + return null; + } + + synchronized void set(Function pattern, BucketMode newBucketMode, int newBucketSeen, int newBucketSize, BucketMode newTraceBucketMode, int newTraceBucketSize, int newTraceBucketSubSize, CoordinationKinds newTraceEvents) + { + Invariants.require(newBucketSize != 0); + Invariants.require(newTraceBucketSize != 0); + this.pattern = pattern.apply(this.pattern); + if (newBucketMode != null) + this.bucketMode = newBucketMode; + if (newBucketSize >= 0) + this.bucketSize = newBucketSize; + if (newBucketSeen >= 0) + this.bucketSeen = newBucketSeen; + if (newTraceBucketMode != null) + this.traceBucketMode = newTraceBucketMode; + if (newTraceBucketSize >= 0) + this.traceBucketSize = newTraceBucketSize; + if (newTraceBucketSubSize >= 0) + this.traceBucketSubSize = newTraceBucketSubSize; + if (newTraceEvents != null) + this.traceEvents = newTraceEvents; + } + + synchronized void clear() + { + for (TxnId txnId : txnIds) + untrace(txnId); + txnIds.clear(); + } + } + + static class GlobalCount extends AtomicInteger + { + public boolean admit() + { + if (incrementAndGet() <= MAX_EVENTS) + return true; + + decrementAndGet(); + ClientWarn.instance.warn("Too many Accord trace events stored already; delete some to continue tracing"); + noSpamLogger.warn("Too many Accord trace events stored already; delete some to continue tracing"); + return false; + } } private static final AtomicLong lastNowMicros = new AtomicLong(); @@ -149,176 +748,265 @@ private static long uniqueNowMicros() } } - final Map> stateMap = new ConcurrentHashMap<>(); - final AtomicInteger count = new AtomicInteger(); + final Map txnIdMap = new ConcurrentHashMap<>(); + final CopyOnWriteArrayList allPatterns = new CopyOnWriteArrayList<>(); + final CopyOnWriteArrayList traceNewPatterns = new CopyOnWriteArrayList<>(); + final GlobalCount globalCount = new GlobalCount(); - public Tracing trace(TxnId txnId, TraceEventType eventType) + public Tracing trace(TxnId txnId, @Nullable Participants participants, CoordinationKind kind) { - if (!stateMap.containsKey(txnId)) + if (!txnIdMap.containsKey(txnId) && null == maybeTrace(txnId, participants, kind, NewOrFailure.NEW, traceNewPatterns)) return null; - class Register implements BiFunction, EnumMap> + class Register implements BiFunction { - Event event; + TxnEvent event; @Override - public EnumMap apply(TxnId id, EnumMap cur) + public TxnEvents apply(TxnId id, TxnEvents state) { - if (cur == null) + if (state == null) return null; - TraceState curState = cur.get(eventType); - if (curState == null || curState.permits == 0) - return cur; - - if (count.incrementAndGet() >= MAX_EVENTS) - { - count.decrementAndGet(); - noSpamLogger.warn("Too many Accord trace events stored already; delete some to continue tracing"); - } - else - { - curState.permits--; - curState.addInternal(event = new Event()); - } - return cur; + event = state.trace(kind, globalCount); + return state; } } Register register = new Register(); - stateMap.compute(txnId, register); + txnIdMap.compute(txnId, register); return register.event; } - public void setPermits(TxnId txnId, TraceEventType eventType, int newPermits) + // null values, or values < 0, are ignored + public void set(TxnId txnId, CoordinationKinds trace, BucketMode newBucketMode, int newBucketSize, int newBucketSubSize, int newBucketSeen, float newChance, boolean unsetManagedByPattern) { - stateMap.compute(txnId, (id, cur) -> { - if (newPermits != 0) - { - if (cur == null) - cur = new EnumMap<>(TraceEventType.class); - cur.computeIfAbsent(eventType, ignore -> new TraceState()).permits = newPermits; - } - else if (cur != null) + Invariants.requireArgument(newBucketSize != 0); + Invariants.requireArgument(newBucketSubSize != 0); + Invariants.requireArgument(Float.isNaN(newChance) || (newChance <= 1.0f && newChance > 0f)); + txnIdMap.compute(txnId, (id, cur) -> { + if (cur == null) { - TraceState curState = cur.get(eventType); - if (curState != null) - { - if (!curState.isEmpty()) curState.permits = 0; - else - { - cur.remove(eventType); - if (cur.isEmpty()) - return null; - } - } + if (newBucketSize < 0) + throw new IllegalArgumentException("Must specify bucket size for new trace config."); + + cur = new TxnEvents(); + if (newBucketSubSize < 0) + cur.bucketSubSize = newBucketSize; } + + if (newBucketMode != null) + cur.mode = newBucketMode; + if (newBucketSize >= 0) + cur.bucketSize = newBucketSize; + if (newBucketSubSize >= 0) + cur.bucketSubSize = newBucketSubSize; + if (newBucketSeen >= 0) + cur.bucketSeen = newBucketSeen; + if (!Float.isNaN(newChance)) + cur.chance = newChance; + if (trace != null) + cur.traceEvents = trace; + if (unsetManagedByPattern) + cur.owner = null; return cur; }); } - public void erasePermits(TxnId txnId) + public void stopTracing(TxnId txnId) { - stateMap.compute(txnId, (id, cur) -> { + txnIdMap.compute(txnId, (id, cur) -> { if (cur == null) return null; - Iterator iter = cur.values().iterator(); - while (iter.hasNext()) - { - TraceState state = iter.next(); - state.permits = 0; - if (state.isEmpty()) iter.remove(); - } - return cur.isEmpty() ? null : cur; + return cur.stopTracing(); }); } - public void erasePermits(TxnId txnId, TraceEventType eventType) - { - setPermits(txnId, eventType, 0); - } - public void eraseEvents(TxnId txnId) { - stateMap.compute(txnId, (id, cur) -> { + txnIdMap.compute(txnId, (id, cur) -> { if (cur == null) return null; - Iterator iter = cur.values().iterator(); - while (iter.hasNext()) - { - TraceState state = iter.next(); - count.addAndGet(-state.size()); - state.truncate(state.size()); - if (state.permits == 0) iter.remove(); - } - return cur.isEmpty() ? null : cur; + return cur.eraseEvents(globalCount); }); } - public void eraseEvents(TxnId txnId, TraceEventType eventType) + public void eraseEvents(TxnId txnId, CoordinationKind kind) { - stateMap.compute(txnId, (id, cur) -> { - if (cur != null) - { - TraceState state = cur.get(eventType); - if (state == null) - return cur; + txnIdMap.compute(txnId, (id, cur) -> { + if (cur == null) + return null; - count.addAndGet(-state.size()); - state.truncate(state.size()); - if (state.permits == 0) - cur.remove(eventType); - if (cur.isEmpty()) - return null; - } - return cur; + return cur.eraseEvents(kind, globalCount); }); } - public void eraseEventsBefore(TxnId txnId, TraceEventType eventType, long timestamp) + public void eraseEventsBetween(TxnId txnId, long minIdInclusive, long maxIdInclusive) { - stateMap.compute(txnId, (id, cur) -> { - if (cur != null) - { - TraceState state = cur.get(eventType); - if (state == null) - return cur; + txnIdMap.compute(txnId, (id, cur) -> { + if (cur == null) + return null; - int i = 0; - while (i < state.size() && state.get(i).idMicros < timestamp) - ++i; - state.truncate(i); - count.addAndGet(-i); - if (cur.isEmpty()) - return null; - } - return cur; + return cur.eraseEventsBetween(minIdInclusive, maxIdInclusive, globalCount); }); } public void eraseAllEvents() { - stateMap.keySet().forEach(this::eraseEvents); + txnIdMap.keySet().forEach(this::eraseEvents); } - public void eraseAllPermits() + public void eraseAllBuckets() { - stateMap.keySet().forEach(this::erasePermits); + txnIdMap.keySet().forEach(this::stopTracing); } public void forEach(Predicate include, ConsumeState forEach) { - stateMap.forEach((txnId, state) -> { + txnIdMap.forEach((txnId, state) -> { if (include.test(txnId)) { // ensure lock is held for duration of callback - stateMap.compute(txnId, (id, cur) -> { - if (cur != null) - cur.forEach((event, events) -> forEach.accept(txnId, event, events.permits, Collections.unmodifiableList(events))); + txnIdMap.compute(txnId, (id, cur) -> { + forEach.accept(txnId, cur); return cur; }); } }); } + + public void setPattern(int id, Function pattern, BucketMode newBucketMode, int newBucketSeen, int newBucketSize, BucketMode newTraceBucketMode, int newTraceBucketSize, int newTraceBucketSubSize, CoordinationKinds newTraceEvents) + { + synchronized (allPatterns) + { + TracePatternState state = findPattern(id, false); + TracePatternState update = state != null ? state : new TracePatternState(id); + boolean prevTraceNew = state != null && state.pattern.traceNew != null; + update.set(pattern, newBucketMode, newBucketSeen, newBucketSize, newTraceBucketMode, newTraceBucketSize, newTraceBucketSubSize, newTraceEvents); + if (state == null) + allPatterns.add(update); + if (update.pattern.traceNew != null && !prevTraceNew) + traceNewPatterns.add(update); + else if (update.pattern.traceNew == null && prevTraceNew) + traceNewPatterns.remove(update); + } + } + + public void erasePattern(int id) + { + TracePatternState removed = findPattern(id, true); + if (removed != null) + removed.clear(); + } + + + public void erasePatternTraces(int id) + { + TracePatternState state = findPattern(id, false); + if (state != null) + state.clear(); + } + + private TracePatternState findPattern(int id, boolean remove) + { + synchronized (allPatterns) + { + for (int i = 0; i < allPatterns.size() ; ++i) + { + TracePatternState state = allPatterns.get(i); + if (state.id == id) + { + if (remove) + { + allPatterns.remove(i); + if (state.pattern.traceNew != null) + traceNewPatterns.remove(state); + } + return state; + } + } + } + return null; + } + + public void eraseAllPatterns() + { + List removed = new ArrayList<>(); + allPatterns.removeIf(p -> { removed.add(p); return true; }); + removed.forEach(TracePatternState::clear); + } + + public void eraseAllPatternTraces() + { + for (TracePatternState state : allPatterns) + state.clear(); + } + + public void forEachPattern(Consumer consumer) + { + allPatterns.forEach(pattern -> { + synchronized (pattern) + { + consumer.accept(pattern); + } + }); + } + + @Override + public void onFailed(Throwable failure, TxnId txnId, Participants participants, Coordination coordination) + { + TxnEvents tracing = maybeTrace(txnId, participants, coordination.kind(), NewOrFailure.FAILURE, allPatterns); + if (tracing != null) + { + txnIdMap.compute(txnId, (id, cur) -> { + if (cur != tracing) + return cur; + + TxnEvent event = tracing.forceTrace(coordination.kind(), globalCount); + if (event == null) // we still honour global limit + return cur; + + event.trace(null, "Failed Coordination Dump"); + { + String description = coordination.describe(); + if (description != null) + event.trace(null, "Description: %s", description); + } + { + Participants scope = coordination.scope(); + if (scope != null) + event.trace(null, "Scope: %s", scope); + } + { + AbstractTracker tracker = coordination.tracker(); + if (tracker != null) + event.trace(null, "Tracker: %s", tracker.summariseTracker()); + } + { + SortedListMap replies = coordination.replies(); + if (replies != null) + { + for (int i = 0 ; i < replies.domainSize() ; ++i) + event.trace(null, "from %s: %s", replies.getKey(i), replies.getValue(i)); + } + } + return cur; + }); + } + } + + private TxnEvents maybeTrace(TxnId txnId, @Nullable Participants participants, CoordinationKind kind, NewOrFailure newOrFailure, List patterns) + { + if (patterns.isEmpty()) + return null; + + for (TracePatternState state : patterns) + { + TxnEvents added = state.maybeAdd(txnId, participants, kind, newOrFailure); + if (added != null) + return added; + } + return null; + } } diff --git a/src/java/org/apache/cassandra/service/accord/TokenRange.java b/src/java/org/apache/cassandra/service/accord/TokenRange.java index 08750a841f18..91be38ec2fc8 100644 --- a/src/java/org/apache/cassandra/service/accord/TokenRange.java +++ b/src/java/org/apache/cassandra/service/accord/TokenRange.java @@ -173,4 +173,21 @@ public long serializedSize(TokenRange t) + TokenKey.noTableSerializer.serializedSize(t.end()); } }; + + public static TokenRange parse(String str, IPartitioner partitioner) + { + TableId tableId; + { + int split = str.indexOf(':', str.startsWith("tid:") ? 4 : 0); + tableId = TableId.fromString(str.substring(0, split)); + str = str.substring(split + 2, str.length() - 1); + } + + String[] bounds = str.split(","); + if (bounds.length != 2) + throw new IllegalArgumentException("Invalid TokenRange: " + str); + + return new TokenRange(TokenKey.parse(tableId, bounds[0], partitioner), TokenKey.parse(tableId, bounds[1], partitioner)); + } + } diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java index e09a565c4130..4f3e0ace119f 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java @@ -35,7 +35,7 @@ import accord.api.ProgressLog.BlockedUntil; import accord.api.RoutingKey; import accord.api.Tracing; -import accord.api.TraceEventType; +import accord.coordinate.Coordination; import accord.local.Command; import accord.local.Node; import accord.local.SafeCommand; @@ -43,6 +43,7 @@ import accord.local.TimeService; import accord.messages.ReplyContext; import accord.primitives.Keys; +import accord.primitives.Participants; import accord.primitives.Ranges; import accord.primitives.Routable; import accord.primitives.Status; @@ -62,7 +63,6 @@ import accord.utils.async.Cancellable; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.RequestTimeoutException; -import org.apache.cassandra.metrics.AccordCoordinatorMetrics; import org.apache.cassandra.metrics.AccordReplicaMetrics; import org.apache.cassandra.net.ResponseContext; import org.apache.cassandra.service.accord.AccordService; @@ -98,6 +98,7 @@ public class AccordAgent implements Agent, OwnershipEventListener { private static final Logger logger = LoggerFactory.getLogger(AccordAgent.class); private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, MINUTES); + private static final ReplicaEventListener replicaEventListener = new AccordReplicaMetrics.Listener(); private static BiConsumer onFailedBarrier; public static void setOnFailedBarrier(BiConsumer newOnFailedBarrier) { onFailedBarrier = newOnFailedBarrier; } @@ -121,9 +122,9 @@ public AccordTracing tracing() } @Override - public @Nullable Tracing trace(TxnId txnId, TraceEventType eventType) + public @Nullable Tracing trace(TxnId txnId, Participants participants, Coordination.CoordinationKind eventType) { - return tracing.trace(txnId, eventType); + return tracing.trace(txnId, participants, eventType); } @Override @@ -225,13 +226,13 @@ public Txn emptySystemTxn(Kind kind, Routable.Domain domain) @Override public CoordinatorEventListener coordinatorEvents() { - return AccordCoordinatorMetrics.Listener.instance; + return tracing; } @Override public ReplicaEventListener replicaEvents() { - return AccordReplicaMetrics.Listener.instance; + return replicaEventListener; } private static final long ONE_SECOND = SECONDS.toMicros(1L); diff --git a/src/java/org/apache/cassandra/service/accord/api/TokenKey.java b/src/java/org/apache/cassandra/service/accord/api/TokenKey.java index b71be2f333ae..88a32babd2d5 100644 --- a/src/java/org/apache/cassandra/service/accord/api/TokenKey.java +++ b/src/java/org/apache/cassandra/service/accord/api/TokenKey.java @@ -149,12 +149,15 @@ public String toString() public static TokenKey parse(String str, IPartitioner partitioner) { - TableId tableId; - { - int split = str.indexOf(':', str.startsWith("tid:") ? 4 : 0); - tableId = TableId.fromString(str.substring(0, split)); - str = str.substring(split + 1); - } + + int split = str.indexOf(':', str.startsWith("tid:") ? 4 : 0); + TableId tableId = TableId.fromString(str.substring(0, split)); + str = str.substring(split + 1); + return parse(tableId, str, partitioner); + } + + public static TokenKey parse(TableId tableId, String str, IPartitioner partitioner) + { if (str.endsWith("Inf")) { return new TokenKey(tableId, str.charAt(0) == '-' ? MIN_TABLE_SENTINEL : MAX_TABLE_SENTINEL, partitioner.getMinimumToken()); diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java index 0889ebffe509..c69d20551766 100644 --- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.ArrayList; +import java.util.EnumSet; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -127,16 +128,16 @@ public class AccordDebugKeyspaceTest extends CQLTester String.format("SELECT txn_id, save_status FROM %s.%s WHERE node_id = ? AND txn_id=?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.JOURNAL); private static final String SET_TRACE = - String.format("UPDATE %s.%s SET permits = ? WHERE txn_id = ? AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACE); + String.format("UPDATE %s.%s SET bucket_size = ?, trace_events = ? WHERE txn_id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACE); private static final String SET_TRACE_REMOTE = - String.format("UPDATE %s.%s SET permits = ? WHERE node_id = ? AND txn_id = ? AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACE); + String.format("UPDATE %s.%s SET bucket_size = ?, trace_events = ? WHERE node_id = ? AND txn_id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACE); private static final String QUERY_TRACE = - String.format("SELECT * FROM %s.%s WHERE txn_id = ? AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACE); + String.format("SELECT txn_id, bucket_size, trace_events FROM %s.%s WHERE txn_id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACE); private static final String QUERY_TRACE_REMOTE = - String.format("SELECT * FROM %s.%s WHERE node_id = ? AND txn_id = ? AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACE); + String.format("SELECT node_id, txn_id, bucket_size, trace_events FROM %s.%s WHERE node_id = ? AND txn_id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACE); private static final String UNSET_TRACE1 = String.format("DELETE FROM %s.%s WHERE txn_id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACE); @@ -144,33 +145,27 @@ public class AccordDebugKeyspaceTest extends CQLTester private static final String UNSET_TRACE1_REMOTE = String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACE); - private static final String UNSET_TRACE2 = - String.format("DELETE FROM %s.%s WHERE txn_id = ? AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACE); - - private static final String UNSET_TRACE2_REMOTE = - String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ? AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACE); + private static final String QUERY_ALL_TRACES = + String.format("SELECT * FROM %s.%s WHERE txn_id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES); private static final String QUERY_TRACES = - String.format("SELECT * FROM %s.%s WHERE txn_id = ? AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES); + String.format("SELECT * FROM %s.%s WHERE txn_id = ? AND event = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES); private static final String QUERY_TRACES_REMOTE = - String.format("SELECT * FROM %s.%s WHERE node_id = ? AND txn_id = ? AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACES); + String.format("SELECT * FROM %s.%s WHERE node_id = ? AND txn_id = ? AND event = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACES); private static final String ERASE_TRACES1 = - String.format("DELETE FROM %s.%s WHERE txn_id = ? AND event_type = ? AND id_micros < ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES); + String.format("DELETE FROM %s.%s WHERE txn_id = ? AND id_micros < ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES); private static final String ERASE_TRACES1_REMOTE = - String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ? AND event_type = ? AND id_micros < ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACES); - - private static final String ERASE_TRACES2 = - String.format("DELETE FROM %s.%s WHERE txn_id = ? AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES); - - private static final String ERASE_TRACES2_REMOTE = - String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ? AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACES); + String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ? AND id_micros < ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACES); private static final String ERASE_TRACES3 = String.format("DELETE FROM %s.%s WHERE txn_id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES); + private static final String TRUNCATE_TRACES = + String.format("TRUNCATE %s.%s", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES); + private static final String ERASE_TRACES3_REMOTE = String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACES); @@ -192,6 +187,15 @@ public class AccordDebugKeyspaceTest extends CQLTester private static final String QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ_REMOTE = String.format("SELECT * FROM %s.%s WHERE node_id = ? AND shard_applied >= ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.REDUNDANT_BEFORE); + private static final String SET_PATTERN_TRACE = + String.format("UPDATE %s.%s SET bucket_mode = ?, bucket_seen = ?, bucket_size = ?, chance = ?, if_intersects = ?, if_kind = ?, on_failure = ?, on_new = ?, trace_bucket_mode = ?, trace_bucket_size = ?, trace_bucket_sub_size = ?, trace_events = ? WHERE id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_PATTERN_TRACE); + + private static final String UNSET_PATTERN_TRACE = + String.format("DELETE FROM %s.%s WHERE id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_PATTERN_TRACE); + + private static final String QUERY_PATTERN_TRACE = + String.format("SELECT * FROM %s.%s WHERE id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_PATTERN_TRACE); + @BeforeClass public static void setUpClass() { @@ -239,40 +243,31 @@ public void tracing() Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0); filter.appliesTo(id); - execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS"); - assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"), row(id.toString(), "WAIT_PROGRESS", 1)); - assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(), "WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1)); - execute(SET_TRACE, 0, id.toString(), "WAIT_PROGRESS"); - assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS")); - assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(), "WAIT_PROGRESS")); - execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS"); - assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"), row(id.toString(), "WAIT_PROGRESS", 1)); - assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(), "WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1)); + execute(SET_TRACE, 1, "{WaitProgress}", id.toString()); + assertRows(execute(QUERY_TRACE, id.toString()), row(id.toString(), 1, "{WaitProgress}")); + assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()), row(nodeId, id.toString(), 1, "{WaitProgress}")); + execute(SET_TRACE, 0, "{}", id.toString()); + assertRows(execute(QUERY_TRACE, id.toString())); + assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString())); + execute(SET_TRACE, 1, "{WaitProgress}", id.toString()); + assertRows(execute(QUERY_TRACE, id.toString()), row(id.toString(), 1, "{WaitProgress}")); + assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()), row(nodeId, id.toString(), 1, "{WaitProgress}")); execute(UNSET_TRACE1, id.toString()); - assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS")); - assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(), "WAIT_PROGRESS")); - execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS"); - assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"), row(id.toString(), "WAIT_PROGRESS", 1)); - assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(), "WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1)); - execute(UNSET_TRACE2, id.toString(), "WAIT_PROGRESS"); - assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS")); - assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(), "WAIT_PROGRESS")); - execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS"); - assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"), row(id.toString(), "WAIT_PROGRESS", 1)); - assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(), "WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1)); - filter.appliesTo(id); + assertRows(execute(QUERY_TRACE, id.toString())); + assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString())); + execute(SET_TRACE, 1, "{WaitProgress}", id.toString()); + assertRows(execute(QUERY_TRACE, id.toString()), row(id.toString(), 1, "{WaitProgress}")); + assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()), row(nodeId, id.toString(), 1, "{WaitProgress}")); accord.node().coordinate(id, txn).beginAsResult(); filter.preAccept.awaitThrowUncheckedOnInterrupt(); filter.apply.awaitThrowUncheckedOnInterrupt(); - spinUntilSuccess(() -> Assertions.assertThat(execute(QUERY_TRACES, id.toString(), "WAIT_PROGRESS").size()).isGreaterThan(0)); - spinUntilSuccess(() -> Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId, id.toString(), "WAIT_PROGRESS").size()).isGreaterThan(0)); - execute(ERASE_TRACES1, id.toString(), "FETCH", Long.MAX_VALUE); - execute(ERASE_TRACES2, id.toString(), "FETCH"); - execute(ERASE_TRACES1, id.toString(), "WAIT_PROGRESS", Long.MAX_VALUE); - Assertions.assertThat(execute(QUERY_TRACES, id.toString(), "WAIT_PROGRESS").size()).isEqualTo(0); - Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId, id.toString(), "WAIT_PROGRESS").size()).isEqualTo(0); + spinUntilSuccess(() -> Assertions.assertThat(execute(QUERY_TRACES, id.toString(), "WaitProgress").size()).isGreaterThan(0)); + spinUntilSuccess(() -> Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId, id.toString(), "WaitProgress").size()).isGreaterThan(0)); + execute(ERASE_TRACES1, id.toString(), Long.MAX_VALUE); + execute(ERASE_TRACES1, id.toString(), Long.MAX_VALUE); + Assertions.assertThat(execute(QUERY_TRACES, id.toString(), "WaitProgress").size()).isEqualTo(0); + Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId, id.toString(), "WaitProgress").size()).isEqualTo(0); // just check other variants don't fail - execute(ERASE_TRACES2, id.toString(), "WAIT_PROGRESS"); execute(ERASE_TRACES3, id.toString()); } @@ -280,6 +275,137 @@ public void tracing() { MessagingService.instance().outboundSink.remove(filter); } + + filter = new AccordMsgFilter(); + MessagingService.instance().outboundSink.add(filter); + try + { + TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key); + Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 1, 1, 1); + filter.appliesTo(id); + + execute(SET_TRACE_REMOTE, 1, "{WaitProgress}", nodeId, id.toString()); + assertRows(execute(QUERY_TRACE, id.toString()), row(id.toString(), 1, "{WaitProgress}")); + assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()), row(nodeId, id.toString(), 1, "{WaitProgress}")); + execute(SET_TRACE_REMOTE, 0, "{}", nodeId, id.toString()); + assertRows(execute(QUERY_TRACE, id.toString())); + assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString())); + execute(SET_TRACE_REMOTE, 1, "{WaitProgress}", nodeId, id.toString()); + assertRows(execute(QUERY_TRACE, id.toString()), row(id.toString(), 1, "{WaitProgress}")); + assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()), row(nodeId, id.toString(), 1, "{WaitProgress}")); + execute(UNSET_TRACE1_REMOTE, nodeId, id.toString()); + assertRows(execute(QUERY_TRACE, id.toString())); + assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString())); + execute(SET_TRACE_REMOTE, 1, "{WaitProgress}", nodeId, id.toString()); + assertRows(execute(QUERY_TRACE, id.toString()), row(id.toString(), 1, "{WaitProgress}")); + assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()), row(nodeId, id.toString(), 1, "{WaitProgress}")); + accord.node().coordinate(id, txn).beginAsResult(); + filter.preAccept.awaitThrowUncheckedOnInterrupt(); + filter.apply.awaitThrowUncheckedOnInterrupt(); + spinUntilSuccess(() -> Assertions.assertThat(execute(QUERY_TRACES, id.toString(), "WaitProgress").size()).isGreaterThan(0)); + spinUntilSuccess(() -> Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId, id.toString(), "WaitProgress").size()).isGreaterThan(0)); + execute(ERASE_TRACES1_REMOTE, nodeId, id.toString(), Long.MAX_VALUE); + execute(ERASE_TRACES1_REMOTE, nodeId, id.toString(), Long.MAX_VALUE); + Assertions.assertThat(execute(QUERY_TRACES, id.toString(), "WaitProgress").size()).isEqualTo(0); + Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId, id.toString(), "WaitProgress").size()).isEqualTo(0); + // just check other variants don't fail + execute(ERASE_TRACES3_REMOTE, nodeId, id.toString()); + + } + finally + { + MessagingService.instance().outboundSink.remove(filter); + } + } + + @Test + public void patternTracing() + { + // simple test to confirm basic tracing functionality works, doesn't validate specific behaviours only requesting/querying/erasing + String tableName = createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'"); + AccordService accord = accord(); + DatabaseDescriptor.getAccord().fetch_txn = "1s"; + + execute(SET_PATTERN_TRACE, "leaky", 0, 5, 1.0f, "tid:1:1|tid:1:2", "-{*X}", "-{WaitProgress}", "{}", "ring", 5, 1, "*", 1); + assertRows(execute(QUERY_PATTERN_TRACE, 1), row(1, "LEAKY", 0, 5, 1.0f, 0, "tid:1:1|tid:1:2", "-{KX,RX}", "-{WaitProgress}", "{}", "RING", 5, 1, "*")); + execute(UNSET_PATTERN_TRACE, 1); + assertRows(execute(QUERY_PATTERN_TRACE, 1)); + + RoutingKey matchKey; + { + Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0); + matchKey = (RoutingKey) txn.keys().toParticipants().get(0); + } + + int count = 5; + { + List txnIds = new ArrayList<>(); + execute(SET_PATTERN_TRACE, "leaky", 0, count, 1.0f, matchKey.toString(), "*", "{}", "*", "leaky", 1, 1, "*", 1); + for (int i = 0 ; i < count + 1 ; ++i) + { + TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key); + Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, i, 0); + getBlocking(accord.node().coordinate(id, txn)); + if (i < count) assertRows(execute(QUERY_TRACE, id.toString()), row(id.toString(), 1, "*")); + else assertRows(execute(QUERY_TRACE, id.toString())); + txnIds.add(id); + } + + execute(UNSET_PATTERN_TRACE, 1); + for (int i = 0 ; i < count ; ++i) + assertRows(execute(QUERY_TRACE, txnIds.get(i).toString())); + } + + { + execute(SET_PATTERN_TRACE, "leaky", 0, count, 1.0f, matchKey.asRange().toString(), "{KE}", "{}", "{PreAccept}", "leaky", 1, 1, "*", 1); + for (int i = 0 ; i < count ; ++i) + { + TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key); + Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, i, 0); + getBlocking(accord.node().coordinate(id, txn)); + assertRows(execute(QUERY_TRACE, id.toString())); + } + + List txnIds = new ArrayList<>(); + for (int i = 0 ; i < count + 1 ; ++i) + { + TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.EphemeralRead, Routable.Domain.Key); + Txn txn = createTxn(wrapInTxn(String.format("SELECT * FROM %s.%s WHERE k = ? AND c = ?", KEYSPACE, tableName)), 0, i); + getBlocking(accord.node().coordinate(id, txn)); + if (i < count) assertRows(execute(QUERY_TRACE, id.toString()), row(id.toString(), 1, "*")); + else assertRows(execute(QUERY_TRACE, id.toString())); + txnIds.add(id); + } + + execute(UNSET_PATTERN_TRACE, 1); + for (int i = 0 ; i < count ; ++i) + assertRows(execute(QUERY_TRACE, txnIds.get(i).toString())); + } + + { + TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key); + Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 1, 1, 1); + execute(SET_PATTERN_TRACE, "leaky", 0, count, 1.0f, "" + txn.keys().get(0).toUnseekable(), "{KW}", "*", "{}", "leaky", 1, 1, "{}", 1); + + AccordMsgFilter filter = new AccordMsgFilter(); + filter.dropVerbs = EnumSet.allOf(Verb.class); + filter.appliesTo(id); + MessagingService.instance().outboundSink.add(filter); + try + { + boolean failed = false; + try { getBlocking(accord.node().coordinate(id, txn)); } + catch (Throwable ignore) { failed = true; } + Assertions.assertThat(failed).isTrue(); + } + finally + { + MessagingService.instance().outboundSink.remove(filter); + } + + spinUntilSuccess(() -> Assertions.assertThat(execute(QUERY_ALL_TRACES, id.toString()).size()).isGreaterThan(0), 60); + execute(UNSET_PATTERN_TRACE, 1); + } } @Test @@ -332,19 +458,30 @@ public void completedTxn() throws ExecutionException, InterruptedException String tableName = createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'"); AccordService accord = accord(); int nodeId = accord.nodeId().id; + AccordMsgFilter filter = new AccordMsgFilter(); TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key); Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0); - String keyStr = txn.keys().get(0).toUnseekable().toString(); - getBlocking(accord.node().coordinate(id, txn)); - - spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()), - row(id.toString(), anyInt(), 0, "", "", any(), anyOf(SaveStatus.ReadyToExecute.name(), SaveStatus.Applying.name(), SaveStatus.Applied.name())))); - assertRows(execute(QUERY_TXN, id.toString()), row(id.toString(), "Applied")); - assertRows(execute(QUERY_TXN_REMOTE, nodeId, id.toString()), row(id.toString(), "Applied")); - assertRows(execute(QUERY_JOURNAL, id.toString()), row(id.toString(), "PreAccepted"), row(id.toString(), "Applying"), row(id.toString(), "Applied"), row(id.toString(), null)); - assertRows(execute(QUERY_JOURNAL_REMOTE, nodeId, id.toString()), row(id.toString(), "PreAccepted"), row(id.toString(), "Applying"), row(id.toString(), "Applied"), row(id.toString(), null)); - assertRows(execute(QUERY_COMMANDS_FOR_KEY, keyStr), row(id.toString(), "APPLIED_DURABLE")); - assertRows(execute(QUERY_COMMANDS_FOR_KEY_REMOTE, nodeId, keyStr), row(id.toString(), "APPLIED_DURABLE")); + filter.appliesTo(id); + filter.dropVerbs = Set.of(); + MessagingService.instance().outboundSink.add(filter); + try + { + String keyStr = txn.keys().get(0).toUnseekable().toString(); + getBlocking(accord.node().coordinate(id, txn)); + filter.apply.awaitThrowUncheckedOnInterrupt(); + spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()), + row(id.toString(), anyInt(), 0, "", "", any(), anyOf(SaveStatus.ReadyToExecute.name(), SaveStatus.Applying.name(), SaveStatus.Applied.name())))); + assertRows(execute(QUERY_TXN, id.toString()), row(id.toString(), anyOf("Applied", "Applying"))); + assertRows(execute(QUERY_TXN_REMOTE, nodeId, id.toString()), row(id.toString(), anyOf("Applied", "Applying"))); + assertRows(execute(QUERY_JOURNAL, id.toString()), row(id.toString(), "PreAccepted"), row(id.toString(), "Applying"), row(id.toString(), "Applied"), row(id.toString(), null)); + assertRows(execute(QUERY_JOURNAL_REMOTE, nodeId, id.toString()), row(id.toString(), "PreAccepted"), row(id.toString(), "Applying"), row(id.toString(), "Applied"), row(id.toString(), null)); + assertRows(execute(QUERY_COMMANDS_FOR_KEY, keyStr), row(id.toString(), "APPLIED_DURABLE")); + assertRows(execute(QUERY_COMMANDS_FOR_KEY_REMOTE, nodeId, keyStr), row(id.toString(), "APPLIED_DURABLE")); + } + finally + { + MessagingService.instance().outboundSink.remove(filter); + } } @Test @@ -503,13 +640,13 @@ public void blocked() throws ExecutionException, InterruptedException @Test public void patchJournalVestigialTest() { - testPatchJournal("ERASE_VESTIGIAL", "Vestigial"); + testPatchJournal("LOCALLY_ERASE_VESTIGIAL", "Vestigial"); } @Test public void patchJournalInvalidateTest() { - testPatchJournal("INVALIDATE", "Invalidated"); + testPatchJournal("LOCALLY_INVALIDATE", "Invalidated"); } @Test @@ -520,9 +657,8 @@ public void patchJournalTruncateTest() testPatchJournal("ERASE", "Erased"); Assert.fail("Should have thrown"); } - catch (Throwable t) + catch (InvalidRequestException t) { - Assert.assertTrue(t.getMessage().contains("No enum constant")); } } @@ -630,6 +766,4 @@ public boolean test(Message msg, InetAddressAndPort to) return !dropVerbs.contains(msg.verb()); } } - - } \ No newline at end of file