diff --git a/CHANGELOG b/CHANGELOG index ef7558481..27355c250 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,21 @@ +2019-01-17 + - 1.17.15 + - [BUGFIX] http_client: make sure only one read per on_read() callback + is performed in the header conversion bypass (-B) mode. + - http_client: with -E, assign random priority when stream is created. + - [OPTIMIZATION] On immediate write, place an ACK frame into the first + buffered packet if an ACK is queued. This reduces the number of + standalone ACK packets. + - [OPTIMIZATION] Allow placing more than one STREAM frame from the same + stream into an outgoing packet. This change minimizes the number of + buffered packets required to store several small HTTP messages by + virtue of allowing more than one STREAM frame from HEADERS stream in + the same packet. + - [OPTIMIZATION] Flush headers when writing to buffered packets. This + causes the headers to be written to the same buffered packet queue, + thereby improving packet utilization, especially for small HTTP + messages. + 2019-01-16 - 1.17.14 - [FEATURE] http_client can now collect stats: time to connect, TTFB, diff --git a/include/lsquic.h b/include/lsquic.h index 669b790d0..a7f5caaa9 100644 --- a/include/lsquic.h +++ b/include/lsquic.h @@ -25,7 +25,7 @@ extern "C" { #define LSQUIC_MAJOR_VERSION 1 #define LSQUIC_MINOR_VERSION 17 -#define LSQUIC_PATCH_VERSION 14 +#define LSQUIC_PATCH_VERSION 15 /** * Engine flags: diff --git a/src/liblsquic/lsquic_conn.h b/src/liblsquic/lsquic_conn.h index 3f5dcd40d..091ef7082 100644 --- a/src/liblsquic/lsquic_conn.h +++ b/src/liblsquic/lsquic_conn.h @@ -87,6 +87,13 @@ struct conn_iface lsquic_time_t (*ci_next_tick_time) (struct lsquic_conn *); + int + (*ci_can_write_ack) (struct lsquic_conn *); + + /* No return status: best effort */ + void + (*ci_write_ack) (struct lsquic_conn *, struct lsquic_packet_out *); + #if LSQUIC_CONN_STATS const struct conn_stats * (*ci_get_stats) (struct lsquic_conn *); diff --git a/src/liblsquic/lsquic_full_conn.c b/src/liblsquic/lsquic_full_conn.c index a56c99181..2ccb63673 100644 --- a/src/liblsquic/lsquic_full_conn.c +++ b/src/liblsquic/lsquic_full_conn.c @@ -903,6 +903,93 @@ reset_ack_state (struct full_conn *conn) } +#if 1 +# define verify_ack_frame(a, b, c) +#else +static void +verify_ack_frame (struct full_conn *conn, const unsigned char *buf, int bufsz) +{ + unsigned i; + int parsed_len; + struct ack_info *ack_info; + const struct lsquic_packno_range *range; + char ack_buf[512]; + unsigned buf_off = 0; + int nw; + + ack_info = conn->fc_pub.mm->acki; + parsed_len = parse_ack_frame(buf, bufsz, ack_info); + assert(parsed_len == bufsz); + + for (range = lsquic_rechist_first(&conn->fc_rechist), i = 0; range; + range = lsquic_rechist_next(&conn->fc_rechist), ++i) + { + assert(i < ack_info->n_ranges); + assert(range->high == ack_info->ranges[i].high); + assert(range->low == ack_info->ranges[i].low); + if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG)) + { + nw = snprintf(ack_buf + buf_off, sizeof(ack_buf) - buf_off, + "[%"PRIu64"-%"PRIu64"]", range->high, range->low); + assert(nw >= 0); + buf_off += nw; + } + } + assert(i == ack_info->n_ranges); + LSQ_DEBUG("Sent ACK frame %s", ack_buf); +} + + +#endif + + +static void +full_conn_ci_write_ack (struct lsquic_conn *lconn, + struct lsquic_packet_out *packet_out) +{ + struct full_conn *conn = (struct full_conn *) lconn; + lsquic_time_t now; + int has_missing, w; + + now = lsquic_time_now(); + w = conn->fc_conn.cn_pf->pf_gen_ack_frame( + packet_out->po_data + packet_out->po_data_sz, + lsquic_packet_out_avail(packet_out), + (gaf_rechist_first_f) lsquic_rechist_first, + (gaf_rechist_next_f) lsquic_rechist_next, + (gaf_rechist_largest_recv_f) lsquic_rechist_largest_recv, + &conn->fc_rechist, now, &has_missing, &packet_out->po_ack2ed); + if (w < 0) { + ABORT_ERROR("generating ACK frame failed: %d", errno); + return; + } +#if LSQUIC_CONN_STATS + ++conn->fc_stats.out.acks; +#endif + EV_LOG_GENERATED_ACK_FRAME(LSQUIC_LOG_CONN_ID, conn->fc_conn.cn_pf, + packet_out->po_data + packet_out->po_data_sz, w); + verify_ack_frame(conn, packet_out->po_data + packet_out->po_data_sz, w); + lsquic_send_ctl_scheduled_ack(&conn->fc_send_ctl); + packet_out->po_frame_types |= 1 << QUIC_FRAME_ACK; + lsquic_send_ctl_incr_pack_sz(&conn->fc_send_ctl, packet_out, w); + packet_out->po_regen_sz += w; + if (has_missing) + conn->fc_flags |= FC_ACK_HAD_MISS; + else + conn->fc_flags &= ~FC_ACK_HAD_MISS; + LSQ_DEBUG("Put %d bytes of ACK frame into packet on outgoing queue", w); + if (conn->fc_conn.cn_version >= LSQVER_039 && + conn->fc_n_cons_unretx >= 20 && + !lsquic_send_ctl_have_outgoing_retx_frames(&conn->fc_send_ctl)) + { + LSQ_DEBUG("schedule WINDOW_UPDATE frame after %u non-retx " + "packets sent", conn->fc_n_cons_unretx); + conn->fc_flags |= FC_SEND_WUF; + } + reset_ack_state(conn); +} + + static lsquic_stream_t * new_stream_ext (struct full_conn *conn, uint32_t stream_id, int if_idx, enum stream_ctor_flags stream_ctor_flags) @@ -2232,7 +2319,7 @@ generate_rst_stream_frame (struct full_conn *conn, lsquic_stream_t *stream) lsquic_send_ctl_incr_pack_sz(&conn->fc_send_ctl, packet_out, sz); packet_out->po_frame_types |= 1 << QUIC_FRAME_RST_STREAM; s = lsquic_packet_out_add_stream(packet_out, conn->fc_pub.mm, stream, - QUIC_FRAME_RST_STREAM, 0, 0); + QUIC_FRAME_RST_STREAM, packet_out->po_data_sz, sz); if (s != 0) { ABORT_ERROR("adding stream to packet failed: %s", strerror(errno)); @@ -2578,96 +2665,19 @@ process_hsk_stream_write_events (struct full_conn *conn) } -#if 1 -# define verify_ack_frame(a, b, c) -#else -static void -verify_ack_frame (struct full_conn *conn, const unsigned char *buf, int bufsz) -{ - unsigned i; - int parsed_len; - struct ack_info *ack_info; - const struct lsquic_packno_range *range; - char ack_buf[512]; - unsigned buf_off = 0; - int nw; - - ack_info = conn->fc_pub.mm->acki; - parsed_len = parse_ack_frame(buf, bufsz, ack_info); - assert(parsed_len == bufsz); - - for (range = lsquic_rechist_first(&conn->fc_rechist), i = 0; range; - range = lsquic_rechist_next(&conn->fc_rechist), ++i) - { - assert(i < ack_info->n_ranges); - assert(range->high == ack_info->ranges[i].high); - assert(range->low == ack_info->ranges[i].low); - if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG)) - { - nw = snprintf(ack_buf + buf_off, sizeof(ack_buf) - buf_off, - "[%"PRIu64"-%"PRIu64"]", range->high, range->low); - assert(nw >= 0); - buf_off += nw; - } - } - assert(i == ack_info->n_ranges); - LSQ_DEBUG("Sent ACK frame %s", ack_buf); -} - - -#endif - - static void generate_ack_frame (struct full_conn *conn) { lsquic_packet_out_t *packet_out; - lsquic_time_t now; - int has_missing, w; packet_out = lsquic_send_ctl_new_packet_out(&conn->fc_send_ctl, 0); - if (!packet_out) + if (packet_out) { - ABORT_ERROR("cannot allocate packet: %s", strerror(errno)); - return; + lsquic_send_ctl_scheduled_one(&conn->fc_send_ctl, packet_out); + full_conn_ci_write_ack(&conn->fc_conn, packet_out); } - - lsquic_send_ctl_scheduled_one(&conn->fc_send_ctl, packet_out); - now = lsquic_time_now(); - w = conn->fc_conn.cn_pf->pf_gen_ack_frame( - packet_out->po_data + packet_out->po_data_sz, - lsquic_packet_out_avail(packet_out), - (gaf_rechist_first_f) lsquic_rechist_first, - (gaf_rechist_next_f) lsquic_rechist_next, - (gaf_rechist_largest_recv_f) lsquic_rechist_largest_recv, - &conn->fc_rechist, now, &has_missing, &packet_out->po_ack2ed); - if (w < 0) { - ABORT_ERROR("generating ACK frame failed: %d", errno); - return; - } -#if LSQUIC_CONN_STATS - ++conn->fc_stats.out.acks; -#endif - EV_LOG_GENERATED_ACK_FRAME(LSQUIC_LOG_CONN_ID, conn->fc_conn.cn_pf, - packet_out->po_data + packet_out->po_data_sz, w); - verify_ack_frame(conn, packet_out->po_data + packet_out->po_data_sz, w); - lsquic_send_ctl_scheduled_ack(&conn->fc_send_ctl); - packet_out->po_frame_types |= 1 << QUIC_FRAME_ACK; - lsquic_send_ctl_incr_pack_sz(&conn->fc_send_ctl, packet_out, w); - packet_out->po_regen_sz += w; - if (has_missing) - conn->fc_flags |= FC_ACK_HAD_MISS; else - conn->fc_flags &= ~FC_ACK_HAD_MISS; - LSQ_DEBUG("Put %d bytes of ACK frame into packet on outgoing queue", w); - if (conn->fc_conn.cn_version >= LSQVER_039 && - conn->fc_n_cons_unretx >= 20 && - !lsquic_send_ctl_have_outgoing_retx_frames(&conn->fc_send_ctl)) - { - LSQ_DEBUG("schedule WINDOW_UPDATE frame after %u non-retx " - "packets sent", conn->fc_n_cons_unretx); - conn->fc_flags |= FC_SEND_WUF; - } + ABORT_ERROR("cannot allocate packet: %s", strerror(errno)); } @@ -2775,6 +2785,14 @@ should_generate_ack (const struct full_conn *conn) } +static int +full_conn_ci_can_write_ack (struct lsquic_conn *lconn) +{ + struct full_conn *conn = (struct full_conn *) lconn; + return should_generate_ack(conn); +} + + static enum tick_st full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now) { @@ -2874,7 +2892,6 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now) */ generate_ack_frame(conn); CLOSE_IF_NECESSARY(); - reset_ack_state(conn); /* Try to send STOP_WAITING frame at the same time we send an ACK * This follows reference implementation. @@ -3575,6 +3592,7 @@ static const struct headers_stream_callbacks headers_callbacks = static const struct headers_stream_callbacks *headers_callbacks_ptr = &headers_callbacks; static const struct conn_iface full_conn_iface = { + .ci_can_write_ack = full_conn_ci_can_write_ack, .ci_destroy = full_conn_ci_destroy, #if LSQUIC_CONN_STATS .ci_get_stats = full_conn_ci_get_stats, @@ -3588,6 +3606,7 @@ static const struct conn_iface full_conn_iface = { .ci_packet_not_sent = full_conn_ci_packet_not_sent, .ci_packet_sent = full_conn_ci_packet_sent, .ci_tick = full_conn_ci_tick, + .ci_write_ack = full_conn_ci_write_ack, }; static const struct conn_iface *full_conn_iface_ptr = &full_conn_iface; diff --git a/src/liblsquic/lsquic_packet_out.c b/src/liblsquic/lsquic_packet_out.c index 2f74caf5a..404c87e4a 100644 --- a/src/liblsquic/lsquic_packet_out.c +++ b/src/liblsquic/lsquic_packet_out.c @@ -31,7 +31,7 @@ static struct stream_rec * srec_one_posi_first (struct packet_out_srec_iter *posi, struct lsquic_packet_out *packet_out) { - if (packet_out->po_srecs.one.sr_frame_types) + if (packet_out->po_srecs.one.sr_frame_type) return &packet_out->po_srecs.one; else return NULL; @@ -53,7 +53,7 @@ srec_arr_posi_next (struct packet_out_srec_iter *posi) for (; posi->srec_idx < sizeof(posi->cur_srec_arr->srecs) / sizeof(posi->cur_srec_arr->srecs[0]); ++posi->srec_idx) { - if (posi->cur_srec_arr->srecs[ posi->srec_idx ].sr_frame_types) + if (posi->cur_srec_arr->srecs[ posi->srec_idx ].sr_frame_type) return &posi->cur_srec_arr->srecs[ posi->srec_idx++ ]; } posi->cur_srec_arr = TAILQ_NEXT(posi->cur_srec_arr, next_stream_rec_arr); @@ -106,15 +106,9 @@ posi_next (struct packet_out_srec_iter *posi) } -/* Assumption: there can only be one STREAM and only one RST_STREAM frame - * for a particular stream per packet. The latter is true because a stream - * will only send out one RST_STREAM frame. The former is true because we - * make sure only to place one STREAM frame from a particular stream into a - * packet. - * +/* * Assumption: frames are added to the packet_out in order of their placement - * in packet_out->po_data. There is an assertion in this function that guards - * for this. + * in packet_out->po_data. There is no assertion to guard for for this. */ int lsquic_packet_out_add_stream (lsquic_packet_out_t *packet_out, @@ -123,41 +117,17 @@ lsquic_packet_out_add_stream (lsquic_packet_out_t *packet_out, enum QUIC_FRAME_TYPE frame_type, unsigned short off, unsigned short len) { - struct packet_out_srec_iter posi; struct stream_rec_arr *srec_arr; - struct stream_rec *srec; int last_taken; unsigned i; assert(!(new_stream->stream_flags & STREAM_FINISHED)); - for (srec = posi_first(&posi, packet_out); srec; srec = posi_next(&posi)) - if (srec->sr_stream == new_stream) - { - switch (frame_type) - { - case QUIC_FRAME_STREAM: - assert(!(srec->sr_frame_types & (1 << QUIC_FRAME_STREAM))); - srec->sr_frame_types |= (1 << QUIC_FRAME_STREAM); - srec->sr_off = off; - srec->sr_len = len; - break; - default: - assert(QUIC_FRAME_RST_STREAM == frame_type); - assert(!(srec->sr_frame_types & (1 << QUIC_FRAME_RST_STREAM))); - srec->sr_frame_types |= (1 << QUIC_FRAME_RST_STREAM); - break; - } - return 0; /* Update existing record */ - } - else if (srec->sr_frame_types & (1 << QUIC_FRAME_STREAM) & (1 << frame_type)) - assert(srec->sr_off < off); /* Check that STREAM frames are added in order */ - if (!(packet_out->po_flags & PO_SREC_ARR)) { if (!srec_taken(&packet_out->po_srecs.one)) { - packet_out->po_srecs.one.sr_frame_types = (1 << frame_type); + packet_out->po_srecs.one.sr_frame_type = frame_type; packet_out->po_srecs.one.sr_stream = new_stream; packet_out->po_srecs.one.sr_off = off; packet_out->po_srecs.one.sr_len = len; @@ -188,7 +158,7 @@ lsquic_packet_out_add_stream (lsquic_packet_out_t *packet_out, if (i < sizeof(srec_arr->srecs) / sizeof(srec_arr->srecs[0])) { set_elem: - srec_arr->srecs[i].sr_frame_types = (1 << frame_type); + srec_arr->srecs[i].sr_frame_type = frame_type; srec_arr->srecs[i].sr_stream = new_stream; srec_arr->srecs[i].sr_off = off; srec_arr->srecs[i].sr_len = len; @@ -201,7 +171,7 @@ lsquic_packet_out_add_stream (lsquic_packet_out_t *packet_out, return -1; memset(srec_arr, 0, sizeof(*srec_arr)); - srec_arr->srecs[0].sr_frame_types = (1 << frame_type); + srec_arr->srecs[0].sr_frame_type = frame_type; srec_arr->srecs[0].sr_stream = new_stream; srec_arr->srecs[0].sr_off = off; srec_arr->srecs[0].sr_len = len; @@ -315,7 +285,7 @@ lsquic_packet_out_elide_reset_stream_frames (lsquic_packet_out_t *packet_out, for (srec = posi_first(&posi, packet_out); srec; srec = posi_next(&posi)) { - if (srec->sr_frame_types & (1 << QUIC_FRAME_STREAM)) + if (srec->sr_frame_type == QUIC_FRAME_STREAM) { ++n_stream_frames; @@ -345,9 +315,8 @@ lsquic_packet_out_elide_reset_stream_frames (lsquic_packet_out_t *packet_out, packet_out->po_data_sz -= srec->sr_len; /* See what we can do with the stream */ - srec->sr_frame_types &= ~(1 << QUIC_FRAME_STREAM); - if (!srec_taken(srec)) - lsquic_stream_acked(srec->sr_stream); + srec->sr_frame_type = 0; + lsquic_stream_acked(srec->sr_stream); } } } @@ -374,28 +343,11 @@ lsquic_packet_out_chop_regen (lsquic_packet_out_t *packet_out) packet_out->po_regen_sz = 0; for (srec = posi_first(&posi, packet_out); srec; srec = posi_next(&posi)) - if (srec->sr_frame_types & (1 << QUIC_FRAME_STREAM)) + if (srec->sr_frame_type == QUIC_FRAME_STREAM) srec->sr_off -= delta; } -int -lsquic_packet_out_has_frame (struct lsquic_packet_out *packet_out, - const struct lsquic_stream *stream, - enum QUIC_FRAME_TYPE frame_type) -{ - struct packet_out_srec_iter posi; - struct stream_rec *srec; - - for (srec = posi_first(&posi, packet_out); srec; srec = posi_next(&posi)) - if (srec->sr_stream == stream && - srec->sr_frame_types & (1 << frame_type)) - return 1; - - return 0; -} - - int lsquic_packet_out_has_hsk_frames (struct lsquic_packet_out *packet_out) { @@ -403,7 +355,7 @@ lsquic_packet_out_has_hsk_frames (struct lsquic_packet_out *packet_out) struct stream_rec *srec; for (srec = posi_first(&posi, packet_out); srec; srec = posi_next(&posi)) - if ((srec->sr_frame_types & (1 << QUIC_FRAME_STREAM)) + if (srec->sr_frame_type == QUIC_FRAME_STREAM && LSQUIC_STREAM_HANDSHAKE == srec->sr_stream->id) { return 1; @@ -439,7 +391,7 @@ split_off_last_frames (struct lsquic_mm *mm, lsquic_packet_out_t *packet_out, srec->sr_stream, QUIC_FRAME_STREAM, new_packet_out->po_data_sz, srec->sr_len)) return -1; - srec->sr_frame_types &= ~(1 << QUIC_FRAME_STREAM); + srec->sr_frame_type = 0; assert(srec->sr_stream->n_unacked > 1); --srec->sr_stream->n_unacked; new_packet_out->po_data_sz += srec->sr_len; @@ -469,7 +421,7 @@ move_largest_frame (struct lsquic_mm *mm, lsquic_packet_out_t *packet_out, new_packet_out->po_data_sz, max_srec->sr_len)) return -1; - max_srec->sr_frame_types &= ~(1 << QUIC_FRAME_STREAM); + max_srec->sr_frame_type = 0; assert(max_srec->sr_stream->n_unacked > 1); --max_srec->sr_stream->n_unacked; new_packet_out->po_data_sz += max_srec->sr_len; @@ -613,7 +565,7 @@ verify_srecs (lsquic_packet_out_t *packet_out) for ( ; srec; srec = posi_next(&posi)) { assert(srec->sr_off == off); - assert(srec->sr_frame_types & (1 << QUIC_FRAME_STREAM)); + assert(srec->sr_frame_type == QUIC_FRAME_STREAM); off += srec->sr_len; } @@ -652,7 +604,7 @@ lsquic_packet_out_split_in_two (struct lsquic_mm *mm, for (srec = posi_first(&posi, packet_out); srec; srec = posi_next(&posi)) { /* We only expect references to STREAM frames (buffered packets): */ - assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM)); + assert(srec->sr_frame_type == QUIC_FRAME_STREAM); if (n_srecs >= n_srecs_alloced) { n_srecs_alloced *= 2; @@ -787,7 +739,7 @@ lsquic_packet_out_turn_on_fin (struct lsquic_packet_out *packet_out, int len; for (srec = posi_first(&posi, packet_out); srec; srec = posi_next(&posi)) - if ((srec->sr_frame_types & (1 << QUIC_FRAME_STREAM)) + if (srec->sr_frame_type == QUIC_FRAME_STREAM && srec->sr_stream == stream) { len = pf->pf_parse_stream_frame(packet_out->po_data + srec->sr_off, diff --git a/src/liblsquic/lsquic_packet_out.h b/src/liblsquic/lsquic_packet_out.h index 9df880a20..65a3e7b0e 100644 --- a/src/liblsquic/lsquic_packet_out.h +++ b/src/liblsquic/lsquic_packet_out.h @@ -17,12 +17,10 @@ struct parse_funcs; /* Each stream_rec is associated with one packet_out. packet_out can have * zero or more stream_rec structures. stream_rec keeps a pointer to a stream - * that has STREAM or RST_STREAM frames inside packet_out. `sr_frame_types' - * is a bitmask that records which of these two frames are in the packet. - * If this value is zero, values of the other struct members are not valid. - * `sr_off' indicates where inside packet_out->po_data STREAM frame begins - * and `sr_len' is its length. These values are not kept for RST_STREAM - * frames. + * that has STREAM or RST_STREAM frames inside packet_out. `sr_frame_type' + * specifies the type of the frame; if this value is zero, values of the + * other struct members are not valid. `sr_off' indicates where inside + * packet_out->po_data the frame begins and `sr_len' is its length. * * We need this information for three reasons: * 1. A stream is not destroyed until all of its STREAM and RST_STREAM @@ -40,10 +38,10 @@ struct stream_rec { struct lsquic_stream *sr_stream; unsigned short sr_off, sr_len; - enum quic_ft_bit sr_frame_types:16; + enum QUIC_FRAME_TYPE sr_frame_type:16; }; -#define srec_taken(srec) ((srec)->sr_frame_types) +#define srec_taken(srec) ((srec)->sr_frame_type) struct stream_rec_arr { TAILQ_ENTRY(stream_rec_arr) next_stream_rec_arr; @@ -229,10 +227,6 @@ lsquic_packet_out_split_in_two (struct lsquic_mm *, lsquic_packet_out_t *, void lsquic_packet_out_chop_regen (lsquic_packet_out_t *); -int -lsquic_packet_out_has_frame (struct lsquic_packet_out *, - const struct lsquic_stream *, enum QUIC_FRAME_TYPE); - int lsquic_packet_out_has_hsk_frames (struct lsquic_packet_out *); diff --git a/src/liblsquic/lsquic_send_ctl.c b/src/liblsquic/lsquic_send_ctl.c index 702942b8e..ee9df526f 100644 --- a/src/liblsquic/lsquic_send_ctl.c +++ b/src/liblsquic/lsquic_send_ctl.c @@ -1228,8 +1228,6 @@ lsquic_send_ctl_new_packet_out (lsquic_send_ctl_t *ctl, unsigned need_at_least) } -/* Do not use for STREAM frames - */ lsquic_packet_out_t * lsquic_send_ctl_get_writeable_packet (lsquic_send_ctl_t *ctl, unsigned need_at_least, int *is_err) @@ -1248,48 +1246,20 @@ lsquic_send_ctl_get_writeable_packet (lsquic_send_ctl_t *ctl, if (!lsquic_send_ctl_can_send(ctl)) { - *is_err = 0; + if (is_err) + *is_err = 0; return NULL; } packet_out = lsquic_send_ctl_new_packet_out(ctl, need_at_least); if (packet_out) lsquic_send_ctl_scheduled_one(ctl, packet_out); - else + else if (is_err) *is_err = 1; return packet_out; } -static lsquic_packet_out_t * -send_ctl_get_packet_for_stream (lsquic_send_ctl_t *ctl, - unsigned need_at_least, const lsquic_stream_t *stream) -{ - lsquic_packet_out_t *packet_out; - - assert(need_at_least > 0); - - packet_out = lsquic_send_ctl_last_scheduled(ctl); - if (packet_out - && !(packet_out->po_flags & PO_STREAM_END) - && lsquic_packet_out_avail(packet_out) >= need_at_least - && !lsquic_packet_out_has_frame(packet_out, stream, QUIC_FRAME_STREAM)) - { - return packet_out; - } - - if (!lsquic_send_ctl_can_send(ctl)) - return NULL; - - packet_out = lsquic_send_ctl_new_packet_out(ctl, need_at_least); - if (!packet_out) - return NULL; - - lsquic_send_ctl_scheduled_one(ctl, packet_out); - return packet_out; -} - - static void update_for_resending (lsquic_send_ctl_t *ctl, lsquic_packet_out_t *packet_out) { @@ -1670,6 +1640,24 @@ send_ctl_max_bpq_count (const lsquic_send_ctl_t *ctl, } +static void +send_ctl_move_ack (struct lsquic_send_ctl *ctl, struct lsquic_packet_out *dst, + struct lsquic_packet_out *src) +{ + assert(dst->po_data_sz == 0); + + if (lsquic_packet_out_avail(dst) >= src->po_regen_sz) + { + memcpy(dst->po_data, src->po_data, src->po_regen_sz); + dst->po_data_sz = src->po_regen_sz; + dst->po_regen_sz = src->po_regen_sz; + dst->po_frame_types |= (QFRAME_REGEN_MASK & src->po_frame_types); + src->po_frame_types &= ~QFRAME_REGEN_MASK; + lsquic_packet_out_chop_regen(src); + } +} + + static lsquic_packet_out_t * send_ctl_get_buffered_packet (lsquic_send_ctl_t *ctl, enum buf_packet_type packet_type, unsigned need_at_least, @@ -1677,14 +1665,15 @@ send_ctl_get_buffered_packet (lsquic_send_ctl_t *ctl, { struct buf_packet_q *const packet_q = &ctl->sc_buffered_packets[packet_type]; + struct lsquic_conn *const lconn = ctl->sc_conn_pub->lconn; lsquic_packet_out_t *packet_out; enum lsquic_packno_bits bits; + enum { AA_STEAL, AA_GENERATE, AA_NONE, } ack_action; packet_out = TAILQ_LAST(&packet_q->bpq_packets, lsquic_packets_tailq); if (packet_out && !(packet_out->po_flags & PO_STREAM_END) - && lsquic_packet_out_avail(packet_out) >= need_at_least - && !lsquic_packet_out_has_frame(packet_out, stream, QUIC_FRAME_STREAM)) + && lsquic_packet_out_avail(packet_out) >= need_at_least) { return packet_out; } @@ -1693,10 +1682,56 @@ send_ctl_get_buffered_packet (lsquic_send_ctl_t *ctl, return NULL; bits = lsquic_send_ctl_guess_packno_bits(ctl); + if (packet_q->bpq_count == 0) + { + /* If ACK was written to the low-priority queue first, steal it */ + if (packet_q == &ctl->sc_buffered_packets[BPT_HIGHEST_PRIO] + && !TAILQ_EMPTY(&ctl->sc_buffered_packets[BPT_OTHER_PRIO].bpq_packets) + && (TAILQ_FIRST(&ctl->sc_buffered_packets[BPT_OTHER_PRIO].bpq_packets) + ->po_frame_types & QUIC_FTBIT_ACK)) + { + LSQ_DEBUG("steal ACK frame from low-priority buffered queue"); + ack_action = AA_STEAL; + bits = PACKNO_LEN_6; + } + /* If ACK can be generated, write it to the first buffered packet. */ + else if (lconn->cn_if->ci_can_write_ack(lconn)) + { + LSQ_DEBUG("generate ACK frame for first buffered packet in " + "queue #%u", packet_type); + ack_action = AA_GENERATE; + /* Packet length is set to the largest possible size to guarantee + * that buffered packet with the ACK will not need to be split. + */ + bits = PACKNO_LEN_6; + } + else + goto no_ack_action; + } + else + { + no_ack_action: + ack_action = AA_NONE; + bits = lsquic_send_ctl_guess_packno_bits(ctl); + } + packet_out = send_ctl_allocate_packet(ctl, bits, need_at_least); if (!packet_out) return NULL; + switch (ack_action) + { + case AA_STEAL: + send_ctl_move_ack(ctl, packet_out, + TAILQ_FIRST(&ctl->sc_buffered_packets[BPT_OTHER_PRIO].bpq_packets)); + break; + case AA_GENERATE: + lconn->cn_if->ci_write_ack(lconn, packet_out); + break; + case AA_NONE: + break; + } + TAILQ_INSERT_TAIL(&packet_q->bpq_packets, packet_out, po_next); ++packet_q->bpq_count; LSQ_DEBUG("Add new packet to buffered queue #%u; count: %u", @@ -1712,7 +1747,7 @@ lsquic_send_ctl_get_packet_for_stream (lsquic_send_ctl_t *ctl, enum buf_packet_type packet_type; if (lsquic_send_ctl_schedule_stream_packets_immediately(ctl)) - return send_ctl_get_packet_for_stream(ctl, need_at_least, stream); + return lsquic_send_ctl_get_writeable_packet(ctl, need_at_least, NULL); else { packet_type = send_ctl_lookup_bpt(ctl, stream); @@ -1722,6 +1757,15 @@ lsquic_send_ctl_get_packet_for_stream (lsquic_send_ctl_t *ctl, } +int +lsquic_send_ctl_buffered_and_same_prio_as_headers (struct lsquic_send_ctl *ctl, + const struct lsquic_stream *stream) +{ + return !lsquic_send_ctl_schedule_stream_packets_immediately(ctl) + && BPT_HIGHEST_PRIO == send_ctl_lookup_bpt(ctl, stream); +} + + #ifdef NDEBUG static #elif __GNUC__ @@ -1817,9 +1861,10 @@ lsquic_send_ctl_schedule_buffered (lsquic_send_ctl_t *ctl, } TAILQ_REMOVE(&packet_q->bpq_packets, packet_out, po_next); --packet_q->bpq_count; - LSQ_DEBUG("Remove packet from buffered queue #%u; count: %u", - packet_type, packet_q->bpq_count); packet_out->po_packno = send_ctl_next_packno(ctl); + LSQ_DEBUG("Remove packet from buffered queue #%u; count: %u. " + "It becomes packet %"PRIu64, packet_type, packet_q->bpq_count, + packet_out->po_packno); lsquic_send_ctl_scheduled_one(ctl, packet_out); } diff --git a/src/liblsquic/lsquic_send_ctl.h b/src/liblsquic/lsquic_send_ctl.h index 40b2ee66a..82e8ec2c4 100644 --- a/src/liblsquic/lsquic_send_ctl.h +++ b/src/liblsquic/lsquic_send_ctl.h @@ -279,4 +279,8 @@ lsquic_send_ctl_pacer_blocked (struct lsquic_send_ctl *); int lsquic_send_ctl_sched_is_blocked (const struct lsquic_send_ctl *); +int +lsquic_send_ctl_buffered_and_same_prio_as_headers (struct lsquic_send_ctl *, + const struct lsquic_stream *); + #endif diff --git a/src/liblsquic/lsquic_stream.c b/src/liblsquic/lsquic_stream.c index 473d997c4..32d677255 100644 --- a/src/liblsquic/lsquic_stream.c +++ b/src/liblsquic/lsquic_stream.c @@ -1564,6 +1564,25 @@ stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size) lsquic_packet_out_t *packet_out; int len, s, hsk; + if ((stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED)) + == STREAM_HEADERS_SENT + && lsquic_send_ctl_buffered_and_same_prio_as_headers(send_ctl, stream)) + { + struct lsquic_stream *const headers_stream + = lsquic_headers_stream_get_stream(stream->conn_pub->hs); + if (lsquic_stream_has_data_to_flush(headers_stream)) + { + LSQ_DEBUG("flushing headers stream before potential write to a " + "buffered packet"); + (void) lsquic_stream_flush(headers_stream); + } + else + /* Some other stream must have flushed it: this means our headers + * are flushed. + */ + stream->stream_flags |= STREAM_HDRS_FLUSHED; + } + stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off); need_at_least = stream_header_sz + (size > 0); diff --git a/src/liblsquic/lsquic_stream.h b/src/liblsquic/lsquic_stream.h index 7bdd07022..5a1bad3f5 100644 --- a/src/liblsquic/lsquic_stream.h +++ b/src/liblsquic/lsquic_stream.h @@ -63,6 +63,7 @@ struct lsquic_stream STREAM_AUTOSWITCH = (1 <<27), STREAM_RW_ONCE = (1 <<28), /* When set, read/write events are dispatched once per call */ STREAM_CRITICAL = (1 <<29), + STREAM_HDRS_FLUSHED = (1 <<30), /* Only used in buffered packets mode */ } stream_flags; /* There are more than one reason that a stream may be put onto diff --git a/test/http_client.c b/test/http_client.c index 3745cba4c..e100a3264 100644 --- a/test/http_client.c +++ b/test/http_client.c @@ -391,6 +391,8 @@ http_client_on_new_stream (void *stream_if_ctx, lsquic_stream_t *stream) st_h->reader.lsqr_ctx = NULL; LSQ_INFO("created new stream, path: %s", st_h->path); lsquic_stream_wantwrite(stream, 1); + if (randomly_reprioritize_streams) + lsquic_stream_set_priority(stream, 1 + (random() & 0xFF)); return st_h; } @@ -542,28 +544,26 @@ http_client_on_read (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) srand(GetTickCount()); #endif - if (g_header_bypass && !(st_h->sh_flags & PROCESSED_HEADERS)) + do { - hset = lsquic_stream_get_hset(stream); - if (!hset) + if (g_header_bypass && !(st_h->sh_flags & PROCESSED_HEADERS)) { - LSQ_ERROR("could not get header set from stream"); - exit(2); + hset = lsquic_stream_get_hset(stream); + if (!hset) + { + LSQ_ERROR("could not get header set from stream"); + exit(2); + } + st_h->sh_ttfb = lsquic_time_now(); + update_sample_stats(&s_stat_ttfb, st_h->sh_ttfb - st_h->sh_created); + if (s_discard_response) + LSQ_DEBUG("discard response: do not dump headers"); + else + hset_dump(hset, stdout); + hset_destroy(hset); + st_h->sh_flags |= PROCESSED_HEADERS; } - st_h->sh_ttfb = lsquic_time_now(); - update_sample_stats(&s_stat_ttfb, st_h->sh_ttfb - st_h->sh_created); - if (s_discard_response) - LSQ_DEBUG("discard response: do not dump headers"); - else - hset_dump(hset, stdout); - hset_destroy(hset); - st_h->sh_flags |= PROCESSED_HEADERS; - } - - do - { - nread = lsquic_stream_read(stream, buf, sizeof(buf)); - if (nread > 0) + else if (nread = lsquic_stream_read(stream, buf, sizeof(buf)), nread > 0) { s_stat_downloaded_bytes += nread; if (!g_header_bypass && !(st_h->sh_flags & PROCESSED_HEADERS)) @@ -585,7 +585,7 @@ http_client_on_read (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) #endif lsquic_stream_set_priority(stream, new_prio); assert(s == 0); - LSQ_NOTICE("changed stream %u priority from %u to %u", + LSQ_DEBUG("changed stream %u priority from %u to %u", lsquic_stream_id(stream), old_prio, new_prio); } } diff --git a/test/unittests/test_elision.c b/test/unittests/test_elision.c index 27fa642d2..08042df08 100644 --- a/test/unittests/test_elision.c +++ b/test/unittests/test_elision.c @@ -25,7 +25,7 @@ static const struct parse_funcs *const pf = select_pf_by_ver(LSQVER_035); static struct { - char buf[0x100]; + unsigned char buf[0x1000]; size_t bufsz; uint64_t off; } stream_contents; @@ -40,6 +40,15 @@ setup_stream_contents (uint64_t off, const char *str) } +void +setup_stream_contents_n (uint64_t off, const unsigned char *buf, size_t size) +{ + stream_contents.bufsz = size; + stream_contents.off = off; + memcpy(stream_contents.buf, buf, size); +} + + int lsquic_stream_tosend_fin (const lsquic_stream_t *stream) { @@ -121,6 +130,78 @@ elide_single_stream_frame (void) } +/* In this test, we check that if the last STREAM frame is moved due to + * elision and PO_STREAM_END is set, the packet size is adjusted. This + * is needed to prevent data corruption for STREAM frames that have + * implicit length. + */ +static void +shrink_packet_post_elision (void) +{ + struct packet_out_srec_iter posi; + struct lsquic_engine_public enpub; + lsquic_stream_t streams[2]; + lsquic_packet_out_t *packet_out; + const struct stream_rec *srec; + int len, off = 0; + unsigned char stream2_data[0x1000]; + + memset(stream2_data, '2', sizeof(stream2_data)); + memset(streams, 0, sizeof(streams)); + memset(&enpub, 0, sizeof(enpub)); + lsquic_mm_init(&enpub.enp_mm); + packet_out = lsquic_mm_get_packet_out(&enpub.enp_mm, NULL, QUIC_MAX_PAYLOAD_SZ); + + setup_stream_contents(123, "Dude, where is my car?"); + len = pf->pf_gen_stream_frame(packet_out->po_data + packet_out->po_data_sz, + lsquic_packet_out_avail(packet_out), + streams[0].id, lsquic_stream_tosend_offset(&streams[0]), + lsquic_stream_tosend_fin(&streams[0]), + lsquic_stream_tosend_sz(&streams[0]), + (gsf_read_f) lsquic_stream_tosend_read, + &streams[0]); + packet_out->po_data_sz += len; + packet_out->po_frame_types |= (1 << QUIC_FRAME_STREAM); + lsquic_packet_out_add_stream(packet_out, &enpub.enp_mm, &streams[0], + QUIC_FRAME_STREAM, off, len); + + /* We want to fill the packet just right so that PO_STREAM_END gets set */ + const int exp = lsquic_packet_out_avail(packet_out); + setup_stream_contents_n(0, stream2_data, exp - 2); + len = pf->pf_gen_stream_frame(packet_out->po_data + packet_out->po_data_sz, + lsquic_packet_out_avail(packet_out), + streams[1].id, lsquic_stream_tosend_offset(&streams[1]), + lsquic_stream_tosend_fin(&streams[1]), + lsquic_stream_tosend_sz(&streams[1]), + (gsf_read_f) lsquic_stream_tosend_read, + &streams[1]); + assert(len == exp); + packet_out->po_data_sz += len; + packet_out->po_frame_types |= (1 << QUIC_FRAME_STREAM); + lsquic_packet_out_add_stream(packet_out, &enpub.enp_mm, &streams[1], + QUIC_FRAME_STREAM, off, len); + assert(0 == lsquic_packet_out_avail(packet_out)); /* Same as len == exp check really */ + packet_out->po_flags |= PO_STREAM_END; + + assert(1 == streams[0].n_unacked); + assert(1 == streams[1].n_unacked); + assert(posi_first(&posi, packet_out)); + + streams[0].stream_flags |= STREAM_RST_SENT; + + lsquic_packet_out_elide_reset_stream_frames(packet_out, 0); + assert(0 == streams[0].n_unacked); + + assert(QUIC_FTBIT_STREAM == packet_out->po_frame_types); + srec = posi_first(&posi, packet_out); + assert(srec->sr_stream == &streams[1]); + assert(packet_out->po_data_sz == exp); + + lsquic_packet_out_destroy(packet_out, &enpub, NULL); + lsquic_mm_cleanup(&enpub.enp_mm); +} + + /* This test is more involved. We will construct the following packet: * * | ACK | STREAM A | STREAM B | STREAM C | RST A | STREAM D | STREAM E @@ -237,7 +318,7 @@ elide_three_stream_frames (int chop_regen) len = pf->pf_gen_rst_frame(packet_out->po_data + packet_out->po_data_sz, lsquic_packet_out_avail(packet_out), 'A', 133, 0); lsquic_packet_out_add_stream(packet_out, &enpub.enp_mm, &streams[0], - QUIC_FRAME_RST_STREAM, 0, 0); + QUIC_FRAME_RST_STREAM, packet_out->po_data_sz, len); packet_out->po_data_sz += len; /* STREAM D */ setup_stream_contents(123, "DDDDDDDDDD"); @@ -293,17 +374,17 @@ elide_three_stream_frames (int chop_regen) assert(packet_out->po_frame_types == ((1 << QUIC_FRAME_STREAM) | (1 << QUIC_FRAME_RST_STREAM))); srec = posi_first(&posi, packet_out); - assert(srec->sr_stream == &streams[0]); - assert(srec->sr_frame_types == (1 << QUIC_FRAME_RST_STREAM)); - - srec = posi_next(&posi); assert(srec->sr_stream == &streams[1]); - assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM)); + assert(srec->sr_frame_type == QUIC_FRAME_STREAM); assert(srec->sr_off == b_off - (chop_regen ? 5 : 0)); + srec = posi_next(&posi); + assert(srec->sr_stream == &streams[0]); + assert(srec->sr_frame_type == QUIC_FRAME_RST_STREAM); + srec = posi_next(&posi); assert(srec->sr_stream == &streams[3]); - assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM)); + assert(srec->sr_frame_type == QUIC_FRAME_STREAM); assert(srec->sr_off == d_off - (chop_regen ? 5 : 0)); srec = posi_next(&posi); @@ -320,6 +401,7 @@ main (void) { /* TODO-ENDIAN: test with every PF */ elide_single_stream_frame(); + shrink_packet_post_elision(); elide_three_stream_frames(0); elide_three_stream_frames(1); diff --git a/test/unittests/test_packet_out.c b/test/unittests/test_packet_out.c index 917e9e987..3fa6fbe2a 100644 --- a/test/unittests/test_packet_out.c +++ b/test/unittests/test_packet_out.c @@ -54,32 +54,37 @@ main (void) srec = posi_first(&posi, packet_out); assert(srec->sr_stream == &streams[0]); assert(srec->sr_off == 7); - assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM)); + assert(srec->sr_frame_type == QUIC_FRAME_STREAM); srec = posi_next(&posi); assert(srec->sr_stream == &streams[1]); assert(srec->sr_off == 8); - assert(srec->sr_frame_types == ((1 << QUIC_FRAME_STREAM)|(1 << QUIC_FRAME_RST_STREAM))); + assert(srec->sr_frame_type == QUIC_FRAME_STREAM); srec = posi_next(&posi); assert(srec->sr_stream == &streams[2]); assert(srec->sr_off == 9); - assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM)); + assert(srec->sr_frame_type == QUIC_FRAME_STREAM); + + srec = posi_next(&posi); + assert(srec->sr_stream == &streams[1]); + assert(srec->sr_off == 10); + assert(srec->sr_frame_type == QUIC_FRAME_RST_STREAM); srec = posi_next(&posi); assert(srec->sr_stream == &streams[3]); assert(srec->sr_off == 11); - assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM)); + assert(srec->sr_frame_type == QUIC_FRAME_STREAM); srec = posi_next(&posi); assert(srec->sr_stream == &streams[4]); assert(srec->sr_off == 12); - assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM)); + assert(srec->sr_frame_type == QUIC_FRAME_STREAM); srec = posi_next(&posi); assert(srec->sr_stream == &streams[5]); assert(srec->sr_off == 13); - assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM)); + assert(srec->sr_frame_type == QUIC_FRAME_STREAM); assert((void *) 0 == posi_next(&posi)); diff --git a/test/unittests/test_stream.c b/test/unittests/test_stream.c index 287dfd15b..07a2c5fb1 100644 --- a/test/unittests/test_stream.c +++ b/test/unittests/test_stream.c @@ -202,7 +202,7 @@ read_from_scheduled_packets (lsquic_send_ctl_t *send_ctl, uint32_t stream_id, { if (fullcheck) { - assert(srec->sr_frame_types & (1 << QUIC_FRAME_STREAM)); + assert(srec->sr_frame_type == QUIC_FRAME_STREAM); if (packet_out->po_packno != 1) { /* First packet may contain two stream frames, do not @@ -216,7 +216,7 @@ read_from_scheduled_packets (lsquic_send_ctl_t *send_ctl, uint32_t stream_id, } } } - if ((srec->sr_frame_types & (1 << QUIC_FRAME_STREAM)) && + if (srec->sr_frame_type == QUIC_FRAME_STREAM && srec->sr_stream->id == stream_id) { assert(!fin); @@ -264,6 +264,19 @@ struct test_objs { }; +static int +unit_test_doesnt_write_ack (struct lsquic_conn *lconn) +{ + return 0; +} + + +static const struct conn_iface our_conn_if = +{ + .ci_can_write_ack = unit_test_doesnt_write_ack, +}; + + static void init_test_objs (struct test_objs *tobjs, unsigned initial_conn_window, unsigned initial_stream_window, const struct parse_funcs *pf) @@ -271,6 +284,7 @@ init_test_objs (struct test_objs *tobjs, unsigned initial_conn_window, memset(tobjs, 0, sizeof(*tobjs)); tobjs->lconn.cn_pf = pf ? pf : g_pf; tobjs->lconn.cn_pack_size = 1370; + tobjs->lconn.cn_if = &our_conn_if; lsquic_mm_init(&tobjs->eng_pub.enp_mm); TAILQ_INIT(&tobjs->conn_pub.sending_streams); TAILQ_INIT(&tobjs->conn_pub.read_streams); @@ -1871,8 +1885,8 @@ test_writing_to_stream_schedule_stream_packets_immediately (void) assert(("9 bytes written correctly", nw == 9)); s = lsquic_stream_flush(stream); assert(0 == s); - assert(("packetized -- 2 packets now", - 2 == lsquic_send_ctl_n_scheduled(&tobjs.send_ctl))); + assert(("packetized -- still 1 packet", + 1 == lsquic_send_ctl_n_scheduled(&tobjs.send_ctl))); assert(("connection cap is reduced by 23 bytes", lsquic_conn_cap_avail(conn_cap) == 0x4000 - 23)); @@ -1934,7 +1948,7 @@ test_writing_to_stream_outside_callback (void) assert(("9 bytes written correctly", nw == 9)); s = lsquic_stream_flush(stream); assert(0 == s); - assert(("packetized -- 2 packets now", 2 == bpq->bpq_count)); + assert(("packetized -- still 1 packet", 1 == bpq->bpq_count)); assert(("connection cap is reduced by 23 bytes", lsquic_conn_cap_avail(conn_cap) == 0x4000 - 23)); @@ -1943,8 +1957,8 @@ test_writing_to_stream_outside_callback (void) g_ctl_settings.tcs_schedule_stream_packets_immediately = 1; lsquic_send_ctl_schedule_buffered(&tobjs.send_ctl, g_ctl_settings.tcs_bp_type); - assert(("packetized -- 2 packets now", - 2 == lsquic_send_ctl_n_scheduled(&tobjs.send_ctl))); + assert(("packetized -- 1 packet", + 1 == lsquic_send_ctl_n_scheduled(&tobjs.send_ctl))); nw = read_from_scheduled_packets(&tobjs.send_ctl, stream->id, buf, sizeof(buf), 0, NULL, 0);