forked from openreplay/openreplay
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Video replay pipeline split (openreplay#1848)
* feat(backend): split * feat(docker): added ffmpeg to canvas-maker service
- Loading branch information
1 parent
97ee6c7
commit 68a87b2
Showing
29 changed files
with
1,305 additions
and
125 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
package main | ||
|
||
import ( | ||
"fmt" | ||
"log" | ||
"os" | ||
"os/signal" | ||
"syscall" | ||
"time" | ||
|
||
config "openreplay/backend/internal/config/imagestorage" | ||
"openreplay/backend/internal/imagestorage" | ||
"openreplay/backend/pkg/messages" | ||
"openreplay/backend/pkg/metrics" | ||
storageMetrics "openreplay/backend/pkg/metrics/imagestorage" | ||
"openreplay/backend/pkg/queue" | ||
) | ||
|
||
func main() { | ||
m := metrics.New() | ||
m.Register(storageMetrics.List()) | ||
|
||
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) | ||
|
||
cfg := config.New() | ||
|
||
srv, err := imagestorage.New(cfg) | ||
if err != nil { | ||
log.Printf("can't init storage service: %s", err) | ||
return | ||
} | ||
|
||
producer := queue.NewProducer(cfg.MessageSizeLimit, true) | ||
|
||
canvasConsumer := queue.NewConsumer( | ||
cfg.GroupImageStorage, | ||
[]string{ | ||
cfg.TopicCanvasImages, | ||
}, | ||
messages.NewImagesMessageIterator(func(data []byte, sessID uint64) { | ||
checkSessionEnd := func(data []byte) (messages.Message, error) { | ||
reader := messages.NewBytesReader(data) | ||
msgType, err := reader.ReadUint() | ||
if err != nil { | ||
return nil, err | ||
} | ||
if msgType != messages.MsgSessionEnd { | ||
return nil, fmt.Errorf("not a session end message") | ||
} | ||
msg, err := messages.ReadMessage(msgType, reader) | ||
if err != nil { | ||
return nil, fmt.Errorf("read message err: %s", err) | ||
} | ||
return msg, nil | ||
} | ||
|
||
if msg, err := checkSessionEnd(data); err == nil { | ||
sessEnd := msg.(*messages.SessionEnd) | ||
// Received session end | ||
if list, err := srv.PrepareCanvas(sessID); err != nil { | ||
log.Printf("can't prepare canvas: %s", err) | ||
} else { | ||
for _, name := range list { | ||
sessEnd.EncryptionKey = name | ||
if err := producer.Produce(cfg.TopicCanvasTrigger, sessID, sessEnd.Encode()); err != nil { | ||
log.Printf("can't send session end signal to video service: %s", err) | ||
} | ||
} | ||
} | ||
} else { | ||
if err := srv.ProcessCanvas(sessID, data); err != nil { | ||
log.Printf("can't process canvas image: %s", err) | ||
} | ||
} | ||
}, nil, true), | ||
false, | ||
cfg.MessageSizeLimit, | ||
) | ||
|
||
log.Printf("Canvas handler service started\n") | ||
|
||
sigchan := make(chan os.Signal, 1) | ||
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) | ||
|
||
counterTick := time.Tick(time.Second * 30) | ||
for { | ||
select { | ||
case sig := <-sigchan: | ||
log.Printf("Caught signal %v: terminating\n", sig) | ||
srv.Wait() | ||
canvasConsumer.Close() | ||
os.Exit(0) | ||
case <-counterTick: | ||
srv.Wait() | ||
if err := canvasConsumer.Commit(); err != nil { | ||
log.Printf("can't commit messages: %s", err) | ||
} | ||
case msg := <-canvasConsumer.Rebalanced(): | ||
log.Println(msg) | ||
default: | ||
err = canvasConsumer.ConsumeNext() | ||
if err != nil { | ||
log.Fatalf("Error on images consumption: %v", err) | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
package main | ||
|
||
import ( | ||
"log" | ||
"os" | ||
"os/signal" | ||
"strconv" | ||
"strings" | ||
"syscall" | ||
"time" | ||
|
||
config "openreplay/backend/internal/config/videostorage" | ||
"openreplay/backend/internal/videostorage" | ||
"openreplay/backend/pkg/messages" | ||
"openreplay/backend/pkg/metrics" | ||
storageMetrics "openreplay/backend/pkg/metrics/videostorage" | ||
"openreplay/backend/pkg/objectstorage/store" | ||
"openreplay/backend/pkg/queue" | ||
) | ||
|
||
func main() { | ||
m := metrics.New() | ||
m.Register(storageMetrics.List()) | ||
|
||
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) | ||
|
||
cfg := config.New() | ||
|
||
objStore, err := store.NewStore(&cfg.ObjectsConfig) | ||
if err != nil { | ||
log.Fatalf("can't init object storage: %s", err) | ||
} | ||
srv, err := videostorage.New(cfg, objStore) | ||
if err != nil { | ||
log.Printf("can't init storage service: %s", err) | ||
return | ||
} | ||
|
||
workDir := cfg.FSDir | ||
|
||
canvasConsumer := queue.NewConsumer( | ||
cfg.GroupVideoStorage, | ||
[]string{ | ||
cfg.TopicCanvasTrigger, | ||
}, | ||
messages.NewMessageIterator( | ||
func(msg messages.Message) { | ||
sesEnd := msg.(*messages.SessionEnd) | ||
filePath := workDir + "/canvas/" + strconv.FormatUint(sesEnd.SessionID(), 10) + "/" | ||
canvasMix := sesEnd.EncryptionKey // dirty hack to use encryption key as canvas mix holder (only between canvas handler and canvas maker) | ||
if canvasMix == "" { | ||
log.Printf("no canvas mix for session: %d", sesEnd.SessionID()) | ||
return | ||
} | ||
if err := srv.Process(sesEnd.SessionID(), filePath, canvasMix); err != nil { | ||
if !strings.Contains(err.Error(), "no such file or directory") { | ||
log.Printf("upload session err: %s, sessID: %d", err, msg.SessionID()) | ||
} | ||
} | ||
}, | ||
[]int{messages.MsgSessionEnd}, | ||
true, | ||
), | ||
false, | ||
cfg.MessageSizeLimit, | ||
) | ||
|
||
log.Printf("Canvas maker service started\n") | ||
|
||
sigchan := make(chan os.Signal, 1) | ||
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) | ||
|
||
counterTick := time.Tick(time.Second * 30) | ||
for { | ||
select { | ||
case sig := <-sigchan: | ||
log.Printf("Caught signal %v: terminating\n", sig) | ||
srv.Wait() | ||
canvasConsumer.Close() | ||
os.Exit(0) | ||
case <-counterTick: | ||
srv.Wait() | ||
if err := canvasConsumer.Commit(); err != nil { | ||
log.Printf("can't commit messages: %s", err) | ||
} | ||
case msg := <-canvasConsumer.Rebalanced(): | ||
log.Println(msg) | ||
default: | ||
err = canvasConsumer.ConsumeNext() | ||
if err != nil { | ||
log.Fatalf("Error on end event consumption: %v", err) | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.