Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 259 additions & 60 deletions src/index/base.cpp

Large diffs are not rendered by default.

43 changes: 42 additions & 1 deletion src/index/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,25 @@
#include <util/threadinterrupt.h>
#include <validationinterface.h>

#include <any>
#include <string>

class CBlock;
class CBlockIndex;
class Chainstate;
class ChainstateManager;
class ThreadPool;
namespace interfaces {
class Chain;
} // namespace interfaces

/** Maximum number of threads a single thread pool instance can have */
static constexpr int16_t MAX_INDEX_WORKERS_COUNT = 100;
/** Number of concurrent jobs during the initial sync process */
static constexpr int16_t INDEX_WORKERS_COUNT = 0;
/** Number of tasks processed by each worker */
static constexpr int16_t INDEX_WORK_PER_CHUNK = 1000;

struct IndexSummary {
std::string name;
bool synced{false};
Expand Down Expand Up @@ -80,6 +89,9 @@ class BaseIndex : public CValidationInterface
std::thread m_thread_sync;
CThreadInterrupt m_interrupt;

ThreadPool* m_thread_pool{nullptr};
int m_blocks_per_worker{INDEX_WORK_PER_CHUNK};

/// Write the current index state (eg. chain block locator and subclass-specific items) to disk.
///
/// Recommendations for error handling:
Expand All @@ -93,7 +105,8 @@ class BaseIndex : public CValidationInterface
/// Loop over disconnected blocks and call CustomRemove.
bool Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip);

bool ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data = nullptr);
std::any ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data = nullptr);
std::vector<std::any> ProcessBlocks(bool process_in_order, const CBlockIndex* start, const CBlockIndex* end);

virtual bool AllowPrune() const = 0;

Expand Down Expand Up @@ -131,6 +144,26 @@ class BaseIndex : public CValidationInterface
/// Update the internal best block index as well as the prune lock.
void SetBestBlockIndex(const CBlockIndex* block);

/// If 'AllowParallelSync()' returns true, 'ProcessBlock()' will run concurrently in batches.
/// The 'std::any' result will be passed to 'PostProcessBlocks()' so the index can process
/// async result batches in a synchronous fashion (if required).
[[nodiscard]] virtual std::any CustomProcessBlock(const interfaces::BlockInfo& block_info) {
// If parallel sync is enabled, the child class must implement this method.
if (AllowParallelSync()) return std::any();

// Default, synchronous write
if (!CustomAppend(block_info)) {
throw std::runtime_error(strprintf("%s: Failed to write block %s to index database",
__func__, block_info.hash.ToString()));
}
return true;
}

/// 'PostProcessBlocks()' is called in a synchronous manner after a batch of async 'ProcessBlock()'
/// calls have completed.
/// Here the index usually links and dump information that cannot be processed in an asynchronous fashion.
[[nodiscard]] virtual bool CustomPostProcessBlocks(const std::any& obj) { return true; };

public:
BaseIndex(std::unique_ptr<interfaces::Chain> chain, std::string name, int start_height = 0);
/// Destructor interrupts sync thread if running and blocks until it exits.
Expand All @@ -139,6 +172,8 @@ class BaseIndex : public CValidationInterface
/// Get the name of the index for display in logs.
const std::string& GetName() const LIFETIMEBOUND { return m_name; }

void SetThreadPool(ThreadPool& thread_pool) { m_thread_pool = &thread_pool; }

/// Blocks the current thread until the index is caught up to the current
/// state of the block chain. This only blocks if the index has gotten in
/// sync once and only needs to process blocks in the ValidationInterface
Expand All @@ -165,6 +200,12 @@ class BaseIndex : public CValidationInterface
/// Stops the instance from staying in sync with blockchain updates.
void Stop();

/// Number of blocks each worker thread will process at a time
void SetBlocksPerWorker(int count) { m_blocks_per_worker = count; }

/// True if the child class allows concurrent sync.
virtual bool AllowParallelSync() { return false; }

/// Get a summary of the index and its state.
IndexSummary GetSummary() const;
};
Expand Down
3 changes: 0 additions & 3 deletions src/index/bip352.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ bool BIP352Index::CustomAppend(const interfaces::BlockInfo& block)
// is needed on non-mainnet chains because m_start_height is 0 by default.
if (block.height == 0) return true;

// Exclude pre-taproot
if (block.height < m_start_height) return true;

tweak_index_entry index_entry;
GetSilentPaymentKeys(Assert(block.data)->vtx, *Assert(block.undo_data), index_entry);

Expand Down
9 changes: 9 additions & 0 deletions src/index/bip352.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ class BIP352Index final : public BaseIndex

bool CustomAppend(const interfaces::BlockInfo& block) override;

std::any CustomProcessBlock(const interfaces::BlockInfo& block) override {
// Exclude pre-taproot
if (block.height < m_start_height) return true;

return CustomAppend(block);
}

BaseIndex::DB& GetDB() const override;
public:

Expand All @@ -70,6 +77,8 @@ class BIP352Index final : public BaseIndex
// Destructor is declared because this class contains a unique_ptr to an incomplete type.
virtual ~BIP352Index() override;

bool AllowParallelSync() override { return true; }

bool FindSilentPayment(const uint256& block_hash, tweak_index_entry& index_entry) const;
};

Expand Down
17 changes: 17 additions & 0 deletions src/index/blockfilterindex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,23 @@ bool BlockFilterIndex::Write(const BlockFilter& filter, uint32_t block_height, c
return true;
}

std::any BlockFilterIndex::CustomProcessBlock(const interfaces::BlockInfo& block_info)
{
return std::make_pair(BlockFilter(BlockFilterType::BASIC, *block_info.data, *block_info.undo_data), block_info.height);
}

bool BlockFilterIndex::CustomPostProcessBlocks(const std::any& obj)
{
const auto& [filter, height] = std::any_cast<std::pair<BlockFilter, int>>(obj);
const uint256& header = filter.ComputeHeader(m_last_header);
if (!Write(filter, height, header)) {
LogError("Error writing filters, shutting down block filters index\n");
return false;
}
m_last_header = header;
return true;
}

[[nodiscard]] static bool CopyHeightIndexToHashIndex(CDBIterator& db_it, CDBBatch& batch,
const std::string& index_name, int height)
{
Expand Down
5 changes: 5 additions & 0 deletions src/index/blockfilterindex.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,18 @@ class BlockFilterIndex final : public BaseIndex

BaseIndex::DB& GetDB() const LIFETIMEBOUND override { return *m_db; }

std::any CustomProcessBlock(const interfaces::BlockInfo& block) override;
bool CustomPostProcessBlocks(const std::any& obj) override;

public:
/** Constructs the index, which becomes available to be queried. */
explicit BlockFilterIndex(std::unique_ptr<interfaces::Chain> chain, BlockFilterType filter_type,
size_t n_cache_size, bool f_memory = false, bool f_wipe = false);

BlockFilterType GetFilterType() const { return m_filter_type; }

bool AllowParallelSync() override { return true; }

/** Get a single filter by block. */
bool LookupFilter(const CBlockIndex* block_index, BlockFilter& filter_out) const;

Expand Down
6 changes: 6 additions & 0 deletions src/index/txindex.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,19 @@ class TxIndex final : public BaseIndex

BaseIndex::DB& GetDB() const override;

std::any CustomProcessBlock(const interfaces::BlockInfo& block) override {
return CustomAppend(block);
}

public:
/// Constructs the index, which becomes available to be queried.
explicit TxIndex(std::unique_ptr<interfaces::Chain> chain, size_t n_cache_size, bool f_memory = false, bool f_wipe = false);

// Destructor is declared because this class contains a unique_ptr to an incomplete type.
virtual ~TxIndex() override;

bool AllowParallelSync() override { return true; }

/// Look up a transaction by hash.
///
/// @param[in] tx_hash The hash of the transaction to be returned.
Expand Down
19 changes: 18 additions & 1 deletion src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
#include <util/syserror.h>
#include <util/thread.h>
#include <util/threadnames.h>
#include <util/threadpool.h>
#include <util/time.h>
#include <util/translation.h>
#include <validation.h>
Expand Down Expand Up @@ -351,6 +352,7 @@ void Shutdown(NodeContext& node)

// Stop and delete all indexes only after flushing background callbacks.
for (auto* index : node.indexes) index->Stop();
if (node.m_index_threads) node.m_index_threads.reset();
if (g_txindex) g_txindex.reset();
if (g_coin_stats_index) g_coin_stats_index.reset();
if (g_bip352_index) g_bip352_index.reset();
Expand Down Expand Up @@ -525,6 +527,7 @@ void SetupServerArgs(ArgsManager& argsman, bool can_listen_ipc)
strprintf("Maintain an index of compact filters by block (default: %s, values: %s).", DEFAULT_BLOCKFILTERINDEX, ListBlockFilterTypes()) +
" If <type> is not supplied or if <type> = 1, indexes for all known types are enabled.",
ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
argsman.AddArg("-indexworkers=<n>", strprintf("Number of worker threads spawned for the indexes initial sync process (default: %d).", INDEX_WORKERS_COUNT), ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);

argsman.AddArg("-addnode=<ip>", strprintf("Add a node to connect to and attempt to keep the connection open (see the addnode RPC help for more info). This option can be specified multiple times to add multiple nodes; connections are limited to %u at a time and are counted separately from the -maxconnections limit.", MAX_ADDNODE_CONNECTIONS), ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY, OptionsCategory::CONNECTION);
argsman.AddArg("-asmap=<file>", strprintf("Specify asn mapping used for bucketing of the peers (default: %s). Relative paths will be prefixed by the net-specific datadir location.", DEFAULT_ASMAP_FILENAME), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
Expand Down Expand Up @@ -2149,6 +2152,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)

bool StartIndexBackgroundSync(NodeContext& node)
{
if (node.indexes.empty()) return true;

// Find the oldest block among all indexes.
// This block is used to verify that we have the required blocks' data stored on disk,
// starting from that point up to the current tip.
Expand Down Expand Up @@ -2187,7 +2192,19 @@ bool StartIndexBackgroundSync(NodeContext& node)
}
}

if (node.args->IsArgSet("-indexworkers")) {
int index_workers = node.args->GetIntArg("-indexworkers", INDEX_WORKERS_COUNT);
if (index_workers < 0 || index_workers > MAX_INDEX_WORKERS_COUNT) return InitError(Untranslated(strprintf("Invalid -indexworkers arg. Must be a number in-between 1 and %d", MAX_INDEX_WORKERS_COUNT)));

node.m_index_threads = std::make_unique<ThreadPool>();
node.m_index_threads->Start(index_workers);
}

// Start threads
for (auto index : node.indexes) if (!index->StartBackgroundSync()) return false;
for (auto index : node.indexes) {
// Provide thread pool to indexes
if (node.m_index_threads && index->AllowParallelSync()) index->SetThreadPool(*node.m_index_threads);
if (!index->StartBackgroundSync()) return false;
}
return true;
}
3 changes: 3 additions & 0 deletions src/node/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#ifndef BITCOIN_NODE_CONTEXT_H
#define BITCOIN_NODE_CONTEXT_H

#include <util/threadpool.h>

#include <atomic>
#include <cstdlib>
#include <functional>
Expand Down Expand Up @@ -90,6 +92,7 @@ struct NodeContext {
//! Manages all the node warnings
std::unique_ptr<node::Warnings> warnings;
std::thread background_init_thread;
std::unique_ptr<ThreadPool> m_index_threads;

//! Declare default constructor and destructor that are not inline, so code
//! instantiating the NodeContext struct doesn't need to #include class
Expand Down
1 change: 1 addition & 0 deletions src/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ add_executable(test_bitcoin
sync_tests.cpp
system_tests.cpp
testnet4_miner_tests.cpp
threadpool_tests.cpp
timeoffsets_tests.cpp
torcontrol_tests.cpp
transaction_tests.cpp
Expand Down
45 changes: 45 additions & 0 deletions src/test/blockfilter_index_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <pow.h>
#include <test/util/blockfilter.h>
#include <test/util/setup_common.h>
#include <util/threadpool.h>
#include <validation.h>

#include <boost/test/unit_test.hpp>
Expand Down Expand Up @@ -268,6 +269,50 @@ BOOST_FIXTURE_TEST_CASE(blockfilter_index_initial_sync, BuildChainTestingSetup)
filter_index.Stop();
}

BOOST_FIXTURE_TEST_CASE(blockfilter_index_parallel_initial_sync, BuildChainTestingSetup)
{
int tip_height = 100; // pre-mined blocks
const uint16_t MINE_BLOCKS = 650;
for (int round = 0; round < 2; round++) { // two rounds to test sync from genesis and from a higher block
// Generate blocks
mineBlocks(MINE_BLOCKS);
const CBlockIndex* tip = WITH_LOCK(::cs_main, return m_node.chainman->ActiveChain().Tip());
BOOST_REQUIRE(tip->nHeight == MINE_BLOCKS + tip_height);
tip_height = tip->nHeight;

// Init index
BlockFilterIndex filter_index(interfaces::MakeChain(m_node), BlockFilterType::BASIC, 1 << 20, /*f_memory=*/false);
BOOST_REQUIRE(filter_index.Init());

ThreadPool thread_pool;
thread_pool.Start(2);
filter_index.SetThreadPool(thread_pool);
filter_index.SetBlocksPerWorker(200);

// Start sync
BOOST_CHECK(!filter_index.BlockUntilSyncedToCurrentChain());
filter_index.Sync();
const auto& summary{filter_index.GetSummary()};
BOOST_CHECK(summary.synced);
BOOST_CHECK_EQUAL(summary.best_block_height, tip_height);

// Check that filter index has all blocks that were in the chain before it started.
{
uint256 last_header;
LOCK(cs_main);
const CBlockIndex* block_index;
for (block_index = m_node.chainman->ActiveChain().Genesis();
block_index != nullptr;
block_index = m_node.chainman->ActiveChain().Next(block_index)) {
CheckFilterLookups(filter_index, block_index, last_header, m_node.chainman->m_blockman);
}
}

filter_index.Interrupt();
filter_index.Stop();
}
}

BOOST_FIXTURE_TEST_CASE(blockfilter_index_init_destroy, BasicTestingSetup)
{
BlockFilterIndex* filter_index;
Expand Down
Loading
Loading