From b7c4613d400a812916de92663f8031af5bca3028 Mon Sep 17 00:00:00 2001 From: sech1 Date: Sat, 6 Sep 2025 14:25:57 +0200 Subject: [PATCH] Fixed handling of multiple blocks from ZMQ --- src/p2p_server.cpp | 34 ++++++++++----------- src/p2p_server.h | 8 ++--- src/p2pool.cpp | 4 +-- src/p2pool.h | 2 +- src/util.h | 2 +- src/zmq_reader.cpp | 76 +++++++++++++++++++++++++--------------------- 6 files changed, 67 insertions(+), 59 deletions(-) diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 3aaf9ac..a992dd6 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -134,14 +134,14 @@ P2PServer::P2PServer(p2pool* pool) #endif uv_mutex_init_checked(&m_MoneroBlockBroadcastsLock); - uv_mutex_init_checked(&m_BroadcastMoneroBlockLock); + uv_mutex_init_checked(&m_MoneroBlocksToBroadcastLock); - err = uv_async_init(&m_loop, &m_BroadcastMoneroBlockAsync, on_monero_block_broadcast); + err = uv_async_init(&m_loop, &m_MoneroBlocksToBroadcastAsync, on_monero_block_broadcast); if (err) { LOGERR(1, "uv_async_init failed, error " << uv_err_name(err)); PANIC_STOP(); } - m_BroadcastMoneroBlockAsync.data = this; + m_MoneroBlocksToBroadcastAsync.data = this; err = uv_timer_init(&m_loop, &m_timer); if (err) { @@ -185,7 +185,7 @@ P2PServer::~P2PServer() #endif uv_mutex_destroy(&m_MoneroBlockBroadcastsLock); - uv_mutex_destroy(&m_BroadcastMoneroBlockLock); + uv_mutex_destroy(&m_MoneroBlocksToBroadcastLock); delete m_block; delete m_cache; @@ -1447,8 +1447,8 @@ void P2PServer::on_shutdown() #endif { - MutexLock lock(m_BroadcastMoneroBlockLock); - uv_close(reinterpret_cast(&m_BroadcastMoneroBlockAsync), nullptr); + MutexLock lock(m_MoneroBlocksToBroadcastLock); + uv_close(reinterpret_cast(&m_MoneroBlocksToBroadcastAsync), nullptr); } } @@ -1635,13 +1635,13 @@ void P2PServer::clean_monero_block_broadcasts() } } -void P2PServer::broadcast_monero_block_async(std::vector&& blob) +void P2PServer::broadcast_monero_block_async(std::vector>&& blobs) { - MutexLock lock(m_BroadcastMoneroBlockLock); + MutexLock lock(m_MoneroBlocksToBroadcastLock); - m_MoneroBlockToBroadcast = std::move(blob); + m_MoneroBlocksToBroadcast = std::move(blobs); - const int err = uv_async_send(&m_BroadcastMoneroBlockAsync); + const int err = uv_async_send(&m_MoneroBlocksToBroadcastAsync); if (err) { LOGERR(1, "uv_async_send failed, error " << uv_err_name(err)); } @@ -1649,17 +1649,17 @@ void P2PServer::broadcast_monero_block_async(std::vector&& blob) void P2PServer::broadcast_monero_block_handler() { - std::vector blob; + std::vector> blobs; { - MutexLock lock(m_BroadcastMoneroBlockLock); - blob = std::move(m_MoneroBlockToBroadcast); + MutexLock lock(m_MoneroBlocksToBroadcastLock); + blobs = std::move(m_MoneroBlocksToBroadcast); } - if (blob.empty()) { - return; + for (const std::vector& blob : blobs) { + if (!blob.empty()) { + broadcast_monero_block(blob.data(), static_cast(blob.size()), nullptr, false); + } } - - broadcast_monero_block(blob.data(), static_cast(blob.size()), nullptr, false); } void P2PServer::broadcast_monero_block(const uint8_t* data, uint32_t data_size, const P2PClient* source, bool duplicate_check_done) diff --git a/src/p2p_server.h b/src/p2p_server.h index b8aeaea..500723c 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -216,7 +216,7 @@ public: void broadcast_aux_job_donation(const uint8_t* data, uint32_t data_size, uint64_t timestamp, const P2PClient* source, bool duplicate_check_done); - void broadcast_monero_block_async(std::vector&& blob); + void broadcast_monero_block_async(std::vector>&& blobs); void broadcast_monero_block_handler(); void broadcast_monero_block(const uint8_t* data, uint32_t data_size, const P2PClient* source, bool duplicate_check_done); @@ -347,10 +347,10 @@ private: void broadcast_aux_job_donation_handler(); #endif - uv_mutex_t m_BroadcastMoneroBlockLock; - std::vector m_MoneroBlockToBroadcast; + uv_mutex_t m_MoneroBlocksToBroadcastLock; + std::vector> m_MoneroBlocksToBroadcast; - uv_async_t m_BroadcastMoneroBlockAsync; + uv_async_t m_MoneroBlocksToBroadcastAsync; static void on_monero_block_broadcast(uv_async_t* handle) { reinterpret_cast(handle->data)->broadcast_monero_block_handler(); } diff --git a/src/p2pool.cpp b/src/p2pool.cpp index 91b0f6a..5e64639 100644 --- a/src/p2pool.cpp +++ b/src/p2pool.cpp @@ -643,9 +643,9 @@ void p2pool::handle_chain_main(ChainMain& data, const char* extra) m_zmqLastActive = seconds_since_epoch(); } -void p2pool::handle_monero_block_broadcast(std::vector&& blob) +void p2pool::handle_monero_block_broadcast(std::vector>&& blobs) { - m_p2pServer->broadcast_monero_block_async(std::move(blob)); + m_p2pServer->broadcast_monero_block_async(std::move(blobs)); } #ifdef WITH_MERGE_MINING_DONATION diff --git a/src/p2pool.h b/src/p2pool.h index 5a5a763..c36252b 100644 --- a/src/p2pool.h +++ b/src/p2pool.h @@ -86,7 +86,7 @@ public: virtual void handle_tx(TxMempoolData& tx) override; virtual void handle_miner_data(MinerData& data) override; virtual void handle_chain_main(ChainMain& data, const char* extra) override; - virtual void handle_monero_block_broadcast(std::vector&& blob) override; + virtual void handle_monero_block_broadcast(std::vector>&& blobs) override; #ifdef WITH_MERGE_MINING_DONATION void set_aux_job_donation(const std::vector& chain_params); diff --git a/src/util.h b/src/util.h index 8643850..060c795 100644 --- a/src/util.h +++ b/src/util.h @@ -92,7 +92,7 @@ struct MinerCallbackHandler virtual void handle_tx(TxMempoolData& tx) = 0; virtual void handle_miner_data(MinerData& data) = 0; virtual void handle_chain_main(ChainMain& data, const char* extra) = 0; - virtual void handle_monero_block_broadcast(std::vector&& blob) = 0; + virtual void handle_monero_block_broadcast(std::vector>&& blobs) = 0; }; template diff --git a/src/zmq_reader.cpp b/src/zmq_reader.cpp index 31bc6ef..6ad8721 100644 --- a/src/zmq_reader.cpp +++ b/src/zmq_reader.cpp @@ -253,9 +253,11 @@ bool ZMQReader::connect(const std::string& address, bool keep_monitor) return true; } -static void broadcast_monero_block(rapidjson::Value* value, MinerCallbackHandler* handler) +static std::vector construct_monero_block_blob(rapidjson::Value* value) { -#define X(type, name) type name; if (!parseValue(*value, #name, name)) return; + std::vector empty_blob; + +#define X(type, name) type name; if (!parseValue(*value, #name, name)) return empty_blob; X(uint8_t, major_version); X(uint8_t, minor_version); @@ -268,40 +270,40 @@ static void broadcast_monero_block(rapidjson::Value* value, MinerCallbackHandler auto miner_tx = value->FindMember("miner_tx"); if ((miner_tx == value->MemberEnd()) || !miner_tx->value.IsObject()) { - LOGWARN(3, "broadcast_monero_block: miner_tx not found or is not an object"); - return; + LOGWARN(3, "construct_monero_block_blob: miner_tx not found or is not an object"); + return empty_blob; } uint8_t version; if (!parseValue(miner_tx->value, "version", version)) { - LOGWARN(3, "broadcast_monero_block: version not found"); - return; + LOGWARN(3, "construct_monero_block_blob: version not found"); + return empty_blob; } uint64_t unlock_height; if (!parseValue(miner_tx->value, "unlock_time", unlock_height)) { - LOGWARN(3, "broadcast_monero_block: unlock_time not found"); - return; + LOGWARN(3, "construct_monero_block_blob: unlock_time not found"); + return empty_blob; } std::string extra; if (!parseValue(miner_tx->value, "extra", extra)) { - LOGWARN(3, "broadcast_monero_block: extra not found"); - return; + LOGWARN(3, "construct_monero_block_blob: extra not found"); + return empty_blob; } auto outputs = miner_tx->value.FindMember("outputs"); if ((outputs == miner_tx->value.MemberEnd()) || !outputs->value.IsArray()) { - LOGWARN(3, "broadcast_monero_block: outputs not found or is not an array"); - return; + LOGWARN(3, "construct_monero_block_blob: outputs not found or is not an array"); + return empty_blob; } auto tx_hashes = value->FindMember("tx_hashes"); if ((tx_hashes == value->MemberEnd()) || !tx_hashes->value.IsArray()) { - LOGWARN(3, "broadcast_monero_block: tx_hashes not found or is not an array"); - return; + LOGWARN(3, "construct_monero_block_blob: tx_hashes not found or is not an array"); + return empty_blob; } std::vector blob; @@ -314,8 +316,8 @@ static void broadcast_monero_block(rapidjson::Value* value, MinerCallbackHandler hash h; if (!from_hex(prev_id.c_str(), prev_id.length(), h)) { - LOGWARN(3, "broadcast_monero_block: invalid prev_id " << prev_id); - return; + LOGWARN(3, "construct_monero_block_blob: invalid prev_id " << prev_id); + return empty_blob; } blob.insert(blob.end(), h.h, h.h + HASH_SIZE); @@ -341,42 +343,42 @@ static void broadcast_monero_block(rapidjson::Value* value, MinerCallbackHandler for (rapidjson::Value* i = arr.begin(); i != arr.end(); ++i) { auto amount = i->FindMember("amount"); if ((amount == i->MemberEnd()) || !amount->value.IsUint64()) { - LOGWARN(3, "broadcast_monero_block: amount not found or is not UInt64"); - return; + LOGWARN(3, "construct_monero_block_blob: amount not found or is not UInt64"); + return empty_blob; } auto to_tagged_key = i->FindMember("to_tagged_key"); if ((to_tagged_key == i->MemberEnd()) || !to_tagged_key->value.IsObject()) { - LOGWARN(3, "broadcast_monero_block: to_tagged_key not found or is not an object"); - return; + LOGWARN(3, "construct_monero_block_blob: to_tagged_key not found or is not an object"); + return empty_blob; } auto key = to_tagged_key->value.FindMember("key"); if ((key == to_tagged_key->value.MemberEnd()) || !key->value.IsString()) { - LOGWARN(3, "broadcast_monero_block: key not found or is not a string"); - return; + LOGWARN(3, "construct_monero_block_blob: key not found or is not a string"); + return empty_blob; } auto view_tag = to_tagged_key->value.FindMember("view_tag"); if ((view_tag == to_tagged_key->value.MemberEnd()) || !view_tag->value.IsString()) { - LOGWARN(3, "broadcast_monero_block: view_tag not found or is not a string"); - return; + LOGWARN(3, "construct_monero_block_blob: view_tag not found or is not a string"); + return empty_blob; } writeVarint(amount->value.GetUint64(), blob); blob.push_back(TXOUT_TO_TAGGED_KEY); if (!from_hex(key->value.GetString(), key->value.GetStringLength(), h)) { - LOGWARN(3, "broadcast_monero_block: invalid key " << key->value.GetString()); - return; + LOGWARN(3, "construct_monero_block_blob: invalid key " << key->value.GetString()); + return empty_blob; } blob.insert(blob.end(), h.h, h.h + HASH_SIZE); std::vector t; if (!from_hex(view_tag->value.GetString(), view_tag->value.GetStringLength(), t) || (t.size() != 1)) { - LOGWARN(3, "broadcast_monero_block: invalid view_tag " << view_tag->value.GetString()); - return; + LOGWARN(3, "construct_monero_block_blob: invalid view_tag " << view_tag->value.GetString()); + return empty_blob; } blob.push_back(t[0]); @@ -384,8 +386,8 @@ static void broadcast_monero_block(rapidjson::Value* value, MinerCallbackHandler std::vector t; if (!from_hex(extra.c_str(), extra.length(), t) || t.empty()) { - LOGWARN(3, "broadcast_monero_block: invalid extra " << extra); - return; + LOGWARN(3, "construct_monero_block_blob: invalid extra " << extra); + return empty_blob; } writeVarint(t.size(), blob); @@ -401,8 +403,8 @@ static void broadcast_monero_block(rapidjson::Value* value, MinerCallbackHandler for (rapidjson::Value* i = arr2.begin(); i != arr2.end(); ++i) { if (!i->IsString() || !from_hex(i->GetString(), i->GetStringLength(), h)) { - LOGWARN(3, "broadcast_monero_block: invalid tx_hash " << i->GetString()); - return; + LOGWARN(3, "construct_monero_block_blob: invalid tx_hash " << i->GetString()); + return empty_blob; } blob.insert(blob.end(), h.h, h.h + HASH_SIZE); } @@ -410,7 +412,7 @@ static void broadcast_monero_block(rapidjson::Value* value, MinerCallbackHandler const uint8_t* p = reinterpret_cast(&data); blob.insert(blob.begin(), p, p + sizeof(data)); - handler->handle_monero_block_broadcast(std::move(blob)); + return blob; } void ZMQReader::parse(char* data, size_t size) @@ -509,8 +511,12 @@ void ZMQReader::parse(char* data, size_t size) } auto arr = doc.GetArray(); + + std::vector> blobs; + blobs.reserve(arr.Size()); + for (Value* i = arr.begin(); i != arr.end(); ++i) { - broadcast_monero_block(i, m_handler); + blobs.emplace_back(construct_monero_block_blob(i)); if (!PARSE(*i, m_chainmainData, timestamp)) { LOGWARN(1, "json-full-chain_main timestamp failed to parse, skipping it"); @@ -573,6 +579,8 @@ void ZMQReader::parse(char* data, size_t size) m_handler->handle_chain_main(m_chainmainData, extra_it->value.GetString()); } + + m_handler->handle_monero_block_broadcast(std::move(blobs)); } }