Subscribe to one or more specific MQTT topics
Agent
Name | Type | Description |
---|---|---|
topics | String or blob, or array of strings/blobs | Topic(s) to be subscribed to. Must be valid UTF-8 |
qos | Integer or array of integers | Optional delivery mode constant(s) |
callback | Function | Optional function to be called with MQTT broker’s acknowledgement |
Integer — 0, or an error code
This method allows you to subscribe to the specified topic or topics.
To subscribe to multiple topics, pass the names of the topics (each a string or a blob containing valid UTF-8 text) in an array. This array may include no more than ten topics, or an error will be thrown.
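If you need more than ten topics, one option is to split the list across several calls. The following is a minimal sketch, not part of the API: `subscribeInBatches()` and `MAX_TOPICS_PER_CALL` are hypothetical names, and `client` is assumed to be a connected mqttclient instance. Each call is wrapped in try... catch because subscribe() can throw.

```squirrel
// Hypothetical limit taken from the ten-topic rule described above
const MAX_TOPICS_PER_CALL = 10;

// Hypothetical helper: subscribe to a long list of topics in batches of up to ten.
// 'qos' is a single integer, so it is applied to every topic in each batch.
function subscribeInBatches(client, topics, qos, callback) {
    for (local start = 0; start < topics.len(); start += MAX_TOPICS_PER_CALL) {
        // slice() excludes the end index, so clamp it to the array length
        local end = start + MAX_TOPICS_PER_CALL;
        if (end > topics.len()) end = topics.len();

        try {
            client.subscribe(topics.slice(start, end), qos, callback);
        } catch (err) {
            server.error("Could not subscribe to topics " + start + " to " + (end - 1) + ": " + err);
        }
    }
}
```

Because each batch is a separate request, the callback is called once per batch. MQTT messages are rate-limited, so a long list may need its calls spaced out rather than issued in a tight loop.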
You can request a delivery mode by passing one of the following integer constants into the qos parameter:
Constant | Value | Description |
---|---|---|
mqtt.AT_MOST_ONCE | 0 | The message is sent but receipt is not confirmed by the recipient so there is no guarantee of delivery. The message is not stored and re-transmitted by the sender |
mqtt.AT_LEAST_ONCE | 1 | The message may be delivered more than once. The sender stores and periodically re-transmits the message until the receiver acknowledges receipt of the message |
mqtt.EXACTLY_ONCE | 2 | The message is delivered only once. The sender stores and periodically re-transmits the message until the receiver acknowledges receipt of the message, but subsequent sends mark the message as a duplicate, allowing the receiver to process the message only once. This also utilizes multiple acknowledge messages transmitted by sender and recipient. It is the safest but slowest QoS level |
Note that this is only a request made to the broker, which may apply a different delivery mode. To discover the actual delivery mode, provide a callback function. It has one parameter, qosMode, which receives an integer (or an array of integers if multiple topics were included in the subscribe request) indicating the actual delivery mode, or 128 (0x80) if the subscription request was rejected by the broker:
Value | State | Actual QoS |
---|---|---|
0x00 | Success | mqtt.AT_MOST_ONCE |
0x01 | Success | mqtt.AT_LEAST_ONCE |
0x02 | Success | mqtt.EXACTLY_ONCE |
0x80 | Failure | N/A |
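The values in the table above can be turned into log messages with a small helper. This is an illustrative sketch only: `describeAck()` and `onSubscribed()` are hypothetical names, and `mqttclient` is assumed to be a connected client. The callback accepts either a single integer or an array, matching the two forms described above.

```squirrel
// Hypothetical helper: describe one acknowledgement value from the broker
function describeAck(value) {
    switch (value) {
        case 0x00: return "subscribed, QoS 0 (mqtt.AT_MOST_ONCE)";
        case 0x01: return "subscribed, QoS 1 (mqtt.AT_LEAST_ONCE)";
        case 0x02: return "subscribed, QoS 2 (mqtt.EXACTLY_ONCE)";
        case 0x80: return "subscription rejected by the broker";
        default:   return "unexpected acknowledgement value: " + value;
    }
}

// Hypothetical callback: normalize the result to an array, then log each entry
function onSubscribed(result) {
    local values = (typeof result == "array") ? result : [result];
    foreach (index, value in values) {
        server.log("Topic " + index + ": " + describeAck(value));
    }
}

mqttclient.subscribe(["mytopic1", "mytopic2"], mqtt.AT_LEAST_ONCE, onSubscribed);
```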
You can provide the QoS value as a single integer or as an array of integers. In the case of a single integer, its value will be applied to all of the topics included in the subscription request:
mqttclient.subscribe("mytopic", 0, function(qos) {
    server.log("Subscribed QoS = " + qos);
});

mqttclient.subscribe(["mytopic1", "mytopic2"], 0, function(qosList) {
    foreach (qos in qosList) server.log("Subscribed QoS = " + qos);
});
To request specific QoS values for specific topics, pass in an array of integers. The topic and QoS arrays must contain the same number of elements, or an error will be thrown. The nth topic will be subscribed with the nth QoS value:
mqttclient.subscribe(["mytopic1", "mytopic2"], [0, 2], function(qosList) {
    foreach (qos in qosList) server.log("Subscribed QoS = " + qos);
});
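Since the broker may grant a different delivery mode from the one requested, it can be useful to compare the two per topic. The sketch below assumes that the array passed to the callback lists the granted values in the same order as the topics in the request. The variable names `topics`, `requested` and `granted` are illustrative, and `mqttclient` is assumed to be connected.

```squirrel
local topics = ["mytopic1", "mytopic2"];
local requested = [mqtt.AT_MOST_ONCE, mqtt.EXACTLY_ONCE];

mqttclient.subscribe(topics, requested, function(granted) {
    // Assumption: granted[n] is the broker's answer for topics[n]
    foreach (index, qos in granted) {
        if (qos == 0x80) {
            server.error(topics[index] + ": subscription rejected");
        } else if (qos != requested[index]) {
            server.log(topics[index] + ": requested QoS " + requested[index] + ", granted QoS " + qos);
        } else {
            server.log(topics[index] + ": subscribed with QoS " + qos);
        }
    }
});
```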
An exception will be thrown if you call subscribe() on an mqttclient instance that is currently disconnected.
We recommend that you wrap all mqttclient.subscribe() calls in a try... catch statement. Even if the client is connected at the time of the call, the socket could nonetheless fail as a result of the send process itself.
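One way to follow this recommendation is a small wrapper that handles both failure paths: a thrown exception and a non-zero return value. This is a sketch under assumptions: `safeSubscribe()` is a hypothetical helper, not part of the API, and `client` is an mqttclient instance.

```squirrel
// Hypothetical wrapper: returns true if the subscribe request was sent, false otherwise
function safeSubscribe(client, topics, qos, callback) {
    try {
        // subscribe() returns 0 on success, or an error code
        local result = client.subscribe(topics, qos, callback);
        if (result != 0) {
            server.error("subscribe() returned error code " + result);
            return false;
        }
        return true;
    } catch (err) {
        // Thrown if, for example, the client is currently disconnected
        server.error("subscribe() threw an exception: " + err);
        return false;
    }
}
```

A return value of true only means the request was sent. Whether the broker accepted it is reported later through the callback.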
If you subsequently disconnect the mqttclient instance that you used to subscribe to topics, you will need to re-subscribe to those topics if you later reconnect it. Reconnecting does not cause these subscriptions to be remade automatically. The recommended way to do this is to include all your calls to mqttclient.subscribe() in a callback function registered using mqttclient.onconnect(). This is shown in the example code below.
Note Inbound and outbound MQTT messages are subject to rate limits.
The following code sets up an MQTT client and connects to the specified MQTT broker. If the connection is made, the code attempts to subscribe to a topic using mqttclient.subscribe(). If this succeeds in turn, the code registers the handler that will log any incoming messages that have been posted to the chosen topic.
// CONSTANTS
const URL = "tcp://test.mosquitto.org";
const PORT = 1883;
const UN = "generalgrugger";
const PW = "gaztakh1ghc0mmand";

// GLOBALS
local client = null;
local cid = "imp.mqtt.test." + imp.configparams.deviceid;

// FUNCTIONS
function error(code) {
    // Convert a connection result code into a human-readable string
    switch (code) {
        case -1: return "Socket Error";
        case 1: return "Unsupported MQTT version";
        case 2: return "Client ID rejected";
        case 3: return "Server unavailable";
        case 4: return "Invalid username/password";
        case 5: return "Not authorized";
        default: return "Unknown error";
    }
}

function onMessage(message) {
    // Called on receipt of a message
    server.log("Message \'" + message.message + "\' received under topic \'" + message.topic + "\'");
}

function onConnect(resultCode) {
    // Called when the client connects (or fails to connect) to the MQTT broker
    local s = URL + ":" + PORT.tostring();
    if (resultCode == 0) {
        server.log("Connected to " + s);
        // We're connected, so try to subscribe to a topic
        client.subscribe("imp.mqtt.test.pings",
                         mqtt.AT_MOST_ONCE,
                         function(qosMode) {
                             // This is the subscription acknowledgement callback
                             if (qosMode < 0x80) {
                                 // We are subscribed, so inform the user...
                                 server.log("Subscribed to \'imp.mqtt.test.pings\' with delivery mode: " + qosMode + ".");
                                 // ...and set the message handler
                                 client.onmessage(onMessage);
                             } else {
                                 // We couldn't subscribe for some reason so inform the user
                                 server.log("Not subscribed to \'imp.mqtt.test.pings\'.");
                             }
                         });
    } else {
        server.error("Failed to connect to " + s + " Error: " + error(resultCode));
    }
}

function onLost() {
    // Called when the connection to the MQTT broker is unexpectedly lost
    local s = URL + ":" + PORT.tostring();
    server.log("Connection to " + s + " broken");
}

// RUNTIME START
// Instance an MQTT client
client = mqtt.createclient();

// Set up the initial handlers
client.onconnect(onConnect);
client.onconnectionlost(onLost);

// Connect with credentials
local options = {};
options.username <- UN;
options.password <- PW;
client.connect(URL + ":" + PORT.tostring(), cid, options);