Class JobStealingCollisionSpi

  • All Implemented Interfaces:
    CollisionSpi, IgniteSpi

    @IgniteSpiMultipleInstancesSupport(true)
    @IgniteSpiConsistencyChecked(optional=true)
    public class JobStealingCollisionSpi
    extends IgniteSpiAdapter
    implements CollisionSpi
    Collision SPI that supports job stealing from over-utilized nodes to under-utilized nodes. This SPI is especially useful when some jobs within a task complete quickly while others sit in the waiting queue on slower nodes. In that case, the waiting jobs are stolen from the slower node and moved to the faster, under-utilized node.

    The design of this SPI was significantly influenced by the Java Fork/Join Framework, authored by Doug Lea and introduced in Java 7. JobStealingCollisionSpi applies similar concepts across the grid, as opposed to the within-VM support provided by Fork/Join.

    Grids are often deployed across many computers, some of which will always be more powerful than others. This SPI helps prevent jobs from getting stuck on a slower node, because a faster node will steal them. In the following picture, when Node3 becomes free, it steals Job13 and Job23 from Node1 and Node2 respectively.
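The scenario above can be modeled with a small, self-contained Java sketch. This is a hypothetical toy model, not Ignite code, and the class and method names are invented for illustration. Each node is reduced to a queue of waiting jobs, and the initial queue contents are assumed. Taking each stolen job from the tail of the victim's queue is an arbitrary choice made only for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of job stealing; not Ignite's implementation.
public class StealingScenario {
    // The idle "thief" node takes one waiting job from every other node's queue.
    static List<String> stealOneFromEach(String thief, Map<String, Deque<String>> waiting) {
        List<String> stolen = new ArrayList<>();
        for (Map.Entry<String, Deque<String>> e : waiting.entrySet()) {
            if (e.getKey().equals(thief)) {
                continue; // a node never steals from itself
            }
            String job = e.getValue().pollLast(); // null if that queue is empty
            if (job != null) {
                stolen.add(job);
            }
        }
        waiting.get(thief).addAll(stolen); // stolen jobs now wait on the thief
        return stolen;
    }

    public static void main(String[] args) {
        Map<String, Deque<String>> waiting = new LinkedHashMap<>();
        waiting.put("Node1", new ArrayDeque<>(List.of("Job11", "Job12", "Job13")));
        waiting.put("Node2", new ArrayDeque<>(List.of("Job21", "Job22", "Job23")));
        waiting.put("Node3", new ArrayDeque<>()); // Node3 has just become free

        System.out.println("Node3 stole " + stealOneFromEach("Node3", waiting)); // Node3 stole [Job13, Job23]
    }
}
```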

    Note that this SPI must always be used in conjunction with JobStealingFailoverSpi. Also note that job metrics updates must be enabled for this SPI to work properly (i.e. IgniteConfiguration#getMetricsUpdateFrequency() must be set to a positive value). The Job Stealing Failover SPI is responsible for routing stolen jobs to the nodes that initially requested (stole) them. The SPI maintains a counter of how many times a job was stolen and therefore traveled to another node. JobStealingCollisionSpi checks this counter and will not allow a job to be stolen once the counter exceeds the threshold configured via setMaximumStealingAttempts(int).
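The counter check described above can be sketched as follows. This is a hypothetical model rather than Ignite's implementation. The job context is reduced to a plain Map, and the key string is a stand-in for the real attribute name held by STEALING_ATTEMPT_COUNT_ATTR. Treating the threshold as the maximum number of allowed steals (attempts < max) is an assumption about the exact comparison.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the stealing-attempt limit; not Ignite's implementation.
public class StealAttempts {
    // Stand-in key; the real name is given by STEALING_ATTEMPT_COUNT_ATTR.
    static final String ATTEMPT_COUNT = "stealing.attempt.count";

    // Assumed rule: a job may be stolen again only while its counter is below the maximum.
    static boolean mayBeStolen(Map<String, Integer> jobCtx, int maxAttempts) {
        int attempts = jobCtx.getOrDefault(ATTEMPT_COUNT, 0);
        return attempts < maxAttempts;
    }

    // Every steal increments the counter that travels with the job.
    static void recordSteal(Map<String, Integer> jobCtx) {
        jobCtx.merge(ATTEMPT_COUNT, 1, Integer::sum);
    }

    public static void main(String[] args) {
        Map<String, Integer> jobCtx = new HashMap<>();
        int maxAttempts = 2;
        int steals = 0;
        while (mayBeStolen(jobCtx, maxAttempts)) {
            recordSteal(jobCtx);
            steals++;
        }
        System.out.println("Job was stolen " + steals + " times"); // Job was stolen 2 times
    }
}
```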

    Configuration

    To use this SPI, configure your grid instance to use JobStealingCollisionSpi, either from a Spring XML file or directly in code. The following configuration parameters are supported:

    Mandatory

    This SPI has no mandatory configuration parameters.

    Optional

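    The following configuration parameters are optional:

      • Maximum number of jobs allowed to execute concurrently (see setActiveJobsThreshold(int)).
      • Threshold of waiting jobs (see setWaitJobsThreshold(int)).
      • Steal message expire time (see setMessageExpireTime(long)).
      • Maximum number of stealing attempts for a job (see setMaximumStealingAttempts(int)).
      • Whether this node attempts to steal jobs from other nodes (see setStealingEnabled(boolean)).
      • Node attributes that restrict stealing to/from nodes that have them (see setStealingAttributes(Map)).

    Below is an example of configuring this SPI from Java code: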
     import java.util.Collections;
    
     import org.apache.ignite.configuration.IgniteConfiguration;
     import org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi;
     import org.apache.ignite.spi.failover.jobstealing.JobStealingFailoverSpi;
    
     JobStealingCollisionSpi spi = new JobStealingCollisionSpi();
    
     // Configure number of waiting jobs
     // in the queue for job stealing.
     spi.setWaitJobsThreshold(10);
    
     // Configure message expire time (in milliseconds).
     spi.setMessageExpireTime(500);
    
     // Configure stealing attempts number.
     spi.setMaximumStealingAttempts(10);
    
     // Configure number of active jobs that are allowed to execute
     // in parallel. This number should usually be equal to the number
     // of threads in the public thread pool
     // (see IgniteConfiguration#getPublicThreadPoolSize()).
     spi.setActiveJobsThreshold(50);
    
     // Enable stealing.
     spi.setStealingEnabled(true);
    
     // Set stealing attribute to steal from/to nodes that have it.
     spi.setStealingAttributes(Collections.singletonMap("node.segment", "foobar"));
    
     IgniteConfiguration cfg = new IgniteConfiguration();
    
     // Override default Collision SPI.
     cfg.setCollisionSpi(spi);
    
     // Job stealing must be paired with the job stealing failover SPI.
     cfg.setFailoverSpi(new JobStealingFailoverSpi());
     
    Here is an example of how this SPI can be configured from Spring XML configuration:
     <property name="collisionSpi">
         <bean class="org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi">
             <property name="activeJobsThreshold" value="100"/>
             <property name="waitJobsThreshold" value="0"/>
             <property name="messageExpireTime" value="1000"/>
             <property name="maximumStealingAttempts" value="10"/>
             <property name="stealingEnabled" value="true"/>
             <property name="stealingAttributes">
                 <map>
                     <entry key="node.segment" value="foobar"/>
                 </map>
             </property>
         </bean>
     </property>
     


    For information about the Spring Framework, visit www.springframework.org.
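Both configuration examples above use stealingAttributes to restrict stealing. The effect can be illustrated with a hypothetical filter. It assumes that a node qualifies only when it carries every configured attribute with an equal value. The Peer class and helper methods are invented for this sketch and are not part of Ignite.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration only; Peer and these helpers are not Ignite APIs.
public class StealingAttributesFilter {
    static final class Peer {
        final String name;
        final Map<String, ?> attrs;

        Peer(String name, Map<String, ?> attrs) {
            this.name = name;
            this.attrs = attrs;
        }
    }

    // Assumed rule: every required attribute must be present with an equal value.
    static boolean matches(Peer peer, Map<String, String> required) {
        return required.entrySet().stream()
            .allMatch(e -> e.getValue().equals(peer.attrs.get(e.getKey())));
    }

    // Names of the peers that would be considered for stealing to/from.
    static List<String> candidates(List<Peer> peers, Map<String, String> required) {
        return peers.stream()
            .filter(p -> matches(p, required))
            .map(p -> p.name)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> required = Map.of("node.segment", "foobar");
        List<Peer> peers = List.of(
            new Peer("node-a", Map.of("node.segment", "foobar")),
            new Peer("node-b", Map.of("node.segment", "other")),
            new Peer("node-c", Map.of()));
        System.out.println(candidates(peers, required)); // [node-a]
    }
}
```

With an empty attribute map, every peer matches in this sketch, which corresponds to no restriction.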

    • Field Detail

      • DFLT_MAX_STEALING_ATTEMPTS

        public static final int DFLT_MAX_STEALING_ATTEMPTS
        Default maximum number of attempts by another node to steal a job (value is 5).
        See Also:
        Constant Field Values
      • DFLT_ACTIVE_JOBS_THRESHOLD

        public static final int DFLT_ACTIVE_JOBS_THRESHOLD
        Default number of parallel jobs allowed (value is 95, which is slightly less than the default number of threads in the execution thread pool, leaving some extra threads for system processing).
        See Also:
        Constant Field Values
      • DFLT_MSG_EXPIRE_TIME

        public static final long DFLT_MSG_EXPIRE_TIME
        Default steal message expire time in milliseconds (value is 1000). Once this time has elapsed and no response to the steal message has been received, the message is considered lost and another steal message will be generated, potentially to another node.
        See Also:
        Constant Field Values
      • DFLT_WAIT_JOBS_THRESHOLD

        public static final int DFLT_WAIT_JOBS_THRESHOLD
        Default threshold of waiting jobs. If the number of waiting jobs exceeds this threshold, the waiting jobs become available to be stolen (value is 0).
        See Also:
        Constant Field Values
      • DFLT_JOB_PRIORITY

        public static final int DFLT_JOB_PRIORITY
        Default start value for job priority (value is 0).
        See Also:
        Constant Field Values
      • THIEF_NODE_ATTR

        public static final String THIEF_NODE_ATTR
        Job context attribute for storing thief node UUID (this attribute is used in job stealing failover SPI).
        See Also:
        Constant Field Values
      • WAIT_JOBS_THRESHOLD_NODE_ATTR

        public static final String WAIT_JOBS_THRESHOLD_NODE_ATTR
        Node attribute name for the threshold of maximum jobs on the waiting queue.
        See Also:
        Constant Field Values
      • ACTIVE_JOBS_THRESHOLD_NODE_ATTR

        public static final String ACTIVE_JOBS_THRESHOLD_NODE_ATTR
        Node attribute name for the threshold of maximum jobs executing concurrently.
        See Also:
        Constant Field Values
      • STEALING_ATTEMPT_COUNT_ATTR

        public static final String STEALING_ATTEMPT_COUNT_ATTR
        Name of job context attribute containing current stealing attempt count. This count is incremented every time the same job gets stolen for execution.
        See Also:
        ComputeJobContext, Constant Field Values
      • MAX_STEALING_ATTEMPT_ATTR

        public static final String MAX_STEALING_ATTEMPT_ATTR
        Maximum stealing attempts attribute name.
        See Also:
        Constant Field Values
      • MSG_EXPIRE_TIME_ATTR

        public static final String MSG_EXPIRE_TIME_ATTR
        Stealing request expiration time attribute name.
        See Also:
        Constant Field Values
      • STEALING_PRIORITY_ATTR

        public static final String STEALING_PRIORITY_ATTR
        Stealing priority attribute name.
        See Also:
        Constant Field Values
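The semantics of DFLT_WAIT_JOBS_THRESHOLD and DFLT_MSG_EXPIRE_TIME can be sketched as two small helpers. This is a hypothetical model, not Ignite's code. It assumes that only the waiting jobs beyond the threshold are offered for stealing. It also assumes that a steal request counts as lost as soon as the expire time has fully elapsed.

```java
// Hypothetical model of the two defaults; not Ignite's implementation.
public class StealDefaults {
    static final int WAIT_JOBS_THRESHOLD = 0;     // mirrors DFLT_WAIT_JOBS_THRESHOLD
    static final long MSG_EXPIRE_TIME_MS = 1000L; // mirrors DFLT_MSG_EXPIRE_TIME

    // Assumed: waiting jobs beyond the threshold are the ones that may be stolen.
    static int stealableJobs(int waitingJobs, int waitThreshold) {
        return Math.max(0, waitingJobs - waitThreshold);
    }

    // Assumed: a steal request without a response is lost once the expire time has elapsed.
    static boolean isExpired(long sentAtMs, long nowMs, long expireTimeMs) {
        return nowMs - sentAtMs >= expireTimeMs;
    }

    public static void main(String[] args) {
        System.out.println(stealableJobs(3, WAIT_JOBS_THRESHOLD));     // 3
        System.out.println(stealableJobs(3, 5));                       // 0
        System.out.println(isExpired(0L, 999L, MSG_EXPIRE_TIME_MS));   // false
        System.out.println(isExpired(0L, 1000L, MSG_EXPIRE_TIME_MS));  // true
    }
}
```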
    • Constructor Detail

      • JobStealingCollisionSpi

        public JobStealingCollisionSpi()
    • Method Detail

      • setActiveJobsThreshold

        @IgniteSpiConfiguration(optional=true)
        public JobStealingCollisionSpi setActiveJobsThreshold(int activeJobsThreshold)
        Sets the number of jobs that can be executed in parallel.
        Parameters:
        activeJobsThreshold - Number of jobs that can be executed in parallel.
        Returns:
        this for chaining.
      • getActiveJobsThreshold

        public int getActiveJobsThreshold()
        Returns:
        Number of jobs that can be executed in parallel.
      • setWaitJobsThreshold

        @IgniteSpiConfiguration(optional=true)
        public JobStealingCollisionSpi setWaitJobsThreshold(int waitJobsThreshold)
        Sets job count threshold at which this node will start stealing jobs from other nodes.
        Parameters:
        waitJobsThreshold - Job count threshold.
        Returns:
        this for chaining.
      • setMessageExpireTime

        @IgniteSpiConfiguration(optional=true)
        public JobStealingCollisionSpi setMessageExpireTime(long msgExpireTime)
        Sets the steal message expire time (in milliseconds). If no response to a job stealing message is received from a busy node within this time, the implementation assumes that the message never got there, or that the remote node does not include this node in the topology of any of its jobs.
        Parameters:
        msgExpireTime - Message expire time.
        Returns:
        this for chaining.
      • setStealingEnabled

        @IgniteSpiConfiguration(optional=true)
        public JobStealingCollisionSpi setStealingEnabled(boolean isStealingEnabled)
        Sets the flag indicating whether this node should attempt to steal jobs from other nodes. If false, this node will still allow jobs to be stolen from it, but won't attempt to steal any jobs from other nodes.

        Default value is true.

        Parameters:
        isStealingEnabled - Flag indicating whether this node should attempt to steal jobs from other nodes.
        Returns:
        this for chaining.
      • isStealingEnabled

        public boolean isStealingEnabled()
        Returns:
        Flag indicating whether this node should attempt to steal jobs from other nodes.
      • setMaximumStealingAttempts

        @IgniteSpiConfiguration(optional=true)
        public JobStealingCollisionSpi setMaximumStealingAttempts(int maxStealingAttempts)
        Sets the maximum number of attempts by another node to steal a job. If not specified, the DFLT_MAX_STEALING_ATTEMPTS value will be used.
        Parameters:
        maxStealingAttempts - Maximum number of attempts to steal job by another node.
        Returns:
        this for chaining.
      • getMaximumStealingAttempts

        public int getMaximumStealingAttempts()
        Returns:
        Maximum number of attempts to steal job by another node.
      • getCurrentRunningJobsNumber

        public int getCurrentRunningJobsNumber()
        Gets the number of currently running (not 'held') jobs.
        Returns:
        Number of currently running (not 'held') jobs.
      • getCurrentHeldJobsNumber

        public int getCurrentHeldJobsNumber()
        Gets number of currently 'held' jobs.
        Returns:
        Number of currently 'held' jobs.
      • getCurrentWaitJobsNumber

        public int getCurrentWaitJobsNumber()
        Gets current number of jobs that wait for the execution.
        Returns:
        Number of jobs that wait for execution.
      • getCurrentActiveJobsNumber

        public int getCurrentActiveJobsNumber()
        Gets current number of jobs that are being executed.
        Returns:
        Number of active jobs.
      • getTotalStolenJobsNumber

        public int getTotalStolenJobsNumber()
        Gets total number of stolen jobs.
        Returns:
        Number of stolen jobs.
      • getCurrentJobsToStealNumber

        public int getCurrentJobsToStealNumber()
        Gets the current number of jobs to be stolen, i.e. the number of outstanding steal requests.
        Returns:
        Number of jobs to be stolen.
      • spiStart

        public void spiStart(String igniteInstanceName)
                      throws IgniteSpiException
        This method is called to start the SPI. After this method returns successfully, the kernel assumes that the SPI is fully operational.
        Specified by:
        spiStart in interface IgniteSpi
        Parameters:
        igniteInstanceName - Name of Ignite instance this SPI is being started for (null for default Ignite instance).
        Throws:
        IgniteSpiException - Thrown in case of any error during SPI start.
      • spiStop

        public void spiStop()
                     throws IgniteSpiException
        This method is called to stop the SPI. After this method returns, the kernel assumes that this SPI is finished and that all resources acquired by it are released.

        Note that this method can be called at any point including during recovery of failed start. It should make no assumptions on what state SPI will be in when this method is called.

        Specified by:
        spiStop in interface IgniteSpi
        Throws:
        IgniteSpiException - Thrown in case of any error during SPI stop.
      • setExternalCollisionListener

        public void setExternalCollisionListener(CollisionExternalListener extLsnr)
        Listener to be set for notification of external collision events (e.g. job stealing). Once grid receives such notification, it will immediately invoke collision SPI.

        Ignite uses this listener to enable job stealing from overloaded to underloaded nodes. However, you can also utilize it, for instance, to provide time based collision resolution. To achieve this, you most likely would mark some job by setting a certain attribute in job context (see ComputeJobContext) for a job that requires time-based scheduling and set some timer in your SPI implementation that would wake up after a certain period of time. Once this period is reached, you would notify this listener that a collision resolution should take place. Then inside of your collision resolution logic, you would find the marked waiting job and activate it.

        Note that most collision SPIs might not have external collisions. In that case, they should simply ignore this method and do nothing when the listener is set.

        Specified by:
        setExternalCollisionListener in interface CollisionSpi
        Parameters:
        extLsnr - Listener for external collision events.
      • onContextDestroyed0

        public void onContextDestroyed0()
        Method to be called at the beginning of the onContextDestroyed() method.
        Overrides:
        onContextDestroyed0 in class IgniteSpiAdapter
      • onCollision

        public void onCollision(CollisionContext ctx)
        This callback is called in two cases. When a new job arrives, it is added to the end of the wait list and this method is called. When a job finishes its execution, it is removed from the active list and this method is called (i.e., a finished grid job does not appear in any list during collision resolution).

        Implementation of this method should act on all lists, each of which contains collision job contexts that define a set of operations available during collision resolution. Refer to CollisionContext and CollisionJobContext documentation for more information.

        Specified by:
        onCollision in interface CollisionSpi
        Parameters:
        ctx - Collision context which contains all collision lists.
      • getConsistentAttributeNames

        protected List<String> getConsistentAttributeNames()
        Returns back a list of attributes that should be consistent for this SPI. Consistency means that remote node has to have the same attribute with the same value.
        Overrides:
        getConsistentAttributeNames in class IgniteSpiAdapter
        Returns:
        List of attribute names.
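The collision-resolution flow described under onCollision(CollisionContext) and setExternalCollisionListener(CollisionExternalListener) can be sketched without Ignite on the classpath. Everything here is hypothetical:

- Job lists are plain lists of IDs.
- ExternalListener stands in for CollisionExternalListener.
- A single job ID plays the role of a job marked through a ComputeJobContext attribute for time-based scheduling.

The sketch activates waiting jobs up to an active-jobs threshold. A timer then notifies the listener, and the listener re-runs collision resolution.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, self-contained model of collision resolution; not Ignite code.
public class CollisionSketch {
    // Stand-in for Ignite's CollisionExternalListener callback.
    interface ExternalListener {
        void onExternalCollision();
    }

    private final List<String> waiting = new LinkedList<>();
    private final List<String> active = new ArrayList<>();
    private final int activeJobsThreshold;
    private final String timeMarkedJob; // job held back until the timer fires
    private boolean timerFired;

    CollisionSketch(int activeJobsThreshold, String timeMarkedJob) {
        this.activeJobsThreshold = activeJobsThreshold;
        this.timeMarkedJob = timeMarkedJob;
    }

    // A new job goes to the end of the wait list, then collision resolution runs.
    synchronized void submit(String job) {
        waiting.add(job);
        onCollision();
    }

    // Simplified onCollision(): activate waiting jobs in arrival order while the active
    // count is below the threshold, skipping the time-marked job until its timer fires.
    synchronized void onCollision() {
        Iterator<String> it = waiting.iterator();
        while (it.hasNext() && active.size() < activeJobsThreshold) {
            String job = it.next();
            if (job.equals(timeMarkedJob) && !timerFired) {
                continue;
            }
            it.remove();
            active.add(job);
        }
    }

    synchronized void markTimerFired() {
        timerFired = true;
    }

    synchronized List<String> activeJobs() {
        return new ArrayList<>(active);
    }

    public static void main(String[] args) throws InterruptedException {
        CollisionSketch spi = new CollisionSketch(3, "A");
        // On an external notification, collision resolution is simply re-run.
        ExternalListener lsnr = spi::onCollision;

        spi.submit("A");
        spi.submit("B");
        spi.submit("C");
        System.out.println(spi.activeJobs()); // [B, C]

        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(1);
        timer.schedule(() -> {
            spi.markTimerFired();
            lsnr.onExternalCollision();
            done.countDown();
        }, 50, TimeUnit.MILLISECONDS);

        done.await(1, TimeUnit.SECONDS);
        timer.shutdown();
        System.out.println(spi.activeJobs()); // [B, C, A]
    }
}
```

In a real SPI, the waiting and active lists would come from the CollisionContext passed to onCollision(CollisionContext), not from fields of the SPI itself.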