Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow admin to add PDS that are ssl or not #904

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 8 additions & 16 deletions bgs/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ type RateLimitChangeRequest struct {
PerDay int64 `json:"per_day"`
CrawlRate int64 `json:"crawl_rate"`
RepoLimit int64 `json:"repo_limit"`

RelayAllowed bool `json:"relay_allowed"`
}

func (bgs *BGS) handleAdminChangePDSRateLimits(e echo.Context) error {
Expand All @@ -383,6 +385,7 @@ func (bgs *BGS) handleAdminChangePDSRateLimits(e echo.Context) error {
pds.DailyEventLimit = body.PerDay
pds.CrawlRateLimit = float64(body.CrawlRate)
pds.RepoLimit = body.RepoLimit
pds.RelayAllowed = body.RelayAllowed

if err := bgs.db.Save(&pds).Error; err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, fmt.Errorf("failed to save rate limit changes: %w", err))
Expand Down Expand Up @@ -610,27 +613,16 @@ func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error {
return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname")
}

if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") {
if bgs.ssl {
host = "https://" + host
} else {
host = "http://" + host
}
}
host, sslUrl := bgs.newPdsHostPrefixNormalize(host)

if !bgs.config.SSL.AdminOk(sslUrl) {
return echo.NewHTTPError(http.StatusBadRequest, "ssl is %s but got host=%#v", bgs.config.SSL.String(), host)
}
u, err := url.Parse(host)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname")
}

if u.Scheme == "http" && bgs.ssl {
return echo.NewHTTPError(http.StatusBadRequest, "this server requires https")
}

if u.Scheme == "https" && !bgs.ssl {
return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https")
}

if u.Path != "" {
return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path")
}
Expand All @@ -648,5 +640,5 @@ func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error {

// Skip checking if the server is online for now

return bgs.slurper.SubscribeToPds(ctx, host, true, true) // Override Trusted Domain Check
return bgs.slurper.SubscribeToPds(ctx, host, true, true, sslUrl) // Override Trusted Domain Check
}
20 changes: 12 additions & 8 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ type BGS struct {

hr api.HandleResolver

// TODO: work on doing away with this flag in favor of more pluggable
// pieces that abstract the need for explicit ssl checks
ssl bool
ssl SlurperSSLStance

config BGSConfig

crawlOnly bool

Expand Down Expand Up @@ -117,20 +117,21 @@ type SocketConsumer struct {
}

type BGSConfig struct {
SSL bool
SSL SlurperSSLStance
CompactInterval time.Duration
DefaultRepoLimit int64
ConcurrencyPerPDS int64
MaxQueuePerPDS int64
NumCompactionWorkers int
VerboseAPILog bool

// NextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl
NextCrawlers []*url.URL
}

func DefaultBGSConfig() *BGSConfig {
return &BGSConfig{
SSL: true,
SSL: SlurperRequireSSL,
CompactInterval: 4 * time.Hour,
DefaultRepoLimit: 100,
ConcurrencyPerPDS: 100,
Expand Down Expand Up @@ -161,6 +162,7 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm
events: evtman,
didr: didr,
ssl: config.SSL,
config: *config,

consumersLk: sync.RWMutex{},
consumers: make(map[uint64]*SocketConsumer),
Expand Down Expand Up @@ -317,7 +319,7 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error {
AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization},
}))

if !bgs.ssl {
if bgs.config.VerboseAPILog {
e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{
Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n",
}))
Expand Down Expand Up @@ -941,7 +943,9 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Host)
}

if host.ID != u.PDS && u.PDS != 0 {
if host.RelayAllowed {
// don't check that source is canonical PDS, allow intermediate relays
} else if host.ID != u.PDS && u.PDS != 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the implications here of skipping the createExternalUser bit? Do we end up not creating a row for users that are being echoed by a Relay?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah
image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little weird: it turns out that narelay does keep a little bit of per-user state, so that it can emit a warning if there's a discontinuity in a user's feed (see #915).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'd still probably want to keep track of the users being proxied by a Relay so we can ban them and/or change their upstream status and/or count them, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mehhhh, ok, this needs some more consideration, there's some useful stuff inside .createExternalUser() and I can't just throw out the path of code that was under "user from weird place, wat"

bgs.log.Warn("received event for repo from different pds than expected", "repo", evt.Repo, "expPds", u.PDS, "gotPds", host.Host)
// Flush any cached DID documents for this user
bgs.didr.FlushCacheFor(env.RepoCommit.Repo)
Expand Down Expand Up @@ -1273,7 +1277,7 @@ func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.Actor
peering.DailyEventLimit = s.slurper.DefaultPerDayLimit
peering.RepoLimit = s.slurper.DefaultRepoLimit

if s.ssl && !peering.SSL {
if (s.ssl == SlurperRequireSSL) && !peering.SSL {
return nil, fmt.Errorf("did references non-ssl PDS, this is disallowed in prod: %q %q", did, svc.ServiceEndpoint)
}

Expand Down
75 changes: 69 additions & 6 deletions bgs/fedmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Slurper struct {
shutdownChan chan bool
shutdownResult chan []error

ssl bool
ssl SlurperSSLStance
}

type Limiters struct {
Expand All @@ -64,8 +64,71 @@ type Limiters struct {
PerDay *slidingwindow.Limiter
}

// SlurperSSLStance is the policy that decides whether upstream hosts may be
// reached over SSL (wss:// / https://) or plaintext (ws:// / http://)
// transports.
type SlurperSSLStance int

const (
	// SlurperRequireSSL requires every upstream to be wss:// and https://.
	SlurperRequireSSL SlurperSSLStance = 1

	// SlurperDisableSSL forbids wss:// and https:// for every upstream.
	SlurperDisableSSL SlurperSSLStance = 2

	// SlurperMixedSSL places no restriction on upstreams.
	SlurperMixedSSL SlurperSSLStance = 3

	// SlurperRequireExternalSSL lets upstreams set through /admin/* be
	// anything, while all other upstreams must be wss:// and https://.
	SlurperRequireExternalSSL SlurperSSLStance = 4
)

// ExternalOk reports whether a host submitted through an external
// requestCrawl meets the SSL policy. ssl is true when the host uses an SSL
// scheme.
//
// NOTE(review): an unrecognized policy value is logged and then allowed
// (fail-open) -- confirm that this is the intended behavior.
func (stance SlurperSSLStance) ExternalOk(ssl bool) bool {
	switch stance {
	case SlurperRequireSSL, SlurperRequireExternalSSL:
		// External requests must use SSL under both of these policies.
		return ssl
	case SlurperDisableSSL:
		return !ssl
	case SlurperMixedSSL:
		return true
	}
	slog.Error("unknown ssl policy", "ssl", stance)
	return true
}

// AdminOk reports whether a host submitted through an admin action meets the
// SSL policy. ssl is true when the host uses an SSL scheme.
//
// NOTE(review): an unrecognized policy value is logged and then allowed
// (fail-open) -- confirm that this is the intended behavior.
func (stance SlurperSSLStance) AdminOk(ssl bool) bool {
	switch stance {
	case SlurperRequireSSL:
		return ssl
	case SlurperDisableSSL:
		return !ssl
	case SlurperMixedSSL, SlurperRequireExternalSSL:
		// Admin actions are unrestricted under these policies.
		return true
	}
	slog.Error("unknown ssl policy", "ssl", stance)
	return true
}

// String returns a short human-readable name for the policy. Values without a
// name are rendered as "slurper_ssl_<n>".
func (stance SlurperSSLStance) String() string {
	switch stance {
	case SlurperRequireSSL:
		return "required"
	case SlurperDisableSSL:
		return "disabled"
	case SlurperMixedSSL:
		return "mixed"
	case SlurperRequireExternalSSL:
		return "external required"
	}
	return fmt.Sprintf("slurper_ssl_%d", int(stance))
}

type SlurperOptions struct {
SSL bool
SSL SlurperSSLStance
DefaultPerSecondLimit int64
DefaultPerHourLimit int64
DefaultPerDayLimit int64
Expand All @@ -77,7 +140,7 @@ type SlurperOptions struct {

func DefaultSlurperOptions() *SlurperOptions {
return &SlurperOptions{
SSL: false,
SSL: SlurperDisableSSL,
DefaultPerSecondLimit: 50,
DefaultPerHourLimit: 2500,
DefaultPerDayLimit: 20_000,
Expand Down Expand Up @@ -363,7 +426,7 @@ func (s *Slurper) canSlurpHost(host string) bool {
return !s.newSubsDisabled
}

func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adminOverride bool) error {
func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adminOverride, sslUrl bool) error {
// TODO: for performance, lock on the hostname instead of global
s.lk.Lock()
defer s.lk.Unlock()
Expand All @@ -389,7 +452,7 @@ func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adm
// New PDS!
npds := models.PDS{
Host: host,
SSL: s.ssl,
SSL: sslUrl,
Registered: reg,
RateLimit: float64(s.DefaultPerSecondLimit),
HourlyEventLimit: s.DefaultPerHourLimit,
Expand Down Expand Up @@ -467,7 +530,7 @@ func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.PDS, s
}

protocol := "ws"
if s.ssl {
if host.SSL {
protocol = "wss"
}

Expand Down
35 changes: 20 additions & 15 deletions bgs/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,33 +128,38 @@ func (s *BGS) handleComAtprotoSyncGetBlocks(ctx context.Context, cids []string,
return nil, fmt.Errorf("NYI")
}

// newPdsHostPrefixNormalize makes sure host carries a scheme prefix and
// reports whether that scheme is an SSL one.
//
// A host that already starts with http:// or ws:// is returned unchanged with
// sslUrl=false; one that starts with https:// or wss:// is returned unchanged
// with sslUrl=true. A host without any of those prefixes gets "http://"
// prepended when the configured policy is SlurperDisableSSL, and "https://"
// (with sslUrl=true) under every other policy.
func (bgs *BGS) newPdsHostPrefixNormalize(host string) (hostN string, sslUrl bool) {
	switch {
	case strings.HasPrefix(host, "http://"), strings.HasPrefix(host, "ws://"):
		return host, false
	case strings.HasPrefix(host, "https://"), strings.HasPrefix(host, "wss://"):
		return host, true
	case bgs.config.SSL == SlurperDisableSSL:
		// Bare hostname on a server that never uses SSL.
		return "http://" + host, false
	default:
		// Bare hostname: default to SSL for every other policy.
		return "https://" + host, true
	}
}

func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatprototypes.SyncRequestCrawl_Input) error {
host := body.Hostname
if host == "" {
return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname")
}

if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") {
if s.ssl {
host = "https://" + host
} else {
host = "http://" + host
}
host, sslUrl := s.newPdsHostPrefixNormalize(host)
if !s.config.SSL.ExternalOk(sslUrl) {
return echo.NewHTTPError(http.StatusBadRequest, "server ssl policy is %s", s.config.SSL.String())
}

u, err := url.Parse(host)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname")
}

if u.Scheme == "http" && s.ssl {
return echo.NewHTTPError(http.StatusBadRequest, "this server requires https")
}

if u.Scheme == "https" && !s.ssl {
return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https")
}

if u.Path != "" {
return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path")
}
Expand Down Expand Up @@ -212,7 +217,7 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp
}
}

return s.slurper.SubscribeToPds(ctx, host, true, false)
return s.slurper.SubscribeToPds(ctx, host, true, false, sslUrl)
}

func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, body *comatprototypes.SyncNotifyOfUpdate_Input) error {
Expand Down
26 changes: 25 additions & 1 deletion cmd/bigsky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func run(args []string) error {
Name: "crawl-insecure-ws",
Usage: "when connecting to PDS instances, use ws:// instead of wss://",
},
&cli.StringFlag{
Name: "crawl-ssl-policy",
Usage: "when connecting to PDS instances, 'any' allows either ws:// or wss://, 'require' requires wss://, 'prod' requires wss:// of external requestCrawl hosts, 'none' rejects wss:// connections",
Value: "prod",
EnvVars: []string{"RELAY_CRAWL_NONSSL"},
},
&cli.BoolFlag{
Name: "spidering",
Value: false,
Expand Down Expand Up @@ -479,12 +485,30 @@ func runBigsky(cctx *cli.Context) error {

slog.Info("constructing bgs")
bgsConfig := libbgs.DefaultBGSConfig()
bgsConfig.SSL = !cctx.Bool("crawl-insecure-ws")
switch strings.ToLower(cctx.String("crawl-ssl-policy")) {
case "any":
bgsConfig.SSL = libbgs.SlurperMixedSSL
case "none":
bgsConfig.SSL = libbgs.SlurperDisableSSL
case "require":
bgsConfig.SSL = libbgs.SlurperRequireSSL
case "prod":
bgsConfig.SSL = libbgs.SlurperRequireExternalSSL
case "":
if cctx.Bool("crawl-insecure-ws") {
bgsConfig.SSL = libbgs.SlurperDisableSSL
} else {
bgsConfig.SSL = libbgs.SlurperRequireSSL
}
default:
return fmt.Errorf("crawl-ssl-policy/RELAY_CRAWL_NONSSL exepected any|none|require|prod, got %s", cctx.String("crawl-ssl-policy"))
}
bgsConfig.CompactInterval = cctx.Duration("compact-interval")
bgsConfig.ConcurrencyPerPDS = cctx.Int64("concurrency-per-pds")
bgsConfig.MaxQueuePerPDS = cctx.Int64("max-queue-per-pds")
bgsConfig.DefaultRepoLimit = cctx.Int64("default-repo-limit")
bgsConfig.NumCompactionWorkers = cctx.Int("num-compaction-workers")
bgsConfig.VerboseAPILog = cctx.String("env") == "dev"
nextCrawlers := cctx.StringSlice("next-crawler")
if len(nextCrawlers) != 0 {
nextCrawlerUrls := make([]*url.URL, len(nextCrawlers))
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ require (
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/getsentry/sentry-go v0.27.0 // indirect
github.com/go-redis/redis v6.15.9+incompatible // indirect
github.com/golang/snappy v0.0.4 // indirect
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ github.com/flosch/pongo2/v6 v6.0.0/go.mod h1:CuDpFm47R0uGGE7z13/tTlt1Y6zdxvr2RLT
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps=
github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
Expand Down
3 changes: 3 additions & 0 deletions models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ type PDS struct {

HourlyEventLimit int64
DailyEventLimit int64

// We accept events from this firehose source even if they are not from the origin PDS
RelayAllowed bool
}

func ClientForPds(pds *PDS) *xrpc.Client {
Expand Down
2 changes: 1 addition & 1 deletion testing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func SetupRelay(ctx context.Context, didr plc.PLCClient, archive bool) (*TestRel
tr := &api.TestHandleResolver{}

bgsConfig := bgs.DefaultBGSConfig()
bgsConfig.SSL = false
bgsConfig.SSL = bgs.SlurperDisableSSL
b, err := bgs.NewBGS(maindb, ix, repoman, evtman, didr, rf, tr, bgsConfig)
if err != nil {
return nil, err
Expand Down
Loading
Loading