Package org.apache.ignite.cache.store
Class CacheLoadOnlyStoreAdapter<K,V,I>
- java.lang.Object
-
- org.apache.ignite.cache.store.CacheLoadOnlyStoreAdapter<K,V,I>
-
- Type Parameters:
K - Key type.
V - Value type.
I - Input type.
- All Implemented Interfaces:
javax.cache.integration.CacheLoader<K,V>, javax.cache.integration.CacheWriter<K,V>, CacheStore<K,V>
public abstract class CacheLoadOnlyStoreAdapter<K,V,I> extends Object implements CacheStore<K,V>
This adapter is designed to support stores that bulk-load data from a stream-like source.
This class processes input data in the following way:
- An iterator over input records is obtained from the user-defined inputIterator(Object...) method.
- The iterator is queried continuously for input records, which are grouped into batches of batchSize.
- Each batch is placed into a processing queue and pulled by one of threadsCnt worker threads.
- Each record in a batch is passed to the user-defined parse(Object, Object...) method, and the result is stored in the cache.
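The processing steps above can be sketched with plain JDK classes. This is an illustrative model only, not the adapter's actual implementation: the class BatchLoadSketch, the methods load and wordLength, the use of a ConcurrentHashMap as the "cache", and the CallerRunsPolicy back-pressure strategy are all assumptions made for the example.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical stand-alone model of the batch-loading flow (no Ignite dependency). */
final class BatchLoadSketch {
    /** Reads records on the calling thread, groups them into batches, parses batches on worker threads. */
    static <I, K, V> Map<K, V> load(Iterator<I> input, Function<I, Map.Entry<K, V>> parser,
                                    int batchSize, int batchQueueSize, int threadsCnt) {
        Map<K, V> cache = new ConcurrentHashMap<>(); // stands in for the real cache

        // Bounded queue limits how many batches are held in memory at once.
        // Assumption: when the queue is full, the reading thread processes the batch itself.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threadsCnt, threadsCnt, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(batchQueueSize), new ThreadPoolExecutor.CallerRunsPolicy());

        List<I> batch = new ArrayList<>(batchSize);
        while (input.hasNext()) {
            batch.add(input.next());
            if (batch.size() == batchSize) {
                submit(pool, batch, parser, cache);
                batch = new ArrayList<>(batchSize);
            }
        }
        if (!batch.isEmpty())
            submit(pool, batch, parser, cache); // last, possibly partial, batch

        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES))
                throw new IllegalStateException("Workers did not finish in time");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return cache;
    }

    /** Parses every record of one batch on a worker thread; null results are skipped. */
    private static <I, K, V> void submit(ExecutorService pool, List<I> batch,
                                         Function<I, Map.Entry<K, V>> parser, Map<K, V> cache) {
        pool.execute(() -> {
            for (I rec : batch) {
                Map.Entry<K, V> e = parser.apply(rec);
                if (e != null)
                    cache.put(e.getKey(), e.getValue());
            }
        });
    }

    /** Toy parser: maps a word to its length and skips empty records. */
    static Map.Entry<String, Integer> wordLength(String word) {
        return word.isEmpty() ? null : new AbstractMap.SimpleImmutableEntry<>(word, word.length());
    }

    public static void main(String[] args) {
        Iterator<String> input = Arrays.asList("alpha", "beta", "", "gamma").iterator();
        Map<String, Integer> cache = load(input, BatchLoadSketch::wordLength, 2, 1, 2);
        System.out.println(new TreeMap<>(cache));
    }
}
```

Only the reading thread touches the iterator, so the iterator itself needs no synchronization, while parsing runs concurrently across batches.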
Subclasses must implement two methods:
- inputIterator(Object...). It should open the underlying data source and iterate over all records available in it. Individual records can be in a very raw form, such as text lines for CSV files.
- parse(Object, Object...). This method should process input records and transform them into key-value pairs for the cache.
-
-
Field Summary
Fields
Modifier and Type, Field - Description
static int DFLT_BATCH_QUEUE_SIZE - Default batch queue size (max batches count to limit memory usage).
static int DFLT_BATCH_SIZE - Default batch size (number of records read with inputIterator(Object...) and then submitted to internal pool at a time).
static int DFLT_THREADS_COUNT - Default number of working threads (equal to the number of available processors).
-
Constructor Summary
Constructors
Constructor - Description
CacheLoadOnlyStoreAdapter()
-
Method Summary
Modifier and Type, Method - Description
void delete(Object key)
void deleteAll(Collection<?> keys)
int getBatchQueueSize() - Returns batch queue size.
int getBatchSize() - Returns batch size.
int getThreadsCount() - Returns number of worker threads.
protected abstract Iterator<I> inputIterator(@Nullable Object... args) - Returns iterator of input records.
V load(K key)
Map<K,V> loadAll(Iterable<? extends K> keys)
void loadCache(IgniteBiInClosure<K,V> c, @Nullable Object... args) - Loads all values from underlying persistent storage.
protected abstract @Nullable IgniteBiTuple<K,V> parse(I rec, @Nullable Object... args) - This method should transform raw data records into valid key-value pairs to be stored into cache.
void sessionEnd(boolean commit) - Tells store to commit or rollback a transaction depending on the value of the 'commit' parameter.
void setBatchQueueSize(int batchQueueSize) - Sets batch queue size.
void setBatchSize(int batchSize) - Sets batch size.
void setThreadsCount(int threadsCnt) - Sets number of worker threads.
void write(javax.cache.Cache.Entry<? extends K,? extends V> entry)
void writeAll(Collection<javax.cache.Cache.Entry<? extends K,? extends V>> entries)
-
-
-
Field Detail
-
DFLT_BATCH_SIZE
public static final int DFLT_BATCH_SIZE
Default batch size (number of records read with inputIterator(Object...) and then submitted to internal pool at a time).
- See Also:
- Constant Field Values
-
DFLT_BATCH_QUEUE_SIZE
public static final int DFLT_BATCH_QUEUE_SIZE
Default batch queue size (max batches count to limit memory usage).
- See Also:
- Constant Field Values
-
DFLT_THREADS_COUNT
public static final int DFLT_THREADS_COUNT
Default number of working threads (equal to the number of available processors).
-
-
Method Detail
-
inputIterator
protected abstract Iterator<I> inputIterator(@Nullable Object... args) throws javax.cache.integration.CacheLoaderException
Returns an iterator over input records.
Note that the returned iterator does not have to be thread-safe, so it can operate on raw streams, DB connections, etc. without additional synchronization.
- Parameters:
args - Arguments passed to the IgniteCache.loadCache(IgniteBiPredicate, Object...) method.
- Returns:
- Iterator over input records.
- Throws:
javax.cache.integration.CacheLoaderException - If the iterator can't be created with the given arguments.
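As an illustration of the kind of iterator an inputIterator(Object...) implementation might return, here is a minimal, non-thread-safe iterator over text lines. The class name LineIteratorSketch is hypothetical and the sketch uses only JDK classes; a real adapter would presumably translate I/O failures into CacheLoaderException rather than UncheckedIOException.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical line-by-line iterator over a character stream; not thread-safe by design. */
final class LineIteratorSketch implements Iterator<String> {
    private final BufferedReader reader;
    private String next; // look-ahead line, null once the stream is exhausted

    LineIteratorSketch(Reader src) {
        reader = new BufferedReader(src);
        advance();
    }

    /** Reads the next line and closes the reader when the end of the stream is reached. */
    private void advance() {
        try {
            next = reader.readLine();
            if (next == null)
                reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override public boolean hasNext() {
        return next != null;
    }

    @Override public String next() {
        if (next == null)
            throw new NoSuchElementException();
        String cur = next;
        advance();
        return cur;
    }

    public static void main(String[] args) {
        Iterator<String> it = new LineIteratorSketch(new StringReader("1,one\n2,two\n"));
        while (it.hasNext())
            System.out.println(it.next());
    }
}
```

Each returned line is a raw record; turning it into a key-value pair is left to parse(Object, Object...).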
-
parse
@Nullable protected abstract IgniteBiTuple<K,V> parse(I rec, @Nullable Object... args)
This method should transform raw data records into valid key-value pairs to be stored in the cache.
If null is returned, the record is skipped.
- Parameters:
rec - A raw data record.
args - Arguments passed to the IgniteCache.loadCache(IgniteBiPredicate, Object...) method.
- Returns:
- Cache entry to be saved in the cache, or null if no entry could be produced from this record.
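The parse contract (return an entry, or null to skip the record) can be sketched without Ignite on the classpath. In this hypothetical example, ParseSketch is an invented class, java.util.Map.Entry stands in for IgniteBiTuple, and treating args[0] as an optional field delimiter is an assumed convention rather than anything the adapter prescribes.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical stand-in for a parse(I rec, Object... args) implementation. */
final class ParseSketch {
    /**
     * Turns a raw "id<delimiter>name" line into a key-value pair.
     * Returns null for records that cannot be parsed, which means "skip this record".
     */
    static Map.Entry<Integer, String> parse(String rec, Object... args) {
        // Assumed convention: args[0], when present and a String, is the field delimiter.
        String delim = (args != null && args.length > 0 && args[0] instanceof String)
            ? (String) args[0] : ",";

        String[] parts = rec.split(Pattern.quote(delim), 2);
        if (parts.length != 2)
            return null; // malformed line: no delimiter found

        try {
            int key = Integer.parseInt(parts[0].trim());
            return new AbstractMap.SimpleImmutableEntry<>(key, parts[1].trim());
        } catch (NumberFormatException e) {
            return null; // e.g. a header line such as "id,name"
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("7,seven"));
        System.out.println(parse("7;seven", ";"));
        System.out.println(parse("id,name"));
    }
}
```

Returning null instead of throwing keeps a single bad record from aborting the whole load, at the cost of silently dropping it; whether that trade-off is acceptable depends on the data source.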
-
loadCache
public void loadCache(IgniteBiInClosure<K,V> c, @Nullable Object... args)
Loads all values from underlying persistent storage. Note that keys are not passed, so it is up to the implementation to figure out what to load. This method is called whenever the IgniteCache.loadCache(IgniteBiPredicate, Object...) method is invoked, which is usually done to preload the cache from persistent storage.
This method is optional, and the cache implementation does not depend on this method to do anything. The default implementation of this method in CacheStoreAdapter does nothing.
For every loaded value, the IgniteBiInClosure.apply(Object, Object) method should be called on the passed-in closure. The closure will then make sure that the loaded value is stored in the cache.
- Specified by:
loadCache in interface CacheStore<K,V>
- Parameters:
c - Closure for loaded values.
args - Arguments passed to the IgniteCache.loadCache(IgniteBiPredicate, Object...) method.
-
getBatchSize
public int getBatchSize()
Returns batch size.
- Returns:
- Batch size.
-
setBatchSize
public void setBatchSize(int batchSize)
Sets batch size.
- Parameters:
batchSize - Batch size.
-
getBatchQueueSize
public int getBatchQueueSize()
Returns batch queue size.
- Returns:
- Batch queue size.
-
setBatchQueueSize
public void setBatchQueueSize(int batchQueueSize)
Sets batch queue size.
- Parameters:
batchQueueSize - Batch queue size.
-
getThreadsCount
public int getThreadsCount()
Returns number of worker threads.
- Returns:
- Number of worker threads.
-
setThreadsCount
public void setThreadsCount(int threadsCnt)
Sets number of worker threads.
- Parameters:
threadsCnt - Number of worker threads.
-
writeAll
public void writeAll(Collection<javax.cache.Cache.Entry<? extends K,? extends V>> entries)
-
delete
public void delete(Object key)
-
deleteAll
public void deleteAll(Collection<?> keys)
-
sessionEnd
public void sessionEnd(boolean commit)
Tells store to commit or rollback a transaction depending on the value of the 'commit' parameter.
- Specified by:
sessionEnd in interface CacheStore<K,V>
- Parameters:
commit - True if transaction should commit, false for rollback.
-
-