Tari: added block push via gRPC API

This commit is contained in:
SChernykh
2025-06-06 18:56:02 +02:00
parent ce840733f1
commit 435e6c7599
2 changed files with 165 additions and 13 deletions

View File

@@ -25,6 +25,7 @@
#include "pool_block.h"
#include "merkle.h"
#include "side_chain.h"
#include <fstream>
LOG_CATEGORY(MergeMiningClientTari)
@@ -43,6 +44,7 @@ MergeMiningClientTari::MergeMiningClientTari(p2pool* pool, std::string host, con
, m_server(new TariServer(pool->params().m_socks5Proxy))
, m_hostStr(host)
, m_workerStop(0)
, m_pushBlockStop(0)
{
if (host.find(TARI_PREFIX) != 0) {
LOGERR(1, "Invalid host " << host << " - \"" << TARI_PREFIX << "\" prefix not found");
@@ -85,26 +87,54 @@ MergeMiningClientTari::MergeMiningClientTari(p2pool* pool, std::string host, con
log::Stream s(buf);
s << "127.0.0.1:" << m_server->external_listen_port();
grpc::ChannelArguments cArgs;
m_channelArgs.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 1000);
cArgs.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 1000);
cArgs.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 1000);
cArgs.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 10000);
m_channelArgs.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 1000);
m_channelArgs.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 10000);
cArgs.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, -1);
cArgs.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, -1);
m_channelArgs.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, -1);
m_channelArgs.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, -1);
m_TariNode = new BaseNode::Stub(grpc::CreateCustomChannel(buf, grpc::InsecureChannelCredentials(), cArgs));
m_TariNode = new BaseNode::Stub(grpc::CreateCustomChannel(buf, grpc::InsecureChannelCredentials(), m_channelArgs));
uv_mutex_init_checked(&m_workerLock);
uv_cond_init_checked(&m_workerCond);
const int err = uv_thread_create(&m_worker, run_wrapper, this);
int err = uv_thread_create(&m_worker, run_wrapper, this);
if (err) {
LOGERR(1, "failed to start worker thread, error " << uv_err_name(err));
throw std::exception();
}
uv_mutex_init_checked(&m_pushBlockLock);
uv_cond_init_checked(&m_pushBlockCond);
std::ifstream f("tari_nodes.txt");
if (f.is_open()) {
while (f.good()) {
std::string s;
std::getline(f, s);
if (!s.empty()) {
PushBlockThreadData* data = new PushBlockThreadData{ {}, this, std::move(s) };
err = uv_thread_create(&data->m_worker, push_block_thread, data);
if (err) {
LOGERR(1, "failed to start block pusher thread for node " << data->m_node << ", error " << uv_err_name(err));
delete data;
}
else {
m_pushBlockThreads.push_back(data);
// Limit to 16 nodes. More will be just an overkill and a waste of bandwidth.
if (m_pushBlockThreads.size() >= 16) {
break;
}
}
}
}
}
}
MergeMiningClientTari::~MergeMiningClientTari()
@@ -118,6 +148,20 @@ MergeMiningClientTari::~MergeMiningClientTari()
}
uv_thread_join(&m_worker);
if (!m_pushBlockThreads.empty()) {
m_pushBlockStop.exchange(1);
{
MutexLock lock(m_pushBlockLock);
uv_cond_broadcast(&m_pushBlockCond);
}
for (PushBlockThreadData* data : m_pushBlockThreads) {
uv_thread_join(&data->m_worker);
delete data;
}
m_pushBlockThreads.clear();
}
m_server->shutdown_tcp();
delete m_server;
@@ -128,6 +172,9 @@ MergeMiningClientTari::~MergeMiningClientTari()
uv_mutex_destroy(&m_workerLock);
uv_cond_destroy(&m_workerCond);
uv_mutex_destroy(&m_pushBlockLock);
uv_cond_destroy(&m_pushBlockCond);
LOGINFO(1, "stopped");
}
@@ -447,6 +494,18 @@ void MergeMiningClientTari::submit_solution(const std::vector<uint8_t>& coinbase
pow->set_pow_data(data);
}
{
const std::string& h = block.header().hash();
LOGINFO(4, "Block " << log::hex_buf(h.data(), h.size()) << ": PoW data is set, everything is ready. Submitting it!");
}
if (!m_pushBlockThreads.empty()) {
MutexLock lock(m_pushBlockLock);
m_blockToPush = block;
uv_cond_broadcast(&m_pushBlockCond);
}
struct Work
{
uv_work_t req;
@@ -589,7 +648,7 @@ void MergeMiningClientTari::run()
(a.failed_checkpoints() == b.failed_checkpoints());
};
for (;;) {
while (!m_workerStop) {
const auto start_time = high_resolution_clock::now();
// Force frequent enough updates (at least every 30 seconds)
@@ -692,7 +751,7 @@ void MergeMiningClientTari::run()
m_chainParamsTimestamp = time(nullptr);
m_tariBlock = response.block();
m_tariBlock = std::move(*response.mutable_block());
LOGINFO(4, "Tari aux block template: height = " << job_params.height
<< ", diff = " << job_params.diff
@@ -716,8 +775,8 @@ void MergeMiningClientTari::run()
const int64_t timeout = std::max<int64_t>(500'000'000 - dt, 1'000'000);
if ((m_workerStop.load() != 0) || (uv_cond_timedwait(&m_workerCond, &m_workerLock, timeout) != UV_ETIMEDOUT)) {
return;
if (!m_workerStop) {
uv_cond_timedwait(&m_workerCond, &m_workerLock, timeout);
}
}
}
@@ -887,4 +946,78 @@ bool MergeMiningClientTari::TariClient::on_read(const char* data, uint32_t size)
});
}
void MergeMiningClientTari::push_block_thread(void* arg)
{
set_thread_name("Push Tari block");
const PushBlockThreadData* data = reinterpret_cast<const PushBlockThreadData*>(arg);
LOGINFO(1, "push block thread ready for " << data->m_node);
try {
data->m_client->push_blocks_to(data->m_node);
}
catch (const std::exception& e) {
LOGERR(0, "Exception in push_blocks_to(" << data->m_node << "): " << e.what());
}
LOGINFO(1, "push block thread stopped for " << data->m_node);
}
void MergeMiningClientTari::push_blocks_to(const std::string& node_address)
{
using namespace tari::rpc;
std::unique_ptr<BaseNode::Stub> node(new BaseNode::Stub(grpc::CreateCustomChannel(node_address, grpc::InsecureChannelCredentials(), m_channelArgs)));
grpc::ClientContext get_tip_info_ctx{};
Empty get_tip_info_request{};
TipInfoResponse tip_info{};
const grpc::Status get_tip_info_status = node->GetTipInfo(&get_tip_info_ctx, get_tip_info_request, &tip_info);
if (!get_tip_info_status.ok()) {
LOGWARN(4, "GetTipInfo failed for " << node_address << ": " << get_tip_info_status.error_message());
if (!get_tip_info_status.error_details().empty()) {
LOGWARN(4, "GetTipInfo failed for " << node_address << ": " << get_tip_info_status.error_details());
}
}
else {
LOGINFO(4, node_address << " is at block height " << tip_info.metadata().best_block_height());
}
while (!m_pushBlockStop) {
Block block;
{
MutexLock lock(m_pushBlockLock);
uv_cond_wait(&m_pushBlockCond, &m_pushBlockLock);
if (m_pushBlockStop) {
return;
}
block = m_blockToPush;
}
const std::string& h = block.header().hash();
LOGINFO(4, "Pushing block " << log::hex_buf(h.data(), h.size()) << " to " << node_address);
grpc::ClientContext ctx;
SubmitBlockResponse response;
const grpc::Status status = node->SubmitBlock(&ctx, block, &response);
if (!status.ok()) {
LOGWARN(4, "SubmitBlock failed for " << node_address << ": " << status.error_message());
if (!status.error_details().empty()) {
LOGWARN(4, "SubmitBlock failed for " << node_address << ": " << status.error_details());
}
}
else {
const std::string& h = response.block_hash();
LOGINFO(0, log::LightGreen() << "Pushed Tari block " << log::hex_buf(h.data(), h.size()) << " to " << node_address);
}
}
}
} // namespace p2pool

View File

@@ -99,6 +99,7 @@ private:
const std::string m_hostStr;
grpc::ChannelArguments m_channelArgs;
tari::rpc::BaseNode::Stub* m_TariNode;
struct TariClient : public TCPServer::Client
@@ -130,6 +131,24 @@ private:
static void run_wrapper(void* arg);
void run();
uv_mutex_t m_pushBlockLock;
uv_cond_t m_pushBlockCond;
std::atomic<uint32_t> m_pushBlockStop;
tari::rpc::Block m_blockToPush;
struct PushBlockThreadData
{
uv_thread_t m_worker;
MergeMiningClientTari* m_client;
const std::string m_node;
};
std::vector<PushBlockThreadData*> m_pushBlockThreads;
static void push_block_thread(void* arg);
void push_blocks_to(const std::string& node_address);
};
} // namespace p2pool