@IgniteSpiMultipleInstancesSupport(value=true) @IgniteSpiConsistencyChecked(optional=true) public class JobStealingCollisionSpi extends IgniteSpiAdapter implements CollisionSpi
The design and ideas for this SPI are significantly influenced by the Java Fork/Join Framework authored by Doug Lea and planned for Java 7. JobStealingCollisionSpi took similar concepts and applied them to the grid (as opposed to the within-VM support planned in Java 7).
Quite often grids are deployed across many computers, some of which will always be more powerful than others. This SPI helps you avoid jobs being stuck at a slower node, as they will be stolen by a faster node. For example, when Node3 becomes free, it steals Job13 and Job23 from Node1 and Node2 respectively.
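The Node1/Node2/Node3 scenario above can be illustrated with a toy model in plain Java. This is only a sketch and not Ignite code: the class name, the use of in-memory queues, and the choice to steal from the tail of each wait queue are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of job stealing between nodes (hypothetical, not Ignite code). */
class JobStealingSketch {
    /**
     * An idle node takes one waiting job from each busy node.
     * Assumption: the job at the tail of the wait queue is the one stolen.
     */
    static List<String> stealOneFromEach(List<Deque<String>> victims) {
        List<String> stolen = new ArrayList<>();
        for (Deque<String> waitQueue : victims) {
            String job = waitQueue.pollLast(); // null when nothing is waiting
            if (job != null)
                stolen.add(job);
        }
        return stolen;
    }

    public static void main(String[] args) {
        Deque<String> node1 = new ArrayDeque<>(List.of("Job11", "Job12", "Job13"));
        Deque<String> node2 = new ArrayDeque<>(List.of("Job21", "Job22", "Job23"));

        // Node3 has become free and steals from both busy nodes.
        List<String> node3 = stealOneFromEach(List.of(node1, node2));

        System.out.println("Node3 stole " + node3);
        System.out.println("Node1 keeps " + node1 + ", Node2 keeps " + node2);
    }
}
```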
Note that this SPI must always be used in conjunction with JobStealingFailoverSpi.
Also note that job metrics update should be enabled in order for this SPI to work properly (i.e. IgniteConfiguration#getMetricsUpdateFrequency() should be set to a positive value).
The responsibility of Job Stealing Failover SPI is to properly route stolen jobs to the nodes that initially requested (stole) these jobs. The SPI maintains a counter of how many times a job was stolen and hence traveled to another node. JobStealingCollisionSpi checks this counter and will not allow a job to be stolen if this counter exceeds a certain threshold (see setMaximumStealingAttempts(int)).
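The counter check described above can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not the SPI's implementation: the Job class and trySteal method are hypothetical, and refusing a job once its count has reached the maximum is an assumed boundary condition.

```java
/** Toy model of the stealing-attempt limit (hypothetical, not Ignite code). */
class StealingAttemptsSketch {
    /** Hypothetical stand-in for a job that carries its stealing-attempt count. */
    static final class Job {
        final String name;
        int stealingAttempts; // how many times this job has already been stolen

        Job(String name) {
            this.name = name;
        }
    }

    /**
     * Returns true and records the attempt if the job may be stolen again.
     * Assumption: a job is refused once its count has reached the maximum.
     */
    static boolean trySteal(Job job, int maxStealingAttempts) {
        if (job.stealingAttempts >= maxStealingAttempts)
            return false;

        job.stealingAttempts++;
        return true;
    }

    public static void main(String[] args) {
        Job job = new Job("Job13");
        int max = 2;

        for (int i = 1; i <= 3; i++)
            System.out.println("steal #" + i + " allowed: " + trySteal(job, max));
    }
}
```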
JobStealingCollisionSpi can be configured either from a Spring XML file or directly. The following configuration parameters are supported:
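Number of jobs that can be executed in parallel (see setActiveJobsThreshold(int)).
Job count threshold at which this node will start stealing jobs from other nodes (see setWaitJobsThreshold(int)).
Steal message expire time in milliseconds (see setMessageExpireTime(long)).
Maximum number of attempts to steal a job by another node (see setMaximumStealingAttempts(int)).
Flag indicating whether this node should attempt to steal jobs from other nodes (see setStealingEnabled(boolean)).
Node attributes to enable job stealing for (see setStealingAttributes(Map)).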
```java
import java.util.Collections;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi;

JobStealingCollisionSpi spi = new JobStealingCollisionSpi();

// Configure number of waiting jobs
// in the queue for job stealing.
spi.setWaitJobsThreshold(10);

// Configure message expire time (in milliseconds).
spi.setMessageExpireTime(500);

// Configure stealing attempts number.
spi.setMaximumStealingAttempts(10);

// Configure number of active jobs that are allowed to execute
// in parallel. This number should usually be equal to the number
// of threads in the pool (default is 100).
spi.setActiveJobsThreshold(50);

// Enable stealing.
spi.setStealingEnabled(true);

// Set stealing attribute to steal from/to nodes that have it.
spi.setStealingAttributes(Collections.singletonMap("node.segment", "foobar"));

IgniteConfiguration cfg = new IgniteConfiguration();

// Override default Collision SPI.
cfg.setCollisionSpi(spi);
```

Here is an example of how this SPI can be configured from Spring XML configuration:
```xml
<property name="collisionSpi">
    <bean class="org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi">
        <property name="activeJobsThreshold" value="100"/>
        <property name="waitJobsThreshold" value="0"/>
        <property name="messageExpireTime" value="1000"/>
        <property name="maximumStealingAttempts" value="10"/>
        <property name="stealingEnabled" value="true"/>
        <property name="stealingAttributes">
            <map>
                <entry key="node.segment" value="foobar"/>
            </map>
        </property>
    </bean>
</property>
```
For information about Spring framework visit www.springframework.org
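The two requirements noted earlier (pairing this SPI with JobStealingFailoverSpi and enabling job metrics updates) could be expressed in the same Spring XML style roughly as follows. Treat this as a sketch: the metrics frequency of 2000 ms and the `node.segment` attribute value are example values chosen for illustration, and the `userAttributes` entry is only needed if stealing attributes are used.

```xml
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    <!-- Job metrics updates must be enabled: any positive frequency (example value, in milliseconds). -->
    <property name="metricsUpdateFrequency" value="2000"/>

    <!-- Node attribute that the stealing attributes below are matched against (example value). -->
    <property name="userAttributes">
        <map>
            <entry key="node.segment" value="foobar"/>
        </map>
    </property>

    <property name="collisionSpi">
        <bean class="org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi">
            <property name="stealingAttributes">
                <map>
                    <entry key="node.segment" value="foobar"/>
                </map>
            </property>
        </bean>
    </property>

    <!-- Job stealing requires the matching failover SPI. -->
    <property name="failoverSpi">
        <bean class="org.apache.ignite.spi.failover.jobstealing.JobStealingFailoverSpi"/>
    </property>
</bean>
```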
Modifier and Type | Field | Description |
---|---|---|
static String | ACTIVE_JOBS_THRESHOLD_NODE_ATTR | Threshold of maximum jobs executing concurrently. |
static int | DFLT_ACTIVE_JOBS_THRESHOLD | Default number of parallel jobs allowed (value is 95, which is slightly less than the default number of threads in the execution thread pool, to allow some extra threads for system processing). |
static int | DFLT_JOB_PRIORITY | Default start value for job priority (value is 0). |
static int | DFLT_MAX_STEALING_ATTEMPTS | Maximum number of attempts to steal job by another node (default is 5). |
static long | DFLT_MSG_EXPIRE_TIME | Default steal message expire time in milliseconds (value is 1000). |
static int | DFLT_WAIT_JOBS_THRESHOLD | Default threshold of waiting jobs. |
static String | MAX_STEALING_ATTEMPT_ATTR | Maximum stealing attempts attribute name. |
static String | MSG_EXPIRE_TIME_ATTR | Stealing request expiration time attribute name. |
static String | STEALING_ATTEMPT_COUNT_ATTR | Name of job context attribute containing current stealing attempt count. |
static String | STEALING_PRIORITY_ATTR | Stealing priority attribute name. |
static String | THIEF_NODE_ATTR | Job context attribute for storing thief node UUID (this attribute is used in job stealing failover SPI). |
static String | WAIT_JOBS_THRESHOLD_NODE_ATTR | Threshold of maximum jobs on waiting queue. |
Fields inherited from class IgniteSpiAdapter: ignite, igniteInstanceName
Constructor and Description |
---|
JobStealingCollisionSpi() |
Modifier and Type | Method | Description |
---|---|---|
int | getActiveJobsThreshold() | |
protected List<String> | getConsistentAttributeNames() | Returns a list of attributes that should be consistent for this SPI. |
int | getCurrentActiveJobsNumber() | Gets current number of jobs that are being executed. |
int | getCurrentHeldJobsNumber() | Gets number of currently 'held' jobs. |
int | getCurrentJobsToStealNumber() | Gets current number of jobs to be stolen. |
int | getCurrentRunningJobsNumber() | Gets number of currently running (not 'held') jobs. |
int | getCurrentWaitJobsNumber() | Gets current number of jobs that wait for the execution. |
int | getMaximumStealingAttempts() | |
long | getMessageExpireTime() | |
Map<String,Object> | getNodeAttributes() | This method is called before SPI starts (before method IgniteSpi.spiStart(String) is called). |
Map<String,? extends Serializable> | getStealingAttributes() | |
int | getTotalStolenJobsNumber() | Gets total number of stolen jobs. |
int | getWaitJobsThreshold() | |
boolean | isStealingEnabled() | |
void | onCollision(CollisionContext ctx) | This is a callback called when a new grid job arrives, an executing job finishes its execution, the topology changes, or periodically (on EventType.EVT_NODE_METRICS_UPDATED). When a new job arrives, it is added to the end of the wait list and this method is called. |
void | onContextDestroyed0() | Method to be called in the beginning of onContextDestroyed() method. |
protected void | onContextInitialized0(IgniteSpiContext spiCtx) | Method to be called in the end of onContextInitialized method. |
JobStealingCollisionSpi | setActiveJobsThreshold(int activeJobsThreshold) | Sets number of jobs that can be executed in parallel. |
void | setExternalCollisionListener(CollisionExternalListener extLsnr) | Listener to be set for notification of external collision events (e.g. job stealing). |
JobStealingCollisionSpi | setMaximumStealingAttempts(int maxStealingAttempts) | Sets maximum number of attempts to steal job by another node. |
JobStealingCollisionSpi | setMessageExpireTime(long msgExpireTime) | Message expire time configuration parameter. |
JobStealingCollisionSpi | setName(String name) | Sets SPI name. |
JobStealingCollisionSpi | setStealingAttributes(Map<String,? extends Serializable> stealAttrs) | Configuration parameter to enable stealing to/from only nodes that have these attributes set (see ClusterNode.attribute(String) and IgniteConfiguration.getUserAttributes() methods). |
JobStealingCollisionSpi | setStealingEnabled(boolean isStealingEnabled) | Sets flag indicating whether this node should attempt to steal jobs from other nodes. |
JobStealingCollisionSpi | setWaitJobsThreshold(int waitJobsThreshold) | Sets job count threshold at which this node will start stealing jobs from other nodes. |
void | spiStart(String igniteInstanceName) | This method is called to start SPI. |
void | spiStop() | This method is called to stop SPI. |
String | toString() | |
Methods inherited from class IgniteSpiAdapter: addTimeoutObject, assertParameter, checkConfigurationConsistency0, clientFailureDetectionTimeout, configInfo, createSpiAttributeName, failureDetectionTimeout, failureDetectionTimeoutEnabled, failureDetectionTimeoutEnabled, getExceptionRegistry, getLocalNode, getName, getSpiContext, ignite, initFailureDetectionTimeout, injectables, injectResources, isNodeStopping, onBeforeStart, onClientDisconnected, onClientReconnected, onContextDestroyed, onContextInitialized, registerMBean, removeTimeoutObject, started, startInfo, startStopwatch, stopInfo, unregisterMBean
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface IgniteSpi: getName, onClientDisconnected, onClientReconnected, onContextDestroyed, onContextInitialized
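public static final int DFLT_MAX_STEALING_ATTEMPTS
Maximum number of attempts to steal job by another node (default is 5).

public static final int DFLT_ACTIVE_JOBS_THRESHOLD
Default number of parallel jobs allowed (value is 95, which is slightly less than the default number of threads in the execution thread pool, to allow some extra threads for system processing).

public static final long DFLT_MSG_EXPIRE_TIME
Default steal message expire time in milliseconds (value is 1000). Once this time is elapsed and no response for steal message is received, the message is considered lost and another steal message will be generated, potentially to another node.

public static final int DFLT_WAIT_JOBS_THRESHOLD
Default threshold of waiting jobs (value is 0).

public static final int DFLT_JOB_PRIORITY
Default start value for job priority (value is 0).

public static final String THIEF_NODE_ATTR
Job context attribute for storing thief node UUID (this attribute is used in job stealing failover SPI).

public static final String WAIT_JOBS_THRESHOLD_NODE_ATTR
Threshold of maximum jobs on waiting queue.

public static final String ACTIVE_JOBS_THRESHOLD_NODE_ATTR
Threshold of maximum jobs executing concurrently.

public static final String STEALING_ATTEMPT_COUNT_ATTR
Name of job context attribute containing current stealing attempt count.
See also: ComputeJobContext

public static final String MAX_STEALING_ATTEMPT_ATTR
Maximum stealing attempts attribute name.

public static final String MSG_EXPIRE_TIME_ATTR
Stealing request expiration time attribute name.

public static final String STEALING_PRIORITY_ATTR
Stealing priority attribute name.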

@IgniteSpiConfiguration(optional=true)
public JobStealingCollisionSpi setActiveJobsThreshold(int activeJobsThreshold)
Sets number of jobs that can be executed in parallel.
Parameters: activeJobsThreshold - Number of jobs that can be executed in parallel.
Returns: this for chaining.

public int getActiveJobsThreshold()

@IgniteSpiConfiguration(optional=true)
public JobStealingCollisionSpi setWaitJobsThreshold(int waitJobsThreshold)
Sets job count threshold at which this node will start stealing jobs from other nodes.
Parameters: waitJobsThreshold - Job count threshold.
Returns: this for chaining.

public int getWaitJobsThreshold()

@IgniteSpiConfiguration(optional=true)
public JobStealingCollisionSpi setMessageExpireTime(long msgExpireTime)
Message expire time configuration parameter.
Parameters: msgExpireTime - Message expire time.
Returns: this for chaining.

public long getMessageExpireTime()

@IgniteSpiConfiguration(optional=true)
public JobStealingCollisionSpi setStealingEnabled(boolean isStealingEnabled)
Sets flag indicating whether this node should attempt to steal jobs from other nodes. If false, this node will allow jobs to be stolen from it, but won't attempt to steal any jobs from other nodes. Default value is true.
Parameters: isStealingEnabled - Flag indicating whether this node should attempt to steal jobs from other nodes.
Returns: this for chaining.

public boolean isStealingEnabled()

@IgniteSpiConfiguration(optional=true)
public JobStealingCollisionSpi setMaximumStealingAttempts(int maxStealingAttempts)
Sets maximum number of attempts to steal job by another node. If not specified, the DFLT_MAX_STEALING_ATTEMPTS value will be used.
Parameters: maxStealingAttempts - Maximum number of attempts to steal job by another node.
Returns: this for chaining.

public int getMaximumStealingAttempts()

@IgniteSpiConfiguration(optional=true)
public JobStealingCollisionSpi setStealingAttributes(Map<String,? extends Serializable> stealAttrs)
Configuration parameter to enable stealing to/from only nodes that have these attributes set (see ClusterNode.attribute(String) and IgniteConfiguration.getUserAttributes() methods).
Parameters: stealAttrs - Node attributes to enable job stealing for.
Returns: this for chaining.

public Map<String,? extends Serializable> getStealingAttributes()
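
The attribute-based restriction described for setStealingAttributes can be pictured as a simple map comparison. The following plain-Java sketch is an illustration only and not the SPI's code: the class and method names are hypothetical, and it assumes a node qualifies when it carries every configured attribute with an equal value (so an empty attribute map places no restriction).

```java
import java.util.Map;

/** Toy model of stealing-attribute matching (hypothetical, not Ignite code). */
class StealingAttributesSketch {
    /**
     * Assumption: a node takes part in stealing only if it has every
     * configured stealing attribute with an equal value.
     */
    static boolean eligible(Map<String, ?> nodeAttrs, Map<String, ?> stealAttrs) {
        for (Map.Entry<String, ?> e : stealAttrs.entrySet()) {
            Object val = nodeAttrs.get(e.getKey());

            if (val == null || !val.equals(e.getValue()))
                return false;
        }

        return true;
    }

    public static void main(String[] args) {
        Map<String, String> stealAttrs = Map.of("node.segment", "foobar");

        Map<String, String> sameSegment = Map.of("node.segment", "foobar", "rack", "r1");
        Map<String, String> otherSegment = Map.of("node.segment", "other");

        System.out.println("same segment eligible: " + eligible(sameSegment, stealAttrs));
        System.out.println("other segment eligible: " + eligible(otherSegment, stealAttrs));
    }
}
```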
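
public int getCurrentRunningJobsNumber()
Gets number of currently running (not 'held') jobs.

public int getCurrentHeldJobsNumber()
Gets number of currently 'held' jobs.

public int getCurrentWaitJobsNumber()
Gets current number of jobs that wait for the execution.

public int getCurrentActiveJobsNumber()
Gets current number of jobs that are being executed.

public int getTotalStolenJobsNumber()
Gets total number of stolen jobs.

public int getCurrentJobsToStealNumber()
Gets current number of jobs to be stolen.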

public Map<String,Object> getNodeAttributes() throws IgniteSpiException
This method is called before SPI starts (before method IgniteSpi.spiStart(String) is called). It allows SPI implementation to add attributes to a local node. Kernal collects these attributes from all SPI implementations loaded up and then passes them to discovery SPI so that they can be exchanged with other nodes.
Specified by: getNodeAttributes in interface IgniteSpi
Overrides: getNodeAttributes in class IgniteSpiAdapter
Throws: IgniteSpiException - Thrown in case of any error.

public void spiStart(String igniteInstanceName) throws IgniteSpiException
This method is called to start SPI.
Specified by: spiStart in interface IgniteSpi
Parameters: igniteInstanceName - Name of Ignite instance this SPI is being started for (null for default Ignite instance).
Throws: IgniteSpiException - Thrown in case of any error during SPI start.

public void spiStop() throws IgniteSpiException
This method is called to stop SPI.
Note that this method can be called at any point including during recovery of failed start. It should make no assumptions on what state SPI will be in when this method is called.
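Specified by: spiStop in interface IgniteSpi
Throws: IgniteSpiException - Thrown in case of any error during SPI stop.

public void setExternalCollisionListener(CollisionExternalListener extLsnr)
Listener to be set for notification of external collision events (e.g. job stealing).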
Ignite uses this listener to enable job stealing from overloaded to underloaded nodes. However, you can also utilize it, for instance, to provide time-based collision resolution. To achieve this, you would most likely mark a job that requires time-based scheduling by setting a certain attribute in its job context (see ComputeJobContext) and set a timer in your SPI implementation that wakes up after a certain period of time. Once this period has elapsed, you would notify this listener that a collision resolution should take place. Then, inside your collision resolution logic, you would find the marked waiting job and activate it.

Note that most collision SPIs might not have external collisions. In that case, they should simply ignore this method and do nothing when the listener is set.
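
The timer part of the time-based approach described above might look roughly like the following plain-Java sketch. It does not use Ignite classes: ExternalListener is a hypothetical stand-in for CollisionExternalListener, and the helper names and delays are assumptions chosen for illustration. Finding and activating the marked job inside the collision resolution logic is left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Toy model of timer-driven collision notification (hypothetical, not Ignite code). */
class TimedCollisionSketch {
    /** Hypothetical stand-in for the external collision listener. */
    interface ExternalListener {
        void onExternalCollision();
    }

    /** Starts a one-shot timer that notifies the listener after the given delay. */
    static ScheduledExecutorService notifyAfter(ExternalListener lsnr, long delayMs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        Runnable task = lsnr::onExternalCollision;
        timer.schedule(task, delayMs, TimeUnit.MILLISECONDS);
        return timer;
    }

    /** Returns true if the listener was notified before the timeout expired. */
    static boolean firesWithin(long delayMs, long timeoutMs) {
        CountDownLatch notified = new CountDownLatch(1);
        ScheduledExecutorService timer = notifyAfter(notified::countDown, delayMs);
        try {
            return notified.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("listener notified: " + firesWithin(50, 2000));
    }
}
```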
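Specified by: setExternalCollisionListener in interface CollisionSpi
Parameters: extLsnr - Listener for external collision events.

protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException
Method to be called in the end of onContextInitialized method.
Overrides: onContextInitialized0 in class IgniteSpiAdapter
Parameters: spiCtx - SPI context.
Throws: IgniteSpiException - In case of errors.

public void onContextDestroyed0()
Method to be called in the beginning of onContextDestroyed() method.
Overrides: onContextDestroyed0 in class IgniteSpiAdapter
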
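public void onCollision(CollisionContext ctx)
This is a callback called when a new grid job arrives, an executing job finishes its execution, the topology changes, or periodically (on EventType.EVT_NODE_METRICS_UPDATED). When a new job arrives, it is added to the end of the wait list and this method is called.

Implementation of this method should act on all lists, each of which contains collision job contexts that define a set of operations available during collision resolution. Refer to CollisionContext and CollisionJobContext documentation for more information.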
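
As a rough picture of what acting on the wait and active lists can mean, the sketch below activates waiting jobs until the active-jobs threshold is reached. It is a simplified plain-Java model, not this SPI's actual logic: the class and method names are hypothetical, jobs are represented as strings, and stealing, held jobs, and priorities are ignored.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of one collision-resolution pass (hypothetical, not Ignite code). */
class CollisionResolutionSketch {
    /**
     * Moves jobs from the head of the wait list to the active list
     * until the number of active jobs reaches the threshold.
     */
    static void resolve(Deque<String> waiting, List<String> active, int activeJobsThreshold) {
        while (active.size() < activeJobsThreshold && !waiting.isEmpty())
            active.add(waiting.pollFirst());
    }

    public static void main(String[] args) {
        Deque<String> waiting = new ArrayDeque<>(List.of("Job1", "Job2", "Job3"));
        List<String> active = new ArrayList<>();

        // New jobs are appended to the end of the wait list before each pass.
        waiting.addLast("Job4");
        resolve(waiting, active, 2);

        System.out.println("active: " + active + ", still waiting: " + waiting);
    }
}
```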
Specified by: onCollision in interface CollisionSpi
Parameters: ctx - Collision context which contains all collision lists.

protected List<String> getConsistentAttributeNames()
Returns a list of attributes that should be consistent for this SPI.
Overrides: getConsistentAttributeNames in class IgniteSpiAdapter

public JobStealingCollisionSpi setName(String name)
Sets SPI name.
Overrides: setName in class IgniteSpiAdapter
Parameters: name - SPI name.
Returns: this for chaining.
Ignite Database and Caching Platform : ver. 2.16.0 Release Date : December 15 2023