Class PriorityQueueCollisionSpi
- java.lang.Object
  - org.apache.ignite.spi.IgniteSpiAdapter
    - org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi

- All Implemented Interfaces: CollisionSpi, IgniteSpi

@IgniteSpiMultipleInstancesSupport(true)
@IgniteSpiConsistencyChecked(optional=true)
public class PriorityQueueCollisionSpi
extends IgniteSpiAdapter
implements CollisionSpi

This class provides an implementation of the Collision SPI based on a priority queue. Jobs are first ordered by their priority, if one is specified, and only the first getParallelJobsNumber() jobs are allowed to execute in parallel. Other jobs are queued up.

Configuration

Mandatory
This SPI has no mandatory configuration parameters.

Optional
This SPI has the following optional configuration parameters:
- Number of jobs that can be executed in parallel (see setParallelJobsNumber(int)). This number should usually be no greater than the number of threads in the execution thread pool.
- Priority attribute session key (see getPriorityAttributeKey()). Before returning from the ComputeTask.map(List, Object) method, the task implementation should set a value into the task session keyed by this attribute key. See ComputeTaskSession for more information about the task session.
- Priority attribute job context key (see getJobPriorityAttributeKey()). It is used for specifying job priority. See ComputeJobContext for more information about the job context.
- Default priority value (see getDefaultPriority()). It is used when no priority is set.
- Default priority increase value (see getStarvationIncrement()). It is used for increasing priority when a job gets bumped down. This feature prevents starvation of waiting jobs.
- Default increasing priority flag value (see isStarvationPreventionEnabled()). It enables increasing priority when a job gets bumped down. This feature prevents starvation of waiting jobs.
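The ordering rule described above (sort by priority, then let only the first getParallelJobsNumber() jobs run) can be sketched with plain JDK collections. This is a minimal illustration, not the SPI's actual code: WaitingJob and PrioritySelectionSketch are hypothetical names, and keeping arrival order among jobs of equal priority is an assumption.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for a waiting job; not an Ignite class. */
final class WaitingJob {
    final String name;
    final int priority;

    WaitingJob(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }
}

/** Illustrative selection logic only; the real SPI operates on collision context lists. */
final class PrioritySelectionSketch {
    /**
     * Returns the names of the jobs that may be activated: waiting jobs sorted by
     * descending priority, truncated to the number of free parallel slots.
     */
    static List<String> selectForActivation(List<WaitingJob> waiting, int activeJobs, int parallelJobs) {
        List<WaitingJob> sorted = new ArrayList<>(waiting);

        // List.sort is stable, so jobs with equal priority keep their arrival order (assumption).
        sorted.sort(Comparator.comparingInt((WaitingJob j) -> j.priority).reversed());

        int freeSlots = Math.max(0, parallelJobs - activeJobs);
        List<String> activated = new ArrayList<>();

        for (WaitingJob job : sorted.subList(0, Math.min(freeSlots, sorted.size())))
            activated.add(job.name);

        return activated;
    }
}
```

With a parallel job number of 1 and no active jobs, only the single highest-priority waiting job is selected; every other job stays queued until a slot frees up.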
Here is a Java configuration example:

    PriorityQueueCollisionSpi colSpi = new PriorityQueueCollisionSpi();

    // Execute all jobs sequentially by setting parallel job number to 1.
    colSpi.setParallelJobsNumber(1);

    IgniteConfiguration cfg = new IgniteConfiguration();

    // Override default collision SPI.
    cfg.setCollisionSpi(colSpi);

    // Starts grid.
    Ignition.start(cfg);

Here is a Spring XML configuration example:

    <property name="collisionSpi">
        <bean class="org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi">
            <property name="priorityAttributeKey" value="myPriorityAttributeKey"/>
            <property name="parallelJobsNumber" value="10"/>
        </bean>
    </property>

Coding Example

Here is an example of grid tasks that use the priority collision SPI configured in the Java example above. Note that priority collision resolution is transparent to the user and is simply a matter of proper grid configuration. In this example, priority is defined for the task only (within the task, not at the job level), so all split jobs start with the priority declared in their owner task.

This example demonstrates how an urgent task may be declared with a higher priority value. With the number of parallel jobs set to 1 (see the Java configuration example above), jobs from MyGridUrgentTask will most likely be activated first (one by one), while jobs from MyGridUsualTask, which has the lower priority, will wait. Once higher priority jobs complete, lower priority jobs will be scheduled.

    public class MyGridUsualTask extends ComputeTaskSplitAdapter<Object, Object> {
        public static final int SPLIT_COUNT = 20;

        @TaskSessionResource
        private ComputeTaskSession taskSes;

        @Override
        protected Collection<? extends ComputeJob> split(int gridSize, Object arg) throws IgniteException {
            ...
            // Set low task priority (note that attribute name is used by the SPI
            // and should not be changed).
            taskSes.setAttribute("grid.task.priority", 5);

            Collection<ComputeJob> jobs = new ArrayList<ComputeJob>(SPLIT_COUNT);

            for (int i = 1; i <= SPLIT_COUNT; i++) {
                jobs.add(new ComputeJobAdapter(i) {
                    ...
                });
            }
            ...
        }
    }

and

    public class MyGridUrgentTask extends ComputeTaskSplitAdapter<Object, Object> {
        public static final int SPLIT_COUNT = 5;

        @TaskSessionResource
        private ComputeTaskSession taskSes;

        @Override
        protected Collection<? extends ComputeJob> split(int gridSize, Object arg) throws IgniteException {
            ...
            // Set high task priority (note that attribute name is used by the SPI
            // and should not be changed).
            taskSes.setAttribute("grid.task.priority", 10);

            Collection<ComputeJob> jobs = new ArrayList<ComputeJob>(SPLIT_COUNT);

            for (int i = 1; i <= SPLIT_COUNT; i++) {
                jobs.add(new ComputeJobAdapter(i) {
                    ...
                });
            }
            ...
        }
    }

 For information about Spring framework visit www.springframework.org
Field Summary
Fields (modifier and type, field, description):
- static String DFLT_JOB_PRIORITY_ATTRIBUTE_KEY: Default job priority attribute key (value is grid.job.priority).
- static int DFLT_PARALLEL_JOBS_NUM: Default number of parallel jobs allowed (set to number of cores times 2).
- static boolean DFLT_PREVENT_STARVATION_ENABLED: Default flag for preventing starvation of lower priority jobs.
- static int DFLT_PRIORITY: Default priority that will be assigned if job does not have a priority attribute set (value is 0).
- static String DFLT_PRIORITY_ATTRIBUTE_KEY: Default priority attribute key (value is grid.task.priority).
- static int DFLT_STARVATION_INCREMENT: Default value by which job priority is increased every time a job gets bumped down.
- static int DFLT_WAIT_JOBS_NUM: Default waiting jobs number.

Fields inherited from class org.apache.ignite.spi.IgniteSpiAdapter: ignite, igniteInstanceName
 
Constructor Summary
Constructors (constructor, description):
- PriorityQueueCollisionSpi()

Method Summary
All Methods, Instance Methods, Concrete Methods (modifier and type, method, description):
- protected List<String> getConsistentAttributeNames(): Returns a list of attributes that should be consistent for this SPI.
- int getCurrentActiveJobsNumber(): Gets current number of jobs that are active, i.e. 'running + held' jobs.
- int getCurrentHeldJobsNumber(): Gets number of currently 'held' jobs.
- int getCurrentRunningJobsNumber(): Gets number of currently running (not 'held') jobs.
- int getCurrentWaitJobsNumber(): Gets current number of jobs that wait for execution.
- int getDefaultPriority(): Gets default priority to use if a job does not have priority attribute set.
- String getJobPriorityAttributeKey(): Gets key name of job priority attribute.
- Map<String,Object> getNodeAttributes(): This method is called before SPI starts (before method IgniteSpi.spiStart(String) is called).
- int getParallelJobsNumber(): Gets number of jobs that can be executed in parallel.
- String getPriorityAttributeKey(): Gets key name of task priority attribute.
- int getStarvationIncrement(): Gets value to increment job priority by every time a lower priority job gets behind a higher priority job.
- int getWaitingJobsNumber(): Maximum number of jobs that are allowed to wait in waiting queue.
- boolean isStarvationPreventionEnabled(): Gets flag indicating whether job starvation prevention is enabled.
- void onCollision(CollisionContext ctx): Callback invoked when a new grid job arrives, an executing job finishes its execution, the topology changes, or periodically (on EventType.EVT_NODE_METRICS_UPDATED). When a new job arrives, it is added to the end of the wait list and this method is called.
- PriorityQueueCollisionSpi setDefaultPriority(int priority): Sets default priority to use if a job does not have priority attribute set.
- void setExternalCollisionListener(CollisionExternalListener lsnr): Listener to be set for notification of external collision events (e.g. job stealing).
- PriorityQueueCollisionSpi setJobPriorityAttributeKey(String jobPriAttrKey): Sets job priority attribute key.
- PriorityQueueCollisionSpi setName(String name): Sets SPI name.
- PriorityQueueCollisionSpi setParallelJobsNumber(int parallelJobsNum): Sets number of jobs that can be executed in parallel.
- PriorityQueueCollisionSpi setPriorityAttributeKey(String taskPriAttrKey): Sets task priority attribute key.
- PriorityQueueCollisionSpi setStarvationIncrement(int starvationInc): Sets value to increment job priority by every time a lower priority job gets behind a higher priority job.
- PriorityQueueCollisionSpi setStarvationPreventionEnabled(boolean preventStarvation): Sets flag indicating whether job starvation prevention is enabled.
- PriorityQueueCollisionSpi setWaitingJobsNumber(int waitJobsNum): Maximum number of jobs that are allowed to wait in waiting queue.
- void spiStart(String igniteInstanceName): This method is called to start SPI.
- void spiStop(): This method is called to stop SPI.
- String toString()

Methods inherited from class org.apache.ignite.spi.IgniteSpiAdapter: addTimeoutObject, assertParameter, checkConfigurationConsistency0, clientFailureDetectionTimeout, configInfo, createSpiAttributeName, failureDetectionTimeout, failureDetectionTimeoutEnabled, failureDetectionTimeoutEnabled, getExceptionRegistry, getLocalNode, getName, getSpiContext, ignite, initFailureDetectionTimeout, injectables, injectResources, isNodeStopping, onBeforeStart, onClientDisconnected, onClientReconnected, onContextDestroyed, onContextDestroyed0, onContextInitialized, onContextInitialized0, registerMBean, removeTimeoutObject, started, startInfo, startStopwatch, stopInfo, unregisterMBean

Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait

Methods inherited from interface org.apache.ignite.spi.IgniteSpi: getName, onClientDisconnected, onClientReconnected, onContextDestroyed, onContextInitialized
 
Field Detail

DFLT_PARALLEL_JOBS_NUM
public static final int DFLT_PARALLEL_JOBS_NUM
Default number of parallel jobs allowed (set to number of cores times 2).

DFLT_WAIT_JOBS_NUM
public static final int DFLT_WAIT_JOBS_NUM
Default waiting jobs number. If the number of waiting jobs exceeds this number, jobs will be rejected. Default value is Integer.MAX_VALUE.
- See Also: Constant Field Values

DFLT_PRIORITY_ATTRIBUTE_KEY
public static final String DFLT_PRIORITY_ATTRIBUTE_KEY
Default priority attribute key (value is grid.task.priority).
- See Also: Constant Field Values

DFLT_JOB_PRIORITY_ATTRIBUTE_KEY
public static final String DFLT_JOB_PRIORITY_ATTRIBUTE_KEY
Default job priority attribute key (value is grid.job.priority).
- See Also: Constant Field Values

DFLT_PRIORITY
public static final int DFLT_PRIORITY
Default priority that will be assigned if job does not have a priority attribute set (value is 0).
- See Also: Constant Field Values

DFLT_STARVATION_INCREMENT
public static final int DFLT_STARVATION_INCREMENT
Default value by which job priority is increased every time a job gets bumped down.
- See Also: Constant Field Values

DFLT_PREVENT_STARVATION_ENABLED
public static final boolean DFLT_PREVENT_STARVATION_ENABLED
Default flag for preventing starvation of lower priority jobs.
- See Also: Constant Field Values

Method Detail

getParallelJobsNumber
public int getParallelJobsNumber()
Gets the number of jobs that can be executed in parallel.
- Returns: Number of jobs that can be executed in parallel.

setParallelJobsNumber
@IgniteSpiConfiguration(optional=true)
public PriorityQueueCollisionSpi setParallelJobsNumber(int parallelJobsNum)
Sets the number of jobs that can be executed in parallel.
- Parameters: parallelJobsNum - Parallel jobs number.
- Returns: this for chaining.

getWaitingJobsNumber
public int getWaitingJobsNumber()
Maximum number of jobs that are allowed to wait in the waiting queue. If the number of waiting jobs ever exceeds this number, excess jobs will be rejected.
- Returns: Maximum allowed number of waiting jobs.

setWaitingJobsNumber
@IgniteSpiConfiguration(optional=true)
public PriorityQueueCollisionSpi setWaitingJobsNumber(int waitJobsNum)
Maximum number of jobs that are allowed to wait in the waiting queue. If the number of waiting jobs ever exceeds this number, excess jobs will be rejected.
- Parameters: waitJobsNum - Maximum jobs number.
- Returns: this for chaining.
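The waiting-queue limit can be illustrated with a small JDK-only sketch. WaitQueueLimitSketch is a hypothetical name, and the choice of which jobs count as the excess (here, the tail of an already priority-ordered queue) is an assumption; the documentation above only states that excess jobs are rejected.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only; not part of the Ignite API. */
final class WaitQueueLimitSketch {
    /**
     * Splits an already ordered wait queue into two lists:
     * index 0 holds the jobs that keep waiting, index 1 holds the rejected overflow.
     * Assumption: the overflow is taken from the tail of the queue.
     */
    static List<List<String>> applyLimit(List<String> orderedWaitQueue, int maxWaitingJobs) {
        int keep = Math.min(Math.max(0, maxWaitingJobs), orderedWaitQueue.size());

        List<List<String>> result = new ArrayList<>();

        // Jobs that stay in the waiting queue.
        result.add(new ArrayList<>(orderedWaitQueue.subList(0, keep)));

        // Jobs beyond the limit, which would be rejected.
        result.add(new ArrayList<>(orderedWaitQueue.subList(keep, orderedWaitQueue.size())));

        return result;
    }
}
```

With the default limit of Integer.MAX_VALUE the overflow list is always empty, so no job is rejected because of queue length.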
 
getCurrentWaitJobsNumber
public int getCurrentWaitJobsNumber()
Gets current number of jobs that wait for execution.
- Returns: Number of jobs that wait for execution.

getCurrentActiveJobsNumber
public int getCurrentActiveJobsNumber()
Gets current number of jobs that are active, i.e. 'running + held' jobs.
- Returns: Number of active jobs.

getCurrentRunningJobsNumber
public int getCurrentRunningJobsNumber()
Gets number of currently running (not 'held') jobs.
- Returns: Number of currently running (not 'held') jobs.

getCurrentHeldJobsNumber
public int getCurrentHeldJobsNumber()
Gets number of currently 'held' jobs.
- Returns: Number of currently 'held' jobs.
 
setPriorityAttributeKey
@IgniteSpiConfiguration(optional=true)
public PriorityQueueCollisionSpi setPriorityAttributeKey(String taskPriAttrKey)
Sets task priority attribute key. This key will be used to look up task priorities from the task context (see ComputeTaskSession.getAttribute(Object)).
If not provided, the default value is DFLT_PRIORITY_ATTRIBUTE_KEY.
- Parameters: taskPriAttrKey - Priority session attribute key.
- Returns: this for chaining.

setJobPriorityAttributeKey
@IgniteSpiConfiguration(optional=true)
public PriorityQueueCollisionSpi setJobPriorityAttributeKey(String jobPriAttrKey)
Sets job priority attribute key. This key will be used to look up job priorities from the job context (see ComputeJobContext.getAttribute(Object)).
If not provided, the default value is DFLT_JOB_PRIORITY_ATTRIBUTE_KEY.
- Parameters: jobPriAttrKey - Job priority attribute key.
- Returns: this for chaining.

getPriorityAttributeKey
public String getPriorityAttributeKey()
Gets key name of task priority attribute.
- Returns: Key name of task priority attribute.

getJobPriorityAttributeKey
public String getJobPriorityAttributeKey()
Gets key name of job priority attribute.
- Returns: Key name of job priority attribute.

getDefaultPriority
public int getDefaultPriority()
Gets default priority to use if a job does not have priority attribute set.
- Returns: Default priority to use if a job does not have priority attribute set.

setDefaultPriority
@IgniteSpiConfiguration(optional=true)
public PriorityQueueCollisionSpi setDefaultPriority(int priority)
Sets default priority to use if a job does not have priority attribute set.
- Parameters: priority - Default priority.
- Returns: this for chaining.
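How the two attribute keys and the default priority could fit together can be sketched as a simple lookup. This is a hedged illustration using plain maps in place of the job context and task session. PriorityResolutionSketch is a hypothetical name, and both the lookup order (job context first, then task session, then the default) and the handling of non-integer values are assumptions of this sketch, not behaviour confirmed by the text above.

```java
import java.util.Map;

/** Illustrative only; plain maps stand in for ComputeJobContext and ComputeTaskSession attributes. */
final class PriorityResolutionSketch {
    /**
     * Resolves the effective priority of a job.
     * Assumed order: job context attribute, then task session attribute, then the default.
     * Assumption: values that are not Integers are treated as if no priority was set.
     */
    static int resolve(Map<String, Object> jobCtxAttrs, Map<String, Object> taskSesAttrs,
        String jobPriAttrKey, String taskPriAttrKey, int dfltPriority) {
        Object p = jobCtxAttrs.get(jobPriAttrKey);

        // Fall back to the task-level priority when no usable job-level priority exists.
        if (!(p instanceof Integer))
            p = taskSesAttrs.get(taskPriAttrKey);

        return p instanceof Integer ? (Integer)p : dfltPriority;
    }
}
```

For a task that sets "grid.task.priority" to 10 and a job without its own attribute, this sketch yields 10; when neither attribute is present it yields the configured default.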
 
getStarvationIncrement
public int getStarvationIncrement()
Gets value to increment job priority by every time a lower priority job gets behind a higher priority job.
- Returns: Value to increment job priority by every time a lower priority job gets behind a higher priority job.

setStarvationIncrement
@IgniteSpiConfiguration(optional=true)
public PriorityQueueCollisionSpi setStarvationIncrement(int starvationInc)
Sets value to increment job priority by every time a lower priority job gets behind a higher priority job.
- Parameters: starvationInc - Increment value.
- Returns: this for chaining.

isStarvationPreventionEnabled
public boolean isStarvationPreventionEnabled()
Gets flag indicating whether job starvation prevention is enabled.
- Returns: Flag indicating whether job starvation prevention is enabled.

setStarvationPreventionEnabled
@IgniteSpiConfiguration(optional=true)
public PriorityQueueCollisionSpi setStarvationPreventionEnabled(boolean preventStarvation)
Sets flag indicating whether job starvation prevention is enabled.
- Parameters: preventStarvation - Flag indicating whether job starvation prevention is enabled.
- Returns: this for chaining.
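The effect of the starvation increment can be shown with a toy simulation. This is a minimal sketch under stated assumptions, not the SPI's algorithm: StarvationSketch is a hypothetical name, one slot is free per pass, a fresh competitor with the higher priority arrives before every pass, and on a priority tie the older job wins.

```java
/** Illustrative only; models a single low-priority job competing with a steady stream of higher-priority jobs. */
final class StarvationSketch {
    /**
     * Returns the 1-based pass on which the low-priority job is activated,
     * or -1 if it is still waiting after maxPasses.
     */
    static int passOfActivation(int lowPri, int highPri, int increment, boolean preventStarvation, int maxPasses) {
        int effective = lowPri;

        for (int pass = 1; pass <= maxPasses; pass++) {
            // Assumption: on a tie the older (waiting) job is chosen.
            if (effective >= highPri)
                return pass;

            // The job got bumped down behind the competitor, so its priority is raised.
            if (preventStarvation)
                effective += increment;
        }

        return -1;
    }
}
```

With priorities 5 and 10 and an increment of 1, the low-priority job is activated on pass 6 in this model; with starvation prevention disabled it never runs while higher-priority jobs keep arriving.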
 
getNodeAttributes
public Map<String,Object> getNodeAttributes() throws IgniteSpiException
This method is called before the SPI starts (before method IgniteSpi.spiStart(String) is called). It allows the SPI implementation to add attributes to a local node. The kernel collects these attributes from all loaded SPI implementations and then passes them to the discovery SPI so that they can be exchanged with other nodes.
- Specified by: getNodeAttributes in interface IgniteSpi
- Overrides: getNodeAttributes in class IgniteSpiAdapter
- Returns: Map of local node attributes this SPI wants to add.
- Throws: IgniteSpiException - Thrown in case of any error.

spiStart
public void spiStart(String igniteInstanceName) throws IgniteSpiException
This method is called to start the SPI. After this method returns successfully, the kernel assumes that the SPI is fully operational.
- Specified by: spiStart in interface IgniteSpi
- Parameters: igniteInstanceName - Name of the Ignite instance this SPI is being started for (null for the default Ignite instance).
- Throws: IgniteSpiException - Thrown in case of any error during SPI start.

spiStop
public void spiStop() throws IgniteSpiException
This method is called to stop the SPI. After this method returns, the kernel assumes that this SPI is finished and all resources acquired by it are released.
Note that this method can be called at any point, including during recovery of a failed start. It should make no assumptions about what state the SPI will be in when this method is called.
- Specified by: spiStop in interface IgniteSpi
- Throws: IgniteSpiException - Thrown in case of any error during SPI stop.
 
setExternalCollisionListener
public void setExternalCollisionListener(CollisionExternalListener lsnr)
Listener to be set for notification of external collision events (e.g. job stealing). Once the grid receives such a notification, it will immediately invoke the collision SPI.
Ignite uses this listener to enable job stealing from overloaded to underloaded nodes. However, you can also use it, for instance, to provide time-based collision resolution. To achieve this, you would most likely mark a job that requires time-based scheduling by setting a certain attribute in its job context (see ComputeJobContext) and set a timer in your SPI implementation that wakes up after a certain period of time. Once this period has elapsed, you would notify this listener that a collision resolution should take place. Then, inside your collision resolution logic, you would find the marked waiting job and activate it.
Note that most collision SPIs might not have external collisions. In that case, they should simply ignore this method and do nothing when the listener is set.
- Specified by: setExternalCollisionListener in interface CollisionSpi
- Parameters: lsnr - Listener for external collision events.
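The time-based approach described above (a timer that notifies the listener once a period has elapsed) can be sketched with a JDK scheduler. This is an illustration only: ExternalCollisionListenerSketch is a stand-in interface defined here rather than Ignite's CollisionExternalListener, and TimedCollisionTrigger, scheduleWakeUp and the method name onExternalCollision are names chosen for this sketch.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the external collision listener; not the Ignite interface. */
interface ExternalCollisionListenerSketch {
    void onExternalCollision();
}

/** Illustrative timer that asks for a collision resolution after a delay. */
final class TimedCollisionTrigger implements AutoCloseable {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    /** Listener supplied by the hosting grid; may be null until it is set. */
    private volatile ExternalCollisionListenerSketch lsnr;

    void setExternalCollisionListener(ExternalCollisionListenerSketch lsnr) {
        this.lsnr = lsnr;
    }

    /** Schedules a single wake-up; when it fires, the listener is notified if one is set. */
    void scheduleWakeUp(long delayMs) {
        timer.schedule(() -> {
            ExternalCollisionListenerSketch l = lsnr;

            if (l != null)
                l.onExternalCollision();
        }, delayMs, TimeUnit.MILLISECONDS);
    }

    /** Stops the timer thread and discards pending wake-ups. */
    @Override public void close() {
        timer.shutdownNow();
    }
}
```

In a real SPI the notification would lead to a collision resolution pass, during which the marked waiting job could be located and activated.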
 
onCollision
public void onCollision(CollisionContext ctx)
This callback is invoked when:
- a new grid job arrives
- an executing job finishes its execution
- the topology changes
- periodically (on EventType.EVT_NODE_METRICS_UPDATED)
An implementation of this method should act on all lists, each of which contains collision job contexts that define a set of operations available during collision resolution. Refer to the CollisionContext and CollisionJobContext documentation for more information.
- Specified by: onCollision in interface CollisionSpi
- Parameters: ctx - Collision context which contains all collision lists.
 
getConsistentAttributeNames
protected List<String> getConsistentAttributeNames()
Returns a list of attributes that should be consistent for this SPI. Consistency means that a remote node has to have the same attribute with the same value.
- Overrides: getConsistentAttributeNames in class IgniteSpiAdapter
- Returns: List of attribute names.
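The consistency rule stated above (a remote node must have the same attribute with the same value) can be expressed as a small comparison over attribute maps. This is a minimal sketch, not Ignite's checking code: AttributeConsistencySketch and the attribute names used in the usage note are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Illustrative only; plain maps stand in for local and remote node attributes. */
final class AttributeConsistencySketch {
    /** Returns the names whose values are missing on the remote node or differ from the local values. */
    static List<String> inconsistentAttributes(List<String> names, Map<String, Object> local,
        Map<String, Object> remote) {
        List<String> mismatched = new ArrayList<>();

        for (String name : names) {
            // An attribute is consistent only if the remote node has it with an equal value.
            if (!remote.containsKey(name) || !Objects.equals(local.get(name), remote.get(name)))
                mismatched.add(name);
        }

        return mismatched;
    }
}
```

For example, if the local node reports a hypothetical attribute "spi.priority.key" as "grid.task.priority" and the remote node reports a different value or none at all, that name is returned as inconsistent.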
 
setName
public PriorityQueueCollisionSpi setName(String name)
Sets SPI name.
- Overrides: setName in class IgniteSpiAdapter
- Parameters: name - SPI name.
- Returns: this for chaining.