Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/livepeer/starter/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig {
cfg.CliAddr = fs.String("cliAddr", *cfg.CliAddr, "Address to bind for CLI commands")
cfg.HttpAddr = fs.String("httpAddr", *cfg.HttpAddr, "Address to bind for HTTP commands")
cfg.ServiceAddr = fs.String("serviceAddr", *cfg.ServiceAddr, "Orchestrator only. Overrides the on-chain serviceURI that broadcasters can use to contact this node; may be an IP or hostname.")
cfg.Nodes = fs.String("nodes", *cfg.Nodes, "Comma-separated list of instance URLs for this orchestrator")
cfg.VerifierURL = fs.String("verifierUrl", *cfg.VerifierURL, "URL of the verifier to use")
cfg.VerifierPath = fs.String("verifierPath", *cfg.VerifierPath, "Path to verifier shared volume")
cfg.LocalVerify = fs.Bool("localVerify", *cfg.LocalVerify, "Set to true to enable local verification i.e. pixel count and signature verification.")
Expand All @@ -21,6 +22,7 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig {
// Broadcaster's Selection Algorithm
cfg.OrchAddr = fs.String("orchAddr", *cfg.OrchAddr, "Comma-separated list of orchestrators to connect to")
cfg.OrchWebhookURL = fs.String("orchWebhookUrl", *cfg.OrchWebhookURL, "Orchestrator discovery callback URL")
cfg.ExtraNodes = fs.Int("extraNodes", *cfg.ExtraNodes, "Number of extra nodes an orchestrator can advertise within the GetOrchestratorInfo response")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe maxNodes or maxExtraNodes (but then I'd rename nodes to extraNodes).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really struggled with naming this.

My first pass at this PR named it maxNodes too but that got a little confusing in practice since this does not include the initial set orchestrators in the pool ... these are literally extra nodes in addition to the initial set of orchestrators in the pool.

We might be able to sidestep this if maxNodes had a default of 1 but that means changing more of the current behavior (which does not depend on this value) and I tried to avoid doing that.

cfg.OrchBlacklist = fs.String("orchBlocklist", "", "Comma-separated list of blocklisted orchestrators")
cfg.OrchMinLivepeerVersion = fs.String("orchMinLivepeerVersion", *cfg.OrchMinLivepeerVersion, "Minimal go-livepeer version orchestrator should have to be selected")
cfg.SelectRandWeight = fs.Float64("selectRandFreq", *cfg.SelectRandWeight, "Weight of the random factor in the orchestrator selection algorithm")
Expand Down
64 changes: 62 additions & 2 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type LivepeerConfig struct {
CliAddr *string
HttpAddr *string
ServiceAddr *string
Nodes *string
OrchAddr *string
VerifierURL *string
EthController *string
Expand Down Expand Up @@ -112,6 +113,7 @@ type LivepeerConfig struct {
IgnoreMaxPriceIfNeeded *bool
MinPerfScore *float64
DiscoveryTimeout *time.Duration
ExtraNodes *int
MaxSessions *string
CurrentManifest *bool
Nvidia *string
Expand Down Expand Up @@ -194,6 +196,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultCliAddr := ""
defaultHttpAddr := ""
defaultServiceAddr := ""
defaultNodes := ""
defaultOrchAddr := ""
defaultVerifierURL := ""
defaultVerifierPath := ""
Expand All @@ -215,6 +218,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultRegion := ""
defaultMinPerfScore := 0.0
defaultDiscoveryTimeout := 500 * time.Millisecond
defaultExtraNodes := 0
defaultCurrentManifest := false
defaultNvidia := ""
defaultNetint := ""
Expand Down Expand Up @@ -310,6 +314,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
CliAddr: &defaultCliAddr,
HttpAddr: &defaultHttpAddr,
ServiceAddr: &defaultServiceAddr,
Nodes: &defaultNodes,
OrchAddr: &defaultOrchAddr,
VerifierURL: &defaultVerifierURL,
VerifierPath: &defaultVerifierPath,
Expand All @@ -331,6 +336,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
Region: &defaultRegion,
MinPerfScore: &defaultMinPerfScore,
DiscoveryTimeout: &defaultDiscoveryTimeout,
ExtraNodes: &defaultExtraNodes,
CurrentManifest: &defaultCurrentManifest,
Nvidia: &defaultNvidia,
Netint: &defaultNetint,
Expand Down Expand Up @@ -561,6 +567,16 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
n.OrchSecret, _ = common.ReadFromFile(*cfg.OrchSecret)
}

// Parse -instances flag and store parsed canonicalized URLs in the node
if cfg.Nodes != nil && *cfg.Nodes != "" {
n.Nodes, err = parseNodes(*cfg.Nodes)
if err != nil || len(n.Nodes) == 0 {
glog.Exit("No valid instance URLs parsed from -nodes: ", err)
} else {
glog.Infof("Configured nodes: %v", strings.Join(n.Nodes, ","))
}
}

var transcoderCaps []core.Capability
if *cfg.Transcoder {
core.WorkDir = *cfg.Datadir
Expand Down Expand Up @@ -1521,6 +1537,8 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
go refreshOrchPerfScoreLoop(ctx, strings.ToUpper(*cfg.Region), *cfg.OrchPerfStatsURL, n.OrchPerfScore)
}

n.ExtraNodes = *cfg.ExtraNodes

// Set up orchestrator discovery
if *cfg.OrchWebhookURL != "" {
whurl, err := validateURL(*cfg.OrchWebhookURL)
Expand Down Expand Up @@ -1608,6 +1626,10 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Exit("Error getting service URI: ", err)
}

if suri.String() == "" && len(n.Nodes) == 0 {
glog.Exit("Empty service URI and no additional nodes specified; set -serviceAddr or -nodes")
}

if *cfg.Network != "offchain" && !common.ValidateServiceURI(suri) {
glog.Warning("**Warning -serviceAddr is a not a public address or hostname; this is not recommended for onchain networks**")
}
Expand Down Expand Up @@ -1749,17 +1771,23 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
tc <- struct{}{}
}()

doingWork := orch.ServiceURI().String() != ""

// check whether or not the orchestrator is available
if *cfg.TestOrchAvail {
if *cfg.TestOrchAvail && doingWork {
time.Sleep(2 * time.Second)
orchAvail := server.CheckOrchestratorAvailability(orch)
if !orchAvail {
// shut down orchestrator
glog.Infof("Orchestrator not available at %v; shutting down", orch.ServiceURI())
glog.Infof("Orchestrator not available at %v (%v); shutting down", orch.ServiceURI(), *cfg.HttpAddr)
tc <- struct{}{}
}
}

if !doingWork {
glog.Infof("Orchestrator is not performing work")
}

}()

if n.NodeType == core.TranscoderNode || n.NodeType == core.AIWorkerNode {
Expand Down Expand Up @@ -1840,6 +1868,34 @@ func parseOrchAddrs(addrs string) []*url.URL {
return res
}

func parseNodes(addrs string) ([]string, error) {
var res []string
if len(addrs) == 0 {
return res, fmt.Errorf("instances empty")
}
for _, addr := range strings.Split(addrs, ",") {
addr = strings.TrimSpace(addr)
if addr == "" {
continue
}
// Add https if not provided
if !strings.HasPrefix(addr, "https://") {
addr = "https://" + addr
}
parsed, err := url.ParseRequestURI(addr)
if err != nil {
return nil, fmt.Errorf("Could not parse instance URI '%s': %w", addr, err)
}
// Ensure scheme starts with https; if http is provided, upgrade to https
if parsed.Scheme != "https" {
return nil, fmt.Errorf("Node URI must start with https '%s'", addr)
}
// Use the canonical string form
res = append(res, parsed.String())
}
return res, nil
}

func parseOrchBlacklist(b *string) []string {
if b == nil {
return []string{}
Expand Down Expand Up @@ -1885,6 +1941,10 @@ func isLocalURL(u string) (bool, error) {
func getServiceURI(n *core.LivepeerNode, serviceAddr string) (*url.URL, error) {
// Passed in via CLI
if serviceAddr != "" {
if serviceAddr == "none" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is it actually set to none? Doesn't empty string mean that the node is not used for work?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The user sets it in the -serviceAddr CLI flag. An empty string for that flag right now means go-livepeer tries to infer the address (see the rest of this function) and we don't want to change that for anyone who is depending on the behavior.

// special value to signal this node is not to be used for work
return url.Parse("")
}
return url.ParseRequestURI("https://" + serviceAddr)
}

Expand Down
1 change: 1 addition & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type NodeStatus struct {
type Broadcaster interface {
Address() ethcommon.Address
Sign([]byte) ([]byte, error)
ExtraNodes() int
}

type CapabilityComparator interface {
Expand Down
6 changes: 6 additions & 0 deletions core/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ func (bcast *broadcaster) Address() ethcommon.Address {
}
return bcast.node.Eth.Account().Address
}
func (bcast *broadcaster) ExtraNodes() int {
if bcast == nil || bcast.node == nil {
return 0
}
return bcast.node.ExtraNodes
}
func NewBroadcaster(node *LivepeerNode) *broadcaster {
return &broadcaster{
node: node,
Expand Down
5 changes: 4 additions & 1 deletion core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,10 @@ type LivepeerNode struct {
ExternalCapabilities *ExternalCapabilities
AutoAdjustPrice bool
AutoSessionLimit bool

// Broadcaster public fields
Sender pm.Sender
Sender pm.Sender
ExtraNodes int

// Thread safety for config fields
mu sync.RWMutex
Expand All @@ -152,6 +154,7 @@ type LivepeerNode struct {
jobPriceInfo map[string]map[string]*big.Rat
serviceURI url.URL
segmentMutex *sync.RWMutex
Nodes []string // instance URLs of this orch available to do work

// For live video pipelines, cache for live pipelines; key is the stream name
LivePipelines map[string]*LivePipeline
Expand Down
7 changes: 7 additions & 0 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,13 @@ func (orch *orchestrator) Capabilities() *net.Capabilities {
return orch.node.Capabilities.ToNetCapabilities()
}

func (orch *orchestrator) Nodes() []string {
if orch == nil || orch.node == nil {
return nil
}
return orch.node.Nodes
}

func (orch *orchestrator) AuthToken(sessionID string, expiration int64) *net.AuthToken {
h := hmac.New(sha256.New, orch.secret)
msg := append([]byte(sessionID), new(big.Int).SetInt64(expiration).Bytes()...)
Expand Down
1 change: 1 addition & 0 deletions discovery/db_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type DBOrchestratorPoolCache struct {
orchBlacklist []string
discoveryTimeout time.Duration
node *core.LivepeerNode
getOrchInfo func(context.Context, common.Broadcaster, *url.URL, server.GetOrchestratorInfoParams) (*net.OrchestratorInfo, error)
}

func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm common.RoundsManager, orchBlacklist []string, discoveryTimeout time.Duration) (*DBOrchestratorPoolCache, error) {
Expand Down
Loading
Loading