Skip to content

Commit

Permalink
[1.0]client/db: enable archive pruning (#3010)
Browse files Browse the repository at this point in the history
* enable pruning of archived orders
  • Loading branch information
buck54321 authored Oct 17, 2024
1 parent a8d7040 commit 8680a20
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 15 deletions.
26 changes: 15 additions & 11 deletions client/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ import (
)

const (
defaultRPCCertFile = "rpc.cert"
defaultRPCKeyFile = "rpc.key"
defaultMainnetHost = "127.0.0.1"
defaultTestnetHost = "127.0.0.2"
defaultSimnetHost = "127.0.0.3"
walletPairOneHost = "127.0.0.6"
walletPairTwoHost = "127.0.0.7"
defaultRPCPort = "5757"
defaultWebPort = "5758"
defaultLogLevel = "debug"
configFilename = "dexc.conf"
defaultRPCCertFile = "rpc.cert"
defaultRPCKeyFile = "rpc.key"
defaultMainnetHost = "127.0.0.1"
defaultTestnetHost = "127.0.0.2"
defaultSimnetHost = "127.0.0.3"
walletPairOneHost = "127.0.0.6"
walletPairTwoHost = "127.0.0.7"
defaultRPCPort = "5757"
defaultWebPort = "5758"
defaultLogLevel = "debug"
configFilename = "dexc.conf"
defaultArchiveSizeLimit = 1000
)

var (
Expand Down Expand Up @@ -108,6 +109,8 @@ type CoreConfig struct {
UnlockCoinsOnLogin bool `long:"release-wallet-coins" description:"On login or wallet creation, instruct the wallet to release any coins that it may have locked."`

ExtensionModeFile string `long:"extension-mode-file" description:"path to a file that specifies options for running core as an extension."`

PruneArchive uint64 `long:"prunearchive" description:"prune that order archive to the specified number of most recent orders. zero means no pruning."`
}

// WebConfig encapsulates the configuration needed for the web server.
Expand Down Expand Up @@ -216,6 +219,7 @@ func (cfg *Config) Core(log dex.Logger) *core.Config {
NoAutoDBBackup: cfg.NoAutoDBBackup,
ExtensionModeFile: cfg.ExtensionModeFile,
TheOneHost: cfg.TheOneHost,
PruneArchive: cfg.PruneArchive,
}
}

Expand Down
6 changes: 5 additions & 1 deletion client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1460,8 +1460,11 @@ type Config struct {
// for running core in extension mode, which gives the caller options for
// e.g. limiting the ability to configure wallets.
ExtensionModeFile string

// TheOneHost will run core with only the specified server.
TheOneHost string
// PruneArchive will prune the order archive to the specified number of
// orders.
PruneArchive uint64
}

// locale is data associated with the currently selected language.
Expand Down Expand Up @@ -1544,6 +1547,7 @@ func New(cfg *Config) (*Core, error) {
}
dbOpts := bolt.Opts{
BackupOnShutdown: !cfg.NoAutoDBBackup,
PruneArchive: cfg.PruneArchive,
}
boltDB, err := bolt.NewDB(cfg.DBPath, cfg.Logger.SubLogger("DB"), dbOpts)
if err != nil {
Expand Down
131 changes: 131 additions & 0 deletions client/db/bolt/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ var (
// Opts is a set of options for the DB.
type Opts struct {
	BackupOnShutdown bool // default is true
	// PruneArchive, if non-zero, prunes the order archive down to this many
	// most recent orders when the DB is opened. Zero disables pruning.
	PruneArchive uint64
}

var defaultOpts = Opts{
Expand Down Expand Up @@ -217,6 +218,11 @@ func NewDB(dbPath string, logger dex.Logger, opts ...Opts) (dexdb.DB, error) {
return nil, err
}

if bdb.opts.PruneArchive > 0 {
bdb.log.Info("Pruning the order archive")
bdb.pruneArchivedOrders(bdb.opts.PruneArchive)
}

bdb.log.Infof("Started database (version = %d, file = %s)", DBVersion, dbPath)

return bdb, nil
Expand Down Expand Up @@ -407,6 +413,131 @@ func (db *BoltDB) SetPrimaryCredentials(creds *dexdb.PrimaryCredentials) error {
})
}

// pruneArchivedOrders deletes the oldest archived orders, and their archived
// matches, until only prunedSize orders remain in the archive. Orders that
// still have active matches are never pruned. Age is judged by the stored
// update time stamp.
func (db *BoltDB) pruneArchivedOrders(prunedSize uint64) error {
	return db.Update(func(tx *bbolt.Tx) error {
		archivedOB := tx.Bucket(archivedOrdersBucket)
		if archivedOB == nil {
			return fmt.Errorf("failed to open %s bucket", string(archivedOrdersBucket))
		}

		nOrds := uint64(archivedOB.Stats().BucketN - 1 /* BucketN includes top bucket */)
		if nOrds <= prunedSize {
			// Already at or below the target size. Nothing to do.
			return nil
		}

		// We won't delete any orders with active matches.
		activeMatches := tx.Bucket(activeMatchesBucket)
		if activeMatches == nil {
			return fmt.Errorf("failed to open %s bucket", string(activeMatchesBucket))
		}
		oidsWithActiveMatches := make(map[order.OrderID]struct{})
		if err := activeMatches.ForEach(func(k, _ []byte) error {
			mBkt := activeMatches.Bucket(k)
			if mBkt == nil {
				return fmt.Errorf("error getting match bucket %x", k)
			}
			var oid order.OrderID
			copy(oid[:], mBkt.Get(orderIDKey))
			oidsWithActiveMatches[oid] = struct{}{}
			return nil
		}); err != nil {
			return fmt.Errorf("error building active match order ID index: %w", err)
		}

		toClear := int(nOrds - prunedSize)

		// Track the toClear oldest prunable orders. deletes is kept sorted
		// oldest-first once it reaches capacity, so the last element is
		// always the newest deletion candidate.
		type orderStamp struct {
			oid   []byte
			stamp uint64
		}
		deletes := make([]*orderStamp, 0, toClear)
		sortDeletes := func() {
			sort.Slice(deletes, func(i, j int) bool {
				return deletes[i].stamp < deletes[j].stamp
			})
		}
		var sortedAtCapacity bool
		if err := archivedOB.ForEach(func(oidB, v []byte) error {
			var oid order.OrderID
			copy(oid[:], oidB)
			if _, found := oidsWithActiveMatches[oid]; found {
				// Never prune an order with active matches.
				return nil
			}
			oBkt := archivedOB.Bucket(oidB)
			if oBkt == nil {
				return fmt.Errorf("no order bucket iterated order %x", oidB)
			}
			stampB := oBkt.Get(updateTimeKey)
			if stampB == nil {
				// Highly improbable. Treat a missing stamp as zero (oldest).
				stampB = make([]byte, 8)
			}
			stamp := intCoder.Uint64(stampB)
			if len(deletes) < toClear {
				deletes = append(deletes, &orderStamp{
					stamp: stamp,
					oid:   oidB,
				})
				return nil
			}
			if !sortedAtCapacity {
				// Make sure the last element is the newest one once we hit
				// capacity.
				sortDeletes()
				sortedAtCapacity = true
			}
			if stamp > deletes[len(deletes)-1].stamp {
				// Newer than every current candidate. Keep it.
				return nil
			}
			// Displace the newest candidate and restore sort order.
			deletes[len(deletes)-1] = &orderStamp{
				stamp: stamp,
				oid:   oidB,
			}
			sortDeletes()
			return nil
		}); err != nil {
			return fmt.Errorf("archive iteration error: %w", err)
		}

		// Delete the selected order buckets, remembering their IDs so their
		// archived matches can be deleted below.
		deletedOrders := make(map[order.OrderID]struct{}, len(deletes))
		for _, del := range deletes {
			var oid order.OrderID
			copy(oid[:], del.oid)
			deletedOrders[oid] = struct{}{}
			if err := archivedOB.DeleteBucket(del.oid); err != nil {
				return fmt.Errorf("error deleting archived order %x: %w", del.oid, err)
			}
		}

		// Collect, then delete, archived matches that belong to the pruned
		// orders. Deletion is deferred until after iteration completes.
		matchesToDelete := make([][]byte, 0, prunedSize /* just avoid some allocs if we can */)
		archivedMatches := tx.Bucket(archivedMatchesBucket)
		if archivedMatches == nil {
			return errors.New("no archived match bucket")
		}
		if err := archivedMatches.ForEach(func(k, _ []byte) error {
			matchBkt := archivedMatches.Bucket(k)
			if matchBkt == nil {
				return fmt.Errorf("no bucket found for %x during iteration", k)
			}
			var oid order.OrderID
			copy(oid[:], matchBkt.Get(orderIDKey))
			if _, found := deletedOrders[oid]; found {
				matchesToDelete = append(matchesToDelete, k)
			}
			return nil
		}); err != nil {
			return fmt.Errorf("error finding matches to prune: %w", err)
		}
		for i := range matchesToDelete {
			if err := archivedMatches.DeleteBucket(matchesToDelete[i]); err != nil {
				return fmt.Errorf("error deleting pruned match %x: %w", matchesToDelete[i], err)
			}
		}
		return nil
	})
}

// validateCreds checks that the PrimaryCredentials fields are properly
// populated.
func validateCreds(creds *dexdb.PrimaryCredentials) error {
Expand Down
137 changes: 134 additions & 3 deletions client/db/bolt/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"math/rand"
"os"
Expand All @@ -21,13 +22,14 @@ import (
)

var (
tLogger = dex.StdOutLogger("db_TEST", dex.LevelTrace)
tLogger = dex.StdOutLogger("db_TEST", dex.LevelTrace)
withLongTests bool
)

func newTestDB(t *testing.T) (*BoltDB, func()) {
func newTestDB(t *testing.T, opts ...Opts) (*BoltDB, func()) {
t.Helper()
dbPath := filepath.Join(t.TempDir(), "db.db")
dbi, err := NewDB(dbPath, tLogger)
dbi, err := NewDB(dbPath, tLogger, opts...)
if err != nil {
t.Fatalf("error creating dB: %v", err)
}
Expand All @@ -50,6 +52,9 @@ func newTestDB(t *testing.T) (*BoltDB, func()) {
}

// TestMain registers test flags and runs the package tests, flushing stdout
// before exiting with the test result code.
func TestMain(m *testing.M) {
	flag.BoolVar(&withLongTests, "withlongtests", false, "include tests that take a long time to run")
	flag.Parse()

	// os.Exit does not run deferred functions, so sync stdout explicitly
	// before exiting rather than deferring it.
	code := m.Run()
	os.Stdout.Sync()
	os.Exit(code)
}
Expand Down Expand Up @@ -1162,6 +1167,10 @@ func testCredentialsUpdate(t *testing.T, boltdb *BoltDB, tester func([]byte, str
}

func TestDeleteInactiveMatches(t *testing.T) {
// TODO: This test takes way too long to run. Why?
if !withLongTests {
return
}
boltdb, shutdown := newTestDB(t)
defer shutdown()

Expand Down Expand Up @@ -1340,6 +1349,10 @@ func TestDeleteInactiveMatches(t *testing.T) {
}

func TestDeleteInactiveOrders(t *testing.T) {
// TODO: This test takes way too long to run. Why?
if !withLongTests {
return
}
boltdb, shutdown := newTestDB(t)
defer shutdown()

Expand Down Expand Up @@ -1613,3 +1626,121 @@ func TestPokes(t *testing.T) {
t.Fatal("Result from second LoadPokes wasn't empty")
}
}

// TestPruneArchivedOrders exercises BoltDB.pruneArchivedOrders, verifying
// that the oldest orders are pruned first, that orders with active matches
// are never pruned, and that archived matches of pruned orders are deleted.
func TestPruneArchivedOrders(t *testing.T) {
	const host = "blah"
	const prunedSize = 5
	boltdb, shutdown := newTestDB(t)
	defer shutdown()

	// archivedOrdersN counts the per-order sub-buckets in the archive.
	archivedOrdersN := func() (n int) {
		t.Helper()
		if err := boltdb.View(func(tx *bbolt.Tx) error {
			n = tx.Bucket(archivedOrdersBucket).Stats().BucketN - 1 /* BucketN includes top bucket */
			return nil
		}); err != nil {
			t.Fatalf("error counting archived orders: %v", err)
		}
		return n
	}

	// addOrder stores an executed order, overwriting its update time with
	// stamp, or with an auto-incrementing counter when stamp is zero.
	var ordStampI uint64
	addOrder := func(stamp uint64) order.OrderID {
		t.Helper()
		ord, _ := ordertest.RandomLimitOrder()
		if stamp == 0 {
			stamp = ordStampI
			ordStampI++
		}
		if err := boltdb.UpdateOrder(&db.MetaOrder{
			MetaData: &db.OrderMetaData{
				Status: order.OrderStatusExecuted,
				Host:   host,
				Proof:  db.OrderProof{DEXSig: []byte{0xa}},
			},
			Order: ord,
		}); err != nil {
			t.Fatalf("UpdateOrder error: %v", err)
		}
		oid := ord.ID()
		if err := boltdb.ordersUpdate(func(ob, archivedOB *bbolt.Bucket) error {
			oBkt := archivedOB.Bucket(oid[:])
			if oBkt == nil {
				return fmt.Errorf("no archived bucket for order %x", oid)
			}
			return oBkt.Put(updateTimeKey, uint64Bytes(stamp))
		}); err != nil {
			t.Fatalf("error setting order update time: %v", err)
		}
		return oid
	}
	for i := 0; i < prunedSize*2; i++ {
		addOrder(0)
	}

	if n := archivedOrdersN(); n != prunedSize*2 {
		t.Fatalf("Expected %d archived orders after initialization, saw %d", prunedSize*2, n)
	}

	if err := boltdb.pruneArchivedOrders(prunedSize); err != nil {
		t.Fatalf("pruneArchivedOrders error: %v", err)
	}

	if n := archivedOrdersN(); n != prunedSize {
		t.Fatalf("Expected %d archived orders after first pruning, saw %d", prunedSize, n)
	}

	// Make sure we pruned the first 5, i.e. only stamps >= prunedSize remain.
	if err := boltdb.View(func(tx *bbolt.Tx) error {
		bkt := tx.Bucket(archivedOrdersBucket)
		return bkt.ForEach(func(oidB, _ []byte) error {
			if stamp := intCoder.Uint64(bkt.Bucket(oidB).Get(updateTimeKey)); stamp < prunedSize {
				return fmt.Errorf("order stamp %d should have been pruned", stamp)
			}
			return nil
		})
	}); err != nil {
		t.Fatal(err)
	}

	// Add an order with an early stamp and an active match. The active match
	// must protect the order from pruning despite its age.
	oid := addOrder(1)
	m := &db.MetaMatch{
		MetaData: &db.MatchMetaData{
			DEX:  host,
			Base: 1,
		},
		UserMatch: ordertest.RandomUserMatch(),
	}
	m.OrderID = oid
	m.Status = order.NewlyMatched
	if err := boltdb.UpdateMatch(m); err != nil {
		t.Fatal(err)
	}

	if err := boltdb.pruneArchivedOrders(prunedSize); err != nil {
		t.Fatalf("Error pruning orders when one has an active match: %v", err)
	}

	if n := archivedOrdersN(); n != prunedSize {
		t.Fatalf("Expected %d archived orders after pruning with active match order in place, saw %d", prunedSize, n)
	}

	// Our active match order should still be available.
	if _, err := boltdb.Order(oid); err != nil {
		t.Fatalf("Error retrieving unpruned active match order: %v", err)
	}

	// Retire the active match order.
	m.Status = order.MatchComplete
	if err := boltdb.UpdateMatch(m); err != nil {
		t.Fatal(err)
	}
	// Add an order to push the now-retirable older order out.
	addOrder(0)
	if err := boltdb.pruneArchivedOrders(prunedSize); err != nil {
		t.Fatalf("Error pruning orders after retiring match: %v", err)
	}
	if n := archivedOrdersN(); n != prunedSize {
		t.Fatalf("Expected %d archived orders after pruning with retired match, saw %d", prunedSize, n)
	}
	// Match should not be archived any longer.
	metaID := m.MatchOrderUniqueID()
	if err := boltdb.matchesView(func(mb, archivedMB *bbolt.Bucket) error {
		if mb.Bucket(metaID) != nil || archivedMB.Bucket(metaID) != nil {
			return errors.New("still found bucket for retired match of pruned order")
		}
		return nil
	}); err != nil {
		t.Fatal(err)
	}
}

0 comments on commit 8680a20

Please sign in to comment.