Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Compute first order derivative with MongoDB aggregation framework

Is it possible to calculate a first order derivative using the aggregate framework?

For example, I have the data :

{time_series : [10,20,40,70,110]}

I'm trying to obtain an output like:

{derivative : [10,20,30,40]}
like image 589
user666 Avatar asked Aug 15 '16 15:08

user666


People also ask

Which aggregation method is preferred for use by MongoDB?

The pipeline provides efficient data aggregation using native operations within MongoDB, and is the preferred method for data aggregation in MongoDB. The aggregation pipeline can operate on a sharded collection.

What does $first do in MongoDB?

$first selects the first document from each output group: The _id: null group is included. When the accumulator field, $quantity in this example, is missing, $first returns null .

What is MongoDB aggregation framework?

Aggregation in MongoDB allows for the transforming of data and results in a more powerful fashion than from using the find() command. Through the use of multiple stages and expressions, you are able to build a "pipeline" of operations on your data to perform analytic operations.

What passes through a MongoDB aggregation pipeline?

An aggregation pipeline consists of one or more stages that process documents: Each stage performs an operation on the input documents. For example, a stage can filter documents, group documents, and calculate values. The documents that are output from a stage are passed to the next stage.


2 Answers

db.collection.aggregate(
    [
      {
        "$addFields": {
          "indexes": {
            "$range": [
              0,
              {
                "$size": "$time_series"
              }
            ]
          },
          "reversedSeries": {
            "$reverseArray": "$time_series"
          }
        }
      },
      {
        "$project": {
          "derivatives": {
            "$reverseArray": {
              "$slice": [
                {
                  "$map": {
                    "input": {
                      "$zip": {
                        "inputs": [
                          "$reversedSeries",
                          "$indexes"
                        ]
                      }
                    },
                    "in": {
                      "$subtract": [
                        {
                          "$arrayElemAt": [
                            "$$this",
                            0
                          ]
                        },
                        {
                          "$arrayElemAt": [
                            "$reversedSeries",
                            {
                              "$add": [
                                {
                                  "$arrayElemAt": [
                                    "$$this",
                                    1
                                  ]
                                },
                                1
                              ]
                            }
                          ]
                        }
                      ]
                    }
                  }
                },
                {
                  "$subtract": [
                    {
                      "$size": "$time_series"
                    },
                    1
                  ]
                }
              ]
            }
          },
          "time_series": 1
        }
      }
    ]
)

We can use the pipeline above in version 3.4+ to do this. In the pipeline, we use the $addFields pipeline stage. operator to add the array of the "time_series"'s elements index to do document, we also reversed the time series array and add it to the document using respectively the $range and $reverseArray operators

We reversed the array here because the element at position p in the array is always greater than the element at position p+1 which means that [p] - [p+1] < 0 and we do not want to use the $multiply here.(see pipeline for version 3.2)

Next we $zipped the time series data with the indexes array and applied a substract expression to the resulted array using the $map operator.

We then $slice the result to discard the null/None value from the array and re-reversed the result.


In 3.2 we can use the $unwind operator to unwind our array and include the index of each element in the array by specifying a document as operand instead of the traditional "path" prefixed by $.

Next in the pipeline, we need to $group our documents and use the $push accumulator operator to return an array of sub-documents that look like this:

{
    "_id" : ObjectId("57c11ddbe860bd0b5df6bc64"),
    "time_series" : [
        { "value" : 10, "index" : NumberLong(0) },
        { "value" : 20, "index" : NumberLong(1) },
        { "value" : 40, "index" : NumberLong(2) },
        { "value" : 70, "index" : NumberLong(3) },
        { "value" : 110, "index" : NumberLong(4) }
    ]
}

Finally comes the $project stage. In this stage, we need to use the $map operator to apply a series of expression to each element in the the newly computed array in the $group stage.

Here is what is going on inside the $map (see $map as a for loop) in expression:

For each subdocument, we assign the value field to a variable using the $let variable operator. We then subtract it value from the value of the "value" field of the next element in the array.

Since the next element in the array is the element at the current index plus one, all we need is the help of the $arrayElemAt operator and a simple $addition of the current element's index and 1.

The $subtract expression return a negative value so we need to multiply the value by -1 using the $multiply operator.

We also need to $filter the resulted array because it the last element is None or null. The reason is that when the current element is the last element, $subtract return None because the index of the next element equal the size of the array.

db.collection.aggregate([
  {
    "$unwind": {
      "path": "$time_series",
      "includeArrayIndex": "index"
    }
  },
  {
    "$group": {
      "_id": "$_id",
      "time_series": {
        "$push": {
          "value": "$time_series",
          "index": "$index"
        }
      }
    }
  },
  {
    "$project": {
      "time_series": {
        "$filter": {
          "input": {
            "$map": {
              "input": "$time_series",
              "as": "el",
              "in": {
                "$multiply": [
                  {
                    "$subtract": [
                      "$$el.value",
                      {
                        "$let": {
                          "vars": {
                            "nextElement": {
                              "$arrayElemAt": [
                                "$time_series",
                                {
                                  "$add": [
                                    "$$el.index",
                                    1
                                  ]
                                }
                              ]
                            }
                          },
                          "in": "$$nextElement.value"
                        }
                      }
                    ]
                  },
                  -1
                ]
              }
            }
          },
          "as": "item",
          "cond": {
            "$gte": [
              "$$item",
              0
            ]
          }
        }
      }
    }
  }
])

Another option which I think is less efficient is perform a map/reduce operation on our collection using the map_reduce method.

>>> import pymongo
>>> from bson.code import Code
>>> client = pymongo.MongoClient()
>>> db = client.test
>>> collection = db.collection
>>> mapper = Code("""
...               function() {
...                 var derivatives = [];
...                 for (var index=1; index<this.time_series.length; index++) {
...                   derivatives.push(this.time_series[index] - this.time_series[index-1]);
...                 }
...                 emit(this._id, derivatives);
...               }
...               """)
>>> reducer = Code("""
...                function(key, value) {}
...                """)
>>> for res in collection.map_reduce(mapper, reducer, out={'inline': 1})['results']:
...     print(res)  # or do something with the document.
... 
{'value': [10.0, 20.0, 30.0, 40.0], '_id': ObjectId('57c11ddbe860bd0b5df6bc64')}

You can also retrieve all the document and use the numpy.diff to return the derivative like this:

import numpy as np


for document in collection.find({}, {'time_series': 1}):
    result = np.diff(document['time_series']) 
like image 115
styvane Avatar answered Oct 12 '22 12:10

styvane


it's a bit dirty, but perhaps something like this?

use test_db
db['data'].remove({})
db['data'].insert({id: 1, time_series: [10,20,40,70,110]})

var mapF = function() {
    emit(this.id, this.time_series);
    emit(this.id, this.time_series);
};

var reduceF = function(key, values){
    var n = values[0].length;
    var ret = [];
    for(var i = 0; i < n-1; i++){
        ret.push( values[0][i+1] - values[0][i] );
    }
    return {'gradient': ret};
};

var finalizeF = function(key, val){
    return val.gradient;
}

db['data'].mapReduce(
    mapF,
    reduceF,
    { out: 'data_d1', finalize: finalizeF }
)

db['data_d1'].find({})

The "strategy" here is to emit the data to be operated on twice so that it is accessible in the reduce stage, return an object to avoid the message "reduce -> multiple not supported yet" and then filter back the array in the finalizer.

This script then produces:

MongoDB shell version: 3.2.9
connecting to: test
switched to db test_db
WriteResult({ "nRemoved" : 1 })
WriteResult({ "nInserted" : 1 })
{
    "result" : "data_d1",
        "timeMillis" : 13,
        "counts" : {
            "input" : 1,
            "emit" : 2,     
            "reduce" : 1,           
            "output" : 1                    
        },                                      
        "ok" : 1                                    
}                                                   
{ "_id" : 1, "value" : [ 10, 20, 30, 40 ] }         
bye

Alternatively, one could move all the processing into the finalizer (reduceF is not called here since mapF is assumed to emit unique keys):

use test_db
db['data'].remove({})
db['data'].insert({id: 1, time_series: [10,20,40,70,110]})

var mapF = function() {
    emit(this.id, this.time_series);
};

var reduceF = function(key, values){
};

var finalizeF = function(key, val){
    var x = val;
    var n = x.length;

    var ret = [];
    for(var i = 0; i < n-1; i++){
        ret.push( x[i+1] - x[i] );
    }
    return ret;
}

db['data'].mapReduce(
    mapF,
    reduceF,
    { out: 'data_d1', finalize: finalizeF }
)

db['data_d1'].find({})
like image 29
ewcz Avatar answered Oct 12 '22 14:10

ewcz