Class JobStealingCollisionSpi
- java.lang.Object
  - org.apache.ignite.spi.IgniteSpiAdapter
    - org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi

All Implemented Interfaces:
CollisionSpi, IgniteSpi

@IgniteSpiMultipleInstancesSupport(true) @IgniteSpiConsistencyChecked(optional=true) public class JobStealingCollisionSpi extends IgniteSpiAdapter implements CollisionSpi
Collision SPI that supports job stealing from over-utilized nodes to under-utilized nodes. This SPI is especially useful if some jobs within a task complete quickly while others sit in the waiting queue on slower nodes. In such a case, the waiting jobs will be stolen from the slower node and moved to the fast, under-utilized node.

The design and ideas for this SPI are significantly influenced by the Java Fork/Join Framework authored by Doug Lea and planned for (and later included in) Java 7. GridJobStealingCollisionSpi took similar concepts and applied them to the grid (as opposed to the within-VM support in Java 7).

Quite often grids are deployed across many computers, some of which will always be more powerful than others. This SPI helps you avoid jobs being stuck at a slower node, as they will be stolen by a faster node. In the following picture, when Node3 becomes free, it steals Job13 and Job23 from Node1 and Node2, respectively.
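To make the scenario concrete, here is a toy, single-process Java model of it. This is not Ignite code: the Node class, the job names, and the policy of always stealing from the tail of the longest waiting queue are assumptions chosen for illustration only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of job stealing between nodes. The Node type and the
// "steal from the longest queue" policy are illustrative assumptions,
// not Ignite's actual implementation.
class StealingSimulation {
    static final class Node {
        final String name;
        final Deque<String> waiting = new ArrayDeque<>(); // waiting jobs, oldest first
        Node(String name) { this.name = name; }
    }

    /**
     * Moves up to 'count' waiting jobs to 'thief', each time taking the last
     * queued job of whichever other node has the longest waiting queue.
     * Returns the stolen jobs in the order they were taken.
     */
    static List<String> steal(Node thief, List<Node> cluster, int count) {
        List<String> stolen = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Node victim = null;
            for (Node n : cluster) {
                if (n != thief && !n.waiting.isEmpty()
                        && (victim == null || n.waiting.size() > victim.waiting.size())) {
                    victim = n;
                }
            }
            if (victim == null) break; // nothing left to steal
            String job = victim.waiting.pollLast(); // most recently queued job
            thief.waiting.addLast(job);
            stolen.add(job);
        }
        return stolen;
    }

    public static void main(String[] args) {
        Node n1 = new Node("Node1");
        Node n2 = new Node("Node2");
        Node n3 = new Node("Node3");
        n1.waiting.addAll(List.of("Job11", "Job12", "Job13"));
        n2.waiting.addAll(List.of("Job21", "Job22", "Job23"));
        // Node3 is idle, so it steals two jobs.
        System.out.println(steal(n3, List.of(n1, n2, n3), 2)); // [Job13, Job23]
    }
}
```

With two equally loaded victims, the tie-break (first node with the strictly longest queue) happens to reproduce the picture's outcome.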
Note that this SPI must always be used in conjunction with JobStealingFailoverSpi. Also note that job metrics update should be enabled in order for this SPI to work properly (i.e. IgniteConfiguration#getMetricsUpdateFrequency() should be set to a positive value). The responsibility of the Job Stealing Failover SPI is to properly route stolen jobs to the nodes that initially requested (stole) these jobs. The SPI maintains a counter of how many times a job was stolen and hence traveled to another node. JobStealingCollisionSpi checks this counter and will not allow a job to be stolen if this counter exceeds a certain threshold (see setMaximumStealingAttempts(int)).

Configuration
In order to use this SPI, you should configure your grid instance to use JobStealingCollisionSpi either from a Spring XML file or directly. The following configuration parameters are supported:

Mandatory
This SPI has no mandatory configuration parameters.

Optional
The following configuration parameters are optional:
- Maximum number of active jobs that will be allowed by this SPI to execute concurrently (see setActiveJobsThreshold(int)).
- Maximum number of waiting jobs. Once the waiting queue size goes below this number, this SPI will attempt to steal jobs from over-utilized nodes by sending "steal" requests (see setWaitJobsThreshold(int)).
- Steal message expire time. If no response was received from a node to which a steal request was sent, then the request will be considered lost and will be resent, potentially to another node (see setMessageExpireTime(long)).
- Maximum number of stealing attempts for the job (see setMaximumStealingAttempts(int)).
- Whether stealing is enabled or not (see setStealingEnabled(boolean)).
- Enables stealing to/from only nodes that have these attributes set (see setStealingAttributes(Map)).
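As a rough illustration of how the active-jobs and waiting-jobs thresholds might combine on the thief side, the sketch below computes how many jobs an under-utilized node would ask for. The formula (fill free execution slots plus the waiting queue up to its threshold, minus requests still in flight) is an assumption made for illustration; it is not the SPI's documented algorithm.

```java
// Hypothetical thief-side calculation. Parameter names mirror the SPI's
// configuration properties, but the formula itself is an assumption.
class StealTrigger {
    static int jobsToRequest(boolean stealingEnabled,
                             int activeJobsThreshold, int waitJobsThreshold,
                             int activeJobs, int waitingJobs, int outstandingRequests) {
        // With stealing disabled the node never asks for jobs (it may still be a victim).
        if (!stealingEnabled) return 0;
        // Capacity the node is willing to fill: free execution slots plus queue room.
        int capacity = activeJobsThreshold + waitJobsThreshold;
        int occupied = activeJobs + waitingJobs + outstandingRequests;
        return Math.max(0, capacity - occupied);
    }

    public static void main(String[] args) {
        // 4 slots + 2 queue places, 3 running, 0 waiting, 1 request in flight.
        System.out.println(jobsToRequest(true, 4, 2, 3, 0, 1)); // 2
    }
}
```

Counting outstanding requests keeps a node from asking for the same capacity twice while earlier steal requests are still unanswered.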
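Here is an example of how this SPI can be configured directly:

import java.util.Collections;

import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi;
import org.apache.ignite.spi.failover.jobstealing.JobStealingFailoverSpi;

JobStealingCollisionSpi spi = new JobStealingCollisionSpi();

// Configure number of waiting jobs
// in the queue for job stealing.
spi.setWaitJobsThreshold(10);

// Configure message expire time (in milliseconds).
spi.setMessageExpireTime(500);

// Configure stealing attempts number.
spi.setMaximumStealingAttempts(10);

// Configure number of active jobs that are allowed to execute
// in parallel. This number should usually be equal to the number
// of threads in the pool (default is 100).
spi.setActiveJobsThreshold(50);

// Enable stealing.
spi.setStealingEnabled(true);

// Set stealing attribute to steal from/to nodes that have it.
spi.setStealingAttributes(Collections.singletonMap("node.segment", "foobar"));

IgniteConfiguration cfg = new IgniteConfiguration();

// Override default Collision SPI.
cfg.setCollisionSpi(spi);

// This SPI must always be used together with JobStealingFailoverSpi.
cfg.setFailoverSpi(new JobStealingFailoverSpi());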
Here is an example of how this SPI can be configured from Spring XML configuration:

<property name="collisionSpi">
    <bean class="org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi">
        <property name="activeJobsThreshold" value="100"/>
        <property name="waitJobsThreshold" value="0"/>
        <property name="messageExpireTime" value="1000"/>
        <property name="maximumStealingAttempts" value="10"/>
        <property name="stealingEnabled" value="true"/>
        <property name="stealingAttributes">
            <map>
                <entry key="node.segment" value="foobar"/>
            </map>
        </property>
    </bean>
</property>
For information about the Spring Framework, visit www.springframework.org.
Field Summary
Fields
- static String ACTIVE_JOBS_THRESHOLD_NODE_ATTR: Threshold of maximum jobs executing concurrently.
- static int DFLT_ACTIVE_JOBS_THRESHOLD: Default number of parallel jobs allowed (value is 95, which is slightly less than the default number of threads in the execution thread pool, to leave some extra threads for system processing).
- static int DFLT_JOB_PRIORITY: Default start value for job priority (value is 0).
- static int DFLT_MAX_STEALING_ATTEMPTS: Maximum number of attempts to steal a job by another node (default is 5).
- static long DFLT_MSG_EXPIRE_TIME: Default steal message expire time in milliseconds (value is 1000).
- static int DFLT_WAIT_JOBS_THRESHOLD: Default threshold of waiting jobs.
- static String MAX_STEALING_ATTEMPT_ATTR: Maximum stealing attempts attribute name.
- static String MSG_EXPIRE_TIME_ATTR: Stealing request expiration time attribute name.
- static String STEALING_ATTEMPT_COUNT_ATTR: Name of job context attribute containing current stealing attempt count.
- static String STEALING_PRIORITY_ATTR: Stealing priority attribute name.
- static String THIEF_NODE_ATTR: Job context attribute for storing thief node UUID (this attribute is used in job stealing failover SPI).
- static String WAIT_JOBS_THRESHOLD_NODE_ATTR: Threshold of maximum jobs on waiting queue.

Fields inherited from class org.apache.ignite.spi.IgniteSpiAdapter
ignite, igniteInstanceName

Constructor Summary
- JobStealingCollisionSpi()

Method Summary
- int getActiveJobsThreshold()
- protected List<String> getConsistentAttributeNames(): Returns a list of attributes that should be consistent for this SPI.
- int getCurrentActiveJobsNumber(): Gets current number of jobs that are being executed.
- int getCurrentHeldJobsNumber(): Gets number of currently 'held' jobs.
- int getCurrentJobsToStealNumber(): Gets current number of jobs to be stolen.
- int getCurrentRunningJobsNumber(): Gets number of currently running (not 'held') jobs.
- int getCurrentWaitJobsNumber(): Gets current number of jobs that wait for execution.
- int getMaximumStealingAttempts()
- long getMessageExpireTime()
- Map<String,Object> getNodeAttributes(): This method is called before SPI starts (before method IgniteSpi.spiStart(String) is called).
- Map<String,? extends Serializable> getStealingAttributes()
- int getTotalStolenJobsNumber(): Gets total number of stolen jobs.
- int getWaitJobsThreshold()
- boolean isStealingEnabled()
- void onCollision(CollisionContext ctx): This is a callback called when a new grid job arrives, an executing job finishes its execution, the topology changes, and periodically (on EventType.EVT_NODE_METRICS_UPDATED). When a new job arrives, it is added to the end of the wait list and this method is called.
- void onContextDestroyed0(): Method to be called at the beginning of the onContextDestroyed() method.
- protected void onContextInitialized0(IgniteSpiContext spiCtx): Method to be called at the end of the onContextInitialized method.
- JobStealingCollisionSpi setActiveJobsThreshold(int activeJobsThreshold): Sets number of jobs that can be executed in parallel.
- void setExternalCollisionListener(CollisionExternalListener extLsnr): Listener to be set for notification of external collision events (e.g. job stealing).
- JobStealingCollisionSpi setMaximumStealingAttempts(int maxStealingAttempts): Sets maximum number of attempts to steal a job by another node.
- JobStealingCollisionSpi setMessageExpireTime(long msgExpireTime): Message expire time configuration parameter.
- JobStealingCollisionSpi setName(String name): Sets SPI name.
- JobStealingCollisionSpi setStealingAttributes(Map<String,? extends Serializable> stealAttrs): Configuration parameter to enable stealing to/from only nodes that have these attributes set (see ClusterNode.attribute(String) and IgniteConfiguration.getUserAttributes() methods).
- JobStealingCollisionSpi setStealingEnabled(boolean isStealingEnabled): Sets flag indicating whether this node should attempt to steal jobs from other nodes.
- JobStealingCollisionSpi setWaitJobsThreshold(int waitJobsThreshold): Sets job count threshold at which this node will start stealing jobs from other nodes.
- void spiStart(String igniteInstanceName): This method is called to start SPI.
- void spiStop(): This method is called to stop SPI.
- String toString()

Methods inherited from class org.apache.ignite.spi.IgniteSpiAdapter
addTimeoutObject, assertParameter, checkConfigurationConsistency0, clientFailureDetectionTimeout, configInfo, createSpiAttributeName, failureDetectionTimeout, failureDetectionTimeoutEnabled, failureDetectionTimeoutEnabled, getExceptionRegistry, getLocalNode, getName, getSpiContext, ignite, initFailureDetectionTimeout, injectables, injectResources, isNodeStopping, onBeforeStart, onClientDisconnected, onClientReconnected, onContextDestroyed, onContextInitialized, registerMBean, removeTimeoutObject, started, startInfo, startStopwatch, stopInfo, unregisterMBean
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.apache.ignite.spi.IgniteSpi
getName, onClientDisconnected, onClientReconnected, onContextDestroyed, onContextInitialized
Field Detail
-
DFLT_MAX_STEALING_ATTEMPTS
public static final int DFLT_MAX_STEALING_ATTEMPTS
Maximum number of attempts to steal a job by another node (default is 5).
- See Also:
- Constant Field Values
-
DFLT_ACTIVE_JOBS_THRESHOLD
public static final int DFLT_ACTIVE_JOBS_THRESHOLD
Default number of parallel jobs allowed (value is 95, which is slightly less than the default number of threads in the execution thread pool, to leave some extra threads for system processing).
- See Also:
- Constant Field Values
-
DFLT_MSG_EXPIRE_TIME
public static final long DFLT_MSG_EXPIRE_TIME
Default steal message expire time in milliseconds (value is 1000). Once this time has elapsed and no response to the steal message has been received, the message is considered lost and another steal message will be generated, potentially to another node.
- See Also:
- Constant Field Values
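The expiry rule above can be modeled with a small bookkeeping class. This is a hypothetical sketch, not Ignite's internal code: it assumes at most one outstanding steal request per node, identifies nodes by UUID, and takes the current time as a parameter so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical tracker for outstanding steal requests. A request left
// unanswered for longer than msgExpireTime is treated as lost so that the
// caller can resend it, possibly to another node.
class StealRequestTracker {
    private final long msgExpireTime;                        // milliseconds
    private final Map<UUID, Long> sentAt = new HashMap<>();  // node id -> send time (ms)

    StealRequestTracker(long msgExpireTime) { this.msgExpireTime = msgExpireTime; }

    void onSent(UUID nodeId, long nowMs) { sentAt.put(nodeId, nowMs); }

    void onResponse(UUID nodeId) { sentAt.remove(nodeId); }

    /** Removes and returns the nodes whose requests have expired at time nowMs. */
    List<UUID> expire(long nowMs) {
        List<UUID> lost = new ArrayList<>();
        for (Iterator<Map.Entry<UUID, Long>> it = sentAt.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<UUID, Long> e = it.next();
            if (nowMs - e.getValue() > msgExpireTime) {
                lost.add(e.getKey());
                it.remove();
            }
        }
        return lost;
    }

    int outstanding() { return sentAt.size(); }

    public static void main(String[] args) {
        StealRequestTracker t = new StealRequestTracker(1000); // same value as DFLT_MSG_EXPIRE_TIME
        t.onSent(UUID.randomUUID(), 0);
        System.out.println(t.expire(1500).size()); // 1: the request is considered lost
    }
}
```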
-
DFLT_WAIT_JOBS_THRESHOLD
public static final int DFLT_WAIT_JOBS_THRESHOLD
Default threshold of waiting jobs. If the number of waiting jobs exceeds this threshold, then waiting jobs will become available to be stolen (value is 0).
- See Also:
- Constant Field Values
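On the node being stolen from, this rule means that only the waiting jobs beyond the threshold are candidates. A minimal sketch with plain JDK lists, assuming (for illustration only) that candidates are taken from the end of the wait queue so the jobs queued first stay local:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical victim-side helper: only waiting jobs in excess of
// waitJobsThreshold are offered to thieves.
class VictimSide {
    /** Number of waiting jobs above the threshold, never negative. */
    static int stealableCount(int waitingJobs, int waitJobsThreshold) {
        return Math.max(0, waitingJobs - waitJobsThreshold);
    }

    /** Returns up to 'requested' jobs taken from the tail of the wait queue. */
    static <T> List<T> pickStealable(List<T> waitQueue, int waitJobsThreshold, int requested) {
        int n = Math.max(0, Math.min(requested, stealableCount(waitQueue.size(), waitJobsThreshold)));
        return new ArrayList<>(waitQueue.subList(waitQueue.size() - n, waitQueue.size()));
    }

    public static void main(String[] args) {
        List<String> queue = List.of("j1", "j2", "j3", "j4");
        System.out.println(pickStealable(queue, 2, 5)); // [j3, j4]
    }
}
```

With the default threshold of 0, every waiting job is a candidate.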
-
DFLT_JOB_PRIORITY
public static final int DFLT_JOB_PRIORITY
Default start value for job priority (value is 0).
- See Also:
- Constant Field Values
-
THIEF_NODE_ATTR
public static final String THIEF_NODE_ATTR
Job context attribute for storing thief node UUID (this attribute is used in job stealing failover SPI).
- See Also:
- Constant Field Values
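The overview states that the failover SPI routes stolen jobs to the node that requested them, and this attribute carries that node's UUID. A hedged sketch of that routing decision, using a placeholder attribute key (the real constant's value is not assumed) and plain JDK collections in place of Ignite's job context and topology types:

```java
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical thief-based routing during failover; not Ignite's API.
class ThiefRouting {
    static final String THIEF_KEY = "thief.node.id"; // placeholder key, not THIEF_NODE_ATTR's real value

    /**
     * Returns the thief node if the job attributes name one that is still in
     * the topology; otherwise empty, meaning the caller falls back to its
     * regular failover choice.
     */
    static Optional<UUID> route(Map<String, Object> jobAttrs, Collection<UUID> topology) {
        Object thief = jobAttrs.get(THIEF_KEY);
        if (thief instanceof UUID && topology.contains(thief)) {
            return Optional.of((UUID) thief);
        }
        return Optional.empty();
    }
}
```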
-
WAIT_JOBS_THRESHOLD_NODE_ATTR
public static final String WAIT_JOBS_THRESHOLD_NODE_ATTR
Threshold of maximum jobs on waiting queue.
- See Also:
- Constant Field Values
-
ACTIVE_JOBS_THRESHOLD_NODE_ATTR
public static final String ACTIVE_JOBS_THRESHOLD_NODE_ATTR
Threshold of maximum jobs executing concurrently.
- See Also:
- Constant Field Values
-
STEALING_ATTEMPT_COUNT_ATTR
public static final String STEALING_ATTEMPT_COUNT_ATTR
Name of job context attribute containing current stealing attempt count. This count is incremented every time the same job gets stolen for execution.
- See Also:
ComputeJobContext, Constant Field Values
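The counting rule can be sketched with a plain map standing in for the job context attributes. The key name is hypothetical, and the exact boundary is an assumption: here a job is stealable only while it has been stolen fewer than maxAttempts times.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the per-job stealing attempt counter.
class StealAttempts {
    static final String ATTEMPT_KEY = "steal.attempts"; // placeholder, not the real constant's value

    /** Current attempt count; an absent attribute means the job was never stolen. */
    static int attempts(Map<String, Object> jobAttrs) {
        Object v = jobAttrs.get(ATTEMPT_KEY);
        return v instanceof Integer ? (Integer) v : 0;
    }

    /** True while the job has been stolen fewer than maxAttempts times. */
    static boolean canSteal(Map<String, Object> jobAttrs, int maxAttempts) {
        return attempts(jobAttrs) < maxAttempts;
    }

    /** Increments the counter each time the same job is stolen. */
    static void recordSteal(Map<String, Object> jobAttrs) {
        jobAttrs.put(ATTEMPT_KEY, attempts(jobAttrs) + 1);
    }

    public static void main(String[] args) {
        Map<String, Object> ctx = new HashMap<>();
        int max = 5; // same value as DFLT_MAX_STEALING_ATTEMPTS
        int steals = 0;
        while (canSteal(ctx, max)) {
            recordSteal(ctx);
            steals++;
        }
        System.out.println(steals); // 5
    }
}
```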
-
MAX_STEALING_ATTEMPT_ATTR
public static final String MAX_STEALING_ATTEMPT_ATTR
Maximum stealing attempts attribute name.
- See Also:
- Constant Field Values
-
MSG_EXPIRE_TIME_ATTR
public static final String MSG_EXPIRE_TIME_ATTR
Stealing request expiration time attribute name.
- See Also:
- Constant Field Values
-
STEALING_PRIORITY_ATTR
public static final String STEALING_PRIORITY_ATTR
Stealing priority attribute name.
- See Also:
- Constant Field Values
Method Detail
-
setActiveJobsThreshold
@IgniteSpiConfiguration(optional=true) public JobStealingCollisionSpi setActiveJobsThreshold(int activeJobsThreshold)
Sets number of jobs that can be executed in parallel.
- Parameters:
activeJobsThreshold - Number of jobs that can be executed in parallel.
- Returns:
this for chaining.
-
getActiveJobsThreshold
public int getActiveJobsThreshold()
- Returns:
- Number of jobs that can be executed in parallel.
-
setWaitJobsThreshold
@IgniteSpiConfiguration(optional=true) public JobStealingCollisionSpi setWaitJobsThreshold(int waitJobsThreshold)
Sets job count threshold at which this node will start stealing jobs from other nodes.
- Parameters:
waitJobsThreshold - Job count threshold.
- Returns:
this for chaining.
-
getWaitJobsThreshold
public int getWaitJobsThreshold()
- Returns:
- Job count threshold.
-
setMessageExpireTime
@IgniteSpiConfiguration(optional=true) public JobStealingCollisionSpi setMessageExpireTime(long msgExpireTime)
Message expire time configuration parameter. If no response is received from a busy node to a job stealing message, then the implementation will assume that the message never got there, or that the remote node does not include this node in the topology of any of its jobs.
- Parameters:
msgExpireTime - Message expire time (in milliseconds).
- Returns:
this for chaining.
-
getMessageExpireTime
public long getMessageExpireTime()
- Returns:
- Message expire time.
-
setStealingEnabled
@IgniteSpiConfiguration(optional=true) public JobStealingCollisionSpi setStealingEnabled(boolean isStealingEnabled)
Sets flag indicating whether this node should attempt to steal jobs from other nodes. If false, then this node will still allow jobs to be stolen from it, but won't attempt to steal any jobs from other nodes.
Default value is true.
- Parameters:
isStealingEnabled - Flag indicating whether this node should attempt to steal jobs from other nodes.
- Returns:
this for chaining.
-
isStealingEnabled
public boolean isStealingEnabled()
- Returns:
- Flag indicating whether this node should attempt to steal jobs from other nodes.
-
setMaximumStealingAttempts
@IgniteSpiConfiguration(optional=true) public JobStealingCollisionSpi setMaximumStealingAttempts(int maxStealingAttempts)
Sets maximum number of attempts to steal a job by another node. If not specified, the DFLT_MAX_STEALING_ATTEMPTS value will be used.
- Parameters:
maxStealingAttempts - Maximum number of attempts to steal a job by another node.
- Returns:
this for chaining.
-
getMaximumStealingAttempts
public int getMaximumStealingAttempts()
- Returns:
- Maximum number of attempts to steal job by another node.
-
setStealingAttributes
@IgniteSpiConfiguration(optional=true) public JobStealingCollisionSpi setStealingAttributes(Map<String,? extends Serializable> stealAttrs)
Configuration parameter to enable stealing to/from only nodes that have these attributes set (see ClusterNode.attribute(String) and IgniteConfiguration.getUserAttributes() methods).
- Parameters:
stealAttrs - Node attributes to enable job stealing for.
- Returns:
this for chaining.
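One plausible reading of this parameter is a subset match: a node qualifies only if it carries every configured attribute with an equal value. The sketch below implements that reading with plain maps in place of ClusterNode attributes; treating an empty map as "no restriction" is an assumption.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical subset-match filter for stealing attributes.
class StealingAttributeFilter {
    /** True if every (key, value) in stealAttrs is present with an equal value in nodeAttrs. */
    static boolean matches(Map<String, ?> nodeAttrs, Map<String, ?> stealAttrs) {
        for (Map.Entry<String, ?> e : stealAttrs.entrySet()) {
            // containsKey distinguishes "missing" from "present with a null value".
            if (!nodeAttrs.containsKey(e.getKey())
                    || !Objects.equals(nodeAttrs.get(e.getKey()), e.getValue())) {
                return false;
            }
        }
        return true; // an empty stealAttrs map imposes no restriction (assumption)
    }

    public static void main(String[] args) {
        Map<String, String> segment = Map.of("node.segment", "foobar");
        System.out.println(matches(Map.of("node.segment", "foobar", "region", "eu"), segment)); // true
    }
}
```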
-
getStealingAttributes
public Map<String,? extends Serializable> getStealingAttributes()
- Returns:
- Node attributes to enable job stealing for.
-
getCurrentRunningJobsNumber
public int getCurrentRunningJobsNumber()
Gets number of currently running (not 'held') jobs.
- Returns:
- Number of currently running (not 'held') jobs.
-
getCurrentHeldJobsNumber
public int getCurrentHeldJobsNumber()
Gets number of currently 'held' jobs.
- Returns:
- Number of currently 'held' jobs.
-
getCurrentWaitJobsNumber
public int getCurrentWaitJobsNumber()
Gets current number of jobs that wait for execution.
- Returns:
- Number of jobs that wait for execution.
-
getCurrentActiveJobsNumber
public int getCurrentActiveJobsNumber()
Gets current number of jobs that are being executed.
- Returns:
- Number of active jobs.
-
getTotalStolenJobsNumber
public int getTotalStolenJobsNumber()
Gets total number of stolen jobs.
- Returns:
- Number of stolen jobs.
-
getCurrentJobsToStealNumber
public int getCurrentJobsToStealNumber()
Gets current number of jobs to be stolen. This is the number of outstanding steal requests.
- Returns:
- Number of jobs to be stolen.
-
getNodeAttributes
public Map<String,Object> getNodeAttributes() throws IgniteSpiException
This method is called before SPI starts (before method IgniteSpi.spiStart(String) is called). It allows the SPI implementation to add attributes to a local node. Kernal collects these attributes from all loaded SPI implementations and then passes them to the discovery SPI so that they can be exchanged with other nodes.
- Specified by:
getNodeAttributes in interface IgniteSpi
- Overrides:
getNodeAttributes in class IgniteSpiAdapter
- Returns:
- Map of local node attributes this SPI wants to add.
- Throws:
IgniteSpiException - Throws in case of any error.
-
spiStart
public void spiStart(String igniteInstanceName) throws IgniteSpiException
This method is called to start SPI. After this method returns successfully, the kernel assumes that SPI is fully operational.
- Specified by:
spiStart in interface IgniteSpi
- Parameters:
igniteInstanceName - Name of Ignite instance this SPI is being started for (null for default Ignite instance).
- Throws:
IgniteSpiException - Throws in case of any error during SPI start.
-
spiStop
public void spiStop() throws IgniteSpiException
This method is called to stop SPI. After this method returns, the kernel assumes that this SPI is finished and all resources acquired by it are released.
Note that this method can be called at any point, including during recovery of a failed start. It should make no assumptions about what state the SPI will be in when this method is called.
- Specified by:
spiStop in interface IgniteSpi
- Throws:
IgniteSpiException - Thrown in case of any error during SPI stop.
-
setExternalCollisionListener
public void setExternalCollisionListener(CollisionExternalListener extLsnr)
Listener to be set for notification of external collision events (e.g. job stealing). Once the grid receives such a notification, it will immediately invoke the collision SPI.
Ignite uses this listener to enable job stealing from overloaded to underloaded nodes. However, you can also utilize it, for instance, to provide time-based collision resolution. To achieve this, you would most likely mark some job by setting a certain attribute in the job context (see ComputeJobContext) for a job that requires time-based scheduling, and set some timer in your SPI implementation that would wake up after a certain period of time. Once this period is reached, you would notify this listener that a collision resolution should take place. Then, inside your collision resolution logic, you would find the marked waiting job and activate it.
Note that most collision SPIs might not have external collisions. In that case, they should simply ignore this method and do nothing when the listener is set.
- Specified by:
setExternalCollisionListener in interface CollisionSpi
- Parameters:
extLsnr - Listener for external collision events.
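The time-based resolution idea can be modeled with the JDK alone. In this hypothetical sketch, Listener stands in for CollisionExternalListener and the marked flag stands in for a ComputeJobContext attribute; neither is Ignite's actual API. A one-shot timer notifies the listener, and the resolution step activates the marked waiting job.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// JDK-only model of timer-driven collision resolution (hypothetical types).
class TimedActivation {
    interface Listener { void onExternalCollision(); }

    static final class Job {
        final String name;
        final boolean marked; // true if the job needs time-based scheduling
        Job(String name, boolean marked) { this.name = name; this.marked = marked; }
    }

    final List<Job> waiting = new ArrayList<>();
    final List<Job> active = new ArrayList<>();

    /** Collision resolution step: activate every marked waiting job. */
    synchronized void resolve() {
        for (Iterator<Job> it = waiting.iterator(); it.hasNext(); ) {
            Job job = it.next();
            if (job.marked) {
                active.add(job);
                it.remove();
            }
        }
    }

    /** One-shot timer that notifies the listener once delayMs has passed. */
    static void scheduleWakeUp(ScheduledExecutorService timer, long delayMs, Listener lsnr) {
        Runnable wakeUp = lsnr::onExternalCollision;
        timer.schedule(wakeUp, delayMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        TimedActivation spi = new TimedActivation();
        spi.waiting.add(new Job("regular", false));
        spi.waiting.add(new Job("timed", true));
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(1);
        // When the timer fires, run collision resolution, as the listener would trigger it.
        scheduleWakeUp(timer, 100, () -> { spi.resolve(); done.countDown(); });
        done.await();
        timer.shutdown();
        System.out.println(spi.active.size() + " active, " + spi.waiting.size() + " waiting");
    }
}
```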
-
onContextInitialized0
protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException
Method to be called at the end of the onContextInitialized method.
- Overrides:
onContextInitialized0 in class IgniteSpiAdapter
- Parameters:
spiCtx - SPI context.
- Throws:
IgniteSpiException - In case of errors.
-
onContextDestroyed0
public void onContextDestroyed0()
Method to be called at the beginning of the onContextDestroyed() method.
- Overrides:
onContextDestroyed0 in class IgniteSpiAdapter
-
onCollision
public void onCollision(CollisionContext ctx)
This is a callback called when:
- a new grid job arrived
- an executing job finished its execution
- the topology changed
- periodically (on EventType.EVT_NODE_METRICS_UPDATED)
Implementation of this method should act on all lists, each of which contains collision job contexts that define a set of operations available during collision resolution. Refer to CollisionContext and CollisionJobContext documentation for more information.
- Specified by:
onCollision in interface CollisionSpi
- Parameters:
ctx - Collision context which contains all collision lists.
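A simplified model of one collision-resolution pass is sketched below: waiting jobs are activated in arrival order while the number of active jobs is below the active-jobs threshold. Strings stand in for CollisionJobContext objects and adding to the active list stands in for activation; stealing, priorities, and cancellation are deliberately left out, so this is an illustration rather than the SPI's onCollision logic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified, hypothetical model of one collision-resolution pass.
class CollisionPass {
    static List<String> resolve(List<String> active, Deque<String> waiting, int activeJobsThreshold) {
        List<String> activated = new ArrayList<>();
        // New jobs are appended to the end of the wait list, so activate from the head.
        while (active.size() < activeJobsThreshold && !waiting.isEmpty()) {
            String job = waiting.pollFirst();
            active.add(job);
            activated.add(job);
        }
        return activated;
    }

    public static void main(String[] args) {
        List<String> active = new ArrayList<>(List.of("a1"));
        Deque<String> waiting = new ArrayDeque<>(List.of("w1", "w2", "w3"));
        System.out.println(resolve(active, waiting, 3)); // [w1, w2]
    }
}
```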
-
getConsistentAttributeNames
protected List<String> getConsistentAttributeNames()
Returns a list of attributes that should be consistent for this SPI. Consistency means that a remote node has to have the same attribute with the same value.
- Overrides:
getConsistentAttributeNames in class IgniteSpiAdapter
- Returns:
- List of attribute names.
-
setName
public JobStealingCollisionSpi setName(String name)
Sets SPI name.
- Overrides:
setName in class IgniteSpiAdapter
- Parameters:
name - SPI name.
- Returns:
this for chaining.