Skip to content

Commit

Permalink
Greater delay between EAGAIN/EWOULDBLOCK attempts to avoid video corr…
Browse files Browse the repository at this point in the history
…uption
  • Loading branch information
wberube committed Oct 24, 2024
1 parent 1d444f7 commit 1b9b04b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 59 deletions.
87 changes: 40 additions & 47 deletions src/rtsp/rtp.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ static inline int __transfer_nal_h26x(struct list_head_t *trans_list, unsigned c

if (nalsize < 4) return SUCCESS;

if (nalsize <= __RTP_MAXPAYLOADSIZE){
if (nalsize <= __RTP_MAXPAYLOADSIZE) {
/* single packet */
/* SPS, PPS, SEI is not marked */
if ((isH265 && pt < H265_NAL_TYPE_VPS) ||
Expand All @@ -74,7 +74,7 @@ static inline int __transfer_nal_h26x(struct list_head_t *trans_list, unsigned c

rtp.rtpsize = nalsize + sizeof(rtp_hdr_t);

ASSERT(__rtp_send(&rtp,trans_list) == SUCCESS, return FAILURE);
ASSERT(__rtp_send(&rtp, trans_list) == SUCCESS, return FAILURE);
} else {
nalptr += isH265 ? 2 : 1;
nalsize -= isH265 ? 2 : 1;
Expand All @@ -92,7 +92,7 @@ static inline int __transfer_nal_h26x(struct list_head_t *trans_list, unsigned c
payload[head - 1] |= 1 << 7;

/* send fragmented nal */
while(nalsize > __RTP_MAXPAYLOADSIZE - head){
while (nalsize > __RTP_MAXPAYLOADSIZE - head) {
p_header->m = 0;

memcpy(&(payload[head]), nalptr, __RTP_MAXPAYLOADSIZE - head);
Expand All @@ -102,7 +102,7 @@ static inline int __transfer_nal_h26x(struct list_head_t *trans_list, unsigned c
nalptr += __RTP_MAXPAYLOADSIZE - head;
nalsize -= __RTP_MAXPAYLOADSIZE - head;

ASSERT(__rtp_send(&rtp,trans_list) == SUCCESS, return FAILURE);
ASSERT(__rtp_send(&rtp, trans_list) == SUCCESS, return FAILURE);

/* intended xor. blame vim :( */
payload[head - 1] &= 0xFF ^ (1<<7);
Expand Down Expand Up @@ -146,7 +146,7 @@ static inline int __transfer_nal_mpga(struct list_head_t *trans_list, unsigned c

rtp.rtpsize = size + sizeof(rtp_hdr_t);

ASSERT(__rtp_send(&rtp,trans_list) == SUCCESS, return FAILURE);
ASSERT(__rtp_send(&rtp, trans_list) == SUCCESS, return FAILURE);

return SUCCESS;
}
Expand Down Expand Up @@ -174,15 +174,15 @@ static inline int __rtp_send_eachconnection(struct list_t *e, void *v)
send_bytes = send(con->trans[track_id].server_rtp_fd,
&(rtp->packet),rtp->rtpsize,0);

if(send_bytes == rtp->rtpsize) {
if (send_bytes == rtp->rtpsize) {
con->trans[track_id].rtcp_packet_cnt += 1;
con->trans[track_id].rtcp_octet += rtp->rtpsize;
return SUCCESS;
} else if(con->con_state != __CON_S_PLAYING) {
DBG("connection state changed before send\n");
return SUCCESS;
} else
usleep(1000);
usleep(5000);
} while (++attempts < 10 &&
send_bytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK));

Expand All @@ -192,7 +192,7 @@ static inline int __rtp_send_eachconnection(struct list_t *e, void *v)

static inline int __rtp_send(struct nal_rtp_t *rtp, struct list_head_t *trans_list)
{
return list_map_inline(trans_list,(__rtp_send_eachconnection), rtp);
return list_map_inline(trans_list, (__rtp_send_eachconnection), rtp);
}


Expand All @@ -209,7 +209,7 @@ static inline int __rtp_setup_transfer(struct list_t *e, void *v)
MUST(bufpool_attach(con->pool, con) == SUCCESS,
return FAILURE);

if(con->con_state == __CON_S_PLAYING) {
if (con->con_state == __CON_S_PLAYING) {

ASSERT(bufpool_get_free(trans_set->h->transfer_pool, &trans) == SUCCESS, ({
ERR("transfer object resouce starvation detected. possibly connection limits are wrongfully setup\n");
Expand All @@ -220,7 +220,7 @@ static inline int __rtp_setup_transfer(struct list_t *e, void *v)

trans->con = con;

MUST(list_push(&trans_set->list_head,&trans->list_entry) == SUCCESS,
MUST(list_push(&trans_set->list_head, &trans->list_entry) == SUCCESS,
goto error);

timestamp_offset = trans_set->h->stat.ts_offset;
Expand All @@ -245,18 +245,18 @@ static inline int __retrieve_sprop(rtsp_handle h, unsigned char *buf, size_t len
mime_encoded_handle base16 = NULL;

/* check VPS is set */
if(h->isH265 && !(h->sprop_vps_b64)){
if (h->isH265 && !(h->sprop_vps_b64)) {
nalptr = buf;
single_len = 0;
while (__split_nal(buf,&nalptr,&single_len,len) == SUCCESS) {
while (__split_nal(buf, &nalptr, &single_len, len) == SUCCESS) {
if (nalptr[0] >> 1 & 0x3F == H265_NAL_TYPE_VPS) {
ASSERT(base64 = mime_base64_create((char *)&(nalptr[0]),single_len), return FAILURE);
ASSERT(base64 = mime_base64_create((char *)&(nalptr[0]), single_len), return FAILURE);

DASSERT(base64->base == 64, return FAILURE);

/* optimistic lock */
rtsp_lock(h);
if(h->sprop_vps_b64) {
if (h->sprop_vps_b64) {
DBG("vps is set by another thread?\n");
mime_encoded_delete(base64);
} else {
Expand All @@ -271,29 +271,29 @@ static inline int __retrieve_sprop(rtsp_handle h, unsigned char *buf, size_t len
}

/* check SPS is set */
if(!(h->sprop_sps_b64)){
if (!(h->sprop_sps_b64)) {
nalptr = buf;
single_len = 0;

while (__split_nal(buf,&nalptr,&single_len,len) == SUCCESS) {
while (__split_nal(buf, &nalptr, &single_len, len) == SUCCESS) {
if ((!(h->isH265) && nalptr[0] & 0x1F == H264_NAL_TYPE_SPS) ||
(h->isH265 && nalptr[0] >> 1 & 0x3F == H265_NAL_TYPE_SPS)) {
ASSERT(base64 = mime_base64_create((char *)&(nalptr[0]),single_len), return FAILURE);
ASSERT(base16 = mime_base16_create((char *)&(nalptr[1]),3), return FAILURE);
ASSERT(base64 = mime_base64_create((char *)&(nalptr[0]), single_len), return FAILURE);
ASSERT(base16 = mime_base16_create((char *)&(nalptr[1]), 3), return FAILURE);

DASSERT(base16->base == 16, return FAILURE);
DASSERT(base64->base == 64, return FAILURE);

/* optimistic lock */
rtsp_lock(h);
if(h->sprop_sps_b64) {
if (h->sprop_sps_b64) {
DBG("sps is set by another thread?\n");
mime_encoded_delete(base64);
} else {
h->sprop_sps_b64 = base64;
}

if(h->sprop_sps_b16) {
if (h->sprop_sps_b16) {
DBG("sps is set by another thread?\n");
mime_encoded_delete(base16);
} else {
Expand All @@ -308,20 +308,20 @@ static inline int __retrieve_sprop(rtsp_handle h, unsigned char *buf, size_t len
}

/* check PPS is set */
if(!(h->sprop_pps_b64)){
if (!(h->sprop_pps_b64)) {
nalptr = buf;
single_len = 0;
while (__split_nal(buf,&nalptr,&single_len,len) == SUCCESS) {
while (__split_nal(buf, &nalptr, &single_len, len) == SUCCESS) {
if ((!(h->isH265) && nalptr[0] & 0x1F == H264_NAL_TYPE_PPS) ||
(h->isH265 && nalptr[0] >> 1 & 0x3F == H265_NAL_TYPE_PPS)) {
ASSERT(single_len >= 4, return FAILURE);
ASSERT(base64 = mime_base64_create((char *)&(nalptr[0]),single_len), return FAILURE);
ASSERT(base64 = mime_base64_create((char *)&(nalptr[0]), single_len), return FAILURE);

DASSERT(base64->base == 64, return FAILURE);

/* optimistic lock */
rtsp_lock(h);
if(h->sprop_pps_b64) {
if (h->sprop_pps_b64) {
DBG("pps is set by another thread?\n");
mime_encoded_delete(base64);
} else {
Expand All @@ -346,8 +346,8 @@ static inline int __rtcp_poll(struct list_t *e, void *v)

list_upcast(trans, e);
MUST(con = trans->con, return FAILURE);
if((con->trans[*track_id].rtcp_tick)-- == 0) {

if ((con->trans[*track_id].rtcp_tick)-- == 0) {
ASSERT(__rtcp_send_sr(con) == SUCCESS, return FAILURE);

/* postcondition check */
Expand All @@ -356,6 +356,7 @@ static inline int __rtcp_poll(struct list_t *e, void *v)
DASSERT(con->trans[*track_id].rtcp_packet_cnt == 0, return FAILURE);
DASSERT(con->trans[*track_id].rtcp_octet == 0, return FAILURE);
}

return SUCCESS;
}
/******************************************************************************
Expand All @@ -372,31 +373,26 @@ int rtp_send_h26x(rtsp_handle h, unsigned char *buf, size_t len, char isH265)
/* checkout RTP packet */
DASSERT(h, return FAILURE);

if(gbl_get_quit(h->pool->sharedp->gbl)) {
if (gbl_get_quit(h->pool->sharedp->gbl)) {
ERR("server threads have gone already. call rtsp_finish()\n");
return FAILURE;
}

h->isH265 = isH265;

ASSERT(__retrieve_sprop(h,buf,len) == SUCCESS, goto error);
ASSERT(__retrieve_sprop(h, buf, len) == SUCCESS, goto error);

trans.h = h;

/* setup transmission objecl t*/
ASSERT(list_map_inline(&h->con_list,(__rtp_setup_transfer),&trans) == SUCCESS, goto error);
ASSERT(list_map_inline(&h->con_list, (__rtp_setup_transfer), &trans) == SUCCESS, goto error);

if(trans.list_head.list) {

while (__split_nal(buf,&nalptr,&single_len,len) == SUCCESS) {

ASSERT(__transfer_nal_h26x(&(trans.list_head),nalptr,single_len,h->isH265) == SUCCESS, goto error);

ASSERT(list_map_inline(&(trans.list_head),(__rtcp_poll), &track_id) == SUCCESS, goto error);

if (trans.list_head.list) {
while (__split_nal(buf, &nalptr, &single_len, len) == SUCCESS) {
ASSERT(__transfer_nal_h26x(&(trans.list_head), nalptr, single_len, h->isH265) == SUCCESS, goto error);
ASSERT(list_map_inline(&(trans.list_head), (__rtcp_poll), &track_id) == SUCCESS, goto error);
}

ASSERT(list_map_inline(&(trans.list_head),(__rtcp_poll), &track_id) == SUCCESS, goto error);
ASSERT(list_map_inline(&(trans.list_head), (__rtcp_poll), &track_id) == SUCCESS, goto error);
}

ret = SUCCESS;
Expand All @@ -416,7 +412,7 @@ int rtp_send_mp3(rtsp_handle h, unsigned char *buf, size_t len)
/* checkout RTP packet */
DASSERT(h, return FAILURE);

if(gbl_get_quit(h->pool->sharedp->gbl)) {
if (gbl_get_quit(h->pool->sharedp->gbl)) {
ERR("server threads have gone already. call rtsp_finish()\n");
return FAILURE;
}
Expand All @@ -426,14 +422,11 @@ int rtp_send_mp3(rtsp_handle h, unsigned char *buf, size_t len)
trans.h = h;

/* setup transmission objecl t*/
ASSERT(list_map_inline(&h->con_list,(__rtp_setup_transfer),&trans) == SUCCESS, goto error);
ASSERT(list_map_inline(&h->con_list, (__rtp_setup_transfer), &trans) == SUCCESS, goto error);

if(trans.list_head.list) {

ASSERT(__transfer_nal_mpga(&(trans.list_head),buf,len) == SUCCESS, goto error);

ASSERT(list_map_inline(&(trans.list_head),(__rtcp_poll), &track_id) == SUCCESS, goto error);

if (trans.list_head.list) {
ASSERT(__transfer_nal_mpga(&(trans.list_head), buf, len) == SUCCESS, goto error);
ASSERT(list_map_inline(&(trans.list_head), (__rtcp_poll), &track_id) == SUCCESS, goto error);
}

ret = SUCCESS;
Expand Down
2 changes: 1 addition & 1 deletion src/rtsp/rtsp.c
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ static int __message_proc_sock(struct list_t *e, void *p)
DASSERT(socks, return FAILURE);
MUST(h = socks->h_rtsp, return FAILURE);

list_upcast(con,e);
list_upcast(con, e);

if (con->con_state == __CON_S_DISCONNECTED) {
ERR("zombie connection detected: report to author\n");
Expand Down
22 changes: 11 additions & 11 deletions src/rtsp/rtsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ static inline void rtsp_unlock(rtsp_handle h)
static inline int __read_line(struct connection_item_t *p, char *buf)
{
/* we set the socket to non-blocking */
if(fgets(buf,__RTSP_TCP_BUF_SIZE,p->fp_tcp_read) == NULL)
if (fgets(buf, __RTSP_TCP_BUF_SIZE, p->fp_tcp_read) == NULL)
{
/* unexpected end. we do not expect it */
if(p->parser_state == __PARSER_S_INIT) {
if (p->parser_state == __PARSER_S_INIT) {
/* when this selected sd is EOF at first glance, it's dead */
DBG("disconnected\n");
} else {
Expand All @@ -181,14 +181,14 @@ static inline int __read_line(struct connection_item_t *p, char *buf)
}

p->con_state = __CON_S_DISCONNECTED;
ASSERT(bufpool_detach(p->pool,p) == SUCCESS, ERR("connection detach failed\n"));
ASSERT(bufpool_detach(p->pool, p) == SUCCESS, ERR("connection detach failed\n"));
return FALSE;
}

DBG(">%s",buf);
DBG(">%s", buf);

/* check end of request */
return !(SCMP(__TERM,buf));
return !(SCMP(__TERM, buf));
}

static inline unsigned long long __get_random_byte(unsigned *ctx)
Expand All @@ -214,23 +214,23 @@ static inline int __transfer_item_cleaner(struct list_t *e)
struct transfer_item_t *p;
list_upcast(p,e);

if(p->con) {
if (p->con) {
ASSERT(bufpool_detach(p->con->pool,p->con) == SUCCESS,
return FAILURE);
p->con = NULL;
}

ASSERT(bufpool_detach(p->pool,p) == SUCCESS,
ASSERT(bufpool_detach(p->pool, p) == SUCCESS,
return FAILURE);

return SUCCESS;
}

static inline int __get_timestamp_offset(struct __time_stat_t *p_stat, struct timeval *p_tv)
{
unsigned long long kts;
unsigned long long kts;

if(p_stat->prev_tv.tv_sec == 0) {
if (p_stat->prev_tv.tv_sec == 0) {
p_stat->prev_tv = *p_tv;
p_stat->total_cnt = 0;
p_stat->cnt = 0;
Expand All @@ -240,7 +240,7 @@ static inline int __get_timestamp_offset(struct __time_stat_t *p_stat, struct ti
}

/* we fix time stamp offset in 5 years of running on 30fps.. */
if(p_stat->total_cnt < 0xFFFFFFFFLLU) {
if (p_stat->total_cnt < 0xFFFFFFFFLLU) {
kts = ((p_tv->tv_sec - p_stat->prev_tv.tv_sec) * 1000000 + (p_tv->tv_usec - p_stat->prev_tv.tv_usec)) * 90;

p_stat->avg = ((p_stat->avg * p_stat->total_cnt) + kts * 1000) / (p_stat->total_cnt + 1);
Expand All @@ -251,7 +251,7 @@ static inline int __get_timestamp_offset(struct __time_stat_t *p_stat, struct ti
p_stat->jitter_mask += p_stat->avg % 1000000;
p_stat->ts_offset = (p_stat->avg / 1000000);

if(p_stat->jitter_mask > 1000000) {
if (p_stat->jitter_mask > 1000000) {
p_stat->ts_offset += 1;
p_stat->jitter_mask -= 1000000;
}
Expand Down

0 comments on commit 1b9b04b

Please sign in to comment.