Skip to content

Commit

Permalink
Added monitoring of bandwidth from tuner and to (first) client
Browse files Browse the repository at this point in the history
Added counter when video packets are lost because client can't keep up
  • Loading branch information
Craig Moksnes committed Mar 2, 2020
1 parent 5ae8b6e commit 4de46f0
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 7 deletions.
2 changes: 1 addition & 1 deletion ProxyFormUnit.pas
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ procedure TMainForm.FillTunerStatistics;

for i := 0 to High(lStatsArray) do
begin
lbStats.Items[i].Text := Format('%d. Channel: %d, From tuner: %d, To client: %d', [i+1, lStatsArray[i].Channel, lStatsArray[i].PacketsReceived, lStatsArray[i].PacketsRead[0]]);
lbStats.Items[i].Text := Format('%d. Channel: %d, From tuner: %0.2fMbps, To client: %0.2fMbps, Lost: %d', [i+1, lStatsArray[i].Channel, lStatsArray[i].InMeter.GetBytesPerSecond(True)/1024/1024, lStatsArray[i].OutMeter.GetBytesPerSecond(True)/1024/1024, lStatsArray[i].Lost]);
end;
end;

Expand Down
90 changes: 90 additions & 0 deletions SocketUtils.pas
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ interface
System.SysUtils,
System.SyncObjs,
System.Math,
System.Diagnostics,
System.Timespan,
FMX.Types,
IdUDPServer,
IdGlobal,
Expand Down Expand Up @@ -79,6 +81,7 @@ TVideoReader = record
IVideoStats = interface
procedure PacketReceived(const aPacketIndex: Integer; const aPacket: TVideoPacket);
procedure PacketRead(const aReaderIndex: Integer; const aPacketIndex: Integer; const aPacket: TVideoPacket);
procedure ReaderSlow(const aReaderIndex: Integer; const aPacketIndex: Integer; const aPacket: TVideoPacket);
procedure ReaderWait(const aReaderIndex: Integer; const aPacketIndex: Integer);
end;

Expand Down Expand Up @@ -134,6 +137,25 @@ TRTPVideoSink = class(TInterfacedObject, IRTPVideoSink)
destructor Destroy; override;
end;

TDataMeterWindow = type String;

TDataMeter = record
private
type
PSnapshot = ^TSnapshot;
TSnapshot = record
Ticks, Bytes: Int64;
end;
private
fSnapshots: array[0..3] of TSnapshot;
fSnapshotIndex: Integer;
fLastTicks: Int64;
public
procedure Add(const aBytes: Int64);
procedure Reset;
function GetBytesPerSecond(const aCurrent: Boolean = False; const aWindowTicks: Int64 = 0): Double;
end;

function Swap16(const aValue: UInt16): UInt16;
function Swap32(const aValue: UInt32): UInt32;
function Swap64(const aValue: UInt64): UInt64;
Expand Down Expand Up @@ -412,7 +434,12 @@ procedure TRTPVideoSink.UDPRead(AThread: TIdUDPListenerThread;
for i := 0 to High(fReaderPacketIndexes) do
begin
if fReaderPacketIndexes[i] = fWritePacketIndex then
begin
if Assigned(fStats) then
fStats.ReaderSlow(i, fReaderPacketIndexes[i], lPacket^);

fReaderPacketIndexes[i] := (fReaderPacketIndexes[i] + 1) mod Length(fPackets);
end;
end;

fWriteEvent.SetEvent;
Expand Down Expand Up @@ -570,4 +597,67 @@ procedure TRTPVideoSink.Close;
fClosed := True;
end;

{ TDataMeter }

procedure TDataMeter.Add(const aBytes: Int64);
var
lTicks: Int64;
begin
lTicks := TStopWatch.GetTimestamp;

if fSnapshots[fSnapshotIndex].Ticks > 0 then
begin
if lTicks > fSnapshots[fSnapshotIndex].Ticks+TTimeSpan.TicksPerSecond then
begin
fSnapshotIndex := (fSnapshotIndex + 1) mod Length(fSnapshots);
fSnapshots[fSnapshotIndex].Bytes := 0;
fSnapshots[fSnapshotIndex].Ticks := lTicks;
end;
end
else
fSnapshots[fSnapshotIndex].Ticks := lTicks;

Inc(fSnapshots[fSnapshotIndex].Bytes, aBytes);
fLastTicks := lTicks;
end;

function TDataMeter.GetBytesPerSecond(const aCurrent: Boolean = False; const aWindowTicks: Int64 = 0): Double;
var
lIndex, i: Integer;
lBytes, lStartTicks, lCurrentTicks, lWindowTicks: Int64;
begin
if aCurrent then
lCurrentTicks := TStopWatch.GetTimeStamp
else
lCurrentTicks := fLastTicks;

lWindowTicks := aWindowTicks;
if lWindowTicks = 0 then
lWindowTicks := (Length(fSnapshots)+1) * TTimeSpan.TicksPerSecond;

lStartTicks := 0;
lBytes := 0;
lIndex := fSnapshotIndex+1;
for i := lIndex to lIndex+Length(fSnapshots) do
begin
lIndex := i mod Length(fSnapshots);
if fSnapshots[lIndex].Ticks >= lCurrentTicks-lWindowTicks then
begin
if lStartTicks = 0 then
lStartTicks := fSnapshots[lIndex].Ticks;
Inc(lBytes, fSnapshots[lIndex].Bytes);
end;
end;
if lStartTicks > 0 then
Result := lBytes / ((lCurrentTicks - lStartTicks) / TTimeSpan.TicksPerSecond)
else
Result := 0;
end;

procedure TDataMeter.Reset;
begin
FillChar(fSnapshots, SizeOf(fSnapshots), 0);
fLastTicks := 0;
end;

end.
21 changes: 15 additions & 6 deletions ceton/Ceton.pas
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,9 @@ TTunerStats = record
Channel: Integer;
Streaming: Boolean;
PacketsReceived: Integer;
PacketsRead: array[0..9] of Integer;
ReaderWait: array[0..9] of Integer;
Lost: Integer;
InMeter: TDataMeter;
OutMeter: TDataMeter;
end;

TTuner = class(TInterfacedPersistent, IVideoStats)
Expand All @@ -228,6 +229,7 @@ TTuner = class(TInterfacedPersistent, IVideoStats)
// IVideoStats
procedure PacketReceived(const aPacketIndex: Integer; const aPacket: TVideoPacket);
procedure PacketRead(const aReaderIndex: Integer; const aPacketIndex: Integer; const aPacket: TVideoPacket);
procedure ReaderSlow(const aReaderIndex: Integer; const aPacketIndex: Integer; const aPacket: TVideoPacket);
procedure ReaderWait(const aReaderIndex: Integer; const aPacketIndex: Integer);
public
constructor Create;
Expand Down Expand Up @@ -1288,19 +1290,19 @@ procedure TTuner.PacketReceived(const aPacketIndex: Integer;
const aPacket: TVideoPacket);
begin
Inc(fStats.PacketsReceived);
fStats.InMeter.Add(aPacket.Size);
end;

procedure TTuner.PacketRead(const aReaderIndex, aPacketIndex: Integer;
const aPacket: TVideoPacket);
begin
if aReaderIndex < Length(fStats.PacketsRead) then
Inc(fStats.PacketsRead[aReaderIndex]);
if aReaderIndex = 0 then
fStats.OutMeter.Add(aPacket.Size);
end;

procedure TTuner.ReaderWait(const aReaderIndex, aPacketIndex: Integer);
begin
if aReaderIndex < Length(fStats.PacketsRead) then
Inc(fStats.ReaderWait[aReaderIndex]);

end;

procedure TTuner.SetChannel(const Value: Integer);
Expand All @@ -1320,6 +1322,13 @@ function TTuner.ContainsSink(const aSink: IRTPVideoSink): Boolean;
Result := Assigned(fSink) and Assigned(aSink) and ((fSink as IRTPVideoSink) = (aSink as IRTPVideoSink));
end;

procedure TTuner.ReaderSlow(const aReaderIndex, aPacketIndex: Integer;
const aPacket: TVideoPacket);
begin
if aReaderIndex = 0 then
Inc(fStats.Lost);
end;

{ TCetonVideoStream }

function TCetonVideoStream.ConverterRead(const aBuf: PByte;
Expand Down

0 comments on commit 4de46f0

Please sign in to comment.