Synchronous Subscriber

fmp edited this page Feb 2, 2018 · 1 revision

The Rust MQTT library supports two client interfaces: an asynchronous (non-blocking) API and a synchronous (blocking) API. The synchronous client is a convenience wrapper around the asynchronous one, and it makes it easy to receive published messages from the server through the "consumer" API.

The full source code for this example is at: sync_consume.rs

The steps to create a synchronous consumer are as follows:

  1. Create an instance of a client.
  2. Connect to the server/broker.
  3. Start message consumer.
  4. Subscribe to topics.
  5. Loop on the receiver channel to read incoming messages.
  6. Handle errors and reconnect when necessary.
  7. Disconnect from the broker.

Creating a synchronous client

To create a synchronous client, use mqtt::Client::new(). At a minimum, this requires a URI for the broker, but it can also take a full set of creation options. A subscriber will often want to identify itself to the broker with a unique client ID and request a persistent (non-clean) session, so that if the connection is unexpectedly lost, the server retains messages for the client until it reconnects.

So we build a set of creation options specifying the server URI and a unique client identifier and use those to create a new client:

let create_opts = mqtt::CreateOptionsBuilder::new()
    .server_uri("tcp://localhost:1883")
    .client_id("rust_sync_consumer")
    .finalize();

let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|e| {
    println!("Error creating the client: {:?}", e);
    process::exit(1);
});

The MQTT URI is in the form of <protocol>://<host>:<port>, where the protocol is either tcp for a standard connection or ssl for a secure one. The host can be specified as an IP address or domain name, and the port is an integer number, where 1883 is the standard IANA port number for TCP connections, and 8883 is standard for SSL connections.
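To make the three parts of the URI concrete, here is a minimal sketch that splits a string of this form into its protocol, host, and port. The `BrokerUri` type and `parse_broker_uri()` function are hypothetical helpers written for this page, not part of the library, and the sketch insists on all three parts being present, which may be stricter than what the library itself accepts.

```rust
/// Parts of a broker URI of the form `<protocol>://<host>:<port>`.
/// Hypothetical type for illustration; not part of the MQTT library.
#[derive(Debug, PartialEq)]
struct BrokerUri {
    secure: bool, // true for "ssl", false for "tcp"
    host: String,
    port: u16,
}

/// Splits a URI string into protocol, host, and port.
/// Assumption: all three parts are required in this sketch.
fn parse_broker_uri(uri: &str) -> Result<BrokerUri, String> {
    let (scheme, rest) = uri
        .split_once("://")
        .ok_or_else(|| format!("missing '://' in {:?}", uri))?;

    let secure = match scheme {
        "tcp" => false,
        "ssl" => true,
        other => return Err(format!("unsupported protocol {:?}", other)),
    };

    // Split on the last ':' so the port is always the final component.
    let (host, port) = rest
        .rsplit_once(':')
        .ok_or_else(|| format!("missing port in {:?}", uri))?;
    if host.is_empty() {
        return Err(format!("missing host in {:?}", uri));
    }
    let port = port
        .parse::<u16>()
        .map_err(|e| format!("invalid port {:?}: {}", port, e))?;

    Ok(BrokerUri { secure, host: host.to_string(), port })
}
```

Calling `parse_broker_uri("tcp://localhost:1883")` yields a non-secure connection to `localhost` on port 1883, while a string with an unknown protocol or a missing port is rejected with an error message.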

Here we specify a relatively simple client ID, "rust_sync_consumer", but a real system needs something better that is guaranteed to be unique within the system - perhaps built from product serial numbers or GUIDs.

Client::new returns a Rust Result object to indicate success or failure of the call. The call doesn't perform any network I/O; it just sets up internal data structures, so it is unlikely to fail, and we could simply unwrap the result to get the client object. More complex clients that use persistence could fail, though, so we might want to get in the habit of checking the return for errors and handling them - or at least reporting a clear error message, like:

let cli = mqtt::Client::new("tcp://localhost:1883").unwrap_or_else(|err| {
    println!("Error creating the client: {:?}", err);
    process::exit(1);
});

Connecting to the broker

Once we have a client we can use it to connect to the broker using the connect function. For the subscriber with a unique client ID, we would want to set up a persistent session so that the server will store messages and subscription information in the case that the subscriber is unexpectedly disconnected. So we build a set of connection options and send them to the connect() function:

let conn_opts = mqtt::ConnectOptionsBuilder::new()
    .keep_alive_interval(Duration::from_secs(20))
    .clean_session(false)
    .finalize();

println!("Connecting to the MQTT broker...");
if let Err(e) = cli.connect(conn_opts) {
    println!("Error connecting to the broker: {:?}", e);
    process::exit(1);
};

This is a blocking network call, and won't return until the connection has succeeded or failed. It can fail for all the normal reasons that network connections fail: the server is down, it's unreachable, the name can't be resolved, there are security restrictions, etc. So it is imperative that a failed connection is handled properly. In a real system, it could be retried for a while. Here we just report the error and exit the application.

Since connect doesn't return anything useful on success (just the unit value), we only pattern match for the error and use it to report the failure to the user.

Preparing to receive messages

To start receiving messages, the synchronous application should do two things: put the client into consuming mode, and subscribe to the desired topics.

By starting the consumer, the client internally sets up a callback on incoming messages which places them into an mpsc::channel.

let rx = cli.start_consuming();

After that, the application is free to subscribe to the topics that it wants to receive. It can use Client::subscribe() to register a single topic with the server or Client::subscribe_many() to register multiple topics in one request:

let subscriptions = [ "test", "hello" ];
let qos = [1, 1];

if let Err(e) = cli.subscribe_many(&subscriptions, &qos) {
    println!("Error subscribing to topics: {:?}", e);
    cli.disconnect(None).unwrap();
    process::exit(1);
}

Each topic needs to be registered with a requested QoS for messages on that topic. The broker will deliver messages to the client at no more than that QoS: a message published at a lower QoS is delivered at the lower level. Each subscribed topic can have a different QoS.
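The topics and QoS values above are passed as two parallel lists, so entry N of one belongs to entry N of the other. The sketch below spells out that pairing and the QoS rule from the MQTT specification, under which a message is delivered at the lower of its published QoS and the QoS granted to the subscription. Both `pair_subscriptions()` and `delivery_qos()` are hypothetical helpers for illustration, not library calls, and we assume here that a length mismatch between the two lists should be treated as an error.

```rust
/// Pairs each topic with its requested QoS, mirroring the two parallel
/// slices passed to `subscribe_many()`. Hypothetical helper, not a library call.
fn pair_subscriptions<'a>(
    topics: &[&'a str],
    qos: &[u8],
) -> Result<Vec<(&'a str, u8)>, String> {
    // Assumption: every topic must have exactly one QoS value.
    if topics.len() != qos.len() {
        return Err(format!("{} topics but {} QoS values", topics.len(), qos.len()));
    }
    if let Some(bad) = qos.iter().find(|&&q| q > 2) {
        return Err(format!("invalid QoS {}; MQTT defines only 0, 1 and 2", bad));
    }
    Ok(topics.iter().copied().zip(qos.iter().copied()).collect())
}

/// QoS at which a single message reaches the subscriber: the lower of the
/// QoS it was published with and the QoS granted to the subscription.
fn delivery_qos(published_qos: u8, granted_qos: u8) -> u8 {
    published_qos.min(granted_qos)
}
```

With the lists from the example, `pair_subscriptions(&["test", "hello"], &[1, 1])` gives `("test", 1)` and `("hello", 1)`, and a message published to "test" at QoS 2 would arrive at QoS 1.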

The subscribe() functions send the request to the broker. Once the broker has accepted it, the broker is free to start sending matching messages to the subscriber as they are published.

Note that it is important to start consuming messages before subscribing. This is to eliminate the race condition that would occur if the subscription were done first. In that case the server could start sending messages before the consumer was in place, and messages would get lost.

So start_consuming() should be called before subscribe() or subscribe_many().
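The ordering issue can be illustrated with a toy model that uses only the standard library. `MockClient` below is a stand-in written for this page, not the library's client: it simply drops (and counts) any message that arrives before a consumer channel has been installed, which is the behavior we assume for the sake of the illustration.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Toy stand-in for a client; NOT the library's type.
struct MockClient {
    consumer: Option<Sender<String>>,
    dropped: usize,
}

impl MockClient {
    fn new() -> Self {
        MockClient { consumer: None, dropped: 0 }
    }

    /// Installs the channel and hands back the receiving end.
    fn start_consuming(&mut self) -> Receiver<String> {
        let (tx, rx) = mpsc::channel();
        self.consumer = Some(tx);
        rx
    }

    /// Simulates the broker delivering a message for an active subscription.
    fn on_message(&mut self, payload: &str) {
        match &self.consumer {
            // A consumer is in place: queue the message for the application.
            Some(tx) => {
                let _ = tx.send(payload.to_string());
            }
            // No consumer yet: the message has nowhere to go and is lost.
            None => self.dropped += 1,
        }
    }
}
```

If `on_message()` is called before `start_consuming()`, the message only increments `dropped`. If `start_consuming()` is called first, every message ends up in the receiver, which is the order recommended above.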

Receiving Messages

Once the client is in consuming mode and the subscriptions are registered with the server, the client can start receiving messages. The start_consuming() call returns the receiver end of an mpsc channel, which the app can use to read the incoming messages.

let rx = cli.start_consuming();

// ...subscribe to topics...

for msg in rx.iter() {
    if let Some(msg) = msg {
        println!("{}", msg);
    }
}

The channel is actually a queue of Option<Message> objects. Receiving a None would indicate a problem with the client, most likely that the connection was lost. It's up to the application to decide what to do next. Commonly it might try to reconnect and continue, like:

for msg in rx.iter() {
    if let Some(msg) = msg {
        println!("{}", msg);
    }
    else if cli.is_connected() ||
             !try_reconnect(&cli) {
        break;
    }
}

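This says: if the client is still connected, then abort (since the problem is something else which must be handled), otherwise try to reconnect. If the reconnect is successful, then continue as normal. Here try_reconnect() is a helper supplied by the application rather than a library call; it returns true once the client is connected again.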

Note that once the consumer is started, the channel remains valid through disconnects and reconnects. The application should retain and keep using the receiver through reconnection attempts.
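The difference between a None item and the end of the channel can be seen with a plain standard-library channel. In this sketch a String stands in for the library's message type, and the `Event` enum and `collect_events()` function are hypothetical names used only for illustration. A None is an ordinary item in the queue, so the loop keeps going after it; the loop ends only when the sending side is dropped.

```rust
use std::sync::mpsc::Receiver;

/// What the application saw on the channel. Hypothetical type for illustration.
#[derive(Debug, PartialEq)]
enum Event {
    Message(String),
    ConnectionProblem,
}

/// Reads the channel until it is closed, recording each item.
/// Assumption: `String` stands in for the library's message type.
fn collect_events(rx: Receiver<Option<String>>) -> Vec<Event> {
    let mut events = Vec::new();
    // `iter()` blocks for the next item and ends only when every sender is dropped.
    for item in rx.iter() {
        match item {
            Some(payload) => events.push(Event::Message(payload)),
            // `None` signals a problem, but the channel itself stays usable.
            None => events.push(Event::ConnectionProblem),
        }
    }
    events
}
```

Sending a message, then None, then another message records all three events in order, which mirrors a subscriber that keeps reading from the same receiver after a reconnect.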

This strategy of a single reconnection attempt is somewhat effective, but too simplistic for a production system. The application should consider a more robust reconnect algorithm appropriate to the overall system - for example, retrying a failed reconnect several times with an increasing (backoff) delay between attempts.
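One possible shape for such a policy is a bounded retry loop with exponential backoff. The sketch below is generic over the operation being retried, so it does not depend on the client at all; `retry_with_backoff()` and its parameters are hypothetical names, and the doubling policy is just one reasonable choice rather than anything prescribed by the library.

```rust
use std::thread;
use std::time::Duration;

/// Calls `attempt` until it succeeds or `max_attempts` is reached, doubling
/// the delay after each failure up to `max_delay`. Returns the number of
/// attempts used on success, or the last error on failure.
/// Hypothetical helper for illustration; not part of the MQTT library.
fn retry_with_backoff<E>(
    mut attempt: impl FnMut() -> Result<(), E>,
    max_attempts: u32,
    initial_delay: Duration,
    max_delay: Duration,
) -> Result<u32, E> {
    let max_attempts = max_attempts.max(1); // always try at least once
    let mut delay = initial_delay;
    let mut tries = 0;
    loop {
        tries += 1;
        match attempt() {
            Ok(()) => return Ok(tries),
            // Out of attempts: give the caller the last error.
            Err(e) if tries >= max_attempts => return Err(e),
            Err(_) => {
                thread::sleep(delay);
                // Double the wait, but never exceed the cap.
                delay = (delay * 2).min(max_delay);
            }
        }
    }
}
```

In the subscriber, the closure would wrap whatever reconnect call the application uses, and the loop over the receiver would break only when this function returns an error. Capping the delay keeps the client from waiting arbitrarily long once the broker comes back.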

Disconnecting

A subscriber like this might try to run forever without ever disconnecting. In that case the receiver will loop forever, recovering from any errors or disconnects as appropriate.

But if the subscriber has a limited lifetime and needs to disconnect, it would normally unsubscribe from its topics and then call the disconnect function:

cli.unsubscribe_many(&subscriptions).unwrap();
cli.disconnect(None).unwrap();

This shuts down the connection to the server cleanly. Because the client unsubscribed first, the broker no longer queues messages for it while it is offline.