Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add more statistics and /status.json page #40

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ else
CFLAGS += -DGIT_VER=\"$(VERSION)\"
endif

ifndef WITH_JSON
$(info enabeling JSON support)
$(info if you don't want JSON support run)
$(info make WITH_JSON=0)
WITH_JSON = 1
endif

CFLAGS += -DWITH_JSON=$(WITH_JSON)

RM = /bin/rm -f
Q = @

Expand All @@ -23,6 +32,10 @@ else
LIBS = -lpthread -lm -lrt
endif

ifeq "$(WITH_JSON)" "1"
LIBS += -ljson-c
endif

FUNCS_DIR = libfuncs
FUNCS_LIB = $(FUNCS_DIR)/libfuncs.a

Expand Down
18 changes: 17 additions & 1 deletion README
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ is used in production in couple of small DVB-C networks.

Installation
============
mptsd do not depend on any external libraries. There are two source code
mptsd has no hard dependencies on any external libraries. There are two source code
dependancies that come with mptsd - libfuncs and libtsfuncs.
The JSON-C library is used for some optional functionality but can be disabled.

Make sure your kernel has CONFIG_HIGH_RES_TIMERS enabled. Otherwise sleep
timeout probably won't be able to calibrate itself and mptsd will not work.
Expand Down Expand Up @@ -43,6 +44,14 @@ mptsd was tested and found to be working ok with Dektec DTE-3114 & HiDes UT-100C
To enable RTP output instead of plain UDP for network streams,
specify the SSRC identifier via the -s flag (must be != 0).

Integrated Webserver
=========
When mptsd is started with the -p option, it will start a webserver at the
specified port.

If JSON support is enabled, sending a GET request to /status.json will return a
JSON file containing various details about the configuration and statistics.

Development
===========
The development is tracked using git. The repository is hosted at github
Expand All @@ -54,11 +63,18 @@ to get it, run the following command:
OR
git submodule update --recursive --remote // if you like to checkout HEAD submodules.


Compiling
=========
mptsd can use the JSON-C library.
On debian based systems it can be installed using
`sudo apt install libjson-c-dev`

After cloning the git repository as described in Development section
just run `make`.

If you don't want JSON support run `make WITH_JSON=0`

Releases
========
Official releases can be downloaded from tsdecrypt home page which is:
Expand Down
112 changes: 112 additions & 0 deletions data.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdarg.h>
#include <stdint.h>
#include <float.h>

#include "libfuncs/io.h"
#include "libfuncs/log.h"
Expand Down Expand Up @@ -256,6 +259,11 @@ INPUT * input_new(const char *name, CHANNEL *channel) {
r->sock = -1;
r->channel = channel;

r->rtp_stats.last_sequence_number = -1;
r->traffic_stats.min.traffic = UINT64_MAX;
r->traffic_stats.min.kpbs = DBL_MAX;
r->traffic_stats.min.padding = DBL_MAX;

if (config->write_input_file) {
if (asprintf(&tmp, "mptsd-input-%s.ts", channel->id) > 0)
r->ifd = open(tmp, O_CREAT | O_WRONLY | O_TRUNC, 0644);
Expand Down Expand Up @@ -294,6 +302,10 @@ OUTPUT *output_new() {
OUTPUT *o = calloc(1, sizeof(OUTPUT));
o->obuf_ms = 100;

o->traffic_stats.min.traffic = UINT64_MAX;
o->traffic_stats.min.kpbs = DBL_MAX;
o->traffic_stats.min.padding = DBL_MAX;

o->psibuf = cbuf_init(50 * 1316, "psi");
if (!o->psibuf) {
LOGf("ERROR: Can't allocate PSI input buffer\n");
Expand Down Expand Up @@ -434,9 +446,109 @@ void proxy_log(INPUT *r, char *msg) {
LOGf("INPUT : [%-12s] %s fd: %d src: %s\n", r->channel->id, msg, r->sock, r->channel->source);
}

void proxy_logf(INPUT *r, const char *fmt, ...) {
char msg[1024];
va_list args;
va_start(args, fmt);
vsnprintf(msg, sizeof(msg) - 1, fmt, args);
va_end(args);
msg[sizeof(msg) - 1] = '\0';

proxy_log(r, msg);
}

void proxy_close(LIST *inputs, INPUT **input) {
proxy_log(*input, "Stop");
// If there are no clients left, no "Timeout" messages will be logged
list_del_entry(inputs, *input);
input_free(input);
}

ssize_t parse_rtp(const uint8_t *buf, size_t len, RTP_HEADER *rtp_header) {
size_t rtp_length = RTP_HEADER_SIZE;

if (len < RTP_HEADER_SIZE)
return -1;

rtp_header->version = buf[0] >> 6;
rtp_header->padding = !!((buf[0]) & (1 << 5));
rtp_header->extension = !!((buf[0]) & (1 << 4));
rtp_header->cc = buf[0] & 0xFF;

rtp_header->marker = !!((buf[1]) & (1 << 8));
rtp_header->payload_type = buf[1] & 0x7f;

// Sequence number
rtp_header->sequence_number = (buf[2] << 8) | (buf[3]);

// Timestamp
rtp_header->timestamp =
(buf[4] << 24) | (buf[5] << 16) | (buf[6] << 8) | (buf[7]);

// SSRC identifier
rtp_header->ssrc =
(buf[8] << 24) | (buf[9] << 16) | (buf[10] << 8) | (buf[11]);

rtp_length += rtp_header->cc * 4;

return rtp_length;
}

ssize_t handle_rtp_input(uint8_t *buf, size_t len, INPUT *input) {
RTP_HEADER rtp_header;
ssize_t rtp_length = parse_rtp(buf, len, &rtp_header);

RTP_STATS *stats = &input->rtp_stats;

stats->packets_received++;
if (stats->last_sequence_number >= 0) {
uint16_t expected_sequence_number = stats->last_sequence_number + 1;
if (rtp_header.sequence_number != expected_sequence_number) {
int32_t lost_packets =
rtp_header.sequence_number - expected_sequence_number;
// counter wrapped
if (lost_packets < 0) {
lost_packets += 0xFFFF;
}
proxy_logf(
input,
"RTP packets lost at sequence number %u: %i (total: %lu)",
rtp_header.sequence_number, lost_packets, stats->packets_lost);

// if the sequence number is 1, assume that the
// source was restarted (don't update packet_lost
// counter)
if (rtp_header.sequence_number != 1) {
stats->packets_lost += lost_packets;
}
}
}
stats->last_sequence_number = rtp_header.sequence_number;
stats->ssrc = rtp_header.ssrc;

return rtp_length;
}

void update_traffic_stats(TRAFFIC_STATS *stats, double kbps, double padding,
uint64_t traffic) {
// last
stats->last.kpbs = kbps;
stats->last.padding = padding;
stats->last.traffic = traffic;

// min
if (kbps < stats->min.kpbs)
stats->min.kpbs = kbps;
if (padding < stats->min.padding)
stats->min.padding = padding;
if (traffic < stats->min.traffic)
stats->min.traffic = traffic;

// max
if (kbps > stats->max.kpbs)
stats->max.kpbs = kbps;
if (padding > stats->max.padding)
stats->max.padding = padding;
if (traffic > stats->max.traffic)
stats->max.traffic = traffic;
}
43 changes: 43 additions & 0 deletions data.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,37 @@

#include "pidref.h"

typedef struct {
uint8_t version : 2;
uint8_t padding : 1;
uint8_t extension : 1;
uint8_t cc : 4;
uint8_t marker : 1;
uint8_t payload_type : 7;
uint16_t sequence_number;
uint32_t timestamp;
uint32_t ssrc;
} RTP_HEADER;

typedef struct {
int32_t last_sequence_number;
uint32_t ssrc;
uint64_t packets_received;
uint64_t packets_lost;
} RTP_STATS;

typedef struct {
double kpbs;
double padding;
uint64_t traffic;
} TRAFFIC_STATS_ENTRY;

typedef struct {
TRAFFIC_STATS_ENTRY min;
TRAFFIC_STATS_ENTRY max;
TRAFFIC_STATS_ENTRY last;
} TRAFFIC_STATS;

typedef enum { udp_sock, tcp_sock } channel_source;

typedef struct {
Expand Down Expand Up @@ -134,6 +165,12 @@ typedef struct {
int cookie; /* Used in chanconf to determine if the restreamer is alrady checked */
int ifd;

uint64_t traffic;
uint64_t traffic_period;
uint64_t padding_period;
TRAFFIC_STATS traffic_stats;
RTP_STATS rtp_stats;

pthread_t thread;

uint16_t output_pcr_pid;
Expand Down Expand Up @@ -186,6 +223,8 @@ typedef struct {
uint64_t traffic_period;
uint64_t padding_period;

TRAFFIC_STATS traffic_stats;

uint8_t pid_pat_cont:4;
uint8_t pid_nit_cont:4;
uint8_t pid_sdt_cont:4;
Expand Down Expand Up @@ -261,4 +300,8 @@ void nit_free (NIT **nit);
void proxy_log (INPUT *r, char *msg);
void proxy_close (LIST *inputs, INPUT **input);

ssize_t handle_rtp_input (uint8_t *buf, size_t len, INPUT *input);
ssize_t parse_rtp (const uint8_t *buf, size_t len, RTP_HEADER *rtp_header);

void update_traffic_stats (TRAFFIC_STATS *stats, double kbps, double padding, uint64_t traffic);
#endif
46 changes: 36 additions & 10 deletions input.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <inttypes.h>

#include "libfuncs/io.h"
#include "libfuncs/log.h"
Expand Down Expand Up @@ -376,8 +377,8 @@ int in_worktime(int start, int end) {
void * input_stream(void *self) {
INPUT *r = self;
INPUT_STREAM *s = &r->stream;
struct timeval stats_ts, now;
char buffer[RTP_HEADER_SIZE + FRAME_PACKET_SIZE];
char *buf = buffer + RTP_HEADER_SIZE;

signal(SIGPIPE, SIG_IGN);

Expand All @@ -387,6 +388,7 @@ void * input_stream(void *self) {
proxy_log(r, "Worktime has not yet begin, sleeping.");

int http_code = 0;
gettimeofday(&stats_ts, NULL);
while (keep_going) {
if (input_check_state(r) == 2) // r->dienow is on
goto QUIT;
Expand Down Expand Up @@ -416,12 +418,15 @@ void * input_stream(void *self) {
}

ssize_t readen;
ssize_t rtp_length;
int i = 0;
int max_zero_reads = MAX_ZERO_READS;

// Reset all stream parameters on reconnect.
input_stream_reset(r);

for (;;) {
gettimeofday(&now, NULL);
r->working = in_worktime(r->channel->worktime_start, r->channel->worktime_end);
if (!r->working) {
proxy_log(r, "Worktime ended.");
Expand All @@ -434,14 +439,16 @@ void * input_stream(void *self) {
}

if (sproto == tcp_sock) {
readen = fdread_ex(r->sock, buf, FRAME_PACKET_SIZE, TCP_READ_TIMEOUT, TCP_READ_RETRIES, 1);
readen = fdread_ex(r->sock, buffer, FRAME_PACKET_SIZE, TCP_READ_TIMEOUT, TCP_READ_RETRIES, 1);
} else {
if (!rtp) {
readen = fdread_ex(r->sock, buf, FRAME_PACKET_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0);
} else {
if (!rtp) { // plain UDP
readen = fdread_ex(r->sock, buffer, FRAME_PACKET_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0);
} else { // RTP
readen = fdread_ex(r->sock, buffer, FRAME_PACKET_SIZE + RTP_HEADER_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0);
if (readen > RTP_HEADER_SIZE)
readen -= RTP_HEADER_SIZE;
if (readen > RTP_HEADER_SIZE) {
rtp_length = handle_rtp_input((uint8_t *)buffer, readen, r);
i = rtp_length; // skip RTP header during TS reading
}
}
}

Expand All @@ -456,14 +463,20 @@ void * input_stream(void *self) {
continue;
}

int i;
for (i=0; i<readen; i+=188) {
r->traffic += readen - i;
r->traffic_period += readen - i;

for (; i<readen; i+=188) {

if (r->dienow)
goto QUIT;
uint8_t *ts_packet = (uint8_t *)buf + i;
uint8_t *ts_packet = (uint8_t *)buffer + i;
uint16_t pid = ts_packet_get_pid(ts_packet);

if(pid == 0x1FFF) { // NULL packets (Stuffing)
r->padding_period += 188;
}

int pat_result = process_pat(r, pid, ts_packet);
if (pat_result == -2)
goto RECONNECT;
Expand Down Expand Up @@ -505,6 +518,19 @@ void * input_stream(void *self) {
}
}

// stats
unsigned long long stats_interval = timeval_diff_msec(&stats_ts, &now);
if (stats_interval > config->timeouts.stats) {
stats_ts = now;
double kbps = (double)(r->traffic_period * 8) / 1000;
double padding = ((double)r->padding_period / r->traffic_period) * 100;

update_traffic_stats(&r->traffic_stats, kbps, padding, r->traffic_period);

r->traffic_period = 0;
r->padding_period = 0;
}

max_zero_reads = MAX_ZERO_READS;
}
proxy_log(r, "fdread timeout");
Expand Down
Loading