Name

global.SLAAsyncDelegatorSNC

Description

No description available

Script

/**
 * Script include that hands "ready" records from the SLA async queue table
 * (SLAAsyncSNC.SLA_ASYNC_QUEUE) over to sys_trigger jobs for processing.
 * The limits that drive the distribution are read from system properties when
 * an instance is initialised; the DEFAULT_* values below are used when a
 * property does not yield an integer.
 */
var SLAAsyncDelegatorSNC = Class.create();
SLAAsyncDelegatorSNC.prototype = {
  // Name handed to SelfCleaningMutex when entering the delegation critical section
  SLA_ASYNC_MUTEX_NAME: 'SLAAsyncDelegator',
  // System property: limit on the number of jobs
  SLA_ASYNC_JOB_LIMIT: 'com.snc.sla.async.job.limit',
  // System property: value written to the "priority" field of each created sys_trigger
  SLA_ASYNC_JOB_PRIORITY: 'com.snc.sla.async.job.priority',
  // System property: number of records a job is filled up to before another job is started
  SLA_ASYNC_JOB_CLAIM_LIMIT_MIN: 'com.snc.sla.async.job.claim_limit.min',
  // System property: number of records at which a job stops accepting more records
  SLA_ASYNC_JOB_CLAIM_LIMIT_MAX: 'com.snc.sla.async.job.claim_limit.max',
  // First argument passed to the GSLog constructor for this script include's logger
  SLA_ASYNC_DELEGATOR_LOG: 'com.snc.sla.async.delegator.log',

  // Fallbacks applied when the corresponding property does not parse as an integer
  DEFAULT_JOB_PRIORITY: 100,
  DEFAULT_JOB_LIMIT: 4,
  DEFAULT_JOB_CLAIM_LIMIT_MIN: 20,
  DEFAULT_JOB_CLAIM_LIMIT_MAX: 20,
  // Fallback row limit for the ready-queue query when the calculated limit is NaN
  DEFAULT_READY_QUERY_LIMIT: 1000,

  initialize: function() {
  	this.jobs = [];
  	this.jobData = null;
  	this.belowMinJobNumber = null;
  	this.availableJobNumbers = [];
  	this._initJobLimits();
  	this.lu = new GSLog(this.SLA_ASYNC_DELEGATOR_LOG, this.type);
  	this.lu.includeTimestamp();
  },

  process: function() {
  	var startTime;
  	var endTime;

  	if (this.lu.atLevel(GSLog.DEBUG)) {
  		this.lu.logDebug("process: starting");
  		startTime = new GlideDateTime();
  	}

  	this.jobData = this._getJobData();

  	var slaAsyncJobs = this.jobData.slaAsyncJobs;
  	var existingJobCount = Object.keys(slaAsyncJobs).length;

  	if (existingJobCount >= this.jobLimit) {
  		if (this.lu.atLevel(GSLog.WARN))
  			this.lu.logWarn("process: there are already " + existingJobCount + " jobs running to process the records in the \"" +
  							SLAAsyncSNC.SLA_ASYNC_QUEUE + " table (Job limit is: " + this.jobLimit + ")");

  		return;
  	}

  	if (this.lu.atLevel(GSLog.DEBUG))
  		this.lu.logDebug("process: data retrieved for currently running jobs:\n" + JSON.stringify(this.jobData));

  	this.jobLimit = this.jobLimit - existingJobCount;

  	// Get the query for all SLA Async records that are ready to be given to jobs for processing
  	if (this.lu.atLevel(GSLog.DEBUG))
  		this.lu.logDebug("process: query \"" + SLAAsyncSNC.SLA_ASYNC_QUEUE + "\" table for records that are ready for processing:\n");

  	var slaAsyncQueueGr = this._getReadyQueue();
  	if (slaAsyncQueueGr.hasNext())
  		SelfCleaningMutex.enterCriticalSectionRecordInStats(this.SLA_ASYNC_MUTEX_NAME, this.SLA_ASYNC_MUTEX_NAME, this, this._process, slaAsyncQueueGr);
  	else if (this.lu.atLevel(GSLog.DEBUG))
  		this.lu.logDebug("process: thare are no ready records to be delegated");

  	if (this.lu.atLevel(GSLog.DEBUG)) {
  		endTime = new GlideDateTime();
  		this.lu.logDebug("process: finished - processing duration " + (endTime.getNumericValue() - startTime.getNumericValue()) + "ms");
  	}
  },

  _process: function(slaAsyncQueueGr) {
  	var jobNumber;
  	var currentJobNumber;
  	var jobQueue;
  	var queuedTasks = {};

  	// We run the query again just in case another process acquired the mutex before us and things have changed...
  	slaAsyncQueueGr.query();

  	if (!slaAsyncQueueGr.hasNext()) {
  		if (this.lu.atLevel(GSLog.DEBUG))
  			this.lu.logDebug("_process: thare are no ready records to be delegated");

  		return;
  	}

  	while (slaAsyncQueueGr.next()) {
  		var documentId = slaAsyncQueueGr.getValue("document_id");
  		// This task is already being processed or in the queue of a job that is already running so we have to leave it alone
  		if (this.jobData.queuedTasks[documentId]) {
  			if (this.lu.atLevel(GSLog.INFO))
  				this.lu.logInfo("_process: record " + slaAsyncQueueGr.getDisplayValue() + " and sequence " + slaAsyncQueueGr.getValue("sequence") +
  								" cannot be queued as it is currently being processed or in the queue of an existing job");
  			continue;
  		}

  		jobNumber = null;

  		// If we've already allocated this task to a job number then give it to the same job if it's not reached the maximum
  		currentJobNumber = queuedTasks[documentId];
  		if (currentJobNumber !== undefined) {
  			if (this.availableJobNumbers.indexOf(currentJobNumber) >= 0) {
  				jobNumber = currentJobNumber;
  				if (this.lu.atLevel(GSLog.DEBUG))
  					this.lu.logDebug("_process: documentId " + documentId + " already has updates allocated to job number " + jobNumber);
  			} else {
  				if (this.lu.atLevel(GSLog.INFO))
  					this.lu.logInfo("_process: queued record for " + slaAsyncQueueGr.getDisplayValue() + " and sequence " + slaAsyncQueueGr.getValue("sequence") +
  									" cannot be delegated as the job allocated for this task has reached the maximum");
  			}
  		} else {
  			jobNumber = this._getNextJobNumber();
  			if (jobNumber === null) {
  				if (this.lu.atLevel(GSLog.INFO))
  					this.lu.logInfo("_process: there are no more jobs available to delegate records to for record with document_id=" +
  									documentId + " and sequence=" + slaAsyncQueueGr.getValue("sequence"));
  				break;
  			}
  		}

  		// We shouldn't ever get to here with a null jobNumber but just in case
  		if (jobNumber === null)
  			continue;

  		jobQueue = this.jobs[jobNumber];
  		// Check that the job number is an array
  		if (!Array.isArray(jobQueue))
  			continue;

  		jobQueue.push(slaAsyncQueueGr.getUniqueValue());
  		queuedTasks[documentId] = jobNumber;
  		if (this.lu.atLevel(GSLog.INFO))
  			this.lu.logInfo("_process: record for " + slaAsyncQueueGr.getDisplayValue() + " and sequence " + slaAsyncQueueGr.getValue("sequence") +
  							" has been delegated to job number " + jobNumber);

  		// if this queue has reached the minimum limit remove it from the below min variable
  		if (jobQueue.length >= this.jobClaimLimitMin)
  			this.belowMinJobNumber = null;

  		// if this queue has reached the maximum limit remove it from the available jobs
  		if (jobQueue.length >= this.jobClaimLimitMax) {
  			var indexOfJobNumber = this.availableJobNumbers.indexOf(jobNumber);
  			if (indexOfJobNumber >= 0)
  				this.availableJobNumbers.splice(indexOfJobNumber, 1);
  		}
  	}

  	// Now we can create the sys_trigger records to process the queued records
  	var sysTriggerGr;
  	for (var i = 0, n = this.jobs.length; i < n; i++) {
  		sysTriggerGr = this._createTrigger();
  		if (!sysTriggerGr)
  			continue;

  		this._allocateRecordsToTrigger(sysTriggerGr, this.jobs[i]);
  		sysTriggerGr.setValue("trigger_type", "0");
  		sysTriggerGr.update();
  	}
  },

  _initJobLimits: function() {
  	this.jobLimit = parseInt(gs.getProperty(this.SLA_ASYNC_JOB_LIMIT, null), 10);
  	if (isNaN(this.jobLimit))
  		this.jobLimit = this.DEFAULT_JOB_LIMIT;

  	this.jobPriority = parseInt(gs.getProperty(this.SLA_ASYNC_JOB_PRIORITY, null), 10);
  	if (isNaN(this.jobPriority))
  		this.jobPriority = this.DEFAULT_JOB_PRIORITY;

  	this.jobClaimLimitMin = parseInt(gs.getProperty(this.SLA_ASYNC_JOB_CLAIM_LIMIT_MIN, null), 10);
  	if (isNaN(this.jobClaimLimitMin))
  		this.jobClaimLimitMin = this.DEFAULT_JOB_CLAIM_LIMIT_MIN;

  	this.jobClaimLimitMax = parseInt(gs.getProperty(this.SLA_ASYNC_JOB_CLAIM_LIMIT_MAX, null), 10);
  	if (isNaN(this.jobClaimLimitMax))
  		this.jobClaimLimitMax = this.DEFAULT_JOB_CLAIM_LIMIT_MAX;
  },

  _getJobData: function() {
  	var slaAsyncJobs = {};
  	var queuedTasks = {};

  	var slaAsyncQueueGr = new GlideAggregate(SLAAsyncSNC.SLA_ASYNC_QUEUE);
  	slaAsyncQueueGr.addQuery("state", "IN", "queued,processing");
  	slaAsyncQueueGr.groupBy("document_id");
  	slaAsyncQueueGr.groupBy("sys_trigger");
  	slaAsyncQueueGr.query();

  	var slaAsyncJob;
  	var slaAsyncJobId;
  	while (slaAsyncQueueGr.next()) {
  		slaAsyncJobId = slaAsyncQueueGr.getValue("sys_trigger");
  		queuedTasks[slaAsyncQueueGr.getValue("document_id")] = slaAsyncJobId;
  		slaAsyncJobs[slaAsyncJobId] = true;
  	}

  	return {slaAsyncJobs: slaAsyncJobs, queuedTasks: queuedTasks};
  },

  /* Queries the queue table for records that need to be allocated to jobs excluding any records
     for Tasks that already have records allocated to jobs for processing
     The "setLimit" is there to prevent a slow query in the event that the queue table contains
     a large number of ready records
  */
  _getReadyQueue: function() {
  	var queryLimit = this._getReadyQueueQueryLimit();

  	var slaAsyncQueueGr = new GlideRecord(SLAAsyncSNC.SLA_ASYNC_QUEUE);
  	slaAsyncQueueGr.addQuery("state", "ready");
  	slaAsyncQueueGr.addNullQuery("sys_trigger");
  	slaAsyncQueueGr.addQuery("document_id", "NOT IN", Object.keys(this.jobData.queuedTasks));
  	slaAsyncQueueGr.orderBy("sequence");
  	slaAsyncQueueGr.setLimit(queryLimit);
  	slaAsyncQueueGr.query();

  	if (this.lu.atLevel(GSLog.DEBUG))
  		this.lu.logDebug("_getReadyQueue: \"" + SLAAsyncSNC.SLA_ASYNC_QUEUE + "\" queried with limit " + queryLimit);

  	return slaAsyncQueueGr;
  },

  /* Provides the limit on the number of records queried from the SLA async queue
     Calculated as the number of jobs multiplied by the number of records that can be allocated to each job.
     The additional multiplication is to allow for a situation where we have a number of records all for the same
     Task - by doubling the query limit we give the query more of a chance to return some records that we can
     delegate out for processing
  */
  _getReadyQueueQueryLimit: function() {
  	var queryLimit = this.jobLimit * this.jobClaimLimitMax * 2;
  	if (isNaN(queryLimit))
  		return this.DEFAULT_READY_QUERY_LIMIT;
  	
  	return queryLimit;
  },

  _getNextJobNumber: function() {
  	// if we've got a job that is below the minimum queue limit use it
  	if (this.belowMinJobNumber !== null)
  		return this.belowMinJobNumber;

  	// If we haven't reached the limit of jobs, create a new one and return it
  	if (this.jobs.length < this.jobLimit) {
  		this.jobs.push([]);
  		this.availableJobNumbers.push(this.jobs.length - 1);
  		this.belowMinJobNumber = this.jobs.length - 1;
  		return this.belowMinJobNumber;
  	}

  	if (this.availableJobNumbers.length === 0)
  		return null;

  	// Otherwise, from our available jobs, find the one with the least records allocated to it
  	var jobNumber = null;
  	var availableJobNumber;
  	var minQueueLength = Infinity;
  	var queueLength;
  	for (var i = 0, n = this.availableJobNumbers.length; i < n; i++) {
  		availableJobNumber = this.availableJobNumbers[i];
  		queueLength = this.jobs[jobNumber].length;
  		if (queueLength === undefined)
  			continue;
  		if (queueLength < minQueueLength) {
  			jobNumber = availableJobNumber;
  			minQueueLength = queueLength;
  		}
  	}

  	// Return a job from the jobs we've got available
  	return jobNumber;
  },

  _createTrigger: function() {
  	var sysTriggerGr = new GlideRecord("sys_trigger");
  	sysTriggerGr.setValue("name", "SLA Async Job - " + GlideCounter.next(SLAAsyncSNC.JOB_COUNTER_NAME));
  	sysTriggerGr.setValue("next_action", new GlideDateTime());
  	sysTriggerGr.setValue("priority", this.jobPriority);
  	// Initially create the trigger as "On demand" - we'll update to "Run once" later when we've finished delegating records to the job
  	sysTriggerGr.setValue("trigger_type", "2");
  	sysTriggerGr.setValue("script", "new SLAAsyncQueue().processQueue(g_schedule_record);");
  	if (!sysTriggerGr.insert()) {
  		this.lu.logError("_createTrigger: failed to create sys_trigger record with name \"" + sysTriggerGr.getValue("name") + "\":\n" + sysTriggerGr.getLastErrorMessage());
  		return null;
  	}

  	return sysTriggerGr;
  },

  _allocateRecordsToTrigger: function(sysTriggerGr, slaAsyncQueueIds) {
  	if (!sysTriggerGr || !sysTriggerGr.isValidRecord())
  		return;

  	if (!slaAsyncQueueIds || slaAsyncQueueIds.length === 0)
  		return;

  	var slaAsyncQueueGr = new GlideRecord(SLAAsyncSNC.SLA_ASYNC_QUEUE);
  	slaAsyncQueueGr.addQuery("sys_id", slaAsyncQueueIds);
  	slaAsyncQueueGr.setWorkflow(false);
  	slaAsyncQueueGr.setValue("sys_trigger", sysTriggerGr.getUniqueValue());
  	slaAsyncQueueGr.setValue("state", "queued");
  	slaAsyncQueueGr.updateMultiple();
  },

  type: 'SLAAsyncDelegatorSNC'
};

Sys ID

304968a4b7202300491de1f6ee11a968

Official Documentation

Official Docs: