From 1b9b5a797396ce19e08b55c0f111e6e30310fb89 Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Tue, 24 Dec 2024 12:59:52 +0800 Subject: [PATCH] fix(sink): fix mogodb write error handling (#19869) --- src/connector/src/sink/mongodb.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/connector/src/sink/mongodb.rs b/src/connector/src/sink/mongodb.rs index 244a1bb70db8d..44d116cedf093 100644 --- a/src/connector/src/sink/mongodb.rs +++ b/src/connector/src/sink/mongodb.rs @@ -81,6 +81,12 @@ mod send_bulk_write_command_future { ))) })?; + if let Ok(ok) = result.get_i32("ok") + && ok != 1 + { + return Err(SinkError::Mongodb(anyhow!("bulk write write errors"))); + } + if let Ok(write_errors) = result.get_array("writeErrors") { return Err(SinkError::Mongodb(anyhow!( "bulk write respond with write errors: {:?}", @@ -88,15 +94,10 @@ mod send_bulk_write_command_future { ))); } - let n = result.get_i32("n").map_err(|err| { - SinkError::Mongodb( - anyhow!(err).context("can't extract field n from bulk write response"), - ) - })?; - if n < 1 { + if let Ok(write_concern_error) = result.get_array("writeConcernError") { return Err(SinkError::Mongodb(anyhow!( - "bulk write respond with an abnormal state, n = {}", - n + "bulk write respond with write errors: {:?}", + write_concern_error, ))); }