diff --git a/bgs/admin.go b/bgs/admin.go
index 1d415d66a..45cc60484 100644
--- a/bgs/admin.go
+++ b/bgs/admin.go
@@ -356,13 +356,37 @@ func (bgs *BGS) handleAdminUnbanDomain(c echo.Context) error {
 	})
 }
 
+type PDSRates struct {
+	PerSecond int64 `json:"per_second,omitempty"`
+	PerHour   int64 `json:"per_hour,omitempty"`
+	PerDay    int64 `json:"per_day,omitempty"`
+	CrawlRate int64 `json:"crawl_rate,omitempty"`
+	RepoLimit int64 `json:"repo_limit,omitempty"`
+
+	RelayAllowed bool `json:"relay_allowed,omitempty"`
+}
+
+func (pr *PDSRates) FromSlurper(s *Slurper) {
+	if pr.PerSecond == 0 {
+		pr.PerSecond = s.DefaultPerSecondLimit
+	}
+	if pr.PerHour == 0 {
+		pr.PerHour = s.DefaultPerHourLimit
+	}
+	if pr.PerDay == 0 {
+		pr.PerDay = s.DefaultPerDayLimit
+	}
+	if pr.CrawlRate == 0 {
+		pr.CrawlRate = int64(s.DefaultCrawlLimit)
+	}
+	if pr.RepoLimit == 0 {
+		pr.RepoLimit = s.DefaultRepoLimit
+	}
+}
+
 type RateLimitChangeRequest struct {
-	Host      string `json:"host"`
-	PerSecond int64  `json:"per_second"`
-	PerHour   int64  `json:"per_hour"`
-	PerDay    int64  `json:"per_day"`
-	CrawlRate int64  `json:"crawl_rate"`
-	RepoLimit int64  `json:"repo_limit"`
+	Host string `json:"host"`
+	PDSRates
 }
 
 func (bgs *BGS) handleAdminChangePDSRateLimits(e echo.Context) error {
@@ -383,6 +407,7 @@ func (bgs *BGS) handleAdminChangePDSRateLimits(e echo.Context) error {
 	pds.DailyEventLimit = body.PerDay
 	pds.CrawlRateLimit = float64(body.CrawlRate)
 	pds.RepoLimit = body.RepoLimit
+	pds.RelayAllowed = body.RelayAllowed
 
 	if err := bgs.db.Save(&pds).Error; err != nil {
 		return echo.NewHTTPError(http.StatusInternalServerError, fmt.Errorf("failed to save rate limit changes: %w", err))
@@ -595,6 +620,9 @@ func (bgs *BGS) handleAdminAddTrustedDomain(e echo.Context) error {
 
 type AdminRequestCrawlRequest struct {
 	Hostname string `json:"hostname"`
+
+	// optional:
+	PDSRates
 }
 
 func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error {
@@ -610,27 +638,16 @@ func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error {
 		return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname")
 	}
 
-	if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") {
-		if bgs.ssl {
-			host = "https://" + host
-		} else {
-			host = "http://" + host
-		}
-	}
+	host, sslUrl := bgs.newPdsHostPrefixNormalize(host)
+	if !bgs.config.SSL.AdminOk(sslUrl) {
+		return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("ssl is %s but got host=%#v", bgs.config.SSL.String(), host))
+	}
 
 	u, err := url.Parse(host)
 	if err != nil {
 		return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname")
 	}
 
-	if u.Scheme == "http" && bgs.ssl {
-		return echo.NewHTTPError(http.StatusBadRequest, "this server requires https")
-	}
-
-	if u.Scheme == "https" && !bgs.ssl {
-		return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https")
-	}
-
 	if u.Path != "" {
 		return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path")
 	}
@@ -647,6 +664,8 @@ func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error {
 	}
 
 	// Skip checking if the server is online for now
+	rateOverrides := body.PDSRates
+	rateOverrides.FromSlurper(bgs.slurper)
 
-	return bgs.slurper.SubscribeToPds(ctx, host, true, true) // Override Trusted Domain Check
+	return bgs.slurper.SubscribeToPds(ctx, host, true, true, sslUrl, &rateOverrides) // Override Trusted Domain Check
 }
diff --git a/bgs/bgs.go b/bgs/bgs.go
index 9f14492b5..50bfefc33 100644
--- a/bgs/bgs.go
+++ b/bgs/bgs.go
@@ -65,9 +65,9 @@ type BGS struct {
 
 	hr api.HandleResolver
 
-	// TODO: work on doing away with this
flag in favor of more pluggable - // pieces that abstract the need for explicit ssl checks - ssl bool + ssl SlurperSSLStance + + config BGSConfig crawlOnly bool @@ -117,12 +117,13 @@ type SocketConsumer struct { } type BGSConfig struct { - SSL bool + SSL SlurperSSLStance CompactInterval time.Duration DefaultRepoLimit int64 ConcurrencyPerPDS int64 MaxQueuePerPDS int64 NumCompactionWorkers int + VerboseAPILog bool // NextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl NextCrawlers []*url.URL @@ -130,7 +131,7 @@ type BGSConfig struct { func DefaultBGSConfig() *BGSConfig { return &BGSConfig{ - SSL: true, + SSL: SlurperRequireSSL, CompactInterval: 4 * time.Hour, DefaultRepoLimit: 100, ConcurrencyPerPDS: 100, @@ -140,7 +141,11 @@ func DefaultBGSConfig() *BGSConfig { } func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, rf *indexer.RepoFetcher, hr api.HandleResolver, config *BGSConfig) (*BGS, error) { - + logger := slog.Default().With("system", "bgs") + err := fixupDupPDSRows(db, logger) + if err != nil { + return nil, err + } if config == nil { config = DefaultBGSConfig() } @@ -161,6 +166,7 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm events: evtman, didr: didr, ssl: config.SSL, + config: *config, consumersLk: sync.RWMutex{}, consumers: make(map[uint64]*SocketConsumer), @@ -169,7 +175,7 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm userCache: uc, - log: slog.Default().With("system", "bgs"), + log: logger, } ix.CreateExternalUser = bgs.createExternalUser @@ -202,6 +208,68 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm return bgs, nil } +func fixupDupPDSRows(db *gorm.DB, logger *slog.Logger) error { + rows, err := db.Raw("SELECT id, host FROM pds").Rows() + if err != nil { + logger.Warn("could not list PDS rows; assume blank db", "err", err) + return nil + } + hostCounts := make(map[string][]uint) + maxPDSId := uint(0) + maxHostCount := 0 + for rows.Next() { + var pdsId uint + var host string + if err := rows.Scan(&pdsId, &host); err != nil { + return fmt.Errorf("pds sql row err, %w", err) + } + idlist := hostCounts[host] + idlist = append(idlist, pdsId) + count := len(idlist) + if count > maxHostCount { + maxHostCount = count + } + hostCounts[host] = idlist + if pdsId > maxPDSId { + maxPDSId = pdsId + } + } + if maxHostCount <= 1 { + logger.Debug("no pds dup rows found") + return nil + } + for host, idlist := range hostCounts { + if len(idlist) > 1 { + logger.Info("dup PDS", "host", host, "count", len(idlist)) + minPDSId := idlist[0] + for _, otherid := range idlist[1:] { + if otherid < minPDSId { + minPDSId = otherid + } + } + for _, xPDSId := range idlist { + if xPDSId == minPDSId { + continue + } + logger.Info("dup PDS", "host", host, "from", xPDSId, "to", minPDSId) + err = db.Exec("UPDATE users SET pds = ? WHERE pds = ?", minPDSId, xPDSId).Error + if err != nil { + return fmt.Errorf("failed to update user pds %d -> %d: %w", xPDSId, minPDSId, err) + } + err = db.Exec("UPDATE actor_infos SET pds = ? 
WHERE pds = ?", minPDSId, xPDSId).Error + if err != nil { + return fmt.Errorf("failed to update actor_infos pds %d -> %d: %w", xPDSId, minPDSId, err) + } + err = db.Exec("DELETE FROM pds WHERE id = ?", xPDSId).Error + if err != nil { + return fmt.Errorf("failed to delete pds %d: %w", xPDSId, err) + } + } + } + } + return nil +} + func (bgs *BGS) StartMetrics(listen string) error { http.Handle("/metrics", promhttp.Handler()) return http.ListenAndServe(listen, nil) @@ -317,7 +385,7 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error { AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization}, })) - if !bgs.ssl { + if bgs.config.VerboseAPILog { e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n", })) @@ -941,7 +1009,9 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Host) } - if host.ID != u.PDS && u.PDS != 0 { + if host.RelayAllowed { + // don't check that source is canonical PDS, allow intermediate relays + } else if host.ID != u.PDS && u.PDS != 0 { bgs.log.Warn("received event for repo from different pds than expected", "repo", evt.Repo, "expPds", u.PDS, "gotPds", host.Host) // Flush any cached DID documents for this user bgs.didr.FlushCacheFor(env.RepoCommit.Repo) @@ -1273,7 +1343,7 @@ func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.Actor peering.DailyEventLimit = s.slurper.DefaultPerDayLimit peering.RepoLimit = s.slurper.DefaultRepoLimit - if s.ssl && !peering.SSL { + if (s.ssl == SlurperRequireSSL) && !peering.SSL { return nil, fmt.Errorf("did references non-ssl PDS, this is disallowed in prod: %q %q", did, svc.ServiceEndpoint) } diff --git a/bgs/fedmgr.go b/bgs/fedmgr.go index 10212f0a4..61c74bf9e 100644 --- a/bgs/fedmgr.go +++ b/bgs/fedmgr.go @@ -55,7 +55,7 @@ type Slurper struct { shutdownChan chan bool shutdownResult chan []error - ssl bool + ssl SlurperSSLStance } type Limiters struct { @@ -64,8 +64,71 @@ type Limiters struct { PerDay *slidingwindow.Limiter } +type SlurperSSLStance int + +const ( + // ALL upstreams must be wss:// and https:// + SlurperRequireSSL SlurperSSLStance = 1 + + // NO upstreams may be wss:// and https:// + SlurperDisableSSL SlurperSSLStance = 2 + + // Anything is allowed! 
+ SlurperMixedSSL SlurperSSLStance = 3 + + // upstreams set by /admin/* may be anything, other things must be wss:// https:// + SlurperRequireExternalSSL SlurperSSLStance = 4 +) + +// return true if an external requestCrawl meets SSL policy +func (sssl SlurperSSLStance) ExternalOk(ssl bool) bool { + switch sssl { + case SlurperRequireSSL: + return ssl + case SlurperDisableSSL: + return !ssl + case SlurperRequireExternalSSL: + return ssl + case SlurperMixedSSL: + return true + default: + slog.Error("unknown ssl policy", "ssl", sssl) + return true + } +} + +// return true if admin action meets SSL policy +func (sssl SlurperSSLStance) AdminOk(ssl bool) bool { + switch sssl { + case SlurperRequireSSL: + return ssl + case SlurperDisableSSL: + return !ssl + case SlurperRequireExternalSSL, SlurperMixedSSL: + return true + default: + slog.Error("unknown ssl policy", "ssl", sssl) + return true + } +} + +func (sssl SlurperSSLStance) String() string { + switch sssl { + case SlurperRequireSSL: + return "required" + case SlurperRequireExternalSSL: + return "external required" + case SlurperDisableSSL: + return "disabled" + case SlurperMixedSSL: + return "mixed" + default: + return fmt.Sprintf("slurper_ssl_%d", int(sssl)) + } +} + type SlurperOptions struct { - SSL bool + SSL SlurperSSLStance DefaultPerSecondLimit int64 DefaultPerHourLimit int64 DefaultPerDayLimit int64 @@ -77,7 +140,7 @@ type SlurperOptions struct { func DefaultSlurperOptions() *SlurperOptions { return &SlurperOptions{ - SSL: false, + SSL: SlurperDisableSSL, DefaultPerSecondLimit: 50, DefaultPerHourLimit: 2500, DefaultPerDayLimit: 20_000, @@ -363,7 +426,7 @@ func (s *Slurper) canSlurpHost(host string) bool { return !s.newSubsDisabled } -func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adminOverride bool) error { +func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adminOverride, sslUrl bool, rateOverrides *PDSRates) error { // TODO: for performance, lock on the hostname instead of global s.lk.Lock() defer s.lk.Unlock() @@ -389,7 +452,7 @@ func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adm // New PDS! 
npds := models.PDS{ Host: host, - SSL: s.ssl, + SSL: sslUrl, Registered: reg, RateLimit: float64(s.DefaultPerSecondLimit), HourlyEventLimit: s.DefaultPerHourLimit, @@ -397,6 +460,14 @@ func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adm CrawlRateLimit: float64(s.DefaultCrawlLimit), RepoLimit: s.DefaultRepoLimit, } + if rateOverrides != nil { + npds.RateLimit = float64(rateOverrides.PerSecond) + npds.HourlyEventLimit = rateOverrides.PerHour + npds.DailyEventLimit = rateOverrides.PerDay + npds.CrawlRateLimit = float64(rateOverrides.CrawlRate) + npds.RepoLimit = rateOverrides.RepoLimit + npds.RelayAllowed = rateOverrides.RelayAllowed + } if err := s.db.Create(&npds).Error; err != nil { return err } @@ -467,7 +538,7 @@ func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.PDS, s } protocol := "ws" - if s.ssl { + if host.SSL { protocol = "wss" } diff --git a/bgs/handlers.go b/bgs/handlers.go index 0e46f0043..23696b389 100644 --- a/bgs/handlers.go +++ b/bgs/handlers.go @@ -128,18 +128,31 @@ func (s *BGS) handleComAtprotoSyncGetBlocks(ctx context.Context, cids []string, return nil, fmt.Errorf("NYI") } +func (bgs *BGS) newPdsHostPrefixNormalize(host string) (hostN string, sslUrl bool) { + sslUrl = false + if strings.HasPrefix(host, "http://") || strings.HasPrefix(host, "ws://") { + } else if strings.HasPrefix(host, "https://") || strings.HasPrefix(host, "wss://") { + sslUrl = true + } else { + if bgs.config.SSL == SlurperDisableSSL { + host = "http://" + host + } else { + host = "https://" + host + sslUrl = true + } + } + return host, sslUrl +} + func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatprototypes.SyncRequestCrawl_Input) error { host := body.Hostname if host == "" { return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname") } - if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") { - if s.ssl { - host = "https://" + host - } else { - host = "http://" + host - } + host, sslUrl := s.newPdsHostPrefixNormalize(host) + if !s.config.SSL.ExternalOk(sslUrl) { + return echo.NewHTTPError(http.StatusBadRequest, "server ssl policy is %s", s.config.SSL.String()) } u, err := url.Parse(host) @@ -147,14 +160,6 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname") } - if u.Scheme == "http" && s.ssl { - return echo.NewHTTPError(http.StatusBadRequest, "this server requires https") - } - - if u.Scheme == "https" && !s.ssl { - return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https") - } - if u.Path != "" { return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path") } @@ -212,7 +217,7 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp } } - return s.slurper.SubscribeToPds(ctx, host, true, false) + return s.slurper.SubscribeToPds(ctx, host, true, false, sslUrl, nil) } func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, body *comatprototypes.SyncNotifyOfUpdate_Input) error { diff --git a/carstore/README.md b/carstore/README.md new file mode 100644 index 000000000..90880defb --- /dev/null +++ b/carstore/README.md @@ -0,0 +1,41 @@ +# Carstore + +Store a zillion users of PDS-like repo, with more limited operations (mainly: firehose in, firehose out). + +## [ScyllaStore](scylla.go) + +Blocks stored in ScyllaDB. +User and PDS metadata stored in gorm (PostgreSQL or sqlite3). 
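+
+For reference, the schema created at startup (taken from `createTableTexts` in scylla.go); a materialized view stands in for the secondary index Scylla cannot provide:
+
+```sql
+CREATE TABLE IF NOT EXISTS blocks (uid bigint, cid blob, rev varchar, root blob, block blob, PRIMARY KEY((uid,cid)))
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS blocks_by_uidrev
+AS SELECT uid, rev, cid, root
+FROM blocks
+WHERE uid IS NOT NULL AND rev IS NOT NULL AND cid IS NOT NULL
+PRIMARY KEY ((uid), rev, cid) WITH CLUSTERING ORDER BY (rev DESC)
+```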
+ +## [FileCarStore](bs.go) + +Store 'car slices' from PDS source subscribeRepo firehose streams to filesystem. +Store metadata to gorm postgresql (or sqlite3). +Periodic compaction of car slices into fewer larger car slices. +User and PDS metadata stored in gorm (PostgreSQL or sqlite3). +FileCarStore was the first production carstore and used through at least 2024-11. + +## [SQLiteStore](sqlite_store.go) + +Experimental/demo. +Blocks stored in trivial local sqlite3 schema. +Minimal reference implementation from which fancy scalable/performant implementations may be derived. + +```sql +CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid)) +CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC) + +INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block + +SELECT rev, root FROM blocks WHERE uid = ? ORDER BY rev DESC LIMIT 1 + +SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC + +DELETE FROM blocks WHERE uid = ? + +SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1 + +SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1 + +SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1 +``` diff --git a/carstore/bs.go b/carstore/bs.go index dbce4f098..aacbc1c80 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -10,7 +10,6 @@ import ( "os" "path/filepath" "sort" - "sync" "sync/atomic" "time" @@ -20,7 +19,6 @@ import ( blockformat "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" cbor "github.com/ipfs/go-ipld-cbor" ipld "github.com/ipfs/go-ipld-format" @@ -47,8 +45,11 @@ const MaxSliceLength = 2 << 20 const BigShardThreshold = 2 << 20 type CarStore interface { + // TODO: not really part of general interface CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) + // TODO: not really part of general interface GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) + GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) @@ -63,8 +64,7 @@ type FileCarStore struct { meta *CarStoreGormMeta rootDirs []string - lscLk sync.Mutex - lastShardCache map[models.Uid]*CarShard + lastShardCache lastShardCache log *slog.Logger } @@ -88,16 +88,29 @@ func NewCarStore(meta *gorm.DB, roots []string) (CarStore, error) { return nil, err } - return &FileCarStore{ - meta: &CarStoreGormMeta{meta: meta}, - rootDirs: roots, - lastShardCache: make(map[models.Uid]*CarShard), - log: slog.Default().With("system", "carstore"), - }, nil + gormMeta := &CarStoreGormMeta{meta: meta} + out := &FileCarStore{ + meta: gormMeta, + rootDirs: roots, + lastShardCache: lastShardCache{ + source: gormMeta, + }, + log: slog.Default().With("system", "carstore"), + } + out.lastShardCache.Init() + return out, nil } +// userView needs these things to get into the underlying block store +// implemented by CarStoreGormMeta +type userViewSource interface { + HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error) + LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error) +} + +// wrapper into a 
block store that keeps track of which user we are working on behalf of type userView struct { - cs CarStore + cs userViewSource user models.Uid cache map[cid.Cid]blockformat.Block @@ -115,13 +128,7 @@ func (uv *userView) Has(ctx context.Context, k cid.Cid) (bool, error) { if have { return have, nil } - - fcd, ok := uv.cs.(*FileCarStore) - if !ok { - return false, nil - } - - return fcd.meta.HasUidCid(ctx, uv.user, k) + return uv.cs.HasUidCid(ctx, uv.user, k) } var CacheHits int64 @@ -143,12 +150,7 @@ func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, erro } atomic.AddInt64(&CacheMiss, 1) - fcd, ok := uv.cs.(*FileCarStore) - if !ok { - return nil, ipld.ErrNotFound{Cid: k} - } - - path, offset, user, err := fcd.meta.LookupBlockRef(ctx, k) + path, offset, user, err := uv.cs.LookupBlockRef(ctx, k) if err != nil { return nil, err } @@ -279,61 +281,39 @@ func (uv *userView) GetSize(ctx context.Context, k cid.Cid) (int, error) { return len(blk.RawData()), nil } +// subset of blockstore.Blockstore that we actually use here +type minBlockstore interface { + Get(ctx context.Context, bcid cid.Cid) (blockformat.Block, error) + Has(ctx context.Context, bcid cid.Cid) (bool, error) + GetSize(ctx context.Context, bcid cid.Cid) (int, error) +} + type DeltaSession struct { - fresh blockstore.Blockstore blks map[cid.Cid]blockformat.Block rmcids map[cid.Cid]bool - base blockstore.Blockstore + base minBlockstore user models.Uid baseCid cid.Cid seq int readonly bool - cs CarStore + cs shardWriter lastRev string } func (cs *FileCarStore) checkLastShardCache(user models.Uid) *CarShard { - cs.lscLk.Lock() - defer cs.lscLk.Unlock() - - ls, ok := cs.lastShardCache[user] - if ok { - return ls - } - - return nil + return cs.lastShardCache.check(user) } func (cs *FileCarStore) removeLastShardCache(user models.Uid) { - cs.lscLk.Lock() - defer cs.lscLk.Unlock() - - delete(cs.lastShardCache, user) + cs.lastShardCache.remove(user) } func (cs *FileCarStore) putLastShardCache(ls *CarShard) { - cs.lscLk.Lock() - defer cs.lscLk.Unlock() - - cs.lastShardCache[ls.Usr] = ls + cs.lastShardCache.put(ls) } func (cs *FileCarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShard, error) { - ctx, span := otel.Tracer("carstore").Start(ctx, "getLastShard") - defer span.End() - - maybeLs := cs.checkLastShardCache(user) - if maybeLs != nil { - return maybeLs, nil - } - - lastShard, err := cs.meta.GetLastShard(ctx, user) - if err != nil { - return nil, err - } - - cs.putLastShardCache(lastShard) - return lastShard, nil + return cs.lastShardCache.get(ctx, user) } var ErrRepoBaseMismatch = fmt.Errorf("attempted a delta session on top of the wrong previous head") @@ -354,11 +334,10 @@ func (cs *FileCarStore) NewDeltaSession(ctx context.Context, user models.Uid, si } return &DeltaSession{ - fresh: blockstore.NewBlockstore(datastore.NewMapDatastore()), - blks: make(map[cid.Cid]blockformat.Block), + blks: make(map[cid.Cid]blockformat.Block), base: &userView{ user: user, - cs: cs, + cs: cs.meta, prefetch: true, cache: make(map[cid.Cid]blockformat.Block), }, @@ -374,7 +353,7 @@ func (cs *FileCarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) return &DeltaSession{ base: &userView{ user: user, - cs: cs, + cs: cs.meta, prefetch: false, cache: make(map[cid.Cid]blockformat.Block), }, @@ -385,7 +364,7 @@ func (cs *FileCarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) } // TODO: incremental is only ever called true, remove the param -func (cs *FileCarStore) ReadUserCar(ctx 
context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error { +func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error { ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar") defer span.End() @@ -398,7 +377,6 @@ func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceR } } - // TODO: Why does ReadUserCar want shards seq DESC but CompactUserShards wants seq ASC ? shards, err := cs.meta.GetUserShardsDesc(ctx, user, earlySeq) if err != nil { return err @@ -418,12 +396,12 @@ func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceR if err := car.WriteHeader(&car.CarHeader{ Roots: []cid.Cid{shards[0].Root.CID}, Version: 1, - }, w); err != nil { + }, shardOut); err != nil { return err } for _, sh := range shards { - if err := cs.writeShardBlocks(ctx, &sh, w); err != nil { + if err := cs.writeShardBlocks(ctx, &sh, shardOut); err != nil { return err } } @@ -433,7 +411,7 @@ func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceR // inner loop part of ReadUserCar // copy shard blocks from disk to Writer -func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io.Writer) error { +func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, shardOut io.Writer) error { ctx, span := otel.Tracer("carstore").Start(ctx, "writeShardBlocks") defer span.End() @@ -448,7 +426,7 @@ func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io return err } - _, err = io.Copy(w, fi) + _, err = io.Copy(shardOut, fi) if err != nil { return err } @@ -603,18 +581,7 @@ func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev str return nil, fmt.Errorf("cannot write to readonly deltaSession") } - switch ocs := ds.cs.(type) { - case *FileCarStore: - return ocs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids) - case *NonArchivalCarstore: - slice, err := blocksToCar(ctx, root, rev, ds.blks) - if err != nil { - return nil, err - } - return slice, ocs.updateLastCommit(ctx, ds.user, rev, root) - default: - return nil, fmt.Errorf("unsupported carstore type") - } + return ds.cs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids) } func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) { @@ -635,6 +602,12 @@ func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) { return hnw, nil } +// shardWriter.writeNewShard called from inside DeltaSession.CloseWithRoot +type shardWriter interface { + // writeNewShard stores blocks in `blks` arg and creates a new shard to propagate out to our firehose + writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) +} + func blocksToCar(ctx context.Context, root cid.Cid, rev string, blks map[cid.Cid]blockformat.Block) ([]byte, error) { buf := new(bytes.Buffer) _, err := WriteCarHeader(buf, root) diff --git a/carstore/last_shard_cache.go b/carstore/last_shard_cache.go new file mode 100644 index 000000000..8371b8883 --- /dev/null +++ b/carstore/last_shard_cache.go @@ -0,0 +1,70 @@ +package carstore + +import ( + "context" + "github.com/bluesky-social/indigo/models" + "go.opentelemetry.io/otel" + "sync" +) + +type LastShardSource interface { + GetLastShard(context.Context, models.Uid) (*CarShard, error) +} + +type lastShardCache struct { + source LastShardSource + + lscLk sync.Mutex + 
lastShardCache map[models.Uid]*CarShard +} + +func (lsc *lastShardCache) Init() { + lsc.lastShardCache = make(map[models.Uid]*CarShard) +} + +func (lsc *lastShardCache) check(user models.Uid) *CarShard { + lsc.lscLk.Lock() + defer lsc.lscLk.Unlock() + + ls, ok := lsc.lastShardCache[user] + if ok { + return ls + } + + return nil +} + +func (lsc *lastShardCache) remove(user models.Uid) { + lsc.lscLk.Lock() + defer lsc.lscLk.Unlock() + + delete(lsc.lastShardCache, user) +} + +func (lsc *lastShardCache) put(ls *CarShard) { + if ls == nil { + return + } + lsc.lscLk.Lock() + defer lsc.lscLk.Unlock() + + lsc.lastShardCache[ls.Usr] = ls +} + +func (lsc *lastShardCache) get(ctx context.Context, user models.Uid) (*CarShard, error) { + ctx, span := otel.Tracer("carstore").Start(ctx, "getLastShard") + defer span.End() + + maybeLs := lsc.check(user) + if maybeLs != nil { + return maybeLs, nil + } + + lastShard, err := lsc.source.GetLastShard(ctx, user) + if err != nil { + return nil, err + } + + lsc.put(lastShard) + return lastShard, nil +} diff --git a/carstore/nonarchive.go b/carstore/nonarchive.go index 4784eb537..46d2e59ea 100644 --- a/carstore/nonarchive.go +++ b/carstore/nonarchive.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + ipld "github.com/ipfs/go-ipld-format" "io" "log/slog" "sync" @@ -11,8 +12,6 @@ import ( "github.com/bluesky-social/indigo/models" blockformat "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - blockstore "github.com/ipfs/go-ipfs-blockstore" car "github.com/ipld/go-car" "go.opentelemetry.io/otel" "gorm.io/gorm" @@ -75,10 +74,13 @@ func (cs *NonArchivalCarstore) putLastShardCache(ls *commitRefInfo) { func (cs *NonArchivalCarstore) loadCommitRefInfo(ctx context.Context, user models.Uid) (*commitRefInfo, error) { var out commitRefInfo - if err := cs.db.Find(&out, "uid = ?", user).Error; err != nil { - return nil, err + wat := cs.db.Find(&out, "uid = ?", user) + if wat.Error != nil { + return nil, wat.Error + } + if wat.RowsAffected == 0 { + return nil, nil } - return &out, nil } @@ -95,6 +97,9 @@ func (cs *NonArchivalCarstore) getCommitRefInfo(ctx context.Context, user models if err != nil { return nil, err } + if lastShard == nil { + return nil, nil + } cs.putLastShardCache(lastShard) return lastShard, nil @@ -119,6 +124,8 @@ func (cs *NonArchivalCarstore) updateLastCommit(ctx context.Context, uid models. return nil } +var commitRefZero = commitRefInfo{} + func (cs *NonArchivalCarstore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) { ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession") defer span.End() @@ -130,13 +137,15 @@ func (cs *NonArchivalCarstore) NewDeltaSession(ctx context.Context, user models. return nil, err } - if since != nil && *since != lastShard.Rev { + if lastShard == nil { + // ok, no previous user state to refer to + lastShard = &commitRefZero + } else if since != nil && *since != lastShard.Rev { cs.log.Warn("revision mismatch", "commitSince", since, "lastRev", lastShard.Rev, "err", ErrRepoBaseMismatch) } return &DeltaSession{ - fresh: blockstore.NewBlockstore(datastore.NewMapDatastore()), - blks: make(map[cid.Cid]blockformat.Block), + blks: make(map[cid.Cid]blockformat.Block), base: &userView{ user: user, cs: cs, @@ -213,7 +222,7 @@ func (cs *NonArchivalCarstore) GetUserRepoHead(ctx context.Context, user models. 
if err != nil { return cid.Undef, err } - if lastShard.ID == 0 { + if lastShard == nil || lastShard.ID == 0 { return cid.Undef, nil } @@ -225,7 +234,7 @@ func (cs *NonArchivalCarstore) GetUserRepoRev(ctx context.Context, user models.U if err != nil { return "", err } - if lastShard.ID == 0 { + if lastShard == nil || lastShard.ID == 0 { return "", nil } @@ -252,3 +261,19 @@ func (cs *NonArchivalCarstore) GetCompactionTargets(ctx context.Context, shardCo func (cs *NonArchivalCarstore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) { return nil, fmt.Errorf("compaction not supported in non-archival") } + +func (cs *NonArchivalCarstore) HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error) { + return false, nil +} + +func (cs *NonArchivalCarstore) LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error) { + return "", 0, 0, ipld.ErrNotFound{Cid: k} +} + +func (cs *NonArchivalCarstore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { + slice, err := blocksToCar(ctx, root, rev, blks) + if err != nil { + return nil, err + } + return slice, cs.updateLastCommit(ctx, user, rev, root) +} diff --git a/carstore/repo_test.go b/carstore/repo_test.go index 8366cab95..c0c1c7414 100644 --- a/carstore/repo_test.go +++ b/carstore/repo_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "log/slog" "os" "path/filepath" "testing" @@ -24,7 +25,7 @@ import ( "gorm.io/gorm" ) -func testCarStore() (CarStore, func(), error) { +func testCarStore(t testing.TB) (CarStore, func(), error) { tempdir, err := os.MkdirTemp("", "msttest-") if err != nil { return nil, nil, err @@ -60,6 +61,23 @@ func testCarStore() (CarStore, func(), error) { }, nil } +func testSqliteCarStore(t testing.TB) (CarStore, func(), error) { + sqs := &SQLiteStore{} + sqs.log = slogForTest(t) + err := sqs.Open(":memory:") + if err != nil { + return nil, nil, err + } + return sqs, func() {}, nil +} + +type testFactory func(t testing.TB) (CarStore, func(), error) + +var backends = map[string]testFactory{ + "cartore": testCarStore, + "sqlite": testSqliteCarStore, +} + func testFlatfsBs() (blockstore.Blockstore, func(), error) { tempdir, err := os.MkdirTemp("", "msttest-") if err != nil { @@ -78,91 +96,96 @@ func testFlatfsBs() (blockstore.Blockstore, func(), error) { }, nil } -func TestBasicOperation(t *testing.T) { +func TestBasicOperation(ot *testing.T) { ctx := context.TODO() - cs, cleanup, err := testCarStore() - if err != nil { - t.Fatal(err) - } - defer cleanup() + for fname, tf := range backends { + ot.Run(fname, func(t *testing.T) { - ds, err := cs.NewDeltaSession(ctx, 1, nil) - if err != nil { - t.Fatal(err) - } + cs, cleanup, err := tf(t) + if err != nil { + t.Fatal(err) + } + defer cleanup() - ncid, rev, err := setupRepo(ctx, ds, false) - if err != nil { - t.Fatal(err) - } + ds, err := cs.NewDeltaSession(ctx, 1, nil) + if err != nil { + t.Fatal(err) + } - if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil { - t.Fatal(err) - } + ncid, rev, err := setupRepo(ctx, ds, false) + if err != nil { + t.Fatal(err) + } - var recs []cid.Cid - head := ncid - for i := 0; i < 10; i++ { - ds, err := cs.NewDeltaSession(ctx, 1, &rev) - if err != nil { - t.Fatal(err) - } + if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil { + t.Fatal(err) + } - rr, err := repo.OpenRepo(ctx, ds, head) - if err != nil { - t.Fatal(err) 
- } + var recs []cid.Cid + head := ncid + for i := 0; i < 10; i++ { + ds, err := cs.NewDeltaSession(ctx, 1, &rev) + if err != nil { + t.Fatal(err) + } - rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ - Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), - }) - if err != nil { - t.Fatal(err) - } + rr, err := repo.OpenRepo(ctx, ds, head) + if err != nil { + t.Fatal(err) + } - recs = append(recs, rc) + rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ + Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), + }) + if err != nil { + t.Fatal(err) + } - kmgr := &util.FakeKeyManager{} - nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) - if err != nil { - t.Fatal(err) - } + recs = append(recs, rc) - rev = nrev + kmgr := &util.FakeKeyManager{} + nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) + if err != nil { + t.Fatal(err) + } - if err := ds.CalcDiff(ctx, nil); err != nil { - t.Fatal(err) - } + rev = nrev - if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { - t.Fatal(err) - } + if err := ds.CalcDiff(ctx, nil); err != nil { + t.Fatal(err) + } - head = nroot - } + if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { + t.Fatal(err) + } - buf := new(bytes.Buffer) - if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { - t.Fatal(err) - } - checkRepo(t, cs, buf, recs) + head = nroot + } - if _, err := cs.CompactUserShards(ctx, 1, false); err != nil { - t.Fatal(err) - } + buf := new(bytes.Buffer) + if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { + t.Fatal(err) + } + checkRepo(t, cs, buf, recs) - buf = new(bytes.Buffer) - if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { - t.Fatal(err) + if _, err := cs.CompactUserShards(ctx, 1, false); err != nil { + t.Fatal(err) + } + + buf = new(bytes.Buffer) + if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { + t.Fatal(err) + } + checkRepo(t, cs, buf, recs) + }) } - checkRepo(t, cs, buf, recs) } func TestRepeatedCompactions(t *testing.T) { ctx := context.TODO() - cs, cleanup, err := testCarStore() + cs, cleanup, err := testCarStore(t) if err != nil { t.Fatal(err) } @@ -323,7 +346,16 @@ func setupRepo(ctx context.Context, bs blockstore.Blockstore, mkprofile bool) (c func BenchmarkRepoWritesCarstore(b *testing.B) { ctx := context.TODO() - cs, cleanup, err := testCarStore() + cs, cleanup, err := testCarStore(b) + innerBenchmarkRepoWritesCarstore(b, ctx, cs, cleanup, err) +} +func BenchmarkRepoWritesSqliteCarstore(b *testing.B) { + ctx := context.TODO() + + cs, cleanup, err := testSqliteCarStore(b) + innerBenchmarkRepoWritesCarstore(b, ctx, cs, cleanup, err) +} +func innerBenchmarkRepoWritesCarstore(b *testing.B, ctx context.Context, cs CarStore, cleanup func(), err error) { if err != nil { b.Fatal(err) } @@ -458,131 +490,152 @@ func BenchmarkRepoWritesSqlite(b *testing.B) { } } -func TestDuplicateBlockAcrossShards(t *testing.T) { +func TestDuplicateBlockAcrossShards(ot *testing.T) { ctx := context.TODO() - cs, cleanup, err := testCarStore() - if err != nil { - t.Fatal(err) - } - defer cleanup() + for fname, tf := range backends { + ot.Run(fname, func(t *testing.T) { - ds1, err := cs.NewDeltaSession(ctx, 1, nil) - if err != nil { - t.Fatal(err) - } + cs, cleanup, err := tf(t) + if err != nil { + t.Fatal(err) + } + defer cleanup() - ds2, err := cs.NewDeltaSession(ctx, 2, nil) - if err != nil { - t.Fatal(err) - } + ds1, err := cs.NewDeltaSession(ctx, 1, nil) + if err != nil { + t.Fatal(err) + } - ds3, err := 
cs.NewDeltaSession(ctx, 3, nil) - if err != nil { - t.Fatal(err) - } + ds2, err := cs.NewDeltaSession(ctx, 2, nil) + if err != nil { + t.Fatal(err) + } - var cids []cid.Cid - var revs []string - for _, ds := range []*DeltaSession{ds1, ds2, ds3} { - ncid, rev, err := setupRepo(ctx, ds, true) - if err != nil { - t.Fatal(err) - } + ds3, err := cs.NewDeltaSession(ctx, 3, nil) + if err != nil { + t.Fatal(err) + } - if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil { - t.Fatal(err) - } - cids = append(cids, ncid) - revs = append(revs, rev) - } + var cids []cid.Cid + var revs []string + for _, ds := range []*DeltaSession{ds1, ds2, ds3} { + ncid, rev, err := setupRepo(ctx, ds, true) + if err != nil { + t.Fatal(err) + } - var recs []cid.Cid - head := cids[1] - rev := revs[1] - for i := 0; i < 10; i++ { - ds, err := cs.NewDeltaSession(ctx, 2, &rev) - if err != nil { - t.Fatal(err) - } + if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil { + t.Fatal(err) + } + cids = append(cids, ncid) + revs = append(revs, rev) + } - rr, err := repo.OpenRepo(ctx, ds, head) - if err != nil { - t.Fatal(err) - } + var recs []cid.Cid + head := cids[1] + rev := revs[1] + for i := 0; i < 10; i++ { + ds, err := cs.NewDeltaSession(ctx, 2, &rev) + if err != nil { + t.Fatal(err) + } - rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ - Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), - }) - if err != nil { - t.Fatal(err) - } + rr, err := repo.OpenRepo(ctx, ds, head) + if err != nil { + t.Fatal(err) + } - recs = append(recs, rc) + rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ + Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), + }) + if err != nil { + t.Fatal(err) + } - kmgr := &util.FakeKeyManager{} - nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) - if err != nil { - t.Fatal(err) - } + recs = append(recs, rc) - rev = nrev + kmgr := &util.FakeKeyManager{} + nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) + if err != nil { + t.Fatal(err) + } - if err := ds.CalcDiff(ctx, nil); err != nil { - t.Fatal(err) - } + rev = nrev - if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { - t.Fatal(err) - } + if err := ds.CalcDiff(ctx, nil); err != nil { + t.Fatal(err) + } - head = nroot - } + if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { + t.Fatal(err) + } - // explicitly update the profile object - { - ds, err := cs.NewDeltaSession(ctx, 2, &rev) - if err != nil { - t.Fatal(err) - } + head = nroot + } - rr, err := repo.OpenRepo(ctx, ds, head) - if err != nil { - t.Fatal(err) - } + // explicitly update the profile object + { + ds, err := cs.NewDeltaSession(ctx, 2, &rev) + if err != nil { + t.Fatal(err) + } - desc := "this is so unique" - rc, err := rr.UpdateRecord(ctx, "app.bsky.actor.profile/self", &appbsky.ActorProfile{ - Description: &desc, - }) - if err != nil { - t.Fatal(err) - } + rr, err := repo.OpenRepo(ctx, ds, head) + if err != nil { + t.Fatal(err) + } - recs = append(recs, rc) + desc := "this is so unique" + rc, err := rr.UpdateRecord(ctx, "app.bsky.actor.profile/self", &appbsky.ActorProfile{ + Description: &desc, + }) + if err != nil { + t.Fatal(err) + } - kmgr := &util.FakeKeyManager{} - nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) - if err != nil { - t.Fatal(err) - } + recs = append(recs, rc) - rev = nrev + kmgr := &util.FakeKeyManager{} + nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) + if err != nil { + t.Fatal(err) + } - if err := ds.CalcDiff(ctx, nil); err != nil { - 
t.Fatal(err) - } + rev = nrev - if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { - t.Fatal(err) - } + if err := ds.CalcDiff(ctx, nil); err != nil { + t.Fatal(err) + } - head = nroot + if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { + t.Fatal(err) + } + + head = nroot + } + + buf := new(bytes.Buffer) + if err := cs.ReadUserCar(ctx, 2, "", true, buf); err != nil { + t.Fatal(err) + } + checkRepo(t, cs, buf, recs) + }) } +} - buf := new(bytes.Buffer) - if err := cs.ReadUserCar(ctx, 2, "", true, buf); err != nil { - t.Fatal(err) +type testWriter struct { + t testing.TB +} + +func (tw testWriter) Write(p []byte) (n int, err error) { + tw.t.Log(string(p)) + return len(p), nil +} + +func slogForTest(t testing.TB) *slog.Logger { + hopts := slog.HandlerOptions{ + Level: slog.LevelDebug, } - checkRepo(t, cs, buf, recs) + return slog.New(slog.NewTextHandler(&testWriter{t}, &hopts)) } diff --git a/carstore/scylla.go b/carstore/scylla.go new file mode 100644 index 000000000..4a1a5d6b8 --- /dev/null +++ b/carstore/scylla.go @@ -0,0 +1,636 @@ +package carstore + +import ( + "bytes" + "context" + "errors" + "fmt" + "github.com/bluesky-social/indigo/models" + "github.com/gocql/gocql" + blockformat "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-libipfs/blocks" + "github.com/ipld/go-car" + _ "github.com/mattn/go-sqlite3" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "io" + "log/slog" + "math" + "math/rand/v2" + "time" +) + +type ScyllaStore struct { + WriteSession *gocql.Session + ReadSession *gocql.Session + + // scylla servers + scyllaAddrs []string + // scylla namespace where we find our table + keyspace string + + log *slog.Logger + + lastShardCache lastShardCache +} + +func NewScyllaStore(addrs []string, keyspace string) (*ScyllaStore, error) { + out := new(ScyllaStore) + out.scyllaAddrs = addrs + out.keyspace = keyspace + err := out.Open() + if err != nil { + return nil, err + } + return out, nil +} + +func (sqs *ScyllaStore) Open() error { + if sqs.log == nil { + sqs.log = slog.Default() + } + sqs.log.Debug("scylla connect", "addrs", sqs.scyllaAddrs) + var err error + + // + // Write session + // + var writeSession *gocql.Session + for retry := 0; ; retry++ { + writeCluster := gocql.NewCluster(sqs.scyllaAddrs...) + writeCluster.Keyspace = sqs.keyspace + // Default port, the client should automatically upgrade to shard-aware port + writeCluster.Port = 9042 + writeCluster.Consistency = gocql.Quorum + writeCluster.RetryPolicy = &ExponentialBackoffRetryPolicy{NumRetries: 10, Min: 100 * time.Millisecond, Max: 10 * time.Second} + writeCluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy()) + writeSession, err = writeCluster.CreateSession() + if err != nil { + if retry > 200 { + return fmt.Errorf("failed to connect read session too many times: %w", err) + } + sqs.log.Error("failed to connect to ScyllaDB Read Session, retrying in 1s", "retry", retry, "err", err) + time.Sleep(delayForAttempt(retry)) + continue + } + break + } + + // + // Read session + // + var readSession *gocql.Session + for retry := 0; ; retry++ { + readCluster := gocql.NewCluster(sqs.scyllaAddrs...) 
+ readCluster.Keyspace = sqs.keyspace + // Default port, the client should automatically upgrade to shard-aware port + readCluster.Port = 9042 + readCluster.RetryPolicy = &ExponentialBackoffRetryPolicy{NumRetries: 5, Min: 10 * time.Millisecond, Max: 1 * time.Second} + readCluster.Consistency = gocql.One + readCluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy()) + readSession, err = readCluster.CreateSession() + if err != nil { + if retry > 200 { + return fmt.Errorf("failed to connect read session too many times: %w", err) + } + sqs.log.Error("failed to connect to ScyllaDB Read Session, retrying in 1s", "retry", retry, "err", err) + time.Sleep(delayForAttempt(retry)) + continue + } + break + } + + sqs.WriteSession = writeSession + sqs.ReadSession = readSession + + err = sqs.createTables() + if err != nil { + return fmt.Errorf("scylla could not create tables, %w", err) + } + sqs.lastShardCache.source = sqs + sqs.lastShardCache.Init() + return nil +} + +var createTableTexts = []string{ + `CREATE TABLE IF NOT EXISTS blocks (uid bigint, cid blob, rev varchar, root blob, block blob, PRIMARY KEY((uid,cid)))`, + // This is the INDEX I wish we could use, but scylla can't do it so we MATERIALIZED VIEW instead + //`CREATE INDEX IF NOT EXISTS block_by_rev ON blocks (uid, rev)`, + `CREATE MATERIALIZED VIEW IF NOT EXISTS blocks_by_uidrev +AS SELECT uid, rev, cid, root +FROM blocks +WHERE uid IS NOT NULL AND rev IS NOT NULL AND cid IS NOT NULL +PRIMARY KEY ((uid), rev, cid) WITH CLUSTERING ORDER BY (rev DESC)`, +} + +func (sqs *ScyllaStore) createTables() error { + for i, text := range createTableTexts { + err := sqs.WriteSession.Query(text).Exec() + if err != nil { + return fmt.Errorf("scylla create table statement [%d] %v: %w", i, text, err) + } + } + return nil +} + +// writeNewShard needed for DeltaSession.CloseWithRoot +func (sqs *ScyllaStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { + scWriteNewShard.Inc() + sqs.log.Debug("write shard", "uid", user, "root", root, "rev", rev, "nblocks", len(blks)) + start := time.Now() + ctx, span := otel.Tracer("carstore").Start(ctx, "writeNewShard") + defer span.End() + buf := new(bytes.Buffer) + hnw, err := WriteCarHeader(buf, root) + if err != nil { + return nil, fmt.Errorf("failed to write car header: %w", err) + } + offset := hnw + + dbroot := root.Bytes() + + span.SetAttributes(attribute.Int("blocks", len(blks))) + + for bcid, block := range blks { + // build shard for output firehose + nw, err := LdWrite(buf, bcid.Bytes(), block.RawData()) + if err != nil { + return nil, fmt.Errorf("failed to write block: %w", err) + } + offset += nw + + // TODO: scylla BATCH doesn't apply if the batch crosses partition keys; BUT, we may be able to send many blocks concurrently? 
+ dbcid := bcid.Bytes() + blockbytes := block.RawData() + // we're relying on cql auto-prepare, no 'PreparedStatement' + err = sqs.WriteSession.Query( + `INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?)`, + user, dbcid, rev, dbroot, blockbytes, + ).Idempotent(true).Exec() + if err != nil { + return nil, fmt.Errorf("(uid,cid) block store failed, %w", err) + } + sqs.log.Debug("put block", "uid", user, "cid", bcid, "size", len(blockbytes)) + } + + shard := CarShard{ + Root: models.DbCID{CID: root}, + DataStart: hnw, + Seq: seq, + Usr: user, + Rev: rev, + } + + sqs.lastShardCache.put(&shard) + + dt := time.Since(start).Seconds() + scWriteTimes.Observe(dt) + return buf.Bytes(), nil +} + +// GetLastShard nedeed for NewDeltaSession indirectly through lastShardCache +// What we actually seem to need from this: last {Rev, Root.CID} +func (sqs *ScyllaStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error) { + scGetLastShard.Inc() + var rev string + var rootb []byte + err := sqs.ReadSession.Query(`SELECT rev, root FROM blocks_by_uidrev WHERE uid = ? ORDER BY rev DESC LIMIT 1`, uid).Scan(&rev, &rootb) + if errors.Is(err, gocql.ErrNotFound) { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("last shard err, %w", err) + } + xcid, cidErr := cid.Cast(rootb) + if cidErr != nil { + return nil, fmt.Errorf("last shard bad cid, %w", cidErr) + } + return &CarShard{ + Root: models.DbCID{CID: xcid}, + Rev: rev, + }, nil +} + +func (sqs *ScyllaStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) { + sqs.log.Warn("TODO: don't call compaction") + return nil, nil +} + +func (sqs *ScyllaStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) { + sqs.log.Warn("TODO: don't call compaction targets") + return nil, nil +} + +func (sqs *ScyllaStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { + // TODO: same as FileCarStore; re-unify + lastShard, err := sqs.lastShardCache.get(ctx, user) + if err != nil { + return cid.Undef, err + } + if lastShard == nil { + return cid.Undef, nil + } + if lastShard.ID == 0 { + return cid.Undef, nil + } + + return lastShard.Root.CID, nil +} + +func (sqs *ScyllaStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { + // TODO: same as FileCarStore; re-unify + lastShard, err := sqs.lastShardCache.get(ctx, user) + if err != nil { + return "", err + } + if lastShard == nil { + return "", nil + } + if lastShard.ID == 0 { + return "", nil + } + + return lastShard.Rev, nil +} + +func (sqs *ScyllaStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) { + // TODO: same as FileCarStore, re-unify + ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice") + defer span.End() + + carr, err := car.NewCarReader(bytes.NewReader(carslice)) + if err != nil { + return cid.Undef, nil, err + } + + if len(carr.Header.Roots) != 1 { + return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots)) + } + + ds, err := sqs.NewDeltaSession(ctx, uid, since) + if err != nil { + return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err) + } + + var cids []cid.Cid + for { + blk, err := carr.Next() + if err != nil { + if err == io.EOF { + break + } + return cid.Undef, nil, err + } + + cids = append(cids, blk.Cid()) + + if err := ds.Put(ctx, blk); err != nil { + return cid.Undef, nil, err + } 
+ } + + return carr.Header.Roots[0], ds, nil +} + +func (sqs *ScyllaStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) { + ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession") + defer span.End() + + // TODO: ensure that we don't write updates on top of the wrong head + // this needs to be a compare and swap type operation + lastShard, err := sqs.lastShardCache.get(ctx, user) + if err != nil { + return nil, fmt.Errorf("NewDeltaSession, lsc, %w", err) + } + + if lastShard == nil { + lastShard = &zeroShard + } + + if since != nil && *since != lastShard.Rev { + return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch) + } + + return &DeltaSession{ + blks: make(map[cid.Cid]blockformat.Block), + base: &sqliteUserView{ + uid: user, + sqs: sqs, + }, + user: user, + baseCid: lastShard.Root.CID, + cs: sqs, + seq: lastShard.Seq + 1, + lastRev: lastShard.Rev, + }, nil +} + +func (sqs *ScyllaStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) { + return &DeltaSession{ + base: &sqliteUserView{ + uid: user, + sqs: sqs, + }, + readonly: true, + user: user, + cs: sqs, + }, nil +} + +// ReadUserCar +// incremental is only ever called true +func (sqs *ScyllaStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error { + scGetCar.Inc() + ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar") + defer span.End() + start := time.Now() + + cidchan := make(chan cid.Cid, 100) + + go func() { + defer close(cidchan) + cids := sqs.ReadSession.Query(`SELECT cid FROM blocks_by_uidrev WHERE uid = ? AND rev > ? ORDER BY rev DESC`, user, sinceRev).Iter() + defer cids.Close() + for { + var cidb []byte + ok := cids.Scan(&cidb) + if !ok { + break + } + xcid, cidErr := cid.Cast(cidb) + if cidErr != nil { + sqs.log.Warn("ReadUserCar bad cid", "err", cidErr) + continue + } + cidchan <- xcid + } + }() + nblocks := 0 + first := true + for xcid := range cidchan { + var xrev string + var xroot []byte + var xblock []byte + err := sqs.ReadSession.Query("SELECT rev, root, block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1", user, xcid.Bytes()).Scan(&xrev, &xroot, &xblock) + if err != nil { + return fmt.Errorf("rcar bad read, %w", err) + } + if first { + rootCid, cidErr := cid.Cast(xroot) + if cidErr != nil { + return fmt.Errorf("rcar bad rootcid, %w", err) + } + if err := car.WriteHeader(&car.CarHeader{ + Roots: []cid.Cid{rootCid}, + Version: 1, + }, shardOut); err != nil { + return fmt.Errorf("rcar bad header, %w", err) + } + first = false + } + nblocks++ + _, err = LdWrite(shardOut, xcid.Bytes(), xblock) + if err != nil { + return fmt.Errorf("rcar bad write, %w", err) + } + } + span.SetAttributes(attribute.Int("blocks", nblocks)) + sqs.log.Debug("read car", "nblocks", nblocks, "since", sinceRev) + scReadCarTimes.Observe(time.Since(start).Seconds()) + return nil +} + +// Stat is only used in a debugging admin handler +// don't bother implementing it (for now?) +func (sqs *ScyllaStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) { + sqs.log.Warn("Stat debugging method not implemented for sqlite store") + return nil, nil +} + +func (sqs *ScyllaStore) WipeUserData(ctx context.Context, user models.Uid) error { + ctx, span := otel.Tracer("carstore").Start(ctx, "WipeUserData") + defer span.End() + + // LOL, can't do this if primary key is (uid,cid) because that's hashed with no scan! 
+ //err := sqs.WriteSession.Query("DELETE FROM blocks WHERE uid = ?", user).Exec() + + cidchan := make(chan cid.Cid, 100) + + go func() { + defer close(cidchan) + cids := sqs.ReadSession.Query(`SELECT cid FROM blocks_by_uidrev WHERE uid = ?`, user).Iter() + defer cids.Close() + for { + var cidb []byte + ok := cids.Scan(&cidb) + if !ok { + break + } + xcid, cidErr := cid.Cast(cidb) + if cidErr != nil { + sqs.log.Warn("ReadUserCar bad cid", "err", cidErr) + continue + } + cidchan <- xcid + } + }() + nblocks := 0 + errcount := 0 + for xcid := range cidchan { + err := sqs.ReadSession.Query("DELETE FROM blocks WHERE uid = ? AND cid = ?", user, xcid.Bytes()).Exec() + if err != nil { + sqs.log.Warn("ReadUserCar bad delete", "err", err) + errcount++ + if errcount > 10 { + return err + } + } + nblocks++ + } + scUsersWiped.Inc() + scBlocksDeleted.Add(float64(nblocks)) + return nil +} + +// HasUidCid needed for NewDeltaSession userView +func (sqs *ScyllaStore) HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) { + // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData + scHas.Inc() + var rev string + var rootb []byte + err := sqs.ReadSession.Query(`SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1`, user, bcid.Bytes()).Scan(&rev, &rootb) + if err != nil { + return false, fmt.Errorf("hasUC bad scan, %w", err) + } + return true, nil +} + +func (sqs *ScyllaStore) CarStore() CarStore { + return sqs +} + +func (sqs *ScyllaStore) Close() error { + sqs.WriteSession.Close() + sqs.ReadSession.Close() + return nil +} + +func (sqs *ScyllaStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) { + // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData + scGetBlock.Inc() + start := time.Now() + var blockb []byte + err := sqs.ReadSession.Query("SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1", user, bcid.Bytes()).Scan(&blockb) + if err != nil { + return nil, fmt.Errorf("getb err, %w", err) + } + dt := time.Since(start) + scGetTimes.Observe(dt.Seconds()) + return blocks.NewBlock(blockb), nil +} + +func (sqs *ScyllaStore) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) { + // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData + scGetBlockSize.Inc() + var out int64 + err := sqs.ReadSession.Query("SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? 
LIMIT 1", user, bcid.Bytes()).Scan(&out) + if err != nil { + return 0, fmt.Errorf("getbs err, %w", err) + } + return out, nil +} + +var scUsersWiped = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_users_wiped", + Help: "User rows deleted in scylla backend", +}) + +var scBlocksDeleted = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_blocks_deleted", + Help: "User blocks deleted in scylla backend", +}) + +var scGetBlock = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_get_block", + Help: "get block scylla backend", +}) + +var scGetBlockSize = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_get_block_size", + Help: "get block size scylla backend", +}) + +var scGetCar = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_get_car", + Help: "get block scylla backend", +}) + +var scHas = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_has", + Help: "check block presence scylla backend", +}) + +var scGetLastShard = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_get_last_shard", + Help: "get last shard scylla backend", +}) + +var scWriteNewShard = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_write_shard", + Help: "write shard blocks scylla backend", +}) + +var timeBuckets []float64 +var scWriteTimes prometheus.Histogram +var scGetTimes prometheus.Histogram +var scReadCarTimes prometheus.Histogram + +func init() { + timeBuckets = make([]float64, 1, 20) + timeBuckets[0] = 0.000_0100 + i := 0 + for timeBuckets[i] < 1 && len(timeBuckets) < 20 { + timeBuckets = append(timeBuckets, timeBuckets[i]*2) + i++ + } + scWriteTimes = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "bgs_sc_write_times", + Buckets: timeBuckets, + }) + scGetTimes = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "bgs_sc_get_times", + Buckets: timeBuckets, + }) + scReadCarTimes = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "bgs_sc_readcar_times", + Buckets: timeBuckets, + }) +} + +// TODO: copied from tango, re-unify? 
+// ExponentialBackoffRetryPolicy sleeps between attempts +type ExponentialBackoffRetryPolicy struct { + NumRetries int + Min, Max time.Duration +} + +func (e *ExponentialBackoffRetryPolicy) napTime(attempts int) time.Duration { + return getExponentialTime(e.Min, e.Max, attempts) +} + +func (e *ExponentialBackoffRetryPolicy) Attempt(q gocql.RetryableQuery) bool { + if q.Attempts() > e.NumRetries { + return false + } + time.Sleep(e.napTime(q.Attempts())) + return true +} + +// used to calculate exponentially growing time +func getExponentialTime(min time.Duration, max time.Duration, attempts int) time.Duration { + if min <= 0 { + min = 100 * time.Millisecond + } + if max <= 0 { + max = 10 * time.Second + } + minFloat := float64(min) + napDuration := minFloat * math.Pow(2, float64(attempts-1)) + // add some jitter + napDuration += rand.Float64()*minFloat - (minFloat / 2) + if napDuration > float64(max) { + return time.Duration(max) + } + return time.Duration(napDuration) +} + +// GetRetryType returns the retry type for the given error +func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) gocql.RetryType { + // Retry timeouts and/or contention errors on the same host + if errors.Is(err, gocql.ErrTimeoutNoResponse) || + errors.Is(err, gocql.ErrNoStreams) || + errors.Is(err, gocql.ErrTooManyTimeouts) { + return gocql.Retry + } + + // Retry next host on unavailable errors + if errors.Is(err, gocql.ErrUnavailable) || + errors.Is(err, gocql.ErrConnectionClosed) || + errors.Is(err, gocql.ErrSessionClosed) { + return gocql.RetryNextHost + } + + // Otherwise don't retry + return gocql.Rethrow +} + +func delayForAttempt(attempt int) time.Duration { + if attempt < 50 { + return time.Millisecond * 5 + } + + return time.Second +} diff --git a/carstore/sqlite_store.go b/carstore/sqlite_store.go new file mode 100644 index 000000000..18a8467db --- /dev/null +++ b/carstore/sqlite_store.go @@ -0,0 +1,576 @@ +package carstore + +import ( + "bytes" + "context" + "database/sql" + "errors" + "fmt" + "go.opentelemetry.io/otel/attribute" + "io" + "log/slog" + "os" + "path/filepath" + + "github.com/bluesky-social/indigo/models" + blockformat "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-libipfs/blocks" + "github.com/ipld/go-car" + _ "github.com/mattn/go-sqlite3" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "go.opentelemetry.io/otel" +) + +// var log = logging.Logger("sqstore") + +type SQLiteStore struct { + dbPath string + db *sql.DB + + log *slog.Logger + + lastShardCache lastShardCache +} + +func ensureDir(path string) error { + fi, err := os.Stat(path) + if err != nil { + if os.IsNotExist(err) { + return os.MkdirAll(path, 0755) + } + return err + } + if fi.IsDir() { + return nil + } + return fmt.Errorf("%s exists but is not a directory", path) +} + +func NewSqliteStore(csdir string) (*SQLiteStore, error) { + if err := ensureDir(csdir); err != nil { + return nil, err + } + dbpath := filepath.Join(csdir, "db.sqlite3") + out := new(SQLiteStore) + err := out.Open(dbpath) + if err != nil { + return nil, err + } + return out, nil +} + +func (sqs *SQLiteStore) Open(path string) error { + if sqs.log == nil { + sqs.log = slog.Default() + } + sqs.log.Debug("open db", "path", path) + db, err := sql.Open("sqlite3", path) + if err != nil { + return fmt.Errorf("%s: sqlite could not open, %w", path, err) + } + sqs.db = db + sqs.dbPath = path + err = sqs.createTables() + if err != nil { + return fmt.Errorf("%s: 
sqlite could not create tables, %w", path, err)
+	}
+	sqs.lastShardCache.source = sqs
+	sqs.lastShardCache.Init()
+	return nil
+}
+
+func (sqs *SQLiteStore) createTables() error {
+	tx, err := sqs.db.Begin()
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+	_, err = tx.Exec("CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid));")
+	if err != nil {
+		return fmt.Errorf("%s: create table blocks..., %w", sqs.dbPath, err)
+	}
+	_, err = tx.Exec("CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC)")
+	if err != nil {
+		return fmt.Errorf("%s: create blocks by rev index, %w", sqs.dbPath, err)
+	}
+	return tx.Commit()
+}
+
+// writeNewShard needed for DeltaSession.CloseWithRoot
+func (sqs *SQLiteStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {
+	sqWriteNewShard.Inc()
+	sqs.log.Debug("write shard", "uid", user, "root", root, "rev", rev, "nblocks", len(blks))
+	ctx, span := otel.Tracer("carstore").Start(ctx, "writeNewShard")
+	defer span.End()
+	// this is "write many blocks", "write one block" is above in putBlock(). keep them in sync.
+	buf := new(bytes.Buffer)
+	hnw, err := WriteCarHeader(buf, root)
+	if err != nil {
+		return nil, fmt.Errorf("failed to write car header: %w", err)
+	}
+	offset := hnw
+
+	tx, err := sqs.db.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, fmt.Errorf("bad block insert tx, %w", err)
+	}
+	defer tx.Rollback()
+	insertStatement, err := tx.PrepareContext(ctx, "INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block")
+	if err != nil {
+		return nil, fmt.Errorf("bad block insert sql, %w", err)
+	}
+	defer insertStatement.Close()
+
+	dbroot := models.DbCID{CID: root}
+
+	span.SetAttributes(attribute.Int("blocks", len(blks)))
+
+	for bcid, block := range blks {
+		// build shard for output firehose
+		nw, err := LdWrite(buf, bcid.Bytes(), block.RawData())
+		if err != nil {
+			return nil, fmt.Errorf("failed to write block: %w", err)
+		}
+		offset += nw
+
+		// TODO: better databases have an insert-many option for a prepared statement
+		dbcid := models.DbCID{CID: bcid}
+		blockbytes := block.RawData()
+		_, err = insertStatement.ExecContext(ctx, user, dbcid, rev, dbroot, blockbytes)
+		if err != nil {
+			return nil, fmt.Errorf("(uid,cid) block store failed, %w", err)
+		}
+		sqs.log.Debug("put block", "uid", user, "cid", bcid, "size", len(blockbytes))
+	}
+	err = tx.Commit()
+	if err != nil {
+		return nil, fmt.Errorf("bad block insert commit, %w", err)
+	}
+
+	shard := CarShard{
+		Root:      models.DbCID{CID: root},
+		DataStart: hnw,
+		Seq:       seq,
+		Usr:       user,
+		Rev:       rev,
+	}
+
+	sqs.lastShardCache.put(&shard)
+
+	return buf.Bytes(), nil
+}
+
+var ErrNothingThere = errors.New("nothing to read")
+
+// GetLastShard needed for NewDeltaSession indirectly through lastShardCache
+// What we actually seem to need from this: last {Rev, Root.CID}
+func (sqs *SQLiteStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error) {
+	sqGetLastShard.Inc()
+	tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
+	if err != nil {
+		return nil, fmt.Errorf("bad last shard tx, %w", err)
+	}
+	defer tx.Rollback()
+	qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? 
ORDER BY rev DESC LIMIT 1") + if err != nil { + return nil, fmt.Errorf("bad last shard sql, %w", err) + } + rows, err := qstmt.QueryContext(ctx, uid) + if err != nil { + return nil, fmt.Errorf("last shard err, %w", err) + } + if rows.Next() { + var rev string + var rootb models.DbCID + err = rows.Scan(&rev, &rootb) + if err != nil { + return nil, fmt.Errorf("last shard bad scan, %w", err) + } + return &CarShard{ + Root: rootb, + Rev: rev, + }, nil + } + return nil, nil +} + +func (sqs *SQLiteStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) { + sqs.log.Warn("TODO: don't call compaction") + return nil, nil +} + +func (sqs *SQLiteStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) { + sqs.log.Warn("TODO: don't call compaction targets") + return nil, nil +} + +func (sqs *SQLiteStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { + // TODO: same as FileCarStore; re-unify + lastShard, err := sqs.lastShardCache.get(ctx, user) + if err != nil { + return cid.Undef, err + } + if lastShard == nil { + return cid.Undef, nil + } + if lastShard.ID == 0 { + return cid.Undef, nil + } + + return lastShard.Root.CID, nil +} + +func (sqs *SQLiteStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { + // TODO: same as FileCarStore; re-unify + lastShard, err := sqs.lastShardCache.get(ctx, user) + if err != nil { + return "", err + } + if lastShard == nil { + return "", nil + } + if lastShard.ID == 0 { + return "", nil + } + + return lastShard.Rev, nil +} + +func (sqs *SQLiteStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) { + // TODO: same as FileCarStore, re-unify + ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice") + defer span.End() + + carr, err := car.NewCarReader(bytes.NewReader(carslice)) + if err != nil { + return cid.Undef, nil, err + } + + if len(carr.Header.Roots) != 1 { + return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots)) + } + + ds, err := sqs.NewDeltaSession(ctx, uid, since) + if err != nil { + return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err) + } + + var cids []cid.Cid + for { + blk, err := carr.Next() + if err != nil { + if err == io.EOF { + break + } + return cid.Undef, nil, err + } + + cids = append(cids, blk.Cid()) + + if err := ds.Put(ctx, blk); err != nil { + return cid.Undef, nil, err + } + } + + return carr.Header.Roots[0], ds, nil +} + +var zeroShard CarShard + +func (sqs *SQLiteStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) { + ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession") + defer span.End() + + // TODO: ensure that we don't write updates on top of the wrong head + // this needs to be a compare and swap type operation + lastShard, err := sqs.lastShardCache.get(ctx, user) + if err != nil { + return nil, fmt.Errorf("NewDeltaSession, lsc, %w", err) + } + + if lastShard == nil { + lastShard = &zeroShard + } + + if since != nil && *since != lastShard.Rev { + return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch) + } + + return &DeltaSession{ + blks: make(map[cid.Cid]blockformat.Block), + base: &sqliteUserView{ + uid: user, + sqs: sqs, + }, + user: user, + baseCid: lastShard.Root.CID, + cs: sqs, + seq: lastShard.Seq + 1, + lastRev: lastShard.Rev, + }, nil +} + 
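Usage note: callers never invoke writeNewShard directly; they drive it through the DeltaSession returned by NewDeltaSession above. A minimal caller-side sketch follows (not part of this diff, and the DeltaSession.Put and DeltaSession.CloseWithRoot signatures are assumed from the comments and ImportSlice code above rather than confirmed here):

```go
package main

import (
	"context"

	"github.com/bluesky-social/indigo/carstore"
	"github.com/bluesky-social/indigo/models"
	blockformat "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
)

// writeCommit is a hypothetical helper showing the intended flow: open a
// delta session against the caller's last known rev, stage blocks, then
// close the session, which flushes through writeNewShard and returns the
// CAR slice for the firehose.
func writeCommit(ctx context.Context, cs *carstore.SQLiteStore, uid models.Uid, prevRev, newRev string, root cid.Cid, blks []blockformat.Block) ([]byte, error) {
	// fails with ErrRepoBaseMismatch if prevRev is not the stored last rev
	ds, err := cs.NewDeltaSession(ctx, uid, &prevRev)
	if err != nil {
		return nil, err
	}
	for _, blk := range blks {
		if err := ds.Put(ctx, blk); err != nil {
			return nil, err
		}
	}
	return ds.CloseWithRoot(ctx, root, newRev)
}
```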
+func (sqs *SQLiteStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) { + return &DeltaSession{ + base: &sqliteUserView{ + uid: user, + sqs: sqs, + }, + readonly: true, + user: user, + cs: sqs, + }, nil +} + +type cartmp struct { + xcid cid.Cid + rev string + root string + block []byte +} + +// ReadUserCar +// incremental is only ever called true +func (sqs *SQLiteStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error { + sqGetCar.Inc() + ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar") + defer span.End() + + tx, err := sqs.db.BeginTx(ctx, &txReadOnly) + if err != nil { + return fmt.Errorf("rcar tx, %w", err) + } + defer tx.Rollback() + qstmt, err := tx.PrepareContext(ctx, "SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC") + if err != nil { + return fmt.Errorf("rcar sql, %w", err) + } + defer qstmt.Close() + rows, err := qstmt.QueryContext(ctx, user, sinceRev) + if err != nil { + return fmt.Errorf("rcar err, %w", err) + } + nblocks := 0 + first := true + for rows.Next() { + var xcid models.DbCID + var xrev string + var xroot models.DbCID + var xblock []byte + err = rows.Scan(&xcid, &xrev, &xroot, &xblock) + if err != nil { + return fmt.Errorf("rcar bad scan, %w", err) + } + if first { + if err := car.WriteHeader(&car.CarHeader{ + Roots: []cid.Cid{xroot.CID}, + Version: 1, + }, shardOut); err != nil { + return fmt.Errorf("rcar bad header, %w", err) + } + first = false + } + nblocks++ + _, err := LdWrite(shardOut, xcid.CID.Bytes(), xblock) + if err != nil { + return fmt.Errorf("rcar bad write, %w", err) + } + } + sqs.log.Debug("read car", "nblocks", nblocks, "since", sinceRev) + return nil +} + +// Stat is only used in a debugging admin handler +// don't bother implementing it (for now?) +func (sqs *SQLiteStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) { + sqs.log.Warn("Stat debugging method not implemented for sqlite store") + return nil, nil +} + +func (sqs *SQLiteStore) WipeUserData(ctx context.Context, user models.Uid) error { + ctx, span := otel.Tracer("carstore").Start(ctx, "WipeUserData") + defer span.End() + tx, err := sqs.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("wipe tx, %w", err) + } + defer tx.Rollback() + deleteResult, err := tx.ExecContext(ctx, "DELETE FROM blocks WHERE uid = ?", user) + nrows, ierr := deleteResult.RowsAffected() + if ierr == nil { + sqRowsDeleted.Add(float64(nrows)) + } + if err == nil { + err = ierr + } + if err == nil { + err = tx.Commit() + } + return err +} + +var txReadOnly = sql.TxOptions{ReadOnly: true} + +// HasUidCid needed for NewDeltaSession userView +func (sqs *SQLiteStore) HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) { + // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData + sqHas.Inc() + tx, err := sqs.db.BeginTx(ctx, &txReadOnly) + if err != nil { + return false, fmt.Errorf("hasUC tx, %w", err) + } + defer tx.Rollback() + qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? 
LIMIT 1") + if err != nil { + return false, fmt.Errorf("hasUC sql, %w", err) + } + defer qstmt.Close() + rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid}) + if err != nil { + return false, fmt.Errorf("hasUC err, %w", err) + } + if rows.Next() { + var rev string + var rootb models.DbCID + err = rows.Scan(&rev, &rootb) + if err != nil { + return false, fmt.Errorf("hasUC bad scan, %w", err) + } + return true, nil + } + return false, nil +} + +func (sqs *SQLiteStore) CarStore() CarStore { + return sqs +} + +func (sqs *SQLiteStore) Close() error { + return sqs.db.Close() +} + +func (sqs *SQLiteStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) { + // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData + sqGetBlock.Inc() + tx, err := sqs.db.BeginTx(ctx, &txReadOnly) + if err != nil { + return nil, fmt.Errorf("getb tx, %w", err) + } + defer tx.Rollback() + qstmt, err := tx.PrepareContext(ctx, "SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1") + if err != nil { + return nil, fmt.Errorf("getb sql, %w", err) + } + defer qstmt.Close() + rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid}) + if err != nil { + return nil, fmt.Errorf("getb err, %w", err) + } + if rows.Next() { + //var rev string + //var rootb models.DbCID + var blockb []byte + err = rows.Scan(&blockb) + if err != nil { + return nil, fmt.Errorf("getb bad scan, %w", err) + } + return blocks.NewBlock(blockb), nil + } + return nil, ErrNothingThere +} + +func (sqs *SQLiteStore) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) { + // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData + sqGetBlockSize.Inc() + tx, err := sqs.db.BeginTx(ctx, &txReadOnly) + if err != nil { + return 0, fmt.Errorf("getbs tx, %w", err) + } + defer tx.Rollback() + qstmt, err := tx.PrepareContext(ctx, "SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1") + if err != nil { + return 0, fmt.Errorf("getbs sql, %w", err) + } + defer qstmt.Close() + rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid}) + if err != nil { + return 0, fmt.Errorf("getbs err, %w", err) + } + if rows.Next() { + var out int64 + err = rows.Scan(&out) + if err != nil { + return 0, fmt.Errorf("getbs bad scan, %w", err) + } + return out, nil + } + return 0, nil +} + +type sqliteUserViewInner interface { + HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) + getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) + getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) +} + +// TODO: rename, used by both sqlite and scylla +type sqliteUserView struct { + sqs sqliteUserViewInner + uid models.Uid +} + +func (s sqliteUserView) Has(ctx context.Context, c cid.Cid) (bool, error) { + // TODO: cache block metadata? + return s.sqs.HasUidCid(ctx, s.uid, c) +} + +func (s sqliteUserView) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) { + // TODO: cache blocks? + return s.sqs.getBlock(ctx, s.uid, c) +} + +func (s sqliteUserView) GetSize(ctx context.Context, c cid.Cid) (int, error) { + // TODO: cache block metadata? 
+ bigsize, err := s.sqs.getBlockSize(ctx, s.uid, c) + return int(bigsize), err +} + +// ensure we implement the interface +var _ minBlockstore = (*sqliteUserView)(nil) + +var sqRowsDeleted = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sq_rows_deleted", + Help: "User rows deleted in sqlite backend", +}) + +var sqGetBlock = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sq_get_block", + Help: "get block sqlite backend", +}) + +var sqGetBlockSize = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sq_get_block_size", + Help: "get block size sqlite backend", +}) + +var sqGetCar = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sq_get_car", + Help: "get block sqlite backend", +}) + +var sqHas = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sq_has", + Help: "check block presence sqlite backend", +}) + +var sqGetLastShard = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sq_get_last_shard", + Help: "get last shard sqlite backend", +}) + +var sqWriteNewShard = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sq_write_shard", + Help: "write shard blocks sqlite backend", +}) diff --git a/cmd/bigsky/main.go b/cmd/bigsky/main.go index 33c798508..4b7e5a725 100644 --- a/cmd/bigsky/main.go +++ b/cmd/bigsky/main.go @@ -99,6 +99,12 @@ func run(args []string) error { Name: "crawl-insecure-ws", Usage: "when connecting to PDS instances, use ws:// instead of wss://", }, + &cli.StringFlag{ + Name: "crawl-ssl-policy", + Usage: "when connecting to PDS instances, 'any' allows either ws:// or wss://, 'require' requires wss://, 'prod' requires wss:// of external requestCrawl hosts, 'none' rejects wss:// connections", + Value: "prod", + EnvVars: []string{"RELAY_CRAWL_NONSSL"}, + }, &cli.BoolFlag{ Name: "spidering", Value: false, @@ -216,6 +222,17 @@ func run(args []string) error { Usage: "forward POST requestCrawl to this url, should be machine root url and not xrpc/requestCrawl, comma separated list", EnvVars: []string{"RELAY_NEXT_CRAWLER"}, }, + &cli.BoolFlag{ + Name: "ex-sqlite-carstore", + Usage: "enable experimental sqlite carstore", + Value: false, + }, + &cli.StringSliceFlag{ + Name: "scylla-carstore", + Usage: "scylla server addresses for storage backend, comma separated", + Value: &cli.StringSlice{}, + EnvVars: []string{"RELAY_SCYLLA_NODES"}, + }, &cli.BoolFlag{ Name: "non-archival", EnvVars: []string{"RELAY_NON_ARCHIVAL"}, @@ -316,56 +333,72 @@ func runBigsky(cctx *cli.Context) error { return err } - slog.Info("setting up main database") dburl := cctx.String("db-url") + slog.Info("setting up main database", "url", dburl) db, err := cliutil.SetupDatabase(dburl, cctx.Int("max-metadb-connections")) if err != nil { return err } - - slog.Info("setting up carstore database") - csdburl := cctx.String("carstore-db-url") - csdb, err := cliutil.SetupDatabase(csdburl, cctx.Int("max-carstore-connections")) - if err != nil { - return err - } - if cctx.Bool("db-tracing") { if err := db.Use(tracing.NewPlugin()); err != nil { return err } - if err := csdb.Use(tracing.NewPlugin()); err != nil { - return err - } - } - - csdirs := []string{csdir} - if paramDirs := cctx.StringSlice("carstore-shard-dirs"); len(paramDirs) > 0 { - csdirs = paramDirs } - for _, csd := range csdirs { - if err := os.MkdirAll(filepath.Dir(csd), os.ModePerm); err != nil { + var cstore carstore.CarStore + scyllaAddrs := cctx.StringSlice("scylla-carstore") + sqliteStore := cctx.Bool("ex-sqlite-carstore") + if len(scyllaAddrs) != 0 { + slog.Info("starting scylla carstore", "addrs", 
scyllaAddrs) + cstore, err = carstore.NewScyllaStore(scyllaAddrs, "cs") + } else if sqliteStore { + slog.Info("starting sqlite carstore", "dir", csdir) + cstore, err = carstore.NewSqliteStore(csdir) + } else if cctx.Bool("non-archival") { + csdburl := cctx.String("carstore-db-url") + slog.Info("setting up non-archival carstore database", "url", csdburl) + csdb, err := cliutil.SetupDatabase(csdburl, cctx.Int("max-carstore-connections")) + if err != nil { return err } - } - - var cstore carstore.CarStore - - if cctx.Bool("non-archival") { + if cctx.Bool("db-tracing") { + if err := csdb.Use(tracing.NewPlugin()); err != nil { + return err + } + } cs, err := carstore.NewNonArchivalCarstore(csdb) if err != nil { return err } - cstore = cs } else { - cs, err := carstore.NewCarStore(csdb, csdirs) + // make standard FileCarStore + csdburl := cctx.String("carstore-db-url") + slog.Info("setting up carstore database", "url", csdburl) + csdb, err := cliutil.SetupDatabase(csdburl, cctx.Int("max-carstore-connections")) if err != nil { return err } + if cctx.Bool("db-tracing") { + if err := csdb.Use(tracing.NewPlugin()); err != nil { + return err + } + } + csdirs := []string{csdir} + if paramDirs := cctx.StringSlice("carstore-shard-dirs"); len(paramDirs) > 0 { + csdirs = paramDirs + } - cstore = cs + for _, csd := range csdirs { + if err := os.MkdirAll(filepath.Dir(csd), os.ModePerm); err != nil { + return err + } + } + cstore, err = carstore.NewCarStore(csdb, csdirs) + } + + if err != nil { + return err } // DID RESOLUTION @@ -479,12 +512,30 @@ func runBigsky(cctx *cli.Context) error { slog.Info("constructing bgs") bgsConfig := libbgs.DefaultBGSConfig() - bgsConfig.SSL = !cctx.Bool("crawl-insecure-ws") + switch strings.ToLower(cctx.String("crawl-ssl-policy")) { + case "any": + bgsConfig.SSL = libbgs.SlurperMixedSSL + case "none": + bgsConfig.SSL = libbgs.SlurperDisableSSL + case "require": + bgsConfig.SSL = libbgs.SlurperRequireSSL + case "prod": + bgsConfig.SSL = libbgs.SlurperRequireExternalSSL + case "": + if cctx.Bool("crawl-insecure-ws") { + bgsConfig.SSL = libbgs.SlurperDisableSSL + } else { + bgsConfig.SSL = libbgs.SlurperRequireSSL + } + default: + return fmt.Errorf("crawl-ssl-policy/RELAY_CRAWL_NONSSL exepected any|none|require|prod, got %s", cctx.String("crawl-ssl-policy")) + } bgsConfig.CompactInterval = cctx.Duration("compact-interval") bgsConfig.ConcurrencyPerPDS = cctx.Int64("concurrency-per-pds") bgsConfig.MaxQueuePerPDS = cctx.Int64("max-queue-per-pds") bgsConfig.DefaultRepoLimit = cctx.Int64("default-repo-limit") bgsConfig.NumCompactionWorkers = cctx.Int("num-compaction-workers") + bgsConfig.VerboseAPILog = cctx.String("env") == "dev" nextCrawlers := cctx.StringSlice("next-crawler") if len(nextCrawlers) != 0 { nextCrawlerUrls := make([]*url.URL, len(nextCrawlers)) diff --git a/cmd/gosky/debug.go b/cmd/gosky/debug.go index 2037c328c..a51895244 100644 --- a/cmd/gosky/debug.go +++ b/cmd/gosky/debug.go @@ -885,7 +885,7 @@ var debugCompareReposCmd = &cli.Command{ rep1, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo1bytes)) if err != nil { - logger.Error("reading repo", "err", err) + logger.Error("reading repo", "err", err, "bytes", len(repo1bytes)) os.Exit(1) return } @@ -904,7 +904,7 @@ var debugCompareReposCmd = &cli.Command{ rep2, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo2bytes)) if err != nil { - logger.Error("reading repo", "err", err) + logger.Error("reading repo", "err", err, "bytes", len(repo2bytes)) os.Exit(1) return } diff --git a/go.mod b/go.mod index 
6b8e11548..7713f10a3 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/flosch/pongo2/v6 v6.0.0 github.com/go-redis/cache/v9 v9.0.0 github.com/goccy/go-json v0.10.2 + github.com/gocql/gocql v0.0.0-00010101000000-000000000000 github.com/golang-jwt/jwt v3.2.2+incompatible github.com/gorilla/websocket v1.5.1 github.com/hashicorp/go-retryablehttp v0.7.5 @@ -87,9 +88,11 @@ require ( github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-redis/redis v6.15.9+incompatible // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect @@ -106,6 +109,7 @@ require ( github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect + gopkg.in/inf.v0 v0.9.1 // indirect ) require ( @@ -152,7 +156,7 @@ require ( github.com/lestrrat-go/option v1.0.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/mattn/go-sqlite3 v1.14.22 // indirect + github.com/mattn/go-sqlite3 v1.14.22 github.com/multiformats/go-base32 v0.1.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect github.com/multiformats/go-multibase v0.2.0 // indirect @@ -188,3 +192,5 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.2.1 // indirect ) + +replace github.com/gocql/gocql => github.com/scylladb/gocql v1.14.4 diff --git a/go.sum b/go.sum index 5f651b904..4fd78df6d 100644 --- a/go.sum +++ b/go.sum @@ -73,6 +73,10 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 h1:N7oVaKyGp8bttX0bfZGmcGkjz7DLQXhAn3DNd3T0ous= github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874/go.mod h1:r5xuitiExdLAJ09PR7vBVENGvp4ZuTBeWTGtxuX3K+c= github.com/brianvoe/gofakeit/v6 v6.25.0 h1:ZpFjktOpLZUeF8q223o0rUuXtA+m5qW5srjvVi+JkXk= @@ -138,8 +142,9 @@ github.com/flosch/pongo2/v6 v6.0.0/go.mod h1:CuDpFm47R0uGGE7z13/tTlt1Y6zdxvr2RLT github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify 
v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= +github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= @@ -211,6 +216,7 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -255,6 +261,8 @@ github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/ github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8kKbZwNZBNPuTTje8U= github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= @@ -592,6 +600,8 @@ github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/samber/slog-echo v1.8.0 h1:DQQRtAliSvQw+ScEdu5gv3jbHu9cCTzvHuTD8GDv7zI= github.com/samber/slog-echo v1.8.0/go.mod h1:0ab2AwcciQXNAXEcjkHwD9okOh9vEHEYn8xP97ocuhM= +github.com/scylladb/gocql v1.14.4 h1:MhevwCfyAraQ6RvZYFO3pF4Lt0YhvQlfg8Eo2HEqVQA= +github.com/scylladb/gocql v1.14.4/go.mod h1:ZLEJ0EVE5JhmtxIW2stgHq/v1P4fWap0qyyXSKyV8K0= github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= @@ -808,6 +818,7 @@ golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220526153639-5463443f8c37/go.mod 
h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= @@ -1092,6 +1103,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -1126,3 +1139,5 @@ lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= +sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= +sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= diff --git a/indexer/crawler.go b/indexer/crawler.go index 526da9bb6..7e2656dd9 100644 --- a/indexer/crawler.go +++ b/indexer/crawler.go @@ -14,19 +14,22 @@ import ( ) type CrawlDispatcher struct { + // from Crawl() ingest chan *models.ActorInfo - repoSync chan *crawlWork - + // from AddToCatchupQueue() catchup chan *crawlWork + // from main loop to fetchWorker() + repoSync chan *crawlWork + complete chan models.Uid maplk sync.Mutex todo map[models.Uid]*crawlWork inProgress map[models.Uid]*crawlWork - doRepoCrawl func(context.Context, *crawlWork) error + repoFetcher CrawlRepoFetcher concurrency int @@ -35,7 +38,12 @@ type CrawlDispatcher struct { done chan struct{} } -func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) { +// this is what we need of RepoFetcher +type CrawlRepoFetcher interface { + FetchAndIndexRepo(ctx context.Context, job *crawlWork) error +} + +func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) { if concurrency < 1 { return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency") } @@ -45,7 +53,7 @@ func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurre repoSync: make(chan *crawlWork), complete: make(chan models.Uid), catchup: make(chan *crawlWork), - doRepoCrawl: repoFn, + repoFetcher: repoFetcher, concurrency: concurrency, todo: make(map[models.Uid]*crawlWork), inProgress: make(map[models.Uid]*crawlWork), @@ -221,7 +229,7 @@ func (c *CrawlDispatcher) fetchWorker() { for { select { case job := <-c.repoSync: - if err := c.doRepoCrawl(context.TODO(), job); err != nil { + if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil { c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err) } diff --git a/indexer/indexer.go b/indexer/indexer.go index 
e6a324e9e..6920c7fb6 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -69,7 +69,7 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events } if crawl { - c, err := NewCrawlDispatcher(fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency, ix.log) + c, err := NewCrawlDispatcher(fetcher, fetcher.MaxConcurrency, ix.log) if err != nil { return nil, err } diff --git a/indexer/repofetch.go b/indexer/repofetch.go index 8ce68bb5f..1e93612d8 100644 --- a/indexer/repofetch.go +++ b/indexer/repofetch.go @@ -141,8 +141,10 @@ func (rf *RepoFetcher) FetchAndIndexRepo(ctx context.Context, job *crawlWork) er } } + revp := &rev if rev == "" { span.SetAttributes(attribute.Bool("full", true)) + revp = nil } c := models.ClientForPds(&pds) @@ -153,7 +155,7 @@ func (rf *RepoFetcher) FetchAndIndexRepo(ctx context.Context, job *crawlWork) er return err } - if err := rf.repoman.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), &rev); err != nil { + if err := rf.repoman.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), revp); err != nil { span.RecordError(err) if ipld.IsNotFound(err) || errors.Is(err, io.EOF) || errors.Is(err, fs.ErrNotExist) { diff --git a/models/dbcid.go b/models/dbcid.go index 366a0e829..d64ae0bc6 100644 --- a/models/dbcid.go +++ b/models/dbcid.go @@ -4,6 +4,7 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "github.com/gocql/gocql" "github.com/ipfs/go-cid" ) @@ -62,3 +63,15 @@ func (dbc *DbCID) UnmarshalJSON(b []byte) error { func (dbc *DbCID) GormDataType() string { return "bytes" } + +func (dbc *DbCID) MarshalCQL(info gocql.TypeInfo) ([]byte, error) { + return dbc.CID.Bytes(), nil +} +func (dbc *DbCID) UnmarshalCQL(info gocql.TypeInfo, data []byte) error { + xcid, err := cid.Cast(data) + if err != nil { + return err + } + dbc.CID = xcid + return nil +} diff --git a/models/models.go b/models/models.go index 9781e75bd..43b6690f9 100644 --- a/models/models.go +++ b/models/models.go @@ -104,7 +104,7 @@ type FollowRecord struct { type PDS struct { gorm.Model - Host string + Host string `gorm:"unique"` Did string SSL bool Cursor int64 @@ -119,6 +119,9 @@ type PDS struct { HourlyEventLimit int64 DailyEventLimit int64 + + // We accept events from this firehose source even if they are not from the origin PDS + RelayAllowed bool } func ClientForPds(pds *PDS) *xrpc.Client { diff --git a/repo/repo.go b/repo/repo.go index acdafcce6..f4e683f4c 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -80,7 +80,7 @@ func IngestRepo(ctx context.Context, bs blockstore.Blockstore, r io.Reader) (cid br, err := car.NewBlockReader(r) if err != nil { - return cid.Undef, err + return cid.Undef, fmt.Errorf("IngestRepo:NewBlockReader: %w", err) } for { @@ -89,11 +89,11 @@ func IngestRepo(ctx context.Context, bs blockstore.Blockstore, r io.Reader) (cid if err == io.EOF { break } - return cid.Undef, err + return cid.Undef, fmt.Errorf("IngestRepo:Next: %w", err) } if err := bs.Put(ctx, blk); err != nil { - return cid.Undef, err + return cid.Undef, fmt.Errorf("IngestRepo:Put: %w", err) } } @@ -104,7 +104,7 @@ func ReadRepoFromCar(ctx context.Context, r io.Reader) (*Repo, error) { bs := blockstore.NewBlockstore(datastore.NewMapDatastore()) root, err := IngestRepo(ctx, bs, r) if err != nil { - return nil, err + return nil, fmt.Errorf("ReadRepoFromCar:IngestRepo: %w", err) } return OpenRepo(ctx, bs, root) diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index df67ae283..c7cacef01 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -912,6 +912,9 @@ func (rm 
*RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoD return err } + if rev != nil && *rev == "" { + rev = nil + } if rev == nil { // if 'rev' is nil, this implies a fresh sync. // in this case, ignore any existing blocks we have and treat this like a clean import. diff --git a/testing/integ_test.go b/testing/integ_test.go index fd5533932..156003749 100644 --- a/testing/integ_test.go +++ b/testing/integ_test.go @@ -3,8 +3,6 @@ package testing import ( "bytes" "context" - "encoding/json" - "fmt" "math/rand" "strings" "testing" @@ -52,13 +50,13 @@ func testRelayBasic(t *testing.T, archive bool) { defer evts.Cancel() bob := p1.MustNewUser(t, "bob.tpds") - fmt.Println("event 1") + t.Log("event 1") e1 := evts.Next() assert.NotNil(e1.RepoCommit) assert.Equal(e1.RepoCommit.Repo, bob.DID()) alice := p1.MustNewUser(t, "alice.tpds") - fmt.Println("event 2") + t.Log("event 2") e2 := evts.Next() assert.NotNil(e2.RepoCommit) assert.Equal(e2.RepoCommit.Repo, alice.DID()) @@ -69,14 +67,14 @@ func testRelayBasic(t *testing.T, archive bool) { _ = bp1 _ = ap1 - fmt.Println("bob:", bob.DID()) - fmt.Println("event 3") + t.Log("bob:", bob.DID()) + t.Log("event 3") e3 := evts.Next() assert.Equal(e3.RepoCommit.Repo, bob.DID()) //assert.Equal(e3.RepoCommit.Ops[0].Kind, "createRecord") - fmt.Println("alice:", alice.DID()) - fmt.Println("event 4") + t.Log("alice:", alice.DID()) + t.Log("event 4") e4 := evts.Next() assert.Equal(e4.RepoCommit.Repo, alice.DID()) //assert.Equal(e4.RepoCommit.Ops[0].Kind, "createRecord") @@ -85,7 +83,7 @@ func testRelayBasic(t *testing.T, archive bool) { pbevts := b1.Events(t, 2) defer pbevts.Cancel() - fmt.Println("event 5") + t.Log("event 5") pbe1 := pbevts.Next() assert.Equal(*e3, *pbe1) } @@ -294,11 +292,11 @@ func TestHandleChange(t *testing.T) { time.Sleep(time.Millisecond * 100) initevt := evts.Next() - fmt.Println(initevt.RepoCommit) + t.Log(initevt.RepoCommit) hcevt := evts.Next() - fmt.Println(hcevt.RepoHandle) + t.Log(hcevt.RepoHandle) idevt := evts.Next() - fmt.Println(idevt.RepoIdentity) + t.Log(idevt.RepoIdentity) } func TestAccountEvent(t *testing.T) { @@ -336,46 +334,46 @@ func TestAccountEvent(t *testing.T) { time.Sleep(time.Millisecond * 100) initevt := evts.Next() - fmt.Println(initevt.RepoCommit) + t.Log(initevt.RepoCommit) // Takedown acevt := evts.Next() - fmt.Println(acevt.RepoAccount) + t.Log(acevt.RepoAccount) assert.Equal(acevt.RepoAccount.Did, u.DID()) assert.Equal(acevt.RepoAccount.Active, false) assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusTakendown) // Reactivate acevt = evts.Next() - fmt.Println(acevt.RepoAccount) + t.Log(acevt.RepoAccount) assert.Equal(acevt.RepoAccount.Did, u.DID()) assert.Equal(acevt.RepoAccount.Active, true) assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) // Deactivate acevt = evts.Next() - fmt.Println(acevt.RepoAccount) + t.Log(acevt.RepoAccount) assert.Equal(acevt.RepoAccount.Did, u.DID()) assert.Equal(acevt.RepoAccount.Active, false) assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusDeactivated) // Reactivate acevt = evts.Next() - fmt.Println(acevt.RepoAccount) + t.Log(acevt.RepoAccount) assert.Equal(acevt.RepoAccount.Did, u.DID()) assert.Equal(acevt.RepoAccount.Active, true) assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) // Suspend acevt = evts.Next() - fmt.Println(acevt.RepoAccount) + t.Log(acevt.RepoAccount) assert.Equal(acevt.RepoAccount.Did, u.DID()) assert.Equal(acevt.RepoAccount.Active, false) assert.Equal(*acevt.RepoAccount.Status, 
events.AccountStatusSuspended) // Reactivate acevt = evts.Next() - fmt.Println(acevt.RepoAccount) + t.Log(acevt.RepoAccount) assert.Equal(acevt.RepoAccount.Did, u.DID()) assert.Equal(acevt.RepoAccount.Active, true) assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) @@ -387,7 +385,7 @@ func TestAccountEvent(t *testing.T) { time.Sleep(time.Millisecond * 20) acevt = evts.Next() - fmt.Println(acevt.RepoAccount) + t.Log(acevt.RepoAccount) assert.Equal(acevt.RepoAccount.Did, u.DID()) assert.Equal(acevt.RepoAccount.Active, false) assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusTakendown) @@ -399,7 +397,7 @@ func TestAccountEvent(t *testing.T) { time.Sleep(time.Millisecond * 20) acevt = evts.Next() - fmt.Println(acevt.RepoAccount) + t.Log(acevt.RepoAccount) assert.Equal(acevt.RepoAccount.Did, u.DID()) assert.Equal(acevt.RepoAccount.Active, true) assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) @@ -469,11 +467,6 @@ func testRelayTakedown(t *testing.T, archive bool) { assert.Equal(alice.did, last.RepoCommit.Repo) } -func jsonPrint(v any) { - b, _ := json.Marshal(v) - fmt.Println(string(b)) -} - func commitFromSlice(t *testing.T, slice []byte, rcid cid.Cid) *repo.SignedCommit { carr, err := car.NewCarReader(bytes.NewReader(slice)) if err != nil { @@ -560,11 +553,11 @@ func TestRelayHandleEmptyEvent(t *testing.T) { defer evts.Cancel() bob := p1.MustNewUser(t, "bob.tpds") - fmt.Println("event 1") + t.Log("event 1") e1 := evts.Next() assert.NotNil(e1.RepoCommit) assert.Equal(e1.RepoCommit.Repo, bob.DID()) - fmt.Println(e1.RepoCommit.Ops[0]) + t.Log(e1.RepoCommit.Ops[0]) ctx := context.TODO() rm := p1.server.Repoman() @@ -573,7 +566,7 @@ func TestRelayHandleEmptyEvent(t *testing.T) { } e2 := evts.Next() - //fmt.Println(e2.RepoCommit.Ops[0]) + //t.Log(e2.RepoCommit.Ops[0]) assert.Equal(len(e2.RepoCommit.Ops), 0) assert.Equal(e2.RepoCommit.Repo, bob.DID()) } diff --git a/testing/utils.go b/testing/utils.go index cd9571463..a9aee787c 100644 --- a/testing/utils.go +++ b/testing/utils.go @@ -210,12 +210,14 @@ func (tp *TestPDS) BumpLimits(t *testing.T, b *TestRelay) { } limReqBody := bgs.RateLimitChangeRequest{ - Host: u.Host, - PerSecond: 5_000, - PerHour: 100_000, - PerDay: 1_000_000, - RepoLimit: 500_000, - CrawlRate: 50_000, + Host: u.Host, + PDSRates: bgs.PDSRates{ + PerSecond: 5_000, + PerHour: 100_000, + PerDay: 1_000_000, + RepoLimit: 500_000, + CrawlRate: 50_000, + }, } // JSON encode the request body @@ -593,7 +595,7 @@ func SetupRelay(ctx context.Context, didr plc.PLCClient, archive bool) (*TestRel tr := &api.TestHandleResolver{} bgsConfig := bgs.DefaultBGSConfig() - bgsConfig.SSL = false + bgsConfig.SSL = bgs.SlurperDisableSSL b, err := bgs.NewBGS(maindb, ix, repoman, evtman, didr, rf, tr, bgsConfig) if err != nil { return nil, err diff --git a/ts/bgs-dash/src/components/Dash/Dash.tsx b/ts/bgs-dash/src/components/Dash/Dash.tsx index 90aff0bed..8f9318032 100644 --- a/ts/bgs-dash/src/components/Dash/Dash.tsx +++ b/ts/bgs-dash/src/components/Dash/Dash.tsx @@ -283,10 +283,11 @@ const Dash: FC<{}> = () => { } const requestCrawlHost = (host: string) => { - fetch(`${RELAY_HOST}/xrpc/com.atproto.sync.requestCrawl`, { + fetch(`${RELAY_HOST}/admin/pds/requestCrawl`, { method: "POST", headers: { "Content-Type": "application/json", + Authorization: `Bearer ${adminToken}`, }, body: JSON.stringify({ hostname: host, @@ -391,6 +392,7 @@ const Dash: FC<{}> = () => { per_day: pds.PerDayEventRate.Max, crawl_rate: pds.CrawlRate.Max, repo_limit: pds.RepoLimit, + 
relay_allowed: pds.RelayAllowed, }), } ).then((res) => { @@ -862,6 +864,14 @@ const Dash: FC<{}> = () => { Repo Limit +
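To close out the dashboard change above, which moves requestCrawl from the public XRPC endpoint to the authenticated admin endpoint: the same call can be scripted outside the dashboard. A minimal Go sketch, mirroring only the path, bearer-token header, and body field used by the fetch in Dash.tsx; relayHost and adminToken are assumed, deployment-specific inputs:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// requestCrawl asks the relay admin API to start crawling a PDS, the same
// way the dashboard's requestCrawlHost helper does.
func requestCrawl(relayHost, adminToken, pdsHost string) error {
	body, err := json.Marshal(map[string]string{"hostname": pdsHost})
	if err != nil {
		return err
	}
	req, err := http.NewRequest(http.MethodPost, relayHost+"/admin/pds/requestCrawl", bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+adminToken)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("requestCrawl failed: %s", resp.Status)
	}
	return nil
}
```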