Skip to content

Commit

Permalink
fix: listen tcp failed return err
Browse files Browse the repository at this point in the history
  • Loading branch information
langhuihui committed Dec 15, 2024
1 parent 4a66d54 commit c161674
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 2 deletions.
7 changes: 6 additions & 1 deletion pkg/annexb.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,14 @@ func (a *AnnexB) Parse(t *AVTrack) (err error) {
switch codec.ParseH264NALUType(nalu.Buffers[0][0]) {
case codec.NALU_SPS:
ctx.RecordInfo.SPS = [][]byte{nalu.ToBytes()}
if len(ctx.RecordInfo.PPS) > 0 {
ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(ctx.SPS(), ctx.PPS())
}
case codec.NALU_PPS:
ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()}
ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(ctx.SPS(), ctx.PPS())
if len(ctx.RecordInfo.SPS) > 0 {
ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(ctx.SPS(), ctx.PPS())
}
case codec.NALU_IDR_Picture:
t.Value.IDR = true
}
Expand Down
1 change: 1 addition & 0 deletions pkg/config/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (task *ListenTCPWork) Start() (err error) {
task.Info("listen tcp")
} else {
task.Error("failed to listen tcp", "error", err)
return err
}
if task.handler == nil {
return nil
Expand Down
10 changes: 9 additions & 1 deletion plugin/gb28181/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ type GB28181Plugin struct {
tcpPorts chan uint16
}

var _ = m7s.InstallPlugin[GB28181Plugin](pb.RegisterApiHandler, &pb.Api_ServiceDesc, func() m7s.IPuller {
var _ = m7s.InstallPlugin[GB28181Plugin](pb.RegisterApiHandler, &pb.Api_ServiceDesc, func(conf config.Pull) m7s.IPuller {
if util.Exist(conf.URL) {
return &gb28181.DumpPuller{}
}
return new(Dialog)
})

Expand Down Expand Up @@ -365,6 +368,11 @@ func (gb *GB28181Plugin) StoreDevice(id string, req *sip.Request) (d *Device) {
}

func (gb *GB28181Plugin) Pull(streamPath string, conf config.Pull, pubConf *config.Publish) {
if util.Exist(conf.URL) {
var puller gb28181.DumpPuller
puller.GetPullJob().Init(&puller, &gb.Plugin, streamPath, conf, pubConf)
return
}
dialog := Dialog{
gb: gb,
}
Expand Down
41 changes: 41 additions & 0 deletions plugin/gb28181/pkg/puller-dump.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package gb28181

import (
"m7s.live/v5"
"m7s.live/v5/pkg/util"
"time"
)

type DumpPuller struct {
m7s.HTTPFilePuller
}

func (p *DumpPuller) Run() (err error) {
pub := p.PullJob.Publisher
pub.Type = m7s.PublishTypeReplay
puber := NewPSPublisher(pub)
puber.Receiver.Logger = p.Logger
go puber.Demux()
var t uint16
defer close(puber.Receiver.FeedChan)
for l := make([]byte, 6); pub.State != m7s.PublisherStateDisposed; time.Sleep(time.Millisecond * time.Duration(t)) {
_, err = p.Read(l)
if err != nil {
return
}
payloadLen := util.ReadBE[int](l[:4])
payload := make([]byte, payloadLen)
t = util.ReadBE[uint16](l[4:])
_, err = p.Read(payload)
if err != nil {
return
}
if err = puber.Receiver.ReadRTP(payload); err != nil {
p.Error("replayPS", "err", err)
}
if pub.IsStopped() {
return pub.StopReason()
}
}
return
}
5 changes: 5 additions & 0 deletions plugin/gb28181/pkg/transceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func (p *PSPublisher) Demux() {
}
case StartCodeMAP:
p.decProgramStreamMap()
case StartCodeSYS, PrivateStreamCode:
p.ReadPayload()
default:
p.ReadPayload()
}
Expand Down Expand Up @@ -137,6 +139,9 @@ func (p *Receiver) ReadRTP(rtp util.Buffer) (err error) {
if err = p.Unmarshal(rtp); err != nil {
return
}
if p.Enabled(p, task.TraceLevel) {
p.Trace("rtp", "len", rtp.Len(), "seq", p.SequenceNumber, "payloadType", p.PayloadType, "ssrc", p.SSRC)
}
copyData := make([]byte, len(p.Payload))
copy(copyData, p.Payload)
p.FeedChan <- copyData
Expand Down
8 changes: 8 additions & 0 deletions plugin/mp4/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func (p *MP4Plugin) List(ctx context.Context, req *pb.ReqRecordList) (resp *pb.R
}

func (p *MP4Plugin) Catalog(ctx context.Context, req *emptypb.Empty) (resp *pb.ResponseCatalog, err error) {
if p.DB == nil {
err = pkg.ErrNoDB
return
}
resp = &pb.ResponseCatalog{}
var result []struct {
StreamPath string
Expand Down Expand Up @@ -151,6 +155,10 @@ func (p *MP4Plugin) Delete(ctx context.Context, req *pb.ReqRecordDelete) (resp *
}

func (p *MP4Plugin) download(w http.ResponseWriter, r *http.Request) {
if p.DB == nil {
http.Error(w, pkg.ErrNoDB.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "video/mp4")
streamPath := r.PathValue("streamPath")

Expand Down

0 comments on commit c161674

Please sign in to comment.