public abstract class IgfsTask<T,R> extends ComputeTaskAdapter<IgfsTaskArgs<T>,R>

IGFS task which can be submitted through one of the IgniteFs.execute() methods. Essentially, an IGFS task is a regular ComputeTask with different map logic: instead of implementing the ComputeTask.map(List, Object) method to split the task into jobs, you must implement the createJob(org.apache.ignite.igfs.IgfsPath, IgfsFileRange, IgfsTaskArgs) method.
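For orientation, here is a hedged sketch of how such a task might be submitted. The `"igfs"` instance name, the file path, the `fileSystem()` lookup and the `execute(task, resolver, paths, arg)` overload are assumptions for illustration (IgniteFileSystem is the current name of the IgniteFs API referenced above), and `WordCountTask` refers to the example further below.

```java
import java.util.Collections;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.Ignition;
import org.apache.ignite.igfs.IgfsPath;

// Hedged usage sketch: instance name and path are placeholders.
public class WordCountSubmission {
    public static void main(String[] args) {
        Ignite ignite = Ignition.ignite();

        // Look up the IGFS instance by its configured name (assumed to be "igfs").
        IgniteFileSystem fs = ignite.fileSystem("igfs");

        // Run WordCountTask (defined in the example below) over one file,
        // passing the word to count as the user argument. Passing a null
        // record resolver is assumed to mean "do not realign ranges to records".
        Integer total = fs.execute(
            new WordCountTask(),
            null,
            Collections.singleton(new IgfsPath("/books/notes.txt")),
            "ignite");

        System.out.println("Total occurrences: " + total);
    }
}
```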
Each file participating in an IGFS task is first split into IgfsFileRanges. Normally, a range is a number of consecutive bytes located on a single node (see IgfsGroupDataBlocksKeyMapper). If a maximum range size is provided (either through FileSystemConfiguration.getMaximumTaskRangeLength() or an IgniteFs.execute() argument), then ranges may be further divided into smaller chunks, as in the configuration sketch below.
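A minimal configuration sketch, assuming FileSystemConfiguration exposes a `setMaximumTaskRangeLength(long)` setter matching the getter mentioned above and that IgniteConfiguration accepts it via `setFileSystemConfiguration(...)`; the instance name and size are placeholders.

```java
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

// Hedged configuration sketch: setter names are assumed from the getter
// referenced above; values are placeholders.
public class IgfsTaskRangeConfig {
    public static IgniteConfiguration configure() {
        FileSystemConfiguration fsCfg = new FileSystemConfiguration();
        fsCfg.setName("igfs");

        // Cap each IgfsFileRange at 16 MB so that large co-located block groups
        // are still split into several jobs (the default of 0 is assumed to
        // mean no extra splitting).
        fsCfg.setMaximumTaskRangeLength(16 * 1024 * 1024);

        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setFileSystemConfiguration(fsCfg);

        return cfg;
    }
}
```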
Once a file is split into ranges, each range is passed to the IgfsTask.createJob() method in order to create an IgfsJob. Finally, all generated jobs are sent to grid nodes for execution.

As with a regular ComputeTask, you can define your own logic for results handling and the reduce step.

Here is an example of such a task:
```java
public class WordCountTask extends IgfsTask<String, Integer> {
    @Override
    public IgfsJob createJob(IgfsPath path, IgfsFileRange range, IgfsTaskArgs<String> args) throws IgniteException {
        // New job will be created for each range within each file.
        // We pass user-provided argument (which is essentially a word to look for) to that job.
        return new WordCountJob(args.userArgument());
    }

    // Aggregate results into one compound result.
    @Override
    public Integer reduce(List<ComputeJobResult> results) throws IgniteException {
        Integer total = 0;

        for (ComputeJobResult res : results) {
            Integer cnt = res.getData();

            // Null can be returned for non-existent file in case we decide to ignore such situations.
            if (cnt != null)
                total += cnt;
        }

        return total;
    }
}
```
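The WordCountJob referenced above is not shown on this page. The following is a minimal sketch of what it might look like, assuming the IgfsJob contract of this version (`execute(IgniteFileSystem, IgfsFileRange, IgfsInputStream)` plus `cancel()`) and that IgfsInputStream supports a positioned `readFully()`; the counting logic is purely illustrative.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.mapreduce.IgfsFileRange;
import org.apache.ignite.igfs.mapreduce.IgfsJob;

// Hedged sketch of the job used by WordCountTask above: counts occurrences of
// a single word within the byte range assigned to this job.
public class WordCountJob implements IgfsJob {
    private final String word;

    public WordCountJob(String word) {
        this.word = word;
    }

    @Override
    public Object execute(IgniteFileSystem igfs, IgfsFileRange range, IgfsInputStream in)
        throws IgniteException, IOException {
        // Read exactly the bytes of the assigned range (positioned readFully()
        // is an assumption about IgfsInputStream).
        byte[] buf = new byte[(int)range.length()];

        in.readFully(range.start(), buf);

        String data = new String(buf, StandardCharsets.UTF_8);

        int cnt = 0;

        for (int idx = data.indexOf(word); idx >= 0; idx = data.indexOf(word, idx + word.length()))
            cnt++;

        return cnt;
    }

    @Override
    public void cancel() {
        // Nothing to clean up: the job is a single short read-and-count pass.
    }
}
```

Note that because the ranges here are not realigned to record boundaries, a word that spans two ranges could be missed; passing a record resolver to execute() (see the method details below) addresses that.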
| Constructor and Description |
| --- |
| IgfsTask() |
| Modifier and Type | Method and Description |
| --- | --- |
| abstract @Nullable IgfsJob | createJob(IgfsPath path, IgfsFileRange range, IgfsTaskArgs<T> args) - Callback invoked during task map procedure to create job that will process specified split for IGFS file. |
| @Nullable Map<? extends ComputeJob,ClusterNode> | map(List<ClusterNode> subgrid, @Nullable IgfsTaskArgs<T> args) - This method is called to map or split grid task into multiple grid jobs. |
Methods inherited from class org.apache.ignite.compute.ComputeTaskAdapter: result

Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Methods inherited from interface org.apache.ignite.compute.ComputeTask: reduce
map

@Nullable public final Map<? extends ComputeJob,ClusterNode> map(List<ClusterNode> subgrid, @Nullable IgfsTaskArgs<T> args)

Parameters:
subgrid - Nodes available for this task execution. Note that the order of nodes is guaranteed to be randomized by the container. This ensures that every time you simply iterate through grid nodes, the order of nodes will be random, which over time should result in all nodes being used equally.
args - Task execution argument. Can be null. This is the same argument as the one passed into Grid#execute(...) methods.
Returns:
Map of grid jobs assigned to subgrid nodes. Unless ComputeTaskContinuousMapper is injected into the task, an exception will be thrown if null or an empty map is returned.

createJob

@Nullable public abstract IgfsJob createJob(IgfsPath path, IgfsFileRange range, IgfsTaskArgs<T> args) throws IgniteException

Parameters:
path - Path.
range - File range based on consecutive blocks. This range will be further realigned to record boundaries on the destination node.
args - Task argument.
Returns:
IGFS job. If null is returned, the passed in file range will be skipped.
Throws:
IgniteException - If job creation failed.
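To connect the null-return contract of createJob() to code, here is a hedged sketch of a variant of the WordCountTask example above that skips some ranges; IgfsPath.name() is assumed to return the last path component.

```java
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.mapreduce.IgfsFileRange;
import org.apache.ignite.igfs.mapreduce.IgfsJob;
import org.apache.ignite.igfs.mapreduce.IgfsTaskArgs;

// Hedged sketch: counts words only in plain-text files by skipping every other range.
public class TxtOnlyWordCountTask extends WordCountTask {
    @Override
    public IgfsJob createJob(IgfsPath path, IgfsFileRange range, IgfsTaskArgs<String> args) {
        // Returning null tells the mapping step to skip this file range entirely.
        if (!path.name().endsWith(".txt"))
            return null;

        return super.createJob(path, range, args);
    }
}
```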