Interface IgniteDataStreamer<K,V>
-
- All Superinterfaces:
AutoCloseable
public interface IgniteDataStreamer<K,V> extends AutoCloseable
Data streamer is responsible for streaming external data into cache. It does so by buffering updates and mapping keys to the nodes responsible for the data. This keeps data movement to a minimum and makes optimal use of network and memory.

Note that data streamer data manipulation methods do not support transactions. When allowOverwrite() is set to false, a new entry is created on the primary and backup nodes only if it does not already exist. If allowOverwrite() is true, batches are applied with regular cache.put(..) methods, which start implicit transactions if the streamer targets a transactional cache.

However, explicit transactional updates are possible with a custom StreamReceiver. This way batches can be applied within transaction(s) on the target node. See receiver(StreamReceiver) for details.

Data streamer doesn't guarantee:
- Data order. Data records may be loaded into a cache in a different order compared to putting into the streamer;
- Immediate data loading. Data can be kept for a while before loading;
- By default, data consistency until successfully finished;
- By default, working with external storages.
If allowOverwrite() is false (the default), consider the following:
- You should not have the same keys repeating in the data being streamed;
- Streamer cancellation or streamer node failure can cause data inconsistency;
- If loading into a persistent cache, a concurrently created snapshot may contain inconsistent data and might not be restored entirely.
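The difference between the two update modes can be shown with a toy model in which a plain HashMap stands in for the cache. The class `OverwriteModeDemo` is hypothetical and is not part of Ignite. With allowOverwrite set to false, an incoming entry is written only when its key is absent, so an existing value survives. With it set to true, the entry is applied like a regular put. The model does not capture the ordering caveat: in a real streamer, duplicate keys in the stream may be applied in any order.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model: a HashMap stands in for the cache. This is NOT Ignite's
// internal update path; it only illustrates the observable difference between the modes.
final class OverwriteModeDemo {
    /** Applies a batch of entries to the stand-in cache under the given overwrite mode. */
    static <K, V> void applyBatch(Map<K, V> cache, List<Map.Entry<K, V>> batch, boolean allowOverwrite) {
        for (Map.Entry<K, V> e : batch) {
            if (allowOverwrite) {
                cache.put(e.getKey(), e.getValue());         // like a regular put: later value wins
            } else {
                cache.putIfAbsent(e.getKey(), e.getValue()); // existing entries are left untouched
            }
        }
    }

    /** The cache starts as {a=1}; the streamed batch is [a=2, b=3]. */
    static Map<String, Integer> demo(boolean allowOverwrite) {
        Map<String, Integer> cache = new HashMap<>();
        cache.put("a", 1);
        List<Map.Entry<String, Integer>> batch = Arrays.asList(
                new AbstractMap.SimpleEntry<String, Integer>("a", 2),
                new AbstractMap.SimpleEntry<String, Integer>("b", 3));
        applyBatch(cache, batch, allowOverwrite);
        return cache;
    }
}
```

With `demo(false)` the cache keeps `a=1` and gains `b=3`. With `demo(true)` it ends up with `a=2` and `b=3`.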
Data consistency and work with external storages, which are not guaranteed by default, can be obtained through a custom StreamReceiver and the allowOverwrite() property.

Also note that IgniteDataStreamer is not the only way to add data into cache. Alternatively, you can use the IgniteCache.loadCache(IgniteBiPredicate, Object...) method to add data from the underlying data store. You can also use the standard cache put(...) and putAll(...) operations, but they will most likely not perform as well as this class for adding data. Finally, data can be added from the underlying data store on demand, whenever it is accessed; in that case no explicit data adding step is needed.

IgniteDataStreamer supports the following configuration properties:
- perNodeBufferSize(int) - when entries are added to the data streamer via the addData(Object, Object) method, they are not sent to the in-memory data grid right away. Instead, they are buffered internally for better performance and network utilization. This setting controls the size of the internal per-node buffer before buffered data is sent to the remote node. The default is defined by the DFLT_PER_NODE_BUFFER_SIZE value.
- perNodeParallelOperations(int) - sometimes data may be added to the data streamer via the addData(Object, Object) method faster than it can be put into the cache. In this case, new buffered stream messages are sent to remote nodes before responses to previous ones are received. This could cause unlimited heap memory growth on local and remote nodes. To control memory utilization, this setting limits the maximum number of parallel buffered stream messages being processed on remote nodes. If this number is exceeded, the addData(Object, Object) method blocks. The default is the CPU count on the remote node multiplied by DFLT_PARALLEL_OPS_MULTIPLIER.
- autoFlushFrequency(long) - automatic flush frequency in milliseconds. Essentially, this is the time after which the streamer will attempt to submit all data added so far to remote nodes. There is no guarantee that data will be delivered by any particular attempt (for example, it can fail while the topology is changing), but the data will not be lost. Disabled by default (the default value is 0).
- allowOverwrite(boolean) - sets a flag enabling overwriting of existing values in the cache. The data streamer performs better when this flag is disabled, which is the default.
- receiver(StreamReceiver) - defines how the cache will be updated with added entries. It lets you provide custom logic to update the cache in the most effective and flexible way.
- deployClass(Class) - optional deploy class for peer deployment. All classes streamed by a data streamer must be class-loadable from the same class-loader. Ignite makes a best effort to detect the most suitable class-loader for data loading. However, in complex cases where compound or deeply nested class-loaders are used, it is best to specify a deploy class. This can be any class loaded by the class-loader for the given data.
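The buffering described by perNodeBufferSize can be pictured with a small single-threaded toy. The class `ToyBufferingStreamer` is hypothetical. Real Ignite maps keys to nodes with the cache's affinity function. Here `Math.floorMod(key.hashCode(), nodeCount)` is an assumed stand-in, and "sending" a batch only records its node and size.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of per-node buffering; not Ignite's implementation.
// Key-to-node mapping uses Math.floorMod(hashCode, nodeCount) as an assumed stand-in
// for the real affinity function.
final class ToyBufferingStreamer<K, V> {
    private final int nodeCount;
    private final int perNodeBufferSize;
    private final Map<Integer, List<Map.Entry<K, V>>> buffers = new HashMap<>();
    /** Batches "sent" so far, recorded as {node, batchSize} pairs for inspection. */
    final List<int[]> sentBatches = new ArrayList<>();

    ToyBufferingStreamer(int nodeCount, int perNodeBufferSize) {
        this.nodeCount = nodeCount;
        this.perNodeBufferSize = perNodeBufferSize;
    }

    /** Buffers the entry for its node and sends that node's batch once the buffer is full. */
    void addData(K key, V val) {
        int node = Math.floorMod(key.hashCode(), nodeCount);
        List<Map.Entry<K, V>> buf = buffers.computeIfAbsent(node, n -> new ArrayList<>());
        buf.add(new AbstractMap.SimpleEntry<>(key, val));
        if (buf.size() >= perNodeBufferSize) {
            send(node, buf);
        }
    }

    /** Sends every non-empty buffer, even if it is not full. */
    void flush() {
        for (Map.Entry<Integer, List<Map.Entry<K, V>>> e : buffers.entrySet()) {
            if (!e.getValue().isEmpty()) {
                send(e.getKey(), e.getValue());
            }
        }
    }

    private void send(int node, List<Map.Entry<K, V>> buf) {
        sentBatches.add(new int[] {node, buf.size()});
        buf.clear(); // a real streamer would ship the batch to the remote node here
    }
}
```

For example, with 2 nodes, a buffer size of 3 and integer keys 0 to 9, two full batches go out during the adds. The remaining two entries per node go out only on flush(). This is why data "can be kept for a while before loading".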
-
-
Field Summary
Fields (Modifier and Type, Field, Description):
- static int DFLT_MAX_PARALLEL_OPS - Deprecated. No longer used.
- static int DFLT_PARALLEL_OPS_MULTIPLIER - Default multiplier for data streamer pool size to get concurrent batches count for each remote node.
- static int DFLT_PER_NODE_BUFFER_SIZE - Default operations batch size to send to remote node for loading.
- static int DFLT_PER_THREAD_BUFFER_SIZE - Default batch size per thread to send to buffer on node.
- static long DFLT_UNLIMIT_TIMEOUT - Default timeout for streamer's operations.
-
Method Summary
All Methods, Instance Methods, Abstract Methods (Modifier and Type, Method, Description):
- IgniteFuture<?> addData(Collection<? extends Map.Entry<K,V>> entries) - Adds data for streaming on remote node.
- IgniteFuture<?> addData(Map.Entry<K,V> entry) - Adds data for streaming on remote node.
- IgniteFuture<?> addData(Map<K,V> entries) - Adds data for streaming on remote node.
- IgniteFuture<?> addData(K key, V val) - Adds data for streaming on remote node.
- boolean allowOverwrite() - Gets flag enabling overwriting existing values in cache.
- void allowOverwrite(boolean allowOverwrite) - Sets flag enabling overwriting existing values in cache.
- long autoFlushFrequency() - Gets automatic flush frequency.
- void autoFlushFrequency(long autoFlushFreq) - Sets automatic flush frequency.
- String cacheName() - Name of cache to stream data to.
- void close() - Closes data streamer.
- void close(boolean cancel) - Streams any remaining data and closes this streamer.
- void deployClass(Class<?> depCls) - Optional deploy class for peer deployment.
- void flush() - Streams any remaining data, but doesn't close the streamer.
- IgniteFuture<?> future() - Gets future for this streaming process.
- boolean keepBinary() - Gets flag indicating that objects should be kept in binary format when passed to the stream receiver.
- void keepBinary(boolean keepBinary) - Sets flag indicating that objects should be kept in binary format when passed to the stream receiver.
- int perNodeBufferSize() - Gets size of per node key-value pairs buffer.
- void perNodeBufferSize(int bufSize) - Sets size of per node key-value pairs buffer.
- int perNodeParallelOperations() - Gets maximum number of parallel stream operations for a single node.
- void perNodeParallelOperations(int parallelOps) - Sets maximum number of parallel stream operations for a single node.
- int perThreadBufferSize() - Gets buffer size set by perThreadBufferSize(int).
- void perThreadBufferSize(int size) - Sets the per-thread buffer size used when streaming via addData(Object, Object).
- void receiver(StreamReceiver<K,V> rcvr) - Sets custom stream receiver to this data streamer.
- IgniteFuture<?> removeData(K key) - Adds key for removal on remote node.
- boolean skipStore() - Gets flag indicating that write-through behavior should be disabled for data streaming.
- void skipStore(boolean skipStore) - Sets flag indicating that write-through behavior should be disabled for data streaming.
- long timeout() - Gets timeout set by timeout(long).
- void timeout(long timeout) - Sets the timeout used for blocking streamer operations, for example when a data addition method blocks because all per node parallel operations are exhausted.
- void tryFlush() - Makes an attempt to stream remaining data.
-
Field Detail
-
DFLT_MAX_PARALLEL_OPS
@Deprecated static final int DFLT_MAX_PARALLEL_OPS
Deprecated. No longer used. Default max concurrent put operations count.
- See Also:
- Constant Field Values
-
DFLT_PARALLEL_OPS_MULTIPLIER
static final int DFLT_PARALLEL_OPS_MULTIPLIER
Default multiplier for data streamer pool size to get concurrent batches count for each remote node.
-
DFLT_PER_NODE_BUFFER_SIZE
static final int DFLT_PER_NODE_BUFFER_SIZE
Default operations batch size to send to remote node for loading.
- See Also:
- Constant Field Values
-
DFLT_PER_THREAD_BUFFER_SIZE
static final int DFLT_PER_THREAD_BUFFER_SIZE
Default batch size per thread to send to buffer on node.
- See Also:
- Constant Field Values
-
DFLT_UNLIMIT_TIMEOUT
static final long DFLT_UNLIMIT_TIMEOUT
Default timeout for streamer's operations.
- See Also:
- Constant Field Values
-
-
Method Detail
-
cacheName
String cacheName()
Name of cache to stream data to.
- Returns:
Cache name, or null for the default cache.
-
allowOverwrite
boolean allowOverwrite()
Gets flag enabling overwriting existing values in cache. Data streamer will perform better if this flag is disabled. This flag is disabled by default (default is false).
- Returns:
true if overwriting is allowed or if the receiver has been changed by receiver(StreamReceiver); false otherwise.
-
allowOverwrite
void allowOverwrite(boolean allowOverwrite) throws javax.cache.CacheException
Sets flag enabling overwriting existing values in cache. Data streamer will perform better if this flag is disabled. Note that when this flag is false, updates will not be propagated to the cache store (that is, the skipStore() flag will be set to true implicitly).
This flag is disabled by default (default is false).
The flag has no effect when a custom cache receiver is set using the receiver(StreamReceiver) method.
- Parameters:
allowOverwrite - Flag value.
- Throws:
javax.cache.CacheException - If failed.
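The documented interplay between allowOverwrite, skipStore and a custom receiver can be summarized as a small state model. The class `StreamerFlags` and its `effectiveSkipStore()` and `setCustomReceiver()` methods are hypothetical illustrations, not Ignite API. They encode only three rules stated above: both flags default to false, allowOverwrite == false implies skipStore, and a custom receiver makes allowOverwrite(boolean) a no-op while allowOverwrite() reports true.

```java
// Hypothetical model of the documented flag rules; not Ignite's implementation.
final class StreamerFlags {
    private boolean allowOverwrite;   // default false
    private boolean skipStore;        // default false
    private boolean customReceiver;   // true once a receiver has been set

    void allowOverwrite(boolean value) {
        if (!customReceiver) {
            allowOverwrite = value;   // no effect once a custom receiver is set
        }
    }

    boolean allowOverwrite() {
        return customReceiver || allowOverwrite; // reports true when a receiver is set
    }

    void skipStore(boolean value) {
        skipStore = value;
    }

    /** Models calling receiver(...); what the receiver itself does with the store is not modeled. */
    void setCustomReceiver() {
        customReceiver = true;
    }

    /** Documented rule: allowOverwrite == false implicitly means skipStore == true. */
    boolean effectiveSkipStore() {
        return skipStore || !allowOverwrite();
    }
}
```

With the defaults, `effectiveSkipStore()` is true, so streamed updates bypass the cache store. Enabling allowOverwrite restores write-through unless skipStore is set explicitly.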
-
skipStore
boolean skipStore()
Gets flag indicating that write-through behavior should be disabled for data streaming. Default is false.
- Returns:
Skip store flag.
-
skipStore
void skipStore(boolean skipStore)
Sets flag indicating that write-through behavior should be disabled for data streaming. Default is false.
- Parameters:
skipStore - Skip store flag.
-
keepBinary
boolean keepBinary()
Gets flag indicating that objects should be kept in binary format when passed to the stream receiver. Default is false.
- Returns:
Keep binary flag.
-
keepBinary
void keepBinary(boolean keepBinary)
Sets flag indicating that objects should be kept in binary format when passed to the stream receiver. Default is false.
- Parameters:
keepBinary - Keep binary flag.
-
perNodeBufferSize
int perNodeBufferSize()
Gets size of per node key-value pairs buffer.
- Returns:
Per node buffer size.
-
perNodeBufferSize
void perNodeBufferSize(int bufSize)
Sets size of per node key-value pairs buffer. This method should be called prior to the addData(Object, Object) call.
If not provided, the default value is DFLT_PER_NODE_BUFFER_SIZE.
- Parameters:
bufSize - Per node buffer size.
-
perNodeParallelOperations
int perNodeParallelOperations()
Gets maximum number of parallel stream operations for a single node.
- Returns:
Maximum number of parallel stream operations for a single node.
-
perNodeParallelOperations
void perNodeParallelOperations(int parallelOps)
Sets maximum number of parallel stream operations for a single node. This method should be called prior to the addData(Object, Object) call.
If not provided, the default value is calculated as DFLT_PARALLEL_OPS_MULTIPLIER * DATA_STREAMER_POOL_SIZE_ON_REMOTE_NODE.
- Parameters:
parallelOps - Maximum number of parallel stream operations for a single node.
- See Also:
IgniteConfiguration.getDataStreamerThreadPoolSize()
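The default formula and the blocking behavior can be sketched with a counting semaphore. The class `ParallelOpsLimiter` is hypothetical and is not how Ignite tracks in-flight batches internally. The multiplier and pool size are passed in as parameters because their actual values depend on the Ignite version and the remote node's configuration.

```java
import java.util.concurrent.Semaphore;

// Hypothetical model of the per-node parallel-operations limit; names and structure
// are illustrative assumptions, not Ignite internals.
final class ParallelOpsLimiter {
    private final Semaphore permits;

    /** Documented default: DFLT_PARALLEL_OPS_MULTIPLIER * data streamer pool size on the remote node. */
    static int defaultParallelOps(int multiplier, int remotePoolSize) {
        return multiplier * remotePoolSize;
    }

    ParallelOpsLimiter(int parallelOps) {
        permits = new Semaphore(parallelOps);
    }

    /** Called before a batch is sent; blocks while parallelOps batches are already in flight. */
    void beforeSend() {
        permits.acquireUninterruptibly();
    }

    /** Called when the remote node responds to a batch, freeing one slot. */
    void onResponse() {
        permits.release();
    }

    int slotsLeft() {
        return permits.availablePermits();
    }
}
```

With a limit of 2, a third beforeSend() would block until onResponse() frees a slot. This is the point at which addData(Object, Object) stops accepting data and heap growth is capped.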
-
perThreadBufferSize
void perThreadBufferSize(int size)
Sets the per-thread buffer size used when data is streamed via addData(Object, Object) calls.
- Parameters:
size - Size of buffer.
-
perThreadBufferSize
int perThreadBufferSize()
Gets buffer size set by perThreadBufferSize(int).
- Returns:
Buffer size.
-
timeout
void timeout(long timeout)
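Sets the timeout that is used in the following cases:
- any data addition method can be blocked when all per node parallel operations are exhausted; the timeout bounds this wait;
- the total time of the flush() operation;
- the total time of the close(boolean) operation.
By default the timeout is disabled.
- Parameters:
timeout - Timeout in milliseconds.
- Throws:
IllegalArgumentException - If timeout is zero or less than -1.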
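The validation rule and a bounded wait for a send slot can be sketched as follows. The class `TimedPermitWait` is hypothetical. The sketch assumes that -1 means "no timeout", which the documented IllegalArgumentException rule (zero or less than -1) suggests. It also throws the JDK's TimeoutException where Ignite would report IgniteDataStreamerTimeoutException.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of a bounded wait for a send permit. ASSUMES -1 means "no timeout";
// uses java.util.concurrent.TimeoutException in place of IgniteDataStreamerTimeoutException.
final class TimedPermitWait {
    /** Mirrors the documented rule: zero or anything below -1 is rejected. */
    static void validateTimeout(long timeoutMs) {
        if (timeoutMs == 0 || timeoutMs < -1) {
            throw new IllegalArgumentException("timeout must be -1 or positive: " + timeoutMs);
        }
    }

    /** Waits for a permit, giving up after timeoutMs, or waiting indefinitely when timeoutMs == -1. */
    static void acquire(Semaphore permits, long timeoutMs) throws TimeoutException, InterruptedException {
        validateTimeout(timeoutMs);
        if (timeoutMs == -1) {
            permits.acquire();
        } else if (!permits.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("no permit within " + timeoutMs + " ms");
        }
    }
}
```

When no slot frees up in time, the caller gets an exception instead of blocking forever. This matches the "If timeout is exceeded" clauses on addData, flush and close.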
-
timeout
long timeout()
Gets timeout set by timeout(long).
- Returns:
Timeout in milliseconds.
-
autoFlushFrequency
long autoFlushFrequency()
Gets automatic flush frequency. Essentially, this is the time after which the streamer will attempt to submit all data added so far to remote nodes. There is no guarantee that data will be delivered by any particular attempt (for example, it can fail while the topology is changing), but the data will not be lost.
If set to 0, automatic flush is disabled. Automatic flush is disabled by default (default value is 0).
- Returns:
Flush frequency, or 0 if automatic flush is disabled.
- See Also:
flush()
-
autoFlushFrequency
void autoFlushFrequency(long autoFlushFreq)
Sets automatic flush frequency. Essentially, this is the time after which the streamer will attempt to submit all data added so far to remote nodes. There is no guarantee that data will be delivered by any particular attempt (for example, it can fail while the topology is changing), but the data will not be lost.
If set to 0, automatic flush is disabled. Automatic flush is disabled by default (default value is 0).
- Parameters:
autoFlushFreq - Flush frequency, or 0 to disable automatic flush.
- See Also:
flush()
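Periodic auto-flush can be sketched with a scheduled executor. The class `AutoFlusher` is hypothetical, and Ignite's own scheduling is internal and may differ. A frequency of 0 disables the timer, as documented. Rejecting negative values is an assumption of this sketch. Each tick is only an attempt, so failures are caught to keep later ticks running.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of periodic auto-flush; not Ignite's implementation.
final class AutoFlusher implements AutoCloseable {
    private final ScheduledExecutorService timer; // null when auto-flush is disabled

    AutoFlusher(long autoFlushFreqMs, Runnable tryFlush) {
        if (autoFlushFreqMs < 0) {
            // Assumption of this sketch: negative frequencies are rejected.
            throw new IllegalArgumentException("frequency must be >= 0");
        }
        if (autoFlushFreqMs == 0) {
            timer = null;                                 // 0 disables automatic flush
        } else {
            Runnable attempt = () -> {
                try {
                    tryFlush.run();                       // one attempt; data stays buffered if it fails
                } catch (RuntimeException e) {
                    // Swallowed on purpose: with scheduleWithFixedDelay an uncaught
                    // exception would cancel all later ticks.
                }
            };
            timer = Executors.newSingleThreadScheduledExecutor();
            timer.scheduleWithFixedDelay(attempt, autoFlushFreqMs, autoFlushFreqMs, TimeUnit.MILLISECONDS);
        }
    }

    boolean enabled() {
        return timer != null;
    }

    @Override
    public void close() {
        if (timer != null) {
            timer.shutdownNow();
        }
    }
}
```

Passing a method such as a streamer's tryFlush() as the Runnable reflects the "attempt, don't wait" nature of each tick. A blocking flush() would stall the timer thread whenever another flush is in progress.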
-
future
IgniteFuture<?> future()
Gets future for this streaming process. This future completes when the close(boolean) method completes. By attaching listeners to this future, you can get asynchronous notifications when this streaming process completes.
- Returns:
Future for this streaming process.
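The "complete on close, notify listeners" pattern can be modeled with the JDK's CompletableFuture standing in for IgniteFuture. The real IgniteFuture has its own listener methods, such as listen(...). The class `StreamingProcess` is hypothetical and only models the normal close path. How cancellation via close(true) is reported is not modeled.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model: a CompletableFuture stands in for the IgniteFuture returned by future().
final class StreamingProcess {
    private final CompletableFuture<Void> fut = new CompletableFuture<>();

    CompletableFuture<Void> future() {
        return fut;
    }

    /** Completes the process future, as close(false) does for the real streamer. */
    void close() {
        fut.complete(null);
    }
}
```

A listener registered with `thenRun(...)` before close() runs as soon as close() completes the future. Code that needs to react to the end of loading can therefore subscribe without blocking a thread.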
-
deployClass
void deployClass(Class<?> depCls)
Optional deploy class for peer deployment. All classes added by a data streamer must be class-loadable from the same class-loader. Ignite makes a best effort to detect the most suitable class-loader for data loading. However, in complex cases where compound or deeply nested class-loaders are used, it is best to specify a deploy class. This can be any class loaded by the class-loader for the given data.
- Parameters:
depCls - Any class loaded by the class-loader for the given data.
-
receiver
void receiver(StreamReceiver<K,V> rcvr)
Sets custom stream receiver to this data streamer. Once a receiver is set, allowOverwrite(boolean) has no effect and allowOverwrite() returns true.
- Parameters:
rcvr - Stream receiver.
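A receiver's job is to apply each batch to the cache with custom logic. The sketch below uses a hypothetical `ToyReceiver` interface that roughly mirrors the shape of StreamReceiver (a cache plus a collection of entries), with a plain Map in place of IgniteCache. The example `CountingReceiver` adds incoming counts to existing values instead of overwriting them. This kind of read-modify-write logic is what the default update path does not provide.

```java
import java.util.Collection;
import java.util.Map;

// Hypothetical stand-in for StreamReceiver: a Map plays the role of the target cache.
interface ToyReceiver<K, V> {
    void receive(Map<K, V> cache, Collection<Map.Entry<K, V>> entries);
}

// Example receiver: merges incoming counts into existing ones rather than overwriting them.
final class CountingReceiver implements ToyReceiver<String, Long> {
    @Override
    public void receive(Map<String, Long> cache, Collection<Map.Entry<String, Long>> entries) {
        for (Map.Entry<String, Long> e : entries) {
            cache.merge(e.getKey(), e.getValue(), Long::sum); // existing + incoming
        }
    }
}
```

In a real deployment the receiver runs on the node that owns the data, so the merge happens next to the entry rather than after a round trip to the client.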
-
removeData
IgniteFuture<?> removeData(K key) throws javax.cache.CacheException, IgniteInterruptedException, IllegalStateException
Adds key for removal on remote node. Equivalent to addData(key, null).
- Parameters:
key - Key.
- Returns:
Future for this operation. Note: it may never complete unless flush() or close() is explicitly called.
- Throws:
javax.cache.CacheException - If failed to map key to node.
IgniteInterruptedException - If thread has been interrupted.
IllegalStateException - If grid has been concurrently stopped or close(boolean) has already been called on streamer.
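The equivalence between removeData(key) and addData(key, null) comes down to one rule when an update is applied: a null value means "remove the key". The helper `NullMeansRemove` below is a hypothetical toy applied to a plain Map. It ignores the allowOverwrite distinction and every other aspect of the real update path.

```java
import java.util.Map;

// Hypothetical toy: applying one streamed update where a null value means "remove the key".
final class NullMeansRemove {
    static <K, V> void apply(Map<K, V> cache, K key, V val) {
        if (val == null) {
            cache.remove(key);    // removeData(key), i.e. addData(key, null)
        } else {
            cache.put(key, val);  // ordinary addData(key, val)
        }
    }
}
```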
-
addData
IgniteFuture<?> addData(K key, @Nullable V val) throws javax.cache.CacheException, IgniteInterruptedException, IllegalStateException, IgniteDataStreamerTimeoutException
Adds data for streaming on remote node. This method can be called from multiple threads in parallel to speed up streaming if needed.
Note that the streamer streams data concurrently using multiple internal threads, so the data may reach remote nodes in a different order from the one in which it was added to the streamer. The data may not be sent until flush() or close() is called.
Note: if allowOverwrite() is set to false (the default), the data streamer will not overwrite existing cache entries, for better performance. To change this, set allowOverwrite(boolean) to true.
- Parameters:
key - Key.
val - Value, or null if the respective entry must be removed from cache.
- Returns:
Future for this operation. Note: it may never complete unless flush() or close() is explicitly called.
- Throws:
javax.cache.CacheException - If failed to map key to node.
IgniteInterruptedException - If thread has been interrupted.
IllegalStateException - If grid has been concurrently stopped or close(boolean) has already been called on streamer.
IgniteDataStreamerTimeoutException - If timeout is exceeded.
- See Also:
allowOverwrite()
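The note that the returned future "may never complete unless flush() or close() is explicitly called" follows from buffering. A future can only complete once its entry actually leaves the buffer. The hypothetical `FutureOnFlushBuffer` models this with CompletableFuture and synchronized methods, so it is safe to call from several threads, like addData. In this toy only flush() sends data. The real streamer also sends when a per-node buffer fills or auto-flush fires.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical model: each addData future completes only when its buffered entry is sent,
// which in this toy happens only on flush().
final class FutureOnFlushBuffer<K, V> {
    private final List<Map.Entry<K, V>> buffered = new ArrayList<>();
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();
    private int sent;

    synchronized CompletableFuture<Void> addData(K key, V val) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        buffered.add(new AbstractMap.SimpleEntry<>(key, val));
        pending.add(f);
        return f;                          // not done yet: the entry is only buffered
    }

    synchronized void flush() {
        sent += buffered.size();           // "send" everything buffered so far
        buffered.clear();
        for (CompletableFuture<Void> f : pending) {
            f.complete(null);
        }
        pending.clear();
    }

    synchronized int sentCount() {
        return sent;
    }
}
```

Waiting on an addData future without ever flushing or closing the streamer can therefore hang. That is why the documentation points to flush() and close().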
-
addData
IgniteFuture<?> addData(Map.Entry<K,V> entry) throws javax.cache.CacheException, IgniteInterruptedException, IllegalStateException, IgniteDataStreamerTimeoutException
Adds data for streaming on remote node. This method can be called from multiple threads in parallel to speed up streaming if needed.
Note that the streamer streams data concurrently using multiple internal threads, so the data may reach remote nodes in a different order from the one in which it was added to the streamer. The data may not be sent until flush() or close() is called.
Note: if allowOverwrite() is set to false (the default), the data streamer will not overwrite existing cache entries, for better performance. To change this, set allowOverwrite(boolean) to true.
- Parameters:
entry - Entry.
- Returns:
Future for this operation. Note: it may never complete unless flush() or close() is explicitly called.
- Throws:
javax.cache.CacheException - If failed to map key to node.
IgniteInterruptedException - If thread has been interrupted.
IllegalStateException - If grid has been concurrently stopped or close(boolean) has already been called on streamer.
IgniteDataStreamerTimeoutException - If timeout is exceeded.
- See Also:
allowOverwrite()
-
addData
IgniteFuture<?> addData(Collection<? extends Map.Entry<K,V>> entries) throws IllegalStateException, IgniteDataStreamerTimeoutException
Adds data for streaming on remote node. This method can be called from multiple threads in parallel to speed up streaming if needed.
Note that the streamer streams data concurrently using multiple internal threads, so the data may reach remote nodes in a different order from the one in which it was added to the streamer. The data may not be sent until flush() or close() is called.
Note: if allowOverwrite() is set to false (the default), the data streamer will not overwrite existing cache entries, for better performance. To change this, set allowOverwrite(boolean) to true.
- Parameters:
entries - Collection of entries to be streamed.
- Returns:
Future for this stream operation. Note: it may never complete unless flush() or close() is explicitly called.
- Throws:
IllegalStateException - If grid has been concurrently stopped or close(boolean) has already been called on streamer.
IgniteDataStreamerTimeoutException - If timeout is exceeded.
- See Also:
allowOverwrite()
-
addData
IgniteFuture<?> addData(Map<K,V> entries) throws IllegalStateException, IgniteDataStreamerTimeoutException
Adds data for streaming on remote node. This method can be called from multiple threads in parallel to speed up streaming if needed.
Note that the streamer streams data concurrently using multiple internal threads, so the data may reach remote nodes in a different order from the one in which it was added to the streamer. The data may not be sent until flush() or close() is called.
Note: if allowOverwrite() is set to false (the default), the data streamer will not overwrite existing cache entries, for better performance. To change this, set allowOverwrite(boolean) to true.
- Parameters:
entries - Map to be streamed.
- Returns:
Future for this stream operation. Note: it may never complete unless flush() or close() is explicitly called.
- Throws:
IllegalStateException - If grid has been concurrently stopped or close(boolean) has already been called on streamer.
IgniteDataStreamerTimeoutException - If timeout is exceeded.
- See Also:
allowOverwrite()
-
flush
void flush() throws javax.cache.CacheException, IgniteInterruptedException, IllegalStateException, IgniteDataStreamerTimeoutException
Streams any remaining data, but doesn't close the streamer. Data can still be added after the flush is finished. This method blocks and doesn't allow adding any data until all data is streamed.
If another thread is already performing a flush, this method blocks, waits for the other thread to complete its flush, and exits. If you don't want to wait in this case, use the tryFlush() method.
Note that flush() guarantees completion of all futures returned by addData(Object, Object); listeners attached to those futures should be tracked separately.
- Throws:
javax.cache.CacheException - If failed to load data from buffer.
IgniteInterruptedException - If thread has been interrupted.
IllegalStateException - If grid has been concurrently stopped or close(boolean) has already been called on streamer.
IgniteDataStreamerTimeoutException - If timeout is exceeded.
- See Also:
tryFlush()
-
tryFlush
void tryFlush() throws javax.cache.CacheException, IgniteInterruptedException, IllegalStateException
Makes an attempt to stream remaining data. This method is mostly similar to flush(), with the difference that it won't wait and will exit immediately.
- Throws:
javax.cache.CacheException - If failed to load data from buffer.
IgniteInterruptedException - If thread has been interrupted.
IllegalStateException - If grid has been concurrently stopped or close(boolean) has already been called on streamer.
- See Also:
flush()
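The documented difference between flush() and tryFlush() maps onto a lock: flush() waits for a concurrent flush to finish and then exits, while tryFlush() gives up at once. The class `FlushGate` is a hypothetical model built on ReentrantLock. Its boolean return from tryFlush exists only so the sketch is observable; the real tryFlush() returns void.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of flush() vs tryFlush(); not Ignite's implementation.
final class FlushGate {
    private final ReentrantLock flushLock = new ReentrantLock();
    private final AtomicInteger flushes = new AtomicInteger();

    /** Flushes, or, if another thread is flushing, waits for it to finish and exits. */
    void flush(Runnable sendAll) {
        if (flushLock.tryLock()) {
            try {
                sendAll.run();
                flushes.incrementAndGet();
            } finally {
                flushLock.unlock();
            }
        } else {
            flushLock.lock();   // wait for the other thread's flush to complete...
            flushLock.unlock(); // ...then exit without flushing again
        }
    }

    /** Flushes only if no other flush is running; otherwise returns immediately. */
    boolean tryFlush(Runnable sendAll) {
        if (!flushLock.tryLock()) {
            return false;
        }
        try {
            sendAll.run();
            flushes.incrementAndGet();
            return true;
        } finally {
            flushLock.unlock();
        }
    }

    int flushes() {
        return flushes.get();
    }
}
```

tryFlush() suits callers that must not stall, such as a periodic timer. flush() suits callers that need to know buffered data has been pushed before they continue.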
-
close
void close(boolean cancel) throws javax.cache.CacheException, IgniteInterruptedException, IgniteDataStreamerTimeoutException
Streams any remaining data and closes this streamer.
- Parameters:
cancel - true to cancel ongoing streaming operations.
- Throws:
javax.cache.CacheException - If failed to close data streamer.
IgniteInterruptedException - If thread has been interrupted.
IgniteDataStreamerTimeoutException - If timeout is exceeded (only if cancel is false).
-
close
void close() throws javax.cache.CacheException, IgniteInterruptedException, IgniteDataStreamerTimeoutException
Closes data streamer. This method is identical to calling the close(false) method.
The method is invoked automatically on objects managed by the try-with-resources statement.
- Specified by:
close in interface AutoCloseable
- Throws:
javax.cache.CacheException - If failed to close data streamer.
IgniteInterruptedException - If thread has been interrupted.
IgniteDataStreamerTimeoutException - If timeout is exceeded.
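Because the streamer is AutoCloseable and close() behaves like close(false), a try-with-resources block streams any remaining data when it exits. The class `ToyClosableStreamer` is a hypothetical stand-in that writes to a plain list. Two behaviors are choices of this toy only, not documented Ignite behavior: close(true) simply drops buffered data, and repeated close calls are ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy streamer showing the close()/close(cancel) contract; not Ignite code.
final class ToyClosableStreamer implements AutoCloseable {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> sink;   // stands in for the remote cache
    private boolean closed;

    ToyClosableStreamer(List<String> sink) {
        this.sink = sink;
    }

    void addData(String value) {
        if (closed) {
            throw new IllegalStateException("close(boolean) has already been called");
        }
        buffer.add(value);
    }

    void close(boolean cancel) {
        if (closed) {
            return;                    // toy choice: repeated close is a no-op
        }
        if (!cancel) {
            sink.addAll(buffer);       // stream remaining data
        }
        buffer.clear();                // toy choice: on cancel, buffered data is dropped
        closed = true;
    }

    @Override
    public void close() {
        close(false);                  // same as close(false), as documented
    }
}
```

The same shape applies to the real streamer: opening it in a try-with-resources block ensures the final partial batches are sent even if the loading loop exits early with an exception.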