Fixed handling of multiple blocks from ZMQ

This commit is contained in:
sech1
2025-09-06 14:25:57 +02:00
parent 971f804a6e
commit b7c4613d40
6 changed files with 67 additions and 59 deletions

View File

@@ -134,14 +134,14 @@ P2PServer::P2PServer(p2pool* pool)
#endif
uv_mutex_init_checked(&m_MoneroBlockBroadcastsLock);
uv_mutex_init_checked(&m_BroadcastMoneroBlockLock);
uv_mutex_init_checked(&m_MoneroBlocksToBroadcastLock);
err = uv_async_init(&m_loop, &m_BroadcastMoneroBlockAsync, on_monero_block_broadcast);
err = uv_async_init(&m_loop, &m_MoneroBlocksToBroadcastAsync, on_monero_block_broadcast);
if (err) {
LOGERR(1, "uv_async_init failed, error " << uv_err_name(err));
PANIC_STOP();
}
m_BroadcastMoneroBlockAsync.data = this;
m_MoneroBlocksToBroadcastAsync.data = this;
err = uv_timer_init(&m_loop, &m_timer);
if (err) {
@@ -185,7 +185,7 @@ P2PServer::~P2PServer()
#endif
uv_mutex_destroy(&m_MoneroBlockBroadcastsLock);
uv_mutex_destroy(&m_BroadcastMoneroBlockLock);
uv_mutex_destroy(&m_MoneroBlocksToBroadcastLock);
delete m_block;
delete m_cache;
@@ -1447,8 +1447,8 @@ void P2PServer::on_shutdown()
#endif
{
MutexLock lock(m_BroadcastMoneroBlockLock);
uv_close(reinterpret_cast<uv_handle_t*>(&m_BroadcastMoneroBlockAsync), nullptr);
MutexLock lock(m_MoneroBlocksToBroadcastLock);
uv_close(reinterpret_cast<uv_handle_t*>(&m_MoneroBlocksToBroadcastAsync), nullptr);
}
}
@@ -1635,13 +1635,13 @@ void P2PServer::clean_monero_block_broadcasts()
}
}
void P2PServer::broadcast_monero_block_async(std::vector<uint8_t>&& blob)
void P2PServer::broadcast_monero_block_async(std::vector<std::vector<uint8_t>>&& blobs)
{
MutexLock lock(m_BroadcastMoneroBlockLock);
MutexLock lock(m_MoneroBlocksToBroadcastLock);
m_MoneroBlockToBroadcast = std::move(blob);
m_MoneroBlocksToBroadcast = std::move(blobs);
const int err = uv_async_send(&m_BroadcastMoneroBlockAsync);
const int err = uv_async_send(&m_MoneroBlocksToBroadcastAsync);
if (err) {
LOGERR(1, "uv_async_send failed, error " << uv_err_name(err));
}
@@ -1649,17 +1649,17 @@ void P2PServer::broadcast_monero_block_async(std::vector<uint8_t>&& blob)
void P2PServer::broadcast_monero_block_handler()
{
std::vector<uint8_t> blob;
std::vector<std::vector<uint8_t>> blobs;
{
MutexLock lock(m_BroadcastMoneroBlockLock);
blob = std::move(m_MoneroBlockToBroadcast);
MutexLock lock(m_MoneroBlocksToBroadcastLock);
blobs = std::move(m_MoneroBlocksToBroadcast);
}
if (blob.empty()) {
return;
for (const std::vector<uint8_t>& blob : blobs) {
if (!blob.empty()) {
broadcast_monero_block(blob.data(), static_cast<uint32_t>(blob.size()), nullptr, false);
}
}
broadcast_monero_block(blob.data(), static_cast<uint32_t>(blob.size()), nullptr, false);
}
void P2PServer::broadcast_monero_block(const uint8_t* data, uint32_t data_size, const P2PClient* source, bool duplicate_check_done)

View File

@@ -216,7 +216,7 @@ public:
void broadcast_aux_job_donation(const uint8_t* data, uint32_t data_size, uint64_t timestamp, const P2PClient* source, bool duplicate_check_done);
void broadcast_monero_block_async(std::vector<uint8_t>&& blob);
void broadcast_monero_block_async(std::vector<std::vector<uint8_t>>&& blobs);
void broadcast_monero_block_handler();
void broadcast_monero_block(const uint8_t* data, uint32_t data_size, const P2PClient* source, bool duplicate_check_done);
@@ -347,10 +347,10 @@ private:
void broadcast_aux_job_donation_handler();
#endif
uv_mutex_t m_BroadcastMoneroBlockLock;
std::vector<uint8_t> m_MoneroBlockToBroadcast;
uv_mutex_t m_MoneroBlocksToBroadcastLock;
std::vector<std::vector<uint8_t>> m_MoneroBlocksToBroadcast;
uv_async_t m_BroadcastMoneroBlockAsync;
uv_async_t m_MoneroBlocksToBroadcastAsync;
static void on_monero_block_broadcast(uv_async_t* handle) { reinterpret_cast<P2PServer*>(handle->data)->broadcast_monero_block_handler(); }

View File

@@ -643,9 +643,9 @@ void p2pool::handle_chain_main(ChainMain& data, const char* extra)
m_zmqLastActive = seconds_since_epoch();
}
void p2pool::handle_monero_block_broadcast(std::vector<uint8_t>&& blob)
void p2pool::handle_monero_block_broadcast(std::vector<std::vector<uint8_t>>&& blobs)
{
m_p2pServer->broadcast_monero_block_async(std::move(blob));
m_p2pServer->broadcast_monero_block_async(std::move(blobs));
}
#ifdef WITH_MERGE_MINING_DONATION

View File

@@ -86,7 +86,7 @@ public:
virtual void handle_tx(TxMempoolData& tx) override;
virtual void handle_miner_data(MinerData& data) override;
virtual void handle_chain_main(ChainMain& data, const char* extra) override;
virtual void handle_monero_block_broadcast(std::vector<uint8_t>&& blob) override;
virtual void handle_monero_block_broadcast(std::vector<std::vector<uint8_t>>&& blobs) override;
#ifdef WITH_MERGE_MINING_DONATION
void set_aux_job_donation(const std::vector<IMergeMiningClient::ChainParameters>& chain_params);

View File

@@ -92,7 +92,7 @@ struct MinerCallbackHandler
virtual void handle_tx(TxMempoolData& tx) = 0;
virtual void handle_miner_data(MinerData& data) = 0;
virtual void handle_chain_main(ChainMain& data, const char* extra) = 0;
virtual void handle_monero_block_broadcast(std::vector<uint8_t>&& blob) = 0;
virtual void handle_monero_block_broadcast(std::vector<std::vector<uint8_t>>&& blobs) = 0;
};
template<typename T>

View File

@@ -253,9 +253,11 @@ bool ZMQReader::connect(const std::string& address, bool keep_monitor)
return true;
}
static void broadcast_monero_block(rapidjson::Value* value, MinerCallbackHandler* handler)
static std::vector<uint8_t> construct_monero_block_blob(rapidjson::Value* value)
{
#define X(type, name) type name; if (!parseValue(*value, #name, name)) return;
std::vector<uint8_t> empty_blob;
#define X(type, name) type name; if (!parseValue(*value, #name, name)) return empty_blob;
X(uint8_t, major_version);
X(uint8_t, minor_version);
@@ -268,40 +270,40 @@ static void broadcast_monero_block(rapidjson::Value* value, MinerCallbackHandler
auto miner_tx = value->FindMember("miner_tx");
if ((miner_tx == value->MemberEnd()) || !miner_tx->value.IsObject()) {
LOGWARN(3, "broadcast_monero_block: miner_tx not found or is not an object");
return;
LOGWARN(3, "construct_monero_block_blob: miner_tx not found or is not an object");
return empty_blob;
}
uint8_t version;
if (!parseValue(miner_tx->value, "version", version)) {
LOGWARN(3, "broadcast_monero_block: version not found");
return;
LOGWARN(3, "construct_monero_block_blob: version not found");
return empty_blob;
}
uint64_t unlock_height;
if (!parseValue(miner_tx->value, "unlock_time", unlock_height)) {
LOGWARN(3, "broadcast_monero_block: unlock_time not found");
return;
LOGWARN(3, "construct_monero_block_blob: unlock_time not found");
return empty_blob;
}
std::string extra;
if (!parseValue(miner_tx->value, "extra", extra)) {
LOGWARN(3, "broadcast_monero_block: extra not found");
return;
LOGWARN(3, "construct_monero_block_blob: extra not found");
return empty_blob;
}
auto outputs = miner_tx->value.FindMember("outputs");
if ((outputs == miner_tx->value.MemberEnd()) || !outputs->value.IsArray()) {
LOGWARN(3, "broadcast_monero_block: outputs not found or is not an array");
return;
LOGWARN(3, "construct_monero_block_blob: outputs not found or is not an array");
return empty_blob;
}
auto tx_hashes = value->FindMember("tx_hashes");
if ((tx_hashes == value->MemberEnd()) || !tx_hashes->value.IsArray()) {
LOGWARN(3, "broadcast_monero_block: tx_hashes not found or is not an array");
return;
LOGWARN(3, "construct_monero_block_blob: tx_hashes not found or is not an array");
return empty_blob;
}
std::vector<uint8_t> blob;
@@ -314,8 +316,8 @@ static void broadcast_monero_block(rapidjson::Value* value, MinerCallbackHandler
hash h;
if (!from_hex(prev_id.c_str(), prev_id.length(), h)) {
LOGWARN(3, "broadcast_monero_block: invalid prev_id " << prev_id);
return;
LOGWARN(3, "construct_monero_block_blob: invalid prev_id " << prev_id);
return empty_blob;
}
blob.insert(blob.end(), h.h, h.h + HASH_SIZE);
@@ -341,42 +343,42 @@ static void broadcast_monero_block(rapidjson::Value* value, MinerCallbackHandler
for (rapidjson::Value* i = arr.begin(); i != arr.end(); ++i) {
auto amount = i->FindMember("amount");
if ((amount == i->MemberEnd()) || !amount->value.IsUint64()) {
LOGWARN(3, "broadcast_monero_block: amount not found or is not UInt64");
return;
LOGWARN(3, "construct_monero_block_blob: amount not found or is not UInt64");
return empty_blob;
}
auto to_tagged_key = i->FindMember("to_tagged_key");
if ((to_tagged_key == i->MemberEnd()) || !to_tagged_key->value.IsObject()) {
LOGWARN(3, "broadcast_monero_block: to_tagged_key not found or is not an object");
return;
LOGWARN(3, "construct_monero_block_blob: to_tagged_key not found or is not an object");
return empty_blob;
}
auto key = to_tagged_key->value.FindMember("key");
if ((key == to_tagged_key->value.MemberEnd()) || !key->value.IsString()) {
LOGWARN(3, "broadcast_monero_block: key not found or is not a string");
return;
LOGWARN(3, "construct_monero_block_blob: key not found or is not a string");
return empty_blob;
}
auto view_tag = to_tagged_key->value.FindMember("view_tag");
if ((view_tag == to_tagged_key->value.MemberEnd()) || !view_tag->value.IsString()) {
LOGWARN(3, "broadcast_monero_block: view_tag not found or is not a string");
return;
LOGWARN(3, "construct_monero_block_blob: view_tag not found or is not a string");
return empty_blob;
}
writeVarint(amount->value.GetUint64(), blob);
blob.push_back(TXOUT_TO_TAGGED_KEY);
if (!from_hex(key->value.GetString(), key->value.GetStringLength(), h)) {
LOGWARN(3, "broadcast_monero_block: invalid key " << key->value.GetString());
return;
LOGWARN(3, "construct_monero_block_blob: invalid key " << key->value.GetString());
return empty_blob;
}
blob.insert(blob.end(), h.h, h.h + HASH_SIZE);
std::vector<uint8_t> t;
if (!from_hex(view_tag->value.GetString(), view_tag->value.GetStringLength(), t) || (t.size() != 1)) {
LOGWARN(3, "broadcast_monero_block: invalid view_tag " << view_tag->value.GetString());
return;
LOGWARN(3, "construct_monero_block_blob: invalid view_tag " << view_tag->value.GetString());
return empty_blob;
}
blob.push_back(t[0]);
@@ -384,8 +386,8 @@ static void broadcast_monero_block(rapidjson::Value* value, MinerCallbackHandler
std::vector<uint8_t> t;
if (!from_hex(extra.c_str(), extra.length(), t) || t.empty()) {
LOGWARN(3, "broadcast_monero_block: invalid extra " << extra);
return;
LOGWARN(3, "construct_monero_block_blob: invalid extra " << extra);
return empty_blob;
}
writeVarint(t.size(), blob);
@@ -401,8 +403,8 @@ static void broadcast_monero_block(rapidjson::Value* value, MinerCallbackHandler
for (rapidjson::Value* i = arr2.begin(); i != arr2.end(); ++i) {
if (!i->IsString() || !from_hex(i->GetString(), i->GetStringLength(), h)) {
LOGWARN(3, "broadcast_monero_block: invalid tx_hash " << i->GetString());
return;
LOGWARN(3, "construct_monero_block_blob: invalid tx_hash " << i->GetString());
return empty_blob;
}
blob.insert(blob.end(), h.h, h.h + HASH_SIZE);
}
@@ -410,7 +412,7 @@ static void broadcast_monero_block(rapidjson::Value* value, MinerCallbackHandler
const uint8_t* p = reinterpret_cast<const uint8_t*>(&data);
blob.insert(blob.begin(), p, p + sizeof(data));
handler->handle_monero_block_broadcast(std::move(blob));
return blob;
}
void ZMQReader::parse(char* data, size_t size)
@@ -509,8 +511,12 @@ void ZMQReader::parse(char* data, size_t size)
}
auto arr = doc.GetArray();
std::vector<std::vector<uint8_t>> blobs;
blobs.reserve(arr.Size());
for (Value* i = arr.begin(); i != arr.end(); ++i) {
broadcast_monero_block(i, m_handler);
blobs.emplace_back(construct_monero_block_blob(i));
if (!PARSE(*i, m_chainmainData, timestamp)) {
LOGWARN(1, "json-full-chain_main timestamp failed to parse, skipping it");
@@ -573,6 +579,8 @@ void ZMQReader::parse(char* data, size_t size)
m_handler->handle_chain_main(m_chainmainData, extra_it->value.GetString());
}
m_handler->handle_monero_block_broadcast(std::move(blobs));
}
}