Refactored background job shutdown logic

This commit is contained in:
sech1
2025-09-14 14:41:31 +02:00
parent 8c1c5fd8e1
commit 44f8d941a7
6 changed files with 46 additions and 24 deletions

View File

@@ -388,7 +388,6 @@ static void do_stop_mining(p2pool* m_pool, const char* /*args*/)
// Console "exit" command handler.
// As displayed, it blocks the calling thread until every tracked background
// job has finished, then asks the pool to stop.
// NOTE(review): the hunk header above (-388,7 +388,6) shows one line was
// removed by this commit — presumably the blocking bkg_jobs_tracker->wait(),
// since the commit moves job draining into async polling in p2pool::on_stop.
// Confirm against the full diff which line survives.
static void do_exit(p2pool *m_pool, const char * /* args */)
{
// drain background jobs before tearing down the pool / event loop
bkg_jobs_tracker->wait();
m_pool->stop();
}

View File

@@ -97,6 +97,8 @@ CurlContext::CurlContext(const std::string& address, int port, const std::string
, m_connectedTime(0)
, m_proxy(proxy)
{
BACKGROUND_JOB_START(CurlContext);
m_pollHandles.reserve(2);
{
@@ -266,6 +268,8 @@ CurlContext::~CurlContext()
delete m_closeCallback;
curl_slist_free_all(m_headers);
BACKGROUND_JOB_STOP(CurlContext);
}
int CurlContext::on_socket(CURL* /*easy*/, curl_socket_t s, int action)

View File

@@ -188,6 +188,13 @@ p2pool::p2pool(int argc, char* argv[])
uv_mutex_init_checked(&m_missingHeightsLock);
err = uv_timer_init(uv_default_loop_checked(), &m_timer);
if (err) {
LOGERR(1, "uv_timer_init failed, error " << uv_err_name(err));
throw std::exception();
}
m_timer.data = this;
m_api = p->m_apiPath.empty() ? nullptr : new p2pool_api(p->m_apiPath, p->m_localStats);
if (p->m_localStats && !m_api) {
@@ -1023,6 +1030,22 @@ void p2pool::on_stop(uv_async_t* async)
{
p2pool* pool = reinterpret_cast<p2pool*>(async->data);
const std::vector<std::pair<const char*, int32_t>> bkg_jobs = bkg_jobs_tracker->get_jobs();
if (!bkg_jobs.empty()) {
for (const auto& job : bkg_jobs) {
LOGINFO(1, "waiting for " << job.second << " \"" << job.first << "\" jobs to finish");
}
// There are still some background jobs running. Wait for 1 second and call on_stop again.
uv_timer_start(&pool->m_timer, [](uv_timer_t* timer) {
p2pool* pool = reinterpret_cast<p2pool*>(timer->data);
pool->on_stop(&pool->m_stopAsync);
}, 1000, 0);
return;
}
#ifndef P2POOL_UNIT_TESTS
delete pool->m_consoleCommands;
#endif
@@ -1038,6 +1061,9 @@ void p2pool::on_stop(uv_async_t* async)
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_reconnectToHostAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_getMissingHeightsAsync), nullptr);
uv_timer_stop(&pool->m_timer);
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_timer), nullptr);
init_signals(pool, false);
DeleteLoopUserData(uv_default_loop_checked());
@@ -2339,8 +2365,6 @@ int p2pool::run()
m_stopped = true;
bkg_jobs_tracker->wait();
#if defined(WITH_RANDOMX) && !defined(P2POOL_UNIT_TESTS)
{
MutexLock lock(m_minerLock);

View File

@@ -291,6 +291,8 @@ private:
bool m_getMinerDataPending = false;
std::atomic<uint64_t> m_lastMinerDataReceived;
uv_timer_t m_timer;
};
} // namespace p2pool

View File

@@ -441,26 +441,19 @@ struct BackgroundJobTracker::Impl
}
}
void wait()
std::vector<std::pair<const char*, int32_t>> get_jobs()
{
uint64_t last_msg_time = 0;
do {
{
MutexLock lock(m_lock);
// cppcheck-suppress knownConditionTrueFalse
if (m_jobs.empty()) {
return;
}
const uint64_t t = seconds_since_epoch();
if (t != last_msg_time) {
last_msg_time = t;
for (const auto& job : m_jobs) {
LOGINFO(1, "waiting for " << job.second << " \"" << job.first << "\" jobs to finish");
}
}
std::vector<std::pair<const char*, int32_t>> result;
{
MutexLock lock(m_lock);
result.reserve(m_jobs.size());
for (const auto& job : m_jobs) {
result.emplace_back(job.first, job.second);
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
} while (1);
}
return result;
}
void print_status()
@@ -507,9 +500,9 @@ void BackgroundJobTracker::stop_internal(const char* name)
m_impl->stop(name);
}
void BackgroundJobTracker::wait()
std::vector<std::pair<const char*, int32_t>> BackgroundJobTracker::get_jobs()
{
m_impl->wait();
return m_impl->get_jobs();
}
void BackgroundJobTracker::print_status()

View File

@@ -243,7 +243,7 @@ public:
template<size_t N> FORCEINLINE void start(const char (&name)[N]) { start_internal(name); }
template<size_t N> FORCEINLINE void stop (const char (&name)[N]) { stop_internal (name); }
void wait();
std::vector<std::pair<const char*, int32_t>> get_jobs();
void print_status();
private: