Skip to content

Commit

Permalink
logic forks
Browse files Browse the repository at this point in the history
  • Loading branch information
haileyok committed Jan 2, 2025
1 parent 839eb04 commit 6262314
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 55 deletions.
1 change: 1 addition & 0 deletions automod/consumer/ozone.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (oc *OzoneConsumer) Run(ctx context.Context) error {
"", // subjectType string
nil, // types []string
)

if err != nil {
oc.Logger.Warn("ozone query events failed; sleeping then will retrying", "err", err, "period", period.String())
time.Sleep(period)
Expand Down
36 changes: 36 additions & 0 deletions automod/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type EngineConfig struct {
QuotaModTakedownDay int
// number of misc actions automod can do per day, for all subjects combined (circuit breaker)
QuotaModActionDay int
// whether hepa is running in `authority` or `labeler` mode
Mode string
}

// Entrypoint for external code pushing #identity events in to the engine.
Expand Down Expand Up @@ -390,3 +392,37 @@ func (e *Engine) CanonicalLogLineNotification(c *NotificationContext) {
"reject", c.effects.RejectEvent,
)
}

func (e *Engine) RunRefreshSession(ctx context.Context) {
go func() {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if e.OzoneClient != nil && e.OzoneClient.Auth.AccessJwt != "" {
// copy the ozone client. we dont have a mutex to lock, and we don't want to rugpull the
// access jwt while its trying to make requests. this isnt perfect...
oc := &*e.OzoneClient

// set the access jwt to the existing refresh jwt
oc.Auth.AccessJwt = oc.Auth.RefreshJwt

res, err := comatproto.ServerRefreshSession(ctx, oc)
if err != nil {
e.Logger.Error("failed refreshing ozone session", "err", err)
continue
}

// update the existing clients auth
e.OzoneClient.Auth.AccessJwt = res.AccessJwt
e.OzoneClient.Auth.RefreshJwt = res.RefreshJwt
}

case <-ctx.Done():
return
}
}
}()
}
2 changes: 1 addition & 1 deletion automod/engine/fetch_account_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (e *Engine) GetAccountMeta(ctx context.Context, ident *identity.Identity) (
}

// first attempt to fetch private account metadata from Ozone
if e.OzoneClient != nil && e.OzoneClient.AdminToken != nil && *e.OzoneClient.AdminToken != "" {
if e.Config.Mode == "authority" && e.OzoneClient != nil {
rd, err := toolsozone.ModerationGetRepo(ctx, e.OzoneClient, ident.DID.String())
if err != nil {
logger.Warn("failed to fetch private account metadata from Ozone", "err", err)
Expand Down
30 changes: 26 additions & 4 deletions cmd/hepa/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ func run(args []string) error {
}

app.Flags = []cli.Flag{
&cli.StringFlag{
Name: "mode",
Usage: "mode to run in: 'authority' (default) or `labeler`",
Value: "authority",
EnvVars: []string{"HEPA_MODE"},
},
&cli.StringFlag{
Name: "atp-relay-host",
Usage: "hostname and port of Relay to subscribe to",
Expand Down Expand Up @@ -76,9 +82,19 @@ func run(args []string) error {
EnvVars: []string{"HEPA_OZONE_AUTH_ADMIN_TOKEN", "HEPA_MOD_AUTH_ADMIN_TOKEN"},
},
&cli.StringFlag{
Name: "ozone-password",
Name: "ozone-mod-password",
Usage: "authentication password for mod service account. used when not supplying an admin authentication token.",
EnvVars: []string{"HEPA_OZONE_PASSWORD"},
EnvVars: []string{"HEPA_OZONE_MOD_PASS"},
},
&cli.StringFlag{
Name: "ozone-mod-service",
Usage: "service the mod account is hosted on",
EnvVars: []string{"HEPA_OZONE_MOD_SERVICE"},
},
&cli.StringFlag{
Name: "ozone-service-did",
Usage: "did of the ozone service. only required when running in \"labeler\" mode.",
EnvVars: []string{"HEPA_OZONE_SERVICE_DID"},
},
&cli.StringFlag{
Name: "atp-pds-host",
Expand Down Expand Up @@ -266,12 +282,15 @@ var runCmd = &cli.Command{
dir,
Config{
Logger: logger,
Mode: cctx.String("mode"),
RelayHost: cctx.String("atp-relay-host"), // DEPRECATED
BskyHost: cctx.String("atp-bsky-host"),
OzoneHost: cctx.String("atp-ozone-host"),
OzoneDID: cctx.String("ozone-did"),
OzoneAdminToken: cctx.String("ozone-admin-token"),
OzonePassword: cctx.String("ozone-password"),
OzoneModPassword: cctx.String("ozone-mod-password"),
OzoneModService: cctx.String("ozone-mod-service"),
OzoneServiceDid: cctx.String("ozone-service-did"),
PDSHost: cctx.String("atp-pds-host"),
PDSAdminToken: cctx.String("pds-admin-token"),
SetsFileJSON: cctx.String("sets-json-path"),
Expand Down Expand Up @@ -367,12 +386,15 @@ func configEphemeralServer(cctx *cli.Context) (*Server, error) {
dir,
Config{
Logger: logger,
Mode: cctx.String("mode"),
RelayHost: cctx.String("atp-relay-host"),
BskyHost: cctx.String("atp-bsky-host"),
OzoneHost: cctx.String("atp-ozone-host"),
OzoneDID: cctx.String("ozone-did"),
OzoneAdminToken: cctx.String("ozone-admin-token"),
OzonePassword: cctx.String("ozone-password"),
OzoneModPassword: cctx.String("ozone-mod-password"),
OzoneModService: cctx.String("ozone-mod-service"),
OzoneServiceDid: cctx.String("ozone-service-did"),
PDSHost: cctx.String("atp-pds-host"),
PDSAdminToken: cctx.String("pds-admin-token"),
SetsFileJSON: cctx.String("sets-json-path"),
Expand Down
81 changes: 31 additions & 50 deletions cmd/hepa/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@ type Server struct {

type Config struct {
Logger *slog.Logger
Mode string
RelayHost string // DEPRECATED
BskyHost string
OzoneHost string
OzoneDID string
OzoneAdminToken string
OzonePassword string
OzoneModPassword string
OzoneModService string
OzoneServiceDid string
PDSHost string
PDSAdminToken string
SetsFileJSON string
Expand Down Expand Up @@ -71,25 +74,39 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) {
}))
}

if config.Mode == "labeler" && (config.OzoneServiceDid == "" || config.OzoneModPassword == "" || config.OzoneModService == "") {
return nil, fmt.Errorf("must provide ozone service DID and password for labeler mode")
} else if config.Mode == "authority" && config.OzoneAdminToken == "" {
return nil, fmt.Errorf("must provide ozone admin token for authority mode")
}

relayws := config.RelayHost
if !strings.HasPrefix(relayws, "ws") {
return nil, fmt.Errorf("specified relay host must include 'ws://' or 'wss://'")
}

var ozoneClient *xrpc.Client
if config.OzoneDID != "" {
var host string
if config.Mode == "authority" {
host = config.OzoneHost
} else if config.Mode == "labeler" {
host = config.OzoneModService
}

ozoneClient = &xrpc.Client{
Client: util.RobustHTTPClient(),
Host: config.OzoneHost,
Client: util.RobustHTTPClient(),
Host: host,
Headers: make(map[string]string),
}

if config.OzoneAdminToken != "" {
if config.Mode == "authority" {
ozoneClient.AdminToken = &config.OzoneAdminToken
ozoneClient.Auth = &xrpc.AuthInfo{}
} else if config.OzonePassword != "" {
} else if config.Mode == "labeler" {
res, err := atproto.ServerCreateSession(context.TODO(), ozoneClient, &atproto.ServerCreateSession_Input{
Identifier: config.OzoneDID,
Password: config.OzonePassword,
Password: config.OzoneModPassword,
})

if err != nil {
Expand All @@ -107,9 +124,13 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) {
}

if config.RatelimitBypass != "" {
ozoneClient.Headers = make(map[string]string)
ozoneClient.Headers["x-ratelimit-bypass"] = config.RatelimitBypass
}

if config.Mode == "labeler" {
ozoneClient.Headers["atproto-proxy"] = fmt.Sprintf("%s#atproto_labeler", config.OzoneServiceDid)
}

od, err := syntax.ParseDID(config.OzoneDID)
if err != nil {
return nil, fmt.Errorf("ozone account DID supplied was not valid: %v", err)
Expand Down Expand Up @@ -253,6 +274,7 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) {
QuotaModReportDay: config.QuotaModReportDay,
QuotaModTakedownDay: config.QuotaModTakedownDay,
QuotaModActionDay: config.QuotaModActionDay,
Mode: config.Mode,
},
}

Expand All @@ -264,8 +286,8 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) {
RedisClient: rdb,
}

if config.OzonePassword != "" && config.OzoneAdminToken == "" {
s.runRefreshSession(context.TODO())
if config.Mode == "labeler" {
s.Engine.RunRefreshSession(context.TODO())
}

return s, nil
Expand All @@ -275,44 +297,3 @@ func (s *Server) RunMetrics(listen string) error {
http.Handle("/metrics", promhttp.Handler())
return http.ListenAndServe(listen, nil)
}

func (s *Server) runRefreshSession(ctx context.Context) {
go func() {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if s.Engine.OzoneClient != nil && s.Engine.OzoneClient.Auth.AccessJwt != "" {
// create a new ozone client since we dont have a mutex to lock
oc := &xrpc.Client{
Client: util.RobustHTTPClient(),
Host: s.Engine.OzoneClient.Host,
Auth: &xrpc.AuthInfo{
Did: s.Engine.OzoneClient.Auth.Did,
Handle: s.Engine.OzoneClient.Auth.Handle,
AccessJwt: s.Engine.OzoneClient.Auth.RefreshJwt, // Use the refresh jwt
RefreshJwt: s.Engine.OzoneClient.Auth.RefreshJwt,
},
}

res, err := atproto.ServerRefreshSession(ctx, oc)
if err != nil {
s.logger.Error("failed refreshing ozone session", "err", err)
continue
}

// update the auth and client
oc.Auth.AccessJwt = res.AccessJwt
oc.Auth.RefreshJwt = res.RefreshJwt

s.Engine.OzoneClient = oc
}

case <-ctx.Done():
return
}
}
}()
}

0 comments on commit 6262314

Please sign in to comment.