Name
global.WorkflowCoordinator
Description
Parallel workflow launching, input collection, and output/status handling.
Script
var WorkflowCoordinator = Class.create();
WorkflowCoordinator.prototype = {
initialize: function(parameters, reloadedFlows, activityId) {
// create the functionality as protected closure using Module
this.coordinator = (function(parameters, reloadedFlows, activityId) {
if (!parameters)
parameters = {};
var settings = {
workflow: '-no-workflow-set',
retain: true,
max: 100,
poolsize: 10,
next: 0, // (internal) next flow to run
running: 0, // number of flows running
errormode: 'stop'
};
var errorMessage = "";
var activityId = executing.activity.sys_id+'';
integerizeParameters(parameters);
//debug( 'parameters:' + JSUtil.describeObject(parameters) );
$extend(settings, parameters); // consider $.extend()
if (activityId)
settings.activityId = activityId;
if (!current)
return error('Not running from a valid workflow. No current record.');
//debug(JSUtil.describeObject(settings, 'settings'));
// assume one set of empty inputs if none given.
var flows = [];
if (reloadedFlows)
flows = reloadedFlows;
else {
// load inputs from array. If none, then just use count. If count > inputs
// duplicate them to fill count
//
if (parameters.count)
if (parameters.count < 1)
return error("Invalid count");
addRequestedFlows();
}
// discard the orig params now that they are in a 'flow' object, so we don't reload them twice
if (settings.inputs)
delete settings.inputs;
function addRequestedFlows() {
var numInputs = parameters.inputs && parameters.inputs instanceof Array ? parameters.inputs.length : 0;
var inp = 0;
var cnt = parameters.count && parameters.count > 0
? parameters.count
: numInputs;
debug("inputs: " + numInputs + " count: " + cnt);
for (var i = 0; i < cnt; i++) {
var wfInputs = numInputs == 0 ? {} : parameters.inputs[inp++];
if (!add(wfInputs))
return;
if (inp >= numInputs)
inp = 0;
}
}
/**
* Add a workflow with parameters to the list of flows to be launched
*
* @param inputs - the input variables to the flow
* @param workflow [optional] -the workflow id used to find the workflow version to run
*
* @return builder, do more adds, or start()
*/
function add(inputs, workflow) {
if (flows.length >= settings.max) {
error('Number of flows requested ' + (flows.length+1) + ' exceeds max:' + settings.max);
return null;
}
var index = flows.length;
var flow = new Flow(index, workflow, inputs);
flows.push(flow);
return this;
}
/**
* Return the number of valid flows to be run.
*/
function getNumFlows() {
return flows.length;
}
function setMax(max) {
if (isNaN(max+''))
return error('Max value invalid:' + max);
else
settings.max = parseInt(max+'');
}
function setPoolsize(poolsize) {
if (isNaN(poolsize+''))
return error('Poolsize value invalid:' + poolsize);
else
settings.poolsize = parseInt(poolsize+'');
}
function setWorkflow(workflow) {
settings.workflow = workflow;
}
/**
* Start the batch of flows. This will run up to 'poolsize' simulaneous flows immediately
* and then return. The remaining flows in the queue will be launched as pooled flows
* finish, until the queue is exhausted.
*
* @param: the executing activity that owns this coordinator
* @return int the number of flows initially started.
*/
function start(executing) {
if (errorMessage != '')
return 'error';
if (flows.length < 1 || settings.running > 0)
return error('Cannot start: no flows or already running? flows:' + flows.length + ' running:' + settings.running);
if (settings.poolsize < 1 || settings.poolsize > settings.max) {
error('Invalid pool configuraiton, using max for poolsize ' + JSUtil.describeObject(settings));
settings.poolsize = settings.max;
}
if (flows.length > settings.max)
return error('Number of flows requested ' + flows.length + ' exceeds max:' + settings.max);
settings.totalFlows = flows.length;
var numToStart = settings.poolsize < flows.length ? settings.poolsize : flows.length;
debug('starting ' + numToStart + ' initial flows of ' + flows.length + ' total');
settings.parent = executing.context.sys_id+'';
settings.originator = executing.sys_id+'';
for (var i = 0; i < numToStart && i < flows.length; i++) {
var newFlow = startFlow(flows[i]);
if (newFlow == null && settings.errormode == 'stop') {
save(activityId);
return error('Did not start all flows');
}
}
// place ourself in the holding area (which is currently scratchpad under our activity id but
// will hopefully be a hidden activity variable.
save(activityId);
if (hasNext() || settings.running > 0)
return 'waiting';
return 'finished';
}
/**
* Cancels all subf-lows, and will not run any more flows
*/
function cancel(reason) {
settings.reason = reason;
var workflowHelper = new Workflow();
var context = new GlideRecord('wf_context');
context.addQuery('sys_id', 'IN', getRunningFlows());
context.query();
while (context.next())
workflowHelper.cancelContext(context);
}
/**
* Indicates the number of flows that are currently running.
*
* @return boolean the # of running flows.
*/
function getRunningCount() {
return settings.running;
}
/**
* Indicates whether there are more flows that need to be run (pooled)
*
* @return boolean true if more flows need to run.
*/
function hasNext() {
if (isCancelled())
return false;
return settings.next < flows.length;
}
/**
* @return boolean true if the coordinator was cancelled.
*/
function isCancelled() {
return (typeof(settings.reason) !== 'undefined');
}
/**
* Tells if a flow is running
*
* @param index the index of the flow to ask about.
*
* @return boolean true if a given (sub)flow has started
*/
function isRunning(index) {
return getFlow(index).status == 'running';
}
/**
* Tells if a flow is finished
*
* @param index the index of the flow to ask about.
*
* @return boolean true if a given (sub)flow has ended
*/
function isFinished(index) {
return getFlow(index).status == 'finished';
}
/**
* Serialize the data for this object into the running workflow.
* If <strong>depot</strong> is provided, then the data is saved
* to that with the given nameOrId, otherwise it is saved to <strong>
* activity.scratchpad.coordinator</strong> by default.
*/
function save(nameOrId, depot) {
var data = {
settings: settings,
flows: flows
};
// if caller provided a depot to save to, use that
//
if (depot) {
depot[nameOrId] = data;
return;
}
// if an in-flight || launcher is using workflow scratchpad then
// keep using it, otherwise use this activity's scratchpad
//
if (workflow.scratchpad.coordinator && workflow.scratchpad.coordinator[nameOrId]) {
workflow.scratchpad.coordinator[nameOrId] = data;
return;
}
if (!activity.scratchpad.coordinator)
activity.scratchpad.coordinator = {};
activity.scratchpad.coordinator = data;
}
/**
* Indicates that a subflow has finished.
*
* Sets the result status of a subflow that finished.
* Note: this function places 'index' into the event parameter
* packet to indicate which flow has completed.
*
* @param ev : a result packet from the event which holds:
* <li> context - subflow contet sys_id
* <li> status - subflow context 'state'
* <li> isComplete - true if the context is complete (?)
* <li> result - subflow context 'result'
*
* @return string "continue" if flows are still running, "finished" if all are done.
*
*/
function onFinish(ev) {
var index = findFlow(ev.context);
debug( 'onFinish() [' + index + ']:' + JSUtil.describeObject(ev, 'eventData')
+ ' - ' + JSUtil.describeObject(settings, 'settings') );
ev.index = index;
if (index == -1) {
debug('onFinish - error on ' + ev.context);
return 'error';
}
var flow = getFlow(index);
settings.running--;
flow.status = ev.status;
flow.output = ev.returnValue ? ev.returnValue : {};
// put the coordinator,index, and flow into rhino for use in the completion scripts
workflow.prepareScriptVariable('coordinator', this);
workflow.prepareScriptVariable('index', index);
workflow.prepareScriptVariable('flow', flow);
// assumption: getting here means the flow is done whether or not it finished OK
var error = false;
if (settings.next < settings.totalFlows && settings.next < settings.max) {
debug("onFinish - starting a nother flow:" + settings.next);
error = startFlow( flows[settings.next] ) == null;
}
// Make sure the counts and states are valid and saved
updatePool(settings.retain);
if (error && settings.errormode === 'stop') {
debug('onFinish() [' + index + ']: error');
return 'error';
}
if (settings.running > 0) {
debug('onFinish() [' + index + ']: continuing');
return 'continuing';
}
debug('onFinish() [' + index + ']: finished');
return 'finished';
function updatePool(toWorkflowScratchpad) {
if (toWorkflowScratchpad) {
if (!workflow.scratchpad.coordinator)
workflow.scratchpad.coordinator = {};
save(settings.activityId, workflow.scratchpad.coordinator);
if (activity.scratchpad.coordinator)
delete activity.scratchpad.coordinator;
return;
}
save(settings.activityId);
}
}
/**
* Returns the error message if any error occurred. If no errors then this returns
* an empty string.
*
* @return string the error message or empty
*/
function getError() {
return errorMessage;
}
/**
* Returns the index of a flow by looking it up by contextId
*
* @param contextId - the context sys_id to find the internal index of
* @return int the index of the flow or -1 if not found
*/
function findFlow(contextId) {
for (var i = 0; i < flows.length; i++)
if (flows[i] && flows[i].contextId)
if (flows[i].contextId == contextId)
return i;
error("Cannot find flow for context " + contextId);
return -1;
}
/*
* Internal functions
*/
/**
* @private
* @return the context sys_id or null if the context is not started
*/
function startFlow(flow) {
var workflowId = ensureWorkflow(flow);
debug('startFlow(' + workflowId + ') owner:' + settings.owner + ' inputs:' + JSUtil.describeObject(flow.inputs) );
settings.next++;
var context = workflow.startSubflow(workflowId, current, flow.inputs, settings.originator+'');
if (gs.nil(context)) {
error("Error launching workflow: " + workflowId + ":" + JSUtil.describeObject(flow.inputs));
flow.contextId = null;
flow.status = "error";
return null;
}
else {
flow.contextId = context.sys_id+'';
flow.status = "running";
settings.running++;
debug('subflow started:' + flow.contextId + ' - ' + JSUtil.describeObject(settings, 'settings'));
}
return flow.contextId;
}
/**
* @private
*/
function ensureWorkflow(flow) {
// if workflow provided in the 'flow' use that.
var wf = '';
if (flow.workflow && flow.workflow !== '-no-workflow-set')
wf = flow.workflow;
else
// must be in settings, use that
if (settings.workflow)
wf = settings.workflow;
else
return error("No workflow");
// if it's a sys id, then we're good, use it.
return getWorkflowForIdentifier(wf);
}
/**
* If a workflow ID is provdided use it else assume its a name and get the ID
*
* @private
*/
function getWorkflowForIdentifier(wf) {
wf = wf+'';
// assume its a name and find it.. and get the sys_id
var workflow = new GlideRecord('wf_workflow');
// if it is valid ID then use it.
if (GlideStringUtil.isEligibleSysID(wf+''))
if (workflow.get(wf))
return wf;
// try by name.
workflow.addQuery('name', wf);
workflow.query();
if (!workflow.next())
return "Invalid workflow:" + wf;
if (workflow.hasNext())
warn('Warning, more than one workflow named:' + wf + ' found. Using first:' + workflow.sys_id);
return workflow.sys_id+'';
}
/**
* @private
*/
function getFlow(idx) {
if (isNaN(idx+''))
error("IllegalArgumentException: (index) " + idx);
idx = parseInt(idx+'');
if (idx > settings.totalFlows-1)
error("Index out of bounds. totalFlows=" + totalFlows + " index=" + idx);
return flows[idx];
}
/**
* @private return array of IDs of any flows we launched, that are running
*/
function getRunningFlows() {
var runningFlows = [];
for (var i = 0; i < flows.length; i++)
if (flows[i].status === 'running')
runningFlows.push( flows[i].contextId );
return runningFlows;
}
function integerizeParameters(params) {
if (params.max) {
if (isNaN(params.max))
return error("Illegal parameter: max - non integer value:" + params.max);
else
params.max = parseInt(params.max);
}
if (params.poolsize) {
if (isNaN(params.poolsize))
return error("Illegal parameter: poolsize - non integer value:" + params.poolsize);
else
params.poolsize = parseInt(params.poolsize);
}
if (params.count) {
if (isNaN(params.count))
return error("Illegal parameter: count - non integer value:" + params.count);
else
params.count = parseInt(params.count);
}
}
/**
* @private record an error and return 'error' to caller
*/
function error(msg) {
errorMessage += msg + "\n";
if (workflow)
workflow.error('WorkflowCoordinator:' + msg);
else
gs.logError(msg, 'WorkflowCoordinator');
return "error";
}
/**
* @private record an warning
*/
function warn(msg) {
errorMessage += msg + "\n";
if (workflow)
workflow.warn('WorkflowCoordinator:' + msg);
else
gs.logWarning(msg, 'WorkflowCoordinator');
}
/**
* @private
*/
function debug(msg) {
if (workflow)
workflow.info('WorkflowCoordinator:' + msg);
else
gs.log('WorkflowCoordinator:' + msg);
return msg;
}
/**
* @private
*
* Simple obj merge routine, similar to jQuery extend, different than prototype in that it overwrites.
*
* @param target fields are copied 'into' this object
* @param source fields are copied 'from' this object.
*/
function $extend(target, source) {
for (var x in source)
if (x != undefined)
target[x] = source[x];
}
/**
* An internal, data Class that holds the details of the flows being executed.
*
*Fields:
* @param index - the index of this flow in the queue
* @param workflow - the ID of the wf_workflow used to start this flow
* @param inputs - any inputs provided to the launched flow
* @param status - status of the workflow context
* @param contextId - (internal) added by the startFlow method. 'onFinish()' events use
* this to locate their internal 'flow' object.
*/
function Flow(index, workflow, inputs) {
return {
index: index,
inputs: inputs,
status: 'pending',
workflow: workflow
// contextId - transparent
}
};
// Return the interface (Module pattern)
return {
type: 'WorkflowCoordinator', // JavaScript class name compliance
add: add,
start: start,
cancel: cancel,
getRunningCount: getRunningCount,
isRunning: isRunning,
isFinished: isFinished,
isCancelled: isCancelled,
hasNext: hasNext,
getFlow: getFlow,
setMax: setMax,
setPoolsize: setPoolsize,
setWorkflow: setWorkflow,
findFlow: findFlow,
getNumFlows: getNumFlows,
onFinish: onFinish,
save: save, // actually private- here for unit testing for now.
getError: getError
};
}).apply(this, arguments);
// map the coordinator fields to our outer class
for (var x in this.coordinator)
this[x] = this.coordinator[x];
},
type: 'WorkflowCoordinator'
};
/**
* Factory method to deserialize an instance of this object.
*
* @param nameOrId - a name for this coordinator. Default is name or sys_id of the activity that owns this coordinator.
* @param [depot]- an optional depot (object) to load from. If not provided we load from <strong>activity.scratchpad</strong>.
* If depot is present we load from <strong>depot[nameOrId]</strong>.
*
* @return an instance of WorkflowCoordinator pre-loaded with valid state.
*/
WorkflowCoordinator.load = function(nameOrId, depot) {
// if the param given is not the exact name of a coordinator instance,
// then try it as a sys_id or name of a launcher activity
nameOrId = getActivityForIdentifier(nameOrId+'');
if (workflow.scratchpad.coordinator && workflow.scratchpad.coordinator[nameOrId])
return deserialize(workflow.scratchpad.coordinator[nameOrId], nameOrId);
// if depot provided then use it.
if (depot && depot[nameOrId]) {
workflow.error('Cannot find parallel flow data in depot for activityId:' + nameOrId);
return null;
}
// if it's in activity scratchpad (where it is by default) then use that!
if (activity && activity.scratchpad.coordinator)
return deserialize(activity.scratchpad.coordinator, nameOrId);
workflow.fault("Cannot find coordinator for " + nameOrId);
return null;
function deserialize(data, nameOrId) {
if (!data)
workflow.error('Cannot find parallel flow data for activityId:' + nameOrId);
return new WorkflowCoordinator(data.settings, data.flows, nameOrId);
}
function getActivityForIdentifier(activityId) {
activityId = activityId+'';
var activity = new GlideRecord('wf_activity');
// if it is valid ID then use it.
if (GlideStringUtil.isEligibleSysID(activityId))
if (activity.get(activityId))
return activityId;
// try by name.
activity.addQuery('name', activityId);
activity.addQuery('workflow_version', context.workflow_version+'');
activity.query();
if (!activity.next())
return "Invalid activity:" + activityId;
return activity.sys_id+'';
}
};
Sys ID
683eac081b211100adca1e094f071328