Is it possible to calculate a first order derivative using the aggregate framework?
For example, I have the data :
{time_series : [10,20,40,70,110]}
I'm trying to obtain an output like:
{derivative : [10,20,30,40]}
The pipeline provides efficient data aggregation using native operations within MongoDB, and is the preferred method for data aggregation in MongoDB. The aggregation pipeline can operate on a sharded collection.
$first selects the first document from each output group, including the _id: null group. When the accumulator field ($quantity in the original example) is missing, $first returns null.
Aggregation in MongoDB allows for the transforming of data and results in a more powerful fashion than from using the find() command. Through the use of multiple stages and expressions, you are able to build a "pipeline" of operations on your data to perform analytic operations.
An aggregation pipeline consists of one or more stages that process documents: Each stage performs an operation on the input documents. For example, a stage can filter documents, group documents, and calculate values. The documents that are output from a stage are passed to the next stage.
// First-order forward differences of "time_series" (MongoDB 3.4+).
// For a document {time_series: [10, 20, 40, 70, 110]} this produces
// derivatives: [10, 20, 30, 40], i.e. derivatives[i] = time_series[i+1] - time_series[i].
db.collection.aggregate(
[
{
// Stage 1: add two helper fields used by the $project stage below.
"$addFields": {
// indexes = [0, 1, ..., size - 1], one position per time_series element.
"indexes": {
"$range": [
0,
{
"$size": "$time_series"
}
]
},
// reversedSeries = time_series in reverse order.
"reversedSeries": {
"$reverseArray": "$time_series"
}
}
},
{
// Stage 2: compute the differences. Only "derivatives", "time_series"
// (and the default _id) are kept, so the helper fields are dropped.
"$project": {
"derivatives": {
// (c) Reverse the differences back into chronological order.
"$reverseArray": {
// (b) Keep the first size - 1 results, discarding the trailing null.
"$slice": [
{
// (a) For each pair [value, p] of zip(reversedSeries, indexes), compute
// reversedSeries[p] - reversedSeries[p + 1], which in original order is
// "next element minus current element". "$$this" is $map's default
// per-element variable (no "as" is given). For the last pair, p + 1 is
// past the end, $arrayElemAt yields nothing and $subtract returns null.
"$map": {
"input": {
"$zip": {
"inputs": [
"$reversedSeries",
"$indexes"
]
}
},
"in": {
"$subtract": [
// value at position p of the reversed series
{
"$arrayElemAt": [
"$$this",
0
]
},
// value at position p + 1 of the reversed series
{
"$arrayElemAt": [
"$reversedSeries",
{
"$add": [
{
"$arrayElemAt": [
"$$this",
1
]
},
1
]
}
]
}
]
}
}
},
// number of results to keep: size - 1
{
"$subtract": [
{
"$size": "$time_series"
},
1
]
}
]
}
},
"time_series": 1
}
}
]
)
We can use the pipeline above in version 3.4+ to do this.
In the pipeline, we use the $addFields pipeline stage to add two fields to each document. The first is the array of the "time_series" element indexes, built with the $range operator. The second is the reversed time series, built with the $reverseArray operator.
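We reversed the array so that a plain [p] - [p+1] subtraction gives the forward difference. Element p of the reversed array is element n-1-p of the original. So reversed[p] - reversed[p+1] equals original[n-1-p] - original[n-2-p], i.e. "next minus current". Subtracting directly on the original array would give "current minus next" (negative for an increasing series) and would require a $multiply by -1 (see the pipeline for version 3.2).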
Next we $zip the reversed time series with the indexes array and apply a $subtract expression to each pair of the resulting array using the $map operator.
We then $slice the result to discard the trailing null value, which is produced for the last element because it has no successor. Finally, we reverse the result back into chronological order.
In 3.2 we can use the $unwind stage to unwind our array and record the index of each element. To do this, pass a document with the includeArrayIndex option as the operand instead of the traditional "$"-prefixed field path.
Next in the pipeline, we need to $group
our documents and use the $push
accumulator operator to return an array of sub-documents that look like this:
{
"_id" : ObjectId("57c11ddbe860bd0b5df6bc64"),
"time_series" : [
{ "value" : 10, "index" : NumberLong(0) },
{ "value" : 20, "index" : NumberLong(1) },
{ "value" : 40, "index" : NumberLong(2) },
{ "value" : 70, "index" : NumberLong(3) },
{ "value" : 110, "index" : NumberLong(4) }
]
}
Finally comes the $project stage. In this stage, we use the $map operator to apply a series of expressions to each element of the array built in the $group stage.
Here is what happens inside the $map's "in" expression (think of $map as a for loop):
For each subdocument, we bind the next element of the array to a variable using the $let operator. We then subtract that element's "value" field from the current element's "value" field.
The next element is the one at the current index plus one, so all we need is the $arrayElemAt operator and a simple $add of the current element's index and 1.
The $subtract expression computes current minus next, which is the negative of the difference we want. So we multiply the result by -1 using the $multiply operator.
We also need to $filter the resulting array to remove its last element, which is null. When the current element is the last one, the index of the next element equals the size of the array. $arrayElemAt therefore finds nothing, and $subtract returns null.
// First-order forward differences of "time_series" for MongoDB 3.2, which
// lacks $range/$zip/$reverseArray. For {time_series: [10, 20, 40, 70, 110]}
// the output is {_id: ..., time_series: [10, 20, 30, 40]}.
// Only _id and time_series survive the $group stage; documents whose
// time_series is missing or empty are dropped by $unwind.
db.collection.aggregate([
  {
    // One document per array element; "index" holds the element's position.
    "$unwind": {
      "path": "$time_series",
      "includeArrayIndex": "index"
    }
  },
  {
    // Rebuild the array as [{value, index}, ...].
    // NOTE(review): relies on $group pushing elements in $unwind order, so
    // that each element's index equals its position in the rebuilt array.
    "$group": {
      "_id": "$_id",
      "time_series": {
        "$push": {
          "value": "$time_series",
          "index": "$index"
        }
      }
    }
  },
  {
    "$project": {
      "time_series": {
        "$filter": {
          "input": {
            "$map": {
              "input": "$time_series",
              "as": "el",
              // (current - next) * -1, i.e. next - current.
              "in": {
                "$multiply": [
                  {
                    "$subtract": [
                      "$$el.value",
                      {
                        "$let": {
                          "vars": {
                            // Element at index + 1; absent for the last element,
                            // which makes the $subtract (and $multiply) null.
                            "nextElement": {
                              "$arrayElemAt": [
                                "$time_series",
                                { "$add": ["$$el.index", 1] }
                              ]
                            }
                          },
                          "in": "$$nextElement.value"
                        }
                      }
                    ]
                  },
                  -1
                ]
              }
            }
          },
          "as": "item",
          // Remove only the null produced for the last element. A
          // {"$gte": ["$$item", 0]} test would also discard genuine negative
          // differences (e.g. the -2 of a series such as [5, 3, 4]).
          "cond": { "$ne": ["$$item", null] }
        }
      }
    }
  }
]);
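Another option, which I think is less efficient, is to perform a map/reduce operation on the collection using the map_reduce method. Note that map-reduce is deprecated as of MongoDB 5.0, and Collection.map_reduce was removed in PyMongo 4.0, so the session below requires an older PyMongo.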
>>> import pymongo
>>> from bson.code import Code
>>> client = pymongo.MongoClient()
>>> db = client.test
>>> collection = db.collection
>>> mapper = Code("""
... function() {
... var derivatives = [];
... for (var index=1; index<this.time_series.length; index++) {
... derivatives.push(this.time_series[index] - this.time_series[index-1]);
... }
... emit(this._id, derivatives);
... }
... """)
>>> reducer = Code("""
... function(key, value) {}
... """)
>>> for res in collection.map_reduce(mapper, reducer, out={'inline': 1})['results']:
... print(res) # or do something with the document.
...
{'value': [10.0, 20.0, 30.0, 40.0], '_id': ObjectId('57c11ddbe860bd0b5df6bc64')}
You can also retrieve all the documents and use numpy.diff to compute the derivative on the client, like this:
import numpy as np

# Client-side alternative: fetch each document's time_series and let numpy
# compute the forward differences, result[i] = ts[i + 1] - ts[i].
# NOTE(review): assumes `collection` is a pymongo Collection, as in the
# map_reduce session above — confirm.
for document in collection.find({}, {'time_series': 1}):
    # Treat a missing or null time_series as empty rather than raising.
    result = np.diff(document.get('time_series') or [])
    # ... use `result` (a numpy array) here, e.g. store or print it.
It's a bit dirty, but perhaps something like this:
// Reset the test collection and insert one sample series.
// `use` is a mongo shell helper rather than JavaScript; remove()/insert() are
// legacy shell methods (newer shells prefer deleteMany()/insertOne()).
use test_db
db['data'].remove({})
db['data'].insert({id: 1, time_series: [10,20,40,70,110]})
var mapF = function() {
  // Emit the same series twice under the same key. MongoDB only calls the
  // reduce function for keys that received more than one value, so the
  // duplicate guarantees that reduceF runs and can see the series.
  var doc = this;
  for (var copy = 0; copy < 2; copy++) {
    emit(doc.id, doc.time_series);
  }
};
var reduceF = function(key, values){
  // Only the first emitted copy is used; the series is emitted twice
  // purely so that MongoDB invokes this function at all.
  var series = values[0];
  var steps = series.length - 1;
  var diffs = [];
  var k = 0;
  while (k < steps) {
    diffs.push(series[k + 1] - series[k]);
    k += 1;
  }
  // Boxed in an object (not returned as a bare array) to avoid the shell's
  // "reduce -> multiple not supported yet" error; finalizeF unwraps it.
  // NOTE(review): MongoDB expects reduce to return the same type as the
  // emitted values and may re-reduce; this only works because each key
  // receives exactly two emits. Confirm before reusing elsewhere.
  return {'gradient': diffs};
};
var finalizeF = function(key, val){
  // reduceF boxed the differences as {gradient: [...]}; hand back the bare array.
  var gradient = val.gradient;
  return gradient;
};
// Run map-reduce over the "data" collection, writing results into "data_d1"
// (finalizeF post-processes each reduced value), then list the results.
// For the sample document the shell prints { "_id" : 1, "value" : [ 10, 20, 30, 40 ] }.
db['data'].mapReduce(
mapF,
reduceF,
{ out: 'data_d1', finalize: finalizeF }
)
db['data_d1'].find({})
The "strategy" here has three parts. First, emit the data to be operated on twice, so that the reduce stage is actually invoked (MongoDB does not call reduce for a key with a single emitted value). Second, return an object from reduce, to avoid the message "reduce -> multiple not supported yet". Third, unwrap the array again in the finalizer.
This script then produces:
MongoDB shell version: 3.2.9
connecting to: test
switched to db test_db
WriteResult({ "nRemoved" : 1 })
WriteResult({ "nInserted" : 1 })
{
"result" : "data_d1",
"timeMillis" : 13,
"counts" : {
"input" : 1,
"emit" : 2,
"reduce" : 1,
"output" : 1
},
"ok" : 1
}
{ "_id" : 1, "value" : [ 10, 20, 30, 40 ] }
bye
Alternatively, one could move all the processing into the finalizer. Here reduceF is never called, because MongoDB skips reduce for keys with a single emitted value and mapF is assumed to emit unique keys:
// Reset the test collection and insert one sample series.
// `use` is a mongo shell helper rather than JavaScript; remove()/insert() are
// legacy shell methods (newer shells prefer deleteMany()/insertOne()).
use test_db
db['data'].remove({})
db['data'].insert({id: 1, time_series: [10,20,40,70,110]})
var mapF = function() {
  // Exactly one emit per document, keyed by its `id` field. With unique ids
  // MongoDB never calls reduceF, so finalizeF receives the raw array.
  var doc = this;
  emit(doc.id, doc.time_series);
};
var reduceF = function(key, values){
  // Intentionally a no-op: MongoDB skips reduce for keys that have a single
  // emitted value, and mapF is assumed to emit each key only once.
  // NOTE(review): if ids are not unique, the reduced value would presumably
  // be undefined, which finalizeF cannot handle — confirm ids are unique.
  return undefined;
};
var finalizeF = function(key, val){
  // val is the raw time_series array emitted by mapF (reduce is skipped for
  // single-valued keys). Return its forward differences,
  // out[i] = val[i + 1] - val[i], which is one element shorter than val.
  var out = [];
  for (var idx = 1; idx < val.length; idx++) {
    out.push(val[idx] - val[idx - 1]);
  }
  return out;
};
// Run map-reduce over the "data" collection, writing results into "data_d1";
// finalizeF turns each emitted series into its differences. Then list the results.
db['data'].mapReduce(
mapF,
reduceF,
{ out: 'data_d1', finalize: finalizeF }
)
db['data_d1'].find({})
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With