Fix: get missing blocks in a thread-safe way

This commit is contained in:
sech1
2025-09-05 15:30:52 +02:00
parent 830c78a032
commit 971f804a6e
2 changed files with 73 additions and 20 deletions

View File

@@ -158,6 +158,13 @@ p2pool::p2pool(int argc, char* argv[])
}
m_reconnectToHostAsync.data = this;
err = uv_async_init(uv_default_loop_checked(), &m_getMissingHeightsAsync, on_get_missing_heights);
if (err) {
LOGERR(1, "uv_async_init failed, error " << uv_err_name(err));
throw std::exception();
}
m_getMissingHeightsAsync.data = this;
uv_rwlock_init_checked(&m_mainchainLock);
uv_rwlock_init_checked(&m_minerDataLock);
#ifndef P2POOL_UNIT_TESTS
@@ -179,6 +186,8 @@ p2pool::p2pool(int argc, char* argv[])
uv_mutex_init_checked(&m_submitBlockDataLock);
uv_mutex_init_checked(&m_submitAuxBlockDataLock);
uv_mutex_init_checked(&m_missingHeightsLock);
m_api = p->m_apiPath.empty() ? nullptr : new p2pool_api(p->m_apiPath, p->m_localStats);
if (p->m_localStats && !m_api) {
@@ -264,6 +273,8 @@ p2pool::~p2pool()
uv_mutex_destroy(&m_submitBlockDataLock);
uv_mutex_destroy(&m_submitAuxBlockDataLock);
uv_mutex_destroy(&m_missingHeightsLock);
delete m_api;
delete m_sideChain;
delete m_hasher;
@@ -497,31 +508,64 @@ void p2pool::handle_miner_data(MinerData& data)
missing_heights.erase(std::unique(missing_heights.begin(), missing_heights.end()), missing_heights.end());
}
for (uint64_t h : missing_heights) {
LOGWARN(3, "Mainchain data for height " << h << " is missing, requesting it from monerod again");
MutexLock lock(m_missingHeightsLock);
m_missingHeights = std::move(missing_heights);
char buf[log::Stream::BUF_SIZE + 1] = {};
log::Stream s(buf);
s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << h << "}}\0";
JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy, host.m_rpcSSL, host.m_rpcSSL_Fingerprint,
[this, h](const char* data, size_t size, double)
{
ChainMain block;
if (!parse_block_header(data, size, block)) {
LOGERR(1, "couldn't download block header for height " << h);
}
},
[h](const char* data, size_t size, double)
{
if (size > 0) {
LOGERR(1, "couldn't download block header for height " << h << ", error " << log::const_buf(data, size));
}
});
if (!m_missingHeights.empty()) {
const int err = uv_async_send(&m_getMissingHeightsAsync);
if (err) {
LOGERR(1, "uv_async_send failed, error " << uv_err_name(err));
}
}
}
}
void p2pool::get_missing_heights()
{
uint64_t h;
{
MutexLock lock(m_missingHeightsLock);
if (m_missingHeights.empty()) {
return;
}
h = m_missingHeights.back();
m_missingHeights.pop_back();
}
LOGWARN(3, "Mainchain data for height " << h << " is missing, requesting it from monerod again");
char buf[log::Stream::BUF_SIZE + 1] = {};
log::Stream s(buf);
s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << h << "}}\0";
const Params::Host& host = current_host();
JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy, host.m_rpcSSL, host.m_rpcSSL_Fingerprint,
[this, h](const char* data, size_t size, double)
{
ChainMain block;
if (!parse_block_header(data, size, block)) {
LOGERR(1, "couldn't download block header for height " << h);
MutexLock lock(m_missingHeightsLock);
m_missingHeights.push_back(h);
}
get_missing_heights();
},
[this, h](const char* data, size_t size, double)
{
if (size > 0) {
LOGERR(1, "couldn't download block header for height " << h << ", error " << log::const_buf(data, size));
MutexLock lock(m_missingHeightsLock);
m_missingHeights.push_back(h);
}
get_missing_heights();
});
}
const char* BLOCK_FOUND = "\n\
-----------------------------------------------------------------------------------------------\n\
| ###### # ####### ##### # # ####### ####### # # # # ###### |\n\
@@ -992,6 +1036,7 @@ void p2pool::on_stop(uv_async_t* async)
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_blockTemplateAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_stopAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_reconnectToHostAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_getMissingHeightsAsync), nullptr);
init_signals(pool, false);

View File

@@ -151,6 +151,7 @@ private:
static void on_update_block_template(uv_async_t* async) { reinterpret_cast<p2pool*>(async->data)->update_block_template(); }
static void on_stop(uv_async_t*);
static void on_reconnect_to_host(uv_async_t* async) { reinterpret_cast<p2pool*>(async->data)->reconnect_to_host(); }
static void on_get_missing_heights(uv_async_t* async) { reinterpret_cast<p2pool*>(async->data)->get_missing_heights(); }
void submit_block() const;
void submit_aux_block() const;
@@ -261,6 +262,13 @@ private:
uint64_t m_startTime;
uv_async_t m_reconnectToHostAsync;
uv_async_t m_getMissingHeightsAsync;
mutable uv_mutex_t m_missingHeightsLock;
mutable std::vector<uint64_t> m_missingHeights;
void get_missing_heights();
#ifndef P2POOL_UNIT_TESTS
mutable uv_rwlock_t m_ZMQReaderLock;
ZMQReader* m_ZMQReader = nullptr;