Name

global.ActivityStreamPullServiceImpl

Description

Gets installed as part of com.snc.activity_subscriptions plugin Activity Subscriptions Framework. Contains all the methods to perform Activity stream pull related operations.

Script

var ActivityStreamPullServiceImpl = Class.create();
ActivityStreamPullServiceImpl.prototype = {
  initialize: function(actSubContext) {
  	this.activitySubContext = (actSubContext == null)?new ActivitySubscriptionContext():actSubContext;
  	this.subscriptionDAO = this.activitySubContext.getSubscriptionDAO();
  	this.activityDAO = this.activitySubContext.getActivityDAO();
  	this.VERB_MENTIONED = "mentioned";
  	this.pullEnabled = gs.getProperty('com.snc.actsub.activity.stream.inactive.disabled') == "true";
  	this.DEFAULT_PULL_DAYS = 30;
  	this.disableAfter = gs.getProperty('com.snc.actsub.activity.stream.inactive.duration', this.DEFAULT_PULL_DAYS);
  	this.DEFAULT_ACTIVITY_COUNT_TO_RETURN = 10;
  },

  /*
  * This function fetches records from 'activities' table based on subsctiptions and updates the stream table
  * @parameter - subscriptions: user subscriptions based on which activities will be fetched
  * @parameter - streamGr: Glide record of stream that has to be updated with latest activities
  * @parameter - reqParameters: parameters from api request: userId, streamId, countToReturn, enableAggregation
  * @return - void
  */
  updateStreamWithLatestActivities: function(reqParameters){
  	try{
  		//reqParameters - countToReturn, liveProfileId, streamId, enableAggregation
  		if(!reqParameters.liveProfileId || !reqParameters.streamId)
  			return false;
  		var liveProfileId = reqParameters.liveProfileId;
  		var liveProfileRecord = this.subscriptionDAO.getLiveProfileRec(liveProfileId);
  		var streamId = reqParameters.streamId;
  		var streamGr = this.activityDAO.getStreamRecord(streamId);
  		if(!streamGr || !streamGr.next() || !liveProfileRecord)
  			return false;
  		var userId = liveProfileRecord.document + '';
  		var subscriptionsList = this.subscriptionDAO.getSubscriptionsForStream(streamId);
  		var subscriptions = subscriptionsList.join(',');
  		if(!subscriptions)
  			return true;
  		var isAggregationEnabled = (reqParameters.enableAggregation)?reqParameters.enableAggregation:false;
  		var countToReturn = (reqParameters.countToReturn)?reqParameters.countToReturn:this.DEFAULT_ACTIVITY_COUNT_TO_RETURN;
  		//Get the latest activity and created time populated in the given stream
  		var lastActivityGr = this.activityDAO.getLatestActivityForStream(streamId);
  		var lastActivityTime = '';
  		if(lastActivityGr.next()) {
  			lastActivityTime = lastActivityGr.getValue('sys_created_on');
  		}
  		var multiplier = 2, currentRow = 0, counter = 0;
  		var totalActivitiesToFetch = countToReturn * multiplier;
  		var window ={};
  		window.from = currentRow;
  		window.to = totalActivitiesToFetch;
  		var currentTime = new GlideDateTime();
  		//get activities only till (n-1) second to handle concurrent activities pushed when pull is in progress
  		currentTime.addSeconds(-1);
  		var flagUpdateTime = this.activityDAO.updateFanoutToStream(streamId, true);
  		if(!flagUpdateTime){
  			gs.error('ACTSUB-ERROR: Error while setting fanout to stream flag : ' + streamId);
  			return false;
  		}
  		var commActivityService = null;
  		if (this.activitySubContext.isCommunityPluginActive == true)
  			commActivityService = new sn_communities.CommunityActivityService(liveProfileId, true);
  		while(counter < totalActivitiesToFetch){
  			var activities = this.activityDAO.getActivitiesForInterval(subscriptions, liveProfileId, lastActivityTime, currentTime, window);
  			var activities_query = activities.getEncodedQuery();
  			var activityCount = activities.getRowCount();
  			var aggregatedActivities = [];
  			while(counter < totalActivitiesToFetch && currentRow < activityCount && activities.next()){
  				currentRow++;
  				this.validateAndPopulateToStream(activities, streamGr, userId);
  				//if enableAggregation is true, pull more activities considering secondary content of same primary content as duplicate
  				var duplicate = false;
  				if(commActivityService != null && isAggregationEnabled){
  					var aggAct = commActivityService.getContentInfo(activities);
  					if (aggAct){
  						primaryContent = aggAct.root + "";
  						if(primaryContent && aggregatedActivities.indexOf(primaryContent) > -1)
  							duplicate = true;
  						else
  							aggregatedActivities.push(primaryContent);

  					}
  				}
  				if(!duplicate)
  					counter++;
  			}
  			if(currentRow >= activityCount || activityCount == 0){
  				break;
  			}
  			var remaining = parseInt(totalActivitiesToFetch - counter);
  			var probabilityRatio = ((window.to - window.from)/(counter));
  			window.from = currentRow;
  			window.to = currentRow + this.getInterprettedOffset(remaining, probabilityRatio);
  		}
  		if(activityCount > totalActivitiesToFetch){
  			var streamLimit = gs.getProperty(ActivityConstants.USER_STREAM_LIMIT, this.activityDAO.USER_STREAM_LIMIT);
  			this.populateActivitiesForStreamAsync(currentRow, streamLimit, activities_query, streamGr);
  		}
  		this.updateNthSecondActivities(subscriptions, streamGr, liveProfileId, currentTime, flagUpdateTime);
  		return true;
  	}
  	catch(er){
  		this.activityDAO.updateFanoutToStream(streamGr.getValue('sys_id'), false);
  		gs.error('ACTSUB-ERROR: Error while updating stream with lastest activities: ' + er.message);
  		return false;
  	}
  },
  
  updateNthSecondActivities: function(subscriptions, streamGr, liveProfileId, fromTime, toTime){
  	if(!subscriptions || !streamGr || !liveProfileId)
  		return;
  	var activities = this.activityDAO.getActivitiesForInterval(subscriptions, liveProfileId, fromTime, toTime);
  	if(!activities || !streamGr.user_id)
  		return;
  	var userId = streamGr.user_id.document + '';
  	while(activities.next()){
  		this.validateAndPopulateToStream(activities, streamGr, userId);
  	}
  },
  
  populateActivitiesForSubscriber: function(streamGr, eventParam){
  	if(!streamGr || !eventParam)
  		return false;
  	try{
  		var obj = JSON.parse(eventParam);
  		var window = {};
  		window.from = obj.from;
  		window.to = obj.to;
  		var activities = this.activityDAO.getActivitiesFromEncodedQuery(obj.query, window);
  		var streamId = streamGr.getUniqueValue();
  		var userId = streamGr.user_id.document + '';
  		while(activities.next()){
  			this.validateAndPopulateToStream(activities, streamGr, userId);
  		}
  	}
  	catch(er){
  		gs.error('ACTSUB-ERROR: Error while populating activities asynchronously: ' + er.message);
  		return false;
  	}
  },
  
  populateActivitiesForStreamAsync: function(from, to, activities_query, streamGr){
  	//creating a json string to pass to event
  	//json constains 'from' and 'to' indices to choose window and activities_query
  	if(!streamGr)
  		return;
  	var parser = new JSON();
  	var activityDetails = {};
  	activityDetails.from = from;
  	activityDetails.to = to;
  	activityDetails.query = activities_query;
  	var jsonEventParameter = parser.encode(activityDetails);
  	gs.eventQueue('sn_actsub.activities_fanout_to_stream', streamGr, jsonEventParameter);
  },
  
  validateAndPopulateToStream: function(activities, streamGr, userId){
  	if(!activities || !streamGr || !userId || !activities.activity_type_id)
  		return;
  	if(!activities.activity_type_id.active || !activities.activity_type_id.fanout_to_stream)
  		return;
  	var commUser = null, commActivityServiceBase = null;
  	if (this.activitySubContext.isCommunityPluginActive == true) {
  		commUser = new sn_communities.CommunityUserBase();
  		commActivityServiceBase = new sn_communities.CommunityActivityServiceBase();
  	}
  	if(commActivityServiceBase != null && !commActivityServiceBase.hasAccessToActivity(activities, 'read', userId, commUser))
  		return;
  	if(commActivityServiceBase == null && !activities.canRead())
  		return;
  	var objectIds = this.getSubscribableObjectIdsForActivity(activities);
  	if(this.canPopulateToStream(activities, streamGr.user_id + '', streamGr, objectIds)){
  			this.activityDAO.populateActivityStream(activities, userId);
  	}
  },
  
  getSubscribableObjectIdsForActivity: function(activities){
  	var objectIds = [];
  	if(!activities)
  		return objectIds;
  	if(activities.object_id)
  		objectIds.push(activities.object_id);
  	if(activities.target_id)
  		objectIds.push(activities.target_id);
  	if(activities.additional_field1)
  		objectIds.push(activities.additional_field1);
  	if(activities.addl_sub_obj2)
  		objectIds.push(activities.addl_sub_obj2);
  	if(activities.addl_sub_obj3)
  		objectIds.push(activities.addl_sub_obj3);
  	if(activities.actor_id)
  		objectIds.push(activities.actor_id);
  	return objectIds;
  },
  
  canPopulateToStream: function(activities, liveProfileId, streamGr, objectIds){
  	if(!activities || !streamGr || !liveProfileId || ! objectIds)
  		return false;
  	var followingUsers = this.subscriptionDAO.getFollowedUsersByProfile(liveProfileId);
  	var activity_json = {}, followees = [];
  	var hasSubscribedBeforeActivity = this.subscriptionDAO.hasSubscribedBeforeActivityDate(streamGr.getValue('sys_id'), objectIds, activities.getValue('sys_created_on'));
  	if(activities.getValue('target_id') == liveProfileId){
  		if(activities.activity_type_id && activities.activity_type_id.fanout_to_target + '' == 'true')
  			return true;//populate gamification activities
  	}
  	else if(activities.verb == this.VERB_MENTIONED){
  		var mentionedUsers = activities.additional_field1 ? activities.additional_field1 + '' : '';
  		var mentionedFollowingUser = false;
  		var mentionedThisUser = false;
  		if(mentionedUsers && mentionedUsers.indexOf(liveProfileId) > -1){
  			mentionedThisUser = true;
  		}
  		if(activities.actor_id == liveProfileId || mentionedThisUser)
  			return true;
  		var mentionedUsersList = mentionedUsers.split(',');
  		for(var i=0;i<mentionedUsersList.length;i++){
  			var id = mentionedUsersList[i];
  			var index = followingUsers.indexOf(id);
  			if(index > -1){
  				mentionedFollowingUser = true;
  				followees.push(id);
  			}
  		}
  		if(mentionedFollowingUser && hasSubscribedBeforeActivity){
  			activity_json = {
  				"message": activities.activity_type_id.activity_nl_string.message+'',
  				"actor": activities.getElement('actor_id').getRefRecord().getDisplayValue(),
  				"object": activities.getElement('object_id').getRefRecord().getDisplayValue(),
  				"target": activities.getElement('target_id').getRefRecord().getDisplayValue(),
  				"followees": followees
  			};
  			this.activityDAO.populateActivityStream(activities, streamGr.user_id.document + '', activity_json);
  		}
  		return false;
  	}
  	else if(followingUsers && followingUsers.indexOf(activities.getValue('actor_id')) > -1){
  		if(hasSubscribedBeforeActivity)
  			return true;//populate stream for following users
  	}
  	else if(hasSubscribedBeforeActivity){
  		return true;
  	}
  	return false;
  },
  
  updateInactiveStreams: function(){
  	try{
  		var pullStreamTypes = gs.getProperty('com.snc.actsub.activity.stream.inactive.streams');
  		if(this.pullEnabled){
  			var currentDate = new GlideDateTime();
  			var duration = -1*this.disableAfter;
  			currentDate.addDaysUTC(duration);
  			this.activityDAO.resetFanoutForInactiveStreams(pullStreamTypes, currentDate);
  		}
  		else{
  			var streamGr = this.activityDAO.getInactiveStreams(pullStreamTypes);
  			if(streamGr){
  				while(streamGr.next()){
  					var reqParameters = {
  						"liveProfileId":streamGr.getValue('user_id'),
  						"streamId":streamGr.getValue('sys_id')
  					};
  					if(!this.updateStreamWithLatestActivities(reqParameters)){
  						gs.error('ACTSUB-ERROR: Updating activities failed for stream ' + streamGr.sys_id);
  					}
  					else{
  						this.activityDAO.updateFanoutToStream(streamGr.getValue('sys_id'), true);
  					}
  				}
  			}
  		}
  	}
  	catch(er){
  		gs.error('ACTSUB-ERROR: Error while updating activity fanout flag: ' + er.message);
  	}
  },
  
  getInterprettedOffset: function(requiredCount, probabilityRatio) {
  	var multiplier = 5;
  	var offset = requiredCount * multiplier;
  	if (probabilityRatio && probabilityRatio > multiplier)
  		offset = requiredCount * probabilityRatio;
  	if (offset > multiplier * 100)
  		offset = multiplier * 100;
  	return offset;
  },

  hasAccessToUserStream: function(userStream) {
      var userId = userStream.user_id.document;
      if (gs.hasRole('actsub_admin'))
          return true;
      if (pm.isActive('com.sn_communities') && gs.hasRole('sn_communities.community_user') || gs.hasRole('actsub_user'))
          return gs.getUserID() == userId; //allows current user to access only their record
      return false;
  },

  type: 'ActivityStreamPullServiceImpl'
};

Sys ID

cdfc7439533300100f16ddeeff7b12f7

Offical Documentation

Official Docs: