Skip to content
This repository has been archived by the owner on Dec 2, 2020. It is now read-only.

Commit

Permalink
Merge pull request #116 from hellcatz/patch-4
Browse files Browse the repository at this point in the history
Solve Issue #86, #113
  • Loading branch information
Procrastinator authored Apr 20, 2017
2 parents 0c4cd51 + b0cc4cd commit 6937e7b
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 72 deletions.
195 changes: 123 additions & 72 deletions libs/paymentProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ var async = require('async');
var Stratum = require('stratum-pool');
var util = require('stratum-pool/lib/util.js');


module.exports = function(logger){

var poolConfigs = JSON.parse(process.env.pools);
Expand Down Expand Up @@ -53,12 +52,15 @@ function SetupForPool(logger, poolOptions, setupFinished){

var minConfShield = 3;
var minConfPayout = 3;

var maxBlocksPerPayment = processingConfig.maxBlocksPerPayment || 3;

var requireShielding = poolOptions.coin.requireShielding === true;
var fee = parseFloat(poolOptions.coin.txfee) || parseFloat(0.0004);

logger.special(logSystem, logComponent, logComponent + ' requireShielding: ' + requireShielding);
logger.special(logSystem, logComponent, logComponent + ' payments txfee reserve: ' + fee);
logger.debug(logSystem, logComponent, logComponent + ' requireShielding: ' + requireShielding);
logger.debug(logSystem, logComponent, logComponent + ' payments txfee reserve: ' + fee);
logger.debug(logSystem, logComponent, logComponent + ' maxBlocksPerPayment: ' + maxBlocksPerPayment);

var daemon = new Stratum.daemon.interface([processingConfig.daemon], function(severity, message){
logger[severity](logSystem, logComponent, message);
Expand Down Expand Up @@ -291,8 +293,8 @@ function SetupForPool(logger, poolOptions, setupFinished){
}
);
}


// TODO, this needs to be moved out of payments processor
function cacheNetworkStats () {
var params = null;
daemon.cmd('getmininginfo', params,
Expand Down Expand Up @@ -381,19 +383,19 @@ function SetupForPool(logger, poolOptions, setupFinished){
}
if (op.status == "failed") {
if (op.error) {
logger.error(logSystem, logComponent, "Payment operation failed " + op.id + " " + op.error.code +", " + op.error.message);
logger.error(logSystem, logComponent, "Shielding operation failed " + op.id + " " + op.error.code +", " + op.error.message);
} else {
logger.error(logSystem, logComponent, "Payment operation failed " + op.id);
logger.error(logSystem, logComponent, "Shielding operation failed " + op.id);
}
} else {
logger.special(logSystem, logComponent, 'Payment operation success ' + op.id + ' txid: ' + op.result.txid);
logger.special(logSystem, logComponent, 'Shielding operation success ' + op.id + ' txid: ' + op.result.txid);
}
}
}, true, true);
} else if (op.status == "executing") {
if (opidCount == 0) {
opidCount++;
logger.special(logSystem, logComponent, 'Payment operation in progress ' + op.id );
logger.special(logSystem, logComponent, 'Shielding operation in progress ' + op.id );
}
}
});
Expand All @@ -418,9 +420,18 @@ function SetupForPool(logger, poolOptions, setupFinished){
};

function balanceRound(number) {
return parseFloat((Math.round(number * 100000000) / 100000000).toFixed(8));
return parseFloat((Math.round(number * 100000000) / 100000000).toFixed(8));
}

function checkForDuplicateBlockHeight(rounds, height) {
var count = 0;
for (var i = 0; i < rounds.length; i++) {
if (rounds[i].height == height)
count++;
}
return count > 1;
}

/* Deal with numbers in smallest possible units (satoshis) as much as possible. This greatly helps with accuracy
when rounding and whatnot. When we are storing numbers for only humans to see, store in whole coin units. */

Expand All @@ -442,38 +453,110 @@ function SetupForPool(logger, poolOptions, setupFinished){

async.waterfall([

/* Call redis to get an array of rounds - which are coinbase transactions and block heights from submitted
blocks. */
/* Call redis to get an array of rounds and balances - which are coinbase transactions and block heights from submitted blocks. */
function(callback){
startRedisTimer();
redisClient.multi([
['hgetall', coin + ':balances'],
['smembers', coin + ':blocksPending']
]).exec(function(error, results){
endRedisTimer();

if (error){
logger.error(logSystem, logComponent, 'Could not get blocks from redis ' + JSON.stringify(error));
callback(true);
return;
}

// build worker balances
var workers = {};
for (var w in results[0]){
workers[w] = {balance: coinsToSatoshies(parseFloat(results[0][w]))};
}

// build initial rounds data from blocksPending
var rounds = results[1].map(function(r){
var details = r.split(':');
return {
blockHash: details[0],
txHash: details[1],
height: details[2],
minedby: details[3],
duplicate: false,
serialized: r
};
});

callback(null, workers, rounds);
// find duplicate blocks by height
// this can happen when two or more solutions are submitted at the same block height
var duplicateFound = false;
for (var i = 0; i < rounds.length; i++) {
if (checkForDuplicateBlockHeight(rounds, rounds[i].height) === true) {
rounds[i].duplicate = true;
duplicateFound = true;
}
}
// handle duplicates if needed
if (duplicateFound) {
var dups = rounds.filter(function(round){ return round.duplicate; });
logger.warning(logSystem, logComponent, 'Duplicate pending blocks found: ' + JSON.stringify(dups));
// attempt to find the invalid duplicates
var rpcDupCheck = dups.map(function(r){
return ['getblock', [r.blockHash]];
});
startRPCTimer();
daemon.batchCmd(rpcDupCheck, function(error, blocks){
endRPCTimer();
if (error || !blocks) {
logger.error(logSystem, logComponent, 'Error with duplicate block check rpc call getblock ' + JSON.stringify(error));
return;
}
// look for the invalid duplicate block
var validBlocks = {}; // hashtable for unique look up
var invalidBlocks = []; // array for redis work
blocks.forEach(function(block, i) {
if (block && block.result) {
// invalid duplicate submit blocks have negative confirmations
if (block.result.confirmations < 0) {
logger.warning(logSystem, logComponent, 'Remove invalid duplicate block ' + block.result.height + ' > ' + block.result.hash);
// move from blocksPending to blocksDuplicate...
invalidBlocks.push(['smove', coin + ':blocksPending', coin + ':blocksDuplicate', dups[i].serialized]);
} else {
// block must be valid, make sure it is unique
if (validBlocks.hasOwnProperty(dups[i].blockHash)) {
// not unique duplicate block
logger.warning(logSystem, logComponent, 'Remove non-unique duplicate block ' + block.result.height + ' > ' + block.result.hash);
// move from blocksPending to blocksDuplicate...
invalidBlocks.push(['smove', coin + ':blocksPending', coin + ':blocksDuplicate', dups[i].serialized]);
} else {
// keep unique valid block
validBlocks[dups[i].blockHash] = dups[i].serialized;
logger.debug(logSystem, logComponent, 'Keep valid duplicate block ' + block.result.height + ' > ' + block.result.hash);
}
}
}
});
// filter out all duplicates to prevent double payments
rounds = rounds.filter(function(round){ return !round.duplicate; });
// if we detected the invalid duplicates, move them
if (invalidBlocks.length > 0) {
// move invalid duplicate blocks in redis
startRedisTimer();
redisClient.multi(invalidBlocks).exec(function(error, kicked){
endRedisTimer();
if (error) {
logger.error(logSystem, logComponent, 'Error could not move invalid duplicate blocks in redis ' + JSON.stringify(error));
}
// continue payments normally
callback(null, workers, rounds);
});
} else {
// notify pool owner that we are unable to find the invalid duplicate blocks, manual intervention required...
logger.error(logSystem, logComponent, 'Unable to detect invalid duplicate blocks, duplicate block payments on hold.');
// continue payments normally
callback(null, workers, rounds);
}
});
} else {
// no duplicates, continue payments normally
callback(null, workers, rounds);
}
});
},

Expand Down Expand Up @@ -528,58 +611,12 @@ function SetupForPool(logger, poolOptions, setupFinished){
callback(true);
return;
}

// check for invalid blocks by block hash
blockDetails.forEach(function(block, i) {
// this is just the response from getblockcount
if (i === blockDetails.length - 1){
return;
}
// help track duplicate or invalid blocks by block hash
if (block && block.result && block.result.hash) {
// find the round for this block hash
for (var k=0; k < rounds.length; k++) {
if (rounds[k].blockHash == block.result.hash) {
var round = rounds[k];
var dupFound = false;
// duplicate, invalid, kicked, orphaned blocks will have negative confirmations
if (block.result.confirmations < 0) {
// check if this is an invalid duplicate
// we need to kick invalid duplicates now, as this will cause a double payout...
for (var d=0; d < rounds.length; d++) {
if (rounds[d].height == block.result.height && rounds[d].blockHash != block.result.hash) {
logger.warning(logSystem, logComponent, 'Kicking invalid duplicate block ' +round.height + ' > ' + round.blockHash);
dupFound = true;
// kick this round now, its completely invalid!
var kickNow = [];
kickNow.push(['smove', coin + ':blocksPending', coin + ':blocksDuplicate', round.serialized]);
startRedisTimer();
redisClient.multi(kickNow).exec(function(error, kicked){
endRedisTimer();
if (error){
logger.error(logSystem, logComponent, 'Error could not kick invalid duplicate block ' + JSON.stringify(kicked));
}
});
// filter the duplicate out now, just in case we are actually paying this time around...
rounds = rounds.filter(function(item){ return item.txHash != round.txHash; });
}
}
// unknown reason why this block failed, possible orphan or kicked soon
// not sure if we should take any action or just wait it out...
if (!dupFound) {
logger.warning(logSystem, logComponent, 'Daemon reports negative confirmations '+block.result.confirmations+' for block: ' +round.height + ' > ' + round.blockHash);
}
}
}
}
}
});

// now check block transaction ids

// get pending block transaction details from coin daemon
var batchRPCcommand = rounds.map(function(r){
return ['gettransaction', [r.txHash]];
});
// guarantee a response for batchRPCcommand
// get account address (not implemented in zcash at this time..)
batchRPCcommand.push(['getaccount', [poolOptions.address]]);

startRPCTimer();
Expand Down Expand Up @@ -653,25 +690,40 @@ function SetupForPool(logger, poolOptions, setupFinished){
return true;
};

//Filter out all rounds that are immature (not confirmed or orphaned yet)
// limit blocks paid per payment round
var payingBlocks = 0;

//filter out all rounds that are immature (not confirmed or orphaned yet)
rounds = rounds.filter(function(r){

// only pay max blocks at a time
if (payingBlocks >= maxBlocksPerPayment)
return false;

switch (r.category) {
case 'orphan':
case 'kicked':
r.canDeleteShares = canDeleteShares(r);
return true;
case 'generate':
payingBlocks++;
return true;

default:
return false;
}
});

// TODO: make tx fees dynamic
var feeSatoshi = fee * magnitude;

// calculate what the pool owes its miners
var totalOwed = parseInt(0);
for (var i = 0; i < rounds.length; i++) {
totalOwed = totalOwed + Math.round(rounds[i].reward * magnitude) - feeSatoshi; // TODO: make tx fees dynamic
// only pay generated blocks, not orphaned or kicked
if (rounds[i].category == 'generate') {
totalOwed = totalOwed + Math.round(rounds[i].reward * magnitude) - feeSatoshi;
}
}

var notAddr = null;
Expand All @@ -682,10 +734,10 @@ function SetupForPool(logger, poolOptions, setupFinished){
// check if we have enough tAddress funds to brgin payment processing
listUnspent(null, notAddr, minConfPayout, false, function (error, tBalance){
if (error) {
logger.error(logSystem, logComponent, 'Error checking pool balance before processing payments. (Unable to begin payment process)');
logger.error(logSystem, logComponent, 'Error checking pool balance before processing payments.');
return callback(true);
} else if (tBalance < totalOwed) {
logger.error(logSystem, logComponent, 'Insufficient funds to process payments ('+(tBalance / magnitude).toFixed(8) + ' < ' + (totalOwed / magnitude).toFixed(8)+'). Possibly waiting for shielding process.');
logger.error(logSystem, logComponent, 'Insufficient funds to process payments for ' + payingBlocks + ' blocks ('+(tBalance / magnitude).toFixed(8) + ' < ' + (totalOwed / magnitude).toFixed(8)+'). Possibly waiting for shielding process.');
return callback(true);
} else {
// zcash daemon does not support account feature
Expand Down Expand Up @@ -756,7 +808,6 @@ function SetupForPool(logger, poolOptions, setupFinished){
});
},


/* Calculate if any payments are ready to be sent and trigger them sending
Get balance different for each address and pass it along as object of latest balances such as
{worker1: balance1, worker2, balance2}
Expand Down
Loading

0 comments on commit 6937e7b

Please sign in to comment.