
How to specify document version in elasticsearch pipeline?

I currently use an ingest node pipeline which looks like this:

{
    "my-pipeline": {
        "description": "pipeline for my filebeat",
        "processors": [
            {
                "json": {
                    "field": "message",
                    "add_to_root": true,
                    "on_failure": [
                        {
                            "rename": {
                                "field": "message",
                                "target_field": "originalMessage",
                                "ignore_missing": true
                            }
                        },
                        {
                            "set": {
                                "field": "indexName",
                                "value": "pipeline-errors"
                            }
                        },
                        {
                            "set": {
                                "field": "indexType",
                                "value": "pipeline-error"
                            }
                        },
                        {
                            "rename": {
                                "field": "@timestamp",
                                "target_field": "errorTimestamp",
                                "ignore_missing": true
                            }
                        }
                    ]
                }
            },
            {
                "remove": {
                    "field": "@timestamp",
                    "ignore_failure": true
                }
            },
            {
                "remove": {
                    "field": "message",
                    "ignore_failure": true
                }
            },
            {
                "script": {
                    "inline": "ctx._index = ctx.indexName; ctx._type=ctx.indexType; if (ctx.docVersion != null) {ctx._version = ctx.docVersion; ctx._version_type='external'}"
                }
            },
            {
                "remove": {
                    "field": "indexName",
                    "ignore_failure": true
                }
            },
            {
                "remove": {
                    "field": "indexType",
                    "ignore_failure": true
                }
            }
        ]
    }
}

This pipeline simply unboxes a log forwarded by Filebeat. In the script processor I look for the 'indexName' and 'indexType' fields and assign them to '_index' and '_type' respectively. Since I need to take the version into account, a 'version' field is included in the log (this is optional, as some logs do not contain a version).
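
To make the intent of the script processor explicit, here is a minimal plain-Python model of its logic. It is not Painless and does not call any Elasticsearch API: `apply_routing_script` is a hypothetical name, and `ctx` is assumed to be an ordinary dict holding the unboxed log fields.

```python
def apply_routing_script(ctx):
    """Mirror the pipeline's script processor on a plain dict (illustration only)."""
    # Route the document using the fields carried inside the log.
    ctx["_index"] = ctx.get("indexName")
    ctx["_type"] = ctx.get("indexType")
    # The version is optional: only set it when the log supplies one.
    if ctx.get("docVersion") is not None:
        ctx["_version"] = ctx["docVersion"]
        ctx["_version_type"] = "external"
    return ctx


# Hypothetical sample documents, one with and one without a version.
versioned = apply_routing_script(
    {"indexName": "orders", "indexType": "order", "docVersion": 7}
)
unversioned = apply_routing_script({"indexName": "orders", "indexType": "order"})
```

In this model a log without 'docVersion' leaves '_version' unset, which is the behaviour I want from the real pipeline.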

Using this pipeline triggers:

org.elasticsearch.index.mapper.MapperParsingException: Cannot generate dynamic mappings of type [_version] for [_version]
    at org.elasticsearch.index.mapper.DocumentParser.createBuilderFromFieldType(DocumentParser.java:656) ~[elasticsearch-5.5.0.jar:5.5.0]
    at org.elasticsearch.index.mapper.DocumentParser.parseDynamicValue(DocumentParser.java:805) ~[elasticsearch-5.5.0.jar:5.5.0]

What I've tried so far (updated 09-16):

  • Renamed the field to something like 'docVersion', in case 'version' collides with a reserved keyword. This did not work either.
  • Tried to use ctx._source.version; this triggers a ScriptException[runtime error]. That is not surprising, given that the _index and _type values come from ctx.indexName and ctx.indexType respectively.
  • Tried adding 'version_type=external' in the script as well; I still get the MapperParsingException shown above.
  • Tried 'version_type=external_gte', but I got the same MapperParsingException.

How do I specify/use external versioning for Elasticsearch documents when using ingest node pipelines? If this is not possible through the pipeline's script processor, what are the options for using an external version with Filebeat-to-Elasticsearch, so that an older version of a document gets rejected?
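
For clarity, the rejection behaviour being asked for can be sketched as follows. This is a plain-Python model of the documented comparison rules for the 'external' and 'external_gte' version types, not Elasticsearch code; `accepts_write` is a hypothetical helper, and `None` stands for a document that does not exist yet.

```python
def accepts_write(stored_version, incoming_version, version_type="external"):
    """Return True if a write with incoming_version would be accepted."""
    # A document that does not exist yet is always indexed.
    if stored_version is None:
        return True
    if version_type == "external":
        # Strictly newer versions only; equal or older versions are rejected.
        return incoming_version > stored_version
    if version_type == "external_gte":
        # Equal versions are accepted as well.
        return incoming_version >= stored_version
    raise ValueError(f"unsupported version_type: {version_type}")


stale_rejected = not accepts_write(stored_version=5, incoming_version=4)
newer_accepted = accepts_write(stored_version=5, incoming_version=6)
```

With 'external', a log carrying version 4 would be rejected once version 5 is stored, which is the outcome I am after.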

Update 10-24-2017: It seems that this feature does not exist in the current Elasticsearch version (5.6 in my case). From checking the code, the IndexRequest in the pipeline execution service does not include any reference to the document version or the version type, so it defaults to internal versioning. Perhaps this can be added as a feature in a future Elasticsearch release.
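
For comparison, outside of a pipeline the index API does accept the version as request parameters ('version' and 'version_type'). The sketch below only builds such a request URL; the host, index, type and id are hypothetical placeholders, and it assumes a client that sends the request itself rather than Filebeat. Whether the version survives when the same request is also routed through an ingest pipeline is not verified here.

```python
from urllib.parse import urlencode


def index_url(host, index, doc_type, doc_id, version, version_type="external"):
    """Build an index-API URL that carries an external version (sketch only)."""
    query = urlencode({"version": version, "version_type": version_type})
    return f"{host}/{index}/{doc_type}/{doc_id}?{query}"


# Hypothetical cluster address and document coordinates.
url = index_url("http://localhost:9200", "orders", "order", "42", 7)
```

The document body would then be sent with a PUT to that URL.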

geneqew asked Nov 08 '22 15:11


1 Answer

The following variables are available through the ctx map: _index, _type, _id, _version, _routing, _parent, _now and _source. You can access the original source for a field as ctx._source.field-name.

It looks like the script is trying to access a document field named "version" via ctx.version, but that maps to ctx._version.

The internal doc value should be retrieved as ctx._source.version. Can you try that?

KostasB answered Nov 14 '22 21:11