MQTT Streamer
Overview
This streamer consumes from an MQTT topic and feeds key-value pairs into an IgniteDataStreamer
instance, using
Eclipse Paho as an MQTT client.
You need to provide a stream tuple extractor (either a single-entry or multiple-entries extractor) to process the incoming message and extract the tuple to insert.
This streamer supports:
-
Subscribing to a single topic or multiple topics at once.
-
Specifying the subscriber’s QoS for a single topic or for multiple topics.
-
Setting MqttConnectOptions to enable features like _last will testament, persistent sessions, etc.
-
Specifying the client ID. A random one will be generated and maintained throughout reconnections if the user does not provide one.
-
(Re-)Connection retries powered by the guava-retrying library. _Retry wait and retry stop policies can be configured.
-
Blocking the start() method until the client is connected for the first time.
Example
Here’s a trivial code sample showing how to use this streamer:
// Start Ignite.
Ignite ignite = Ignition.start();
// Get a data streamer reference.
IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer("mycache");
// Create an MQTT data streamer
MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
streamer.setIgnite(ignite);
streamer.setStreamer(dataStreamer);
streamer.setBrokerUrl(brokerUrl);
streamer.setBlockUntilConnected(true);
// Set a single tuple extractor to extract items in the format 'key,value' where key => Int, and value => String
// (using Guava here).
streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
@Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload()));
return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
}
});
// Consume from multiple topics at once.
streamer.setTopics(Arrays.asList("def", "ghi", "jkl", "mno"));
// Start the MQTT Streamer.
streamer.start();
Refer to the Javadocs of the ignite-mqtt-ext
module for more info on the available options.
Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation.