Skip to content

Commit

Permalink
Harden durability of publish files
Browse files Browse the repository at this point in the history
  • Loading branch information
marta-lokhova committed Nov 22, 2024
1 parent 6fffb62 commit 5762fe4
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 11 deletions.
38 changes: 31 additions & 7 deletions src/history/HistoryManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
#include "crypto/Hex.h"
#include "crypto/SHA.h"
#include "herder/HerderImpl.h"
#include <cereal/archives/binary.hpp>
#include <cereal/archives/json.hpp>
#include <cereal/cereal.hpp>
#include <cereal/types/vector.hpp>

#include "history/HistoryArchive.h"
#include "history/HistoryArchiveManager.h"
#include "history/HistoryManagerImpl.h"
Expand All @@ -30,6 +35,7 @@
#include "overlay/StellarXDR.h"
#include "process/ProcessManager.h"
#include "transactions/TransactionSQL.h"
#include "util/BufferedAsioCerealOutputArchive.h"
#include "util/GlobalChecks.h"
#include "util/Logging.h"
#include "util/Math.h"
Expand Down Expand Up @@ -97,7 +103,13 @@ writeCheckpointFile(Application& app, HistoryArchiveState const& has,
app.getHistoryManager().isLastLedgerInCheckpoint(has.currentLedger));
auto filename = publishQueueFileName(has.currentLedger);
auto tmpOut = app.getHistoryManager().getTmpDir() / filename;
has.save(tmpOut.string());
{
OutputFileStream out(app.getClock().getIOContext(),
/* fsyncOnClose */ true);
out.open(tmpOut.string());
cereal::BufferedAsioOutputArchive ar(out);
has.serialize(ar);
}

// Immediately produce a final checkpoint JSON (suitable for confirmed
// ledgers)
Expand Down Expand Up @@ -467,6 +479,22 @@ HistoryManagerImpl::takeSnapshotAndPublish(HistoryArchiveState const& has)
"delay-publishing-to-archive", delayTimeout, publishWork);
}

// Read a HistoryArchiveState back from the file at `filename` using cereal's
// binary input archive.
//
// Throws std::runtime_error if the file cannot be opened. Once the stream is
// open, badbit conditions on it are raised as stream exceptions instead of
// being silently ignored.
HistoryArchiveState
loadCheckpointHAS(std::string const& filename)
{
    std::ifstream input(filename, std::ios::binary);
    if (input.fail())
    {
        throw std::runtime_error(
            fmt::format(FMT_STRING("Error opening file {}"), filename));
    }
    // Surface low-level I/O errors while the archive is reading.
    input.exceptions(std::ios::badbit);

    HistoryArchiveState loaded;
    cereal::BinaryInputArchive archive(input);
    loaded.serialize(archive);
    return loaded;
}

size_t
HistoryManagerImpl::publishQueuedHistory()
{
Expand All @@ -485,17 +513,14 @@ HistoryManagerImpl::publishQueuedHistory()
#endif

ZoneScoped;
HistoryArchiveState has;
auto seq = getMinLedgerQueuedToPublish();

if (seq == std::numeric_limits<uint32_t>::max())
{
return 0;
}

auto file = publishQueuePath(mApp.getConfig()) / publishQueueFileName(seq);
has.load(file.string());
takeSnapshotAndPublish(has);
takeSnapshotAndPublish(loadCheckpointHAS(file.string()));
return 1;
}

Expand Down Expand Up @@ -541,8 +566,7 @@ HistoryManagerImpl::getPublishQueueStates()
HistoryArchiveState has;
auto fullPath =
publishQueuePath(mApp.getConfig()) / f;
has.load(fullPath.string());
states.push_back(has);
states.push_back(loadCheckpointHAS(fullPath));
});
return states;
}
Expand Down
22 changes: 22 additions & 0 deletions src/main/CommandLine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,26 @@ getSettingsUpgradeTransactions(CommandLineArgs const& args)
});
}

// Command-line handler registered as "print-publish-queue".
//
// Parses the configuration option from `args`, creates an Application on a
// real-time clock after calling setNoListen() on the config, and serializes
// every state returned by HistoryManager::getPublishQueueStates() into a
// cereal JSON array written to std::cout. Returns whatever runWithHelp
// returns; the handler itself yields 0.
int
runPrintPublishQueue(CommandLineArgs const& args)
{
    CommandLine::ConfigOption configOption;

    auto printQueue = [&] {
        auto config = configOption.getConfig();
        config.setNoListen();

        // The clock must be declared before the application that uses it.
        VirtualClock clock(VirtualClock::REAL_TIME);
        Application::pointer app = Application::create(clock, config, false);

        cereal::JSONOutputArchive jsonOut(std::cout);
        jsonOut.makeArray();
        for (auto const& queuedState :
             app->getHistoryManager().getPublishQueueStates())
        {
            queuedState.serialize(jsonOut);
        }
        return 0;
    };

    return runWithHelp(args, {configurationParser(configOption)}, printQueue);
}

int
runCheckQuorumIntersection(CommandLineArgs const& args)
{
Expand Down Expand Up @@ -2058,6 +2078,8 @@ handleCommandLine(int argc, char* const* argv)
"check that a given network specified as a JSON file enjoys a quorum "
"intersection",
runCheckQuorumIntersection},
{"print-publish-queue", "print all checkpoints scheduled for publish",
runPrintPublishQueue},
#ifdef BUILD_TESTS
{"load-xdr", "load an XDR bucket file, for testing", runLoadXDR},
{"rebuild-ledger-from-buckets",
Expand Down
2 changes: 1 addition & 1 deletion src/main/dumpxdr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void
dumpXdrStream(std::string const& filename, bool compact)
{
std::regex rx(
R"(.*\b(debug-tx-set|(?:(ledger|bucket|transactions|results|meta-debug|scp)-.+))\.xdr$)");
R"(.*\b(debug-tx-set|(?:(ledger|bucket|transactions|results|meta-debug|scp)-.+))\.xdr(?:\.dirty)?$)");
std::smatch sm;
if (std::regex_match(filename, sm, rx))
{
Expand Down
1 change: 1 addition & 0 deletions src/util/BufferedAsioCerealOutputArchive.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "util/XDRStream.h"
#include <cereal/archives/binary.hpp>
#include <cereal/cereal.hpp>
#include <cereal/types/string.hpp>

namespace cereal
{
Expand Down
8 changes: 5 additions & 3 deletions src/util/Fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ using random_access_t = asio::posix::stream_descriptor;
using native_handle_t = int;
#endif

////
// Utility functions for operating on the filesystem.
////
/*
Utility functions for operating on the filesystem.
Note on durable writes to disk:
*/

// raises an exception if a lock file cannot be created
void lockFile(std::string const& path);
Expand Down

0 comments on commit 5762fe4

Please sign in to comment.