From 50634e5e79292dcd16ac4fb45593525bd6287b4d Mon Sep 17 00:00:00 2001 From: sech1 Date: Wed, 3 Sep 2025 10:07:39 +0200 Subject: [PATCH] Feature: broadcast all new Monero blocks through p2pool network for faster propagation --- src/common.h | 16 +++ src/json_parsers.h | 1 + src/json_rpc_request.h | 4 +- src/keccak.h | 3 + src/p2p_server.cpp | 294 +++++++++++++++++++++++++++++++++++++++++ src/p2p_server.h | 27 +++- src/p2pool.cpp | 12 ++ src/p2pool.h | 1 + src/pool_block.cpp | 7 +- src/util.h | 1 + src/zmq_reader.cpp | 162 +++++++++++++++++++++++ 11 files changed, 519 insertions(+), 9 deletions(-) diff --git a/src/common.h b/src/common.h index e3684c2..bade8bd 100644 --- a/src/common.h +++ b/src/common.h @@ -166,6 +166,14 @@ struct alignas(uint64_t) hash FORCEINLINE hash() : h{} {} + constexpr hash(std::initializer_list l) : h{} { + auto it = l.begin(); + + for (size_t i = 0; (i < HASH_SIZE) && (it != l.end()); ++i, ++it) { + h[i] = *it; + } + } + FORCEINLINE bool operator<(const hash& other) const { const uint64_t* a = u64(); @@ -496,6 +504,14 @@ struct raw_ip static_assert(sizeof(raw_ip) == 16, "struct raw_ip has invalid size"); +struct MoneroBlockBroadcastHeader +{ + uint32_t header_size; + uint32_t miner_tx_size; +}; + +static_assert(sizeof(MoneroBlockBroadcastHeader) == sizeof(uint32_t) * 2, "struct MoneroBlockBroadcastHeader has invalid size"); + void* malloc_hook(size_t n) noexcept; void* realloc_hook(void* ptr, size_t size) noexcept; void* calloc_hook(size_t count, size_t size) noexcept; diff --git a/src/json_parsers.h b/src/json_parsers.h index 2663df7..84df272 100644 --- a/src/json_parsers.h +++ b/src/json_parsers.h @@ -57,6 +57,7 @@ struct parse_wrapper \ JSON_VALUE_PARSER(String, const char*) JSON_VALUE_PARSER(String, std::string) JSON_VALUE_PARSER(Uint, uint8_t) +JSON_VALUE_PARSER(Uint, uint32_t) JSON_VALUE_PARSER(Uint64, uint64_t) JSON_VALUE_PARSER(Bool, bool) diff --git a/src/json_rpc_request.h b/src/json_rpc_request.h index 9b41f89..6b0e8ca 100644 --- a/src/json_rpc_request.h +++ b/src/json_rpc_request.h @@ -20,7 +20,9 @@ namespace p2pool { namespace JSONRPCRequest { -typedef Callback::Base CallbackBase; +typedef Callback::Base CallbackBase; + +FORCEINLINE static constexpr void dummy_callback(const char* /*msg*/, size_t /*msg_size*/, double /*tcp_ping*/) {} void Call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, bool ssl, const std::string& ssl_fingerprint, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop); diff --git a/src/keccak.h b/src/keccak.h index 0bb45b4..3b18993 100644 --- a/src/keccak.h +++ b/src/keccak.h @@ -26,6 +26,9 @@ enum KeccakParams { extern const uint64_t keccakf_rndc[24]; +// keccak hash of a single 0x00 byte +constexpr hash keccak_0x00{ 0xbc, 0x36, 0x78, 0x9e, 0x7a, 0x1e, 0x28, 0x14, 0x36, 0x46, 0x42, 0x29, 0x82, 0x8f, 0x81, 0x7d, 0x66, 0x12, 0xf7, 0xb4, 0x77, 0xd6, 0x65, 0x91, 0xff, 0x96, 0xa9, 0xe0, 0x64, 0xbc, 0xc9, 0x8a }; + typedef void (*keccakf_func)(std::array&); extern keccakf_func keccakf; diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 4a6e602..f4dd2d1 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -20,6 +20,8 @@ #include "p2pool.h" #include "params.h" #include "keccak.h" +#include "merkle.h" +#include "pow_hash.h" #include "side_chain.h" #include "pool_block.h" #include "block_cache.h" @@ -79,6 +81,7 @@ P2PServer::P2PServer(p2pool* pool) m_blockDeserializeBuf.reserve(MAX_BLOCK_SIZE); m_auxJobMessages.reserve(1024); + m_MoneroBlockBroadcasts.reserve(1024); // Diffuse the initial state in case it has low quality m_rng.discard(10000); @@ -130,6 +133,16 @@ P2PServer::P2PServer(p2pool* pool) m_AuxJobBroadcastAsync.data = this; #endif + uv_mutex_init_checked(&m_MoneroBlockBroadcastsLock); + uv_mutex_init_checked(&m_BroadcastMoneroBlockLock); + + err = uv_async_init(&m_loop, &m_BroadcastMoneroBlockAsync, on_monero_block_broadcast); + if (err) { + LOGERR(1, "uv_async_init failed, error " << uv_err_name(err)); + PANIC_STOP(); + } + m_BroadcastMoneroBlockAsync.data = this; + err = uv_timer_init(&m_loop, &m_timer); if (err) { LOGERR(1, "failed to create timer, error " << uv_err_name(err)); @@ -171,6 +184,9 @@ P2PServer::~P2PServer() uv_mutex_destroy(&m_AuxJobBroadcastLock); #endif + uv_mutex_destroy(&m_MoneroBlockBroadcastsLock); + uv_mutex_destroy(&m_BroadcastMoneroBlockLock); + delete m_block; delete m_cache; @@ -1178,6 +1194,7 @@ void P2PServer::on_timer() check_for_updates(); api_update_local_stats(); clean_aux_job_messages(); + clean_monero_block_broadcasts(); } void P2PServer::flush_cache() @@ -1427,6 +1444,11 @@ void P2PServer::on_shutdown() uv_close(reinterpret_cast(&m_AuxJobBroadcastAsync), nullptr); } #endif + + { + MutexLock lock(m_BroadcastMoneroBlockLock); + uv_close(reinterpret_cast(&m_BroadcastMoneroBlockAsync), nullptr); + } } void P2PServer::api_update_local_stats() @@ -1588,6 +1610,113 @@ void P2PServer::clean_aux_job_messages() } } +void P2PServer::clean_monero_block_broadcasts() +{ + if ((m_timerCounter & 255) != 0) { + return; + } + + MutexLock lock(m_MoneroBlockBroadcastsLock); + + if (m_MoneroBlockBroadcasts.empty()) { + return; + } + + const uint64_t cur_time = seconds_since_epoch(); + + for (auto it = m_MoneroBlockBroadcasts.begin(); it != m_MoneroBlockBroadcasts.end();) { + if (cur_time > it->second + MONERO_BROADCAST_TIMEOUT) { + it = m_MoneroBlockBroadcasts.erase(it); + } + else { + ++it; + } + } +} + +void P2PServer::broadcast_monero_block_async(std::vector&& blob) +{ + MutexLock lock(m_BroadcastMoneroBlockLock); + + m_MoneroBlockToBroadcast = std::move(blob); + + const int err = uv_async_send(&m_BroadcastMoneroBlockAsync); + if (err) { + LOGERR(1, "uv_async_send failed, error " << uv_err_name(err)); + } +} + +void P2PServer::broadcast_monero_block_handler() +{ + std::vector blob; + { + MutexLock lock(m_BroadcastMoneroBlockLock); + blob = std::move(m_MoneroBlockToBroadcast); + } + + if (blob.empty()) { + return; + } + + broadcast_monero_block(blob.data(), static_cast(blob.size()), nullptr, false); +} + +void P2PServer::broadcast_monero_block(const uint8_t* data, uint32_t data_size, const P2PClient* source, bool duplicate_check_done) +{ + check_event_loop_thread(__func__); + + if (data_size < sizeof(MoneroBlockBroadcastHeader)) { + LOGWARN(3, "broadcast_monero_block: data_size is too small: " << data_size); + return; + } + + if (!duplicate_check_done && !store_monero_block_broadcast(data + sizeof(MoneroBlockBroadcastHeader), data_size - sizeof(MoneroBlockBroadcastHeader))) { + LOGINFO(6, "broadcast_monero_block: skipping duplicate broadcast"); + return; + } + + for (P2PClient* client = static_cast(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast(client->m_next)) { + if ((source && (client == source)) || !client->is_good() || (client->m_protocolVersion < PROTOCOL_VERSION_1_4)) { + continue; + } + + const bool result = send(client, [client, data, data_size](uint8_t* buf, size_t buf_size) -> size_t { + LOGINFO(6, "sending MONERO_BLOCK_BROADCAST to " << static_cast(client->m_addrString)); + + if (buf_size < 1 + sizeof(uint32_t) + data_size) { + return 0; + } + + uint8_t* p = buf; + + *(p++) = static_cast(MessageId::MONERO_BLOCK_BROADCAST); + + memcpy(p, &data_size, sizeof(uint32_t)); + p += sizeof(uint32_t); + + memcpy(p, data, data_size); + p += data_size; + + return p - buf; + }); + + if (!result) { + LOGWARN(5, "failed to send MONERO_BLOCK_BROADCAST to " << static_cast(client->m_addrString) << ", disconnecting"); + client->close(); + } + } +} + +bool P2PServer::store_monero_block_broadcast(const uint8_t* data, uint32_t data_size) +{ + hash digest; + sha256(data, data_size, digest.h); + + // Every message can be received from multiple peers, so broadcast it only once + MutexLock lock(m_MoneroBlockBroadcastsLock); + return m_MoneroBlockBroadcasts.emplace(*digest.u64(), seconds_since_epoch()).second; +} + P2PServer::P2PClient::~P2PClient() { } @@ -1901,6 +2030,23 @@ bool P2PServer::P2PClient::on_read(const char* data, uint32_t size) } } break; + + case MessageId::MONERO_BLOCK_BROADCAST: + LOGINFO(6, "peer " << log::Gray() << static_cast(m_addrString) << log::NoColor() << " sent MONERO_BLOCK_BROADCAST"); + + if (bytes_left >= 1 + sizeof(uint32_t)) { + const uint32_t msg_size = read_unaligned(reinterpret_cast(buf + 1)); + if (bytes_left >= 1 + sizeof(uint32_t) + msg_size) { + bytes_read = 1 + sizeof(uint32_t) + msg_size; + + if (!on_monero_block_broadcast(buf + 1 + sizeof(uint32_t), msg_size)) { + ban(DEFAULT_BAN_TIME); + server->remove_peer_from_list(this); + return false; + } + } + } + break; } if (bytes_read) { @@ -2822,6 +2968,154 @@ bool P2PServer::P2PClient::on_aux_job_donation(const uint8_t* buf, uint32_t size return true; } +bool P2PServer::P2PClient::on_monero_block_broadcast(const uint8_t* buf, uint32_t size) +{ + P2PServer* server = static_cast(m_owner); + + if (size < sizeof(MoneroBlockBroadcastHeader)) { + LOGWARN(3, "Invalid MONERO_BLOCK_BROADCAST size: " << size); + return false; + } + + const uint8_t* buf0 = buf; + uint32_t size0 = size; + + MoneroBlockBroadcastHeader data; + memcpy(&data, buf, sizeof(data)); + + buf += sizeof(data); + size -= sizeof(data); + + if ((data.header_size < 43) || (data.header_size > 128) || (data.miner_tx_size < 64) || (data.header_size + data.miner_tx_size + 1 > size)) { + LOGWARN(3, "Invalid MONERO_BLOCK_BROADCAST header: " << data.header_size << ", " << data.miner_tx_size << ", " << size); + return false; + } + + uint32_t num_transactions; + const uint8_t* tx_hashes = readVarint(buf + data.header_size + data.miner_tx_size, buf + size, num_transactions); + if (!tx_hashes) { + LOGWARN(3, "Invalid MONERO_BLOCK_BROADCAST: tx_hashes not found"); + return false; + } + + if ((num_transactions >= MAX_BLOCK_SIZE / HASH_SIZE) || (num_transactions * HASH_SIZE != size - static_cast(tx_hashes - buf))) { + LOGWARN(3, "Invalid MONERO_BLOCK_BROADCAST: invalid number of transactions " << num_transactions); + return false; + } + + if (buf[data.header_size] != TX_VERSION) { + LOGWARN(3, "Invalid MONERO_BLOCK_BROADCAST: TX_VERSION byte not found"); + return false; + } + + uint64_t unlock_height; + if (!readVarint(buf + data.header_size + 1, buf + data.header_size + data.miner_tx_size, unlock_height)) { + LOGWARN(3, "Invalid MONERO_BLOCK_BROADCAST: unlock_height not found"); + return false; + } + + p2pool* pool = server->m_pool; + + // Ignore blocks which already unlocked + const uint64_t cur_height = pool->miner_data().height; + + if (unlock_height < cur_height) { + LOGINFO(5, "Outdated MONERO_BLOCK_BROADCAST: unlock_height = " << unlock_height << ", current height = " << cur_height << " - ignored"); + return true; + } + + // Ignore repeated old messages + if (!server->store_monero_block_broadcast(buf, size)) { + LOGINFO(6, "Repeated MONERO_BLOCK_BROADCAST - ignored"); + return true; + } + + const uint64_t height = unlock_height - MINER_REWARD_UNLOCK_TIME; + + difficulty_type diff; + if (!pool->get_difficulty_at_height(height, diff)) { + LOGWARN(3, "Invalid MONERO_BLOCK_BROADCAST: couldn't get difficulty at height " << height); + return false; + } + + // Use 90% of this height's difficulty to account for possible altchain deviations + diff -= diff / 10; + + hash seed; + if (!pool->get_seed(height, seed)) { + LOGWARN(3, "Invalid MONERO_BLOCK_BROADCAST: couldn't get seed for height " << height); + return false; + } + + hash hashes[3] = { {}, keccak_0x00, {} }; + + // "miner_tx_size - 1" because the last byte is 0x00 (base rct data), it goes into the second hash + keccak(buf + data.header_size, static_cast(data.miner_tx_size) - 1, hashes[0].h); + + std::vector transactions(num_transactions + 1); + keccak(reinterpret_cast(hashes), sizeof(hashes), transactions[0].h); + + if (num_transactions > 0) { + memcpy(transactions.data() + 1, tx_hashes, num_transactions * HASH_SIZE); + } + + root_hash root; + merkle_hash(transactions, root); + + std::vector blob; + blob.reserve(data.header_size + HASH_SIZE + 2); + + blob.insert(blob.end(), buf, buf + data.header_size); + blob.insert(blob.end(), root.h, root.h + HASH_SIZE); + writeVarint(num_transactions + 1, blob); + + hash pow_hash; + if (!pool->hasher()->calculate(blob.data(), blob.size(), height, seed, pow_hash, false)) { + LOGWARN(3, "Invalid MONERO_BLOCK_BROADCAST: failed to calculate PoW hash"); + return false; + } + + if (!diff.check_pow(pow_hash)) { + LOGWARN(3, "Invalid MONERO_BLOCK_BROADCAST: diff check failed, PoW hash = " << pow_hash); + return false; + } + + server->broadcast_monero_block(buf0, size0, this, true); + + std::string request; + request.reserve(size * 2 + 128); + + request.append("{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"submit_block\",\"params\":[\""); + + for (size_t i = 0; i < size; ++i) { + request.append(1, "0123456789abcdef"[buf[i] >> 4]); + request.append(1, "0123456789abcdef"[buf[i] & 15]); + } + + request.append("\"]}"); + + const Params::Host& host = pool->current_host(); + + JSONRPCRequest::call( + host.m_address, + host.m_rpcPort, + request, + host.m_rpcLogin, + server->m_socks5Proxy, + host.m_rpcSSL, + host.m_rpcSSL_Fingerprint, + JSONRPCRequest::dummy_callback, + [](const char* data, size_t size, double) + { + if (size > 0) { + LOGERR(0, "on_monero_block_broadcast: submit_block RPC request failed, error " << log::const_buf(data, size)); + } + } + ); + + return true; +} + bool P2PServer::P2PClient::handle_incoming_block_async(const PoolBlock* block, uint64_t max_time_delta) { P2PServer* server = static_cast(m_owner); diff --git a/src/p2p_server.h b/src/p2p_server.h index ce28ce6..be217f7 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -40,8 +40,9 @@ static constexpr uint32_t PROTOCOL_VERSION_1_0 = 0x00010000UL; static constexpr uint32_t PROTOCOL_VERSION_1_1 = 0x00010001UL; static constexpr uint32_t PROTOCOL_VERSION_1_2 = 0x00010002UL; static constexpr uint32_t PROTOCOL_VERSION_1_3 = 0x00010003UL; +static constexpr uint32_t PROTOCOL_VERSION_1_4 = 0x00010004UL; -static constexpr uint32_t SUPPORTED_PROTOCOL_VERSION = PROTOCOL_VERSION_1_3; +static constexpr uint32_t SUPPORTED_PROTOCOL_VERSION = PROTOCOL_VERSION_1_4; class P2PServer : public TCPServer { @@ -59,7 +60,9 @@ public: BLOCK_NOTIFY, // Donation messages are signed by author's private keys to prevent their abuse/misuse. AUX_JOB_DONATION, - LAST = AUX_JOB_DONATION, + // Broadcast 3rd-party Monero blocks to make the whole Monero network faster + MONERO_BLOCK_BROADCAST, + LAST = MONERO_BLOCK_BROADCAST, }; explicit P2PServer(p2pool *pool); @@ -118,6 +121,7 @@ public: void on_peer_list_response(const uint8_t* buf); void on_block_notify(const uint8_t* buf); [[nodiscard]] bool on_aux_job_donation(const uint8_t* buf, uint32_t size); + [[nodiscard]] bool on_monero_block_broadcast(const uint8_t* buf, uint32_t size); [[nodiscard]] bool handle_incoming_block_async(const PoolBlock* block, uint64_t max_time_delta = 0); static void handle_incoming_block(p2pool* pool, PoolBlock& block, std::vector& missing_blocks, bool& result); @@ -212,6 +216,12 @@ public: void broadcast_aux_job_donation(const uint8_t* data, uint32_t data_size, uint64_t timestamp, const P2PClient* source, bool duplicate_check_done); + void broadcast_monero_block_async(std::vector&& blob); + void broadcast_monero_block_handler(); + void broadcast_monero_block(const uint8_t* data, uint32_t data_size, const P2PClient* source, bool duplicate_check_done); + + bool store_monero_block_broadcast(const uint8_t* data, uint32_t data_size); + private: [[nodiscard]] const char* get_log_category() const override; @@ -307,12 +317,16 @@ private: enum { AUX_JOB_TIMEOUT = 3600, + MONERO_BROADCAST_TIMEOUT = 3600 * 6, }; unordered_set> m_auxJobMessages; std::vector m_auxJobLastMessage; uint64_t m_auxJobLastMessageTimestamp; + uv_mutex_t m_MoneroBlockBroadcastsLock; + unordered_map m_MoneroBlockBroadcasts; + void send_aux_job_donation(P2PServer::P2PClient* client, const uint8_t* data, uint32_t data_size); void clean_aux_job_messages(); @@ -332,6 +346,15 @@ private: static void on_aux_job_broadcast(uv_async_t* handle) { reinterpret_cast(handle->data)->broadcast_aux_job_donation_handler(); } void broadcast_aux_job_donation_handler(); #endif + + uv_mutex_t m_BroadcastMoneroBlockLock; + std::vector m_MoneroBlockToBroadcast; + + uv_async_t m_BroadcastMoneroBlockAsync; + + static void on_monero_block_broadcast(uv_async_t* handle) { reinterpret_cast(handle->data)->broadcast_monero_block_handler(); } + + void clean_monero_block_broadcasts(); }; } // namespace p2pool diff --git a/src/p2pool.cpp b/src/p2pool.cpp index 63f0ef9..4077411 100644 --- a/src/p2pool.cpp +++ b/src/p2pool.cpp @@ -599,6 +599,10 @@ void p2pool::handle_chain_main(ChainMain& data, const char* extra) m_zmqLastActive = seconds_since_epoch(); } +void p2pool::handle_monero_block_broadcast(std::vector&& blob) +{ + m_p2pServer->broadcast_monero_block_async(std::move(blob)); +} #ifdef WITH_MERGE_MINING_DONATION void p2pool::set_aux_job_donation(const std::vector& chain_params) @@ -1041,6 +1045,9 @@ void p2pool::submit_block() const const uint32_t nonce = submit_data.nonce; const uint32_t extra_nonce = submit_data.extra_nonce; + std::vector blob; + blob.reserve(submit_data.blob.size()); + for (size_t i = 0; i < submit_data.blob.size(); ++i) { uint8_t b; if (nonce_offset && nonce_offset <= i && i < nonce_offset + sizeof(submit_data.nonce)) { @@ -1057,11 +1064,16 @@ void p2pool::submit_block() const else { b = submit_data.blob[i]; } + request.append(1, "0123456789abcdef"[b >> 4]); request.append(1, "0123456789abcdef"[b & 15]); + + blob.push_back(b); } request.append("\"]}"); + m_p2pServer->store_monero_block_broadcast(blob.data(), blob.size()); + const Params::Host& host = current_host(); JSONRPCRequest::call(host.m_address, host.m_rpcPort, request, host.m_rpcLogin, m_params->m_socks5Proxy, host.m_rpcSSL, host.m_rpcSSL_Fingerprint, diff --git a/src/p2pool.h b/src/p2pool.h index e73431e..048d8b4 100644 --- a/src/p2pool.h +++ b/src/p2pool.h @@ -86,6 +86,7 @@ public: virtual void handle_tx(TxMempoolData& tx) override; virtual void handle_miner_data(MinerData& data) override; virtual void handle_chain_main(ChainMain& data, const char* extra) override; + virtual void handle_monero_block_broadcast(std::vector&& blob) override; #ifdef WITH_MERGE_MINING_DONATION void set_aux_job_donation(const std::vector& chain_params); diff --git a/src/pool_block.cpp b/src/pool_block.cpp index c6586d3..61a4a97 100644 --- a/src/pool_block.cpp +++ b/src/pool_block.cpp @@ -343,13 +343,8 @@ bool PoolBlock::get_pow_hash(RandomX_Hasher_Base* hasher, uint64_t height, const alignas(8) uint8_t hashes[HASH_SIZE * 3]; - uint64_t* second_hash = reinterpret_cast(hashes + HASH_SIZE); - // Second hash is keccak of base rct data (it doesn't exist for the coinbase transaction, so it's a hash of a single 0x00 byte) - second_hash[0] = 0x14281e7a9e7836bcull; - second_hash[1] = 0x7d818f8229424636ull; - second_hash[2] = 0x9165d677b4f71266ull; - second_hash[3] = 0x8ac9bc64e0a996ffull; + memcpy(hashes + HASH_SIZE, keccak_0x00.h, HASH_SIZE); // Third hash is null because there is no rct data in the coinbase transaction memset(hashes + HASH_SIZE * 2, 0, HASH_SIZE); diff --git a/src/util.h b/src/util.h index e49638f..8643850 100644 --- a/src/util.h +++ b/src/util.h @@ -92,6 +92,7 @@ struct MinerCallbackHandler virtual void handle_tx(TxMempoolData& tx) = 0; virtual void handle_miner_data(MinerData& data) = 0; virtual void handle_chain_main(ChainMain& data, const char* extra) = 0; + virtual void handle_monero_block_broadcast(std::vector&& blob) = 0; }; template diff --git a/src/zmq_reader.cpp b/src/zmq_reader.cpp index ed9cc15..a0b8125 100644 --- a/src/zmq_reader.cpp +++ b/src/zmq_reader.cpp @@ -253,6 +253,166 @@ bool ZMQReader::connect(const std::string& address, bool keep_monitor) return true; } +static void broadcast_monero_block(rapidjson::Value* value, MinerCallbackHandler* handler) +{ +#define X(type, name) type name; if (!parseValue(*value, #name, name)) return; + + X(uint8_t, major_version); + X(uint8_t, minor_version); + X(uint64_t, timestamp); + X(std::string, prev_id); + X(uint32_t, nonce); + +#undef X + + auto miner_tx = value->FindMember("miner_tx"); + + if ((miner_tx == value->MemberEnd()) || !miner_tx->value.IsObject()) { + LOGWARN(3, "broadcast_monero_block: miner_tx not found or is not an object"); + return; + } + + uint8_t version; + if (!parseValue(miner_tx->value, "version", version)) { + LOGWARN(3, "broadcast_monero_block: version not found"); + return; + } + + uint64_t unlock_height; + if (!parseValue(miner_tx->value, "unlock_time", unlock_height)) { + LOGWARN(3, "broadcast_monero_block: unlock_time not found"); + return; + } + + std::string extra; + if (!parseValue(miner_tx->value, "extra", extra)) { + LOGWARN(3, "broadcast_monero_block: extra not found"); + return; + } + + auto outputs = miner_tx->value.FindMember("outputs"); + + if ((outputs == miner_tx->value.MemberEnd()) || !outputs->value.IsArray()) { + LOGWARN(3, "broadcast_monero_block: outputs not found or is not an array"); + return; + } + + auto tx_hashes = value->FindMember("tx_hashes"); + + if ((tx_hashes == value->MemberEnd()) || !tx_hashes->value.IsArray()) { + LOGWARN(3, "broadcast_monero_block: tx_hashes not found or is not an array"); + return; + } + + std::vector blob; + blob.reserve(16384); + + blob.push_back(major_version); + blob.push_back(minor_version); + + writeVarint(timestamp, blob); + + hash h; + if (!from_hex(prev_id.c_str(), prev_id.length(), h)) { + LOGWARN(3, "broadcast_monero_block: invalid prev_id " << prev_id); + return; + } + + blob.insert(blob.end(), h.h, h.h + HASH_SIZE); + blob.insert(blob.end(), reinterpret_cast(&nonce), reinterpret_cast(&nonce) + NONCE_SIZE); + + MoneroBlockBroadcastHeader data; + + data.header_size = blob.size(); + + blob.insert(blob.end(), version); + + writeVarint(unlock_height, blob); + + blob.push_back(1); + blob.push_back(TXIN_GEN); + + writeVarint(unlock_height - MINER_REWARD_UNLOCK_TIME, blob); + + auto arr = outputs->value.GetArray(); + + writeVarint(arr.Size(), blob); + + for (rapidjson::Value* i = arr.begin(); i != arr.end(); ++i) { + auto amount = i->FindMember("amount"); + if ((amount == i->MemberEnd()) || !amount->value.IsUint64()) { + LOGWARN(3, "broadcast_monero_block: amount not found or is not UInt64"); + return; + } + + auto to_tagged_key = i->FindMember("to_tagged_key"); + if ((to_tagged_key == i->MemberEnd()) || !to_tagged_key->value.IsObject()) { + LOGWARN(3, "broadcast_monero_block: to_tagged_key not found or is not an object"); + return; + } + + auto key = to_tagged_key->value.FindMember("key"); + if ((key == to_tagged_key->value.MemberEnd()) || !key->value.IsString()) { + LOGWARN(3, "broadcast_monero_block: key not found or is not a string"); + return; + } + + auto view_tag = to_tagged_key->value.FindMember("view_tag"); + if ((view_tag == to_tagged_key->value.MemberEnd()) || !view_tag->value.IsString()) { + LOGWARN(3, "broadcast_monero_block: view_tag not found or is not a string"); + return; + } + + writeVarint(amount->value.GetUint64(), blob); + blob.push_back(TXOUT_TO_TAGGED_KEY); + + if (!from_hex(key->value.GetString(), key->value.GetStringLength(), h)) { + LOGWARN(3, "broadcast_monero_block: invalid key " << key->value.GetString()); + return; + } + + blob.insert(blob.end(), h.h, h.h + HASH_SIZE); + + std::vector t; + if (!from_hex(view_tag->value.GetString(), view_tag->value.GetStringLength(), t) || (t.size() != 1)) { + LOGWARN(3, "broadcast_monero_block: invalid view_tag " << view_tag->value.GetString()); + return; + } + + blob.push_back(t[0]); + } + + std::vector t; + if (!from_hex(extra.c_str(), extra.length(), t) || t.empty()) { + LOGWARN(3, "broadcast_monero_block: invalid extra " << extra); + return; + } + + writeVarint(t.size(), blob); + blob.insert(blob.end(), t.begin(), t.end()); + + blob.push_back(0); + + data.miner_tx_size = blob.size() - data.header_size; + + auto arr2 = tx_hashes->value.GetArray(); + + writeVarint(arr2.Size(), blob); + + for (rapidjson::Value* i = arr2.begin(); i != arr2.end(); ++i) { + if (!i->IsString() || !from_hex(i->GetString(), i->GetStringLength(), h)) { + LOGWARN(3, "broadcast_monero_block: invalid tx_hash " << i->GetString()); + return; + } + blob.insert(blob.end(), h.h, h.h + HASH_SIZE); + } + + const uint8_t* p = reinterpret_cast(&data); + blob.insert(blob.begin(), p, p + sizeof(data)); + + handler->handle_monero_block_broadcast(std::move(blob)); +} + void ZMQReader::parse(char* data, size_t size) { char* value = data; @@ -350,6 +510,8 @@ void ZMQReader::parse(char* data, size_t size) auto arr = doc.GetArray(); for (Value* i = arr.begin(); i != arr.end(); ++i) { + broadcast_monero_block(i, m_handler); + if (!PARSE(*i, m_chainmainData, timestamp)) { LOGWARN(1, "json-full-chain_main timestamp failed to parse, skipping it"); continue;