From 435e6c75993727d3d2f1004d3e5a2bd5eb6b47a7 Mon Sep 17 00:00:00 2001 From: SChernykh <15806605+SChernykh@users.noreply.github.com> Date: Fri, 6 Jun 2025 18:56:02 +0200 Subject: [PATCH] Tari: added block push via gRPC API --- src/merge_mining_client_tari.cpp | 159 ++++++++++++++++++++++++++++--- src/merge_mining_client_tari.h | 19 ++++ 2 files changed, 165 insertions(+), 13 deletions(-) diff --git a/src/merge_mining_client_tari.cpp b/src/merge_mining_client_tari.cpp index 01feb5f..342827c 100644 --- a/src/merge_mining_client_tari.cpp +++ b/src/merge_mining_client_tari.cpp @@ -25,6 +25,7 @@ #include "pool_block.h" #include "merkle.h" #include "side_chain.h" +#include LOG_CATEGORY(MergeMiningClientTari) @@ -43,6 +44,7 @@ MergeMiningClientTari::MergeMiningClientTari(p2pool* pool, std::string host, con , m_server(new TariServer(pool->params().m_socks5Proxy)) , m_hostStr(host) , m_workerStop(0) + , m_pushBlockStop(0) { if (host.find(TARI_PREFIX) != 0) { LOGERR(1, "Invalid host " << host << " - \"" << TARI_PREFIX << "\" prefix not found"); @@ -85,26 +87,54 @@ MergeMiningClientTari::MergeMiningClientTari(p2pool* pool, std::string host, con log::Stream s(buf); s << "127.0.0.1:" << m_server->external_listen_port(); - grpc::ChannelArguments cArgs; + m_channelArgs.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 1000); - cArgs.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 1000); - - cArgs.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 1000); - cArgs.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 10000); + m_channelArgs.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 1000); + m_channelArgs.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 10000); - cArgs.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, -1); - cArgs.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, -1); + m_channelArgs.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, -1); + m_channelArgs.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, -1); - m_TariNode = new BaseNode::Stub(grpc::CreateCustomChannel(buf, grpc::InsecureChannelCredentials(), cArgs)); + m_TariNode = new BaseNode::Stub(grpc::CreateCustomChannel(buf, grpc::InsecureChannelCredentials(), m_channelArgs)); uv_mutex_init_checked(&m_workerLock); uv_cond_init_checked(&m_workerCond); - const int err = uv_thread_create(&m_worker, run_wrapper, this); + int err = uv_thread_create(&m_worker, run_wrapper, this); if (err) { LOGERR(1, "failed to start worker thread, error " << uv_err_name(err)); throw std::exception(); } + + uv_mutex_init_checked(&m_pushBlockLock); + uv_cond_init_checked(&m_pushBlockCond); + + std::ifstream f("tari_nodes.txt"); + + if (f.is_open()) { + while (f.good()) { + std::string s; + std::getline(f, s); + + if (!s.empty()) { + PushBlockThreadData* data = new PushBlockThreadData{ {}, this, std::move(s) }; + + err = uv_thread_create(&data->m_worker, push_block_thread, data); + if (err) { + LOGERR(1, "failed to start block pusher thread for node " << data->m_node << ", error " << uv_err_name(err)); + delete data; + } + else { + m_pushBlockThreads.push_back(data); + + // Limit to 16 nodes. More will be just an overkill and a waste of bandwidth. + if (m_pushBlockThreads.size() >= 16) { + break; + } + } + } + } + } } MergeMiningClientTari::~MergeMiningClientTari() @@ -118,6 +148,20 @@ MergeMiningClientTari::~MergeMiningClientTari() } uv_thread_join(&m_worker); + if (!m_pushBlockThreads.empty()) { + m_pushBlockStop.exchange(1); + { + MutexLock lock(m_pushBlockLock); + uv_cond_broadcast(&m_pushBlockCond); + } + + for (PushBlockThreadData* data : m_pushBlockThreads) { + uv_thread_join(&data->m_worker); + delete data; + } + m_pushBlockThreads.clear(); + } + m_server->shutdown_tcp(); delete m_server; @@ -128,6 +172,9 @@ MergeMiningClientTari::~MergeMiningClientTari() uv_mutex_destroy(&m_workerLock); uv_cond_destroy(&m_workerCond); + uv_mutex_destroy(&m_pushBlockLock); + uv_cond_destroy(&m_pushBlockCond); + LOGINFO(1, "stopped"); } @@ -447,6 +494,18 @@ void MergeMiningClientTari::submit_solution(const std::vector& coinbase pow->set_pow_data(data); } + { + const std::string& h = block.header().hash(); + LOGINFO(4, "Block " << log::hex_buf(h.data(), h.size()) << ": PoW data is set, everything is ready. Submitting it!"); + } + + if (!m_pushBlockThreads.empty()) { + MutexLock lock(m_pushBlockLock); + + m_blockToPush = block; + uv_cond_broadcast(&m_pushBlockCond); + } + struct Work { uv_work_t req; @@ -589,7 +648,7 @@ void MergeMiningClientTari::run() (a.failed_checkpoints() == b.failed_checkpoints()); }; - for (;;) { + while (!m_workerStop) { const auto start_time = high_resolution_clock::now(); // Force frequent enough updates (at least every 30 seconds) @@ -692,7 +751,7 @@ void MergeMiningClientTari::run() m_chainParamsTimestamp = time(nullptr); - m_tariBlock = response.block(); + m_tariBlock = std::move(*response.mutable_block()); LOGINFO(4, "Tari aux block template: height = " << job_params.height << ", diff = " << job_params.diff @@ -716,8 +775,8 @@ void MergeMiningClientTari::run() const int64_t timeout = std::max(500'000'000 - dt, 1'000'000); - if ((m_workerStop.load() != 0) || (uv_cond_timedwait(&m_workerCond, &m_workerLock, timeout) != UV_ETIMEDOUT)) { - return; + if (!m_workerStop) { + uv_cond_timedwait(&m_workerCond, &m_workerLock, timeout); } } } @@ -887,4 +946,78 @@ bool MergeMiningClientTari::TariClient::on_read(const char* data, uint32_t size) }); } +void MergeMiningClientTari::push_block_thread(void* arg) +{ + set_thread_name("Push Tari block"); + + const PushBlockThreadData* data = reinterpret_cast(arg); + + LOGINFO(1, "push block thread ready for " << data->m_node); + + try { + data->m_client->push_blocks_to(data->m_node); + } + catch (const std::exception& e) { + LOGERR(0, "Exception in push_blocks_to(" << data->m_node << "): " << e.what()); + } + + LOGINFO(1, "push block thread stopped for " << data->m_node); +} + +void MergeMiningClientTari::push_blocks_to(const std::string& node_address) +{ + using namespace tari::rpc; + + std::unique_ptr node(new BaseNode::Stub(grpc::CreateCustomChannel(node_address, grpc::InsecureChannelCredentials(), m_channelArgs))); + + grpc::ClientContext get_tip_info_ctx{}; + Empty get_tip_info_request{}; + TipInfoResponse tip_info{}; + + const grpc::Status get_tip_info_status = node->GetTipInfo(&get_tip_info_ctx, get_tip_info_request, &tip_info); + + if (!get_tip_info_status.ok()) { + LOGWARN(4, "GetTipInfo failed for " << node_address << ": " << get_tip_info_status.error_message()); + if (!get_tip_info_status.error_details().empty()) { + LOGWARN(4, "GetTipInfo failed for " << node_address << ": " << get_tip_info_status.error_details()); + } + } + else { + LOGINFO(4, node_address << " is at block height " << tip_info.metadata().best_block_height()); + } + + while (!m_pushBlockStop) { + Block block; + { + MutexLock lock(m_pushBlockLock); + uv_cond_wait(&m_pushBlockCond, &m_pushBlockLock); + + if (m_pushBlockStop) { + return; + } + + block = m_blockToPush; + } + + const std::string& h = block.header().hash(); + LOGINFO(4, "Pushing block " << log::hex_buf(h.data(), h.size()) << " to " << node_address); + + grpc::ClientContext ctx; + SubmitBlockResponse response; + + const grpc::Status status = node->SubmitBlock(&ctx, block, &response); + + if (!status.ok()) { + LOGWARN(4, "SubmitBlock failed for " << node_address << ": " << status.error_message()); + if (!status.error_details().empty()) { + LOGWARN(4, "SubmitBlock failed for " << node_address << ": " << status.error_details()); + } + } + else { + const std::string& h = response.block_hash(); + LOGINFO(0, log::LightGreen() << "Pushed Tari block " << log::hex_buf(h.data(), h.size()) << " to " << node_address); + } + } +} + } // namespace p2pool diff --git a/src/merge_mining_client_tari.h b/src/merge_mining_client_tari.h index 1cc4759..3f2cd58 100644 --- a/src/merge_mining_client_tari.h +++ b/src/merge_mining_client_tari.h @@ -99,6 +99,7 @@ private: const std::string m_hostStr; + grpc::ChannelArguments m_channelArgs; tari::rpc::BaseNode::Stub* m_TariNode; struct TariClient : public TCPServer::Client @@ -130,6 +131,24 @@ private: static void run_wrapper(void* arg); void run(); + + uv_mutex_t m_pushBlockLock; + uv_cond_t m_pushBlockCond; + std::atomic m_pushBlockStop; + tari::rpc::Block m_blockToPush; + + struct PushBlockThreadData + { + uv_thread_t m_worker; + + MergeMiningClientTari* m_client; + const std::string m_node; + }; + + std::vector m_pushBlockThreads; + + static void push_block_thread(void* arg); + void push_blocks_to(const std::string& node_address); }; } // namespace p2pool