Skip to content

Commit 817288f

Browse files
committed
fix(test): Fix streaming_send_data_error_is_ignored to fail when background thread hangs
Fixes the `streaming_send_data_error_is_ignored` test to make it crash when the background thread goes off. Adds logging into the error
1 parent 1921b2d commit 817288f

File tree

1 file changed

+27
-7
lines changed

1 file changed

+27
-7
lines changed

lambda-runtime/src/requests.rs

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use bytes::Bytes;
33
use http::{header::CONTENT_TYPE, Method, Request, Uri};
44
use lambda_runtime_api_client::{body::Body, build_request};
55
use serde::Serialize;
6-
use std::{fmt::Debug, marker::PhantomData, str::FromStr};
6+
use std::{fmt::Debug, marker::PhantomData, str::FromStr, time::Duration};
77
use tokio_stream::{Stream, StreamExt};
88

99
pub(crate) trait IntoRequest {
@@ -109,11 +109,13 @@ where
109109
let (mut tx, rx) = Body::channel();
110110

111111
tokio::spawn(async move {
112-
if tx.send_data(metadata_prelude.clone().into()).await.is_err() {
112+
if tx.send_data(metadata_prelude.into()).await.is_err() {
113+
tracing::error!("Error sending metadata prelude, response channel closed");
113114
return;
114115
}
115116

116117
if tx.send_data("\u{0}".repeat(8).into()).await.is_err() {
118+
tracing::error!("Error sending metadata prelude delimiter, response channel closed");
117119
return;
118120
}
119121

@@ -124,8 +126,8 @@ where
124126
};
125127

126128
if tx.send_data(chunk).await.is_err() {
127-
// Consumer has gone away; nothing else to do.
128-
break;
129+
tracing::error!("Error sending response body chunk, response channel closed");
130+
return;
129131
}
130132
}
131133
});
@@ -219,6 +221,18 @@ mod tests {
219221
#[tokio::test]
220222
async fn streaming_send_data_error_is_ignored() {
221223
use crate::StreamResponse;
224+
use std::sync::atomic::{AtomicBool, Ordering};
225+
use std::sync::Arc;
226+
227+
// Track if a panic occurred in any spawned task
228+
let panicked = Arc::new(AtomicBool::new(false));
229+
let panicked_clone = panicked.clone();
230+
231+
// Set a custom panic hook to detect panics in spawned tasks
232+
let old_hook = std::panic::take_hook();
233+
std::panic::set_hook(Box::new(move |_| {
234+
panicked_clone.store(true, Ordering::SeqCst);
235+
}));
222236

223237
let stream = tokio_stream::iter(vec![Ok::<Bytes, Error>(Bytes::from_static(b"chunk"))]);
224238

@@ -232,9 +246,15 @@ mod tests {
232246
// immediate drop simulates client disconnection
233247
drop(http_req);
234248

235-
// force the task to run
236-
tokio::task::yield_now().await;
249+
// give the spawned task time to run and potentially panic
250+
tokio::time::sleep(Duration::from_millis(10)).await;
251+
252+
// Restore the old panic hook
253+
std::panic::set_hook(old_hook);
237254

238-
// at this point the inner task will panic if errors are unwrapped.
255+
assert!(
256+
!panicked.load(Ordering::SeqCst),
257+
"spawned task panicked - send_data errors should be ignored, not unwrapped"
258+
);
239259
}
240260
}

0 commit comments

Comments
 (0)