diff --git a/bgs/admin.go b/bgs/admin.go index 1d415d66a..8f6d2ec35 100644 --- a/bgs/admin.go +++ b/bgs/admin.go @@ -363,6 +363,8 @@ type RateLimitChangeRequest struct { PerDay int64 `json:"per_day"` CrawlRate int64 `json:"crawl_rate"` RepoLimit int64 `json:"repo_limit"` + + RelayAllowed bool `json:"relay_allowed"` } func (bgs *BGS) handleAdminChangePDSRateLimits(e echo.Context) error { @@ -383,6 +385,7 @@ func (bgs *BGS) handleAdminChangePDSRateLimits(e echo.Context) error { pds.DailyEventLimit = body.PerDay pds.CrawlRateLimit = float64(body.CrawlRate) pds.RepoLimit = body.RepoLimit + pds.RelayAllowed = body.RelayAllowed if err := bgs.db.Save(&pds).Error; err != nil { return echo.NewHTTPError(http.StatusInternalServerError, fmt.Errorf("failed to save rate limit changes: %w", err)) @@ -610,27 +613,16 @@ func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error { return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname") } - if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") { - if bgs.ssl { - host = "https://" + host - } else { - host = "http://" + host - } - } + host, sslUrl := bgs.newPdsHostPrefixNormalize(host) + if !bgs.config.SSL.AdminOk(sslUrl) { + return echo.NewHTTPError(http.StatusBadRequest, "ssl is %s but got host=%#v", bgs.config.SSL.String(), host) + } u, err := url.Parse(host) if err != nil { return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname") } - if u.Scheme == "http" && bgs.ssl { - return echo.NewHTTPError(http.StatusBadRequest, "this server requires https") - } - - if u.Scheme == "https" && !bgs.ssl { - return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https") - } - if u.Path != "" { return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path") } @@ -648,5 +640,5 @@ func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error { // Skip checking if the server is online for now - return bgs.slurper.SubscribeToPds(ctx, host, true, true) // Override Trusted Domain Check + return bgs.slurper.SubscribeToPds(ctx, host, true, true, sslUrl) // Override Trusted Domain Check } diff --git a/bgs/bgs.go b/bgs/bgs.go index 9f14492b5..81ba50fe9 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -65,9 +65,9 @@ type BGS struct { hr api.HandleResolver - // TODO: work on doing away with this flag in favor of more pluggable - // pieces that abstract the need for explicit ssl checks - ssl bool + ssl SlurperSSLStance + + config BGSConfig crawlOnly bool @@ -117,12 +117,13 @@ type SocketConsumer struct { } type BGSConfig struct { - SSL bool + SSL SlurperSSLStance CompactInterval time.Duration DefaultRepoLimit int64 ConcurrencyPerPDS int64 MaxQueuePerPDS int64 NumCompactionWorkers int + VerboseAPILog bool // NextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl NextCrawlers []*url.URL @@ -130,7 +131,7 @@ type BGSConfig struct { func DefaultBGSConfig() *BGSConfig { return &BGSConfig{ - SSL: true, + SSL: SlurperRequireSSL, CompactInterval: 4 * time.Hour, DefaultRepoLimit: 100, ConcurrencyPerPDS: 100, @@ -161,6 +162,7 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm events: evtman, didr: didr, ssl: config.SSL, + config: *config, consumersLk: sync.RWMutex{}, consumers: make(map[uint64]*SocketConsumer), @@ -317,7 +319,7 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error { AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization}, })) - if !bgs.ssl { + if bgs.config.VerboseAPILog { e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n", })) @@ -941,7 +943,9 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Host) } - if host.ID != u.PDS && u.PDS != 0 { + if host.RelayAllowed { + // don't check that source is canonical PDS, allow intermediate relays + } else if host.ID != u.PDS && u.PDS != 0 { bgs.log.Warn("received event for repo from different pds than expected", "repo", evt.Repo, "expPds", u.PDS, "gotPds", host.Host) // Flush any cached DID documents for this user bgs.didr.FlushCacheFor(env.RepoCommit.Repo) @@ -1273,7 +1277,7 @@ func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.Actor peering.DailyEventLimit = s.slurper.DefaultPerDayLimit peering.RepoLimit = s.slurper.DefaultRepoLimit - if s.ssl && !peering.SSL { + if (s.ssl == SlurperRequireSSL) && !peering.SSL { return nil, fmt.Errorf("did references non-ssl PDS, this is disallowed in prod: %q %q", did, svc.ServiceEndpoint) } diff --git a/bgs/fedmgr.go b/bgs/fedmgr.go index 10212f0a4..b0fc77d15 100644 --- a/bgs/fedmgr.go +++ b/bgs/fedmgr.go @@ -55,7 +55,7 @@ type Slurper struct { shutdownChan chan bool shutdownResult chan []error - ssl bool + ssl SlurperSSLStance } type Limiters struct { @@ -64,8 +64,71 @@ type Limiters struct { PerDay *slidingwindow.Limiter } +type SlurperSSLStance int + +const ( + // ALL upstreams must be wss:// and https:// + SlurperRequireSSL SlurperSSLStance = 1 + + // NO upstreams may be wss:// and https:// + SlurperDisableSSL SlurperSSLStance = 2 + + // Anything is allowed! + SlurperMixedSSL SlurperSSLStance = 3 + + // upstreams set by /admin/* may be anything, other things must be wss:// https:// + SlurperRequireExternalSSL SlurperSSLStance = 4 +) + +// return true if an external requestCrawl meets SSL policy +func (sssl SlurperSSLStance) ExternalOk(ssl bool) bool { + switch sssl { + case SlurperRequireSSL: + return ssl + case SlurperDisableSSL: + return !ssl + case SlurperRequireExternalSSL: + return ssl + case SlurperMixedSSL: + return true + default: + slog.Error("unknown ssl policy", "ssl", sssl) + return true + } +} + +// return true if admin action meets SSL policy +func (sssl SlurperSSLStance) AdminOk(ssl bool) bool { + switch sssl { + case SlurperRequireSSL: + return ssl + case SlurperDisableSSL: + return !ssl + case SlurperRequireExternalSSL, SlurperMixedSSL: + return true + default: + slog.Error("unknown ssl policy", "ssl", sssl) + return true + } +} + +func (sssl SlurperSSLStance) String() string { + switch sssl { + case SlurperRequireSSL: + return "required" + case SlurperRequireExternalSSL: + return "external required" + case SlurperDisableSSL: + return "disabled" + case SlurperMixedSSL: + return "mixed" + default: + return fmt.Sprintf("slurper_ssl_%d", int(sssl)) + } +} + type SlurperOptions struct { - SSL bool + SSL SlurperSSLStance DefaultPerSecondLimit int64 DefaultPerHourLimit int64 DefaultPerDayLimit int64 @@ -77,7 +140,7 @@ type SlurperOptions struct { func DefaultSlurperOptions() *SlurperOptions { return &SlurperOptions{ - SSL: false, + SSL: SlurperDisableSSL, DefaultPerSecondLimit: 50, DefaultPerHourLimit: 2500, DefaultPerDayLimit: 20_000, @@ -363,7 +426,7 @@ func (s *Slurper) canSlurpHost(host string) bool { return !s.newSubsDisabled } -func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adminOverride bool) error { +func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adminOverride, sslUrl bool) error { // TODO: for performance, lock on the hostname instead of global s.lk.Lock() defer s.lk.Unlock() @@ -389,7 +452,7 @@ func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adm // New PDS! npds := models.PDS{ Host: host, - SSL: s.ssl, + SSL: sslUrl, Registered: reg, RateLimit: float64(s.DefaultPerSecondLimit), HourlyEventLimit: s.DefaultPerHourLimit, @@ -467,7 +530,7 @@ func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.PDS, s } protocol := "ws" - if s.ssl { + if host.SSL { protocol = "wss" } diff --git a/bgs/handlers.go b/bgs/handlers.go index 0e46f0043..b1723eb09 100644 --- a/bgs/handlers.go +++ b/bgs/handlers.go @@ -128,18 +128,31 @@ func (s *BGS) handleComAtprotoSyncGetBlocks(ctx context.Context, cids []string, return nil, fmt.Errorf("NYI") } +func (bgs *BGS) newPdsHostPrefixNormalize(host string) (hostN string, sslUrl bool) { + sslUrl = false + if strings.HasPrefix(host, "http://") || strings.HasPrefix(host, "ws://") { + } else if strings.HasPrefix(host, "https://") || strings.HasPrefix(host, "wss://") { + sslUrl = true + } else { + if bgs.config.SSL == SlurperDisableSSL { + host = "http://" + host + } else { + host = "https://" + host + sslUrl = true + } + } + return host, sslUrl +} + func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatprototypes.SyncRequestCrawl_Input) error { host := body.Hostname if host == "" { return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname") } - if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") { - if s.ssl { - host = "https://" + host - } else { - host = "http://" + host - } + host, sslUrl := s.newPdsHostPrefixNormalize(host) + if !s.config.SSL.ExternalOk(sslUrl) { + return echo.NewHTTPError(http.StatusBadRequest, "server ssl policy is %s", s.config.SSL.String()) } u, err := url.Parse(host) @@ -147,14 +160,6 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname") } - if u.Scheme == "http" && s.ssl { - return echo.NewHTTPError(http.StatusBadRequest, "this server requires https") - } - - if u.Scheme == "https" && !s.ssl { - return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https") - } - if u.Path != "" { return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path") } @@ -212,7 +217,7 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp } } - return s.slurper.SubscribeToPds(ctx, host, true, false) + return s.slurper.SubscribeToPds(ctx, host, true, false, sslUrl) } func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, body *comatprototypes.SyncNotifyOfUpdate_Input) error { diff --git a/cmd/bigsky/main.go b/cmd/bigsky/main.go index 33c798508..62982ec1c 100644 --- a/cmd/bigsky/main.go +++ b/cmd/bigsky/main.go @@ -99,6 +99,12 @@ func run(args []string) error { Name: "crawl-insecure-ws", Usage: "when connecting to PDS instances, use ws:// instead of wss://", }, + &cli.StringFlag{ + Name: "crawl-ssl-policy", + Usage: "when connecting to PDS instances, 'any' allows either ws:// or wss://, 'require' requires wss://, 'prod' requires wss:// of external requestCrawl hosts, 'none' rejects wss:// connections", + Value: "prod", + EnvVars: []string{"RELAY_CRAWL_NONSSL"}, + }, &cli.BoolFlag{ Name: "spidering", Value: false, @@ -479,12 +485,30 @@ func runBigsky(cctx *cli.Context) error { slog.Info("constructing bgs") bgsConfig := libbgs.DefaultBGSConfig() - bgsConfig.SSL = !cctx.Bool("crawl-insecure-ws") + switch strings.ToLower(cctx.String("crawl-ssl-policy")) { + case "any": + bgsConfig.SSL = libbgs.SlurperMixedSSL + case "none": + bgsConfig.SSL = libbgs.SlurperDisableSSL + case "require": + bgsConfig.SSL = libbgs.SlurperRequireSSL + case "prod": + bgsConfig.SSL = libbgs.SlurperRequireExternalSSL + case "": + if cctx.Bool("crawl-insecure-ws") { + bgsConfig.SSL = libbgs.SlurperDisableSSL + } else { + bgsConfig.SSL = libbgs.SlurperRequireSSL + } + default: + return fmt.Errorf("crawl-ssl-policy/RELAY_CRAWL_NONSSL exepected any|none|require|prod, got %s", cctx.String("crawl-ssl-policy")) + } bgsConfig.CompactInterval = cctx.Duration("compact-interval") bgsConfig.ConcurrencyPerPDS = cctx.Int64("concurrency-per-pds") bgsConfig.MaxQueuePerPDS = cctx.Int64("max-queue-per-pds") bgsConfig.DefaultRepoLimit = cctx.Int64("default-repo-limit") bgsConfig.NumCompactionWorkers = cctx.Int("num-compaction-workers") + bgsConfig.VerboseAPILog = cctx.String("env") == "dev" nextCrawlers := cctx.StringSlice("next-crawler") if len(nextCrawlers) != 0 { nextCrawlerUrls := make([]*url.URL, len(nextCrawlers)) diff --git a/go.mod b/go.mod index 6b8e11548..ff5d582e8 100644 --- a/go.mod +++ b/go.mod @@ -87,6 +87,7 @@ require ( github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-redis/redis v6.15.9+incompatible // indirect github.com/golang/snappy v0.0.4 // indirect diff --git a/go.sum b/go.sum index 5f651b904..dbfa36855 100644 --- a/go.sum +++ b/go.sum @@ -138,8 +138,9 @@ github.com/flosch/pongo2/v6 v6.0.0/go.mod h1:CuDpFm47R0uGGE7z13/tTlt1Y6zdxvr2RLT github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= +github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= diff --git a/models/models.go b/models/models.go index 9781e75bd..10007c54b 100644 --- a/models/models.go +++ b/models/models.go @@ -119,6 +119,9 @@ type PDS struct { HourlyEventLimit int64 DailyEventLimit int64 + + // We accept events from this firehose source even if they are not from the origin PDS + RelayAllowed bool } func ClientForPds(pds *PDS) *xrpc.Client { diff --git a/testing/utils.go b/testing/utils.go index d2847d026..363d392a6 100644 --- a/testing/utils.go +++ b/testing/utils.go @@ -593,7 +593,7 @@ func SetupRelay(ctx context.Context, didr plc.PLCClient, archive bool) (*TestRel tr := &api.TestHandleResolver{} bgsConfig := bgs.DefaultBGSConfig() - bgsConfig.SSL = false + bgsConfig.SSL = bgs.SlurperDisableSSL b, err := bgs.NewBGS(maindb, ix, repoman, evtman, didr, rf, tr, bgsConfig) if err != nil { return nil, err diff --git a/ts/bgs-dash/src/components/Dash/Dash.tsx b/ts/bgs-dash/src/components/Dash/Dash.tsx index 90aff0bed..8f9318032 100644 --- a/ts/bgs-dash/src/components/Dash/Dash.tsx +++ b/ts/bgs-dash/src/components/Dash/Dash.tsx @@ -283,10 +283,11 @@ const Dash: FC<{}> = () => { } const requestCrawlHost = (host: string) => { - fetch(`${RELAY_HOST}/xrpc/com.atproto.sync.requestCrawl`, { + fetch(`${RELAY_HOST}/admin/pds/requestCrawl`, { method: "POST", headers: { "Content-Type": "application/json", + Authorization: `Bearer ${adminToken}`, }, body: JSON.stringify({ hostname: host, @@ -391,6 +392,7 @@ const Dash: FC<{}> = () => { per_day: pds.PerDayEventRate.Max, crawl_rate: pds.CrawlRate.Max, repo_limit: pds.RepoLimit, + relay_allowed: pds.RelayAllowed, }), } ).then((res) => { @@ -862,6 +864,14 @@ const Dash: FC<{}> = () => { Repo Limit + + + Allow Relay + + = () => { /> + + { + pds.RelayAllowed = (document.getElementById(`relay-allow-${pds.ID}`) as HTMLInputElement).checked; + updateRateLimits(pds); + }} + checked={pds.RelayAllowed} + > + {new Date(Date.parse(pds.CreatedAt)).toLocaleString()} diff --git a/ts/bgs-dash/src/models/pds.ts b/ts/bgs-dash/src/models/pds.ts index faf1055d7..934693974 100644 --- a/ts/bgs-dash/src/models/pds.ts +++ b/ts/bgs-dash/src/models/pds.ts @@ -22,6 +22,7 @@ interface PDS { PerDayEventRate: RateLimit; RepoCount: number; RepoLimit: number; + RelayAllowed: boolean; } type PDSKey = keyof PDS;