Name

sn_cmdb_ci_class.SGOTDataStreamBase

Description

Base pattern to invoke a specific data stream with given inputs

Script

/**
 * Base class for running a Flow Designer data stream action and pushing each
 * returned item into an import set table.
 *
 * Subclasses are expected to override getDataStreamName() and getInputs(),
 * and may override transformData() to reshape each item before insertion.
 */
var SGOTDataStreamBase = Class.create();
SGOTDataStreamBase.prototype = {

  /**
   * @param {Object} import_set_table_input - Object exposing an insert(record)
   *     method; every accepted stream item is passed to it.
   *     NOTE(review): presumably an import set table wrapper - confirm against callers.
   */
  initialize: function(import_set_table_input) {
      this.import_set_table = import_set_table_input;
  },

  /**
   * Name of the data stream action to execute. Must be overridden; the base
   * implementation logs an error and returns a placeholder name.
   *
   * @returns {string} Data stream name.
   */
  getDataStreamName: function() {
      gs.error("[SGOTDataStreamBase] Please override getDataStreamName() function");
      return "dummy_stream_name"; // placeholder for data stream name
  },

  /**
   * Inputs handed to the data stream action. Must be overridden; the base
   * implementation logs an error and returns an empty object.
   *
   * @returns {Object} Inputs for the data stream action.
   */
  getInputs: function() {
      gs.error("[SGOTDataStreamBase] Please override getInputs() function");
      var inputs = {};
      return inputs;
  },

  /**
   * Drains the given stream: each item is passed through transformData() and
   * the result is inserted via this.import_set_table.insert(). Items that are
   * nil, or whose transformed form is nil or has no keys, are skipped and
   * counted as invalid. A summary of the counters is logged at the end.
   *
   * @param {Object} stream - Object exposing hasNext() and next(); when nil
   *     the method logs and returns without doing anything.
   */
  processStream: function(stream) {
      // If no output stream is available, there is nothing to import.
      if (gs.nil(stream)) {
          gs.info("[SGOTDataStreamBase] No output Data Stream available, skipping import");
          return;
      }

      // Counters used only for the summary log line below.
      var totalObjectsReceived = 0;
      var invalidObjects = 0;
      var objectsInsertedIntoImportSetTable = 0;

      // Process each item in the data stream
      while (stream.hasNext()) {
          totalObjectsReceived++;
          // Get a single item from the data stream.
          var record = stream.next();

          // A nil item cannot be transformed; count it and move on.
          if (gs.nil(record)) {
              invalidObjects++;
              gs.warn("[SGOTDataStreamBase] outputObject from result stream is empty");
              continue;
          }

          var transformedRecord = this.transformData(record);

          // A nil or key-less transformed record is also skipped. It is counted
          // as invalid so that total received = invalid + inserted in the summary.
          if (gs.nil(transformedRecord) || Object.keys(transformedRecord).length === 0) {
              invalidObjects++;
              gs.warn("[SGOTDataStreamBase] object data is null or empty");
              continue;
          }

          this.import_set_table.insert(transformedRecord);
          objectsInsertedIntoImportSetTable++;
      }
      gs.info("[SGOTDataStreamBase], import stats of objects, Total Received = " + totalObjectsReceived + ", invalid = " + invalidObjects + ", inserted into ImportSetTable = " + objectsInsertedIntoImportSetTable);

  },

  /**
   * Executes the data stream named by getDataStreamName() with the inputs from
   * getInputs(), feeds the resulting stream to processStream(), and always
   * closes the stream afterwards. The elapsed time is logged on success only.
   *
   * @throws {string} The message of any error raised while running or
   *     processing the stream (or the stringified error when it carries no
   *     message), after logging it.
   */
  runDataStream: function() {
      var stream;
      var startTime = new Date().getTime();

      var dataStreamInputs = this.getInputs();
      var dataStreamName = this.getDataStreamName();

      try {
          gs.info("[SGOTDataStreamBase] DataStream Name : " + dataStreamName + " with Inputs: " + JSON.stringify(dataStreamInputs));
          var flowRunnerResult = sn_fd.FlowAPI.getRunner() // Create a ScriptableFlowRunner builder object.
              .datastream(dataStreamName)
              .withInputs(dataStreamInputs) // inputs to the data stream action
              .run(); // execute the data stream action
          stream = flowRunnerResult.getDataStream();
          this.processStream(stream);
      } catch (ex) {

          // Not every thrown value has a .message (e.g. a thrown string); fall
          // back to its string form so the failure reason is never "undefined".
          var actualMessage = (ex && ex.message) ? ex.message : String(ex);
          gs.error("Actual Error: " + actualMessage);
          // A string is rethrown on purpose: existing callers receive the message itself.
          throw actualMessage;

      } finally {

          // Release the stream whether or not processing succeeded.
          if (stream) {
              stream.close();
          }
      }
      gs.info("[SGOTDataStreamBase] DataStream : " + dataStreamName + ". Time taken to import records is " + ((new Date().getTime()) - startTime) + "ms.");

  },

  /**
   * Hook for reshaping a stream item before it is inserted. The default
   * implementation returns the item unchanged.
   *
   * @param {Object} data - Item read from the data stream.
   * @returns {Object} Record to insert; a nil or key-less result makes
   *     processStream() skip the item.
   */
  transformData: function(data) {
      return data;
  },

  type: 'SGOTDataStreamBase'
};

Sys ID

0d2c53f4eb42301061a083bfc852282d

Official Documentation

Official Docs: