Extracting Data from Streams with Tasks

You can continuously extract data from Streams using a Task. A Task pairs JavaScript code, which uses the Cloud Action API to access Events in Streams, with a schedule that defines how often that code executes.

This topic shows how to create a Task that periodically aggregates weather Events in a Stream to compute the average dewpoint. For example, a weather Event collected from an Octave edge device might look as follows:

{
   "creationDate":1611670509340,
   "creatorId":"i000000000000000000000001",
   "elems":{
      "alert":1,
      "dewpoint":9.8,
      "humidity":42.8,
      "humidity_th":10,
      "temperature":23.2,
      "temperature_th":10
   },
   "generatedDate":1611670507224,
   "hash":null,
   "id":"e601...",
   "lastEditDate":1611670509340,
   "lastEditorId":"i000000000000000000000001",
   "location":null,
   "metadata":{
      
   },
   "path":"/my_company/devices/fx30_demo_3/:default",
   "streamId":"s5e66...",
   "tags":{
      
   }
}

Note the streamId value that identifies the Event's Stream. This will be used in Step 2 below.
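
As a rough illustration of which fields matter for the aggregation, the sketch below reads the Stream ID, the dewpoint, and the generation time from a trimmed copy of the sample Event. It is plain JavaScript that runs outside Octave. The helper name summarizeEvent is hypothetical, and the code assumes that generatedDate is expressed in milliseconds since the Unix epoch, which the sample value is consistent with.

```javascript
// Trimmed copy of the sample Event above; the elided Stream ID is kept as-is.
const sampleEvent = {
  creationDate: 1611670509340,
  elems: { alert: 1, dewpoint: 9.8, humidity: 42.8, temperature: 23.2 },
  generatedDate: 1611670507224,
  path: "/my_company/devices/fx30_demo_3/:default",
  streamId: "s5e66..."
};

// Hypothetical helper (not part of the Octave API): pick out the fields
// that the aggregation example relies on.
function summarizeEvent(event) {
  return {
    streamId: event.streamId,
    dewpoint: event.elems.dewpoint,
    // Assumption: timestamps are milliseconds since the Unix epoch.
    generatedAt: new Date(event.generatedDate).toISOString()
  };
}

console.log(summarizeEvent(sampleEvent));
```

Measured values such as dewpoint sit under elems, which is why the aggregation expressions address them as elems.dewpoint.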

Based on the Event structure above, follow the steps below to aggregate these Events:

  1. Create a Task.
  2. Within the Task's JavaScript function, invoke Octave's Octave.Event.aggregate() cloud API to aggregate Events on a Stream. In this example, the Stream ID starting with s5e66... that is passed into Octave.Event.aggregate() corresponds to the Stream containing the weather Events:
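function() {

  // Aggregate the weather Events on the Stream identified by its Stream ID.
  var myAggregationResults = Octave.Event.aggregate('s5e66...', {
    "filter": "AGE < 6000000",
    "rules": {"x": "elems.temperature < 24"},
    "groupBy": ["$minute"],
    // Average, maximum, and minimum dewpoint, the average of rule x, and the Event count.
    "output": [
      "$avg:elems.dewpoint",
      "$max:elems.dewpoint",
      "$min:elems.dewpoint",
      "$avg:x",
      "$count"
    ],
    "sorts": ["$count:asc"]
  });

  // Write the aggregation results to the console.
  console.log(myAggregationResults);

}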

When the Task executes, it echoes the aggregation results to the console.
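
To get a feel for what a per-minute aggregation computes, the following sketch reproduces the idea locally on a small in-memory array. It is not Octave's implementation and does not call the Cloud Action API. The function name aggregateDewpointByMinute, the shape of the result objects, and the choice to group on generatedDate (assumed to be milliseconds since the Unix epoch) are all assumptions made for illustration, and the sketch omits the filter and rules parts of the request.

```javascript
// Hypothetical local helper; NOT part of the Octave API.
function aggregateDewpointByMinute(events) {
  // Bucket dewpoint values by the minute in which each Event was generated.
  const groups = new Map();
  for (const event of events) {
    // Assumption: generatedDate is in milliseconds since the Unix epoch.
    const minute = Math.floor(event.generatedDate / 60000);
    if (!groups.has(minute)) {
      groups.set(minute, []);
    }
    groups.get(minute).push(event.elems.dewpoint);
  }

  // Compute average, maximum, minimum, and count for each bucket.
  const results = [];
  for (const [minute, values] of groups) {
    const sum = values.reduce((a, b) => a + b, 0);
    results.push({
      minute: minute,
      avg: sum / values.length,
      max: Math.max(...values),
      min: Math.min(...values),
      count: values.length
    });
  }

  // Mirror the "$count:asc" sort used in the Task example.
  results.sort((a, b) => a.count - b.count);
  return results;
}

// Made-up sample data: two Events in the first minute, one in the second.
const sampleEvents = [
  { generatedDate: 0, elems: { dewpoint: 10 } },
  { generatedDate: 30000, elems: { dewpoint: 12 } },
  { generatedDate: 60000, elems: { dewpoint: 8 } }
];

console.log(aggregateDewpointByMinute(sampleEvents));
```

Running the aggregation in the cloud through Octave.Event.aggregate() avoids pulling every Event into the Task just to compute these statistics.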