diff --git a/docs/quick-reference.md b/docs/quick-reference.md index b52cf8ad4f..bcb0029302 100644 --- a/docs/quick-reference.md +++ b/docs/quick-reference.md @@ -61,8 +61,7 @@ You have some control over which peers you're connected to: ### Maintenance -Core keeps old meta around for Horizon and other systems. As cursors get updated, automatic -maintenance normally deletes more than enough for the node to use a constant amount of disk space. +Core keeps historical data needed for publish (such as SCP history) Sometimes you need to clean up more than this (for example, if you have a large maintenance debt). In this case running the command `maintenance?count=100000000` (integer is a large number, bigger than your max backlog) will perform the full maintenance. diff --git a/docs/software/commands.md b/docs/software/commands.md index db17ba6a83..41ee27e5fd 100644 --- a/docs/software/commands.md +++ b/docs/software/commands.md @@ -307,23 +307,6 @@ Most commands return their results in JSON format. * `delayed`: participating in the latest consensus rounds, but slower than others. * `agree`: running just fine. -* **setcursor** - `setcursor?id=ID&cursor=N`
- Sets or creates a cursor identified by `ID` with value `N`. ID is an - uppercase AlphaNum, N is an uint32 that represents the last ledger sequence - number that the instance ID processed. Cursors are used by dependent services - to tell stellar-core which data can be safely deleted by the instance. The - data is historical data stored in the SQL tables such as txhistory or - ledgerheaders. When all consumers processed the data for ledger sequence N - the data can be safely removed by the instance. The actual deletion is - performed by invoking the `maintenance` endpoint or on startup. See also - `dropcursor`. - -* **getcursor** - `getcursor?[id=ID]`
- Gets the cursor identified by `ID`. If ID is not defined then all cursors - will be returned. - * **scp** `scp?[limit=n][&fullkeys=false]`
Returns a JSON object with the internal state of the SCP engine for the last diff --git a/docs/stellar-core_example.cfg b/docs/stellar-core_example.cfg index c8325b7476..52ce7271bf 100644 --- a/docs/stellar-core_example.cfg +++ b/docs/stellar-core_example.cfg @@ -304,10 +304,6 @@ KNOWN_PEERS=[ "core-testnet2.stellar.org", "core-testnet3.stellar.org"] -# KNOWN_CURSORS (list of strings) default is empty -# Set of cursors added at each startup with value '1'. -KNOWN_CURSORS=["HORIZON"] - ####################### ## SCP settings diff --git a/src/bucket/test/BucketManagerTests.cpp b/src/bucket/test/BucketManagerTests.cpp index 62fb33af2a..a861e063bb 100644 --- a/src/bucket/test/BucketManagerTests.cpp +++ b/src/bucket/test/BucketManagerTests.cpp @@ -22,7 +22,7 @@ #include "lib/catch.hpp" #include "main/Application.h" #include "main/Config.h" -#include "main/ExternalQueue.h" +#include "main/Maintainer.h" #include "test/TestUtils.h" #include "test/test.h" #include "util/GlobalChecks.h" @@ -641,8 +641,7 @@ TEST_CASE_VERSIONS( clock.crank(false); // Trim history after publishing whenever possible. - ExternalQueue ps(*app); - ps.deleteOldEntries(50000); + app->getMaintainer().performMaintenance(50000); } }); } diff --git a/src/database/Database.cpp b/src/database/Database.cpp index 4ab34363cc..83eadba2cd 100644 --- a/src/database/Database.cpp +++ b/src/database/Database.cpp @@ -24,7 +24,6 @@ #include "history/HistoryManager.h" #include "ledger/LedgerHeaderUtils.h" #include "ledger/LedgerTxn.h" -#include "main/ExternalQueue.h" #include "main/PersistentState.h" #include "overlay/BanManager.h" #include "overlay/OverlayManager.h" @@ -63,7 +62,7 @@ bool Database::gDriversRegistered = false; // smallest schema version supported static unsigned long const MIN_SCHEMA_VERSION = 21; -static unsigned long const SCHEMA_VERSION = 23; +static unsigned long const SCHEMA_VERSION = 24; // These should always match our compiled version precisely, since we are // using a bundled version to get access to carray(). But in case someone @@ -219,6 +218,9 @@ Database::applySchemaUpgrade(unsigned long vers) mApp.getHistoryManager().dropSQLBasedPublish(); Upgrades::dropSupportUpgradeHistory(*this); break; + case 24: + getSession() << "DROP TABLE IF EXISTS pubsub;"; + break; default: throw std::runtime_error("Unknown DB schema version"); } @@ -473,7 +475,6 @@ Database::initialize() Upgrades::dropAll(*this); OverlayManager::dropAll(*this); PersistentState::dropAll(*this); - ExternalQueue::dropAll(*this); LedgerHeaderUtils::dropAll(*this); // No need to re-create txhistory, will be dropped during // upgradeToCurrentSchema anyway diff --git a/src/history/test/HistoryTests.cpp b/src/history/test/HistoryTests.cpp index b880f8ea69..059aa19a97 100644 --- a/src/history/test/HistoryTests.cpp +++ b/src/history/test/HistoryTests.cpp @@ -16,7 +16,7 @@ #include "historywork/GzipFileWork.h" #include "historywork/PutHistoryArchiveStateWork.h" #include "ledger/LedgerManager.h" -#include "main/ExternalQueue.h" +#include "main/Maintainer.h" #include "main/PersistentState.h" #include "process/ProcessManager.h" #include "test/TestAccount.h" @@ -1375,8 +1375,7 @@ TEST_CASE("persist publish queue", "[history][publish][acceptance]") REQUIRE(hm0.getMinLedgerQueuedToPublish() == 7); // Trim history after publishing. - ExternalQueue ps(*app0); - ps.deleteOldEntries(50000); + app0->getMaintainer().performMaintenance(50000); } cfg.MAX_CONCURRENT_SUBPROCESSES = 32; @@ -1395,8 +1394,7 @@ TEST_CASE("persist publish queue", "[history][publish][acceptance]") clock.crank(true); // Trim history after publishing whenever possible. - ExternalQueue ps(*app1); - ps.deleteOldEntries(50000); + app1->getMaintainer().performMaintenance(50000); } // We should have either an empty publish queue or a // ledger sometime after the 5th checkpoint diff --git a/src/main/ApplicationImpl.cpp b/src/main/ApplicationImpl.cpp index 5be20c7342..420b344d45 100644 --- a/src/main/ApplicationImpl.cpp +++ b/src/main/ApplicationImpl.cpp @@ -38,7 +38,6 @@ #include "main/AppConnector.h" #include "main/ApplicationUtils.h" #include "main/CommandHandler.h" -#include "main/ExternalQueue.h" #include "main/Maintainer.h" #include "main/StellarCoreVersion.h" #include "medida/counter.h" @@ -872,9 +871,6 @@ ApplicationImpl::startServices() { // restores Herder's state before starting overlay mHerder->start(); - // set known cursors before starting maintenance job - ExternalQueue ps(*this); - ps.setInitialCursors(mConfig.KNOWN_CURSORS); mMaintainer->start(); if (mConfig.MODE_AUTO_STARTS_OVERLAY) { diff --git a/src/main/CommandHandler.cpp b/src/main/CommandHandler.cpp index e2e22dbe44..9c266f7684 100644 --- a/src/main/CommandHandler.cpp +++ b/src/main/CommandHandler.cpp @@ -36,8 +36,6 @@ #include "xdr/Stellar-transaction.h" #include "xdrpp/marshal.h" -#include "ExternalQueue.h" - #ifdef BUILD_TESTS #include "simulation/LoadGenerator.h" #include "test/TestAccount.h" @@ -90,9 +88,6 @@ CommandHandler::CommandHandler(Application& app) : mApp(app) mServer->add404(std::bind(&CommandHandler::fileNotFound, this, _1, _2)); if (mApp.getConfig().modeStoresAnyHistory()) { - addRoute("dropcursor", &CommandHandler::dropcursor); - addRoute("getcursor", &CommandHandler::getcursor); - addRoute("setcursor", &CommandHandler::setcursor); addRoute("maintenance", &CommandHandler::maintenance); } @@ -1011,77 +1006,6 @@ CommandHandler::tx(std::string const& params, std::string& retStr) retStr = Json::FastWriter().write(root); } -void -CommandHandler::dropcursor(std::string const& params, std::string& retStr) -{ - ZoneScoped; - std::map map; - http::server::server::parseParams(params, map); - std::string const& id = map["id"]; - - if (!ExternalQueue::validateResourceID(id)) - { - retStr = "Invalid resource id"; - } - else - { - ExternalQueue ps(mApp); - ps.deleteCursor(id); - retStr = "Done"; - } -} - -void -CommandHandler::setcursor(std::string const& params, std::string& retStr) -{ - ZoneScoped; - std::map map; - http::server::server::parseParams(params, map); - std::string const& id = map["id"]; - - uint32 cursor = parseRequiredParam(map, "cursor"); - - if (!ExternalQueue::validateResourceID(id)) - { - retStr = "Invalid resource id"; - } - else - { - ExternalQueue ps(mApp); - ps.setCursorForResource(id, cursor); - retStr = "Done"; - } -} - -void -CommandHandler::getcursor(std::string const& params, std::string& retStr) -{ - ZoneScoped; - Json::Value root; - std::map map; - http::server::server::parseParams(params, map); - std::string const& id = map["id"]; - - // the decision was made not to check validity here - // because there are subsequent checks for that in - // ExternalQueue and if an exception is thrown for - // validity there, the ret format is technically more - // correct for the mime type - ExternalQueue ps(mApp); - std::map curMap; - int counter = 0; - ps.getCursorForResource(id, curMap); - root["cursors"][0]; - for (auto cursor : curMap) - { - root["cursors"][counter]["id"] = cursor.first; - root["cursors"][counter]["cursor"] = cursor.second; - counter++; - } - - retStr = root.toStyledString(); -} - void CommandHandler::maintenance(std::string const& params, std::string& retStr) { diff --git a/src/main/CommandHandler.h b/src/main/CommandHandler.h index d259730286..8474f07a89 100644 --- a/src/main/CommandHandler.h +++ b/src/main/CommandHandler.h @@ -49,7 +49,6 @@ class CommandHandler void bans(std::string const& params, std::string& retStr); void connect(std::string const& params, std::string& retStr); - void dropcursor(std::string const& params, std::string& retStr); void dropPeer(std::string const& params, std::string& retStr); void info(std::string const& params, std::string& retStr); void ll(std::string const& params, std::string& retStr); @@ -61,8 +60,6 @@ class CommandHandler void peers(std::string const& params, std::string& retStr); void selfCheck(std::string const&, std::string& retStr); void quorum(std::string const& params, std::string& retStr); - void setcursor(std::string const& params, std::string& retStr); - void getcursor(std::string const& params, std::string& retStr); void scpInfo(std::string const& params, std::string& retStr); void tx(std::string const& params, std::string& retStr); void unban(std::string const& params, std::string& retStr); diff --git a/src/main/Config.cpp b/src/main/Config.cpp index f8980fd2d6..4dbe1489f3 100644 --- a/src/main/Config.cpp +++ b/src/main/Config.cpp @@ -8,7 +8,6 @@ #include "herder/Herder.h" #include "history/HistoryArchive.h" #include "ledger/LedgerManager.h" -#include "main/ExternalQueue.h" #include "main/StellarCoreVersion.h" #include "scp/LocalNode.h" #include "scp/QuorumSetUtils.h" @@ -1127,18 +1126,6 @@ Config::processConfig(std::shared_ptr t) [&]() { BUCKETLIST_DB_PERSIST_INDEX = readBool(item); }}, {"METADATA_DEBUG_LEDGERS", [&]() { METADATA_DEBUG_LEDGERS = readInt(item); }}, - {"KNOWN_CURSORS", - [&]() { - KNOWN_CURSORS = readArray(item); - for (auto const& c : KNOWN_CURSORS) - { - if (!ExternalQueue::validateResourceID(c)) - { - throw std::invalid_argument(fmt::format( - FMT_STRING("invalid cursor: \"{}\""), c)); - } - } - }}, {"RUN_STANDALONE", [&]() { RUN_STANDALONE = readBool(item); }}, {"CATCHUP_COMPLETE", [&]() { CATCHUP_COMPLETE = readBool(item); }}, diff --git a/src/main/Config.h b/src/main/Config.h index 7ca4c082a4..cc8ca0175c 100644 --- a/src/main/Config.h +++ b/src/main/Config.h @@ -554,9 +554,6 @@ class Config : public std::enable_shared_from_this // at a premium. uint32_t METADATA_DEBUG_LEDGERS; - // Set of cursors added at each startup with value '1'. - std::vector KNOWN_CURSORS; - // maximum protocol version supported by the application, can be overridden // in tests uint32_t LEDGER_PROTOCOL_VERSION; diff --git a/src/main/ExternalQueue.cpp b/src/main/ExternalQueue.cpp deleted file mode 100644 index 61578971cb..0000000000 --- a/src/main/ExternalQueue.cpp +++ /dev/null @@ -1,245 +0,0 @@ -// Copyright 2015 Stellar Development Foundation and contributors. Licensed -// under the Apache License, Version 2.0. See the COPYING file at the root -// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 - -#include "ExternalQueue.h" - -#include "Application.h" -#include "database/Database.h" -#include "ledger/LedgerManager.h" -#include "util/GlobalChecks.h" -#include "util/Logging.h" -#include -#include -#include - -namespace stellar -{ - -using namespace std; - -string ExternalQueue::kSQLCreateStatement = - "CREATE TABLE IF NOT EXISTS pubsub (" - "resid CHARACTER(32) PRIMARY KEY," - "lastread INTEGER" - "); "; - -ExternalQueue::ExternalQueue(Application& app) : mApp(app) -{ -} - -void -ExternalQueue::dropAll(Database& db) -{ - db.getSession() << "DROP TABLE IF EXISTS pubsub;"; - - soci::statement st = db.getSession().prepare << kSQLCreateStatement; - st.execute(true); -} - -bool -ExternalQueue::validateResourceID(std::string const& resid) -{ - static std::regex re("^[A-Z][A-Z0-9]{0,31}$"); - return std::regex_match(resid, re); -} - -void -ExternalQueue::setInitialCursors(std::vector const& initialResids) -{ - for (auto const& resid : initialResids) - { - addCursorForResource(resid, 1); - } -} - -void -ExternalQueue::addCursorForResource(std::string const& resid, uint32 cursor) -{ - ZoneScoped; - if (getCursor(resid).empty()) - { - setCursorForResource(resid, cursor); - } -} - -void -ExternalQueue::setCursorForResource(std::string const& resid, uint32 cursor) -{ - ZoneScoped; - checkID(resid); - - std::string old(getCursor(resid)); - if (old.empty()) - { - ZoneNamedN(insertPubsubZone, "insert pubsub", true); - auto prep = mApp.getDatabase().getPreparedStatement( - "INSERT INTO pubsub (resid, lastread) VALUES (:n, :v);"); - auto& st = prep.statement(); - st.exchange(soci::use(resid)); - st.exchange(soci::use(cursor)); - st.define_and_bind(); - st.execute(true); - if (st.get_affected_rows() != 1) - { - throw std::runtime_error("Could not insert data in SQL"); - } - } - else - { - auto prep = mApp.getDatabase().getPreparedStatement( - "UPDATE pubsub SET lastread = :v WHERE resid = :n;"); - - auto& st = prep.statement(); - st.exchange(soci::use(cursor)); - st.exchange(soci::use(resid)); - st.define_and_bind(); - { - ZoneNamedN(updatePubsubZone, "update pubsub", true); - st.execute(true); - } - } -} - -void -ExternalQueue::getCursorForResource(std::string const& resid, - std::map& curMap) -{ - ZoneScoped; - // no resid set, get all cursors - if (resid.empty()) - { - std::string n; - uint32_t v; - - auto& db = mApp.getDatabase(); - auto prep = - db.getPreparedStatement("SELECT resid, lastread FROM pubsub;"); - auto& st = prep.statement(); - st.exchange(soci::into(n)); - st.exchange(soci::into(v)); - st.define_and_bind(); - { - ZoneNamedN(selectPubsubZone, "select pubsub", true); - st.execute(true); - } - - while (st.got_data()) - { - curMap[n] = v; - st.fetch(); - } - } - else - { - // if resid is set attempt to look up the cursor - // and add it to the map if anything is found - std::string cursor = getCursor(resid); - if (!cursor.empty()) - { - curMap[resid] = strtoul(cursor.c_str(), NULL, 0); - } - } -} - -void -ExternalQueue::deleteCursor(std::string const& resid) -{ - ZoneScoped; - checkID(resid); - - { - ZoneNamedN(deletePubsubZone, "delete pubsub", true); - auto prep = mApp.getDatabase().getPreparedStatement( - "DELETE FROM pubsub WHERE resid = :n;"); - auto& st = prep.statement(); - st.exchange(soci::use(resid)); - st.define_and_bind(); - st.execute(true); - } -} - -void -ExternalQueue::deleteOldEntries(uint32 count) -{ - ZoneScoped; - auto& db = mApp.getDatabase(); - int m; - soci::indicator minIndicator; - soci::statement st = - (db.getSession().prepare << "SELECT MIN(lastread) FROM pubsub", - soci::into(m, minIndicator)); - { - ZoneNamedN(selectPubsubZone, "select pubsub", true); - st.execute(true); - } - - // rmin is the minimum of all last-reads, which means that remote - // subscribers are ok with us deleting any history N <= rmin. - // If we do not have subscribers, take this as maxint, and just - // use the LCL/checkpoint number (see below) to control trimming. - uint32_t rmin = std::numeric_limits::max(); - if (st.got_data() && minIndicator == soci::indicator::i_ok) - { - rmin = static_cast(m); - } - - // Next calculate the minimum of the LCL and/or any queued checkpoint. - uint32_t lcl = mApp.getLedgerManager().getLastClosedLedgerNum(); - uint32_t ql = mApp.getHistoryManager().getMinLedgerQueuedToPublish(); - uint32_t qmin = ql == 0 ? lcl : std::min(ql, lcl); - - // Next calculate, given qmin, the first ledger it'd be _safe to - // delete_ while still keeping everything required to publish. - // So if qmin is (for example) 0x7f = 127, then we want to keep 64 - // ledgers before that, and therefore can erase 0x3f = 63 and less. - uint32_t freq = mApp.getHistoryManager().getCheckpointFrequency(); - uint32_t lmin = qmin >= freq ? qmin - freq : 0; - - // Cumulative minimum is the lesser of the requirements of history - // publication and the requirements of our pubsub subscribers. - uint32_t cmin = std::min(lmin, rmin); - - CLOG_INFO(History, - "Trimming history <= ledger {} (rmin={}, qmin={}, lmin={})", cmin, - rmin, qmin, lmin); - - mApp.getLedgerManager().deleteOldEntries(mApp.getDatabase(), cmin, count); -} - -void -ExternalQueue::checkID(std::string const& resid) -{ - if (!validateResourceID(resid)) - { - throw std::invalid_argument("invalid resource ID"); - } -} - -std::string -ExternalQueue::getCursor(std::string const& resid) -{ - ZoneScoped; - checkID(resid); - std::string res; - - auto& db = mApp.getDatabase(); - auto prep = db.getPreparedStatement( - "SELECT lastread FROM pubsub WHERE resid = :n;"); - auto& st = prep.statement(); - st.exchange(soci::into(res)); - st.exchange(soci::use(resid)); - st.define_and_bind(); - { - ZoneNamedN(selectPubsubZone, "select pubsub", true); - st.execute(true); - } - - if (!st.got_data()) - { - res.clear(); - } - - return res; -} -} diff --git a/src/main/ExternalQueue.h b/src/main/ExternalQueue.h deleted file mode 100644 index ba11598ba0..0000000000 --- a/src/main/ExternalQueue.h +++ /dev/null @@ -1,47 +0,0 @@ -#pragma once - -// Copyright 2015 Stellar Development Foundation and contributors. Licensed -// under the Apache License, Version 2.0. See the COPYING file at the root -// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 - -#include "main/Application.h" -#include "xdr/Stellar-types.h" -#include - -namespace stellar -{ - -class ExternalQueue -{ - public: - ExternalQueue(Application& app); - - static void dropAll(Database& db); - - // checks if a given resource ID is well formed - static bool validateResourceID(std::string const& resid); - - // sets initial cursors for given resource (if not already present) - void setInitialCursors(std::vector const& initialResids); - // sets the cursor of a given resource if not already present - void addCursorForResource(std::string const& resid, uint32 cursor); - // sets the cursor of a given resource - void setCursorForResource(std::string const& resid, uint32 cursor); - // gets the cursor of a given resource, gets all cursors of resid is empty - void getCursorForResource(std::string const& resid, - std::map& curMap); - // deletes the subscription for the resource - void deleteCursor(std::string const& resid); - - // safely delete data, maximum count entries from each table - void deleteOldEntries(uint32 count); - - private: - void checkID(std::string const& resid); - std::string getCursor(std::string const& resid); - - static std::string kSQLCreateStatement; - - Application& mApp; -}; -} diff --git a/src/main/Maintainer.cpp b/src/main/Maintainer.cpp index 7d28854c08..0b11d7658b 100644 --- a/src/main/Maintainer.cpp +++ b/src/main/Maintainer.cpp @@ -3,8 +3,9 @@ // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 #include "main/Maintainer.h" +#include "ledger/LedgerManager.h" +#include "main/Application.h" #include "main/Config.h" -#include "main/ExternalQueue.h" #include "util/GlobalChecks.h" #include "util/LogSlowExecution.h" #include "util/Logging.h" @@ -72,7 +73,21 @@ Maintainer::performMaintenance(uint32_t count) "performance issue: check database or perform a large manual " "maintenance followed by database maintenance. Maintenance took", std::chrono::seconds{2}); - ExternalQueue ps{mApp}; - ps.deleteOldEntries(count); + + // Calculate the minimum of the LCL and/or any queued checkpoint. + uint32_t lcl = mApp.getLedgerManager().getLastClosedLedgerNum(); + uint32_t ql = mApp.getHistoryManager().getMinLedgerQueuedToPublish(); + uint32_t qmin = ql == 0 ? lcl : std::min(ql, lcl); + + // Next calculate, given qmin, the first ledger it'd be _safe to + // delete_ while still keeping everything required to publish. + // So if qmin is (for example) 0x7f = 127, then we want to keep 64 + // ledgers before that, and therefore can erase 0x3f = 63 and less. + uint32_t freq = mApp.getHistoryManager().getCheckpointFrequency(); + uint32_t lmin = qmin >= freq ? qmin - freq : 0; + + CLOG_INFO(History, "Trimming history <= ledger {}", lmin); + + mApp.getLedgerManager().deleteOldEntries(mApp.getDatabase(), lmin, count); } } diff --git a/src/main/test/ExternalQueueTests.cpp b/src/main/test/ExternalQueueTests.cpp deleted file mode 100644 index e5af50427f..0000000000 --- a/src/main/test/ExternalQueueTests.cpp +++ /dev/null @@ -1,46 +0,0 @@ -#ifdef USE_POSTGRES -// Copyright 2014 Stellar Development Foundation and contributors. Licensed -// under the Apache License, Version 2.0. See the COPYING file at the root -// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 - -#include "lib/catch.hpp" -#include "main/Application.h" -#include "main/CommandHandler.h" -#include "main/Config.h" -#include "main/ExternalQueue.h" -#include "simulation/Simulation.h" -#include "test/TestUtils.h" -#include "test/test.h" - -using namespace stellar; - -TEST_CASE("cursors", "[externalqueue]") -{ - VirtualClock clock; - Config const& cfg = getTestConfig(0, Config::TESTDB_POSTGRESQL); - Application::pointer app = createTestApplication(clock, cfg); - - ExternalQueue ps(*app); - std::map curMap; - app->getCommandHandler().manualCmd("setcursor?id=FOO&cursor=123"); - app->getCommandHandler().manualCmd("setcursor?id=BAR&cursor=456"); - - SECTION("get non-existent cursor") - { - ps.getCursorForResource("NONEXISTENT", curMap); - REQUIRE(curMap.size() == 0); - } - - SECTION("get single cursor") - { - ps.getCursorForResource("FOO", curMap); - REQUIRE(curMap.size() == 1); - } - - SECTION("get all cursors") - { - ps.getCursorForResource("", curMap); - REQUIRE(curMap.size() == 2); - } -} -#endif