@IgniteSpiMultipleInstancesSupport(value=true)
@IgniteSpiConsistencyChecked(optional=true)
public class PriorityQueueCollisionSpi
extends IgniteSpiAdapter
implements CollisionSpi
This class provides an implementation of the collision SPI based on a priority queue. Jobs are first ordered by their priority, if one is specified, and only the first getParallelJobsNumber() jobs are allowed to execute in parallel. Other jobs are queued up.
This SPI has the following optional configuration parameters:
Number of jobs that can be executed in parallel (see setParallelJobsNumber(int)). This number should usually be no greater than the number of threads in the execution thread pool.
Task priority attribute key (see getPriorityAttributeKey()). Before returning from the ComputeTask.map(List, Object) method, the task implementation should set a value into the task session keyed by this attribute key. See ComputeTaskSession for more information about the task session.
Job priority attribute key (see getJobPriorityAttributeKey()). It is used for specifying job priority. See ComputeJobContext for more information about the job context.
Default priority value (see getDefaultPriority()). It is used when no priority is set.
Starvation increment (see getStarvationIncrement()). It is the value by which a job's priority is increased each time the job gets bumped down. This feature prevents starvation of waiting jobs.
Starvation prevention flag (see isStarvationPreventionEnabled()). It enables increasing a job's priority each time the job gets bumped down. This feature prevents starvation of waiting jobs.
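The effect of the starvation increment can be pictured with a small, self-contained model. This is only a sketch under stated assumptions, not Ignite's implementation: the class `StarvationSketch` and the method `roundsUntilActivated` are hypothetical names, a single execution slot is assumed, and the model assumes that a waiting job's priority is raised by the increment each time a newly arrived job is activated ahead of it, with ties going to the older job.

```java
/** Hypothetical, simplified model of starvation prevention; not Ignite's actual code. */
final class StarvationSketch {
    /**
     * Simulates one waiting job competing for a single execution slot against a steady
     * stream of newly arriving jobs, one per round, all with the same priority.
     *
     * @return the round in which the waiting job is activated, or -1 if it is still
     *         waiting after maxRounds rounds.
     */
    static int roundsUntilActivated(int waitingPriority, int arrivingPriority,
        int starvationInc, boolean preventStarvation, int maxRounds) {
        int priority = waitingPriority;

        for (int round = 1; round <= maxRounds; round++) {
            // Assumption: on equal priority the older (waiting) job is activated first.
            if (priority >= arrivingPriority)
                return round;

            // The waiting job was bumped down behind the new arrival.
            if (preventStarvation)
                priority += starvationInc;
        }

        return -1;
    }

    public static void main(String[] args) {
        System.out.println(roundsUntilActivated(0, 5, 2, true, 100));
        System.out.println(roundsUntilActivated(0, 5, 2, false, 100));
    }
}
```

In this model, a job of priority 0 facing a stream of priority-5 jobs with an increment of 2 is activated in round 4; with prevention disabled it is never activated.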
Below is a Java example of configuration for the priority collision SPI:
    PriorityQueueCollisionSpi colSpi = new PriorityQueueCollisionSpi();

    // Execute all jobs sequentially by setting parallel job number to 1.
    colSpi.setParallelJobsNumber(1);

    IgniteConfiguration cfg = new IgniteConfiguration();

    // Override default collision SPI.
    cfg.setCollisionSpi(colSpi);

    // Start grid.
    Ignition.start(cfg);
Here is a Spring XML configuration example:
    <property name="collisionSpi">
        <bean class="org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi">
            <property name="priorityAttributeKey" value="myPriorityAttributeKey"/>
            <property name="parallelJobsNumber" value="10"/>
        </bean>
    </property>
This example demonstrates how an urgent task may be declared with a higher priority value.
With the Java configuration shown above, where the number of parallel jobs is set to 1, the priority SPI orders waiting jobs so that all jobs from MyGridUrgentTask will most likely be activated first (one by one), while jobs from MyGridUsualTask, which has the lower priority, wait. Once the higher priority jobs complete, the lower priority jobs are scheduled.
    public class MyGridUsualTask extends ComputeTaskSplitAdapter<Object, Object> {
        public static final int SPLIT_COUNT = 20;

        @TaskSessionResource
        private ComputeTaskSession taskSes;

        @Override
        protected Collection<? extends ComputeJob> split(int gridSize, Object arg) throws IgniteException {
            ...
            // Set low task priority (note that attribute name is used by the SPI
            // and should not be changed).
            taskSes.setAttribute("grid.task.priority", 5);

            Collection<ComputeJob> jobs = new ArrayList<>(SPLIT_COUNT);

            for (int i = 1; i <= SPLIT_COUNT; i++) {
                jobs.add(new ComputeJobAdapter(i) {
                    ...
                });
            }
            ...
        }
    }
and
    public class MyGridUrgentTask extends ComputeTaskSplitAdapter<Object, Object> {
        public static final int SPLIT_COUNT = 5;

        @TaskSessionResource
        private ComputeTaskSession taskSes;

        @Override
        protected Collection<? extends ComputeJob> split(int gridSize, Object arg) throws IgniteException {
            ...
            // Set high task priority (note that attribute name is used by the SPI
            // and should not be changed).
            taskSes.setAttribute("grid.task.priority", 10);

            Collection<ComputeJob> jobs = new ArrayList<>(SPLIT_COUNT);

            for (int i = 1; i <= SPLIT_COUNT; i++) {
                jobs.add(new ComputeJobAdapter(i) {
                    ...
                });
            }
            ...
        }
    }
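The ordering described for these two tasks can be reproduced with a plain `java.util.PriorityQueue`. The following is a minimal sketch, not Ignite's code: `PriorityOrderSketch`, `WaitingJob` and `scenario` are hypothetical names, all 25 jobs are assumed to be waiting at the same time, and jobs with equal priority are assumed to be activated in arrival order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical model of priority-ordered activation; not Ignite's actual code. */
final class PriorityOrderSketch {
    /** Waiting job: owning task name, task priority and arrival sequence number. */
    static final class WaitingJob {
        final String task;
        final int priority;
        final long seq;

        WaitingJob(String task, int priority, long seq) {
            this.task = task;
            this.priority = priority;
            this.seq = seq;
        }
    }

    /** Returns task names in the order their jobs are activated, one job at a time. */
    static List<String> activationOrder(List<WaitingJob> arrived) {
        // Highest priority first; among equal priorities, earliest arrival first.
        PriorityQueue<WaitingJob> queue = new PriorityQueue<>(
            Comparator.comparingInt((WaitingJob j) -> j.priority).reversed()
                .thenComparingLong(j -> j.seq));

        queue.addAll(arrived);

        List<String> order = new ArrayList<>();

        while (!queue.isEmpty())
            order.add(queue.poll().task);

        return order;
    }

    /** 20 jobs of the usual task (priority 5) arrive first, then 5 urgent jobs (priority 10). */
    static List<WaitingJob> scenario() {
        List<WaitingJob> jobs = new ArrayList<>();
        long seq = 0;

        for (int i = 0; i < 20; i++)
            jobs.add(new WaitingJob("MyGridUsualTask", 5, seq++));

        for (int i = 0; i < 5; i++)
            jobs.add(new WaitingJob("MyGridUrgentTask", 10, seq++));

        return jobs;
    }

    public static void main(String[] args) {
        System.out.println(activationOrder(scenario()));
    }
}
```

Even though the urgent jobs arrive last in this scenario, the five of them come out of the queue before any job of the usual task.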
For information about Spring framework visit www.springframework.org
Modifier and Type | Field and Description |
---|---|
static String | DFLT_JOB_PRIORITY_ATTRIBUTE_KEY: Default job priority attribute key (value is grid.job.priority). |
static int | DFLT_PARALLEL_JOBS_NUM: Default number of parallel jobs allowed (set to number of cores times 2). |
static boolean | DFLT_PREVENT_STARVATION_ENABLED: Default flag for preventing starvation of lower priority jobs. |
static int | DFLT_PRIORITY: Default priority that will be assigned if job does not have a priority attribute set (value is 0). |
static String | DFLT_PRIORITY_ATTRIBUTE_KEY: Default priority attribute key (value is grid.task.priority). |
static int | DFLT_STARVATION_INCREMENT: Default value by which job priority is increased every time the job gets bumped down. |
static int | DFLT_WAIT_JOBS_NUM: Default waiting jobs number. |
Fields inherited from class IgniteSpiAdapter: ignite, igniteInstanceName
Constructor and Description |
---|
PriorityQueueCollisionSpi() |
Modifier and Type | Method and Description |
---|---|
protected List<String> | getConsistentAttributeNames(): Returns a list of attributes that should be consistent for this SPI. |
int | getCurrentActiveJobsNumber(): Gets current number of jobs that are active, i.e. 'running + held' jobs. |
int | getCurrentHeldJobsNumber(): Gets number of currently 'held' jobs. |
int | getCurrentRunningJobsNumber(): Gets number of currently running (not 'held') jobs. |
int | getCurrentWaitJobsNumber(): Gets current number of jobs that wait for the execution. |
int | getDefaultPriority(): Gets default priority to use if a job does not have priority attribute set. |
String | getJobPriorityAttributeKey(): Gets key name of job priority attribute. |
Map<String,Object> | getNodeAttributes(): This method is called before SPI starts (before method IgniteSpi.spiStart(String) is called). |
int | getParallelJobsNumber(): Gets number of jobs that can be executed in parallel. |
String | getPriorityAttributeKey(): Gets key name of task priority attribute. |
int | getStarvationIncrement(): Gets value to increment job priority by every time a lower priority job gets behind a higher priority job. |
int | getWaitingJobsNumber(): Maximum number of jobs that are allowed to wait in waiting queue. |
boolean | isStarvationPreventionEnabled(): Gets flag indicating whether job starvation prevention is enabled. |
void | onCollision(CollisionContext ctx): This is a callback called when a new grid job arrived, an executing job finished its execution, the topology changed, or periodically (on EventType.EVT_NODE_METRICS_UPDATED). When a new job arrives it is added to the end of the wait list and this method is called. |
PriorityQueueCollisionSpi | setDefaultPriority(int priority): Sets default priority to use if a job does not have priority attribute set. |
void | setExternalCollisionListener(CollisionExternalListener lsnr): Listener to be set for notification of external collision events (e.g. job stealing). |
PriorityQueueCollisionSpi | setJobPriorityAttributeKey(String jobPriAttrKey): Sets job priority attribute key. |
PriorityQueueCollisionSpi | setName(String name): Sets SPI name. |
PriorityQueueCollisionSpi | setParallelJobsNumber(int parallelJobsNum): Sets number of jobs that can be executed in parallel. |
PriorityQueueCollisionSpi | setPriorityAttributeKey(String taskPriAttrKey): Sets task priority attribute key. |
PriorityQueueCollisionSpi | setStarvationIncrement(int starvationInc): Sets value to increment job priority by every time a lower priority job gets behind a higher priority job. |
PriorityQueueCollisionSpi | setStarvationPreventionEnabled(boolean preventStarvation): Sets flag indicating whether job starvation prevention is enabled. |
PriorityQueueCollisionSpi | setWaitingJobsNumber(int waitJobsNum): Maximum number of jobs that are allowed to wait in waiting queue. |
void | spiStart(String igniteInstanceName): This method is called to start SPI. |
void | spiStop(): This method is called to stop SPI. |
String | toString() |
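How the onCollision(CollisionContext ctx) callback might combine the parallel jobs limit and the waiting jobs limit can be sketched with plain Java collections. This is a simplified model under stated assumptions, not Ignite's implementation: `CollisionPassSketch`, `Job`, `Result` and `resolve` are hypothetical names, and the rule that waiting jobs beyond the waiting limit are rejected, lowest priority first, is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical model of a single collision resolution pass; not Ignite's actual code. */
final class CollisionPassSketch {
    /** Waiting job with a name and a priority. */
    static final class Job {
        final String name;
        final int priority;

        Job(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    /** Outcome of one pass: job names that were activated, kept waiting, or rejected. */
    static final class Result {
        final List<String> activated = new ArrayList<>();
        final List<String> waiting = new ArrayList<>();
        final List<String> rejected = new ArrayList<>();
    }

    /**
     * @param activeJobs number of jobs that are already active.
     * @param waitingJobs jobs in the wait list, in arrival order.
     * @param parallelJobsNum maximum number of jobs allowed to run in parallel.
     * @param waitJobsNum maximum number of jobs allowed to stay in the wait list.
     */
    static Result resolve(int activeJobs, List<Job> waitingJobs, int parallelJobsNum, int waitJobsNum) {
        List<Job> sorted = new ArrayList<>(waitingJobs);

        // Stable sort: highest priority first, arrival order kept among equal priorities.
        sorted.sort(Comparator.comparingInt((Job j) -> j.priority).reversed());

        int freeSlots = Math.max(0, parallelJobsNum - activeJobs);

        Result res = new Result();

        for (Job j : sorted) {
            if (res.activated.size() < freeSlots)
                res.activated.add(j.name);
            else if (res.waiting.size() < waitJobsNum)
                res.waiting.add(j.name);
            else
                res.rejected.add(j.name); // Assumption: overflow is rejected, lowest priority first.
        }

        return res;
    }

    public static void main(String[] args) {
        List<Job> waiting = new ArrayList<>();
        waiting.add(new Job("a", 1));
        waiting.add(new Job("b", 9));
        waiting.add(new Job("c", 5));

        Result res = resolve(1, waiting, 2, 1);

        System.out.println(res.activated + " " + res.waiting + " " + res.rejected);
    }
}
```

With one job already active, a parallel limit of 2 and a waiting limit of 1, the pass activates `b`, keeps `c` waiting and rejects `a`.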
Methods inherited from class IgniteSpiAdapter: addTimeoutObject, assertParameter, checkConfigurationConsistency0, clientFailureDetectionTimeout, configInfo, createSpiAttributeName, failureDetectionTimeout, failureDetectionTimeoutEnabled, failureDetectionTimeoutEnabled, getExceptionRegistry, getLocalNode, getName, getSpiContext, ignite, initFailureDetectionTimeout, injectables, injectResources, isNodeStopping, onBeforeStart, onClientDisconnected, onClientReconnected, onContextDestroyed, onContextDestroyed0, onContextInitialized, onContextInitialized0, registerMBean, removeTimeoutObject, started, startInfo, startStopwatch, stopInfo, unregisterMBean
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface IgniteSpi: getName, onClientDisconnected, onClientReconnected, onContextDestroyed, onContextInitialized
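public static final int DFLT_PARALLEL_JOBS_NUM
Default number of parallel jobs allowed (set to number of cores times 2).

public static final int DFLT_WAIT_JOBS_NUM
Default waiting jobs number. Default value is Integer.MAX_VALUE.

public static final String DFLT_PRIORITY_ATTRIBUTE_KEY
Default priority attribute key (value is grid.task.priority).

public static final String DFLT_JOB_PRIORITY_ATTRIBUTE_KEY
Default job priority attribute key (value is grid.job.priority).

public static final int DFLT_PRIORITY
Default priority that will be assigned if job does not have a priority attribute set (value is 0).

public static final int DFLT_STARVATION_INCREMENT
Default value by which job priority is increased every time the job gets bumped down.

public static final boolean DFLT_PREVENT_STARVATION_ENABLED
Default flag for preventing starvation of lower priority jobs.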
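
public int getParallelJobsNumber()
Gets number of jobs that can be executed in parallel.

@IgniteSpiConfiguration(optional=true)
public PriorityQueueCollisionSpi setParallelJobsNumber(int parallelJobsNum)
Sets number of jobs that can be executed in parallel.
Parameters: parallelJobsNum - Parallel jobs number.
Returns: this for chaining.

public int getWaitingJobsNumber()
Maximum number of jobs that are allowed to wait in waiting queue.

@IgniteSpiConfiguration(optional=true)
public PriorityQueueCollisionSpi setWaitingJobsNumber(int waitJobsNum)
Maximum number of jobs that are allowed to wait in waiting queue.
Parameters: waitJobsNum - Maximum jobs number.
Returns: this for chaining.

public int getCurrentWaitJobsNumber()
Gets current number of jobs that wait for the execution.

public int getCurrentActiveJobsNumber()
Gets current number of jobs that are active, i.e. 'running + held' jobs.

public int getCurrentRunningJobsNumber()
Gets number of currently running (not 'held') jobs.
Returns: Number of currently running (not 'held') jobs.

public int getCurrentHeldJobsNumber()
Gets number of currently 'held' jobs.
Returns: Number of currently 'held' jobs.

@IgniteSpiConfiguration(optional=true)
public PriorityQueueCollisionSpi setPriorityAttributeKey(String taskPriAttrKey)
Sets task priority attribute key. The priority is looked up in the task session under this key (see ComputeTaskSession.getAttribute(Object)). If not provided, default value is DFLT_PRIORITY_ATTRIBUTE_KEY.
Parameters: taskPriAttrKey - Priority session attribute key.
Returns: this for chaining.

@IgniteSpiConfiguration(optional=true)
public PriorityQueueCollisionSpi setJobPriorityAttributeKey(String jobPriAttrKey)
Sets job priority attribute key. The priority is looked up in the job context under this key (see ComputeJobContext.getAttribute(Object)). If not provided, default value is DFLT_JOB_PRIORITY_ATTRIBUTE_KEY.
Parameters: jobPriAttrKey - Job priority attribute key.
Returns: this for chaining.

public String getPriorityAttributeKey()
Gets key name of task priority attribute.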
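
The two attribute keys and the default priority suggest a lookup chain for a job's effective priority. The sketch below is an illustration only: `PriorityLookupSketch` and `resolve` are hypothetical names, plain maps stand in for the job context and the task session, and the lookup order (job context attribute first, then task session attribute, then the default priority) is an assumption of this sketch rather than something stated on this page.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of effective priority lookup; not Ignite's actual code. */
final class PriorityLookupSketch {
    /** Default attribute keys and default priority, as listed in the field summary. */
    static final String TASK_KEY = "grid.task.priority";
    static final String JOB_KEY = "grid.job.priority";
    static final int DFLT_PRIORITY = 0;

    /**
     * Resolves a job's priority. Assumed order: job context attribute, then task
     * session attribute, then the default priority.
     */
    static int resolve(Map<String, Object> jobCtxAttrs, Map<String, Object> taskSesAttrs) {
        Integer p = asInteger(jobCtxAttrs.get(JOB_KEY));

        if (p == null)
            p = asInteger(taskSesAttrs.get(TASK_KEY));

        return p != null ? p : DFLT_PRIORITY;
    }

    /** Treats missing or non-integer attribute values as "no priority set". */
    private static Integer asInteger(Object val) {
        return val instanceof Integer ? (Integer)val : null;
    }

    public static void main(String[] args) {
        Map<String, Object> jobCtx = new HashMap<>();
        Map<String, Object> taskSes = new HashMap<>();

        taskSes.put(TASK_KEY, 5);
        System.out.println(resolve(jobCtx, taskSes));

        jobCtx.put(JOB_KEY, 7);
        System.out.println(resolve(jobCtx, taskSes));
    }
}
```

In this model a job-level value overrides the task-level one, and a job with neither attribute falls back to the default priority of 0.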
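
public String getJobPriorityAttributeKey()
Gets key name of job priority attribute.

public int getDefaultPriority()
Gets default priority to use if a job does not have priority attribute set.

@IgniteSpiConfiguration(optional=true)
public PriorityQueueCollisionSpi setDefaultPriority(int priority)
Sets default priority to use if a job does not have priority attribute set.
Parameters: priority - Default priority.
Returns: this for chaining.

public int getStarvationIncrement()
Gets value to increment job priority by every time a lower priority job gets behind a higher priority job.

@IgniteSpiConfiguration(optional=true)
public PriorityQueueCollisionSpi setStarvationIncrement(int starvationInc)
Sets value to increment job priority by every time a lower priority job gets behind a higher priority job.
Parameters: starvationInc - Increment value.
Returns: this for chaining.

public boolean isStarvationPreventionEnabled()
Gets flag indicating whether job starvation prevention is enabled.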

@IgniteSpiConfiguration(optional=true)
public PriorityQueueCollisionSpi setStarvationPreventionEnabled(boolean preventStarvation)
Sets flag indicating whether job starvation prevention is enabled.
Parameters: preventStarvation - Flag indicating whether job starvation prevention is enabled.
Returns: this for chaining.

public Map<String,Object> getNodeAttributes() throws IgniteSpiException
This method is called before SPI starts (before method IgniteSpi.spiStart(String) is called). It allows the SPI implementation to add attributes to the local node. The kernal collects these attributes from all loaded SPI implementations and then passes them to the discovery SPI so that they can be exchanged with other nodes.
Specified by: getNodeAttributes in interface IgniteSpi
Overrides: getNodeAttributes in class IgniteSpiAdapter
Throws: IgniteSpiException - Thrown in case of any error.

public void spiStart(String igniteInstanceName) throws IgniteSpiException
This method is called to start SPI.
Specified by: spiStart in interface IgniteSpi
Parameters: igniteInstanceName - Name of Ignite instance this SPI is being started for (null for default Ignite instance).
Throws: IgniteSpiException - Thrown in case of any error during SPI start.

public void spiStop() throws IgniteSpiException
This method is called to stop SPI.
Note that this method can be called at any point, including during recovery of a failed start. It should make no assumptions about what state the SPI will be in when this method is called.
Specified by: spiStop in interface IgniteSpi
Throws: IgniteSpiException - Thrown in case of any error during SPI stop.

public void setExternalCollisionListener(CollisionExternalListener lsnr)
Listener to be set for notification of external collision events (e.g. job stealing).
Ignite uses this listener to enable job stealing from overloaded to underloaded nodes. However, you can also use it, for instance, to provide time-based collision resolution. To achieve this, you would most likely mark a job that requires time-based scheduling by setting a certain attribute in its job context (see ComputeJobContext) and set a timer in your SPI implementation that wakes up after a certain period of time. Once this period has elapsed, you would notify this listener that a collision resolution should take place. Then, inside your collision resolution logic, you would find the marked waiting job and activate it.
Note that most collision SPIs might not have external collisions. In that case, they should simply ignore this method and do nothing when the listener is set.
Specified by: setExternalCollisionListener in interface CollisionSpi
Parameters: lsnr - Listener for external collision events.

public void onCollision(CollisionContext ctx)
This is a callback called when a new grid job arrived, an executing job finished its execution, the topology changed, or periodically (on EventType.EVT_NODE_METRICS_UPDATED). When a new job arrives it is added to the end of the wait list and this method is called.
Implementation of this method should act on all lists, each of which contains collision job contexts that define a set of operations available during collision resolution. Refer to CollisionContext and CollisionJobContext documentation for more information.
Specified by: onCollision in interface CollisionSpi
Parameters: ctx - Collision context which contains all collision lists.

protected List<String> getConsistentAttributeNames()
Returns a list of attributes that should be consistent for this SPI.
Overrides: getConsistentAttributeNames in class IgniteSpiAdapter

public PriorityQueueCollisionSpi setName(String name)
Sets SPI name.
Overrides: setName in class IgniteSpiAdapter
Parameters: name - SPI name.
Returns: this for chaining.
Ignite Database and Caching Platform : ver. 2.16.0 Release Date : December 15 2023