Apache Kafka Streamer | Ignite Documentation

Apache Kafka Streamer

Overview

The Apache Ignite Kafka Streamer module provides streaming from Kafka to an Ignite cache. Either of the following two methods can be used to achieve such streaming:

  • using Kafka Connect functionality with Ignite sink

  • importing the Kafka Streamer module in your Maven project and instantiating KafkaStreamer for data streaming

Streaming Data via Kafka Connect

IgniteSinkConnector helps you export data from Kafka to an Ignite cache by polling data from Kafka topics and writing it to your specified cache. The connector can be found in the optional/ignite-kafka module. It and its dependencies have to be on the classpath of a running Kafka instance, as described in the following subsection. For more information on Kafka Connect, see the Kafka documentation.

Setting up and Running

  1. Add the IGNITE_HOME/libs/optional/ignite-kafka module to the application classpath.

  2. Prepare worker configurations, e.g.,

    bootstrap.servers=localhost:9092
    
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    
    internal.key.converter=org.apache.kafka.connect.storage.StringConverter
    internal.value.converter=org.apache.kafka.connect.storage.StringConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    
    offset.storage.file.filename=/tmp/connect.offsets
    offset.flush.interval.ms=10000
  3. Prepare connector configurations, e.g.,

    # connector
    name=my-ignite-connector
    connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
    tasks.max=2
    topics=someTopic1,someTopic2
    
    # cache
    cacheName=myCache
    cacheAllowOverwrite=true
    igniteCfg=/some-path/ignite.xml
    singleTupleExtractorCls=my.company.MyExtractor
    • where cacheName is the name of the cache specified in /some-path/ignite.xml; the data from someTopic1 and someTopic2 will be pulled and stored there.

    • cacheAllowOverwrite can be set to true if you want to enable overwriting existing values in the cache.

    • If you need to parse the incoming data and derive a new key and value, implement the StreamSingleTupleExtractor interface and specify the implementation class as singleTupleExtractorCls.

    • You can also set cachePerNodeDataSize and cachePerNodeParOps to adjust the per-node buffer size and the maximum number of parallel stream operations for a single node.

  4. Start the connector, for instance, in standalone mode as follows:

    bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
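The extraction logic behind singleTupleExtractorCls (step 3) can be sketched as follows. This is a hypothetical example that assumes messages arrive as "key=value" strings; a real implementation would implement the org.apache.ignite.stream.StreamSingleTupleExtractor interface and be referenced by its fully qualified class name (e.g. my.company.MyExtractor), but the interface is left out here so the sketch stays self-contained:

```java
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical extractor logic for illustration: derives a cache key and value
// from a raw "key=value" message. In a real deployment this logic would live in
// a class implementing org.apache.ignite.stream.StreamSingleTupleExtractor, and
// its fully qualified name would be configured as singleTupleExtractorCls.
public class MyExtractor {
    public Map.Entry<String, String> extract(String msg) {
        // Split the incoming message into key and value on the first '='.
        String[] parts = msg.split("=", 2);
        String key = parts[0].trim();
        String val = parts.length > 1 ? parts[1].trim() : "";
        return new AbstractMap.SimpleEntry<>(key, val);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> e = new MyExtractor().extract("k1=v1");
        System.out.println(e.getKey() + " -> " + e.getValue());
    }
}
```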

Checking the Flow

To perform a basic functionality check, you can do the following:

  1. Start ZooKeeper

    bin/zookeeper-server-start.sh config/zookeeper.properties
  2. Start Kafka server

    bin/kafka-server-start.sh config/server.properties
  3. Provide some data input to the Kafka server

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
    k1,v1
  4. Start the connector

    bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
  5. Check that the value is in the cache, for example via the REST API:

    http://node1:8080/ignite?cmd=size&cacheName=cache1
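The same check can be issued programmatically; below is a minimal sketch using Java's built-in HTTP client. The host, port, and cache name are taken from the example URL above and are assumptions about your deployment:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class CacheSizeCheck {
    // Builds a GET request for the size-check URL shown above. Host "node1",
    // port 8080, and cache "cache1" are placeholders; adjust to your setup.
    public static HttpRequest sizeRequest(String host, int port, String cacheName) {
        URI uri = URI.create("http://" + host + ":" + port
            + "/ignite?cmd=size&cacheName=" + cacheName);
        return HttpRequest.newBuilder(uri).GET().build();
    }

    public static void main(String[] args) {
        HttpRequest req = sizeRequest("node1", 8080, "cache1");
        System.out.println("GET " + req.uri());

        // To query a running node (requires the REST module, e.g. ignite-rest-http,
        // on the node's classpath), send the request and read the JSON reply; the
        // cache size is carried in its "response" field:
        // HttpResponse<String> resp = HttpClient.newHttpClient()
        //     .send(req, HttpResponse.BodyHandlers.ofString());
    }
}
```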

Streaming Data with the Ignite Kafka Streamer Module

If you use Maven to manage your project's dependencies, first add the Kafka Streamer module dependency as follows (replace ${ignite-kafka-ext.version} with the actual Ignite Kafka Extension version you are interested in):

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                        http://maven.apache.org/xsd/maven-4.0.0.xsd">
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>org.apache.ignite</groupId>
            <artifactId>ignite-kafka-ext</artifactId>
            <version>${ignite-kafka-ext.version}</version>
        </dependency>
        ...
    </dependencies>
    ...
</project>

Assuming a cache with String keys and String values, the streamer can be started as follows:

KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache");

// allow overwriting cache data
stmr.allowOverwrite(true);

kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);

// set the topic
kafkaStreamer.setTopic(someKafkaTopic);

// set the number of threads to process Kafka streams
kafkaStreamer.setThreads(4);

// set Kafka consumer configurations
kafkaStreamer.setConsumerConfig(kafkaConsumerConfig);

// set extractor
kafkaStreamer.setSingleTupleExtractor(strExtractor);

kafkaStreamer.start();

...

// stop on shutdown
kafkaStreamer.stop();

stmr.close();
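The kafkaConsumerConfig object passed to setConsumerConfig above is a plain java.util.Properties holding standard Kafka consumer settings. A minimal sketch, in which the broker address and group id are placeholders for your environment, might look like this:

```java
import java.util.Properties;

// Minimal Kafka consumer configuration for the streamer above. The broker
// address and group id are placeholders; the deserializers are the standard
// Kafka String deserializers, matching a cache with String keys and values.
public class ConsumerConfigExample {
    public static Properties kafkaConsumerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "ignite-streamer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "true");
        return props;
    }

    public static void main(String[] args) {
        kafkaConsumerConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```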

For detailed information on Kafka consumer properties, refer to http://kafka.apache.org/documentation.html