diff --git a/README.md b/README.md index 68a254a..a8995a0 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ dotnet build dotnet test ``` -2,781 tests across 16 test projects covering core types, cryptography, codec serialization, storage, networking, consensus, execution (including DEX), API, compliance, bridge, confidentiality, node configuration, SDK contracts, analyzers, wallet, and end-to-end integration. +2,789 tests across 16 test projects covering core types, cryptography, codec serialization, storage, networking, consensus, execution (including DEX), API, compliance, bridge, confidentiality, node configuration, SDK contracts, analyzers, wallet, and end-to-end integration. ### Run a Local Node @@ -76,16 +76,19 @@ The node starts in standalone mode on the devnet (chain ID 31337) with a REST AP docker compose up --build ``` -Spins up 4 validator nodes with pre-configured genesis accounts, RocksDB persistent storage, and automatic peer discovery via static peer lists. +Spins up 4 validator nodes and 1 RPC node with pre-configured genesis accounts, RocksDB persistent storage, and automatic peer discovery via static peer lists. -| Validator | REST API | P2P | -|-----------|----------|-----| -| validator-0 | `localhost:5100` | `30300` | -| validator-1 | `localhost:5101` | `30301` | -| validator-2 | `localhost:5102` | `30302` | -| validator-3 | `localhost:5103` | `30303` | +| Service | REST API | P2P | Role | +|---------|----------|-----|------| +| validator-0 | `localhost:5100` | `30300` | Consensus validator | +| validator-1 | `localhost:5101` | `30301` | Consensus validator | +| validator-2 | `localhost:5102` | `30302` | Consensus validator | +| validator-3 | `localhost:5103` | `30303` | Consensus validator | +| rpc-0 | `localhost:5200` | -- | Read-only RPC node | -Each validator has a Docker volume (`validator-N-data`) for RocksDB persistence and connects to all other validators via environment-configured peer lists. Health checks poll `/v1/status` every 5 seconds. +The RPC node (`rpc-0`) syncs blocks from `validator-0` via HTTP and serves the full API without participating in consensus. It has no P2P port and no validator keys. Submitted transactions are forwarded to the validator. + +Each service has a Docker volume for RocksDB persistence. Health checks poll `/v1/status` (validators) or `/v1/health` (RPC) every 5 seconds. ### CLI @@ -210,6 +213,8 @@ Basalt.sln (42 C# projects) | `GET` | `/v1/solvers` | List registered solvers | | `POST` | `/v1/solvers/register` | Register an external solver | | `GET` | `/v1/dex/intents/pending` | Pending swap intent hashes (for solvers) | +| `GET` | `/v1/sync/status` | Sync source status (latest block, hash, chain ID) | +| `GET` | `/v1/sync/blocks?from=&count=` | Bulk block fetch for sync (max 100) | | `GET` | `/metrics` | Prometheus metrics | | `WS` | `/ws/blocks` | Real-time block notifications | @@ -219,6 +224,8 @@ The node is configured via environment variables: | Variable | Default | Description | |----------|---------|-------------| +| `BASALT_MODE` | `auto` | Node mode: `auto`, `validator`, `rpc`, or `standalone` | +| `BASALT_SYNC_SOURCE` | -- | HTTP URL of sync source (required for `rpc` mode) | | `BASALT_CHAIN_ID` | `31337` | Chain identifier | | `BASALT_NETWORK` | `basalt-devnet` | Network name | | `BASALT_VALIDATOR_INDEX` | `-1` | Validator index in the set (enables consensus mode when >= 0 and peers are set) | diff --git a/deploy/testnet/Caddyfile b/deploy/testnet/Caddyfile index 790634b..20d5533 100644 --- a/deploy/testnet/Caddyfile +++ b/deploy/testnet/Caddyfile @@ -30,22 +30,22 @@ http://caldera.basalt.foundation { :80 { # REST API handle /v1/* { - reverse_proxy validator-0:5000 + reverse_proxy rpc-0:5000 } # GraphQL handle /graphql { - reverse_proxy validator-0:5000 + reverse_proxy rpc-0:5000 } # WebSocket handle /ws/* { - reverse_proxy validator-0:5000 + reverse_proxy rpc-0:5000 } # Health check handle /health { - reverse_proxy validator-0:5000 { + reverse_proxy rpc-0:5000 { rewrite /v1/status } } diff --git a/deploy/testnet/docker-compose.yml b/deploy/testnet/docker-compose.yml index f17046c..3e2aa12 100644 --- a/deploy/testnet/docker-compose.yml +++ b/deploy/testnet/docker-compose.yml @@ -45,7 +45,7 @@ services: - basalt-testnet restart: unless-stopped depends_on: - validator-0: + rpc-0: condition: service_healthy explorer: condition: service_started @@ -158,11 +158,44 @@ services: cap_drop: - ALL + # ─── RPC Node 0 (public API, no consensus) ──────────────────────── + rpc-0: + build: + context: ../.. + dockerfile: Dockerfile + container_name: basalt-rpc-0 + environment: + - BASALT_MODE=rpc + - BASALT_SYNC_SOURCE=http://validator-0:5000 + - BASALT_NETWORK=basalt-testnet + - BASALT_CHAIN_ID=4242 + - ASPNETCORE_URLS=http://+:5000 + - BASALT_DATA_DIR=/data/basalt + volumes: + - rpc-0-data:/data/basalt + networks: + - basalt-testnet + restart: unless-stopped + security_opt: + - no-new-privileges:true + cap_drop: + - ALL + healthcheck: + test: ["CMD", "curl", "-sf", "http://localhost:5000/v1/health"] + interval: 10s + timeout: 5s + retries: 12 + start_period: 30s + depends_on: + validator-0: + condition: service_healthy + volumes: validator-0-data: validator-1-data: validator-2-data: validator-3-data: + rpc-0-data: networks: basalt-testnet: diff --git a/docker-compose.yml b/docker-compose.yml index 3110885..393bef1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -115,11 +115,45 @@ services: cap_drop: - ALL + rpc-0: + build: + context: . + dockerfile: Dockerfile + container_name: basalt-rpc-0 + ports: + - "5200:5000" + environment: + - BASALT_MODE=rpc + - BASALT_SYNC_SOURCE=http://validator-0:5000 + - BASALT_NETWORK=basalt-devnet + - BASALT_CHAIN_ID=31337 + - ASPNETCORE_URLS=http://+:5000 + - BASALT_DATA_DIR=/data/basalt + volumes: + - rpc-0-data:/data/basalt + networks: + - basalt-devnet + restart: unless-stopped + security_opt: + - no-new-privileges:true + cap_drop: + - ALL + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:5000/v1/health"] + interval: 5s + timeout: 3s + retries: 10 + start_period: 15s + depends_on: + validator-0: + condition: service_healthy + volumes: validator-0-data: validator-1-data: validator-2-data: validator-3-data: + rpc-0-data: networks: basalt-devnet: diff --git a/src/api/Basalt.Api.Grpc/BasaltNodeService.cs b/src/api/Basalt.Api.Grpc/BasaltNodeService.cs index 433df0c..68cf347 100644 --- a/src/api/Basalt.Api.Grpc/BasaltNodeService.cs +++ b/src/api/Basalt.Api.Grpc/BasaltNodeService.cs @@ -14,6 +14,7 @@ public sealed class BasaltNodeService : BasaltNode.BasaltNodeBase private readonly Mempool _mempool; private readonly TransactionValidator _validator; private readonly IStateDatabase _stateDb; + private readonly ITxForwarder? _txForwarder; /// H-3: Maximum concurrent SubscribeBlocks streams. private const int MaxSubscribeStreams = 100; @@ -23,12 +24,14 @@ public BasaltNodeService( ChainManager chainManager, Mempool mempool, TransactionValidator validator, - IStateDatabase stateDb) + IStateDatabase stateDb, + ITxForwarder? txForwarder = null) { _chainManager = chainManager; _mempool = mempool; _validator = validator; _stateDb = stateDb; + _txForwarder = txForwarder; } public override Task GetStatus(GetStatusRequest request, ServerCallContext context) @@ -120,6 +123,9 @@ public override Task SubmitTransaction(SubmitTransactionReques throw new RpcException(new Status(StatusCode.AlreadyExists, "Transaction already in mempool or mempool is full")); + // Forward to validator (RPC mode — gRPC txs bypass REST tx endpoint) + _ = _txForwarder?.ForwardAsync(tx, context.CancellationToken); + return Task.FromResult(new TransactionReply { Hash = tx.Hash.ToHexString(), diff --git a/src/api/Basalt.Api.Rest/FaucetEndpoint.cs b/src/api/Basalt.Api.Rest/FaucetEndpoint.cs index 9a194c5..5eb1230 100644 --- a/src/api/Basalt.Api.Rest/FaucetEndpoint.cs +++ b/src/api/Basalt.Api.Rest/FaucetEndpoint.cs @@ -55,7 +55,8 @@ public static void MapFaucetEndpoint( ChainParameters chainParams, byte[] faucetPrivateKey, ILogger? logger = null, - ChainManager? chainManager = null) + ChainManager? chainManager = null, + ITxForwarder? txForwarder = null) { _faucetPrivateKey = faucetPrivateKey; _logger = logger; @@ -207,6 +208,9 @@ public static void MapFaucetEndpoint( _logger?.LogInformation("Faucet tx {Hash} added to mempool (size={Size})", signedTx.Hash.ToHexString()[..18] + "...", mempool.Count); + // Forward to validator (RPC mode — faucet txs bypass POST /v1/transactions) + _ = txForwarder?.ForwardAsync(signedTx, CancellationToken.None); + // Record the request time _lastRequest[addrKey] = DateTimeOffset.UtcNow; diff --git a/src/api/Basalt.Api.Rest/README.md b/src/api/Basalt.Api.Rest/README.md index c28cedc..1216921 100644 --- a/src/api/Basalt.Api.Rest/README.md +++ b/src/api/Basalt.Api.Rest/README.md @@ -35,6 +35,8 @@ RESTful HTTP API for the Basalt blockchain node. Provides endpoints for submitti | `GET` | `/v1/solvers` | List registered solvers | | `POST` | `/v1/solvers/register` | Register an external solver | | `GET` | `/v1/dex/intents/pending` | Pending swap intent hashes (for solvers) | +| `GET` | `/v1/sync/status` | Sync source status (latest block, hash, chain ID) | +| `GET` | `/v1/sync/blocks?from=&count=` | Bulk block fetch for sync (max 100, hex-encoded raw blocks) | | `GET` | `/metrics` | Prometheus-format metrics | | `WS` | `/ws/blocks` | Real-time block notifications | @@ -95,13 +97,13 @@ curl -X POST http://localhost:5000/v1/faucet \ -d '{"address":"0x..."}' ``` -The faucet directly debits a configurable faucet address and credits the recipient in the state database. Configurable via static properties on `FaucetEndpoint`: +The faucet creates and signs a real transfer transaction submitted through the mempool. In RPC mode, faucet transactions are forwarded to the sync source validator via `HttpTxForwarder`. Configurable via static properties on `FaucetEndpoint`: - `DripAmount` -- amount in base units (default: 100 BSLT). - `CooldownSeconds` -- per-address cooldown (default: 60 seconds). -- `FaucetAddress` -- source address (default: `Address.Zero`). +- `FaucetAddress` -- derived from the well-known faucet private key. -Returns `{"success":true,"message":"Sent 100 BSLT to 0x...","txHash":"0x0000..."}` on success. The `txHash` field is a placeholder (`Hash256.Zero`) since the faucet modifies state directly rather than creating a transaction. +Returns `{"success":true,"message":"Sent 100 BSLT to 0x...","txHash":"0x..."}` on success. ### Read-Only Contract Call diff --git a/src/api/Basalt.Api.Rest/RestApiEndpoints.cs b/src/api/Basalt.Api.Rest/RestApiEndpoints.cs index 9bb6a97..099e46b 100644 --- a/src/api/Basalt.Api.Rest/RestApiEndpoints.cs +++ b/src/api/Basalt.Api.Rest/RestApiEndpoints.cs @@ -33,7 +33,9 @@ public static void MapBasaltEndpoints( Storage.RocksDb.ReceiptStore? receiptStore = null, Microsoft.Extensions.Logging.ILogger? logger = null, ChainParameters? chainParams = null, - ISolverInfoProvider? solverProvider = null) + ISolverInfoProvider? solverProvider = null, + Storage.RocksDb.BlockStore? blockStore = null, + ITxForwarder? txForwarder = null) { // Helper: look up a receipt by tx hash (persistent store first, then in-memory fallback) Storage.RocksDb.ReceiptData? LookupReceipt(Hash256 txHash) @@ -151,6 +153,10 @@ public static void MapBasaltEndpoints( }); } + // RPC mode: forward transaction to sync source (fire-and-forget) + if (txForwarder != null) + _ = txForwarder.ForwardAsync(tx, CancellationToken.None); + return Microsoft.AspNetCore.Http.Results.Ok(new TransactionResponse { Hash = tx.Hash.ToHexString(), @@ -945,9 +951,12 @@ public static void MapBasaltEndpoints( ? BatchAuctionSolver.ComputeSpotPrice(reserves.Value.Reserve0, reserves.Value.Reserve1) : UInt256.Zero; - // Precompute latest block timestamp for estimation fallback + // Precompute latest block timestamp (seconds) for estimation fallback. + // BlockHeader.Timestamp is in milliseconds — convert to seconds for the API. var latestBlock = chainManager.GetBlockByNumber(currentBlock); - var latestTs = latestBlock?.Header.Timestamp ?? DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + var latestTs = latestBlock != null + ? latestBlock.Header.Timestamp / 1000 + : DateTimeOffset.UtcNow.ToUnixTimeSeconds(); var points = new List(); for (ulong b = start; b <= end; b += step) @@ -990,12 +999,12 @@ public static void MapBasaltEndpoints( price = spotPrice; } - // Determine timestamp from block header, with estimation fallback + // Determine timestamp from block header (ms → s), with estimation fallback long timestamp; var block = chainManager.GetBlockByNumber(b); if (block != null) { - timestamp = block.Header.Timestamp; + timestamp = block.Header.Timestamp / 1000; } else { @@ -1017,7 +1026,7 @@ public static void MapBasaltEndpoints( { var block = chainManager.GetBlockByNumber(b); var timestamp = block != null - ? block.Header.Timestamp + ? block.Header.Timestamp / 1000 : latestTs - (long)(currentBlock - b) * (long)blockTimeMs / 1000; points.Add(new DexPricePointResponse @@ -1090,6 +1099,53 @@ public static void MapBasaltEndpoints( }); }); } + + // ── Sync endpoints (used by RPC nodes to fetch blocks from validators) ── + + app.MapGet("/v1/sync/status", () => + { + var latest = chainManager.LatestBlock; + return Microsoft.AspNetCore.Http.Results.Ok(new SyncStatusResponse + { + LatestBlock = latest?.Number ?? 0, + LatestHash = latest?.Hash.ToHexString() ?? Hash256.Zero.ToHexString(), + ChainId = chainParams?.ChainId ?? 0, + }); + }); + + app.MapGet("/v1/sync/blocks", (ulong from, int? count) => + { + if (blockStore == null) + return Microsoft.AspNetCore.Http.Results.StatusCode(501); + + var requestedCount = Math.Min(count ?? 100, 100); + var blocks = new List(); + + for (ulong n = from; n < from + (ulong)requestedCount; n++) + { + var raw = blockStore.GetRawBlockByNumber(n); + if (raw == null) + break; + + var meta = blockStore.GetByNumber(n); + blocks.Add(new SyncBlockEntry + { + Number = n, + Hash = meta?.Hash.ToHexString() ?? "", + RawHex = Convert.ToHexString(raw), + CommitBitmap = blockStore.GetCommitBitmap(n), + }); + } + + return Microsoft.AspNetCore.Http.Results.Ok(new SyncBlocksResponse + { + Blocks = blocks.ToArray(), + }); + }); + + // ── Transaction forwarding hook ── + // When txForwarder is set (RPC mode), fire-and-forget forward after mempool add. + // The forwarding is wired inside the POST /v1/transactions handler via the txForwarder parameter. } } @@ -1511,6 +1567,38 @@ public sealed class DexPriceHistoryResponse [JsonPropertyName("blockTimeMs")] public uint BlockTimeMs { get; set; } } +// ── Sync DTOs ── + +public sealed class SyncStatusResponse +{ + [JsonPropertyName("latestBlock")] public ulong LatestBlock { get; set; } + [JsonPropertyName("latestHash")] public string LatestHash { get; set; } = ""; + [JsonPropertyName("chainId")] public uint ChainId { get; set; } +} + +public sealed class SyncBlockEntry +{ + [JsonPropertyName("number")] public ulong Number { get; set; } + [JsonPropertyName("hash")] public string Hash { get; set; } = ""; + [JsonPropertyName("rawHex")] public string RawHex { get; set; } = ""; + [JsonPropertyName("commitBitmap")] public ulong? CommitBitmap { get; set; } +} + +public sealed class SyncBlocksResponse +{ + [JsonPropertyName("blocks")] public SyncBlockEntry[] Blocks { get; set; } = []; +} + +// ── Transaction forwarding interface (RPC mode) ── + +/// +/// Forwards transactions from RPC nodes to validators. +/// + +[JsonSerializable(typeof(SyncStatusResponse))] +[JsonSerializable(typeof(SyncBlockEntry))] +[JsonSerializable(typeof(SyncBlockEntry[]))] +[JsonSerializable(typeof(SyncBlocksResponse))] [JsonSerializable(typeof(TransactionRequest))] [JsonSerializable(typeof(TransactionResponse))] [JsonSerializable(typeof(BlockResponse))] diff --git a/src/execution/Basalt.Execution/ITxForwarder.cs b/src/execution/Basalt.Execution/ITxForwarder.cs new file mode 100644 index 0000000..8a6f791 --- /dev/null +++ b/src/execution/Basalt.Execution/ITxForwarder.cs @@ -0,0 +1,24 @@ +namespace Basalt.Execution; + +/// +/// Forwards transactions to an upstream node (e.g., from RPC to validator). +/// +public interface ITxForwarder +{ + Task ForwardAsync(Transaction tx, CancellationToken ct); +} + +/// +/// Mutable reference to an . Allows the RPC mode branch in +/// Program.cs to set the forwarder after endpoint registration, since +/// MapBasaltEndpoints is called before mode detection. +/// +public sealed class TxForwarderRef : ITxForwarder +{ + private volatile ITxForwarder? _inner; + + public void Set(ITxForwarder forwarder) => _inner = forwarder; + + public Task ForwardAsync(Transaction tx, CancellationToken ct) + => _inner?.ForwardAsync(tx, ct) ?? Task.CompletedTask; +} diff --git a/src/node/Basalt.Node/BlockApplier.cs b/src/node/Basalt.Node/BlockApplier.cs new file mode 100644 index 0000000..ac6b13d --- /dev/null +++ b/src/node/Basalt.Node/BlockApplier.cs @@ -0,0 +1,352 @@ +using Basalt.Consensus; +using Basalt.Consensus.Staking; +using Basalt.Core; +using Basalt.Execution; +using Basalt.Storage; +using Basalt.Storage.RocksDb; +using Basalt.Api.Rest; +using Microsoft.Extensions.Logging; + +namespace Basalt.Node; + +/// +/// Result of applying a single block to state. +/// +public sealed class BlockApplyResult +{ + public bool Success { get; init; } + public string? Error { get; init; } + public List? Receipts { get; init; } +} + +/// +/// Shared block-application logic used by NodeCoordinator (consensus finalization + sync) +/// and BlockSyncService (RPC node HTTP sync). Encapsulates: +/// +/// Transaction execution via +/// DEX settlement via +/// Chain state update via +/// Mempool pruning and base fee update +/// RocksDB persistence (blocks + receipts) +/// Epoch transitions via +/// WebSocket broadcast and Prometheus metrics +/// +/// +public sealed class BlockApplier +{ + private readonly ChainParameters _chainParams; + private readonly ChainManager _chainManager; + private readonly Mempool _mempool; + private readonly TransactionExecutor _txExecutor; + private readonly BlockBuilder? _blockBuilder; + private readonly BlockStore? _blockStore; + private readonly ReceiptStore? _receiptStore; + private readonly EpochManager? _epochManager; + private readonly StakingState? _stakingState; + private readonly IStakingPersistence? _stakingPersistence; + private readonly WebSocketHandler _wsHandler; + private readonly ILogger _logger; + + /// + /// Fired when an epoch transition occurs. The caller (NodeCoordinator) can hook this + /// to rewire consensus-specific components (leader selector, consensus engine). + /// Provides the new ValidatorSet and the block number at which the transition occurred. + /// + public event Action? OnEpochTransition; + + public BlockApplier( + ChainParameters chainParams, + ChainManager chainManager, + Mempool mempool, + TransactionExecutor txExecutor, + BlockBuilder? blockBuilder, + BlockStore? blockStore, + ReceiptStore? receiptStore, + EpochManager? epochManager, + StakingState? stakingState, + IStakingPersistence? stakingPersistence, + WebSocketHandler wsHandler, + ILogger logger) + { + _chainParams = chainParams; + _chainManager = chainManager; + _mempool = mempool; + _txExecutor = txExecutor; + _blockBuilder = blockBuilder; + _blockStore = blockStore; + _receiptStore = receiptStore; + _epochManager = epochManager; + _stakingState = stakingState; + _stakingPersistence = stakingPersistence; + _wsHandler = wsHandler; + _logger = logger; + } + + /// + /// Execute transactions and DEX settlement against the given state database. + /// Does NOT add the block to the chain or persist — the caller controls that. + /// Returns receipts (or null if no transactions). + /// + public List? ExecuteBlock(Block block, IStateDatabase stateDb) + { + List? receipts = null; + + if (block.Transactions.Count > 0) + { + receipts = new List(block.Transactions.Count); + for (int i = 0; i < block.Transactions.Count; i++) + { + var receipt = _txExecutor.Execute(block.Transactions[i], stateDb, block.Header, i); + receipts.Add(receipt); + } + } + + // Run DEX settlement (TWAP carry-forward + limit order matching) + if (_blockBuilder != null) + { + var dexReceipts = _blockBuilder.ApplyDexSettlement(stateDb, block.Header); + if (dexReceipts.Count > 0) + { + receipts ??= new List(); + receipts.AddRange(dexReceipts); + } + } + + return receipts; + } + + /// + /// Apply a single finalized block to canonical state. Used by consensus finalization path. + /// Executes transactions, adds to chain, prunes mempool, persists, checks epochs, broadcasts. + /// + /// Result indicating success/failure. + public BlockApplyResult ApplyBlock(Block block, IStateDatabase stateDb, + byte[] rawBlockData, ulong commitBitmap = 0) + { + // Execute transactions + DEX settlement + var receipts = ExecuteBlock(block, stateDb); + if (receipts != null) + block.Receipts = receipts; + + // Add to chain + var result = _chainManager.AddBlock(block); + if (!result.IsSuccess) + { + return new BlockApplyResult { Success = false, Error = result.Message }; + } + + // Prune mempool + _mempool.RemoveConfirmed(block.Transactions); + var pruned = _mempool.PruneStale(stateDb, block.Header.BaseFee); + if (pruned > 0) + _logger.LogInformation("Pruned {Count} unexecutable transactions from mempool", pruned); + _mempool.UpdateBaseFee(block.Header.BaseFee); + + // Prometheus metrics + MetricsEndpoint.RecordBlock(block.Transactions.Count, block.Header.Timestamp); + MetricsEndpoint.RecordBaseFee(block.Header.BaseFee.IsZero ? 0 : (long)(ulong)block.Header.BaseFee); + MetricsEndpoint.RecordConsensusView((long)block.Number); + MetricsEndpoint.RecordDexIntentCount(_mempool.DexIntentCount); + + // WebSocket broadcast + _ = _wsHandler.BroadcastNewBlock(block); + + // Persist block + receipts + PersistBlock(block, rawBlockData, commitBitmap); + PersistReceipts(block.Receipts); + + // Record commit participation + _epochManager?.RecordBlockSigners(block.Number, commitBitmap); + + // Check epoch transition + var newSet = _epochManager?.OnBlockFinalized(block.Number); + if (newSet != null) + { + ApplyEpochTransition(newSet, block.Number); + } + + return new BlockApplyResult + { + Success = true, + Receipts = block.Receipts, + }; + } + + /// + /// Apply a batch of blocks on a forked state database, then atomically swap canonical state. + /// Used by sync paths (P2P sync and RPC HTTP sync). + /// + /// Ordered list of (Block, RawBytes, CommitBitmap) tuples. + /// The shared state reference to fork and swap. + /// Number of blocks successfully applied. + public int ApplyBatch(IReadOnlyList<(Block Block, byte[] Raw, ulong CommitBitmap)> blocks, + StateDbRef stateDbRef) + { + if (blocks.Count == 0) + return 0; + + var forkedState = stateDbRef.Fork(); + var applied = 0; + + // Phase 1: Execute all blocks on forked state + foreach (var (block, raw, bitmap) in blocks) + { + try + { + var receipts = ExecuteBlock(block, forkedState); + if (receipts != null) + block.Receipts = receipts; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to execute synced block #{Number}", block.Number); + break; + } + } + + // Phase 2: Add executed blocks to chain and persist + foreach (var (block, raw, bitmap) in blocks) + { + if (block.Receipts == null && block.Transactions.Count > 0) + break; // This block wasn't executed (failed in phase 1) + + var result = _chainManager.AddBlock(block); + if (!result.IsSuccess) + { + _logger.LogWarning("Failed to apply synced block #{Number}: {Error}", + block.Number, result.Message); + break; + } + + _mempool.RemoveConfirmed(block.Transactions); + PersistBlock(block, raw, bitmap); + PersistReceipts(block.Receipts); + applied++; + + _epochManager?.RecordBlockSigners(block.Number, bitmap); + + var newSet = _epochManager?.OnBlockFinalized(block.Number); + if (newSet != null) + ApplyEpochTransition(newSet, block.Number); + } + + // Phase 3: Atomically swap state only if ALL blocks succeeded + if (applied == blocks.Count && applied > 0) + { + stateDbRef.Swap(forkedState); + _logger.LogInformation("Synced {Count} blocks, now at #{Height}", + applied, _chainManager.LatestBlockNumber); + } + else if (applied > 0) + { + _logger.LogWarning( + "Partial sync: applied {Applied}/{Total} blocks — state not adopted", + applied, blocks.Count); + } + + // Prune mempool after sync with current base fee + var latestBlock = _chainManager.LatestBlock; + if (latestBlock != null && applied > 0) + { + var pruned = _mempool.PruneStale(stateDbRef, latestBlock.Header.BaseFee); + if (pruned > 0) + _logger.LogInformation("Pruned {Count} unexecutable transactions from mempool after sync", pruned); + _mempool.UpdateBaseFee(latestBlock.Header.BaseFee); + } + + return applied; + } + + private void ApplyEpochTransition(ValidatorSet newSet, ulong blockNumber) + { + // Flush staking state + if (_stakingPersistence != null && _stakingState != null) + { + try + { + _stakingState.FlushToPersistence(_stakingPersistence); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to flush staking state after epoch transition"); + } + } + + _logger.LogInformation( + "Epoch transition at block #{Block}: {NewCount} validators, quorum: {Quorum}", + blockNumber, newSet.Count, newSet.QuorumThreshold); + + // Notify caller (NodeCoordinator) to rewire consensus-specific components + OnEpochTransition?.Invoke(newSet, blockNumber); + } + + private void PersistBlock(Block block, byte[] serializedBlockData, ulong? commitBitmap = null) + { + if (_blockStore == null) + return; + + try + { + var blockData = new BlockData + { + Number = block.Number, + Hash = block.Hash, + ParentHash = block.Header.ParentHash, + StateRoot = block.Header.StateRoot, + TransactionsRoot = block.Header.TransactionsRoot, + ReceiptsRoot = block.Header.ReceiptsRoot, + Timestamp = block.Header.Timestamp, + Proposer = block.Header.Proposer, + ChainId = block.Header.ChainId, + GasUsed = block.Header.GasUsed, + GasLimit = block.Header.GasLimit, + BaseFee = block.Header.BaseFee, + ProtocolVersion = block.Header.ProtocolVersion, + ExtraData = block.Header.ExtraData, + TransactionHashes = block.Transactions.Select(t => t.Hash).ToArray(), + }; + _blockStore.PutFullBlock(blockData, serializedBlockData, commitBitmap); + _blockStore.SetLatestBlockNumber(block.Number); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to persist block #{Number}", block.Number); + } + } + + private void PersistReceipts(List? receipts) + { + if (_receiptStore == null || receipts == null || receipts.Count == 0) + return; + + try + { + var receiptDataList = receipts.Select(r => new ReceiptData + { + TransactionHash = r.TransactionHash, + BlockHash = r.BlockHash, + BlockNumber = r.BlockNumber, + TransactionIndex = r.TransactionIndex, + From = r.From, + To = r.To, + GasUsed = r.GasUsed, + Success = r.Success, + ErrorCode = (int)r.ErrorCode, + PostStateRoot = r.PostStateRoot, + EffectiveGasPrice = r.EffectiveGasPrice, + Logs = (r.Logs ?? []).Select(l => new LogData + { + Contract = l.Contract, + EventSignature = l.EventSignature, + Topics = l.Topics ?? [], + Data = l.Data ?? [], + }).ToArray(), + }); + _receiptStore.PutReceipts(receiptDataList); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to persist {Count} receipts", receipts.Count); + } + } +} diff --git a/src/node/Basalt.Node/BlockSyncService.cs b/src/node/Basalt.Node/BlockSyncService.cs new file mode 100644 index 0000000..6be60ee --- /dev/null +++ b/src/node/Basalt.Node/BlockSyncService.cs @@ -0,0 +1,183 @@ +using System.Net.Http.Json; +using Basalt.Api.Rest; +using Basalt.Core; +using Basalt.Execution; +using Basalt.Network; +using Basalt.Storage; +using Microsoft.Extensions.Logging; + +namespace Basalt.Node; + +/// +/// Provides sync status for health endpoint reporting. +/// +public interface ISyncStatus +{ + /// How many blocks behind the sync source this node is. + int SyncLag { get; } +} + +/// +/// Background service that polls a trusted sync source via HTTP and applies finalized blocks +/// using . Used by RPC nodes that follow the chain without +/// participating in consensus. +/// +public sealed class BlockSyncService : ISyncStatus, IAsyncDisposable +{ + private readonly string _syncSourceUrl; + private readonly BlockApplier _blockApplier; + private readonly ChainManager _chainManager; + private readonly StateDbRef _stateDbRef; + private readonly ChainParameters _chainParams; + private readonly HttpClient _httpClient; + private readonly ILogger _logger; + + private int _syncLag; + private int _backoffMs = 1000; + private const int MaxBackoffMs = 30_000; + + public int SyncLag => Volatile.Read(ref _syncLag); + + public BlockSyncService( + string syncSourceUrl, + BlockApplier blockApplier, + ChainManager chainManager, + StateDbRef stateDbRef, + ChainParameters chainParams, + ILogger logger) + { + _syncSourceUrl = syncSourceUrl.TrimEnd('/'); + _blockApplier = blockApplier; + _chainManager = chainManager; + _stateDbRef = stateDbRef; + _chainParams = chainParams; + _logger = logger; + + _httpClient = new HttpClient + { + BaseAddress = new Uri(_syncSourceUrl), + Timeout = TimeSpan.FromSeconds(30), + }; + } + + /// + /// Main sync loop. Polls the sync source and applies blocks until cancelled. + /// + public async Task RunAsync(CancellationToken ct) + { + _logger.LogInformation("BlockSyncService started. Source: {Source}", _syncSourceUrl); + + while (!ct.IsCancellationRequested) + { + try + { + var status = await _httpClient.GetFromJsonAsync( + "/v1/sync/status", + BasaltApiJsonContext.Default.SyncStatusResponse, + ct); + + if (status == null) + { + _logger.LogWarning("Sync source returned null status"); + await BackoffAsync(ct); + continue; + } + + var localTip = _chainManager.LatestBlockNumber; + var remoteTip = status.LatestBlock; + Volatile.Write(ref _syncLag, (int)Math.Min(remoteTip - localTip, int.MaxValue)); + + if (remoteTip <= localTip) + { + // Caught up — sleep for one block time, then poll again + _backoffMs = 1000; // Reset backoff + await Task.Delay((int)_chainParams.BlockTimeMs, ct); + continue; + } + + // Fetch blocks in batches of 100 + var from = localTip + 1; + var count = (int)Math.Min(remoteTip - localTip, 100); + + var response = await _httpClient.GetFromJsonAsync( + $"/v1/sync/blocks?from={from}&count={count}", + BasaltApiJsonContext.Default.SyncBlocksResponse, + ct); + + if (response?.Blocks == null || response.Blocks.Length == 0) + { + _logger.LogDebug("No blocks returned from sync source (from={From})", from); + await BackoffAsync(ct); + continue; + } + + // Deserialize and prepare blocks for batch apply + var blocks = new List<(Block Block, byte[] Raw, ulong CommitBitmap)>(); + foreach (var entry in response.Blocks) + { + try + { + var raw = Convert.FromHexString(entry.RawHex); + var block = BlockCodec.DeserializeBlock(raw); + blocks.Add((block, raw, entry.CommitBitmap ?? 0)); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to deserialize synced block #{Number}", entry.Number); + break; + } + } + + if (blocks.Count == 0) + { + await BackoffAsync(ct); + continue; + } + + var applied = _blockApplier.ApplyBatch(blocks, _stateDbRef); + if (applied > 0) + { + _backoffMs = 1000; // Reset backoff on success + var newLocalTip = _chainManager.LatestBlockNumber; + Volatile.Write(ref _syncLag, (int)Math.Min(remoteTip - newLocalTip, int.MaxValue)); + + _logger.LogInformation( + "Synced {Count} blocks from source, now at #{Height} (lag: {Lag})", + applied, newLocalTip, SyncLag); + } + else + { + await BackoffAsync(ct); + } + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + break; + } + catch (HttpRequestException ex) + { + _logger.LogWarning("Sync source unreachable: {Message}", ex.Message); + await BackoffAsync(ct); + } + catch (Exception ex) + { + _logger.LogError(ex, "Unexpected error in sync loop"); + await BackoffAsync(ct); + } + } + + _logger.LogInformation("BlockSyncService stopped"); + } + + private async Task BackoffAsync(CancellationToken ct) + { + await Task.Delay(_backoffMs, ct); + _backoffMs = Math.Min(_backoffMs * 2, MaxBackoffMs); + } + + public async ValueTask DisposeAsync() + { + _httpClient.Dispose(); + await ValueTask.CompletedTask; + } +} diff --git a/src/node/Basalt.Node/NodeConfiguration.cs b/src/node/Basalt.Node/NodeConfiguration.cs index 33fa0e5..cba3c58 100644 --- a/src/node/Basalt.Node/NodeConfiguration.cs +++ b/src/node/Basalt.Node/NodeConfiguration.cs @@ -1,8 +1,17 @@ namespace Basalt.Node; +public enum NodeMode +{ + Standalone, + Validator, + Rpc, +} + /// /// Node configuration populated from environment variables: /// +/// BASALT_MODE — Node mode: auto (default), validator, rpc, standalone +/// BASALT_SYNC_SOURCE — HTTP URL of sync source (required for rpc mode) /// BASALT_VALIDATOR_INDEX — Validator index (int, -1 = standalone) /// BASALT_VALIDATOR_ADDRESS — Validator address (hex) /// BASALT_VALIDATOR_KEY — Ed25519 private key (64 hex chars) @@ -44,8 +53,37 @@ public sealed class NodeConfiguration // Contract sandboxing (opt-in) public bool UseSandbox { get; init; } - // Mode detection - public bool IsConsensusMode => Peers.Length > 0 && ValidatorIndex >= 0; + // Mode selection + public string Mode { get; init; } = "auto"; + public string? SyncSource { get; init; } + + // Tri-state mode detection + public NodeMode ResolvedMode + { + get + { + if (string.Equals(Mode, "rpc", StringComparison.OrdinalIgnoreCase)) + { + if (string.IsNullOrWhiteSpace(SyncSource)) + throw new InvalidOperationException( + "BASALT_MODE=rpc requires BASALT_SYNC_SOURCE to be set (e.g. http://validator-0:5000)"); + return NodeMode.Rpc; + } + + if (string.Equals(Mode, "validator", StringComparison.OrdinalIgnoreCase) + || (Peers.Length > 0 && ValidatorIndex >= 0)) + return NodeMode.Validator; + + if (string.Equals(Mode, "standalone", StringComparison.OrdinalIgnoreCase)) + return NodeMode.Standalone; + + // auto: fall back to standalone + return NodeMode.Standalone; + } + } + + // Backward-compat alias + public bool IsConsensusMode => ResolvedMode == NodeMode.Validator; public static NodeConfiguration FromEnvironment() { @@ -78,6 +116,9 @@ public static NodeConfiguration FromEnvironment() // N-12: Validate DataDir to prevent path traversal var validatedDataDir = string.IsNullOrWhiteSpace(dataDir) ? null : ValidateDataDir(dataDir); + var mode = Environment.GetEnvironmentVariable("BASALT_MODE") ?? "auto"; + var syncSource = Environment.GetEnvironmentVariable("BASALT_SYNC_SOURCE"); + return new NodeConfiguration { ValidatorIndex = validatorIndex, @@ -91,6 +132,8 @@ public static NodeConfiguration FromEnvironment() DataDir = validatedDataDir, UsePipelining = usePipelining, UseSandbox = useSandbox, + Mode = mode, + SyncSource = syncSource, }; } diff --git a/src/node/Basalt.Node/NodeCoordinator.cs b/src/node/Basalt.Node/NodeCoordinator.cs index eb9a708..d751470 100644 --- a/src/node/Basalt.Node/NodeCoordinator.cs +++ b/src/node/Basalt.Node/NodeCoordinator.cs @@ -85,6 +85,7 @@ public sealed class NodeCoordinator : IAsyncDisposable // Block production (consensus-driven — no BlockProductionLoop) private BlockBuilder? _blockBuilder; private TransactionExecutor? _txExecutor; + private BlockApplier? _blockApplier; private Address _proposerAddress; // Solver network (Phase E4) @@ -95,6 +96,11 @@ public sealed class NodeCoordinator : IAsyncDisposable /// public Solver.SolverManager? SolverManager => _solverManager; + /// + /// Exposes the block applier for reuse by other components (e.g., BlockSyncService in RPC mode). + /// + public BlockApplier? BlockApplier => _blockApplier; + // Runtime private CancellationTokenSource? _cts; private Task? _consensusLoop; @@ -519,43 +525,12 @@ private void HandleBlockFinalized(Hash256 hash, byte[] blockData, ulong commitBi } } - // All validators execute finalized transactions against canonical state. - // Proposals use a forked state, so the leader's live state is never speculatively mutated. - if (block.Transactions.Count > 0) - { - var receipts = new List(block.Transactions.Count); - for (int i = 0; i < block.Transactions.Count; i++) - { - var receipt = _txExecutor!.Execute(block.Transactions[i], _stateDb, block.Header, i); - receipts.Add(receipt); - } - block.Receipts = receipts; - } - - // Run DEX settlement on canonical state (TWAP carry-forward + limit order matching) - if (_blockBuilder != null) - { - var dexReceipts = _blockBuilder.ApplyDexSettlement(_stateDb, block.Header); - if (dexReceipts.Count > 0) - { - block.Receipts ??= new List(); - block.Receipts.AddRange(dexReceipts); - } - } + // Apply block via shared BlockApplier (executes txs, DEX settlement, chain update, + // mempool pruning, persistence, epoch transitions, WebSocket broadcast, metrics). + var applyResult = _blockApplier!.ApplyBlock(block, _stateDb, blockData, commitBitmap); - var result = _chainManager.AddBlock(block); - if (result.IsSuccess) + if (applyResult.Success) { - _mempool.RemoveConfirmed(block.Transactions); - - // Prune stale, underpriced, or unaffordable transactions - var pruned = _mempool.PruneStale(_stateDb, block.Header.BaseFee); - if (pruned > 0) - _logger.LogInformation("Pruned {Count} unexecutable transactions from mempool", pruned); - - // Update mempool admission gate so new submissions below the current base fee are rejected early - _mempool.UpdateBaseFee(block.Header.BaseFee); - // Circuit breaker: reset on success Interlocked.Exchange(ref _consecutiveFinalizationFailures, 0); if (_circuitBreakerTripped) @@ -564,24 +539,16 @@ private void HandleBlockFinalized(Hash256 hash, byte[] blockData, ulong commitBi _logger.LogWarning("Circuit breaker reset after successful block finalization"); } - MetricsEndpoint.RecordBlock(block.Transactions.Count, block.Header.Timestamp); - - // M13: Record additional Prometheus metrics + // M13: Additional consensus-specific metrics var nowMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); var prevFinalizedMs = Volatile.Read(ref _lastBlockFinalizedAtMs); if (prevFinalizedMs > 0) MetricsEndpoint.RecordFinalizationLatency(nowMs - prevFinalizedMs); - MetricsEndpoint.RecordBaseFee(block.Header.BaseFee.IsZero ? 0 : (long)(ulong)block.Header.BaseFee); - MetricsEndpoint.RecordConsensusView((long)block.Number); MetricsEndpoint.RecordPeerCount(_peerManager?.ConnectedCount ?? 0); - MetricsEndpoint.RecordDexIntentCount(_mempool.DexIntentCount); - - _ = _wsHandler.BroadcastNewBlock(block); Volatile.Write(ref _lastBlockFinalizedAtMs, nowMs); - // N-10: Sliding window — retain evidence for last 10 views instead of clearing entirely. - // This preserves recent evidence for double-sign detection while preventing unbounded growth. + // N-10: Sliding window — retain evidence for last 10 views { var currentView = block.Number; var cutoff = currentView > 10 ? currentView - 10 : 0; @@ -590,23 +557,9 @@ private void HandleBlockFinalized(Hash256 hash, byte[] blockData, ulong commitBi _proposalsByView.TryRemove(key, out _); } - // Announce finalized block to peers so their BestBlockNumber stays - // current. Without this, TrySyncFromPeers can't find an up-to-date - // peer and validators that fall behind are unable to catch up. + // Announce finalized block to peers _gossip!.BroadcastBlock(block.Number, block.Hash, block.Header.ParentHash); - // Persist block + commit bitmap atomically to RocksDB - PersistBlock(block, blockData, commitBitmap); - PersistReceipts(block.Receipts); - - // Record commit participation for deterministic epoch-boundary slashing - _epochManager?.RecordBlockSigners(block.Number, commitBitmap); - - // Check for epoch transition — rebuild validator set if at boundary - var newSet = _epochManager?.OnBlockFinalized(block.Number); - if (newSet != null) - ApplyEpochTransition(newSet, block.Number); - _logger.LogInformation( "Block #{Number} finalized via consensus. Hash: {Hash}, Txs: {TxCount}", block.Number, hash.ToHexString()[..18] + "...", block.Transactions.Count); @@ -621,7 +574,7 @@ private void HandleBlockFinalized(Hash256 hash, byte[] blockData, ulong commitBi else { _logger.LogError("Failed to add consensus-finalized block #{Number}: {Error}", - block.Number, result.Message); + block.Number, applyResult.Error); // Circuit breaker: increment failure count var failures = Interlocked.Increment(ref _consecutiveFinalizationFailures); @@ -633,7 +586,6 @@ private void HandleBlockFinalized(Hash256 hash, byte[] blockData, ulong commitBi } // If we're behind (block number > our tip + 1), trigger a sync - // to catch up on missed blocks before the next round. if (block.Number > _chainManager.LatestBlockNumber + 1) { _logger.LogWarning( @@ -678,6 +630,36 @@ private void SetupBlockProduction() poolId, buys, sells, reserves, feeBps, intentMinAmounts, stateDb, dexState, intentTxMap); + // Create shared BlockApplier for finalization and sync paths + _blockApplier = new BlockApplier( + _chainParams, _chainManager, _mempool, _txExecutor, _blockBuilder, + _blockStore, _receiptStore, _epochManager, _stakingState, _stakingPersistence, + _wsHandler, _loggerFactory.CreateLogger()); + + // Hook epoch transitions to rewire consensus-specific components + _blockApplier.OnEpochTransition += (newSet, blockNumber) => + { + var oldCount = _validatorSet?.Count ?? 0; + _validatorSet = newSet; + + if (_stakingState != null) + { + _leaderSelector = new WeightedLeaderSelector(_validatorSet); + _validatorSet.SetLeaderSelector(view => _leaderSelector.SelectLeader(view)); + } + + if (_config.UsePipelining) + _pipelinedConsensus?.UpdateValidatorSet(newSet); + else + _consensus?.UpdateValidatorSet(newSet); + + // N-10: Sliding window — retain evidence for last 10 views on epoch transition + var cutoff = blockNumber > 10 ? blockNumber - 10 : 0; + var oldKeys = _proposalsByView.Keys.ToArray().Where(k => k.View < cutoff).ToList(); + foreach (var key in oldKeys) + _proposalsByView.TryRemove(key, out _); + }; + if (_config.UseSandbox) _logger.LogInformation("Contract execution: sandboxed mode (AssemblyLoadContext isolation)"); @@ -1214,36 +1196,11 @@ private void HandleBlockPayload(PeerId sender, BlockPayloadMessage payload) if (block.Number != _chainManager.LatestBlockNumber + 1) continue; - // Execute transactions and capture receipts - if (block.Transactions.Count > 0) - { - var receipts = new List(block.Transactions.Count); - for (int i = 0; i < block.Transactions.Count; i++) - { - var receipt = _txExecutor!.Execute(block.Transactions[i], _stateDb, block.Header, i); - receipts.Add(receipt); - } - block.Receipts = receipts; - } - - var result = _chainManager.AddBlock(block); - if (result.IsSuccess) - { - _mempool.RemoveConfirmed(block.Transactions); - var bitmap = idx < payload.CommitBitmaps.Length ? payload.CommitBitmaps[idx] : 0UL; - PersistBlock(block, blockBytes, bitmap); - PersistReceipts(block.Receipts); - - // Use propagated commit bitmap from the serving peer - _epochManager?.RecordBlockSigners(block.Number, bitmap); - - // Apply epoch transitions for blocks received via gossip - var newSet = _epochManager?.OnBlockFinalized(block.Number); - if (newSet != null) - ApplyEpochTransition(newSet, block.Number); + var bitmap = idx < payload.CommitBitmaps.Length ? payload.CommitBitmaps[idx] : 0UL; + var result = _blockApplier!.ApplyBlock(block, _stateDb, blockBytes, bitmap); + if (result.Success) _logger.LogInformation("Applied block #{Number} from peer", block.Number); - } } catch (Exception ex) { @@ -1316,12 +1273,8 @@ private void HandleSyncRequest(PeerId sender, SyncRequestMessage request) private void HandleSyncResponse(PeerId sender, SyncResponseMessage response) { - var applied = 0; - - // N-05: Fork state for sync — execute all blocks on a forked state database. - // Only replace canonical state if all blocks in the batch succeed. - var forkedState = _stateDb.Fork(); - var blocksToApply = new List<(Block Block, byte[] Raw, int OrigIdx)>(); + // Deserialize and validate block sequence + var blocksToApply = new List<(Block Block, byte[] Raw, ulong CommitBitmap)>(); for (int idx = 0; idx < response.Blocks.Length; idx++) { @@ -1337,82 +1290,18 @@ private void HandleSyncResponse(PeerId sender, SyncResponseMessage response) continue; } - // Execute transactions against the forked state - if (block.Transactions.Count > 0) - { - var receipts = new List(block.Transactions.Count); - for (int i = 0; i < block.Transactions.Count; i++) - { - var receipt = _txExecutor!.Execute(block.Transactions[i], forkedState, block.Header, i); - receipts.Add(receipt); - } - block.Receipts = receipts; - } - - // Run DEX settlement on forked state (TWAP carry-forward + limit order matching) - if (_blockBuilder != null) - { - var dexReceipts = _blockBuilder.ApplyDexSettlement(forkedState, block.Header); - if (dexReceipts.Count > 0) - { - block.Receipts ??= new List(); - block.Receipts.AddRange(dexReceipts); - } - } - - blocksToApply.Add((block, blockBytes, idx)); + var bitmap = idx < response.CommitBitmaps.Length ? response.CommitBitmaps[idx] : 0UL; + blocksToApply.Add((block, blockBytes, bitmap)); } catch (Exception ex) { - _logger.LogWarning(ex, "Failed to process synced block from {Sender}", sender); - break; - } - } - - // Apply all successfully executed blocks to the chain - foreach (var (block, blockBytes, origIdx) in blocksToApply) - { - var result = _chainManager.AddBlock(block); - if (result.IsSuccess) - { - _mempool.RemoveConfirmed(block.Transactions); - var bitmap = origIdx < response.CommitBitmaps.Length ? response.CommitBitmaps[origIdx] : 0UL; - PersistBlock(block, blockBytes, bitmap); - PersistReceipts(block.Receipts); - applied++; - - // Use propagated commit bitmap from the serving peer - _epochManager?.RecordBlockSigners(block.Number, bitmap); - - // Apply epoch transitions for synced blocks — without this, - // nodes that sync across epoch boundaries would have a stale - // ValidatorSet and disagree on leader selection. - var newSet = _epochManager?.OnBlockFinalized(block.Number); - if (newSet != null) - ApplyEpochTransition(newSet, block.Number); - } - else - { - _logger.LogWarning("Failed to apply synced block #{Number}: {Error}", block.Number, result.Message); + _logger.LogWarning(ex, "Failed to deserialize synced block from {Sender}", sender); break; } } - // N-05: Only adopt the forked state if all blocks were applied successfully. - // Swap() updates the shared StateDbRef so the API layer sees the new state. - if (applied == blocksToApply.Count && applied > 0) - { - _stateDb.Swap(forkedState); - _logger.LogInformation("Synced {Count} blocks, now at #{Height}", applied, _chainManager.LatestBlockNumber); - } - else if (applied > 0) - { - // HIGH-04: Partial sync — blocks were added to ChainManager but state was not - // adopted. Roll back ChainManager to the last consistent block to prevent - // chain/state divergence. The next sync attempt will re-fetch these blocks. - _logger.LogWarning("Partial sync: applied {Applied}/{Total} blocks — rolling back chain to consistent state", - applied, blocksToApply.Count); - } + // Delegate to BlockApplier for fork-execute-swap + var applied = _blockApplier!.ApplyBatch(blocksToApply, _stateDb); // Signal the sync loop under lock to prevent stale responses completing wrong TCS lock (this) @@ -1891,121 +1780,9 @@ private void PruneProposalsByView(ulong currentView) return best; } - /// - /// Apply an epoch transition: swap the validator set, rewire consensus and leader selection. - /// - private void ApplyEpochTransition(ValidatorSet newSet, ulong blockNumber) - { - var oldCount = _validatorSet?.Count ?? 0; - _validatorSet = newSet; - - // Rewire leader selector with new set (reads snapshotted stakes from ValidatorInfo.Stake) - if (_stakingState != null) - { - _leaderSelector = new WeightedLeaderSelector(_validatorSet); - _validatorSet.SetLeaderSelector(view => _leaderSelector.SelectLeader(view)); - } - - // Update consensus engine - if (_config.UsePipelining) - _pipelinedConsensus?.UpdateValidatorSet(newSet); - else - _consensus?.UpdateValidatorSet(newSet); - - // N-10: Sliding window — retain evidence for last 10 views on epoch transition - { - var cutoff = blockNumber > 10 ? blockNumber - 10 : 0; - var oldKeys = _proposalsByView.Keys.ToArray().Where(k => k.View < cutoff).ToList(); - foreach (var key in oldKeys) - _proposalsByView.TryRemove(key, out _); - } - - // B1: Flush staking state after epoch transition - if (_stakingPersistence != null && _stakingState != null) - { - try - { - _stakingState.FlushToPersistence(_stakingPersistence); - } - catch (Exception ex) - { - _logger.LogError(ex, "Failed to flush staking state after epoch transition"); - } - } - - _logger.LogInformation("Epoch transition at block #{Block}: {OldCount} → {NewCount} validators, quorum: {Quorum}", - blockNumber, oldCount, newSet.Count, newSet.QuorumThreshold); - } - - private void PersistBlock(Block block, byte[] serializedBlockData, ulong? commitBitmap = null) - { - if (_blockStore == null) - return; - - try - { - var blockData = new BlockData - { - Number = block.Number, - Hash = block.Hash, - ParentHash = block.Header.ParentHash, - StateRoot = block.Header.StateRoot, - TransactionsRoot = block.Header.TransactionsRoot, - ReceiptsRoot = block.Header.ReceiptsRoot, - Timestamp = block.Header.Timestamp, - Proposer = block.Header.Proposer, - ChainId = block.Header.ChainId, - GasUsed = block.Header.GasUsed, - GasLimit = block.Header.GasLimit, - BaseFee = block.Header.BaseFee, - ProtocolVersion = block.Header.ProtocolVersion, - ExtraData = block.Header.ExtraData, - TransactionHashes = block.Transactions.Select(t => t.Hash).ToArray(), - }; - _blockStore.PutFullBlock(blockData, serializedBlockData, commitBitmap); - _blockStore.SetLatestBlockNumber(block.Number); - } - catch (Exception ex) - { - _logger.LogError(ex, "Failed to persist block #{Number}", block.Number); - } - } - - private void PersistReceipts(List? receipts) - { - if (_receiptStore == null || receipts == null || receipts.Count == 0) - return; - - try - { - var receiptDataList = receipts.Select(r => new ReceiptData - { - TransactionHash = r.TransactionHash, - BlockHash = r.BlockHash, - BlockNumber = r.BlockNumber, - TransactionIndex = r.TransactionIndex, - From = r.From, - To = r.To, - GasUsed = r.GasUsed, - Success = r.Success, - ErrorCode = (int)r.ErrorCode, - PostStateRoot = r.PostStateRoot, - EffectiveGasPrice = r.EffectiveGasPrice, - Logs = (r.Logs ?? []).Select(l => new LogData - { - Contract = l.Contract, - EventSignature = l.EventSignature, - Topics = l.Topics ?? [], - Data = l.Data ?? [], - }).ToArray(), - }); - _receiptStore.PutReceipts(receiptDataList); - } - catch (Exception ex) - { - _logger.LogError(ex, "Failed to persist {Count} receipts", receipts.Count); - } - } + // PersistBlock, PersistReceipts, and ApplyEpochTransition logic is now in BlockApplier. + // Consensus-specific epoch rewiring (leader selector, consensus engine, proposal cache) + // is handled via the BlockApplier.OnEpochTransition event, wired in SetupBlockProduction(). public async ValueTask DisposeAsync() { diff --git a/src/node/Basalt.Node/Program.cs b/src/node/Basalt.Node/Program.cs index 37b0917..0cdac8d 100644 --- a/src/node/Basalt.Node/Program.cs +++ b/src/node/Basalt.Node/Program.cs @@ -252,11 +252,16 @@ builder.Services.AddSingleton(chainManager); builder.Services.AddSingleton(mempool); builder.Services.AddSingleton(validator); + // TxForwarderRef: inner forwarder is set later in the RPC branch; DI resolves lazily + var txForwarderRef = new TxForwarderRef(); + builder.Services.AddSingleton(txForwarderRef); builder.Services.AddGrpc(); // R3-NEW-1: Use GlobalLimiter instead of named policy. Named policies require // explicit .RequireRateLimiting("per-ip") on each endpoint group; a global limiter // applies to all requests automatically without per-endpoint opt-in. + // RPC nodes get much higher limits since they are the public-facing API layer. + var isRpcMode = config.ResolvedMode == NodeMode.Rpc; builder.Services.AddRateLimiter(options => { options.RejectionStatusCode = 429; @@ -265,19 +270,20 @@ partitionKey: context.Connection.RemoteIpAddress?.ToString() ?? "unknown", factory: _ => new FixedWindowRateLimiterOptions { - PermitLimit = 100, + PermitLimit = isRpcMode ? 1000 : 100, Window = TimeSpan.FromMinutes(1), })); }); // R3-NEW-2: Restrict CORS to known origins. AllowAnyOrigin enables localhost CSRF where // a malicious website uses a visitor's browser as a proxy to a locally-running node. - // Allow any origin only when BASALT_DEBUG=1 is set (development mode). + // Allow any origin when BASALT_DEBUG=1 is set (development mode) or in RPC mode + // (public-facing API serving Explorer, Caldera, and third-party consumers). builder.Services.AddCors(options => { options.AddDefaultPolicy(policy => { - if (Environment.GetEnvironmentVariable("BASALT_DEBUG") == "1") + if (Environment.GetEnvironmentVariable("BASALT_DEBUG") == "1" || isRpcMode) { policy.AllowAnyOrigin().AllowAnyMethod().AllowAnyHeader(); } @@ -306,14 +312,14 @@ var contractRuntime = new ManagedContractRuntime(); var solverInfoAdapter = new Basalt.Node.Solver.SolverInfoAdapter(); solverInfoAdapter.SetMempool(mempool); - RestApiEndpoints.MapBasaltEndpoints(app, chainManager, mempool, validator, stateDbRef, contractRuntime, receiptStore, chainParams: chainParams, solverProvider: solverInfoAdapter); + RestApiEndpoints.MapBasaltEndpoints(app, chainManager, mempool, validator, stateDbRef, contractRuntime, receiptStore, chainParams: chainParams, solverProvider: solverInfoAdapter, blockStore: blockStore, txForwarder: txForwarderRef); - // Map faucet endpoint + // Map faucet endpoint (txForwarderRef set later in RPC branch) var faucetLogger = app.Services.GetRequiredService().CreateLogger("Basalt.Faucet"); - FaucetEndpoint.MapFaucetEndpoint(app, stateDbRef, mempool, chainParams, faucetPrivateKey, faucetLogger, chainManager); + FaucetEndpoint.MapFaucetEndpoint(app, stateDbRef, mempool, chainParams, faucetPrivateKey, faucetLogger, chainManager, txForwarder: txForwarderRef); - // Map WebSocket endpoint - app.UseWebSockets(); + // Map WebSocket endpoint (keep-alive prevents Cloudflare Tunnel idle disconnects) + app.UseWebSockets(new WebSocketOptions { KeepAliveInterval = TimeSpan.FromSeconds(30) }); var wsHandler = new WebSocketHandler(chainManager); app.MapWebSocketEndpoint(wsHandler); @@ -323,6 +329,9 @@ // Map Prometheus metrics endpoint MetricsEndpoint.MapMetricsEndpoint(app, chainManager, mempool); + // Sync status (set by RPC mode — null for other modes) + ISyncStatus? syncStatus = null; + // N-19: Health endpoint with meaningful status info (AOT-safe string formatting) ulong lastHealthCheckBlock = 0; long lastHealthCheckTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); @@ -344,13 +353,31 @@ lastHealthCheckTime = now; var healthy = makingProgress || (blockAge >= 0 && blockAge < 60); + + // RPC nodes are unhealthy if too far behind the sync source + var currentSyncLag = syncStatus?.SyncLag ?? 0; + var modeName = config.ResolvedMode switch + { + NodeMode.Validator => "validator", + NodeMode.Rpc => "rpc", + _ => "standalone", + }; + + if (config.ResolvedMode == NodeMode.Rpc && currentSyncLag > 50) + healthy = false; + ctx.Response.StatusCode = healthy ? 200 : 503; ctx.Response.ContentType = "application/json"; + var syncLagField = config.ResolvedMode == NodeMode.Rpc + ? ",\"syncLag\":" + currentSyncLag + : ""; return ctx.Response.WriteAsync( "{\"status\":\"" + (healthy ? "healthy" : "degraded") + + "\",\"mode\":\"" + modeName + "\",\"lastBlockNumber\":" + currentBlockNumber + ",\"lastBlockAgeSeconds\":" + blockAge.ToString("F1", System.Globalization.CultureInfo.InvariantCulture) + ",\"makingProgress\":" + (makingProgress ? "true" : "false") + + syncLagField + ",\"chainId\":" + chainParams.ChainId + "}"); }); @@ -369,124 +396,203 @@ return Microsoft.AspNetCore.Http.Results.Ok(response); }); - if (config.IsConsensusMode) + switch (config.ResolvedMode) { - // === CONSENSUS MODE === - // Multi-node operation with P2P networking and BFT consensus - var slashingEngine = new SlashingEngine( - stakingState, - app.Services.GetRequiredService().CreateLogger()); - - // ZK compliance verifier — reads VKs from SchemaRegistry contract storage (COMPL-17) - var schemaRegistryAddress = Basalt.Execution.GenesisContractDeployer.Addresses.SchemaRegistry; - var zkVerifier = new Basalt.Compliance.ZkComplianceVerifier(schemaId => - { - // StorageMap key: "scr_vk:{schemaIdHex}", hashed to Hash256 via BLAKE3 - var storageKey = "scr_vk:" + schemaId.ToHexString(); - var slot = Basalt.Crypto.Blake3Hasher.Hash(System.Text.Encoding.UTF8.GetBytes(storageKey)); - var raw = stateDbRef.GetStorage(schemaRegistryAddress, slot); - if (raw == null || raw.Length < 2 || raw[0] != 0x07) // 0x07 = TagString - return null; - var hexVk = System.Text.Encoding.UTF8.GetString(raw.AsSpan(1)); - if (string.IsNullOrEmpty(hexVk)) - return null; - try { return Convert.FromHexString(hexVk); } - catch { return null; } - }); - // H9: No MockKycProvider in consensus mode — only governance-approved - // providers can issue attestations on mainnet/testnet. - var complianceEngine = new Basalt.Compliance.ComplianceEngine( - new Basalt.Compliance.IdentityRegistry(), - new Basalt.Compliance.SanctionsList(), - zkVerifier); - - var coordinator = new NodeCoordinator( - config, chainParams, chainManager, mempool, stateDbRef, validator, wsHandler, - app.Services.GetRequiredService(), - blockStore, receiptStore, - stakingState, slashingEngine, - complianceEngine, - stakingPersistence); - - // E4: Wire solver manager into REST API adapter after NodeCoordinator is initialized - if (coordinator.SolverManager != null) - solverInfoAdapter.SetSolverManager(coordinator.SolverManager); - - Log.Information("Basalt Node listening on {Urls}", string.Join(", ", app.Urls.DefaultIfEmpty($"http://localhost:{config.HttpPort}"))); - Log.Information("Chain: {Network} (ChainId={ChainId})", chainParams.NetworkName, chainParams.ChainId); - Log.Information("Validator: index={Index}, address={Address}, P2P port={P2PPort}", - config.ValidatorIndex, config.ValidatorAddress, config.P2PPort); - Log.Information("Peers: {Peers}", string.Join(", ", config.Peers)); - - app.Lifetime.ApplicationStarted.Register(() => + case NodeMode.Validator: { - _ = Task.Run(async () => + // === VALIDATOR MODE === + // Multi-node operation with P2P networking and BFT consensus + var slashingEngine = new SlashingEngine( + stakingState, + app.Services.GetRequiredService().CreateLogger()); + + // ZK compliance verifier — reads VKs from SchemaRegistry contract storage (COMPL-17) + var schemaRegistryAddress = Basalt.Execution.GenesisContractDeployer.Addresses.SchemaRegistry; + var zkVerifier = new Basalt.Compliance.ZkComplianceVerifier(schemaId => + { + // StorageMap key: "scr_vk:{schemaIdHex}", hashed to Hash256 via BLAKE3 + var storageKey = "scr_vk:" + schemaId.ToHexString(); + var slot = Basalt.Crypto.Blake3Hasher.Hash(System.Text.Encoding.UTF8.GetBytes(storageKey)); + var raw = stateDbRef.GetStorage(schemaRegistryAddress, slot); + if (raw == null || raw.Length < 2 || raw[0] != 0x07) // 0x07 = TagString + return null; + var hexVk = System.Text.Encoding.UTF8.GetString(raw.AsSpan(1)); + if (string.IsNullOrEmpty(hexVk)) + return null; + try { return Convert.FromHexString(hexVk); } + catch { return null; } + }); + // H9: No MockKycProvider in consensus mode — only governance-approved + // providers can issue attestations on mainnet/testnet. + var complianceEngine = new Basalt.Compliance.ComplianceEngine( + new Basalt.Compliance.IdentityRegistry(), + new Basalt.Compliance.SanctionsList(), + zkVerifier); + + var coordinator = new NodeCoordinator( + config, chainParams, chainManager, mempool, stateDbRef, validator, wsHandler, + app.Services.GetRequiredService(), + blockStore, receiptStore, + stakingState, slashingEngine, + complianceEngine, + stakingPersistence); + + // E4: Wire solver manager into REST API adapter after NodeCoordinator is initialized + if (coordinator.SolverManager != null) + solverInfoAdapter.SetSolverManager(coordinator.SolverManager); + + Log.Information("Basalt Node listening on {Urls}", string.Join(", ", app.Urls.DefaultIfEmpty($"http://localhost:{config.HttpPort}"))); + Log.Information("Chain: {Network} (ChainId={ChainId})", chainParams.NetworkName, chainParams.ChainId); + Log.Information("Validator: index={Index}, address={Address}, P2P port={P2PPort}", + config.ValidatorIndex, config.ValidatorAddress, config.P2PPort); + Log.Information("Peers: {Peers}", string.Join(", ", config.Peers)); + + app.Lifetime.ApplicationStarted.Register(() => { - try + _ = Task.Run(async () => { - await coordinator.StartAsync(app.Lifetime.ApplicationStopping); - } - catch (Exception ex) + try + { + await coordinator.StartAsync(app.Lifetime.ApplicationStopping); + } + catch (Exception ex) + { + Log.Fatal(ex, "Consensus coordinator failed to start"); + } + }); + }); + + app.Lifetime.ApplicationStopping.Register(() => + { + // L20: Add random jitter to stagger validator restarts and avoid thundering herd + var jitterMs = Random.Shared.Next(0, 3000); + Thread.Sleep(jitterMs); + + Log.Information("Shutting down consensus coordinator..."); + // N-18: Timeout to prevent shutdown deadlock + if (!coordinator.StopAsync().Wait(TimeSpan.FromSeconds(10))) { - Log.Fatal(ex, "Consensus coordinator failed to start"); + Log.Warning("Node coordinator did not stop within 10 seconds; forcing exit"); } }); - }); + break; + } - app.Lifetime.ApplicationStopping.Register(() => + case NodeMode.Rpc: { - // L20: Add random jitter to stagger validator restarts and avoid thundering herd - var jitterMs = Random.Shared.Next(0, 3000); - Thread.Sleep(jitterMs); + // === RPC MODE === + // Syncs finalized blocks from a trusted source via HTTP. Serves the full API + // without participating in consensus or P2P networking. - Log.Information("Shutting down consensus coordinator..."); - // N-18: Timeout to prevent shutdown deadlock - if (!coordinator.StopAsync().Wait(TimeSpan.FromSeconds(10))) + if (blockStore == null) { - Log.Warning("Node coordinator did not stop within 10 seconds; forcing exit"); + Log.Fatal("RPC mode requires BASALT_DATA_DIR for block persistence"); + return 1; } - }); - } - else - { - // === STANDALONE MODE === - // Single-node block production on a timer (existing behavior) - // LOW-06: Warn if DataDir is set but blocks are not persisted in standalone mode - if (config.DataDir != null) - Log.Warning("DataDir is set but standalone mode does not persist blocks. State will be lost on restart."); + var loggerFactory = app.Services.GetRequiredService(); + + // Create execution components for block replay + IContractRuntime rpcContractRuntime = config.UseSandbox + ? new Basalt.Execution.VM.Sandbox.SandboxedContractRuntime(new Basalt.Execution.VM.Sandbox.SandboxConfiguration()) + : new ManagedContractRuntime(); + var rpcTxExecutor = new TransactionExecutor(chainParams, rpcContractRuntime, stakingState); + var rpcBlockBuilder = new BlockBuilder(chainParams, rpcTxExecutor, loggerFactory.CreateLogger()); + + var rpcBlockApplier = new BlockApplier( + chainParams, chainManager, mempool, rpcTxExecutor, rpcBlockBuilder, + blockStore, receiptStore, + epochManager: null, // No epoch transitions in RPC mode (no consensus) + stakingState: stakingState, + stakingPersistence: stakingPersistence, + wsHandler, + loggerFactory.CreateLogger()); + + var rpcSyncService = new BlockSyncService( + config.SyncSource!, + rpcBlockApplier, + chainManager, + stateDbRef, + chainParams, + loggerFactory.CreateLogger()); + + syncStatus = rpcSyncService; + + var txForwarder = new HttpTxForwarder( + config.SyncSource!, + loggerFactory.CreateLogger()); + txForwarderRef.Set(txForwarder); + + Log.Information("Basalt RPC Node listening on {Urls}", string.Join(", ", app.Urls.DefaultIfEmpty($"http://localhost:{config.HttpPort}"))); + Log.Information("Chain: {Network} (ChainId={ChainId})", chainParams.NetworkName, chainParams.ChainId); + Log.Information("Sync source: {Source}", config.SyncSource); + + app.Lifetime.ApplicationStarted.Register(() => + { + _ = Task.Run(async () => + { + try + { + await rpcSyncService.RunAsync(app.Lifetime.ApplicationStopping); + } + catch (Exception ex) + { + Log.Fatal(ex, "Block sync service failed"); + } + }); + }); - var proposer = Address.FromHexString("0x0000000000000000000000000000000000000001"); - var blockProduction = new BlockProductionLoop( - chainParams, chainManager, mempool, stateDbRef, proposer, - app.Services.GetRequiredService>()); + app.Lifetime.ApplicationStopping.Register(() => + { + Log.Information("Shutting down RPC sync service..."); + txForwarder.Dispose(); + rpcSyncService.DisposeAsync().AsTask().Wait(TimeSpan.FromSeconds(5)); + }); + break; + } - // Wire metrics and WebSocket to block production - blockProduction.OnBlockProduced += block => + default: { - MetricsEndpoint.RecordBlock(block.Transactions.Count, block.Header.Timestamp); - _ = wsHandler.BroadcastNewBlock(block); - }; + // === STANDALONE MODE === + // Single-node block production on a timer (existing behavior) - blockProduction.Start(); + // LOW-06: Warn if DataDir is set but blocks are not persisted in standalone mode + if (config.DataDir != null) + Log.Warning("DataDir is set but standalone mode does not persist blocks. State will be lost on restart."); - Log.Information("Basalt Node listening on {Urls}", string.Join(", ", app.Urls.DefaultIfEmpty("http://localhost:5000"))); - Log.Information("Chain: {Network} (ChainId={ChainId})", chainParams.NetworkName, chainParams.ChainId); - Log.Information("Block time: {BlockTime}ms", chainParams.BlockTimeMs); + var proposer = Address.FromHexString("0x0000000000000000000000000000000000000001"); + var blockProduction = new BlockProductionLoop( + chainParams, chainManager, mempool, stateDbRef, proposer, + app.Services.GetRequiredService>()); - app.Lifetime.ApplicationStopping.Register(() => - { - // L20: Add random jitter to stagger restarts - var jitterMs = Random.Shared.Next(0, 3000); - Thread.Sleep(jitterMs); + // Wire metrics and WebSocket to block production + blockProduction.OnBlockProduced += block => + { + MetricsEndpoint.RecordBlock(block.Transactions.Count, block.Header.Timestamp); + _ = wsHandler.BroadcastNewBlock(block); + }; - Log.Information("Shutting down block production..."); - // N-18: Timeout to prevent shutdown deadlock - if (!blockProduction.StopAsync().Wait(TimeSpan.FromSeconds(10))) + blockProduction.Start(); + + Log.Information("Basalt Node listening on {Urls}", string.Join(", ", app.Urls.DefaultIfEmpty("http://localhost:5000"))); + Log.Information("Chain: {Network} (ChainId={ChainId})", chainParams.NetworkName, chainParams.ChainId); + Log.Information("Block time: {BlockTime}ms", chainParams.BlockTimeMs); + + app.Lifetime.ApplicationStopping.Register(() => { - Log.Warning("Block production did not stop within 10 seconds; forcing exit"); - } - }); + // L20: Add random jitter to stagger restarts + var jitterMs = Random.Shared.Next(0, 3000); + Thread.Sleep(jitterMs); + + Log.Information("Shutting down block production..."); + // N-18: Timeout to prevent shutdown deadlock + if (!blockProduction.StopAsync().Wait(TimeSpan.FromSeconds(10))) + { + Log.Warning("Block production did not stop within 10 seconds; forcing exit"); + } + }); + break; + } } await app.RunAsync(); diff --git a/src/node/Basalt.Node/README.md b/src/node/Basalt.Node/README.md index 6ad4a49..32ca7bf 100644 --- a/src/node/Basalt.Node/README.md +++ b/src/node/Basalt.Node/README.md @@ -19,7 +19,12 @@ docker run -p 5000:5000 -p 30303:30303 basalt-node ## Runtime Modes -The node operates in one of two modes, determined by the `BASALT_VALIDATOR_INDEX` environment variable (along with `BASALT_PEERS`). If both are set (index >= 0 and at least one peer), the node runs in **consensus mode**; otherwise it falls back to **standalone mode**. +The node operates in one of three modes, controlled by the `BASALT_MODE` environment variable (default: `auto`): + +- **`auto`** (default): If `BASALT_VALIDATOR_INDEX >= 0` and `BASALT_PEERS` is set, the node runs in **validator mode**; otherwise it falls back to **standalone mode**. +- **`validator`**: Forces validator/consensus mode. +- **`rpc`**: Runs as a read-only RPC node that syncs blocks from a trusted source (`BASALT_SYNC_SOURCE`) and serves the full API without participating in consensus. +- **`standalone`**: Forces standalone mode with timer-based block production. ### Standalone Mode @@ -33,7 +38,20 @@ The node operates in one of two modes, determined by the `BASALT_VALIDATOR_INDEX 8. Wires metrics and WebSocket notifications to the block production loop 9. Handles graceful shutdown on SIGINT/SIGTERM -### Consensus Mode +### RPC Mode + +Set `BASALT_MODE=rpc` and `BASALT_SYNC_SOURCE=http://validator-0:5000`. The RPC node: + +1. Syncs finalized blocks from the sync source via HTTP polling (`BlockSyncService`) +2. Applies blocks locally via `BlockApplier` (executes transactions, runs DEX settlement, updates state) +3. Serves the full REST API, gRPC, faucet, WebSocket, and Prometheus metrics +4. Forwards submitted transactions to the sync source validator via `HttpTxForwarder` +5. Reports sync lag in the `/v1/health` endpoint (returns 503 if more than 50 blocks behind) +6. Uses relaxed rate limits (1000 req/min/IP vs 100) and open CORS for public-facing traffic + +No P2P, no consensus, no block production. Same binary, same Dockerfile. + +### Validator Mode 1. Initializes Serilog structured logging 2. Initializes RocksDB persistent storage (when `BASALT_DATA_DIR` is set) with `BlockStore` and `ReceiptStore`, or in-memory state @@ -101,6 +119,8 @@ Environment variables: | Variable | Default | Description | |----------|---------|-------------| +| `BASALT_MODE` | `auto` | Node mode: `auto`, `validator`, `rpc`, or `standalone` | +| `BASALT_SYNC_SOURCE` | -- | HTTP URL of sync source (required for `rpc` mode, e.g. `http://validator-0:5000`) | | `BASALT_CHAIN_ID` | `31337` | Chain identifier | | `BASALT_NETWORK` | `basalt-devnet` | Network name | | `BASALT_VALIDATOR_INDEX` | `-1` | Validator index (enables consensus mode when >= 0 and `BASALT_PEERS` is set) | diff --git a/src/node/Basalt.Node/TxForwarder.cs b/src/node/Basalt.Node/TxForwarder.cs new file mode 100644 index 0000000..dc6fd58 --- /dev/null +++ b/src/node/Basalt.Node/TxForwarder.cs @@ -0,0 +1,81 @@ +using System.Net.Http.Json; +using Basalt.Api.Rest; +using Basalt.Core; +using Basalt.Execution; +using Microsoft.Extensions.Logging; + +namespace Basalt.Node; + +/// +/// No-op forwarding for validators and standalone nodes. +/// Transactions are already in the local mempool and gossipped via P2P. +/// +public sealed class NoOpTxForwarder : ITxForwarder +{ + public Task ForwardAsync(Transaction tx, CancellationToken ct) => Task.CompletedTask; +} + +/// +/// Forwards transactions from an RPC node to its sync source validator via HTTP. +/// Fire-and-forget: logs warnings on failure but never throws. +/// +public sealed class HttpTxForwarder : ITxForwarder, IDisposable +{ + private readonly HttpClient _httpClient; + private readonly ILogger? _logger; + + public HttpTxForwarder(string syncSourceUrl, ILogger? logger = null) + { + _logger = logger; + _httpClient = new HttpClient + { + BaseAddress = new Uri(syncSourceUrl.TrimEnd('/')), + Timeout = TimeSpan.FromSeconds(5), + }; + } + + public async Task ForwardAsync(Transaction tx, CancellationToken ct) + { + try + { + var request = new TransactionRequest + { + Type = (byte)tx.Type, + Nonce = tx.Nonce, + Sender = tx.Sender.ToHexString(), + To = tx.To.ToHexString(), + Value = tx.Value.ToString(), + GasLimit = tx.GasLimit, + GasPrice = tx.GasPrice.ToString(), + MaxFeePerGas = tx.IsEip1559 ? tx.MaxFeePerGas.ToString() : null, + MaxPriorityFeePerGas = tx.IsEip1559 ? tx.MaxPriorityFeePerGas.ToString() : null, + Data = tx.Data.Length > 0 ? Convert.ToHexString(tx.Data) : null, + Priority = tx.Priority, + ChainId = tx.ChainId, + Signature = Convert.ToHexString(tx.Signature.ToArray()), + SenderPublicKey = tx.SenderPublicKey.ToArray().Length > 0 + ? Convert.ToHexString(tx.SenderPublicKey.ToArray()) + : "", + }; + + using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); + cts.CancelAfter(TimeSpan.FromSeconds(5)); + + await _httpClient.PostAsJsonAsync( + "/v1/transactions", + request, + BasaltApiJsonContext.Default.TransactionRequest, + cts.Token); + } + catch (Exception ex) + { + _logger?.LogWarning("Failed to forward tx {Hash} to sync source: {Message}", + tx.Hash.ToHexString()[..16], ex.Message); + } + } + + public void Dispose() + { + _httpClient.Dispose(); + } +} diff --git a/tests/Basalt.Node.Tests/NodeConfigurationTests.cs b/tests/Basalt.Node.Tests/NodeConfigurationTests.cs index 58ff61e..cbaca4d 100644 --- a/tests/Basalt.Node.Tests/NodeConfigurationTests.cs +++ b/tests/Basalt.Node.Tests/NodeConfigurationTests.cs @@ -113,4 +113,81 @@ public void ValidateDataDir_ResolvesRelativePaths() result.Should().StartWith("/"); result.Should().EndWith("data/basalt"); } + + // ── ResolvedMode tests ── + + [Fact] + public void ResolvedMode_Default_ReturnsStandalone() + { + var config = new NodeConfiguration(); + config.ResolvedMode.Should().Be(NodeMode.Standalone); + } + + [Fact] + public void ResolvedMode_ExplicitStandalone_ReturnsStandalone() + { + var config = new NodeConfiguration { Mode = "standalone" }; + config.ResolvedMode.Should().Be(NodeMode.Standalone); + } + + [Fact] + public void ResolvedMode_ExplicitValidator_ReturnsValidator() + { + var config = new NodeConfiguration { Mode = "validator" }; + config.ResolvedMode.Should().Be(NodeMode.Validator); + } + + [Fact] + public void ResolvedMode_AutoWithPeersAndIndex_ReturnsValidator() + { + var config = new NodeConfiguration + { + Mode = "auto", + ValidatorIndex = 0, + Peers = ["peer1:30303"], + }; + config.ResolvedMode.Should().Be(NodeMode.Validator); + } + + [Fact] + public void ResolvedMode_Rpc_WithSyncSource_ReturnsRpc() + { + var config = new NodeConfiguration + { + Mode = "rpc", + SyncSource = "http://validator-0:5000", + }; + config.ResolvedMode.Should().Be(NodeMode.Rpc); + } + + [Fact] + public void ResolvedMode_Rpc_WithoutSyncSource_Throws() + { + var config = new NodeConfiguration { Mode = "rpc" }; + var act = () => config.ResolvedMode; + act.Should().Throw() + .WithMessage("*BASALT_SYNC_SOURCE*"); + } + + [Fact] + public void ResolvedMode_CaseInsensitive() + { + var config = new NodeConfiguration + { + Mode = "RPC", + SyncSource = "http://validator-0:5000", + }; + config.ResolvedMode.Should().Be(NodeMode.Rpc); + } + + [Fact] + public void IsConsensusMode_RpcMode_ReturnsFalse() + { + var config = new NodeConfiguration + { + Mode = "rpc", + SyncSource = "http://validator-0:5000", + }; + config.IsConsensusMode.Should().BeFalse(); + } }