Name

global.EvtMgmtProcessCoordinator

Description

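Coordinator job that keeps the Event Management "process events" jobs in sync: it compares the expected, actual, and stored job counts and recreates the jobs or updates the stored count when they differ.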

Script

gs.include('EvtMgmtCommons');
/**
 * EvtMgmtProcessCoordinator
 *
 * Coordinator for the Event Management "process events" jobs. process() compares the
 * expected, actual and stored job counts reported by SNC.EvtMgmtEventProcessor and, when
 * they disagree, waits for the processing jobs to update their sys_trigger records before
 * either recreating the jobs (expected != actual) or saving the corrected count in sa_hash.
 *
 * Note: the *_SECS / *_ATTEMPTS / *_COORDINATION settings below are read once, when this
 * prototype object is built, not on every call.
 */
var EvtMgmtProcessCoordinator = Class.create();
EvtMgmtProcessCoordinator.prototype = {
  initialize: function() {
  },

  // sys_trigger name prefix shared by the EM "process events" jobs.
  NAME_PREFIX : 'Event Management - process events',
  // Helper used for debug logging.
  evtMgmtCommons : new EvtMgmtCommons(),
  // Prevents EM processing jobs from starving for a long time because a few of them are not updating:
  // after this many seconds the job counts are compared even if some jobs have not run since the snapshot.
  // Read with getInt (like the other numeric settings) so GlideDateTime.addSeconds() receives a number.
  EM_PROCESSING_JOBS_MAX_WAIT_TIME_SECS : GlideProperties.getInt('evt.mgmt.processing.jobs.max.wait.time.secs', 120),
  // Upper bound (seconds) on how long process() waits for the EM jobs to update; prevents an infinite loop.
  COORDINATOR_JOB_MAX_WAIT_TIME_SECS : GlideProperties.getInt('evt.mgmt.coordinator.job.max.wait.time.secs', 600),
  // Number of attempts to verify whether the expected, actual and stored job counts are the same.
  COORDINATOR_JOB_VERIFY_ATTEMPTS : GlideProperties.getInt('evt.mgmt.coordinator.job.verify.attempts', 3),
  // Time interval between verify attempts, in seconds.
  COORDINATOR_JOB_VERIFY_ATTEMPTS_WAIT_TIME_SECS : GlideProperties.getInt('evt.mgmt.coordinator.job.verify.attempts.wait.time.secs', 5),
  // When true, process() returns immediately without coordinating.
  EM_PROCESSING_DISABLE_COORDINATION : GlideProperties.getBoolean('evt_mgmt.event_processor_disable_coordination', false),

  /**
   * Stores the job count in the sa_hash record named 'em_job_count'.
   * Updates the record if it exists; otherwise inserts it.
   * @param {number} activeJobsCount - value written to the record's 'hash' field
   */
  saveActiveJobCount: function(activeJobsCount) {
    var JOB_COUNT = 'em_job_count';
    var gr = new GlideRecord("sa_hash");
    gr.addQuery('name', JOB_COUNT);
    gr.query();
    if (gr.next()) {
      gr.setValue('hash', activeJobsCount);
      gr.update();
      return;
    }
    gr.initialize();
    gr.setValue('name', JOB_COUNT);
    gr.setValue('hash', activeJobsCount);
    gr.insert();
  },

  /**
   * Recreates the EM processing jobs via ProcessEventsJobs.
   * Reads the job count, multi-node flag and processing delay directly from sys_properties
   * (NOTE(review): presumably to bypass the property cache; confirm). Defaults when a property
   * row is missing: count = 1, enableMultiNode = false, duration = ''.
   * Values found in sys_properties are passed as the raw strings returned by getValue().
   */
  createNewJobs: function() {
    var gr = new GlideRecord("sys_properties");
    var qc = gr.addQuery("name", "evt_mgmt.event_processor_job_count");
    qc.addOrCondition("name", "evt_mgmt.event_processor_enable_multi_node");
    qc.addOrCondition("name", "evt_mgmt.events_processing_delay_sec");
    gr.query();
    var count = 1;
    var enableMultiNode = false;
    var duration = '';
    while (gr.next()) {
      var name = gr.getValue("name");
      if (name == 'evt_mgmt.event_processor_job_count')
        count = gr.getValue("value");
      else if (name == 'evt_mgmt.event_processor_enable_multi_node')
        enableMultiNode = gr.getValue("value");
      else if (name == 'evt_mgmt.events_processing_delay_sec')
        duration = gr.getValue("value");
    }
    gs.log("duration = "+ duration);
    var processEventsJobs = ProcessEventsJobs.get();
    processEventsJobs.createNewJobs(count, enableMultiNode, duration);
  },

  /**
   * Returns the oldest sys_updated_on among the EM "process events" sys_trigger records.
   * Falls back to the current time when the aggregate yields no value.
   * @returns {GlideDateTime}
   */
  getMinLastUpdatedTime: function() {
    var existingJobs = new GlideAggregate('sys_trigger');
    // On a multi node instance with multi processing disabled, EM jobs have an empty system id.
    // On a multi node instance with multi processing enabled, EM jobs run on the individual active nodes;
    // records whose system id is the literal 'ACTIVE NODES' are excluded.
    var orCondition = existingJobs.addQuery('system_id', '=', "");
    orCondition.addOrCondition('system_id', '!=', 'ACTIVE NODES');

    existingJobs.addQuery('name', 'STARTSWITH', this.NAME_PREFIX);
    existingJobs.addAggregate('MIN', 'sys_updated_on');
    existingJobs.query();
    if (existingJobs.next()) {
      var minUpdatedOn = existingJobs.getAggregate('MIN', 'sys_updated_on');
      // Guard against an empty aggregate value instead of building a GlideDateTime from it.
      if (!gs.nil(minUpdatedOn))
        return new GlideDateTime(minUpdatedOn);
    }
    return new GlideDateTime();
  },

  /**
   * Entry point of the coordinator job.
   *
   * 1. Returns immediately when coordination is disabled.
   * 2. Verifies up to COORDINATOR_JOB_VERIFY_ATTEMPTS times (COORDINATOR_JOB_VERIFY_ATTEMPTS_WAIT_TIME_SECS
   *    apart) whether the expected, actual and stored job counts agree; returns if they do.
   * 3. Otherwise waits (at most COORDINATOR_JOB_MAX_WAIT_TIME_SECS) until every EM job has updated after
   *    the snapshot, or EM_PROCESSING_JOBS_MAX_WAIT_TIME_SECS have elapsed. It then recreates the jobs when
   *    expected != actual, and in both cases stores the expected count.
   */
  process: function() {
    // Skip coordination in case the property is true.
    // Loose comparison kept on purpose: the value comes from a Java API.
    if (this.EM_PROCESSING_DISABLE_COORDINATION == true) {
      return;
    }

    var eventProcessor = new SNC.EvtMgmtEventProcessor();
    var activeJobsCount = eventProcessor.readActualJobs();
    var expectedJobsCount = eventProcessor.calculateExpectedJobs(false);
    var jobsInDB = eventProcessor.readStoredJobs();

    var attempts = this.COORDINATOR_JOB_VERIFY_ATTEMPTS;
    for (var attempt = 1; attempt <= attempts; attempt++) {
      // Verifying whether expected, actual and stored job counts are the same. Do they need to be synchronized?
      // Loose equality kept on purpose: counts come from a Java API.
      if ((activeJobsCount == expectedJobsCount) && (activeJobsCount == jobsInDB)) {
        this.evtMgmtCommons.addDebugMessage("Expected job count, actual job count, stored job count are same, returning from the job");
        return;
      }
      // No sleep after the last attempt: refreshed values would never be compared.
      if (attempt >= attempts)
        break;
      gs.sleep(this.COORDINATOR_JOB_VERIFY_ATTEMPTS_WAIT_TIME_SECS * 1000);
      // Refresh all three counts so the next comparison does not use stale values.
      expectedJobsCount = eventProcessor.calculateExpectedJobs(true);
      activeJobsCount = eventProcessor.readActualJobs();
      jobsInDB = eventProcessor.readStoredJobs();
    }

    gs.log("Expected EM jobs : "+ expectedJobsCount +" Actual EM jobs : "+ activeJobsCount + " Jobs in DB : " + jobsInDB);
    var snapshotTime = new GlideDateTime();

    // Maximum time the job can wait before updating the job count in DB or adjusting the EM jobs as expected.
    // It is a safety valve against an infinite loop while waiting for the EM processing jobs to update their state.
    var maxProcessTime = new GlideDateTime();
    maxProcessTime.addSeconds(this.COORDINATOR_JOB_MAX_WAIT_TIME_SECS);

    // Maximum time to wait for all EM processing jobs to update once before comparing job counts.
    // After this time the comparison happens even if some job hasn't updated since the snapshot time.
    var maxCheckTime = new GlideDateTime();
    maxCheckTime.addSeconds(this.EM_PROCESSING_JOBS_MAX_WAIT_TIME_SECS);

    // Protection for the coordinator job against looping forever if EM processing jobs are not responding/stuck.
    while (maxProcessTime.after(new GlideDateTime())) {
      // Query sys_trigger for the EM processing jobs to check whether they have run after the snapshot time.
      var minUpdateTime = this.getMinLastUpdatedTime();
      var allJobsUpdated = snapshotTime.before(minUpdateTime);

      if (allJobsUpdated || new GlideDateTime().after(maxCheckTime)) {
        if (allJobsUpdated)
          this.evtMgmtCommons.addDebugMessage("All the event processing jobs have been updated after snapshot time : " + snapshotTime);
        else
          this.evtMgmtCommons.addDebugMessage("Not all event processing jobs have been updated after snapshot time : " + snapshotTime + ", comparing job counts anyway");

        expectedJobsCount = eventProcessor.calculateExpectedJobs(false);
        activeJobsCount = eventProcessor.readActualJobs();
        // Expected jobs and active jobs need to be the same with the logic of the BR.
        // If they are not, some corner condition has hit and the jobs must be recreated.
        if (expectedJobsCount != activeJobsCount) {
          gs.error("Incorrect number of EM jobs running(" + activeJobsCount + ") than expected(" + expectedJobsCount +"), remove existing and create new jobs as expected count");
          this.createNewJobs();
          gs.error("New EM processing jobs have been created from the coordinator job: " + expectedJobsCount);
        } else {
          gs.info("Job count in the database("+ jobsInDB+ ") and actual job count(" + activeJobsCount + ") are not same, syncing the database with correct count (" + expectedJobsCount +  ")");
        }
        // Save the expected job count in the database.
        this.saveActiveJobCount(expectedJobsCount);
        return;
      }
      gs.sleep(1000);
    }

    // Only reachable when the processing-jobs wait time is not shorter than the coordinator wait time.
    gs.warn("EM coordinator job gave up after " + this.COORDINATOR_JOB_MAX_WAIT_TIME_SECS + " seconds waiting for EM processing jobs; job counts were not synchronized");
  },

  type: 'EvtMgmtProcessCoordinator'
};

Sys ID

359917d15b7223009a9f2a2c11f91afb

Official Documentation

Official Docs: