Grouping consecutive documents with Elasticsearch

Is there a way to make Elasticsearch consider sequence-gaps when grouping?

Provided that the following data was bulk-imported to Elasticsearch:

{ "index": { "_index": "test", "_type": "groupingTest", "_id": "1" } }
{ "sequence": 1, "type": "A" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "2" } }
{ "sequence": 2, "type": "A" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "3" } }
{ "sequence": 3, "type": "B" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "4" } }
{ "sequence": 4, "type": "A" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "5" } }
{ "sequence": 5, "type": "A" }

Is there a way to query this data such that

  • the documents with sequence number 1 and 2 go to one output group,
  • the document with sequence number 3 goes to another one, and
  • the documents with sequence number 4 and 5 go to a third group?

... considering the fact that the type A sequence is interrupted by a type B item (or any other item that's not type A)?

I would like the result buckets to look something like this (the name and value of sequence_group may differ; I'm just trying to illustrate the logic):

"buckets": [
    {
       "key": "a",
       "sequence_group": 1,
       "doc_count": 2
    },
    {
       "key": "b",
       "sequence_group": 3,
       "doc_count": 1
    },
    {
       "key": "a",
       "sequence_group": 4,
       "doc_count": 2
    }
]

There is a good description of the problem and some SQL solution approaches at https://www.simple-talk.com/sql/t-sql-programming/the-sql-of-gaps-and-islands-in-sequences/ (the "gaps and islands" problem). I would like to know whether a solution is available for Elasticsearch as well.

yaccob asked Sep 15 '25 01:09

1 Answer

We can use a scripted metric aggregation here, which works in a map-reduce fashion (see the Elasticsearch reference docs). It has four parts: init, map, combine, and reduce. A nice property is that the state and return values can be lists or maps, not just single numbers.
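
To see where each part runs before diving into the full query: init_script runs once per shard before any documents are collected, map_script runs once per matching document, combine_script runs once per shard after collection, and reduce_script runs once on the coordinating node with the combine results from all shards. A bare-bones skeleton (not the final query, just the shape):

POST test/_search
{
  "size": 0,
  "aggs": {
    "skeleton": {
      "scripted_metric": {
        "init_script": "state.items = []",
        "map_script": "state.items.add(doc.sequence.value)",
        "combine_script": "return state.items",
        "reduce_script": "def all = []; for (s in states) { if (s != null) { all.addAll(s); } } return all;"
      }
    }
  }
}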

I played around with this a bit.

Elasticsearch version used: 7.1

Creating index:

PUT test
{
  "mappings": {
    "properties": {
      "sequence": {
        "type": "long"
      },
      "type": {
        "type": "text",
        "fielddata": true
      }
    }
  }
}
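
As a side note, enabling fielddata on a text field can be avoided by mapping type as keyword instead; doc['type'].value in the scripts works the same way, and the bucket keys would then come back as "A"/"B" rather than the lowercased "a"/"b" that the analyzed text field produces:

PUT test
{
  "mappings": {
    "properties": {
      "sequence": {
        "type": "long"
      },
      "type": {
        "type": "keyword"
      }
    }
  }
}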

Bulk indexing (note that I removed the mapping type 'groupingTest', since mapping types were removed in Elasticsearch 7):

POST _bulk
{ "index": { "_index": "test", "_id": "1" } }
{ "sequence": 1, "type": "A" }
{ "index": { "_index": "test", "_id": "2" } }
{ "sequence": 2, "type": "A" }
{ "index": { "_index": "test", "_id": "3" } }
{ "sequence": 3, "type": "B" }
{ "index": { "_index": "test", "_id": "4" } }
{ "sequence": 4, "type": "A" }
{ "index": { "_index": "test", "_id": "5" } }
{ "sequence": 5, "type": "A" }

Query

GET test/_search
{
  "size": 0,
  "aggs": {
    "scripted_agg": {
      "scripted_metric": {
        "init_script": """ 
          state.seqTypeArr = [];
        """,
        "map_script": """ 
          def seqType = doc.sequence.value + '_' + doc['type'].value;
          state.seqTypeArr.add(seqType);
        """,
        "combine_script": """
          def list = [];
          for(seqType in state.seqTypeArr) {
            list.add(seqType);
          }
          return list;
        """,
        "reduce_script": """ 
          def fullList = [];
          for(agg_value in states) {
            for(x in agg_value) {
              fullList.add(x);
            }
          }
          fullList.sort((a,b) -> a.compareTo(b));
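          // Walk the sorted "<sequence>_<type>" strings and start a new bucket whenever the type changes.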
          def result = [];
          def item = new HashMap();
          for(int i=0; i<fullList.size(); i++) {
            def str = fullList.get(i);
            def index = str.indexOf("_");
            def ch = str.substring(index+1);
            def val = str.substring(0, index);
            if(item["key"] == null) {
              item["key"] = ch;
              item["sequence_group"] = val;
              item["doc_count"] = 1;
            } else if(item["key"] == ch) {
              item["doc_count"] = item["doc_count"] + 1;
            } else {
              result.add(item);
              item = new HashMap();
              item["key"] = ch;
              item["sequence_group"] = val;
              item["doc_count"] = 1;
            }
          }
          result.add(item);
          return result;
        """
      }
    }
  }
}
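
One thing to be careful about: the sort in reduce_script compares the "<sequence>_<type>" strings lexicographically, so once sequence numbers reach double digits, "10_A" would sort before "2_A" and break the grouping. It doesn't matter for the five sample documents, but for larger sequences the comparator could parse the numeric prefix instead, e.g. by replacing the sort line with:

// sort numerically by the sequence prefix instead of lexicographically
fullList.sort((a, b) -> Integer.compare(
  Integer.parseInt(a.substring(0, a.indexOf("_"))),
  Integer.parseInt(b.substring(0, b.indexOf("_")))
));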

And, finally the output:

{
  "took" : 21,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 5,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "scripted_agg" : {
      "value" : [
        {
          "doc_count" : 2,
          "sequence_group" : "1",
          "key" : "a"
        },
        {
          "doc_count" : 1,
          "sequence_group" : "3",
          "key" : "b"
        },
        {
          "doc_count" : 2,
          "sequence_group" : "4",
          "key" : "a"
        }
      ]
    }
  }
}

Please note that a scripted metric aggregation has a significant impact on query performance, since the scripts run for every matching document. You may notice slowness if there is a large number of documents.

ms_27 answered Sep 17 '25 20:09