Skip to content

Commit

Permalink
added ability to report progress of RescanAccounts and to interrupt it
Browse files Browse the repository at this point in the history
  • Loading branch information
anatolse committed Nov 23, 2023
1 parent afa3da9 commit b8ea45e
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 177 deletions.
15 changes: 14 additions & 1 deletion core/block_crypt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3182,9 +3182,18 @@ namespace beam
m_Total = nTotal;
m_Last_ms = GetTime_ms();
LOG_INFO() << sz;
if (m_pExternal)
m_pExternal->Reset(sz, nTotal);
}

void LongAction::OnProgress(uint64_t pos)
void LongAction::SetTotal(uint64_t nTotal)
{
m_Total = nTotal;
if (m_pExternal)
m_pExternal->SetTotal(nTotal);
}

bool LongAction::OnProgress(uint64_t pos)
{
uint32_t dt_ms = GetTime_ms() - m_Last_ms;

Expand All @@ -3206,6 +3215,10 @@ namespace beam

LOG_INFO() << "\t" << nDone << "%...";
}
if (m_pExternal)
return m_pExternal->OnProgress(pos);

return true;
}

/////////////
Expand Down
24 changes: 17 additions & 7 deletions core/block_crypt.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,28 @@ namespace beam
uint32_t GetTime_ms(); // platform-independent GetTickCount
uint32_t GetTimeNnz_ms(); // guaranteed non-zero

struct LongAction
struct ILongAction
{
uint32_t m_Last_ms;
uint64_t m_Total;
virtual void Reset(const char*, uint64_t nTotal) = 0;
virtual void SetTotal(uint64_t nTotal) = 0;
virtual bool OnProgress(uint64_t pos) = 0;
};
struct LongAction : ILongAction
{
uint32_t m_Last_ms = 0;
uint64_t m_Total = 0;
ILongAction *m_pExternal = nullptr;

LongAction(const char* sz, uint64_t nTotal) {
LongAction(const char* sz, uint64_t nTotal, ILongAction *pExternal = nullptr)
: m_pExternal(pExternal)
{
Reset(sz, nTotal);
}
LongAction() {}
LongAction() = default;

void Reset(const char*, uint64_t nTotal);
void OnProgress(uint64_t pos);
void Reset(const char*, uint64_t nTotal) final;
void SetTotal(uint64_t nTotal) final;
bool OnProgress(uint64_t pos) final;
};

void HeightAdd(Height& trg, Height val); // saturates if overflow
Expand Down
11 changes: 10 additions & 1 deletion node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1232,8 +1232,17 @@ void Node::RefreshAccounts()

for (size_t i = accs.size() - nAdd; i < accs.size(); i++)
m_Processor.get_DB().InsertAccount(accs[i]);
try
{
m_Processor.RescanAccounts(nAdd, m_Cfg.m_Observer ? m_Cfg.m_Observer->GetLongActionHandler() : nullptr);
}
catch (const std::runtime_error&)
{
for (size_t i = accs.size() - nAdd; i < accs.size(); i++)
m_Processor.get_DB().DeleteAccountWithEvents(accs[i].m_iAccount);

m_Processor.RescanAccounts(nAdd);
return;
}
}

if (!accs.empty())
Expand Down
13 changes: 7 additions & 6 deletions node/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ struct Node
virtual void OnStateChanged() {}
virtual void OnRolledBack(const Block::SystemState::ID& id) {};
virtual void InitializeUtxosProgress(uint64_t done, uint64_t total) {};
virtual ILongAction* GetLongActionHandler() { return nullptr; }

enum Error
{
enum Error
{
Unknown,
TimeDiffToLarge
};
};

virtual void OnSyncError(Error error = Unknown) {}
virtual void OnSyncError(Error error = Unknown) {}
};

struct Config
Expand Down Expand Up @@ -200,7 +201,7 @@ struct Node
} m_SyncStatus;

uint32_t get_AcessiblePeerCount() const; // all the peers with known addresses. Including temporarily banned
const PeerManager::AddrSet& get_AcessiblePeerAddrs() const;
const PeerManager::AddrSet& get_AcessiblePeerAddrs() const;

bool m_UpdatedFromPeers = false;
bool m_PostStartSynced = false;
Expand All @@ -216,7 +217,7 @@ struct Node

uint8_t OnTransaction(Transaction::Ptr&&, std::unique_ptr<Merkle::Hash>&&, const PeerID*, bool bFluff, std::ostream* pExtraInfo);

// for step-by-step tests
// for step-by-step tests
void GenerateFakeBlocks(uint32_t n);

TxPool::Fluff m_TxPool;
Expand Down
59 changes: 50 additions & 9 deletions node/node_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ namespace beam

LOG_INFO() << "starting a node on " << node.m_Cfg.m_Listen.port() << " port...";

class MyObserver final : public Node::IObserver
class MyObserver final : public Node::IObserver, public ILongAction
{
public:
MyObserver(Node& node, NodeClient& model)
Expand Down Expand Up @@ -331,14 +331,7 @@ namespace beam
m_model.m_observer->onStartedNode();
}

// make sure no overflow during conversion from SyncStatus to int,int.
constexpr auto threshold = static_cast<unsigned int>(std::numeric_limits<int>::max());
while (s.m_Total > threshold)
{
s.m_Total >>= 1;
s.m_Done >>= 1;
}

AdjustProgress(s.m_Done, s.m_Total);
m_model.m_observer->onSyncProgressUpdated(static_cast<int>(s.m_Done), static_cast<int>(s.m_Total));
}

Expand All @@ -352,10 +345,58 @@ namespace beam
m_model.m_observer->onInitProgressUpdated(done, total);
}

ILongAction* GetLongActionHandler() override
{
return this;
}

void Reset(const char* sz, uint64_t nTotal) override
{
SetTotal(nTotal);
m_Last_ms = GetTime_ms();
}

void SetTotal(uint64_t nTotal) override
{
m_Total = nTotal;
}

bool OnProgress(uint64_t pos) override
{
if (m_model.m_shouldTerminateModel)
{
return false;
}
uint32_t dt_ms = GetTime_ms() - m_Last_ms;
const uint32_t nWindow_ms = 1000; // 1 sec
uint32_t n = dt_ms / nWindow_ms;
if (n)
{
m_Last_ms += n * nWindow_ms;
uint64_t total = m_Total;
AdjustProgress(pos, total);
m_model.m_observer->onSyncProgressUpdated(static_cast<int>(pos), static_cast<int>(total));
}
return true;
}
private:
void AdjustProgress(uint64_t& done, uint64_t& total)
{
// make sure no overflow during conversion from SyncStatus to int,int.
constexpr auto threshold = static_cast<unsigned int>(std::numeric_limits<int>::max());
while (total > threshold)
{
total >>= 1;
done >>= 1;
}
}

private:
Node& m_node;
NodeClient& m_model;
Height m_Done0 = MaxHeight;
uint64_t m_Total = 0;
uint32_t m_Last_ms = 0;
bool m_reportedStarted = false;
} obs(node, *this);

Expand Down
22 changes: 12 additions & 10 deletions node/processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3485,7 +3485,7 @@ bool NodeProcessor::BlockInterpretCtx::BvmProcessor::IsOwnedVar(const bvm2::Cont
!memcmp(cid.m_pData, key.p, cid.nBytes);
}

void NodeProcessor::RescanAccounts(uint32_t nRecent)
void NodeProcessor::RescanAccounts(uint32_t nRecent, ILongAction* pExternalHandler)
{
if (!nRecent)
return;
Expand Down Expand Up @@ -3555,7 +3555,7 @@ void NodeProcessor::RescanAccounts(uint32_t nRecent)
};

{
LongAction la("Rescanning owned Txos...", 0);
LongAction la("Rescanning owned Txos...", 0, pExternalHandler);

TxoRecover wlk(rec);
wlk.m_pLa = &la;
Expand All @@ -3574,7 +3574,7 @@ void NodeProcessor::RescanAccounts(uint32_t nRecent)
TxoID nOuts = m_Extra.m_ShieldedOutputs;
m_Extra.m_ShieldedOutputs = 0;

LongAction la("Rescanning shielded Txos...", 0);
LongAction la("Rescanning shielded Txos...", 0, pExternalHandler);

struct MyKrnWalker
:public KrnWalkerRecognize
Expand Down Expand Up @@ -3607,7 +3607,7 @@ void NodeProcessor::RescanAccounts(uint32_t nRecent)
EnumKernels(wlkKrn, HeightRange(h0, m_Cursor.m_Sid.m_Height));

assert(m_Extra.m_ShieldedOutputs == nOuts);
nOuts; // supporess unused var warning in release
nOuts; // suppress unused var warning in release
}
}

Expand Down Expand Up @@ -6912,7 +6912,7 @@ bool NodeProcessor::EnumTxos(ITxoWalker& wlkTxo, const HeightRange& hr)
assert(hr.m_Max <= m_Cursor.m_ID.m_Height);

if (wlkTxo.m_pLa)
wlkTxo.m_pLa->m_Total = hr.m_Max - hr.m_Min + 1;
wlkTxo.m_pLa->SetTotal(hr.m_Max - hr.m_Min + 1);

TxoID id1 = get_TxosBefore(hr.m_Min);
Height h = hr.m_Min - 1; // don't care about overflow
Expand All @@ -6934,8 +6934,9 @@ bool NodeProcessor::EnumTxos(ITxoWalker& wlkTxo, const HeightRange& hr)
assert(wlk.m_ID < id1);
}

if (wlkTxo.m_pLa)
wlkTxo.m_pLa->OnProgress(h - hr.m_Min);
if (wlkTxo.m_pLa &&
!wlkTxo.m_pLa->OnProgress(h - hr.m_Min))
throw std::runtime_error("EnumTxos interrupted");
}

if (!wlkTxo.OnTxo(wlk, h))
Expand All @@ -6952,7 +6953,7 @@ bool NodeProcessor::EnumKernels(IKrnWalker& wlkKrn, const HeightRange& hr)
assert(hr.m_Max <= m_Cursor.m_ID.m_Height);

if (wlkKrn.m_pLa)
wlkKrn.m_pLa->m_Total = hr.m_Max - hr.m_Min + 1;
wlkKrn.m_pLa->SetTotal(hr.m_Max - hr.m_Min + 1);

TxVectors::Eternal txve;

Expand All @@ -6967,8 +6968,9 @@ bool NodeProcessor::EnumKernels(IKrnWalker& wlkKrn, const HeightRange& hr)
if (!wlkKrn.ProcessHeight(rowID, txve.m_vKernels))
return false;

if (wlkKrn.m_pLa)
wlkKrn.m_pLa->OnProgress(wlkKrn.m_Height - hr.m_Min + 1);
if (wlkKrn.m_pLa &&
!wlkKrn.m_pLa->OnProgress(wlkKrn.m_Height - hr.m_Min + 1))
throw std::runtime_error("EnumKernels interrupted");
}

return true;
Expand Down
Loading

0 comments on commit b8ea45e

Please sign in to comment.