Skip to content

Commit 4a2774a

Browse files
authored
Stream heartbeats (#3721)
* Implement stream heartbeat * Global heartbeat sending * New api key * add config vars * Review comments * fix * add test * review comment
1 parent 996ea75 commit 4a2774a

File tree

6 files changed

+191
-0
lines changed

6 files changed

+191
-0
lines changed

cmd/livepeer/starter/flags.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig {
6767
cfg.MediaMTXApiPassword = fs.String("mediaMTXApiPassword", "", "HTTP basic auth password for MediaMTX API requests")
6868
cfg.LiveAITrickleHostForRunner = fs.String("liveAITrickleHostForRunner", "", "Trickle Host used by AI Runner; It's used to overwrite the publicly available Trickle Host")
6969
cfg.LiveAIAuthApiKey = fs.String("liveAIAuthApiKey", "", "API key to use for Live AI authentication requests")
70+
cfg.LiveAIHeartbeatURL = fs.String("liveAIHeartbeatURL", "", "Base URL for Live AI heartbeat requests")
71+
cfg.LiveAIHeartbeatHeaders = fs.String("liveAIHeartbeatHeaders", "", "Map of headers to use for Live AI heartbeat requests. e.g. 'header:val,header2:val2'")
72+
cfg.LiveAIHeartbeatInterval = fs.Duration("liveAIHeartbeatInterval", *cfg.LiveAIHeartbeatInterval, "Interval to send Live AI heartbeat requests")
7073
cfg.LiveAIAuthWebhookURL = fs.String("liveAIAuthWebhookUrl", "", "Live AI RTMP authentication webhook URL")
7174
cfg.LivePaymentInterval = fs.Duration("livePaymentInterval", *cfg.LivePaymentInterval, "Interval to pay process Gateway <> Orchestrator Payments for Live AI Video")
7275
cfg.LiveOutSegmentTimeout = fs.Duration("liveOutSegmentTimeout", *cfg.LiveOutSegmentTimeout, "Timeout duration to wait the output segment to be available in the Live AI pipeline; defaults to no timeout")

cmd/livepeer/starter/starter.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,9 @@ type LivepeerConfig struct {
177177
KafkaGatewayTopic *string
178178
MediaMTXApiPassword *string
179179
LiveAIAuthApiKey *string
180+
LiveAIHeartbeatURL *string
181+
LiveAIHeartbeatHeaders *string
182+
LiveAIHeartbeatInterval *time.Duration
180183
LivePaymentInterval *time.Duration
181184
LiveOutSegmentTimeout *time.Duration
182185
LiveAICapRefreshModels *string
@@ -233,6 +236,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
233236
defaultLivePaymentInterval := 5 * time.Second
234237
defaultLiveOutSegmentTimeout := 0 * time.Second
235238
defaultGatewayHost := ""
239+
defaultLiveAIHeartbeatInterval := 5 * time.Second
236240

237241
// Onchain:
238242
defaultEthAcctAddr := ""
@@ -348,6 +352,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
348352
LivePaymentInterval: &defaultLivePaymentInterval,
349353
LiveOutSegmentTimeout: &defaultLiveOutSegmentTimeout,
350354
GatewayHost: &defaultGatewayHost,
355+
LiveAIHeartbeatInterval: &defaultLiveAIHeartbeatInterval,
351356

352357
// Onchain:
353358
EthAcctAddr: &defaultEthAcctAddr,
@@ -1674,6 +1679,22 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
16741679
if cfg.LiveAIAuthApiKey != nil {
16751680
n.LiveAIAuthApiKey = *cfg.LiveAIAuthApiKey
16761681
}
1682+
if cfg.LiveAIHeartbeatURL != nil {
1683+
n.LiveAIHeartbeatURL = *cfg.LiveAIHeartbeatURL
1684+
}
1685+
if cfg.LiveAIHeartbeatInterval != nil {
1686+
n.LiveAIHeartbeatInterval = *cfg.LiveAIHeartbeatInterval
1687+
}
1688+
if cfg.LiveAIHeartbeatHeaders != nil {
1689+
n.LiveAIHeartbeatHeaders = make(map[string]string)
1690+
headers := strings.Split(*cfg.LiveAIHeartbeatHeaders, ",")
1691+
for _, header := range headers {
1692+
parts := strings.SplitN(header, ":", 2)
1693+
if len(parts) == 2 {
1694+
n.LiveAIHeartbeatHeaders[parts[0]] = parts[1]
1695+
}
1696+
}
1697+
}
16771698
n.LivePaymentInterval = *cfg.LivePaymentInterval
16781699
n.LiveOutSegmentTimeout = *cfg.LiveOutSegmentTimeout
16791700
if cfg.LiveAITrickleHostForRunner != nil {

core/livepeernode.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ type LivepeerNode struct {
160160
MediaMTXApiPassword string
161161
LiveAITrickleHostForRunner string
162162
LiveAIAuthApiKey string
163+
LiveAIHeartbeatURL string
164+
LiveAIHeartbeatHeaders map[string]string
165+
LiveAIHeartbeatInterval time.Duration
163166
LivePaymentInterval time.Duration
164167
LiveOutSegmentTimeout time.Duration
165168
LiveAICapRefreshModels []string
@@ -171,6 +174,7 @@ type LivepeerNode struct {
171174

172175
type LivePipeline struct {
173176
RequestID string
177+
StreamID string
174178
Params []byte
175179
Pipeline string
176180
ControlPub *trickle.TricklePublisher

server/ai_live_video.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ func registerControl(ctx context.Context, params aiRequestParams) {
512512
params.node.LivePipelines[stream] = &core.LivePipeline{
513513
RequestID: params.liveParams.requestID,
514514
Pipeline: params.liveParams.pipeline,
515+
StreamID: params.liveParams.streamID,
515516
}
516517
}
517518

server/ai_mediaserver.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ func startAIMediaServer(ctx context.Context, ls *LivepeerServer) error {
113113
ls.HTTPMux.Handle("/process/request/", ls.SubmitJob())
114114

115115
media.StartFileCleanup(ctx, ls.LivepeerNode.WorkDir)
116+
117+
startHearbeats(ctx, ls.LivepeerNode)
116118
return nil
117119
}
118120

@@ -1319,3 +1321,69 @@ func (ls *LivepeerServer) SmokeTestLiveVideo() http.Handler {
13191321
}()
13201322
})
13211323
}
1324+
1325+
func startHearbeats(ctx context.Context, node *core.LivepeerNode) {
1326+
if node.LiveAIHeartbeatURL == "" {
1327+
return
1328+
}
1329+
1330+
go func() {
1331+
ticker := time.NewTicker(node.LiveAIHeartbeatInterval)
1332+
defer ticker.Stop()
1333+
for {
1334+
select {
1335+
case <-ctx.Done():
1336+
return
1337+
case <-ticker.C:
1338+
sendHeartbeat(ctx, node, node.LiveAIHeartbeatURL, node.LiveAIHeartbeatHeaders)
1339+
}
1340+
}
1341+
}()
1342+
}
1343+
1344+
func getStreamIDs(node *core.LivepeerNode) []string {
1345+
node.LiveMu.Lock()
1346+
defer node.LiveMu.Unlock()
1347+
var streamIDs []string
1348+
for _, pipeline := range node.LivePipelines {
1349+
streamIDs = append(streamIDs, pipeline.StreamID)
1350+
}
1351+
return streamIDs
1352+
}
1353+
1354+
func sendHeartbeat(ctx context.Context, node *core.LivepeerNode, liveAIHeartbeatURL string, liveAIHeartbeatHeaders map[string]string) {
1355+
streamIDs := getStreamIDs(node)
1356+
1357+
reqBody, err := json.Marshal(map[string]interface{}{
1358+
"ids": streamIDs,
1359+
})
1360+
if err != nil {
1361+
clog.Errorf(ctx, "heartbeat: failed to marshal request body %s", err)
1362+
return
1363+
}
1364+
1365+
request, err := http.NewRequest("POST", liveAIHeartbeatURL, bytes.NewReader(reqBody))
1366+
if err != nil {
1367+
clog.Errorf(ctx, "heartbeat: failed to build heartbeat request %s", err)
1368+
return
1369+
}
1370+
1371+
request.Header.Set("Content-Type", "application/json")
1372+
for key, value := range liveAIHeartbeatHeaders {
1373+
request.Header.Set(key, value)
1374+
}
1375+
1376+
resp, err := http.DefaultClient.Do(request)
1377+
if err != nil {
1378+
clog.Errorf(ctx, "heartbeat: failed to send heartbeat %s", err)
1379+
return
1380+
}
1381+
defer resp.Body.Close()
1382+
1383+
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
1384+
body, _ := io.ReadAll(resp.Body)
1385+
clog.Errorf(ctx, "heartbeat: failed to send heartbeat %s", resp.Status)
1386+
clog.Errorf(ctx, "heartbeat: response body: %s", string(body))
1387+
return
1388+
}
1389+
}

server/ai_mediaserver_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"io"
7+
"net/http"
8+
"net/http/httptest"
9+
"sync"
10+
"testing"
11+
12+
"github.com/livepeer/go-livepeer/core"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
// mockLivepeerNode creates a mock LivepeerNode with test pipelines
18+
func mockLivepeerNode(streamIDs []string) *core.LivepeerNode {
19+
node := &core.LivepeerNode{
20+
LiveMu: &sync.RWMutex{},
21+
LivePipelines: make(map[string]*core.LivePipeline),
22+
}
23+
24+
for _, streamID := range streamIDs {
25+
pipeline := &core.LivePipeline{
26+
StreamID: streamID,
27+
}
28+
node.LivePipelines[streamID] = pipeline
29+
}
30+
31+
return node
32+
}
33+
34+
func TestSendHeartbeat_Success(t *testing.T) {
35+
// Test successful heartbeat with valid stream IDs
36+
streamIDs := []string{"stream1", "stream2", "stream3"}
37+
node := mockLivepeerNode(streamIDs)
38+
39+
// Create test server to capture the request
40+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
41+
// Verify request method and headers
42+
assert.Equal(t, "POST", r.Method)
43+
assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
44+
assert.Equal(t, "test-value", r.Header.Get("X-Test-Header"))
45+
46+
// Verify request body
47+
body, err := io.ReadAll(r.Body)
48+
require.NoError(t, err)
49+
50+
var requestData map[string][]string
51+
err = json.Unmarshal(body, &requestData)
52+
require.NoError(t, err)
53+
54+
ids, ok := requestData["ids"]
55+
require.True(t, ok)
56+
require.Len(t, ids, 3)
57+
58+
assert.ElementsMatch(t, ids, streamIDs)
59+
60+
w.WriteHeader(http.StatusOK)
61+
}))
62+
defer server.Close()
63+
64+
headers := map[string]string{
65+
"X-Test-Header": "test-value",
66+
}
67+
68+
ctx := context.Background()
69+
sendHeartbeat(ctx, node, server.URL, headers)
70+
}
71+
72+
func TestSendHeartbeat_EmptyStreamIDs(t *testing.T) {
73+
// Test heartbeat with no stream IDs
74+
node := mockLivepeerNode([]string{})
75+
76+
// Create test server to capture the request
77+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
78+
// Verify request body contains empty stream list
79+
body, err := io.ReadAll(r.Body)
80+
require.NoError(t, err)
81+
82+
var requestData map[string][]string
83+
err = json.Unmarshal(body, &requestData)
84+
require.NoError(t, err)
85+
86+
require.Len(t, requestData["ids"], 0, "ids array should be empty")
87+
88+
w.WriteHeader(http.StatusOK)
89+
}))
90+
defer server.Close()
91+
92+
ctx := context.Background()
93+
sendHeartbeat(ctx, node, server.URL, map[string]string{})
94+
}

0 commit comments

Comments
 (0)