Skip to content

Commit 959ae10

Browse files
fix suspension
1 parent a65731d commit 959ae10

File tree

1 file changed

+32
-7
lines changed

1 file changed

+32
-7
lines changed

server/ai_session.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,16 @@ type AISessionPool struct {
2929
sessMap map[string]*BroadcastSession
3030
inUseSess []*BroadcastSession
3131
suspender *suspender
32+
penalty int
3233
mu sync.RWMutex
3334
}
3435

35-
func NewAISessionPool(selector BroadcastSessionsSelector, suspender *suspender) *AISessionPool {
36+
func NewAISessionPool(selector BroadcastSessionsSelector, suspender *suspender, penalty int) *AISessionPool {
3637
return &AISessionPool{
3738
selector: selector,
3839
sessMap: make(map[string]*BroadcastSession),
3940
suspender: suspender,
41+
penalty: penalty,
4042
mu: sync.RWMutex{},
4143
}
4244
}
@@ -122,10 +124,17 @@ func (pool *AISessionPool) Remove(sess *BroadcastSession) {
122124
delete(pool.sessMap, sess.Transcoder())
123125
pool.inUseSess = removeSessionFromList(pool.inUseSess, sess)
124126

125-
// Magic number for now
126-
penalty := 3
127+
penalty := 0
127128
// If this method is called assume that the orch should be suspended
128-
// as well
129+
// as well. Since AISessionManager re-uses the pools, the suspension
130+
// penalty must take the current suspender count into account
131+
last_count, ok := pool.suspender.list[sess.Transcoder()]
132+
if ok {
133+
penalty = pool.suspender.count - last_count + pool.penalty
134+
} else {
135+
penalty = pool.suspender.count + pool.penalty
136+
}
137+
129138
pool.suspender.suspend(sess.Transcoder(), penalty)
130139
}
131140

@@ -152,12 +161,14 @@ type AISessionSelector struct {
152161
// The time until the pools should be refreshed with orchs from discovery
153162
ttl time.Duration
154163
lastRefreshTime time.Time
164+
initialPoolSize int
155165

156166
cap core.Capability
157167
modelID string
158168

159169
node *core.LivepeerNode
160170
suspender *suspender
171+
penalty int
161172
os drivers.OSSession
162173
}
163174

@@ -172,8 +183,9 @@ func NewAISessionSelector(cap core.Capability, modelID string, node *core.Livepe
172183
// The latency score in this context is just the latency of the last completed request for a session
173184
// The "good enough" latency score is set to 0.0 so the selector will always select unknown sessions first
174185
minLS := 0.0
175-
warmPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore), suspender)
176-
coldPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore), suspender)
186+
penalty := 3
187+
warmPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore), suspender, penalty)
188+
coldPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore), suspender, penalty)
177189
sel := &AISessionSelector{
178190
warmPool: warmPool,
179191
coldPool: coldPool,
@@ -182,6 +194,7 @@ func NewAISessionSelector(cap core.Capability, modelID string, node *core.Livepe
182194
modelID: modelID,
183195
node: node,
184196
suspender: suspender,
197+
penalty: penalty,
185198
os: drivers.NodeStorage.NewSession(strconv.Itoa(int(cap)) + "_" + modelID),
186199
}
187200

@@ -196,7 +209,17 @@ func (sel *AISessionSelector) Select(ctx context.Context) *AISession {
196209
shouldRefreshSelector := func() bool {
197210
// Refresh if the # of sessions across warm and cold pools falls below the smaller of the maxRefreshSessionsThreshold and
198211
// 1/2 the total # of orchs that can be queried during discovery
199-
discoveryPoolSize := sel.node.OrchestratorPool.Size()
212+
discoveryPoolSize := int(math.Min(float64(sel.node.OrchestratorPool.Size()), float64(sel.initialPoolSize)))
213+
214+
if (sel.warmPool.Size() + sel.coldPool.Size()) == 0 {
215+
//release all orchestrators from suspension and try refresh
216+
//by signalling a refresh once per penalty unit (sel.penalty times)
217+
clog.Infof(ctx, "refreshing sessions, no orchestrators in pools")
218+
for i := 0; i < sel.penalty; i++ {
219+
sel.suspender.signalRefresh()
220+
}
221+
}
222+
200223
if sel.warmPool.Size()+sel.coldPool.Size() < int(math.Min(maxRefreshSessionsThreshold, math.Ceil(float64(discoveryPoolSize)/2.0))) {
201224
return true
202225
}
@@ -257,6 +280,7 @@ func (sel *AISessionSelector) Refresh(ctx context.Context) error {
257280

258281
var warmSessions []*BroadcastSession
259282
var coldSessions []*BroadcastSession
283+
260284
for _, sess := range sessions {
261285
// If the constraints are missing for this capability skip this session
262286
constraints, ok := sess.OrchestratorInfo.Capabilities.Constraints[uint32(sel.cap)]
@@ -279,6 +303,7 @@ func (sel *AISessionSelector) Refresh(ctx context.Context) error {
279303

280304
sel.warmPool.Add(warmSessions)
281305
sel.coldPool.Add(coldSessions)
306+
sel.initialPoolSize = len(warmSessions) + len(coldSessions) + len(sel.suspender.list)
282307

283308
sel.lastRefreshTime = time.Now()
284309

0 commit comments

Comments
 (0)