Name
sn_cmdb_ci_class.SGOTDataStreamBase
Description
Base pattern to invoke a specific data stream with given inputs
Script
/**
 * Base class that runs a named data stream through sn_fd.FlowAPI and inserts
 * every item it returns into an import set table.
 *
 * Subclasses are expected to override getDataStreamName() and getInputs();
 * the base implementations only log an error and return placeholders.
 * transformData() may optionally be overridden to reshape each item before
 * it is inserted.
 */
var SGOTDataStreamBase = Class.create();
SGOTDataStreamBase.prototype = {
    /**
     * @param {Object} import_set_table_input - Object whose insert(record)
     *     method is called once for every valid item read from the stream.
     */
    initialize: function(import_set_table_input) {
        this.import_set_table = import_set_table_input;
    },

    /**
     * Name of the data stream action to run. Must be overridden.
     *
     * @returns {string} The base implementation logs an error and returns a
     *     placeholder name.
     */
    getDataStreamName: function() {
        gs.error("[SGOTDataStreamBase] Please override getDataStreamName() function");
        return "dummy_stream_name"; // place holder for data stream name
    },

    /**
     * Inputs passed to the data stream action. Must be overridden.
     *
     * @returns {Object} The base implementation logs an error and returns an
     *     empty object.
     */
    getInputs: function() {
        gs.error("[SGOTDataStreamBase] Please override getInputs() function");
        var inputs = {};
        return inputs;
    },

    /**
     * Drains the stream, transforming each item and inserting it into the
     * import set table, then logs received / invalid / inserted counts.
     *
     * An item is counted as invalid (and skipped) when the item itself is
     * nil, or when transformData() returns nil or an object with no keys, so
     * that received = invalid + inserted in the stats line.
     *
     * @param {Object} stream - Stream exposing hasNext() and next(). When it
     *     is nil the method logs and returns without doing anything.
     */
    processStream: function(stream) {
        // Nothing to import when no output stream is available.
        if (gs.nil(stream)) {
            gs.info("[SGOTDataStreamBase] No output Data Stream available, skipping import");
            return;
        }

        // Counters used only for the summary log line.
        var totalObjectsReceived = 0;
        var invalidObjects = 0;
        var objectsInsertedIntoImportSetTable = 0;

        while (stream.hasNext()) {
            totalObjectsReceived++;

            var record = stream.next();
            if (gs.nil(record)) {
                invalidObjects++;
                gs.warn("[SGOTDataStreamBase] outputObject from result stream is empty");
                continue;
            }

            var transformedRecord = this.transformData(record);
            if (gs.nil(transformedRecord) || Object.keys(transformedRecord).length === 0) {
                // Skipped records must be counted, otherwise they silently
                // vanish from the import stats.
                invalidObjects++;
                gs.warn("[SGOTDataStreamBase] object data is null or empty");
                continue;
            }

            this.import_set_table.insert(transformedRecord);
            objectsInsertedIntoImportSetTable++;
        }

        gs.info("[SGOTDataStreamBase], import stats of objects, Total Received = " + totalObjectsReceived + ", invalid = " + invalidObjects + ", inserted into ImportSetTable = " + objectsInsertedIntoImportSetTable);
    },

    /**
     * Runs the data stream named by getDataStreamName() with the inputs from
     * getInputs(), hands the resulting stream to processStream(), and logs
     * the elapsed time on success. The stream, if one was obtained, is always
     * closed.
     *
     * @throws {string} The failure message. A string (not an Error object) is
     *     thrown to stay compatible with existing callers.
     */
    runDataStream: function() {
        var stream;
        var startTime = new Date().getTime();
        var dataStreamInputs = this.getInputs();
        var dataStreamName = this.getDataStreamName();

        try {
            gs.info("[SGOTDataStreamBase] DataStream Name : " + dataStreamName + " with Inputs: " + JSON.stringify(dataStreamInputs));
            var flowRunnerResult = sn_fd.FlowAPI.getRunner() // Create a ScriptableFlowRunner builder object.
                .datastream(dataStreamName)
                .withInputs(dataStreamInputs) // inputs to the data stream action
                .run(); // execute the data stream action
            stream = flowRunnerResult.getDataStream();
            this.processStream(stream);
        } catch (ex) {
            // The caught value is not guaranteed to carry a .message (e.g. a
            // thrown string); fall back to its string form so the reason is
            // never logged or rethrown as undefined.
            var actualMessage = (ex && ex.message) ? ex.message : String(ex);
            gs.error("Actual Error: " + actualMessage);
            throw actualMessage;
        } finally {
            if (stream) {
                stream.close();
            }
        }

        gs.info("[SGOTDataStreamBase] DataStream : " + dataStreamName + ". Time taken to import records is " + ((new Date().getTime()) - startTime) + "ms.");
    },

    /**
     * Hook for reshaping a stream item before it is inserted into the import
     * set table. The default returns the item unchanged.
     *
     * @param {Object} data - Item read from the data stream.
     * @returns {Object} Record to insert; a nil or key-less result causes the
     *     item to be skipped by processStream().
     */
    transformData: function(data) {
        return data;
    },

    type: 'SGOTDataStreamBase'
};
Sys ID
0d2c53f4eb42301061a083bfc852282d