
Commit 4a66b22

ad-astra-video, kyriediculous, reubenr0d, leszko, and rickstaa committed
feat: add AI Remote Worker (#3168)
This commit adds a new AI remote worker node which can be used to split worker and orchestrator machines, similar to how it is done on the transcoding side.

Co-authored-by: kyriediculous <[email protected]>
Co-authored-by: Reuben Rodrigues <[email protected]>
Co-authored-by: Rafał Leszko <[email protected]>
Co-authored-by: Rick Staa <[email protected]>
1 parent 4390579 commit 4a66b22

30 files changed: +6260 −1688 lines
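To make the worker/orchestrator split described in the commit message concrete, here is a minimal, hedged sketch of decoding a model list into the AIModelConfig struct changed in core/ai.go further down this page. Only the struct, its json tags, and the JSONRat price fields come from the diff; the standalone program, file-less JSON literal, and all values (pipeline, model ID, prices, capacity) are made up for illustration.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/livepeer/go-livepeer/core"
)

func main() {
	// Hypothetical model list. Field names follow the json tags on core.AIModelConfig:
	// "warm" and "capacity" are read on the worker side, while "price_per_unit" and
	// "pixels_per_unit" are read on the orchestrator side. Values are illustrative only.
	raw := `[{
		"pipeline": "text-to-image",
		"model_id": "example/model",
		"warm": true,
		"capacity": 2,
		"price_per_unit": 100,
		"pixels_per_unit": 1
	}]`

	var configs []core.AIModelConfig
	if err := json.Unmarshal([]byte(raw), &configs); err != nil {
		panic(err)
	}
	fmt.Printf("pipeline=%s model=%s capacity=%d\n",
		configs[0].Pipeline, configs[0].ModelID, configs[0].Capacity)
}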

cmd/livepeer/starter/starter.go

Lines changed: 126 additions & 170 deletions
Large diffs are not rendered by default.

common/testutil.go

Lines changed: 1 addition & 1 deletion
@@ -89,7 +89,7 @@ func IgnoreRoutines() []goleak.Option {
 		"github.com/livepeer/go-livepeer/server.(*LivepeerServer).StartMediaServer", "github.com/livepeer/go-livepeer/core.(*RemoteTranscoderManager).Manage.func1",
 		"github.com/livepeer/go-livepeer/server.(*LivepeerServer).HandlePush.func1", "github.com/rjeczalik/notify.(*nonrecursiveTree).dispatch",
 		"github.com/rjeczalik/notify.(*nonrecursiveTree).internal", "github.com/livepeer/lpms/stream.NewBasicRTMPVideoStream.func1", "github.com/patrickmn/go-cache.(*janitor).Run",
-		"github.com/golang/glog.(*fileSink).flushDaemon",
+		"github.com/golang/glog.(*fileSink).flushDaemon", "github.com/livepeer/go-livepeer/core.(*LivepeerNode).transcodeFrames.func2",
 	}

 	res := make([]goleak.Option, 0, len(funcs2ignore))

common/util.go

Lines changed: 15 additions & 0 deletions
@@ -77,6 +77,7 @@ var (
 	ErrProfName = fmt.Errorf("unknown VideoProfile profile name")

 	ErrAudioDurationCalculation = fmt.Errorf("audio duration calculation failed")
+	ErrNoExtensionsForType      = fmt.Errorf("no extensions exist for mime type")

 	ext2mime = map[string]string{
 		".ts": "video/mp2t",
@@ -571,3 +572,17 @@ func CalculateAudioDuration(audio types.File) (int64, error) {
 func ValidateServiceURI(serviceURI *url.URL) bool {
 	return !strings.Contains(serviceURI.Host, "0.0.0.0")
 }
+
+func ExtensionByType(contentType string) (string, error) {
+	contentType = strings.ToLower(contentType)
+	switch contentType {
+	case "video/mp2t":
+		return ".ts", nil
+	case "video/mp4":
+		return ".mp4", nil
+	case "image/png":
+		return ".png", nil
+	}
+
+	return "", ErrNoExtensionsForType
+}
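As a quick hedged usage sketch (not code from this commit), the new helper can map a response Content-Type onto a file extension, with ErrNoExtensionsForType signalling an unsupported mime type; the standalone program and the second content type are illustrative, and the table-driven test added below covers the same behavior.

package main

import (
	"fmt"

	"github.com/livepeer/go-livepeer/common"
)

func main() {
	// Hypothetical: derive a file extension for a result from its Content-Type header.
	for _, contentType := range []string{"video/mp4", "application/json"} {
		ext, err := common.ExtensionByType(contentType)
		if err != nil {
			fmt.Printf("%s: unsupported (%v)\n", contentType, err)
			continue
		}
		fmt.Printf("%s -> result%s\n", contentType, ext)
	}
}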

common/util_test.go

Lines changed: 18 additions & 0 deletions
@@ -519,3 +519,21 @@ func TestValidateServiceURI(t *testing.T) {
 		}
 	}
 }
+func TestExtensionByType(t *testing.T) {
+	assert := assert.New(t)
+
+	// Test valid content types
+	contentTypes := []string{"image/png", "video/mp4", "video/mp2t"}
+	expectedExtensions := []string{".png", ".mp4", ".ts"}
+
+	for i, contentType := range contentTypes {
+		ext, err := ExtensionByType(contentType)
+		assert.Nil(err)
+		assert.Equal(expectedExtensions[i], ext)
+	}
+
+	// Test invalid content type
+	invalidContentType := "invalid/type"
+	_, err := ExtensionByType(invalidContentType)
+	assert.Equal(ErrNoExtensionsForType, err)
+}

core/ai.go

Lines changed: 110 additions & 6 deletions
@@ -11,6 +11,7 @@ import (
 	"strconv"
 	"strings"

+	"github.com/golang/glog"
 	"github.com/livepeer/ai-worker/worker"
 )

@@ -64,15 +65,19 @@ func PipelineToCapability(pipeline string) (Capability, error) {
 }

 type AIModelConfig struct {
-	Pipeline string `json:"pipeline"`
-	ModelID  string `json:"model_id"`
+	Pipeline string `json:"pipeline"`
+	ModelID  string `json:"model_id"`
+	// used by worker
 	URL               string `json:"url,omitempty"`
 	Token             string `json:"token,omitempty"`
 	Warm              bool   `json:"warm,omitempty"`
-	PricePerUnit  JSONRat `json:"price_per_unit,omitempty"`
-	PixelsPerUnit JSONRat `json:"pixels_per_unit,omitempty"`
-	Currency      string  `json:"currency,omitempty"`
+	Capacity          int                      `json:"capacity,omitempty"`
 	OptimizationFlags worker.OptimizationFlags `json:"optimization_flags,omitempty"`
+	// used by orchestrator
+	Gateway       string  `json:"gateway"`
+	PricePerUnit  JSONRat `json:"price_per_unit,omitempty"`
+	PixelsPerUnit JSONRat `json:"pixels_per_unit,omitempty"`
+	Currency      string  `json:"currency,omitempty"`
 }

 func ParseAIModelConfigs(config string) ([]AIModelConfig, error) {
@@ -112,7 +117,7 @@ func ParseAIModelConfigs(config string) ([]AIModelConfig, error) {
 	return configs, nil
 }

-// parseStepsFromModelID parses the number of inference steps from the model ID suffix.
+// ParseStepsFromModelID parses the number of inference steps from the model ID suffix.
 func ParseStepsFromModelID(modelID *string, defaultSteps float64) float64 {
 	numInferenceSteps := defaultSteps

@@ -127,3 +132,102 @@ func ParseStepsFromModelID(modelID *string, defaultSteps float64) float64 {

 	return numInferenceSteps
 }
+
+// AddAICapabilities adds AI capabilities to the node.
+func (n *LivepeerNode) AddAICapabilities(caps *Capabilities) {
+	aiConstraints := caps.PerCapability()
+	if aiConstraints == nil {
+		return
+	}
+
+	n.Capabilities.mutex.Lock()
+	defer n.Capabilities.mutex.Unlock()
+	for aiCapability, aiConstraint := range aiConstraints {
+		_, capExists := n.Capabilities.constraints.perCapability[aiCapability]
+		if !capExists {
+			n.Capabilities.constraints.perCapability[aiCapability] = &CapabilityConstraints{
+				Models: make(ModelConstraints),
+			}
+		}
+
+		for modelId, modelConstraint := range aiConstraint.Models {
+			_, modelExists := n.Capabilities.constraints.perCapability[aiCapability].Models[modelId]
+			if modelExists {
+				n.Capabilities.constraints.perCapability[aiCapability].Models[modelId].Capacity += modelConstraint.Capacity
+			} else {
+				n.Capabilities.constraints.perCapability[aiCapability].Models[modelId] = &ModelConstraint{Warm: modelConstraint.Warm, Capacity: modelConstraint.Capacity}
+			}
+		}
+	}
+}
+
+// RemoveAICapabilities removes AI capabilities from the node.
+func (n *LivepeerNode) RemoveAICapabilities(caps *Capabilities) {
+	aiConstraints := caps.PerCapability()
+	if aiConstraints == nil {
+		return
+	}
+
+	n.Capabilities.mutex.Lock()
+	defer n.Capabilities.mutex.Unlock()
+	for capability, constraint := range aiConstraints {
+		_, ok := n.Capabilities.constraints.perCapability[capability]
+		if ok {
+			for modelId, modelConstraint := range constraint.Models {
+				_, modelExists := n.Capabilities.constraints.perCapability[capability].Models[modelId]
+				if modelExists {
+					n.Capabilities.constraints.perCapability[capability].Models[modelId].Capacity -= modelConstraint.Capacity
+					if n.Capabilities.constraints.perCapability[capability].Models[modelId].Capacity <= 0 {
+						delete(n.Capabilities.constraints.perCapability[capability].Models, modelId)
+					}
+				} else {
+					glog.Errorf("failed to remove AI capability capacity, model does not exist pipeline=%v modelID=%v", capability, modelId)
+				}
+			}
+		}
+	}
+}
+
+func (n *LivepeerNode) ReserveAICapability(pipeline string, modelID string) error {
+	cap, err := PipelineToCapability(pipeline)
+	if err != nil {
+		return err
+	}
+
+	_, hasCap := n.Capabilities.constraints.perCapability[cap]
+	if hasCap {
+		_, hasModel := n.Capabilities.constraints.perCapability[cap].Models[modelID]
+		if hasModel {
+			n.Capabilities.mutex.Lock()
+			defer n.Capabilities.mutex.Unlock()
+			if n.Capabilities.constraints.perCapability[cap].Models[modelID].Capacity > 0 {
+				n.Capabilities.constraints.perCapability[cap].Models[modelID].Capacity -= 1
+			} else {
+				return fmt.Errorf("failed to reserve AI capability capacity, model capacity is 0 pipeline=%v modelID=%v", pipeline, modelID)
+			}
+			return nil
+		}
+		return fmt.Errorf("failed to reserve AI capability capacity, model does not exist pipeline=%v modelID=%v", pipeline, modelID)
+	}
+	return fmt.Errorf("failed to reserve AI capability capacity, pipeline does not exist pipeline=%v modelID=%v", pipeline, modelID)
+}
+
+func (n *LivepeerNode) ReleaseAICapability(pipeline string, modelID string) error {
+	cap, err := PipelineToCapability(pipeline)
+	if err != nil {
+		return err
+	}
+	_, hasCap := n.Capabilities.constraints.perCapability[cap]
+	if hasCap {
+		_, hasModel := n.Capabilities.constraints.perCapability[cap].Models[modelID]
+		if hasModel {
+			n.Capabilities.mutex.Lock()
+			defer n.Capabilities.mutex.Unlock()
+			n.Capabilities.constraints.perCapability[cap].Models[modelID].Capacity += 1

+			return nil
+		}
+		return fmt.Errorf("failed to release AI capability capacity, model does not exist pipeline=%v modelID=%v", pipeline, modelID)
+	}
+	return fmt.Errorf("failed to release AI capability capacity, pipeline does not exist pipeline=%v modelID=%v", pipeline, modelID)
+}
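Taken together, these helpers give the orchestrator simple per-model capacity bookkeeping for remote AI workers. Below is a hedged sketch (not code from this commit) of how they might be called over a worker's lifetime; the caller is assumed to already hold the node and the worker's advertised capabilities from the existing connection handling, and the package, function, pipeline, and model names are illustrative only.

package orchestrator

import "github.com/livepeer/go-livepeer/core"

// aiWorkerLifecycle sketches how the new capacity helpers could be driven while a
// remote AI worker is connected. Everything here is illustrative.
func aiWorkerLifecycle(n *core.LivepeerNode, workerCaps *core.Capabilities) error {
	// Worker connects: merge its advertised per-model capacity into the node.
	n.AddAICapabilities(workerCaps)

	pipeline, modelID := "text-to-image", "example/model"

	// Before dispatching a job, reserve one unit of capacity for the model.
	if err := n.ReserveAICapability(pipeline, modelID); err != nil {
		return err
	}

	// ... dispatch the job to the remote worker ...

	// When the job finishes, return the reserved unit of capacity.
	if err := n.ReleaseAICapability(pipeline, modelID); err != nil {
		return err
	}

	// Worker disconnects: subtract the capacity it advertised.
	n.RemoveAICapabilities(workerCaps)
	return nil
}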

0 commit comments
