public interface ComputeTaskSession
Session has two main features: attribute and checkpoint management. Both attributes and checkpoints can be used from the task itself and from the jobs belonging to this task. Session attributes and checkpoints can be set from any task or job method. Session attribute and checkpoint consistency is fault tolerant and is preserved whenever a job fails over to another node for execution. Whenever task execution ends, all checkpoints saved within the session with ComputeTaskSessionScope.SESSION_SCOPE scope are removed from checkpoint storage. Checkpoints saved with ComputeTaskSessionScope.GLOBAL_SCOPE outlive the session and can be viewed by other tasks.
The sequence in which session attributes are set is consistent across the task and all job siblings within it: there will never be a case where one job sees attribute A before attribute B while another job sees attribute B before A. Attribute order is identical across all session participants. It is also fault tolerant and is preserved whenever a job fails over to another node.
Tasks and jobs can wait for a certain attribute to be set by calling the waitForAttribute(...) methods. They can also receive asynchronous notifications about a certain attribute being set through a ComputeTaskSessionAttributeListener. These features allow grid jobs and tasks to remain connected and synchronize their execution with each other, which opens the way to solving a whole new range of problems.
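A minimal sketch of both synchronization styles from inside a job. It assumes the job runs as part of a task annotated with @ComputeTaskSessionFullSupport (which, as far as we know, Ignite requires before session attributes can be used), and that some other participant eventually sets the hypothetical attribute "phase" to "compress".

```java
import org.apache.ignite.IgniteException;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSessionAttributeListener;
import org.apache.ignite.resources.TaskSessionResource;

public class PhaseAwareJob extends ComputeJobAdapter {
    /** Hypothetical attribute key used to announce the current phase. */
    static final String PHASE_KEY = "phase";

    @TaskSessionResource
    private ComputeTaskSession ses;

    @Override public Object execute() {
        // Asynchronous style: be notified whenever an attribute is set.
        ComputeTaskSessionAttributeListener lsnr = (key, val) -> {
            if (PHASE_KEY.equals(key))
                System.out.println("Phase changed to: " + val);
        };

        // rewind = true also replays attributes received before registration.
        ses.addAttributeListener(lsnr, true);

        try {
            // Synchronous style: block for up to 10 seconds until "phase"
            // equals "compress" (a timeout of 0 would wait indefinitely).
            boolean ready = ses.waitForAttribute(PHASE_KEY, "compress", 10_000L);

            return ready ? "compressed" : "timed out";
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IgniteException(e);
        }
        finally {
            ses.removeAttributeListener(lsnr);
        }
    }
}
```

Passing rewind = true means the listener also sees attributes set before it was registered, so no notification is lost between job start and registration.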
Imagine, for example, that you need to compress a very large file (say, terabytes in size). To do that in a grid environment, you would split the file into multiple sections and assign every section to a remote job for execution. Every job would scan its section for repetition patterns. Once all jobs have finished this scan in parallel, they need to synchronize their results with their siblings so that compression happens consistently across the whole file. This can be achieved by having every job put the repetition patterns it discovered into the session. Once all patterns are synchronized, all jobs can proceed with compressing their designated file sections in parallel, taking into account the repetition patterns found by all the jobs in the split. The grid task then reduces (aggregates) all compressed sections into one compressed file. Without the session attribute synchronization step, this problem would be much harder to solve.
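The synchronization step of the compression scenario might be sketched as follows: each job publishes its patterns under a per-section key, then blocks until every section has published. The key naming scheme, findPatterns and compressSection are hypothetical placeholders, and the enclosing task is assumed to be annotated with @ComputeTaskSessionFullSupport.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.ignite.IgniteException;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.resources.TaskSessionResource;

public class PatternScanJob extends ComputeJobAdapter {
    private final int idx;   // index of this job's file section
    private final int total; // total number of sections in the split

    @TaskSessionResource
    private ComputeTaskSession ses;

    public PatternScanJob(int idx, int total) {
        this.idx = idx;
        this.total = total;
    }

    /** Hypothetical session key under which section i publishes its patterns. */
    static String key(int i) {
        return "patterns-" + i;
    }

    /** Unions the pattern lists published under the given keys. */
    static Set<String> merge(Map<?, ?> published, List<String> keys) {
        Set<String> all = new TreeSet<>();
        for (String k : keys)
            for (Object p : (Collection<?>)published.get(k))
                all.add((String)p);
        return all;
    }

    @Override public Object execute() {
        // Phase 1: scan this section (placeholder) and publish the result.
        ses.setAttribute(key(idx), new ArrayList<>(findPatterns(idx)));

        List<String> keys = new ArrayList<>();
        for (int i = 0; i < total; i++)
            keys.add(key(i));

        try {
            // Phase 2: block until every section has published (0 = no timeout).
            Set<String> all = merge(ses.waitForAttributes(keys, 0), keys);

            return compressSection(idx, all);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IgniteException(e);
        }
    }

    // Hypothetical placeholders for the real scanning and compression logic.
    private Set<String> findPatterns(int section) { return new TreeSet<>(); }

    private byte[] compressSection(int section, Set<String> patterns) { return new byte[0]; }
}
```

Waiting with a timeout of 0 assumes that all jobs of the split actually run concurrently; if the grid has fewer free execution threads than jobs, a finite timeout is the safer choice.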
The task session can be injected into a task or job by attaching the @TaskSessionResource annotation to a field or a setter method inside ComputeTask or ComputeJob implementations, either on a field:

    ...
    // This field will be injected with the distributed task session.
    @TaskSessionResource
    private ComputeTaskSession ses;
    ...

or on a setter method:

    // This setter method will be called automatically by the system
    // to set the grid task session.
    @TaskSessionResource
    void setSession(ComputeTaskSession ses) {
        this.ses = ses;
    }
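For context, here is a small self-contained task in which the session is injected both into the task and into each of its jobs. It is a sketch that assumes a local node started with the default configuration; the class names and the "jobCount" attribute are illustrative. @ComputeTaskSessionFullSupport is included because, to our understanding, session attributes are disabled for tasks without it.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.resources.TaskSessionResource;

// Enables session attributes (and checkpoints) for this task.
@ComputeTaskSessionFullSupport
public class SessionDemoTask extends ComputeTaskSplitAdapter<Integer, Integer> {
    // Injected into the task instance on the node that started it.
    @TaskSessionResource
    private ComputeTaskSession ses;

    @Override protected List<ComputeJob> split(int gridSize, Integer jobCnt) {
        // Make the job count visible to every job in this session.
        ses.setAttribute("jobCount", jobCnt);

        List<ComputeJob> jobs = new ArrayList<>(jobCnt);
        for (int i = 0; i < jobCnt; i++)
            jobs.add(new SessionDemoJob());
        return jobs;
    }

    @Override public Integer reduce(List<ComputeJobResult> results) {
        // Count the jobs that reported seeing the attribute.
        int sum = 0;
        for (ComputeJobResult res : results)
            sum += res.<Integer>getData();
        return sum;
    }

    static class SessionDemoJob extends ComputeJobAdapter {
        // Injected separately into each job on the node that executes it.
        @TaskSessionResource
        private ComputeTaskSession ses;

        @Override public Object execute() {
            // Reads the attribute the task set in split().
            Integer cnt = ses.getAttribute("jobCount");
            return cnt == null ? 0 : 1;
        }
    }

    public static void main(String[] args) {
        try (Ignite ignite = Ignition.start()) {
            int seen = ignite.compute().execute(SessionDemoTask.class, 3);
            System.out.println("Jobs that saw the attribute: " + seen);
        }
    }
}
```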
| Modifier and Type | Method and Description |
|---|---|
| `void` | `addAttributeListener(ComputeTaskSessionAttributeListener lsnr, boolean rewind)` Adds a listener for the session attributes. |
| `<K,V> V` | `getAttribute(K key)` Gets an attribute set by the setAttribute(Object, Object) or setAttributes(Map) method. |
| `Map<?,?>` | `getAttributes()` Gets all attributes. |
| `ClassLoader` | `getClassLoader()` Gets the class loader responsible for loading all classes within the task. |
| `long` | `getEndTime()` Gets the end of computation time for the task. |
| `IgniteUuid` | `getId()` Gets the session ID of the task being executed. |
| `@Nullable ComputeJobSibling` | `getJobSibling(IgniteUuid jobId)` Gets the job sibling for a given ID. |
| `Collection<ComputeJobSibling>` | `getJobSiblings()` Gets a collection of all grid job siblings. |
| `long` | `getStartTime()` Gets the start of computation time for the task. |
| `String` | `getTaskName()` Gets the name of the task this session belongs to. |
| `UUID` | `getTaskNodeId()` Gets the ID of the node on which task execution originated. |
| `Collection<UUID>` | `getTopology()` Gets a collection of grid node IDs. |
| `<T> T` | `loadCheckpoint(String key)` Loads a job's state previously saved via the saveCheckpoint(String, Object, ComputeTaskSessionScope, long) method from an underlying storage for a given key. |
| `IgniteFuture<?>` | `mapFuture()` Gets a future that will be completed when the task "map" step has completed (which means that the ComputeTask.map(List, Object) method has finished). |
| `Collection<ComputeJobSibling>` | `refreshJobSiblings()` Refreshes the collection of job siblings. |
| `boolean` | `removeAttributeListener(ComputeTaskSessionAttributeListener lsnr)` Removes the given listener. |
| `boolean` | `removeCheckpoint(String key)` Removes a previously saved job's state for a given key from an underlying storage. |
| `void` | `saveCheckpoint(String key, Object state)` Saves intermediate state of a job or task to a storage. |
| `void` | `saveCheckpoint(String key, Object state, ComputeTaskSessionScope scope, long timeout)` Saves intermediate state of a job to a storage. |
| `void` | `saveCheckpoint(String key, Object state, ComputeTaskSessionScope scope, long timeout, boolean overwrite)` Saves intermediate state of a job or task to a storage. |
| `void` | `setAttribute(Object key, @Nullable Object val)` Sets a session attribute. |
| `void` | `setAttributes(Map<?,?> attrs)` Sets task attributes. |
| `<K,V> V` | `waitForAttribute(K key, long timeout)` Waits for the specified attribute to be set. |
| `<K,V> boolean` | `waitForAttribute(K key, V val, long timeout)` Waits for the specified attribute to be set or updated with the given value. |
| `Map<?,?>` | `waitForAttributes(Collection<?> keys, long timeout)` Waits for the specified attributes to be set. |
| `boolean` | `waitForAttributes(Map<?,?> attrs, long timeout)` Waits for the specified attributes to be set or updated with the given values. |
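String getTaskName()
Gets the name of the task this session belongs to.

UUID getTaskNodeId()
Gets the ID of the node on which task execution originated.

long getStartTime()
Gets the start of computation time for the task.

long getEndTime()
Gets the end of computation time for the task.

IgniteUuid getId()
Gets the session ID of the task being executed.

ClassLoader getClassLoader()
Gets the class loader responsible for loading all classes within the task.
Note that for classes that were loaded remotely from other nodes, the methods Class.getResource(String) and ClassLoader.getResource(String) will always return null. Use Class.getResourceAsStream(String) or ClassLoader.getResourceAsStream(String) instead.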
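Following the advice above, resource reads inside a task or job can go through getResourceAsStream. The helper below is plain Java and hypothetical; inside a job it would be called as ResourceUtil.readResource(ses.getClassLoader(), "rules.properties"), where the resource name is only an example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public final class ResourceUtil {
    private ResourceUtil() {}

    /**
     * Reads a resource fully via getResourceAsStream, which also works for
     * remotely loaded classes (unlike getResource). Returns null if the
     * resource does not exist.
     */
    public static byte[] readResource(ClassLoader ldr, String name) {
        // try-with-resources skips close() when the stream is null.
        try (InputStream in = ldr.getResourceAsStream(name)) {
            return in == null ? null : in.readAllBytes();
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to read resource: " + name, e);
        }
    }
}
```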
Collection<ComputeJobSibling> getJobSiblings() throws IgniteException
Gets a collection of all grid job siblings. If the task uses a continuous mapper (i.e. one is injected into the task class), job siblings will be requested from the task node on each call.
Throws:
IgniteException - If job siblings cannot be received from the task node.

Collection<ComputeJobSibling> refreshJobSiblings() throws IgniteException
Refreshes the collection of job siblings. If the task uses a continuous mapper (i.e. ComputeTaskContinuousMapper), the list of siblings on a remote node may not be fresh. In that case, this method re-requests the list of siblings from the originating node.
Throws:
IgniteException - If the refresh failed.

@Nullable ComputeJobSibling getJobSibling(IgniteUuid jobId) throws IgniteException
Gets the job sibling for a given ID. If the task uses a continuous mapper (i.e. one is injected into the task class), the job sibling will be requested from the task node on each call.
Parameters:
jobId - Job ID to get the sibling for.
Throws:
IgniteException - If the job sibling cannot be received from the task node.

void setAttribute(Object key, @Nullable Object val) throws IgniteException
Sets a session attribute. Registered listeners receive a ComputeTaskSessionAttributeListener callback that an attribute has changed.
This method is a no-op if the session has finished.
Parameters:
key - Attribute key.
val - Attribute value. Can be null.
Throws:
IgniteException - If sending of the attribute message failed.

@Nullable <K,V> V getAttribute(K key)
Gets an attribute set by the setAttribute(Object, Object) or setAttributes(Map) method. Note that this attribute could have been set by another job on another node.
This method is a no-op if the session has finished.
Type Parameters:
K - Attribute key type.
V - Attribute value type.
Parameters:
key - Attribute key.

void setAttributes(Map<?,?> attrs) throws IgniteException
Sets task attributes. Use this method rather than the setAttribute(Object, Object) method whenever you need to set multiple attributes.
This method is a no-op if the session has finished.
Parameters:
attrs - Attributes to set.
Throws:
IgniteException - If sending of the attribute message failed.

Map<?,?> getAttributes()
Gets all attributes.
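As an illustration of getAttribute, setAttribute and getJobSiblings working together, here is a sketch of a search job in which the first job to find the target announces it in the session and cancels its siblings. The "found" key, the data layout, and the use of @JobContextResource to obtain the job's own ID are assumptions of this example; the enclosing task is assumed to carry @ComputeTaskSessionFullSupport.

```java
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.resources.JobContextResource;
import org.apache.ignite.resources.TaskSessionResource;

public class SearchJob extends ComputeJobAdapter {
    /** Hypothetical attribute key announcing that the target was found. */
    static final String FOUND_KEY = "found";

    @TaskSessionResource
    private ComputeTaskSession ses;

    @JobContextResource
    private ComputeJobContext jobCtx;

    private final int[] data; // this job's slice of the input
    private final int target;

    public SearchJob(int[] data, int target) {
        this.data = data;
        this.target = target;
    }

    /** Returns the index of target in data, or -1 if absent. */
    static int indexOf(int[] data, int target) {
        for (int i = 0; i < data.length; i++)
            if (data[i] == target)
                return i;
        return -1;
    }

    @Override public Object execute() {
        // Skip the work if a sibling has already found the target.
        if (ses.getAttribute(FOUND_KEY) != null)
            return null;

        int idx = indexOf(data, target);

        if (idx >= 0) {
            IgniteUuid self = jobCtx.getJobId();

            // Tell the task and all siblings who found it...
            ses.setAttribute(FOUND_KEY, self);

            // ...and cancel every other job in the split.
            for (ComputeJobSibling sib : ses.getJobSiblings())
                if (!sib.getJobId().equals(self))
                    sib.cancel();
        }

        return idx;
    }
}
```

Since cancellation is cooperative, a long-running scan would normally also poll isCancelled() (inherited from ComputeJobAdapter) inside its loop.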
void addAttributeListener(ComputeTaskSessionAttributeListener lsnr, boolean rewind)
Adds a listener for the session attributes.
Parameters:
lsnr - Listener to add.
rewind - A true value results in calling the given listener for all already received attributes, while a false value results only in notifications about new attributes. Setting rewind to true is a simple way to avoid losing notifications for attributes that were received earlier or while this method was executing.

boolean removeAttributeListener(ComputeTaskSessionAttributeListener lsnr)
Removes the given listener.
Parameters:
lsnr - Listener to remove.
Returns:
true if the listener was removed, false otherwise.

<K,V> V waitForAttribute(K key, long timeout) throws InterruptedException
Waits for the specified attribute to be set.
Type Parameters:
K - Attribute key type.
V - Attribute value type.
Parameters:
key - Attribute key to wait for.
timeout - Timeout in milliseconds to wait for. 0 means indefinite wait.
Throws:
InterruptedException - Thrown if the wait was interrupted.

<K,V> boolean waitForAttribute(K key, @Nullable V val, long timeout) throws InterruptedException
Waits for the specified attribute to be set or updated with the given value.
Type Parameters:
K - Attribute key type.
V - Attribute value type.
Parameters:
key - Attribute key to wait for.
val - Attribute value to wait for. Can be null.
timeout - Timeout in milliseconds to wait for. 0 means indefinite wait.
Throws:
InterruptedException - Thrown if the wait was interrupted.

Map<?,?> waitForAttributes(Collection<?> keys, long timeout) throws InterruptedException
Waits for the specified attributes to be set.
Parameters:
keys - Attribute keys to wait for.
timeout - Timeout in milliseconds to wait for. 0 means indefinite wait.
Throws:
InterruptedException - Thrown if the wait was interrupted.

boolean waitForAttributes(Map<?,?> attrs, long timeout) throws InterruptedException
Waits for the specified attributes to be set or updated with the given values.
Parameters:
attrs - Key/value pairs to wait for.
timeout - Timeout in milliseconds to wait for. 0 means indefinite wait.
Throws:
InterruptedException - Thrown if the wait was interrupted.

void saveCheckpoint(String key, Object state) throws IgniteException
Saves intermediate state of a job or task to a storage. The storage implementation is defined by the CheckpointSpi implementation used.
Long-running jobs may store intermediate state to protect themselves from failures. This way, whenever a job fails over to another node, it can load its previously saved state via the loadCheckpoint(String) method and continue with execution.
This method defaults the checkpoint scope to ComputeTaskSessionScope.SESSION_SCOPE, and the implementation will automatically remove the checkpoint at the end of the session. It is analogous to calling saveCheckpoint(key, state, ComputeTaskSessionScope.SESSION_SCOPE, 0).
Parameters:
key - Key to be used to load this checkpoint in the future.
state - Intermediate job state to save.
Throws:
IgniteException - If failed to save intermediate job state.
See Also:
loadCheckpoint(String), removeCheckpoint(String), CheckpointSpi
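The failover pattern described above might look like this in practice: a job periodically saves its progress with the two-argument saveCheckpoint and, whenever it is (re)started, resumes from whatever loadCheckpoint returns. This sketch assumes a persisting CheckpointSpi (for example SharedFsCheckpointSpi) is configured; with a no-op SPI the load returns null and the job simply starts over. The key scheme and checkpoint interval are hypothetical.

```java
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.resources.TaskSessionResource;

public class ResumableSumJob extends ComputeJobAdapter {
    /** How many elements to process between checkpoints. */
    private static final int CP_EVERY = 1_000;

    @TaskSessionResource
    private ComputeTaskSession ses;

    private final int part;       // distinguishes this job's checkpoint from siblings'
    private final long[] values;  // this job's share of the input

    public ResumableSumJob(int part, long[] values) {
        this.part = part;
        this.values = values;
    }

    /** Hypothetical per-job checkpoint key. */
    static String checkpointKey(int part) {
        return "sum-progress-" + part;
    }

    @Override public Object execute() {
        String key = checkpointKey(part);

        // null on the first attempt; {nextIndex, partialSum} after a failover.
        long[] state = ses.loadCheckpoint(key);

        int i = state == null ? 0 : (int)state[0];
        long sum = state == null ? 0L : state[1];

        for (; i < values.length; i++) {
            sum += values[i];

            // SESSION_SCOPE by default: removed automatically when the task ends.
            if ((i + 1) % CP_EVERY == 0)
                ses.saveCheckpoint(key, new long[] {i + 1, sum});
        }

        return sum;
    }
}
```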
void saveCheckpoint(String key, Object state, ComputeTaskSessionScope scope, long timeout) throws IgniteException
Saves intermediate state of a job to a storage. The storage implementation is defined by the CheckpointSpi implementation used.
Long-running jobs may store intermediate state to protect themselves from failures. This way, whenever a job fails over to another node, it can load its previously saved state via the loadCheckpoint(String) method and continue with execution.
The lifetime of the checkpoint is determined by its timeout and scope. If ComputeTaskSessionScope.GLOBAL_SCOPE is used, the checkpoint will outlive its session and can only be removed explicitly, by calling CheckpointSpi.removeCheckpoint(String) from Ignite or from another task or job, or by expiry of its timeout.
Parameters:
key - Key to be used to load this checkpoint in the future.
state - Intermediate job state to save.
scope - Checkpoint scope. If equal to ComputeTaskSessionScope.SESSION_SCOPE, the state will automatically be removed at the end of task execution. Otherwise, if the scope is ComputeTaskSessionScope.GLOBAL_SCOPE, the state will outlive its session and can be removed by calling removeCheckpoint(String) from another task, or when the timeout expires.
timeout - Maximum time this state should be kept by the underlying storage. A value of 0 means that the timeout will never expire.
Throws:
IgniteException - If failed to save intermediate job state.
See Also:
loadCheckpoint(String), removeCheckpoint(String), CheckpointSpi
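A sketch of a GLOBAL_SCOPE checkpoint used as a cross-task cache: the first task to run builds a value and leaves it behind for one hour, and later tasks load it instead of rebuilding it. It assumes a persisting CheckpointSpi and that the timeout is in milliseconds, like the other timeouts on this interface; INDEX_KEY and buildIndex are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSessionScope;
import org.apache.ignite.resources.TaskSessionResource;

public class CachedIndexJob extends ComputeJobAdapter {
    /** Hypothetical key shared by all tasks that need the index. */
    static final String INDEX_KEY = "global-index-v1";

    /** Keep the checkpoint for at most one hour (assumed milliseconds). */
    static final long TTL_MS = TimeUnit.HOURS.toMillis(1);

    @TaskSessionResource
    private ComputeTaskSession ses;

    @Override public Object execute() {
        // A previous task may have left the index behind: GLOBAL_SCOPE outlives sessions.
        String idx = ses.loadCheckpoint(INDEX_KEY);

        if (idx == null) {
            idx = buildIndex();

            // Unlike SESSION_SCOPE, this is not removed when the task ends;
            // it stays until removed explicitly or until TTL_MS expires.
            ses.saveCheckpoint(INDEX_KEY, idx, ComputeTaskSessionScope.GLOBAL_SCOPE, TTL_MS);
        }

        return idx.length();
    }

    /** Hypothetical placeholder for an expensive computation. */
    private String buildIndex() {
        return "...";
    }
}
```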
void saveCheckpoint(String key, Object state, ComputeTaskSessionScope scope, long timeout, boolean overwrite) throws IgniteException
Saves intermediate state of a job or task to a storage. The storage implementation is defined by the CheckpointSpi implementation used.
Long-running jobs may store intermediate state to protect themselves from failures. This way, whenever a job fails over to another node, it can load its previously saved state via the loadCheckpoint(String) method and continue with execution.
The lifetime of the checkpoint is determined by its timeout and scope. If ComputeTaskSessionScope.GLOBAL_SCOPE is used, the checkpoint will outlive its session and can only be removed explicitly, by calling CheckpointSpi.removeCheckpoint(String) from Ignite or from another task or job, or by expiry of its timeout.
Parameters:
key - Key to be used to load this checkpoint in the future.
state - Intermediate job state to save.
scope - Checkpoint scope. If equal to ComputeTaskSessionScope.SESSION_SCOPE, the state will automatically be removed at the end of task execution. Otherwise, if the scope is ComputeTaskSessionScope.GLOBAL_SCOPE, the state will outlive its session and can be removed by calling removeCheckpoint(String) from another task, or when the timeout expires.
timeout - Maximum time this state should be kept by the underlying storage. A value of 0 means that the timeout will never expire.
overwrite - Whether to overwrite the checkpoint if it already exists.
Throws:
IgniteException - If failed to save intermediate job state.
See Also:
loadCheckpoint(String), removeCheckpoint(String), CheckpointSpi
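With overwrite set to false, an existing checkpoint is left in place, which suggests a simple "keep the first value" pattern such as recording only the first error seen in a session. This is a sketch: the key and helper names are hypothetical, and whether concurrent saves from different nodes are resolved atomically depends on the CheckpointSpi in use.

```java
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSessionScope;

public final class FirstErrorRecorder {
    /** Hypothetical checkpoint key. */
    static final String KEY = "first-error";

    private FirstErrorRecorder() {}

    /** Formats an error as "SimpleClassName: message". */
    static String format(Throwable t) {
        return t.getClass().getSimpleName() + ": " + t.getMessage();
    }

    /** Saves the error unless one is already stored, then returns the stored value. */
    static String recordFirst(ComputeTaskSession ses, Throwable t) {
        // overwrite = false leaves an existing checkpoint untouched;
        // timeout 0 never expires, but SESSION_SCOPE still removes it at session end.
        ses.saveCheckpoint(KEY, format(t), ComputeTaskSessionScope.SESSION_SCOPE, 0, false);

        return ses.loadCheckpoint(KEY);
    }
}
```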
@Nullable <T> T loadCheckpoint(String key) throws IgniteException
Loads a job's state previously saved via the saveCheckpoint(String, Object, ComputeTaskSessionScope, long) method from an underlying storage for a given key. If state was not previously saved, null will be returned. The storage implementation is defined by the CheckpointSpi implementation used.
Long-running jobs may store intermediate state to protect themselves from failures. This way, whenever a job starts, it can load its previously saved state and continue with execution.
Type Parameters:
T - Type of the checkpoint state.
Parameters:
key - Key for intermediate job state to load.
Returns:
The saved state, or null if no state was found for the given key.
Throws:
IgniteException - If failed to load job state.
See Also:
removeCheckpoint(String), CheckpointSpi
boolean removeCheckpoint(String key) throws IgniteException
Removes a previously saved job's state for a given key from an underlying storage. The storage implementation is defined by the CheckpointSpi implementation used.
Long-running jobs may store intermediate state to protect themselves from failures. This way, whenever a job starts, it can load its previously saved state and continue with execution.
Parameters:
key - Key for intermediate job state to remove.
Returns:
true if job state was removed, false if state was not found.
Throws:
IgniteException - If failed to remove job state.
See Also:
loadCheckpoint(String), CheckpointSpi
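Because GLOBAL_SCOPE checkpoints are not cleaned up when the session ends, a task that no longer needs one can delete it explicitly. The per-run key format and helper names below are a hypothetical convention.

```java
import org.apache.ignite.compute.ComputeTaskSession;

public final class CheckpointCleanup {
    private CheckpointCleanup() {}

    /** Hypothetical key format for a per-run GLOBAL_SCOPE checkpoint. */
    static String runKey(String runId) {
        return "run-" + runId + "-state";
    }

    /**
     * Deletes the run's checkpoint once it is no longer needed.
     * Returns true if state was removed, false if none was stored.
     */
    static boolean cleanup(ComputeTaskSession ses, String runId) {
        return ses.removeCheckpoint(runKey(runId));
    }
}
```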
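Collection<UUID> getTopology()
Gets a collection of grid node IDs.

IgniteFuture<?> mapFuture()
Gets a future that will be completed when the task "map" step has completed (which means that the ComputeTask.map(List, Object) method has finished).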
Ignite Database and Caching Platform: ver. 2.7.2, Release Date: February 6, 2019