Class CacheLoadOnlyStoreAdapter<K,​V,​I>

  • Type Parameters:
    K - Key type.
    V - Value type.
    I - Input type.
    All Implemented Interfaces:
    javax.cache.integration.CacheLoader<K,​V>, javax.cache.integration.CacheWriter<K,​V>, CacheStore<K,​V>

    public abstract class CacheLoadOnlyStoreAdapter<K,​V,​I>
    extends Object
    implements CacheStore<K,​V>
    This adapter designed to support stores with bulk loading from stream-like source.

    This class processes input data in the following way:

    • Iterator of input record obtained from user-defined inputIterator(Object...).
    • Iterator continuously queried for input records and they are grouped into batches of batchSize.
    • Batch is placed into processing queue and puled by one of threadsCnt working threads.
    • Each record in batch is passed to user-defined parse(Object, Object...) method and result is stored into cache.

    Two methods should be implemented by inheritants:

    • inputIterator(Object...). It should open underlying data source and iterate all record available in it. Individual records could be in very raw form, like text lines for CSV files.
    • parse(Object, Object...). This method should process input records and transform them into key-value pairs for cache.

    • Field Detail

      • DFLT_BATCH_QUEUE_SIZE

        public static final int DFLT_BATCH_QUEUE_SIZE
        Default batch queue size (max batches count to limit memory usage).
        See Also:
        Constant Field Values
      • DFLT_THREADS_COUNT

        public static final int DFLT_THREADS_COUNT
        Default number of working threads (equal to the number of available processors).
    • Constructor Detail

      • CacheLoadOnlyStoreAdapter

        public CacheLoadOnlyStoreAdapter()
    • Method Detail

      • inputIterator

        protected abstract Iterator<I> inputIterator​(@Nullable
                                                     @Nullable Object... args)
                                              throws javax.cache.integration.CacheLoaderException
        Returns iterator of input records.

        Note that returned iterator doesn't have to be thread-safe. Thus it could operate on raw streams, DB connections, etc. without additional synchronization.

        Parameters:
        args - Arguments passes into IgniteCache.loadCache(IgniteBiPredicate, Object...) method.
        Returns:
        Iterator over input records.
        Throws:
        javax.cache.integration.CacheLoaderException - If iterator can't be created with the given arguments.
      • parse

        @Nullable
        protected abstract @Nullable IgniteBiTuple<K,​V> parse​(I rec,
                                                                    @Nullable
                                                                    @Nullable Object... args)
        This method should transform raw data records into valid key-value pairs to be stored into cache.

        If null is returned then this record will be just skipped.

        Parameters:
        rec - A raw data record.
        args - Arguments passed into IgniteCache.loadCache(IgniteBiPredicate, Object...) method.
        Returns:
        Cache entry to be saved in cache or null if no entry could be produced from this record.
      • getBatchSize

        public int getBatchSize()
        Returns batch size.
        Returns:
        Batch size.
      • setBatchSize

        public void setBatchSize​(int batchSize)
        Sets batch size.
        Parameters:
        batchSize - Batch size.
      • getBatchQueueSize

        public int getBatchQueueSize()
        Returns batch queue size.
        Returns:
        Batch queue size.
      • setBatchQueueSize

        public void setBatchQueueSize​(int batchQueueSize)
        Sets batch queue size.
        Parameters:
        batchQueueSize - Batch queue size.
      • getThreadsCount

        public int getThreadsCount()
        Returns number of worker threads.
        Returns:
        Number of worker threads.
      • setThreadsCount

        public void setThreadsCount​(int threadsCnt)
        Sets number of worker threads.
        Parameters:
        threadsCnt - Number of worker threads.
      • load

        public V load​(K key)
        Specified by:
        load in interface javax.cache.integration.CacheLoader<K,​V>
      • loadAll

        public Map<K,​V> loadAll​(Iterable<? extends K> keys)
        Specified by:
        loadAll in interface javax.cache.integration.CacheLoader<K,​V>
      • write

        public void write​(javax.cache.Cache.Entry<? extends K,​? extends V> entry)
        Specified by:
        write in interface javax.cache.integration.CacheWriter<K,​V>
      • writeAll

        public void writeAll​(Collection<javax.cache.Cache.Entry<? extends K,​? extends V>> entries)
        Specified by:
        writeAll in interface javax.cache.integration.CacheWriter<K,​V>
      • delete

        public void delete​(Object key)
        Specified by:
        delete in interface javax.cache.integration.CacheWriter<K,​V>
      • deleteAll

        public void deleteAll​(Collection<?> keys)
        Specified by:
        deleteAll in interface javax.cache.integration.CacheWriter<K,​V>
      • sessionEnd

        public void sessionEnd​(boolean commit)
        Tells store to commit or rollback a transaction depending on the value of the 'commit' parameter.
        Specified by:
        sessionEnd in interface CacheStore<K,​V>
        Parameters:
        commit - True if transaction should commit, false for rollback.