From 02a8a512dc423b732fdbff5aa3a3873d31132e7b Mon Sep 17 00:00:00 2001 From: SChernykh Date: Thu, 14 Jul 2022 09:04:14 +0200 Subject: [PATCH] SideChain: precalculate tx pubkeys for faster sync --- src/pool_block.cpp | 4 + src/pool_block.h | 2 + src/side_chain.cpp | 176 +++++++++++++++++++++++++++++++++ src/side_chain.h | 23 ++++- tests/src/pool_block_tests.cpp | 52 +++++----- 5 files changed, 230 insertions(+), 27 deletions(-) diff --git a/src/pool_block.cpp b/src/pool_block.cpp index 9db3091..ecedfdf 100644 --- a/src/pool_block.cpp +++ b/src/pool_block.cpp @@ -53,6 +53,7 @@ PoolBlock::PoolBlock() , m_invalid(false) , m_broadcasted(false) , m_wantBroadcast(false) + , m_precalculated(false) , m_localTimestamp(seconds_since_epoch()) { uv_mutex_init_checked(&m_lock); @@ -114,6 +115,7 @@ PoolBlock& PoolBlock::operator=(const PoolBlock& b) m_invalid = b.m_invalid; m_broadcasted = b.m_broadcasted; m_wantBroadcast = b.m_wantBroadcast; + m_precalculated = b.m_precalculated; m_localTimestamp = seconds_since_epoch(); @@ -242,6 +244,8 @@ void PoolBlock::reset_offchain_data() m_broadcasted = false; m_wantBroadcast = false; + m_precalculated = false; + m_localTimestamp = seconds_since_epoch(); } diff --git a/src/pool_block.h b/src/pool_block.h index 8a0c3b2..ba50ce9 100644 --- a/src/pool_block.h +++ b/src/pool_block.h @@ -132,6 +132,8 @@ struct PoolBlock mutable bool m_broadcasted; mutable bool m_wantBroadcast; + bool m_precalculated; + uint64_t m_localTimestamp; void serialize_mainchain_data(uint32_t nonce, uint32_t extra_nonce, const hash& sidechain_hash); diff --git a/src/side_chain.cpp b/src/side_chain.cpp index 7093e1b..31b0b4c 100644 --- a/src/side_chain.cpp +++ b/src/side_chain.cpp @@ -63,6 +63,7 @@ SideChain::SideChain(p2pool* pool, NetworkType type, const char* pool_name) , m_chainWindowSize(2160) , m_unclePenalty(20) , m_curDifficulty(m_minDifficulty) + , m_precalcFinished(false) { LOGINFO(1, log::LightCyan() << "network type = " << m_networkType); @@ -154,14 +155,39 @@ SideChain::SideChain(p2pool* pool, NetworkType type, const char* pool_name) memset(buf + 8, '*', HASH_SIZE * 2 - 16); m_consensusIdDisplayStr.assign(buf); LOGINFO(1, "consensus ID = " << log::LightCyan() << m_consensusIdDisplayStr.c_str()); + + uv_cond_init_checked(&m_precalcJobsCond); + uv_mutex_init_checked(&m_precalcJobsMutex); + m_precalcJobs.reserve(16); + + uint32_t numThreads = std::thread::hardware_concurrency(); + + // Leave 1 CPU core free from worker threads + if (numThreads > 1) { + --numThreads; + } + + // Use no more than 8 threads + numThreads = std::min(numThreads, 8); + + LOGINFO(4, "running " << numThreads << " pre-calculation workers"); + + for (uint32_t i = 0; i < numThreads; ++i) { + m_precalcWorkers.emplace_back(&SideChain::precalc_worker, this); + } + + m_uniquePrecalcInputs = new unordered_set(); } SideChain::~SideChain() { + finish_precalc(); + uv_rwlock_destroy(&m_sidechainLock); uv_mutex_destroy(&m_seenWalletsLock); uv_mutex_destroy(&m_seenBlocksLock); uv_rwlock_destroy(&m_curDifficultyLock); + for (const auto& it : m_blocksById) { delete it.second; } @@ -364,6 +390,50 @@ bool SideChain::get_shares(const PoolBlock* tip, std::vector& shares return true; } +bool SideChain::get_wallets(const PoolBlock* tip, std::vector& wallets) const +{ + // Collect wallets from each block in the PPLNS window, starting from the "tip" + wallets.clear(); + wallets.reserve(m_chainWindowSize * 2); + + uint64_t block_depth = 0; + const PoolBlock* cur = tip; + + do { + wallets.push_back(&cur->m_minerWallet); + + for (const hash& uncle_id : cur->m_uncles) { + auto it = m_blocksById.find(uncle_id); + if (it == m_blocksById.end()) { + return false; + } + + // Skip uncles which are already out of PPLNS window + if (tip->m_sidechainHeight - it->second->m_sidechainHeight < m_chainWindowSize) { + wallets.push_back(&it->second->m_minerWallet); + } + } + + ++block_depth; + if ((block_depth >= m_chainWindowSize) || (cur->m_sidechainHeight == 0)) { + break; + } + + auto it = m_blocksById.find(cur->m_parent); + if (it == m_blocksById.end()) { + return false; + } + + cur = it->second; + } while (true); + + // Remove duplicates + std::sort(wallets.begin(), wallets.end(), [](const Wallet* a, const Wallet* b) { return *a < *b; }); + wallets.erase(std::unique(wallets.begin(), wallets.end(), [](const Wallet* a, const Wallet* b) { return *a == *b; }), wallets.end()); + + return true; +} + bool SideChain::block_seen(const PoolBlock& block) { // Check if it's some old block @@ -538,6 +608,9 @@ void SideChain::add_block(const PoolBlock& block) m_blocksByHeight[new_block->m_sidechainHeight].push_back(new_block); + // Pre-calculate eph_public_keys during initial sync + launch_precalc(new_block); + update_depths(new_block); if (new_block->m_verified) { @@ -1738,6 +1811,9 @@ void SideChain::prune_old_blocks() if (p2pServer()) { p2pServer()->clear_cached_blocks(); } + + // Pre-calc workers are not needed anymore + finish_precalc(); } } @@ -1855,4 +1931,104 @@ bool SideChain::check_config() return true; } +void SideChain::launch_precalc(const PoolBlock* block) +{ + if (m_precalcFinished) { + return; + } + + auto it = m_blocksByHeight.find(block->m_sidechainHeight + m_chainWindowSize - 1); + if ((it != m_blocksByHeight.end()) && !it->second.empty()) { + for (PoolBlock* b : it->second) { + std::vector wallets; + if (!b->m_precalculated && get_wallets(b, wallets)) { + b->m_precalculated = true; + PrecalcJob* job = new PrecalcJob{ b, std::move(wallets) }; + { + MutexLock lock2(m_precalcJobsMutex); + m_precalcJobs.push_back(job); + } + uv_cond_signal(&m_precalcJobsCond); + } + } + } +} + +void SideChain::precalc_worker() +{ + do { + PrecalcJob* job; + { + MutexLock lock(m_precalcJobsMutex); + + if (m_precalcFinished) { + return; + } + + while (m_precalcJobs.empty()) { + uv_cond_wait(&m_precalcJobsCond, &m_precalcJobsMutex); + + // cppcheck-suppress knownConditionTrueFalse + if (m_precalcFinished) { + return; + } + } + + job = m_precalcJobs.back(); + m_precalcJobs.pop_back(); + + // Filter out duplicate inputs for get_eph_public_key() + uint8_t t[HASH_SIZE * 2 + sizeof(size_t)]; + memcpy(t, job->b->m_txkeySec.h, HASH_SIZE); + + for (size_t i = 0, n = job->wallets.size(); i < n; ++i) { + memcpy(t + HASH_SIZE, job->wallets[i]->view_public_key().h, HASH_SIZE); + memcpy(t + HASH_SIZE * 2, &i, sizeof(i)); + if (!m_uniquePrecalcInputs->insert(robin_hood::hash_bytes(t, array_size(t))).second) { + job->wallets[i] = nullptr; + } + } + } + + for (size_t i = 0, n = job->wallets.size(); i < n; ++i) { + if (job->wallets[i]) { + hash eph_public_key; + uint8_t view_tag; + job->wallets[i]->get_eph_public_key(job->b->m_txkeySec, i, eph_public_key, view_tag); + } + } + delete job; + } while (true); +} + +void SideChain::finish_precalc() +{ + if (m_precalcFinished.exchange(true)) { + return; + } + + { + MutexLock lock(m_precalcJobsMutex); + for (PrecalcJob* job : m_precalcJobs) { + delete job; + } + m_precalcJobs.clear(); + m_precalcJobs.shrink_to_fit(); + uv_cond_broadcast(&m_precalcJobsCond); + } + + for (std::thread& t : m_precalcWorkers) { + t.join(); + } + m_precalcWorkers.clear(); + m_precalcWorkers.shrink_to_fit(); + + delete m_uniquePrecalcInputs; + + uv_mutex_destroy(&m_precalcJobsMutex); + uv_cond_destroy(&m_precalcJobsCond); + + LOGINFO(4, "pre-calculation workers stopped"); +} + } // namespace p2pool diff --git a/src/side_chain.h b/src/side_chain.h index 57fea94..c8f99f1 100644 --- a/src/side_chain.h +++ b/src/side_chain.h @@ -19,6 +19,7 @@ #include "uv_util.h" #include +#include namespace p2pool { @@ -37,7 +38,7 @@ struct MinerShare const Wallet* m_wallet; }; -class SideChain +class SideChain : public nocopy_nomove { public: SideChain(p2pool* pool, NetworkType type, const char* pool_name = nullptr); @@ -86,6 +87,7 @@ private: private: bool get_shares(const PoolBlock* tip, std::vector& shares) const; bool get_difficulty(const PoolBlock* tip, std::vector& difficultyData, difficulty_type& curDifficulty) const; + bool get_wallets(const PoolBlock* tip, std::vector& wallets) const; void verify_loop(PoolBlock* block); void verify(PoolBlock* block); void update_chain_tip(const PoolBlock* block); @@ -128,6 +130,25 @@ private: ChainMain m_watchBlock; hash m_watchBlockSidechainId; + + struct PrecalcJob + { + const PoolBlock* b; + std::vector wallets; + }; + + uv_cond_t m_precalcJobsCond; + uv_mutex_t m_precalcJobsMutex; + + std::vector m_precalcJobs; + std::vector m_precalcWorkers; + unordered_set* m_uniquePrecalcInputs; + + std::atomic m_precalcFinished; + + void launch_precalc(const PoolBlock* block); + void precalc_worker(); + void finish_precalc(); }; } // namespace p2pool diff --git a/tests/src/pool_block_tests.cpp b/tests/src/pool_block_tests.cpp index f51f4c8..196761d 100644 --- a/tests/src/pool_block_tests.cpp +++ b/tests/src/pool_block_tests.cpp @@ -103,39 +103,39 @@ TEST(pool_block, deserialize) TEST(pool_block, verify) { init_crypto_cache(); + { + PoolBlock b; + SideChain sidechain(nullptr, NetworkType::Mainnet); - PoolBlock b; - SideChain sidechain(nullptr, NetworkType::Mainnet); + std::ifstream f("sidechain_dump.dat", std::ios::binary | std::ios::ate); + ASSERT_EQ(f.good() && f.is_open(), true); - std::ifstream f("sidechain_dump.dat", std::ios::binary | std::ios::ate); - ASSERT_EQ(f.good() && f.is_open(), true); + std::vector buf(f.tellg()); + f.seekg(0); + f.read(reinterpret_cast(buf.data()), buf.size()); + ASSERT_EQ(f.good(), true); - std::vector buf(f.tellg()); - f.seekg(0); - f.read(reinterpret_cast(buf.data()), buf.size()); - ASSERT_EQ(f.good(), true); + for (const uint8_t *p = buf.data(), *e = buf.data() + buf.size(); p < e;) { + ASSERT_TRUE(p + sizeof(uint32_t) <= e); + const uint32_t n = *reinterpret_cast(p); + p += sizeof(uint32_t); - for (const uint8_t *p = buf.data(), *e = buf.data() + buf.size(); p < e;) { - ASSERT_TRUE(p + sizeof(uint32_t) <= e); - const uint32_t n = *reinterpret_cast(p); - p += sizeof(uint32_t); + ASSERT_TRUE(p + n <= e); + ASSERT_EQ(b.deserialize(p, n, sidechain), 0); + p += n; - ASSERT_TRUE(p + n <= e); - ASSERT_EQ(b.deserialize(p, n, sidechain), 0); - p += n; + sidechain.add_block(b); + ASSERT_TRUE(sidechain.find_block(b.m_sidechainId) != nullptr); + } - sidechain.add_block(b); - ASSERT_TRUE(sidechain.find_block(b.m_sidechainId) != nullptr); + const PoolBlock* tip = sidechain.chainTip(); + ASSERT_TRUE(tip != nullptr); + ASSERT_TRUE(tip->m_verified); + ASSERT_FALSE(tip->m_invalid); + + ASSERT_EQ(tip->m_txinGenHeight, 2483901); + ASSERT_EQ(tip->m_sidechainHeight, 522805); } - - const PoolBlock* tip = sidechain.chainTip(); - ASSERT_TRUE(tip != nullptr); - ASSERT_TRUE(tip->m_verified); - ASSERT_FALSE(tip->m_invalid); - - ASSERT_EQ(tip->m_txinGenHeight, 2483901); - ASSERT_EQ(tip->m_sidechainHeight, 522805); - destroy_crypto_cache(); }