
MongoDb Aggregation - Splitting into time buckets

Is it possible to use the MongoDB aggregation framework to generate a time series output where any source documents that are deemed to fall within each bucket are added to that bucket?

Say my collection looks something like this:

/*light_1 on from 10AM to 1PM*/
{
    "_id" : "light_1",
    "on" : ISODate("2015-01-01T10:00:00Z"),
    "off" : ISODate("2015-01-01T13:00:00Z")
},
/*light_2 on from 11AM to 7PM*/
{
    "_id" : "light_2",
    "on" : ISODate("2015-01-01T11:00:00Z"),
    "off" : ISODate("2015-01-01T19:00:00Z")
}

...and I am using a 6-hour bucket interval to generate a report for 2015-01-01. I would like my result to look something like:

    {
        "start"         : ISODate("2015-01-01T00:00:00Z"),
        "end"           : ISODate("2015-01-01T06:00:00Z"),
        "lights_on"     : []
    },
    {
        "start"         : ISODate("2015-01-01T06:00:00Z"),
        "end"           : ISODate("2015-01-01T12:00:00Z"),
        "lights_on"     : ["light_1", "light_2"]
    },
    {
        "start"         : ISODate("2015-01-01T12:00:00Z"),
        "end"           : ISODate("2015-01-01T18:00:00Z"),
        "lights_on"     : ["light_1", "light_2"]
    },
    {
        "start"         : ISODate("2015-01-01T18:00:00Z"),
        "end"           : ISODate("2015-01-02T00:00:00Z"),
        "lights_on"     : ["light_2"]
    }

A light is considered to be 'on' during a bucket if its 'on' value < the bucket 'end' AND its 'off' value >= the bucket 'start'.
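
In plain JavaScript terms, that rule is (the sample light and bucket bounds below are just the ones from the example above):

```javascript
// A light is "on" during a bucket when it turns on before the bucket
// ends AND turns off at or after the bucket starts.
function isOnDuring(light, bucketStart, bucketEnd) {
    return light.on < bucketEnd && light.off >= bucketStart;
}

var light1 = {
    on:  new Date("2015-01-01T10:00:00Z"),
    off: new Date("2015-01-01T13:00:00Z")
};

// light_1 overlaps the 06:00-12:00 bucket:
isOnDuring(light1,
    new Date("2015-01-01T06:00:00Z"),
    new Date("2015-01-01T12:00:00Z"));    // true

// ...but not the 00:00-06:00 bucket:
isOnDuring(light1,
    new Date("2015-01-01T00:00:00Z"),
    new Date("2015-01-01T06:00:00Z"));    // false
```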

I know I can use $group and the aggregation date operators to group by either start or end time, but in that case it's a one-to-one mapping. Here, a single source document may land in several time buckets if it spans them.

The report range and interval span are not known until run-time.

asked Jul 29 '15 by David Black

2 Answers

Introduction

Your goal here demands some thought about how the events, as you have them structured, map onto the given time period aggregations. The obvious point is that a single document as represented can actually contribute events to "multiple" time periods in the final aggregated result.

On analysis, this turns out to be a problem outside the scope of the aggregation framework, due to the time periods you are looking for. Certain events need to be "generated" beyond what can simply be grouped on, as you should be able to see.

In order to do this "generation", you need mapReduce. It has "flow control" via JavaScript as its processing language, so it can determine whether the time between on/off crosses more than one period and therefore emit the data for each of those periods.

As a side note, the "light" is probably not best suited to the _id, since it can be turned on/off many times in a given day, so an "instance" of on/off is likely better. However, I am just following your example here; to transpose this, simply replace the reference to _id within the mapper code with whatever field actually represents the light's identifier.

But onto the code:

// start date and next date for query ( should be external to main code )
var oneHour = ( 1000 * 60 * 60 ),
    sixHours = ( oneHour * 6 ),
    oneDay = ( oneHour * 24 ),
    today = new Date("2015-01-01"),               // your input
    tomorrow = new Date( today.valueOf() + oneDay ),
    yesterday = new Date( today.valueOf() - sixHours ),
    nextday = new Date( tomorrow.valueOf() + sixHours);

// main logic
db.collection.mapReduce(
    // mapper to emit data
    function() {
        // Constants and round date to hour
        var oneHour = ( 1000 * 60 * 60 ),
            sixHours = ( oneHour * 6 ),
            // a light still "on" has a null or missing "off"; treat it
            // as running to the end of the reported day
            offTime = ( this.off == null ) ? endDay : this.off,
            startPeriod = new Date( this.on.valueOf() 
              - ( this.on.valueOf() % oneHour )),
            endPeriod = new Date( offTime.valueOf()
              - ( offTime.valueOf() % oneHour ));

        // Hour to 6 hour period and convert to UTC timestamp
        startPeriod = startPeriod.setUTCHours( 
            Math.floor( startPeriod.getUTCHours() / 6) * 6 );
        endPeriod = endPeriod.setUTCHours( 
            Math.floor( endPeriod.getUTCHours() / 6) * 6 );

        // Init empty results for each period only on first document processed
        if ( counter == 0 ) {
            for ( var x = startDay.valueOf(); x < endDay.valueOf(); x+= sixHours ) {
                emit(
                    { start: new Date(x), end: new Date(x + sixHours) },
                    { lights_on: [] }
                );
            }
        }

        // Emit for every period until turned off only within the day
        for ( var x = startPeriod; x <= endPeriod; x+= sixHours ) {
           if ( ( x >= startDay ) && ( x < endDay ) ) {
               emit(
                   { start: new Date(x), end: new Date(x + sixHours)  },
                   { lights_on: [this._id] }
               );
           }
        }
        counter++;
    },

    // reducer to keep all lights in one array per period
    function(key,values) {
        var result = { lights_on: [] };
        values.forEach(function(value) {
            value.lights_on.forEach(function(light){
                if ( result.lights_on.indexOf(light) == -1 )
                    result.lights_on.push(light);
            });
        });
        result.lights_on.sort();
        return result;
    },

    // options and query
    { 
        "out": { "inline": 1 },
        "query": {
            "on": { "$gte": yesterday, "$lt": tomorrow }, 
            "$or": [
                { "off": { "$gte": today, "$lt": nextday } },
                { "off": null },
                { "off": { "$exists": false } }
            ]
        },
        "scope": { 
            "startDay": today,
            "endDay": tomorrow,
            "counter": 0
        }
    }
)

Map and Reduce

In essence, the "mapper" function looks at the current record, rounds each on/off time to hours and then works out the start hour of which six hour period the event occurred in.

With those new date values a loop is initiated to take the starting "on" time and emit an event for the current "light" being turned on during that period, within a single element array as explained later. Each loop increments the start period by six hours until the end "light off" time is reached.
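
To illustrate the rounding in isolation: because the Unix epoch starts at midnight UTC, flooring a timestamp to a multiple of six hours always lands on a 00:00/06:00/12:00/18:00 UTC boundary, so this simplified form is equivalent to the mapper's two-step rounding:

```javascript
var sixHours = 1000 * 60 * 60 * 6;

// Round a date down to the start of its six hour period (UTC).
function periodStart(d) {
    return new Date(Math.floor(d.valueOf() / sixHours) * sixHours);
}

periodStart(new Date("2015-01-01T10:00:00Z"));  // 2015-01-01T06:00:00Z
periodStart(new Date("2015-01-01T13:00:00Z"));  // 2015-01-01T12:00:00Z
```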

These emissions arrive at the reducer function, which must accept the same structure that it returns, hence the array of lights turned on in the period within the value object. It processes the emitted data under the same key as a list of these value objects.

It first iterates the list of values to reduce, then looks at the inner array of lights, which could have come from a previous reduce pass, and processes each of those into a single result array of unique lights. This is done simply by looking for the current light value within the result array and pushing it where it does not already exist.

Note the "previous pass": if you are not familiar with how mapReduce works, you should understand that the reducer function itself emits a result that might not have been achieved by processing "all" of the possible values for the "key" in a single pass. It can, and often does, process only a "sub-set" of the emitted data for a key, and will therefore take a "reduced" result as input in just the same way as the data is emitted from the mapper.

That design point is why both the mapper and reducer need to output data with the same structure, as the reducer itself can also get its input from data that has previously been reduced. This is how mapReduce deals with large data sets emitting a large number of the same key values. It typically processes in "chunks", not all at once.
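
To see why the matching structure matters, you can run the reducer standalone and confirm that reducing everything at once, or in chunks with a partial result fed back in, gives the same answer:

```javascript
// The reducer from the answer above: its output shape matches the
// emitted values, so it can safely consume its own prior output.
function reduceLights(key, values) {
    var result = { lights_on: [] };
    values.forEach(function(value) {
        value.lights_on.forEach(function(light) {
            if (result.lights_on.indexOf(light) === -1)
                result.lights_on.push(light);
        });
    });
    result.lights_on.sort();
    return result;
}

var emitted = [
    { lights_on: ["light_1"] },
    { lights_on: ["light_2"] },
    { lights_on: ["light_1"] }
];

// Reducing all values at once...
var allAtOnce = reduceLights(null, emitted);

// ...or in two chunks, feeding the partial result back in,
// yields the same { lights_on: ["light_1", "light_2"] }.
var partial = reduceLights(null, emitted.slice(0, 2));
var chunked = reduceLights(null, [partial, emitted[2]]);
```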

The end reduction comes down to the list of lights turned on during the period with each period start and end as the emitted key. Like this:

    {
        "_id": {
            "start": ISODate("2015-01-01T06:00:00Z"),
            "end": ISODate("2015-01-01T12:00:00Z")
        },
        "value": {
            "lights_on": [ "light_1", "light_2" ]
        }
    },

That "_id", "value" structure is just a property of how all mapReduce output comes out, but the desired values are all there.

Query

Now there is also a note on the query selection here which needs to take into account that a light could already be "on" via its collection entry at a date before the start of the current day. The same is true that it can be turned "off" after the current date being reported on as well, and may in fact either have a null value or no "off" key in the document depending on how your data is being stored and what day is actually being observed.

That logic creates some required calculation from the start of the day to be reported on and consider the six hour period both before and after that date with query conditions as listed:

        {
            "on": { "$gte": yesterday, "$lt": tomorrow }, 
            "$or": [
                { "off": { "$gte": today, "$lt": nextday } },
                { "off": null },
                { "off": { "$exists": false } }
            ]
        }

The basic selectors there use the range operators of $gte and $lt to find the values that are greater than or equal to and less than respectively on the fields that they are testing the values of in order to find the data within a suitable range.

Within the $or condition, the various possibilities for the "off" value are considered: either it falls within the range criteria, or it has a null value, or the key may be absent from the document entirely, as tested via the $exists operator. How you actually represent "off" when a light has not yet been turned off determines which of those conditions within $or are required, but these are reasonable assumptions.

Like all MongoDB queries, all conditions are implicitly an "AND" condition unless stated otherwise.

That is still somewhat flawed depending on how long a light is possibly expected to be turned on for. But the variables are all intentionally listed externally for adjustment to your needs, with consideration to the expected duration to fetch either before or after the date to be reported on.

Creating Empty Time Series

The other note here is that the data itself is likely not to have any events that show a light turned on within a given time period. For that reason, there is a simple method embedded in the mapper function that looks to see if we are on the first iteration of results.

On that first time only, a set of the possible period keys is emitted that includes an empty array for the lights turned on in each period. This allows the reporting to also show those periods where no light was on at all as this is inserted into the data sent to the reducer and output.

You may vary this approach, as it still depends on there being some data that meets the query criteria in order to output anything. To cater for a truly "blank day" where no data is recorded or meets the criteria, it might be better to create an external hash table of the keys, all showing an empty result for the lights, and then simply "merge" the result of the mapReduce operation into those pre-existing keys to produce the report.
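
A sketch of that "external hash table" idea, with illustrative names: pre-build every period key for the report day with an empty result, then overlay whatever the mapReduce run actually produced:

```javascript
var sixHours = 1000 * 60 * 60 * 6;

// One entry per six hour period of the report day, each starting empty.
function emptyBuckets(dayStart, dayEnd) {
    var buckets = {};
    for (var t = dayStart.valueOf(); t < dayEnd.valueOf(); t += sixHours) {
        buckets[new Date(t).toISOString()] = {
            start: new Date(t),
            end: new Date(t + sixHours),
            lights_on: []
        };
    }
    return buckets;
}

// Overlay the inline mapReduce results onto the pre-seeded table.
function mergeResults(buckets, mapReduceOutput) {
    mapReduceOutput.forEach(function(doc) {
        var key = doc._id.start.toISOString();
        if (buckets[key]) buckets[key].lights_on = doc.value.lights_on;
    });
    return buckets;
}
```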

Summary

There are a number of date calculations here, and being unaware of your actual end language implementation, I am simply declaring anything that can be computed externally to the actual mapReduce operation separately. Anything that looks like duplication here is done with that intent, making that part of the logic language independent. Most programming languages can manipulate dates as per the methods used.

The language-specific inputs are then all passed in via the options block shown as the last argument to the mapReduce method. Notably there is the query, with its parameterized values all calculated from the date to be reported on. Then there is the "scope", which is a way to pass in values that can be used by the functions within the mapReduce operation.

With those things considered, the JavaScript code of the mapper and reducer remains unaltered, as that is what is expected by the method as input. Any variables to the process are fed by both the scope and query results in order to get the outcome without changing that code.

It is mainly therefore that because the duration of a "light being on" can span over different periods to be reported on, that this becomes something the aggregation framework is not designed to do. It cannot perform the "looping" and "emission of data" that is required to get to the result, and therefore why we use mapReduce for this task instead.

That said, great question. I don't know if you had already considered the concepts of how to achieve the results here, but at least now there is a guide for someone approaching a similar problem.

answered Oct 21 '22 by Blakes Seven


I originally misunderstood your question. Assuming I now understand what you need, this looks more like a job for map-reduce. I am not sure how you are determining the range or the interval span, so I will make these constants; you can modify that section of the code as needed. You could do something like this:

var mapReduceObj = {};

mapReduceObj.map = function() {
    var start = new Date("2015-01-01T00:00:00Z").getTime(),
        end = new Date("2015-01-02T00:00:00Z").getTime(),
        interval = 21600000;                 //6 hours in milliseconds

    var time = start;
    while(time < end) {
        var endtime = time + interval;
        // emit for every bucket the light overlaps, not just the first
        if(this.on < endtime && this.off >= time) {
            emit({start : new Date(time), end : new Date(endtime)}, {lights : [this._id]});
        }

        time = endtime;
    }
};

mapReduceObj.reduce = function(key, values) {
    var lightsArr = {lights : []};

    // each value is an object holding an array of light ids, possibly
    // from a previous reduce pass, so flatten and de-duplicate
    values.forEach(function(value) {
        value.lights.forEach(function(light) {
            if(lightsArr.lights.indexOf(light) === -1) {
                lightsArr.lights.push(light);
            }
        });
    });

    return lightsArr;
};

The result will have the following form:

{
    "_id"   :    {
        "start" :   ISODate("2015-01-01T06:00:00Z"),
        "end"   :   ISODate("2015-01-01T12:00:00Z")
    },
    "value" :    {
        "lights" :   [
            "light_6",
            "light_7"
        ]
    }
},
...

~~~Original Answer~~~

This should give you the exact format that you want.

db.lights.aggregate([
    { "$match": {
        "$and": [ 
            { on  : { $lt : ISODate("2015-01-01T12:00:00Z") } },
            { off : { $gte: ISODate("2015-01-01T06:00:00Z") } }
        ]
    }},
    { "$group": {
        _id         :   null,
        "lights_on" : {$push : "$_id"}
    }},
    { "$project": {
        _id     :    false,
        start   :    { $literal : ISODate("2015-01-01T06:00:00Z") },
        end     :    { $literal : ISODate("2015-01-01T12:00:00Z") },
        lights_on:   true
    }}
]);

First, the $match condition finds all documents that meet your time constraints. Then $group pushes the _id field (in this case, light_n where n is an integer) into the lights_on field. Either $addToSet or $push could be used since the _id field is unique, but if you were using a field that could have duplicates you would need to decide if duplicates in the array were acceptable. Finally, use $project to get the exact format you want.
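
Since the range and interval are only known at run time, the hard-coded dates can be parameterized by building the pipeline programmatically, one run per bucket. A sketch with illustrative names; it assumes $literal (MongoDB 2.6+) for projecting the constant dates:

```javascript
// Build the single-bucket pipeline for arbitrary bounds. A light counts
// as "on" when it turns on before the bucket ends and off at or after
// the bucket starts, per the question's rule.
function bucketPipeline(start, end) {
    return [
        { $match: { on: { $lt: end }, off: { $gte: start } } },
        { $group: { _id: null, lights_on: { $push: "$_id" } } },
        { $project: {
            _id: false,
            start: { $literal: start },
            end: { $literal: end },
            lights_on: true
        } }
    ];
}

// usage sketch, once per bucket:
// db.lights.aggregate(bucketPipeline(bucketStart, bucketEnd));
```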

answered Oct 21 '22 by c1moore