public interface IgniteDataStreamer<K,V> extends AutoCloseable
Note that the data manipulation methods of the data streamer do not support transactions.
When allowOverwrite() is false, a new entry is created on the primary and backup nodes only if it does not already exist. When allowOverwrite() is true, batches are applied with regular cache.put(..) calls, which start implicit transactions if the streamer targets a transactional cache.
Explicit transactional updates are still possible with a custom StreamReceiver, which lets batches be applied within one or more transactions on the target node. See receiver(StreamReceiver) for details.
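The difference between the two allowOverwrite() modes can be pictured with a plain map standing in for the cache. This is only a sketch of the semantics described above, not Ignite code: the class OverwriteModes and its apply method are hypothetical names, and a HashMap is assumed in place of a distributed cache.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the two update modes; hypothetical helper, not part of Ignite. */
class OverwriteModes {
    /** Applies one streamed entry to a plain map standing in for the cache. */
    static <K, V> void apply(Map<K, V> cache, K key, V val, boolean allowOverwrite) {
        if (allowOverwrite)
            cache.put(key, val);          // like a regular put: replaces any existing value
        else
            cache.putIfAbsent(key, val);  // creates the entry only if the key is not present yet
    }

    public static void main(String[] args) {
        Map<Integer, String> cache = new HashMap<>();
        cache.put(1, "old");

        apply(cache, 1, "new", false);    // existing entry is left untouched
        System.out.println(cache.get(1));

        apply(cache, 1, "new", true);     // existing entry is replaced
        System.out.println(cache.get(1));
    }
}
```

The model ignores backups, transactions, and the cache store; it only shows which entries end up in the cache.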
Note that the streamer streams data concurrently using multiple internal threads, so the data may reach remote nodes in a different order from the one in which it was added to the streamer.
Also note that IgniteDataStreamer is not the only way to add data to a cache. Alternatively, you can use the IgniteCache.loadCache(IgniteBiPredicate, Object...) method to load data from the underlying data store. You can also use the standard cache put(...) and putAll(...) operations, but they will most likely not perform as well as this class for adding data. Finally, data can be loaded from the underlying data store on demand, whenever it is accessed; in that case no explicit loading step is needed.
IgniteDataStreamer supports the following configuration properties:
perNodeBufferSize(int) - when entries are added to the data streamer via the addData(Object, Object) method, they are not sent to the in-memory data grid right away; they are buffered internally for better performance and network utilization. This setting controls the size of the internal per-node buffer that must fill before the buffered data is sent to the remote node. The default is defined by DFLT_PER_NODE_BUFFER_SIZE.
perNodeParallelOperations(int) - sometimes data is added to the data streamer via addData(Object, Object) faster than it can be put into the cache. In this case, new buffered stream messages are sent to remote nodes before responses to previous ones are received, which could cause unbounded heap memory growth on local and remote nodes. To control memory utilization, this setting limits the maximum number of parallel buffered stream messages being processed on remote nodes. If this number is exceeded, addData(Object, Object) blocks. The default is the data streamer pool size on the remote node multiplied by DFLT_PARALLEL_OPS_MULTIPLIER.
autoFlushFrequency(long) - automatic flush frequency in milliseconds. Essentially, this is the interval after which the streamer attempts to submit all data added so far to remote nodes. There is no guarantee that data will be delivered by a particular attempt (e.g., an attempt can fail while the topology is changing), but the data will not be lost. Disabled by default (default value is 0).
allowOverwrite(boolean) - sets the flag that enables overwriting existing values in the cache. The data streamer performs better when this flag is disabled, which is the default.
receiver(StreamReceiver) - defines how the cache is updated with added entries. It lets you supply custom logic to update the cache in the most effective and flexible way.
deployClass(Class) - optional deploy class for peer deployment. All classes streamed by a data streamer must be loadable by the same class loader. Ignite makes a best effort to detect the most suitable class loader for data loading. However, in complex cases where compound or deeply nested class loaders are used, it is best to specify a deploy class, which can be any class loaded by the class loader for the given data.
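The per-node buffering described for perNodeBufferSize(int) can be sketched with standard collections. This is a simplified model and not the streamer's actual implementation: the class PerNodeBuffer, the string node names, and the sentBatches log are all hypothetical, and "sending" is assumed to mean appending a batch to a list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of per-node buffering; hypothetical, not part of Ignite. */
class PerNodeBuffer {
    private final int bufSize;
    private final Map<String, List<String>> buffers = new HashMap<>();

    /** Log of batches that were "sent", in sending order. */
    final List<List<String>> sentBatches = new ArrayList<>();

    PerNodeBuffer(int bufSize) {
        if (bufSize <= 0)
            throw new IllegalArgumentException("bufSize must be positive");
        this.bufSize = bufSize;
    }

    /** Buffers an entry for a node and sends the batch once the buffer is full. */
    void addData(String node, String entry) {
        List<String> buf = buffers.computeIfAbsent(node, n -> new ArrayList<>());
        buf.add(entry);
        if (buf.size() >= bufSize)
            send(node);
    }

    /** Sends whatever is buffered for every node, whether the buffer is full or not. */
    void flush() {
        // Iterate over a copy of the keys because send() removes entries from the map.
        for (String node : new ArrayList<>(buffers.keySet()))
            send(node);
    }

    private void send(String node) {
        List<String> buf = buffers.remove(node);
        if (buf != null && !buf.isEmpty())
            sentBatches.add(buf);
    }
}
```

With a buffer size of 2, a single added entry stays local until a second entry for the same node arrives or flush() is called, which mirrors why added data may not be sent right away.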
Modifier and Type | Field and Description |
---|---|
static int | DFLT_MAX_PARALLEL_OPS - Deprecated. Is not used anymore. |
static int | DFLT_PARALLEL_OPS_MULTIPLIER - Default multiplier for data streamer pool size to get concurrent batches count for each remote node. |
static int | DFLT_PER_NODE_BUFFER_SIZE - Default operations batch size to send to remote node for loading. |
static int | DFLT_PER_THREAD_BUFFER_SIZE - Default batch size per thread to send to buffer on node. |
static long | DFLT_UNLIMIT_TIMEOUT - Default timeout for streamer's operations. |
Modifier and Type | Method and Description |
---|---|
IgniteFuture<?> | addData(Collection<? extends Map.Entry<K,V>> entries) - Adds data for streaming on remote node. |
IgniteFuture<?> | addData(K key, V val) - Adds data for streaming on remote node. |
IgniteFuture<?> | addData(Map.Entry<K,V> entry) - Adds data for streaming on remote node. |
IgniteFuture<?> | addData(Map<K,V> entries) - Adds data for streaming on remote node. |
boolean | allowOverwrite() - Gets flag enabling overwriting existing values in cache. |
void | allowOverwrite(boolean allowOverwrite) - Sets flag enabling overwriting existing values in cache. |
long | autoFlushFrequency() - Gets automatic flush frequency. |
void | autoFlushFrequency(long autoFlushFreq) - Sets automatic flush frequency. |
String | cacheName() - Name of cache to stream data to. |
void | close() - Closes data streamer. |
void | close(boolean cancel) - Streams any remaining data and closes this streamer. |
void | deployClass(Class<?> depCls) - Optional deploy class for peer deployment. |
void | flush() - Streams any remaining data, but doesn't close the streamer. |
IgniteFuture<?> | future() - Gets future for this streaming process. |
boolean | keepBinary() - Gets flag indicating that objects should be kept in binary format when passed to the stream receiver. |
void | keepBinary(boolean keepBinary) - Sets flag indicating that objects should be kept in binary format when passed to the stream receiver. |
int | perNodeBufferSize() - Gets size of per node key-value pairs buffer. |
void | perNodeBufferSize(int bufSize) - Sets size of per node key-value pairs buffer. |
int | perNodeParallelOperations() - Gets maximum number of parallel stream operations for a single node. |
void | perNodeParallelOperations(int parallelOps) - Sets maximum number of parallel stream operations for a single node. |
int | perThreadBufferSize() - Gets buffer size set by perThreadBufferSize(int). |
void | perThreadBufferSize(int size) - Sets the per-thread buffer size used when streaming with the addData(Object, Object) call. |
void | receiver(StreamReceiver<K,V> rcvr) - Sets custom stream receiver to this data streamer. |
IgniteFuture<?> | removeData(K key) - Adds key for removal on remote node. |
boolean | skipStore() - Gets flag indicating that write-through behavior should be disabled for data streaming. |
void | skipStore(boolean skipStore) - Sets flag indicating that write-through behavior should be disabled for data streaming. |
long | timeout() - Gets timeout set by timeout(long). |
void | timeout(long timeout) - Sets the timeout for streamer operations, for example when a data addition method blocks because all per-node parallel operations are exhausted. |
void | tryFlush() - Makes an attempt to stream remaining data. |
@Deprecated static final int DFLT_MAX_PARALLEL_OPS
static final int DFLT_PARALLEL_OPS_MULTIPLIER
static final int DFLT_PER_NODE_BUFFER_SIZE
static final int DFLT_PER_THREAD_BUFFER_SIZE
static final long DFLT_UNLIMIT_TIMEOUT
String cacheName()
Name of cache to stream data to.
Returns: Cache name or null for default cache.
boolean allowOverwrite()
Gets flag enabling overwriting existing values in cache. This flag is disabled by default (default is false).
Returns: True if overwriting is allowed, false otherwise.
void allowOverwrite(boolean allowOverwrite) throws javax.cache.CacheException
Sets flag enabling overwriting existing values in cache. When this flag is false, updates will not be propagated to the cache store (i.e. the skipStore() flag will be set to true implicitly).
This flag is disabled by default (default is false).
The flag has no effect when a custom cache receiver is set using the receiver(StreamReceiver) method.
Parameters:
allowOverwrite - Flag value.
Throws:
javax.cache.CacheException - If failed.
boolean skipStore()
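Gets flag indicating that write-through behavior should be disabled for data streaming. Default is false.
void skipStore(boolean skipStore)
Sets flag indicating that write-through behavior should be disabled for data streaming. Default is false.
Parameters:
skipStore - Skip store flag.
boolean keepBinary()
Gets flag indicating that objects should be kept in binary format when passed to the stream receiver. Default is false.
void keepBinary(boolean keepBinary)
Sets flag indicating that objects should be kept in binary format when passed to the stream receiver. Default is false.
Parameters:
keepBinary - Keep binary flag.
int perNodeBufferSize()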
Gets size of per node key-value pairs buffer.
void perNodeBufferSize(int bufSize)
Sets size of per node key-value pairs buffer. This method should be called prior to the addData(Object, Object) call. If not provided, the default value is DFLT_PER_NODE_BUFFER_SIZE.
Parameters:
bufSize - Per node buffer size.
int perNodeParallelOperations()
Gets maximum number of parallel stream operations for a single node.
void perNodeParallelOperations(int parallelOps)
Sets maximum number of parallel stream operations for a single node. This method should be called prior to the addData(Object, Object) call. If not provided, the default value is calculated as DFLT_PARALLEL_OPS_MULTIPLIER * DATA_STREAMER_POOL_SIZE_ON_REMOTE_NODE.
Parameters:
parallelOps - Maximum number of parallel stream operations for a single node.
See also: IgniteConfiguration.getDataStreamerThreadPoolSize()
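The limit on parallel stream operations behaves like a permit counter: a batch can be sent only while a permit is free, and a permit returns when the remote node responds. The sketch below models that idea with java.util.concurrent.Semaphore. It is an illustration under assumptions, not the streamer's implementation: the class ParallelOpsLimiter and its method names are hypothetical, and the multiplier is passed in as a parameter because its actual value is not stated here.

```java
import java.util.concurrent.Semaphore;

/** Toy model of limiting in-flight batches per node; hypothetical, not part of Ignite. */
class ParallelOpsLimiter {
    private final Semaphore permits;

    ParallelOpsLimiter(int parallelOps) {
        permits = new Semaphore(parallelOps);
    }

    /** Default limit as described above: multiplier times the remote node's streamer pool size. */
    static int defaultParallelOps(int multiplier, int remotePoolSize) {
        return multiplier * remotePoolSize;
    }

    /** Tries to start sending a batch; false means the caller would have to block. */
    boolean tryStartBatch() {
        return permits.tryAcquire();
    }

    /** Called when the remote node acknowledges a batch, freeing one slot. */
    void onBatchAcknowledged() {
        permits.release();
    }
}
```

In this model a blocking addData would correspond to calling acquire() on the semaphore instead of tryAcquire(); the non-blocking form is used here only so the behavior is easy to observe.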
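void perThreadBufferSize(int size)
Sets the per-thread buffer size used when streaming with the addData(Object, Object) call.
Parameters:
size - Size of buffer.
int perThreadBufferSize()
Gets buffer size set by perThreadBufferSize(int).
void timeout(long timeout)
Sets the timeout for streamer operations, for example when a data addition method blocks because all per-node parallel operations are exhausted.
Parameters:
timeout - Timeout in milliseconds.
Throws:
IllegalArgumentException - If timeout is zero or less than -1.
long timeout()
Gets timeout set by timeout(long).
long autoFlushFrequency()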
Gets automatic flush frequency. If set to 0, automatic flush is disabled.
Automatic flush is disabled by default (default value is 0).
Returns: Flush frequency, or 0 if automatic flush is disabled.
See also: flush()
void autoFlushFrequency(long autoFlushFreq)
Sets automatic flush frequency. If set to 0, automatic flush is disabled.
Automatic flush is disabled by default (default value is 0).
Parameters:
autoFlushFreq - Flush frequency or 0 to disable automatic flush.
See also: flush()
IgniteFuture<?> future()
Gets future for this streaming process. This future completes when close(boolean) completes. By attaching listeners to this future it is possible to get asynchronous notifications for completion of this streaming process.
void deployClass(Class<?> depCls)
Optional deploy class for peer deployment.
Parameters:
depCls - Any class loaded by the class-loader for given data.
void receiver(StreamReceiver<K,V> rcvr)
Sets custom stream receiver to this data streamer.
Parameters:
rcvr - Stream receiver.
IgniteFuture<?> removeData(K key) throws javax.cache.CacheException, IgniteInterruptedException, IllegalStateException
Adds key for removal on remote node. Equivalent to addData(key, null).
Parameters:
key - Key.
Returns: Future for this operation. Note that it may not complete until flush() or close() is explicitly called.
Throws:
javax.cache.CacheException - If failed to map key to node.
IgniteInterruptedException - If thread has been interrupted.
IllegalStateException - If grid has been concurrently stopped or close(boolean) has already been called on streamer.
IgniteFuture<?> addData(K key, V val) throws javax.cache.CacheException, IgniteInterruptedException, IllegalStateException, IgniteDataStreamerTimeoutException
Adds data for streaming on remote node. Note that the streamer streams data concurrently using multiple internal threads, so the data may reach remote nodes in a different order from the one in which it was added to the streamer. The data may not be sent until flush() or close() is called.
Note: if allowOverwrite() is false (the default), the data streamer will not overwrite existing cache entries, for better performance (to change this, set allowOverwrite(boolean) to true).
Parameters:
key - Key.
val - Value or null if respective entry must be removed from cache.
Returns: Future for this operation. Note that it may not complete until flush() or close() is explicitly called.
Throws:
javax.cache.CacheException - If failed to map key to node.
IgniteInterruptedException - If thread has been interrupted.
IllegalStateException - If grid has been concurrently stopped or close(boolean) has already been called on streamer.
IgniteDataStreamerTimeoutException - If timeout is exceeded.
See also: allowOverwrite()
IgniteFuture<?> addData(Map.Entry<K,V> entry) throws javax.cache.CacheException, IgniteInterruptedException, IllegalStateException, IgniteDataStreamerTimeoutException
Adds data for streaming on remote node. Note that the streamer streams data concurrently using multiple internal threads, so the data may reach remote nodes in a different order from the one in which it was added to the streamer. The data may not be sent until flush() or close() is called.
Note: if allowOverwrite() is false (the default), the data streamer will not overwrite existing cache entries, for better performance (to change this, set allowOverwrite(boolean) to true).
Parameters:
entry - Entry.
Returns: Future for this operation. Note that it may not complete until flush() or close() is explicitly called.
Throws:
javax.cache.CacheException - If failed to map key to node.
IgniteInterruptedException - If thread has been interrupted.
IllegalStateException - If grid has been concurrently stopped or close(boolean) has already been called on streamer.
IgniteDataStreamerTimeoutException - If timeout is exceeded.
See also: allowOverwrite()
IgniteFuture<?> addData(Collection<? extends Map.Entry<K,V>> entries) throws IllegalStateException, IgniteDataStreamerTimeoutException
Adds data for streaming on remote node. Note that the streamer streams data concurrently using multiple internal threads, so the data may reach remote nodes in a different order from the one in which it was added to the streamer. The data may not be sent until flush() or close() is called.
Note: if allowOverwrite() is false (the default), the data streamer will not overwrite existing cache entries, for better performance (to change this, set allowOverwrite(boolean) to true).
Parameters:
entries - Collection of entries to be streamed.
Returns: Future for this operation. Note that it may not complete until flush() or close() is explicitly called.
Throws:
IllegalStateException - If grid has been concurrently stopped or close(boolean) has already been called on streamer.
IgniteDataStreamerTimeoutException - If timeout is exceeded.
See also: allowOverwrite()
IgniteFuture<?> addData(Map<K,V> entries) throws IllegalStateException, IgniteDataStreamerTimeoutException
Adds data for streaming on remote node. Note that the streamer streams data concurrently using multiple internal threads, so the data may reach remote nodes in a different order from the one in which it was added to the streamer. The data may not be sent until flush() or close() is called.
Note: if allowOverwrite() is false (the default), the data streamer will not overwrite existing cache entries, for better performance (to change this, set allowOverwrite(boolean) to true).
Parameters:
entries - Map to be streamed.
Returns: Future for this operation. Note that it may not complete until flush() or close() is explicitly called.
Throws:
IllegalStateException - If grid has been concurrently stopped or close(boolean) has already been called on streamer.
IgniteDataStreamerTimeoutException - If timeout is exceeded.
See also: allowOverwrite()
void flush() throws javax.cache.CacheException, IgniteInterruptedException, IllegalStateException, IgniteDataStreamerTimeoutException
Streams any remaining data, but doesn't close the streamer.
If another thread is already performing a flush, this method blocks, waits for that thread to complete the flush, and then exits. If you don't want to wait in this case, use the tryFlush() method.
Note that flush() guarantees completion of all futures returned by addData(Object, Object); listeners should be tracked separately.
Throws:
javax.cache.CacheException - If failed to load data from buffer.
IgniteInterruptedException - If thread has been interrupted.
IllegalStateException - If grid has been concurrently stopped or close(boolean) has already been called on streamer.
IgniteDataStreamerTimeoutException - If timeout is exceeded.
See also: tryFlush()
void tryFlush() throws javax.cache.CacheException, IgniteInterruptedException, IllegalStateException
Makes an attempt to stream remaining data. This method is similar to flush(), with the difference that it won't wait and will exit immediately.
Throws:
javax.cache.CacheException - If failed to load data from buffer.
IgniteInterruptedException - If thread has been interrupted.
IllegalStateException - If grid has been concurrently stopped or close(boolean) has already been called on streamer.
See also: flush()
void close(boolean cancel) throws javax.cache.CacheException, IgniteInterruptedException, IgniteDataStreamerTimeoutException
Streams any remaining data and closes this streamer.
Parameters:
cancel - True to cancel ongoing streaming operations.
Throws:
javax.cache.CacheException - If failed to close data streamer.
IgniteInterruptedException - If thread has been interrupted.
IgniteDataStreamerTimeoutException - If timeout is exceeded, only if cancel is false.
void close() throws javax.cache.CacheException, IgniteInterruptedException, IgniteDataStreamerTimeoutException
Closes data streamer. This method is identical to calling the close(false) method.
The method is invoked automatically on objects managed by the try-with-resources statement.
Specified by: close in interface AutoCloseable
Throws:
javax.cache.CacheException - If failed to close data streamer.
IgniteInterruptedException - If thread has been interrupted.
IgniteDataStreamerTimeoutException - If timeout is exceeded.
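Because the interface extends AutoCloseable, a streamer can be managed by try-with-resources, and leaving the block behaves like close(false). The sketch below shows that pattern on a stand-in class. ToyStreamer is hypothetical and is not the Ignite streamer; in particular, the assumption that close(true) simply drops buffered entries is a simplification of "cancel ongoing streaming operations".

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a streamer, used only to show try-with-resources behavior. */
class ToyStreamer implements AutoCloseable {
    private final List<String> buffered = new ArrayList<>();
    private final List<String> delivered;
    private boolean closed;

    ToyStreamer(List<String> delivered) {
        this.delivered = delivered;
    }

    /** Buffers an entry; fails if the streamer has already been closed. */
    void addData(String entry) {
        if (closed)
            throw new IllegalStateException("Streamer is closed");
        buffered.add(entry);
    }

    /** In this model, close(false) delivers buffered data and close(true) drops it. */
    void close(boolean cancel) {
        if (closed)
            return;
        if (!cancel)
            delivered.addAll(buffered);
        buffered.clear();
        closed = true;
    }

    /** Identical to close(false); called automatically by try-with-resources. */
    @Override public void close() {
        close(false);
    }

    static List<String> demo() {
        List<String> delivered = new ArrayList<>();
        try (ToyStreamer s = new ToyStreamer(delivered)) {
            s.addData("a");
            s.addData("b");
        } // close() runs here, so the buffered entries are delivered
        return delivered;
    }
}
```

Calling addData after close throws IllegalStateException in this model, which matches the exception documented for a streamer on which close(boolean) has already been called.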
GridGain In-Memory Computing Platform : ver. 8.9.15 Release Date : December 3 2024