Bolson/vm3 ex250120 #912

Open

wants to merge 10 commits into base: main
bgs/admin.go: 41 additions & 22 deletions
@@ -356,13 +356,37 @@ func (bgs *BGS) handleAdminUnbanDomain(c echo.Context) error {
})
}

type PDSRates struct {
PerSecond int64 `json:"per_second,omitempty"`
PerHour int64 `json:"per_hour,omitempty"`
PerDay int64 `json:"per_day,omitempty"`
CrawlRate int64 `json:"crawl_rate,omitempty"`
RepoLimit int64 `json:"repo_limit,omitempty"`

RelayAllowed bool `json:"relay_allowed,omitempty"`
}

func (pr *PDSRates) FromSlurper(s *Slurper) {
if pr.PerSecond == 0 {
pr.PerSecond = s.DefaultPerSecondLimit
}
if pr.PerHour == 0 {
pr.PerHour = s.DefaultPerHourLimit
}
if pr.PerDay == 0 {
pr.PerDay = s.DefaultPerDayLimit
}
if pr.CrawlRate == 0 {
pr.CrawlRate = int64(s.DefaultCrawlLimit)
}
if pr.RepoLimit == 0 {
pr.RepoLimit = s.DefaultRepoLimit
}
}

type RateLimitChangeRequest struct {
Host string `json:"host"`
PerSecond int64 `json:"per_second"`
PerHour int64 `json:"per_hour"`
PerDay int64 `json:"per_day"`
CrawlRate int64 `json:"crawl_rate"`
RepoLimit int64 `json:"repo_limit"`
Host string `json:"host"`
PDSRates
}
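
Because PDSRates is embedded without a json tag, its fields are promoted into the top level of the request body, so the wire format of the rate-limit change stays flat. A minimal, self-contained sketch of that behavior (both types are re-declared locally so it runs standalone; pds.example.com is a placeholder host):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the types above, re-declared so the sketch is self-contained.
type PDSRates struct {
	PerSecond int64 `json:"per_second,omitempty"`
	PerHour   int64 `json:"per_hour,omitempty"`
	PerDay    int64 `json:"per_day,omitempty"`
	CrawlRate int64 `json:"crawl_rate,omitempty"`
	RepoLimit int64 `json:"repo_limit,omitempty"`

	RelayAllowed bool `json:"relay_allowed,omitempty"`
}

type RateLimitChangeRequest struct {
	Host string `json:"host"`
	PDSRates
}

func main() {
	req := RateLimitChangeRequest{
		Host:     "pds.example.com",
		PDSRates: PDSRates{PerSecond: 50, RepoLimit: 10000},
	}
	out, _ := json.Marshal(req)
	// Embedded fields are promoted, so the JSON stays flat:
	// {"host":"pds.example.com","per_second":50,"repo_limit":10000}
	fmt.Println(string(out))
}
```

On the crawl path further down, FromSlurper back-fills any override left at zero with the slurper defaults, so omitting a limit means "use the default" rather than "set it to zero".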

func (bgs *BGS) handleAdminChangePDSRateLimits(e echo.Context) error {
@@ -383,6 +407,7 @@ func (bgs *BGS) handleAdminChangePDSRateLimits(e echo.Context) error {
pds.DailyEventLimit = body.PerDay
pds.CrawlRateLimit = float64(body.CrawlRate)
pds.RepoLimit = body.RepoLimit
pds.RelayAllowed = body.RelayAllowed

if err := bgs.db.Save(&pds).Error; err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, fmt.Errorf("failed to save rate limit changes: %w", err))
@@ -595,6 +620,9 @@ func (bgs *BGS) handleAdminAddTrustedDomain(e echo.Context) error {

type AdminRequestCrawlRequest struct {
Hostname string `json:"hostname"`

// optional:
PDSRates
}

func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error {
@@ -610,27 +638,16 @@ func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error {
return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname")
}

if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") {
if bgs.ssl {
host = "https://" + host
} else {
host = "http://" + host
}
}
host, sslUrl := bgs.newPdsHostPrefixNormalize(host)

if !bgs.config.SSL.AdminOk(sslUrl) {
return echo.NewHTTPError(http.StatusBadRequest, "ssl is %s but got host=%#v", bgs.config.SSL.String(), host)
}
u, err := url.Parse(host)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname")
}

if u.Scheme == "http" && bgs.ssl {
return echo.NewHTTPError(http.StatusBadRequest, "this server requires https")
}

if u.Scheme == "https" && !bgs.ssl {
return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https")
}

if u.Path != "" {
return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path")
}
@@ -647,6 +664,8 @@ func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error {
}

// Skip checking if the server is online for now
rateOverrides := body.PDSRates
rateOverrides.FromSlurper(bgs.slurper)

return bgs.slurper.SubscribeToPds(ctx, host, true, true) // Override Trusted Domain Check
return bgs.slurper.SubscribeToPds(ctx, host, true, true, sslUrl, &rateOverrides) // Override Trusted Domain Check
}
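
newPdsHostPrefixNormalize and SSL.AdminOk are introduced elsewhere in this PR, so their definitions are not part of the hunks shown here. Below is a plausible, self-contained sketch of the normalization step, for illustration only; the standalone signature and the default-to-SSL parameter are assumptions, and the real method on *BGS may behave differently:

```go
package main

import (
	"fmt"
	"strings"
)

// Hypothetical sketch only: judging by the inline code this PR replaces, the
// helper ensures a scheme prefix and reports whether the resulting URL is https.
func newPdsHostPrefixNormalize(host string, defaultToSSL bool) (string, bool) {
	if strings.HasPrefix(host, "https://") {
		return host, true
	}
	if strings.HasPrefix(host, "http://") {
		return host, false
	}
	if defaultToSSL {
		return "https://" + host, true
	}
	return "http://" + host, false
}

func main() {
	host, sslUrl := newPdsHostPrefixNormalize("pds.example.com", true)
	fmt.Println(host, sslUrl) // https://pds.example.com true
}
```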
bgs/bgs.go: 80 additions & 10 deletions
@@ -65,9 +65,9 @@ type BGS struct {

hr api.HandleResolver

// TODO: work on doing away with this flag in favor of more pluggable
// pieces that abstract the need for explicit ssl checks
ssl bool
ssl SlurperSSLStance

config BGSConfig

crawlOnly bool

@@ -117,20 +117,21 @@ type SocketConsumer struct {
}

type BGSConfig struct {
SSL bool
SSL SlurperSSLStance
CompactInterval time.Duration
DefaultRepoLimit int64
ConcurrencyPerPDS int64
MaxQueuePerPDS int64
NumCompactionWorkers int
VerboseAPILog bool

// NextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl
NextCrawlers []*url.URL
}

func DefaultBGSConfig() *BGSConfig {
return &BGSConfig{
SSL: true,
SSL: SlurperRequireSSL,
CompactInterval: 4 * time.Hour,
DefaultRepoLimit: 100,
ConcurrencyPerPDS: 100,
@@ -140,7 +141,11 @@ func DefaultBGSConfig() *BGSConfig {
}
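
DefaultBGSConfig now defaults to SlurperRequireSSL, but the SlurperSSLStance type itself lives in slurper.go and is not shown in this diff. A self-contained sketch of the shape such a type could take, for illustration only; every name here other than SlurperSSLStance, SlurperRequireSSL, String, and AdminOk is an assumption:

```go
package main

import "fmt"

// Hypothetical sketch; the real SlurperSSLStance definition may differ.
type SlurperSSLStance int

const (
	SlurperRequireSSL    SlurperSSLStance = iota // only https PDS hosts accepted (prod default)
	SlurperAllowInsecure                         // http hosts tolerated (dev/test); name assumed
)

func (s SlurperSSLStance) String() string {
	if s == SlurperRequireSSL {
		return "require-ssl"
	}
	return "allow-insecure"
}

// AdminOk reports whether an admin-added host may be crawled, given whether
// its normalized URL is https.
func (s SlurperSSLStance) AdminOk(isHTTPS bool) bool {
	return s != SlurperRequireSSL || isHTTPS
}

func main() {
	fmt.Println(SlurperRequireSSL.AdminOk(false)) // false: plain-http host rejected when ssl is required
}
```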

func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, rf *indexer.RepoFetcher, hr api.HandleResolver, config *BGSConfig) (*BGS, error) {

logger := slog.Default().With("system", "bgs")
err := fixupDupPDSRows(db, logger)
if err != nil {
return nil, err
}
if config == nil {
config = DefaultBGSConfig()
}
@@ -161,6 +166,7 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm
events: evtman,
didr: didr,
ssl: config.SSL,
config: *config,

consumersLk: sync.RWMutex{},
consumers: make(map[uint64]*SocketConsumer),
@@ -169,7 +175,7 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm

userCache: uc,

log: slog.Default().With("system", "bgs"),
log: logger,
}

ix.CreateExternalUser = bgs.createExternalUser
@@ -202,6 +208,68 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm
return bgs, nil
}

// fixupDupPDSRows is a startup cleanup: when several pds rows share the same
// host, it repoints users and actor_infos at the lowest row id and deletes the
// remaining duplicates.
func fixupDupPDSRows(db *gorm.DB, logger *slog.Logger) error {
rows, err := db.Raw("SELECT id, host FROM pds").Rows()
if err != nil {
logger.Warn("could not list PDS rows; assume blank db", "err", err)
return nil
}
defer rows.Close()
hostCounts := make(map[string][]uint)
maxPDSId := uint(0)
maxHostCount := 0
for rows.Next() {
var pdsId uint
var host string
if err := rows.Scan(&pdsId, &host); err != nil {
return fmt.Errorf("pds sql row err, %w", err)
}
idlist := hostCounts[host]
idlist = append(idlist, pdsId)
count := len(idlist)
if count > maxHostCount {
maxHostCount = count
}
hostCounts[host] = idlist
if pdsId > maxPDSId {
maxPDSId = pdsId
}
}
if maxHostCount <= 1 {
logger.Debug("no pds dup rows found")
return nil
}
for host, idlist := range hostCounts {
if len(idlist) > 1 {
logger.Info("dup PDS", "host", host, "count", len(idlist))
minPDSId := idlist[0]
for _, otherid := range idlist[1:] {
if otherid < minPDSId {
minPDSId = otherid
}
}
for _, xPDSId := range idlist {
if xPDSId == minPDSId {
continue
}
logger.Info("dup PDS", "host", host, "from", xPDSId, "to", minPDSId)
err = db.Exec("UPDATE users SET pds = ? WHERE pds = ?", minPDSId, xPDSId).Error
if err != nil {
return fmt.Errorf("failed to update user pds %d -> %d: %w", xPDSId, minPDSId, err)
}
err = db.Exec("UPDATE actor_infos SET pds = ? WHERE pds = ?", minPDSId, xPDSId).Error
if err != nil {
return fmt.Errorf("failed to update actor_infos pds %d -> %d: %w", xPDSId, minPDSId, err)
}
err = db.Exec("DELETE FROM pds WHERE id = ?", xPDSId).Error
if err != nil {
return fmt.Errorf("failed to delete pds %d: %w", xPDSId, err)
}
}
}
}
return nil
}

func (bgs *BGS) StartMetrics(listen string) error {
http.Handle("/metrics", promhttp.Handler())
return http.ListenAndServe(listen, nil)
@@ -317,7 +385,7 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error {
AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization},
}))

if !bgs.ssl {
if bgs.config.VerboseAPILog {
e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{
Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n",
}))
@@ -941,7 +1009,9 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Host)
}

if host.ID != u.PDS && u.PDS != 0 {
if host.RelayAllowed {
// don't check that source is canonical PDS, allow intermediate relays
} else if host.ID != u.PDS && u.PDS != 0 {
bgs.log.Warn("received event for repo from different pds than expected", "repo", evt.Repo, "expPds", u.PDS, "gotPds", host.Host)
// Flush any cached DID documents for this user
bgs.didr.FlushCacheFor(env.RepoCommit.Repo)
@@ -1273,7 +1343,7 @@ func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.Actor
peering.DailyEventLimit = s.slurper.DefaultPerDayLimit
peering.RepoLimit = s.slurper.DefaultRepoLimit

if s.ssl && !peering.SSL {
if (s.ssl == SlurperRequireSSL) && !peering.SSL {
return nil, fmt.Errorf("did references non-ssl PDS, this is disallowed in prod: %q %q", did, svc.ServiceEndpoint)
}
