Skip to content
2 changes: 2 additions & 0 deletions accord-core/src/main/java/accord/api/AsyncExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import accord.utils.async.AsyncChain;
import accord.utils.async.Cancellable;

// TODO (required): consistent RejectedExecutionException handling
public interface AsyncExecutor extends Executor
{
// unlike execute, throws no exceptions, nor will not wrap the runnable
Expand All @@ -38,6 +39,7 @@ default Cancellable execute(RunOrFail run)

// Depending on this implementation this method may queue-jump, i.e. task submission order is not guaranteed.
// Make sure this is semantically safe at all call-sites.
// TODO (required): RejectedExecutionException?
default void executeMaybeImmediately(Runnable run)
{
if (!tryExecuteImmediately(run))
Expand Down
14 changes: 13 additions & 1 deletion accord-core/src/main/java/accord/api/DataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,22 @@ static FetchResult failure(Throwable t)
*/
FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback, FetchKind kind);

/**
* Logical fsync-like operation: anything within the provided ranges written to the store prior to the
* invocation of this method must be durable once the AsyncResult completes successfully. That is, a restart of the node must
* restore the DataStore to a state on or after the point at which snapshot was invoked.
*
* TODO (desired): clunky to pass integer flags around; is there a neater implementation-agnostic alternative?
*/
default void ensureDurable(CommandStore commandStore, Ranges ranges, RedundantBefore reportOnSuccess, int flags)
{
ensureDurable(commandStore, reportOnSuccess, flags);
}

/**
* Logical fsync-like operation: anything written to the store prior to the invocation of this method
* must be durable once the AsyncResult completes successfully. That is, a restart of the node must
* restore the DataStore to a state on or after the point at which snapshot was invoked.
*/
void ensureDurable(CommandStore commandStore, Ranges ranges, RedundantBefore reportOnSuccess);
void ensureDurable(CommandStore commandStore, RedundantBefore reportOnSuccess, int flags);
}
20 changes: 10 additions & 10 deletions accord-core/src/main/java/accord/api/Journal.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ enum Load
MINIMAL_WITH_DEPS
}

void open(Node node);
void start(Node node);

Command loadCommand(int store, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore);
Expand All @@ -61,26 +62,25 @@ enum Load
Command.MinimalWithDeps loadMinimalWithDeps(int store, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore);
void saveCommand(int store, CommandUpdate value, Runnable onFlush);

List<? extends TopologyUpdate> replayTopologies();
List<? extends TopologyUpdate> loadTopologies();
void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush);

void purge(CommandStores commandStores, EpochSupplier minEpoch);

/**
* Replays all messages from journal to rehydrate CommandStores state. Returns whether it has seen (and ignored)
* any exceptions during replay.
*/
boolean replay(CommandStores commandStores);

RedundantBefore loadRedundantBefore(int store);
NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store);
NavigableMap<Timestamp, Ranges> loadSafeToRead(int store);
CommandStores.RangesForEpoch loadRangesForEpoch(int store);
void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush);

Persister<DurableBefore, DurableBefore> durableBeforePersister();

void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush);
void purge(CommandStores commandStores, EpochSupplier minEpoch);

/**
* Replays all messages from journal to rehydrate CommandStores state. Returns whether it has seen (and ignored)
* any exceptions during replay.
*/
boolean replay(CommandStores commandStores, Object param);

class TopologyUpdate
{
public final Int2ObjectHashMap<CommandStores.RangesForEpoch> commandStores;
Expand Down
17 changes: 17 additions & 0 deletions accord-core/src/main/java/accord/api/LocalListeners.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,23 @@ public TxnListener(TxnId waiter, TxnId waitingOn, SaveStatus awaitingStatus)
this.waitingOn = waitingOn;
this.awaitingStatus = awaitingStatus;
}

@Override
public boolean equals(Object that)
{
return that instanceof TxnListener && equals((TxnListener) that);
}

public boolean equals(TxnListener that)
{
return this.waiter.equals(that.waiter) && this.waitingOn.equals(that.waitingOn) && this.awaitingStatus == that.awaitingStatus;
}

@Override
public int hashCode()
{
throw new UnsupportedOperationException();
}
}

Iterable<TxnListener> txnListeners();
Expand Down
40 changes: 39 additions & 1 deletion accord-core/src/main/java/accord/api/ProtocolModifiers.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ static Spec parse(String description)
case "accept": accept = include; Invariants.require(cfp == DoNotChase, "Invalid to specify ChaseFixedPoint.Chase for accept"); break;
case "commit": commit = include; Invariants.require(cfp == DoNotChase, "Invalid to specify ChaseFixedPoint.Chase for commit"); break;
case "stable": stable = include; Invariants.require(cfp == DoNotChase, "Invalid to specify ChaseFixedPoint.Chase for stable"); break;
case "recover": recover = include; Invariants.require(cfp == DoNotChase, "Invalid to specify ChaseFixedPoint.Chase for stable"); break;
case "recover": recover = include; Invariants.require(cfp == DoNotChase, "Invalid to specify ChaseFixedPoint.Chase for recover"); break;
}
}

Expand Down Expand Up @@ -194,6 +194,32 @@ public static boolean discardPreAcceptDeps(TxnId txnId)
}
}

public static class RangeSpecParam
{
// Implementations may set this prior to creating any Range object to specify whether the start or end bound of its ranges are inclusive.
public static boolean END_INCLUSIVE = true;
}
public static class RangeSpec
{
public static final boolean END_INCLUSIVE = RangeSpecParam.END_INCLUSIVE;
public static boolean isEndInclusive()
{
return END_INCLUSIVE;
}
public static boolean isEndExclusive()
{
return !END_INCLUSIVE;
}
public static boolean isStartInclusive()
{
return !END_INCLUSIVE;
}
public static boolean isStartExclusive()
{
return END_INCLUSIVE;
}
}

public static class Toggles
{
private static FastPaths permittedFastPaths = new FastPaths(FastPath.values());
Expand All @@ -205,6 +231,18 @@ public static class Toggles
public static MediumPath defaultMediumPath() { return defaultMediumPath; }
public static void setDefaultMediumPath(MediumPath newDefaultMediumPath) { defaultMediumPath = newDefaultMediumPath; }

private static boolean recoveryAwaitsSupersedingSyncPoints = true;
public static boolean recoveryAwaitsSupersedingSyncPoints() { return recoveryAwaitsSupersedingSyncPoints; }
public static void setRecoveryAwaitsSupersedingSyncPoints(boolean newRecoveryAwaitsSupersedingSyncPoints) { recoveryAwaitsSupersedingSyncPoints = newRecoveryAwaitsSupersedingSyncPoints; }

private static boolean syncPointsTrackUnstableMediumPathDependencies = false;
public static boolean syncPointsTrackUnstableMediumPathDependencies() { return syncPointsTrackUnstableMediumPathDependencies; }
public static void setSyncPointsTrackUnstableMediumPathDependencies(boolean newSyncPointsTrackUnstableMediumPathDependencies) { syncPointsTrackUnstableMediumPathDependencies = newSyncPointsTrackUnstableMediumPathDependencies; }

private static boolean recoverPartialAcceptPhaseIfNoFastPath = false;
public static boolean recoverPartialAcceptPhaseIfNoFastPath() { return recoverPartialAcceptPhaseIfNoFastPath; }
public static void setRecoverPartialAcceptPhaseIfNoFastPath(boolean newSyncPointsRecoverPartialAcceptPhase) {recoverPartialAcceptPhaseIfNoFastPath = newSyncPointsRecoverPartialAcceptPhase; }

private static boolean filterDuplicateDependenciesFromAcceptReply = true;
public static boolean filterDuplicateDependenciesFromAcceptReply() { return filterDuplicateDependenciesFromAcceptReply; }
public static void setFilterDuplicateDependenciesFromAcceptReply(boolean newFilterDuplicateDependenciesFromAcceptReply) { filterDuplicateDependenciesFromAcceptReply = newFilterDuplicateDependenciesFromAcceptReply; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@

import static accord.api.ProtocolModifiers.Toggles.permitLocalExecution;
import static accord.coordinate.ExecutePath.EPHEMERAL;
import static accord.coordinate.ExecutePath.FAST;
import static accord.coordinate.ReadCoordinator.Action.Approve;
import static accord.coordinate.ReadCoordinator.Action.ApprovePartial;
import static accord.primitives.Status.Phase.Execute;
Expand Down
32 changes: 17 additions & 15 deletions accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
Original file line number Diff line number Diff line change
Expand Up @@ -399,20 +399,23 @@ protected void onDone(Success success, Throwable failure)
}
else
{
stable.informStableOnceQuorum();
if (sendOnlyReadStableMessages())
if (!isPrivilegedVoteCommitting)
{
// send additional stable messages to record the transaction outcome
Commit.Kind kind = commitKind();
if (!candidates.isEmpty())
stable.informStableOnceQuorum();
if (sendOnlyReadStableMessages())
{
for (int i = 0, size = candidates.size() ; i < size ; ++i)
sendStableOnly(candidates.get(i), kind);
}
if (unstableFastReads != null)
{
for (Node.Id to : unstableFastReads)
sendStableOnly(to, kind);
// send additional stable messages to record the transaction outcome
Commit.Kind kind = commitKind();
if (!candidates.isEmpty())
{
for (int i = 0, size = candidates.size() ; i < size ; ++i)
sendStableOnly(candidates.get(i), kind);
}
if (unstableFastReads != null)
{
for (Node.Id to : unstableFastReads)
sendStableOnly(to, kind);
}
}
}
invokeCallback(null, failure);
Expand All @@ -430,9 +433,8 @@ public void onSlowResponse(Id from)
@Override
public void onFailure(Id from, Throwable failure)
{
super.onFailure(from, failure);
if (isPrivilegedVoteCommitting && from.id == node.id().id)
tryFinishOnFailure();
if (isPrivilegedVoteCommitting && from.id == node.id().id) finishWithFailure(failure);
else super.onFailure(from, failure);
}

protected CoordinationAdapter<Result> adapter()
Expand Down
14 changes: 7 additions & 7 deletions accord-core/src/main/java/accord/coordinate/KeyBarriers.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.LoggerFactory;

import accord.api.RoutingKey;
import accord.api.VisibleForImplementation;
import accord.local.CommandSummaries;
import accord.local.MapReduceConsumeCommandStores;
import accord.local.Node;
Expand All @@ -48,8 +49,6 @@
import static accord.local.CommandSummaries.SummaryStatus.APPLIED;
import static accord.local.CommandSummaries.SummaryStatus.COMMITTED;
import static accord.local.CommandSummaries.SummaryStatus.INVALIDATED;
import static accord.local.CommandSummaries.ComputeIsDep.IGNORE;
import static accord.local.CommandSummaries.TestStartedAt.STARTED_AFTER;
import static accord.local.durability.DurabilityService.SyncLocal.NoLocal;
import static accord.local.durability.DurabilityService.SyncLocal.Self;
import static accord.local.durability.DurabilityService.SyncRemote.NoRemote;
Expand All @@ -63,6 +62,7 @@
* Facility for finding existing key transactions that can serve as a barrier transaction,
* ensuring all reads/writes after some point in the txn log have been executed.
*/
@VisibleForImplementation
public class KeyBarriers
{
@SuppressWarnings("unused")
Expand All @@ -84,7 +84,7 @@ public Found(TxnId txnId, RoutingKey key, SyncLocal knownLocal, SyncRemote known
}
}

public static AsyncResult<Found> find(Node node, Timestamp min, RoutingKey key, SyncLocal syncLocal, SyncRemote syncRemote)
public static AsyncResult<Found> find(Node node, TxnId min, RoutingKey key, SyncLocal syncLocal, SyncRemote syncRemote)
{
Find find = new Find(min, key, syncLocal, syncRemote);
node.commandStores().mapReduceConsume(min.epoch(), Long.MAX_VALUE, find);
Expand All @@ -98,16 +98,16 @@ public static AsyncResult<Found> find(Node node, Timestamp min, RoutingKey key,
* For Applied we can return success immediately with the executeAt epoch. For PreApplied we can add
* a listener for when it transitions to Applied and then return success.
*/
static class Find extends MapReduceConsumeCommandStores<RoutingKeys, Found> implements CommandSummaries.AllCommandVisitor
static class Find extends MapReduceConsumeCommandStores<RoutingKeys, Found> implements CommandSummaries.SupersedingCommandVisitor
{
final AsyncResults.SettableByCallback<Found> result = new AsyncResults.SettableByCallback<>();
final Timestamp min;
final TxnId min;
final RoutingKey find;
final SyncLocal syncLocal;
final SyncRemote syncRemote;
Found found;

Find(Timestamp min, RoutingKey find, SyncLocal syncLocal, SyncRemote syncRemote)
Find(TxnId min, RoutingKey find, SyncLocal syncLocal, SyncRemote syncRemote)
{
super(RoutingKeys.of(find));
this.min = min;
Expand All @@ -122,7 +122,7 @@ public Found applyInternal(SafeCommandStore safeStore)
// Barriers are trying to establish that committed transactions are applied before the barrier (or in this case just minEpoch)
// so all existing transaction types should ensure that at this point. An earlier txnid may have an executeAt that is after
// this barrier or the transaction we listen on and that is fine
safeStore.visit(scope, TxnId.NONE, Ws, STARTED_AFTER, min, IGNORE, this);
safeStore.visit(scope, min, Ws, this);
return found;
}

Expand Down
17 changes: 13 additions & 4 deletions accord-core/src/main/java/accord/coordinate/Propose.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import accord.local.Node.Id;
import accord.local.SequentialAsyncExecutor;
import accord.messages.Accept;
import accord.messages.Accept.AcceptFlags;
import accord.messages.Accept.AcceptReply;
import accord.messages.Callback;
import accord.primitives.Ballot;
Expand All @@ -47,9 +48,9 @@
import accord.utils.SortedListMap;
import accord.utils.Rethrowable;

import static accord.api.ProtocolModifiers.Toggles.filterDuplicateDependenciesFromAcceptReply;
import static accord.coordinate.tracking.RequestStatus.Failed;
import static accord.coordinate.tracking.RequestStatus.Success;
import static accord.messages.Accept.AcceptFlags.filterDeps;
import static accord.messages.Commit.Invalidate.commitInvalidate;
import static accord.primitives.Routables.Slice.Minimal;
import static accord.primitives.Status.AcceptedInvalidate;
Expand All @@ -66,6 +67,7 @@ abstract class Propose<R> extends AbstractCoordination<FullRoute<?>, R, AcceptRe

final Timestamp executeAt;
final QuorumTracker tracker;
final int acceptFlags;

Propose(Node node, SequentialAsyncExecutor executor, Topologies topologies, Accept.Kind kind, Ballot ballot, TxnId txnId, Txn txn, Route<?> require, FullRoute<?> route, Timestamp executeAt, Deps deps, BiConsumer<? super R, Throwable> callback)
{
Expand All @@ -77,6 +79,7 @@ abstract class Propose<R> extends AbstractCoordination<FullRoute<?>, R, AcceptRe
this.deps = deps;
this.executeAt = executeAt;
this.tracker = new QuorumTracker(topologies);
this.acceptFlags = AcceptFlags.encode(txnId, require != scope);
Invariants.require(txnId.isSyncPoint() || deps.maxTxnId(txnId).compareTo(executeAt) <= 0,
"Attempted to propose %s with an earlier executeAt than a conflicting transaction it witnessed: %s vs executeAt: %s", txnId, deps, executeAt);
Invariants.require(topologies.currentEpoch() == executeAt.epoch());
Expand All @@ -86,7 +89,7 @@ abstract class Propose<R> extends AbstractCoordination<FullRoute<?>, R, AcceptRe
void start()
{
super.start();
contact(to -> new Accept(to, tracker.topologies(), kind, ballot, txnId, scope, executeAt, deps, require != scope));
contact(to -> new Accept(to, tracker.topologies(), kind, ballot, txnId, scope, executeAt, deps, acceptFlags));
}

@Override
Expand Down Expand Up @@ -163,15 +166,22 @@ Deps mergeDeps(Deps newDeps)
Deps mergeNewDeps()
{
SortedListMap<Node.Id, AcceptReply> oks = finishOks();
if (!AcceptFlags.calculateDeps(acceptFlags))
return Deps.NONE;

Deps deps = Deps.merge(oks, oks.domainSize(), SortedListMap::getValue, ok -> ok.deps);
if (Faults.discardPreAcceptDeps(txnId))
return deps;

if (txnId.is(TrackStable))
{
// we must not propose as stable any dep < txnId that we did not propose as part of this phase
if (filterDuplicateDependenciesFromAcceptReply())
if (!filterDeps(acceptFlags))
{
// if the replicas haven't filtered their responses, we need to remove
// anything we sent to ensure we don't mark them as unstable
deps = deps.without(this.deps);
}

deps = deps.markUnstableBefore(txnId);
}
Expand All @@ -181,7 +191,6 @@ Deps mergeNewDeps()

abstract CoordinationAdapter<R> adapter();


@Override
public CoordinationKind kind()
{
Expand Down
Loading