Skip to content

Commit

Permalink
Merge pull request #2344 from suneetnangia/issue/2335/fix-swallowed-m…
Browse files Browse the repository at this point in the history
…qtt-errors

Fixed issue around swallowing mqtt publish errors.
  • Loading branch information
itowlson authored Mar 11, 2024
2 parents 84b7009 + ca6b061 commit 6d4de86
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion crates/outbound-mqtt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,24 @@ impl v2::HostConnection for OutboundMqtt {
qos: Qos,
) -> Result<Result<(), Error>> {
Ok(async {
let (client, _) = self.get_conn(connection).await.map_err(other_error)?;
let (client, eventloop) = self.get_conn(connection).await.map_err(other_error)?;
let qos = convert_to_mqtt_qos_value(qos);

// Message published to EventLoop (not MQTT Broker)
client
.publish_bytes(topic, qos, false, payload.into())
.await
.map_err(other_error)?;

// Poll EventLoop once to send the message to MQTT broker or capture/throw error
// We may revisit this later to manage long running connections and their issues in the connection pool.
eventloop
.poll()
.await
.map_err(|err: rumqttc::ConnectionError| {
v2::Error::ConnectionFailed(err.to_string())
})?;

Ok(())
}
.await)
Expand Down

0 comments on commit 6d4de86

Please sign in to comment.