MQTT Streamer | Ignite Documentation
Edit

MQTT Streamer

Overview

This streamer consumes from an MQTT topic and feeds key-value pairs into an IgniteDataStreamer instance, using Eclipse Paho as an MQTT client.

You need to provide a stream tuple extractor (either a single-entry or multiple-entries extractor) to process the incoming message and extract the tuple to insert.

This streamer supports:

  • Subscribing to a single topic or multiple topics at once.

  • Specifying the subscriber’s QoS for a single topic or for multiple topics.

  • Setting MqttConnectOptions to enable features like _last will testament, persistent sessions, etc.

  • Specifying the client ID. A random one will be generated and maintained throughout reconnections if the user does not provide one.

  • (Re-)Connection retries powered by the guava-retrying library. _Retry wait and retry stop policies can be configured.

  • Blocking the start() method until the client is connected for the first time.

Example

Here’s a trivial code sample showing how to use this streamer:

// Start Ignite.
Ignite ignite = Ignition.start();

// Get a data streamer reference.
IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer("mycache");

// Create an MQTT data streamer
MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
streamer.setIgnite(ignite);
streamer.setStreamer(dataStreamer);
streamer.setBrokerUrl(brokerUrl);
streamer.setBlockUntilConnected(true);

// Set a single tuple extractor to extract items in the format 'key,value' where key => Int, and value => String
// (using Guava here).
streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
    @Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
        List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload()));

        return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
    }
});

// Consume from multiple topics at once.
streamer.setTopics(Arrays.asList("def", "ghi", "jkl", "mno"));

// Start the MQTT Streamer.
streamer.start();

Refer to the Javadocs of the ignite-mqtt-ext module for more info on the available options.