
amqpsession.openreceiver(path, eventCallback, messageCallback)

Establishes a means to receive AMQP messages

Availability

Agent

Parameters

| Name | Type | Description |
| --- | --- | --- |
| path | String | The path of the target endpoint relative to the host |
| eventCallback | Function | An optional function that will be called at key moments during the life of the object |
| messageCallback | Function | An optional function that will be called when a message is received |

Returns

An amqpreceiver object

Description

Important Note: Agent AMQP functionality is now deprecated and will shortly become unsupported. Any attempt to interact with imp API amqp objects and methods on unregistered development devices or on production devices will generate a runtime error.

If you are using or intend to use Azure IoT Hub, we recommend you make use of MQTT instead of AMQP. Please see our Azure IoT Hub integration for more information.


This method creates a new amqpreceiver object which represents a pipe from the AMQP broker at the endpoint specified by path. The receiver is prepared asynchronously and may not be ready for use when openreceiver() returns. Success or failure of the attempt to ready the receiver is indicated to the callback registered with amqp.openconnection(), but code can call amqpreceiver.isopen() at any time to check the receiver status.
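Because the receiver is prepared asynchronously, code that depends on it may want to check its state first. The following is a minimal sketch of such a check; it assumes a variable named receiver already holds the object returned by openreceiver():

```squirrel
// Assumption: 'receiver' holds the amqpreceiver returned by session.openreceiver(),
// or null if no receiver has been created yet
if (receiver != null && receiver.isopen()) {
  server.log("Receiver is open");
} else {
  server.log("Receiver is not ready yet");
}
```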

Received messages are not relayed to the amqp.openconnection() callback, but to openreceiver’s own callback, which is set via the method’s third parameter, messageCallback. This callback has one parameter of its own, deliveries, into which an array of amqpdelivery objects is passed, one for each message. These objects can then be used to retrieve a message and to settle its delivery with the broker. The amqpdelivery object has four methods:

| Method | Action | Return Value |
| --- | --- | --- |
| message() | None | The message as an amqpmessage object |
| accept() | Accept the delivery | Nothing |
| reject() | Reject the delivery | Nothing |
| release() | Release the delivery | Nothing |

If the delivery object is not explicitly handled (i.e., none of accept(), reject() or release() is called on it), the amqpdelivery object will automatically accept the delivery when it goes out of Squirrel scope (i.e., when the callback ends).

The following code snippet shows how this might be used:

function messageHandler(deliveries) {
  if (deliveries.len() > 0) {
    foreach (delivery in deliveries) {
      local msg = delivery.message().body();
      delivery.accept();
      server.log(msg);
    }
  }
}

// No event callback is needed here, so null is passed in its place
session.openreceiver("", null, messageHandler);
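A handler may also settle each delivery explicitly rather than always accepting it. The sketch below is illustrative only: isValid() and settlingHandler() are hypothetical names, and the choice of when to reject or release is an assumption based on the usual AMQP meaning of those outcomes (rejected for a message that cannot be processed, released for a message that was not processed and may be delivered again).

```squirrel
// Hypothetical check: not part of the imp API
function isValid(body) {
  return body != null;
}

function settlingHandler(deliveries) {
  foreach (delivery in deliveries) {
    try {
      local body = delivery.message().body();
      if (isValid(body)) {
        server.log(body);
        delivery.accept();
      } else {
        // The message is unusable, so refuse it
        delivery.reject();
      }
    } catch (err) {
      // Processing failed for a reason unrelated to the message: hand it back
      server.error("Could not process delivery: " + err);
      delivery.release();
    }
  }
}
```

Settling every delivery inside the callback means nothing is left to be accepted automatically when the callback ends.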

The function passed into eventCallback is also optional. If provided, it should have the following parameters of its own:

| Parameter | Type | Description |
| --- | --- | --- |
| event | String | A constant representing the type of event that triggered the callback |
| errorDetail | String or null | In the event of an error, an error description, otherwise null |

See amqp.openconnection() for details of these parameters and the values passed into them.
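As a rough sketch, a dedicated event callback for the receiver might look like the following. The names receiverEventHandler and messageHandler are illustrative, session is assumed to be an open amqpsession, and only the "RECEIVER_OPEN" event is handled by name; other events are simply logged.

```squirrel
function receiverEventHandler(event, errorDetail) {
  if (errorDetail) {
    // An error occurred: errorDetail holds its description
    server.error("Receiver error: " + errorDetail);
    return;
  }

  if (event == "RECEIVER_OPEN") {
    server.log("Receiver now ready to handle messages");
  } else {
    server.log("Receiver event: " + event);
  }
}

// Assumption: 'session' is an open amqpsession and 'messageHandler' is defined elsewhere
receiver <- session.openreceiver("$cbs", receiverEventHandler, messageHandler);
```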

amqpreceiver objects are automatically closed when they are deallocated, which occurs (also automatically) when they go out of scope. To close an amqpreceiver manually, deallocate it by setting the variable that holds it to null.
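A minimal sketch of a manual close, assuming the receiver is held in a single global variable named receiver. If other variables also refer to the same object, they would presumably need to be cleared too before it is deallocated.

```squirrel
// Hypothetical helper: drops the only reference so the receiver is deallocated and closed
function closeReceiver() {
  if (receiver != null) {
    receiver = null;
    server.log("Receiver released");
  }
}
```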

Example

The following code shows how a receiver is opened following the receipt of a SESSION_OPEN event. The function amqpConnectionManager() has already been registered with amqp.openconnection() as the chosen connection callback. With the session open, it is safe for the code to create an amqpreceiver object and register the callback that will process incoming messages: amqpMessageManager(), which simply logs the messages embedded in received amqpdelivery objects.

const HUB_NAME = "<YOUR_HUB_NAME>";

conn <- null;
session <- null;
receiver <- null;

function amqpConnectionManager(event, errorDetail) {
  if (errorDetail) {
    server.error(errorDetail);
  } else {
    // Received an event from the AMQP connection
    switch(event) {
      case "CONNECTION_OPEN":
        // The connection is now open - create a session
        session = conn.opensession(amqpConnectionManager);
        break;

      case "SESSION_OPEN":
        // The session is now ready, create a receiver
        if (session) receiver = session.openreceiver("$cbs",
                                                     amqpConnectionManager,
                                                     amqpMessageManager);
        break;

      case "RECEIVER_OPEN":
        // The receiver has signalled its readiness
        server.log("Receiver now ready to handle messages");
    }
  }
}


function amqpMessageManager(deliveries) {
  // Process received amqpdelivery objects
  server.log(format("Received %d responses from Azure", deliveries.len()));
  if (deliveries.len() > 0) {
    foreach (delivery in deliveries) {
      local msg = delivery.message().body();
      delivery.accept();
      server.log(msg);
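      // "status-code" is not a valid identifier, so use index syntax rather than dot syntax
      local props = delivery.message().properties();
      if (props != null && "status-code" in props && props["status-code"] == 200) server.log("Authorized with Azure IoT Hub");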
    }
  }
}

// Initiate the connection
local url = "amqps://" + HUB_NAME + ".azure-devices.net:5671";
conn = amqp.openconnection(url, amqpConnectionManager);