Split pool daemon into mining and observer roles
Some checks failed
CodeQL / Analyze (javascript) (push) Failing after 36s

This commit is contained in:
Codex Bot
2026-03-21 19:32:14 +01:00
parent 0cd6cd6af2
commit cc1c288f52
9 changed files with 235 additions and 121 deletions

View File

@@ -150,6 +150,16 @@
"trustProxyIP": true
},
"miningSource": {
"host": "127.0.0.1",
"port": 18081
},
"chainObserver": {
"host": "127.0.0.1",
"port": 18081
},
"daemon": {
"host": "127.0.0.1",
"port": 18081

View File

@@ -119,6 +119,16 @@
"trustProxyIP": true
},
"miningSource": {
"host": "127.0.0.1",
"port": 37777
},
"chainObserver": {
"host": "127.0.0.1",
"port": 37777
},
"daemon": {
"host": "127.0.0.1",
"port": 37777

View File

@@ -145,6 +145,16 @@
"trustProxyIP": true
},
"miningSource": {
"host": "127.0.0.1",
"port": 11812
},
"chainObserver": {
"host": "127.0.0.1",
"port": 11812
},
"daemon": {
"host": "127.0.0.1",
"port": 11812

View File

@@ -111,6 +111,16 @@
"trustProxyIP": true
},
"miningSource": {
"host": "127.0.0.1",
"port": 17767
},
"chainObserver": {
"host": "127.0.0.1",
"port": 17767
},
"daemon": {
"host": "127.0.0.1",
"port": 17767

86
init.js
View File

@@ -41,16 +41,20 @@
// Load pool modules
if (cluster.isWorker) {
switch (process.env.workerType) {
case 'pool':
require('./lib/pool.js');
break;
case 'daemon':
require('./lib/daemon.js')
break
case 'blockUnlocker':
require('./lib/blockUnlocker.js');
break;
switch (process.env.workerType) {
case 'pool':
require('./lib/pool.js');
break;
case 'daemon':
case 'miningSource':
require('./lib/miningSource.js')
break
case 'chainObserver':
require('./lib/chainObserver.js')
break
case 'blockUnlocker':
require('./lib/blockUnlocker.js');
break;
case 'paymentProcessor':
require('./lib/paymentProcessor.js');
break;
@@ -73,7 +77,7 @@
// Run a single module ?
var singleModule = (function () {
var validModules = ['pool', 'api', 'unlocker', 'payments', 'chartsDataCollector', 'telegramBot'];
var validModules = ['pool', 'api', 'unlocker', 'payments', 'chartsDataCollector', 'telegramBot', 'daemon', 'miningSource', 'chainObserver'];
for (var i = 0; i < process.argv.length; i++) {
if (process.argv[i].indexOf('-module=') === 0) {
@@ -95,13 +99,17 @@
if (singleModule) {
log('info', logSystem, 'Running in single module mode: %s', [singleModule]);
switch (singleModule) {
case 'daemon':
spawnDaemon()
break
case 'pool':
spawnPoolWorkers();
break;
switch (singleModule) {
case 'daemon':
case 'miningSource':
spawnMiningSource();
break;
case 'chainObserver':
spawnChainObserver();
break;
case 'pool':
spawnPoolWorkers();
break;
case 'unlocker':
spawnBlockUnlocker();
break;
@@ -118,12 +126,13 @@
spawnTelegramBot();
break;
}
} else {
spawnPoolWorkers();
spawnDaemon();
spawnBlockUnlocker();
spawnPaymentProcessor();
spawnApi();
} else {
spawnPoolWorkers();
spawnMiningSource();
spawnChainObserver();
spawnBlockUnlocker();
spawnPaymentProcessor();
spawnApi();
spawnChartsDataCollector();
spawnTelegramBot();
}
@@ -229,18 +238,18 @@
}
/**
* Spawn daemon module
**/
function spawnDaemon () {
* Spawn mining source module
**/
function spawnMiningSource () {
if (!config.poolServer || !config.poolServer.enabled || !config.poolServer.ports || config.poolServer.ports.length === 0) return;
var worker = cluster.fork({
workerType: 'daemon'
workerType: 'miningSource'
});
worker.on('exit', function (code, signal) {
log('error', logSystem, 'Daemon died, spawning replacement...');
log('error', logSystem, 'Mining source died, spawning replacement...');
setTimeout(function () {
spawnDaemon();
spawnMiningSource();
}, 10);
})
.on('message', function (msg) {
@@ -260,6 +269,23 @@
});
}
/**
* Spawn chain observer module
**/
function spawnChainObserver () {
if (!config.poolServer || !config.poolServer.enabled || !config.poolServer.ports || config.poolServer.ports.length === 0) return;
var worker = cluster.fork({
workerType: 'chainObserver'
});
worker.on('exit', function () {
log('error', logSystem, 'Chain observer died, spawning replacement...');
setTimeout(function () {
spawnChainObserver();
}, 2000);
});
}
/**
* Spawn block unlocker module
**/

50
lib/chainObserver.js Normal file
View File

@@ -0,0 +1,50 @@
let async = require('async');
let apiInterfaces = require('./apiInterfaces.js')(config.daemon, config.wallet, config.api);
let lastHash;
let observerConfig = config.chainObserver || config.daemon;
let logSystem = 'chainObserver';
let blockData = JSON.stringify({
id: "0",
jsonrpc: "2.0",
method: 'getlastblockheader',
params: {}
});
require('./exceptionWriter.js')(logSystem);
function runInterval () {
async.waterfall([
function (callback) {
apiInterfaces.jsonHttpRequest(observerConfig.host, observerConfig.port, blockData, function (err, res) {
if (err) {
log('error', logSystem, '%s error from chain observer', [config.coin]);
setTimeout(runInterval, 3000);
return;
}
if (res && res.result && res.result.status === "OK" && res.result.hasOwnProperty('block_header')) {
let blockHeader = res.result.block_header;
let hash = blockHeader.hash.toString('hex');
if (!lastHash || lastHash !== hash) {
lastHash = hash;
log('info', logSystem, '%s observed new chain tip %s at height %d', [config.coin, hash, blockHeader.height]);
process.send({
type: 'ChainState',
block_header: blockHeader
});
}
callback(null);
return;
}
log('error', logSystem, 'bad response from chain observer');
setTimeout(runInterval, 3000);
});
}
], function () {
setTimeout(function () {
runInterval();
}, observerConfig.refreshInterval || config.poolServer.blockRefreshInterval);
});
}
runInterval();

View File

@@ -1,90 +0,0 @@
let utils = require('./utils.js');
let async = require('async');
let apiInterfaces = require('./apiInterfaces.js')(config.daemon, config.wallet, config.api);
let lastHash;
let POOL_NONCE_SIZE = 16 + 1; // +1 for old XMR/new TRTL bugs
let logSystem = 'daemon'
let blockData = JSON.stringify({
id: "0",
jsonrpc: "2.0",
method: 'getlastblockheader',
params: {}
})
require('./exceptionWriter.js')(logSystem);
function runInterval () {
async.waterfall([
function (callback) {
apiInterfaces.jsonHttpRequest(config.daemon.host, config.daemon.port, blockData, function (err, res) {
if (err) {
log('error', logSystem, '%s error from daemon', [config.coin]);
setTimeout(runInterval, 3000);
return;
}
if (res && res.result && res.result.status === "OK" && res.result.hasOwnProperty('block_header')) {
let hash = res.result.block_header.hash.toString('hex');
if (!lastHash || lastHash !== hash) {
lastHash = hash
log('info', logSystem, '%s found new hash %s', [config.coin, hash]);
callback(null, true);
return;
} else if (config.daemon.alwaysPoll || false) {
callback(null, true);
return;
} else {
callback(true);
return;
}
} else {
log('error', logSystem, 'bad reponse from daemon');
setTimeout(runInterval, 3000);
return;
}
});
},
function (getbc, callback) {
let templateData = JSON.stringify({
id: "0",
jsonrpc: "2.0",
method: 'getblocktemplate',
params: {
reserve_size: POOL_NONCE_SIZE,
wallet_address: utils.getPoolTemplateAddress()
}
});
apiInterfaces.jsonHttpRequest(config.daemon.host, config.daemon.port, templateData, function (err, res) {
if (err) {
log('error', logSystem, 'Error polling getblocktemplate %j', [err])
callback(null)
return
}
if (res.error) {
log('error', logSystem, 'Error polling getblocktemplate %j', [res.error])
callback(null)
return
}
if (config.coin == 'talleo' && res.result.height >= 10000 && res.result.num_transactions == 0) {
callback(null)
return
}
process.send({
type: 'BlockTemplate',
block: res.result
})
callback(null)
})
}
],
function (error) {
if (error) {}
setTimeout(function () {
runInterval()
}, config.poolServer.blockRefreshInterval)
})
}
runInterval()

View File

@@ -75,7 +75,7 @@ global.log = function (severity, system, text, data) {
if (logConsole) {
if (config.logging.console.colors)
if (system === 'daemon') {
if (system === 'daemon' || system === 'miningSource' || system === 'chainObserver') {
console.log(severityMap[severity](time) + clc.green.bold(' [' + system + '] ' + formattedMessage));
}
else {

88
lib/miningSource.js Normal file
View File

@@ -0,0 +1,88 @@
let utils = require('./utils.js');
let async = require('async');
let apiInterfaces = require('./apiInterfaces.js')(config.daemon, config.wallet, config.api);
let lastHash;
let POOL_NONCE_SIZE = 16 + 1; // +1 for old XMR/new TRTL bugs
let miningConfig = config.miningSource || config.daemon;
let logSystem = 'miningSource';
let blockData = JSON.stringify({
id: "0",
jsonrpc: "2.0",
method: 'getlastblockheader',
params: {}
});
require('./exceptionWriter.js')(logSystem);
function runInterval () {
async.waterfall([
function (callback) {
apiInterfaces.jsonHttpRequest(miningConfig.host, miningConfig.port, blockData, function (err, res) {
if (err) {
log('error', logSystem, '%s error from mining source', [config.coin]);
setTimeout(runInterval, 3000);
return;
}
if (res && res.result && res.result.status === "OK" && res.result.hasOwnProperty('block_header')) {
let hash = res.result.block_header.hash.toString('hex');
if (!lastHash || lastHash !== hash) {
lastHash = hash;
log('info', logSystem, '%s found new hash %s', [config.coin, hash]);
callback(null, true);
return;
} else if (miningConfig.alwaysPoll || config.daemon.alwaysPoll || false) {
callback(null, true);
return;
} else {
callback(true);
return;
}
} else {
log('error', logSystem, 'bad response from mining source');
setTimeout(runInterval, 3000);
return;
}
});
},
function (_getbc, callback) {
let templateData = JSON.stringify({
id: "0",
jsonrpc: "2.0",
method: 'getblocktemplate',
params: {
reserve_size: POOL_NONCE_SIZE,
wallet_address: utils.getPoolTemplateAddress()
}
});
apiInterfaces.jsonHttpRequest(miningConfig.host, miningConfig.port, templateData, function (err, res) {
if (err) {
log('error', logSystem, 'Error polling getblocktemplate %j', [err]);
callback(null);
return;
}
if (res.error) {
log('error', logSystem, 'Error polling getblocktemplate %j', [res.error]);
callback(null);
return;
}
if (config.coin == 'talleo' && res.result.height >= 10000 && res.result.num_transactions == 0) {
callback(null);
return;
}
process.send({
type: 'BlockTemplate',
block: res.result
});
callback(null);
});
}
], function () {
setTimeout(function () {
runInterval();
}, config.poolServer.blockRefreshInterval);
});
}
runInterval();