Question in short: if I have an aggregation for a top_hits per bucket, how do I sum a specific value in the resulting structure?
Details:
I have a number of records, each containing a quantity for a certain store. I want to get the sum of the latest quantity of every store.
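For concreteness, here is a minimal hypothetical data set that reproduces the numbers in this post (the field names match the aggregations below; the earlier timestamps and the non-latest quantities are made up, chosen to be consistent with the averages further down):
PUT inventory-local/doc/1
{ "store": "01", "datetime": "2018-07-24T00:00:00Z", "quantity": 6 }

PUT inventory-local/doc/2
{ "store": "01", "datetime": "2018-07-25T00:00:00Z", "quantity": 6 }

PUT inventory-local/doc/3
{ "store": "02", "datetime": "2018-07-24T00:00:00Z", "quantity": 12 }

PUT inventory-local/doc/4
{ "store": "02", "datetime": "2018-07-25T00:00:00Z", "quantity": 11 }
The latest quantities are then 6 (store "01") and 11 (store "02"), so the sum I am after is 17.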
To get the latest record per store, I create the following aggregation:
"latest_quantity_per_store": {
"aggs": {
"latest_quantity": {
"top_hits": {
"sort": [
{
"datetime": "desc"
},
{
"quantity": "asc"
}
],
"_source": {
"includes": [
"quantity"
]
},
"size": 1
}
}
},
"terms": {
"field": "store",
"size": 10000
}
}
Suppose I have two stores, each with two quantities recorded at two different timestamps. This is the result of that aggregation:
"latest_quantity_per_store": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "01",
"doc_count": 2,
"latest_quantity": {
"hits": {
"total": 2,
"max_score": null,
"hits": [
{
"_index": "inventory-local",
"_type": "doc",
"_id": "O6wFD2UBG8e7nvSU8dYg",
"_score": null,
"_source": {
"quantity": 6
},
"sort": [
1532476800000,
6
]
}
]
}
}
},
{
"key": "02",
"doc_count": 2,
"latest_quantity": {
"hits": {
"total": 2,
"max_score": null,
"hits": [
{
"_index": "inventory-local",
"_type": "doc",
"_id": "pLUFD2UBHBuSGcoH0ZT4",
"_score": null,
"_source": {
"quantity": 11
},
"sort": [
1532476800000,
11
]
}
]
}
}
}
]
}
I now want to have an aggregation in Elasticsearch that takes the sum over these buckets: in the example data, the sum of 6 and 11. I tried the following aggregation:
"latest_quantity": {
"sum_bucket": {
"buckets_path": "latest_quantity_per_store>latest_quantity>hits>hits>_source>quantity"
}
}
But this results in this error:
{
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "No aggregation [hits] found for path [latest_quantity_per_store>latest_quantity>hits>hits>_source>quantity]"
}
],
"type": "search_phase_execution_exception",
"reason": "all shards failed",
"phase": "query",
"grouped": true,
"failed_shards": [
{
"shard": 0,
"index": "inventory-local",
"node": "3z5CqmmAQ-yT2sUCb69DzA",
"reason": {
"type": "illegal_argument_exception",
"reason": "No aggregation [hits] found for path [latest_quantity_per_store>latest_quantity>hits>hits>_source>quantity]"
}
}
]
},
"status": 400
}
What would be the correct aggregation to somehow get the number 17 back from Elasticsearch?
I did something similar for another aggregation that I had, using an avg instead of a top_hits aggregation.
"average_quantity": {
"sum_bucket": {
"buckets_path": "average_quantity_per_store>average_quantity"
}
},
"average_quantity_per_store": {
"aggs": {
"average_quantity": {
"avg": {
"field": "quantity"
}
}
},
"terms": {
"field": "store",
"size": 10000
}
}
This works as expected; this is the result:
"average_quantity_per_store": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "01",
"doc_count": 2,
"average_quantity": {
"value": 6
}
},
{
"key": "02",
"doc_count": 2,
"average_quantity": {
"value": 11.5
}
}
]
},
"average_quantity": {
"value": 17.5
}
There's a way to solve this using a mix of the scripted_metric aggregation and the sum_bucket pipeline aggregation. The scripted metric aggregation is a bit complex, but the main idea is that it lets you provide your own bucketing algorithm and produce a single metric value from it.
In your case, what you want to do is figure out the latest quantity for each store and then sum those quantities. The solution looks like this; I'll explain some details below:
POST inventory-local/_search
{
"size": 0,
"aggs": {
"bystore": {
"terms": {
"field": "store.keyword",
"size": 10000
},
"aggs": {
"latest_quantity": {
"scripted_metric": {
"init_script": "params._agg.quantities = new TreeMap()",
"map_script": "params._agg.quantities.put(doc.datetime.date, [doc.datetime.date.millis, doc.quantity.value])",
"combine_script": "return params._agg.quantities.lastEntry().getValue()",
"reduce_script": "def maxkey = 0; def qty = 0; for (a in params._aggs) {def currentKey = a[0]; if (currentKey > maxkey) {maxkey = currentKey; qty = a[1]} } return qty;"
}
}
}
},
"sum_latest_quantities": {
"sum_bucket": {
"buckets_path": "bystore>latest_quantity.value"
}
}
}
}
Note that since these scripts pass the timestamp and quantity around as an array (rather than as a delimited string that would have to be regex-split), there is no need to set script.painless.regex.enabled: true in your elasticsearch.yml configuration file for this to work.
The init_script creates a TreeMap for each shard.
The map_script populates the TreeMap on each shard with one date-to-value entry per document. The value that we put in the map is an array containing the timestamp (in milliseconds) and the quantity; we'll need that timestamp later in the reduce_script. For store "01", for instance, a shard's TreeMap could end up as {2018-07-24 -> [1532390400000, 6], 2018-07-25 -> [1532476800000, 6]}.
The combine_script simply takes the last value of the TreeMap, since this is the latest quantity for the given shard (or null if the shard has no documents in that bucket).
The bulk of the work is located in the reduce_script. We iterate over the latest quantity of each shard, skip shards that contributed nothing, and return the quantity with the most recent timestamp; this is why we carried the timestamp along with the quantity.
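For readability, here is the same reduce logic written out in Painless with whitespace and comments (in the actual request it has to be collapsed into a single JSON string):
// params._aggs holds one entry per shard: the [timestampMillis, quantity]
// array returned by that shard's combine_script (or null for an empty shard)
def maxkey = 0;
def qty = 0;
for (a in params._aggs) {
  if (a != null && a[0] > maxkey) { // a newer timestamp wins
    maxkey = a[0];
    qty = a[1];
  }
}
return qty;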
At this point, we have the latest quantity for each store. All that remains to be done is to use a sum_bucket pipeline aggregation in order to sum the per-store quantities. And there you have the result of 17.
The response looks like this:
"aggregations": {
"bystore": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "01",
"doc_count": 2,
"latest_quantity": {
"value": 6
}
},
{
"key": "02",
"doc_count": 2,
"latest_quantity": {
"value": 11
}
}
]
},
"sum_latest_quantities": {
"value": 17
}
}
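One caveat: the request above uses the Elasticsearch 6.x scripted-metric syntax. On Elasticsearch 7.x, the params._agg and params._aggs script variables were replaced by state and states, and doc.datetime.date by doc.datetime.value; assuming 7.x (an untested sketch), the scripted metric part would become:
"latest_quantity": {
  "scripted_metric": {
    "init_script": "state.quantities = new TreeMap()",
    "map_script": "state.quantities.put(doc.datetime.value.millis, [doc.datetime.value.millis, doc.quantity.value])",
    "combine_script": "return state.quantities.isEmpty() ? null : state.quantities.lastEntry().getValue()",
    "reduce_script": "def maxkey = 0; def qty = 0; for (a in states) { if (a != null && a[0] > maxkey) { maxkey = a[0]; qty = a[1]; } } return qty;"
  }
}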