
SendRequest::ready() behaviour does not match documentation (always returns immediately as Ready) #848

@muhamadazmy

Description


I have been trying to track down an issue when using the h2 client, where a call to SendRequest::ready() returns immediately as Ready even if the number of concurrent streams is already at capacity. This does not match what is stated in the docs (quoted below):

The SendRequest::poll_ready function returns Ready when a new HTTP/2 stream can be created, i.e. as long as the current number of active streams is below MAX_CONCURRENT_STREAMS. If a new stream cannot be created, the caller will be notified once an existing stream closes, freeing capacity for the caller. The caller should use SendRequest::poll_ready to check for capacity before sending a request to the server.

To showcase the problem, I put together a code example (server/client) here. The snippet below zooms in on the current behaviour and why it's not actually behaving as expected.

async fn client() -> anyhow::Result<()> {
    info!("starting client");

    let mut handlers = JoinSet::<anyhow::Result<()>>::new();

    let stream = TcpStream::connect("127.0.0.1:8000").await?;

    // we start with 0 streams, so in theory we can't make requests until the server
    // sends a SETTINGS frame with MAX_CONCURRENT_STREAMS > 0.
    let (send_request, connection) = h2::client::Builder::new()
        .initial_max_send_streams(0)
        .handshake::<_, Bytes>(stream)
        .await?;

    tokio::spawn(async move {
        connection.await.expect("connection failed");
    });

    for i in 0..10 {
        let send_request = send_request.clone();

        handlers.spawn(async move {
            //NOTE: This always returns Ready even if we are at max concurrent streams capacity.
            let Some(send_request) = futures::future::poll_immediate(send_request.ready()).await
            else {
                anyhow::bail!("send_request not ready");
            };

            let mut send_request = send_request.context("ready() failed")?;
            info!("{i} is ready");
            debug!("{i} Making request");
            let (response, _) = send_request.send_request(Request::new(()), true)?;

            // NOTE: The actual waiting happens here, not when calling ready().
            let response = response.await?;
            info!("{i} status: {:?}", response.status());

            let mut received = 0;
            let mut incoming = response.into_body();
            info!("{i} starting to receive data");
            while let Some(data) = incoming.data().await {
                match data {
                    Ok(data) => {
                        debug!("{i} received data {}", data.len());
                        received += data.len();
                    }
                    Err(err) => {
                        error!("{i} error receiving data: {err}");
                        anyhow::bail!("error receiving data: {err}");
                    }
                }
            }

            info!("{i} finished receiving: {received}");
            Ok(())
        });
    }

    let results = handlers.join_all().await;
    for (i, result) in results.into_iter().enumerate() {
        info!("{i} result: {:?}", result);
    }

    Ok(())
}

Note that the server's max_concurrent_streams is set to 2 to make this easier to reproduce.

Calling futures::future::poll_immediate(send_request.ready()) always returns Some (never None). No new stream gets opened at that point, since it's the response.await? that actually blocks until a stream is available.

This makes it impossible to handle hitting the max concurrent streams limit in a custom way.

It also makes the ready() call unnecessary, since it's the actual await on the response that gets blocked.
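For completeness: while ready() can't be relied on across clones, one way to cap in-flight requests is an explicit counter on the client side. A minimal std-only sketch of the idea (Limiter and peak_concurrency are my own illustrative names, not part of h2; the sleep stands in for an in-flight request):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Counting limiter: acquire() blocks while `count == max`.
struct Limiter {
    max: usize,
    count: Mutex<usize>,
    cv: Condvar,
}

impl Limiter {
    fn new(max: usize) -> Arc<Self> {
        Arc::new(Limiter { max, count: Mutex::new(0), cv: Condvar::new() })
    }

    /// Block until a slot is free, then take it.
    fn acquire(&self) {
        let mut count = self.count.lock().unwrap();
        while *count == self.max {
            count = self.cv.wait(count).unwrap();
        }
        *count += 1;
    }

    /// Release a slot and wake one waiter.
    fn release(&self) {
        *self.count.lock().unwrap() -= 1;
        self.cv.notify_one();
    }
}

/// Run `jobs` fake requests capped at `max` in flight; return the observed peak.
fn peak_concurrency(max: usize, jobs: usize) -> usize {
    let limiter = Limiter::new(max);
    // (currently active, peak observed)
    let active = Arc::new(Mutex::new((0usize, 0usize)));
    let threads: Vec<_> = (0..jobs)
        .map(|_| {
            let (l, a) = (limiter.clone(), active.clone());
            thread::spawn(move || {
                l.acquire(); // would gate send_request.send_request(..)
                {
                    let mut g = a.lock().unwrap();
                    g.0 += 1;
                    g.1 = g.1.max(g.0);
                }
                thread::sleep(Duration::from_millis(5)); // fake in-flight request
                a.lock().unwrap().0 -= 1;
                l.release();
            })
        })
        .collect();
    for t in threads {
        t.join().unwrap();
    }
    let peak = active.lock().unwrap().1;
    peak
}

fn main() {
    let peak = peak_concurrency(2, 10);
    println!("peak in-flight: {peak}");
    assert!(peak <= 2);
}
```

In the real async code the equivalent would be something like a tokio semaphore around send_request(), but that only papers over the issue; it doesn't fix ready() itself.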

Workaround

By changing the code so that we don't clone the send_request object but instead reuse the same one over and over, ready() actually behaves as expected. Not being able to clone SendRequest is problematic, though, since we would have to use a Mutex to work around this limitation.

Changes from the code above:

    for i in 0..10 {
        // let send_request = send_request.clone();
        //NOTE: With a single reused send_request, ready() now actually waits for capacity.
        send_request = send_request.ready().await?;
        warn!("{i} is ready");
        debug!("{i} Making request");
        let (response, _) = send_request.send_request(Request::new(()), true)?;

        handlers.spawn(async move {
            // NOTE: The actual waiting happens here. not when calling ready().
            let response = response.await?;
            info!("{i} status: {:?}", response.status());

Even with the code above, it seems that there is an off-by-one error: is ready is printed 3 times (for streams 0, 1, and 2), and only after that does it work as expected (2 at a time).
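The Mutex workaround mentioned above would look roughly like the following shape. This is a std-only analogue just to show the structure (FakeSendRequest and send_from_workers are hypothetical stand-ins; the real code would use tokio::sync::Mutex around h2's SendRequest and hold the lock across ready().await):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Stand-in for h2::client::SendRequest: deliberately not Clone,
/// so every worker must go through the shared lock.
struct FakeSendRequest {
    sent: usize,
}

impl FakeSendRequest {
    fn send_request(&mut self) {
        self.sent += 1;
    }
}

/// Share one non-Clone handle across `workers` threads via Arc<Mutex<_>>.
fn send_from_workers(workers: usize) -> usize {
    let handle = Arc::new(Mutex::new(FakeSendRequest { sent: 0 }));
    let threads: Vec<_> = (0..workers)
        .map(|_| {
            let h = handle.clone();
            thread::spawn(move || {
                // Real code: lock the async Mutex, then `ready().await`
                // and `send_request(..)` while holding it.
                h.lock().unwrap().send_request();
            })
        })
        .collect();
    for t in threads {
        t.join().unwrap();
    }
    let sent = handle.lock().unwrap().sent;
    sent
}

fn main() {
    assert_eq!(send_from_workers(10), 10);
    println!("all requests went through one shared handle");
}
```

Serializing every request through one lock works, but it defeats the point of SendRequest being cheaply cloneable per task, which is why I'd rather see ready() fixed.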

Expectation

What I would expect (and also what is stated in the docs) is that ready() would block once we hit max concurrent streams, even if we pass around a clone of SendRequest.
