diff --git a/src/p2pool.cpp b/src/p2pool.cpp index d89bc44..91b0f6a 100644 --- a/src/p2pool.cpp +++ b/src/p2pool.cpp @@ -158,6 +158,13 @@ p2pool::p2pool(int argc, char* argv[]) } m_reconnectToHostAsync.data = this; + err = uv_async_init(uv_default_loop_checked(), &m_getMissingHeightsAsync, on_get_missing_heights); + if (err) { + LOGERR(1, "uv_async_init failed, error " << uv_err_name(err)); + throw std::exception(); + } + m_getMissingHeightsAsync.data = this; + uv_rwlock_init_checked(&m_mainchainLock); uv_rwlock_init_checked(&m_minerDataLock); #ifndef P2POOL_UNIT_TESTS @@ -179,6 +186,8 @@ p2pool::p2pool(int argc, char* argv[]) uv_mutex_init_checked(&m_submitBlockDataLock); uv_mutex_init_checked(&m_submitAuxBlockDataLock); + uv_mutex_init_checked(&m_missingHeightsLock); + m_api = p->m_apiPath.empty() ? nullptr : new p2pool_api(p->m_apiPath, p->m_localStats); if (p->m_localStats && !m_api) { @@ -264,6 +273,8 @@ p2pool::~p2pool() uv_mutex_destroy(&m_submitBlockDataLock); uv_mutex_destroy(&m_submitAuxBlockDataLock); + uv_mutex_destroy(&m_missingHeightsLock); + delete m_api; delete m_sideChain; delete m_hasher; @@ -497,31 +508,64 @@ void p2pool::handle_miner_data(MinerData& data) missing_heights.erase(std::unique(missing_heights.begin(), missing_heights.end()), missing_heights.end()); } - for (uint64_t h : missing_heights) { - LOGWARN(3, "Mainchain data for height " << h << " is missing, requesting it from monerod again"); + MutexLock lock(m_missingHeightsLock); + m_missingHeights = std::move(missing_heights); - char buf[log::Stream::BUF_SIZE + 1] = {}; - log::Stream s(buf); - s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << h << "}}\0"; - - JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy, host.m_rpcSSL, host.m_rpcSSL_Fingerprint, - [this, h](const char* data, size_t size, double) - { - ChainMain block; - if (!parse_block_header(data, size, block)) { - LOGERR(1, "couldn't download block header for height " << h); - } - }, - [h](const char* data, size_t size, double) - { - if (size > 0) { - LOGERR(1, "couldn't download block header for height " << h << ", error " << log::const_buf(data, size)); - } - }); + if (!m_missingHeights.empty()) { + const int err = uv_async_send(&m_getMissingHeightsAsync); + if (err) { + LOGERR(1, "uv_async_send failed, error " << uv_err_name(err)); + } } } } +void p2pool::get_missing_heights() +{ + uint64_t h; + { + MutexLock lock(m_missingHeightsLock); + + if (m_missingHeights.empty()) { + return; + } + + h = m_missingHeights.back(); + m_missingHeights.pop_back(); + } + + LOGWARN(3, "Mainchain data for height " << h << " is missing, requesting it from monerod again"); + + char buf[log::Stream::BUF_SIZE + 1] = {}; + log::Stream s(buf); + s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << h << "}}\0"; + + const Params::Host& host = current_host(); + + JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy, host.m_rpcSSL, host.m_rpcSSL_Fingerprint, + [this, h](const char* data, size_t size, double) + { + ChainMain block; + if (!parse_block_header(data, size, block)) { + LOGERR(1, "couldn't download block header for height " << h); + + MutexLock lock(m_missingHeightsLock); + m_missingHeights.push_back(h); + } + get_missing_heights(); + }, + [this, h](const char* data, size_t size, double) + { + if (size > 0) { + LOGERR(1, "couldn't download block header for height " << h << ", error " << log::const_buf(data, size)); + + MutexLock lock(m_missingHeightsLock); + m_missingHeights.push_back(h); + } + get_missing_heights(); + }); +} + const char* BLOCK_FOUND = "\n\ -----------------------------------------------------------------------------------------------\n\ | ###### # ####### ##### # # ####### ####### # # # # ###### |\n\ @@ -992,6 +1036,7 @@ void p2pool::on_stop(uv_async_t* async) uv_close(reinterpret_cast(&pool->m_blockTemplateAsync), nullptr); uv_close(reinterpret_cast(&pool->m_stopAsync), nullptr); uv_close(reinterpret_cast(&pool->m_reconnectToHostAsync), nullptr); + uv_close(reinterpret_cast(&pool->m_getMissingHeightsAsync), nullptr); init_signals(pool, false); diff --git a/src/p2pool.h b/src/p2pool.h index 048d8b4..5a5a763 100644 --- a/src/p2pool.h +++ b/src/p2pool.h @@ -151,6 +151,7 @@ private: static void on_update_block_template(uv_async_t* async) { reinterpret_cast(async->data)->update_block_template(); } static void on_stop(uv_async_t*); static void on_reconnect_to_host(uv_async_t* async) { reinterpret_cast(async->data)->reconnect_to_host(); } + static void on_get_missing_heights(uv_async_t* async) { reinterpret_cast(async->data)->get_missing_heights(); } void submit_block() const; void submit_aux_block() const; @@ -261,6 +262,13 @@ private: uint64_t m_startTime; uv_async_t m_reconnectToHostAsync; + uv_async_t m_getMissingHeightsAsync; + + mutable uv_mutex_t m_missingHeightsLock; + mutable std::vector m_missingHeights; + + void get_missing_heights(); + #ifndef P2POOL_UNIT_TESTS mutable uv_rwlock_t m_ZMQReaderLock; ZMQReader* m_ZMQReader = nullptr;