 

How to upsert into elasticsearch in spark?

With HTTP POST, the following script can insert a new field createtime or update lastupdatetime:

curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
  "doc": {
    "lastupdatetime": "2015-09-16T18:00:00"
  },
  "upsert": {
    "createtime": "2015-09-16T18:00:00",
    "lastupdatetime": "2015-09-16T18:00:00"
  }
}'
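In other words, `doc` is merged into an existing document, while `upsert` is indexed verbatim when the id does not exist yet. A minimal Python sketch of that behaviour, with a plain dict standing in for the index (the `upsert` function here is just an illustration, not an Elasticsearch API):

```python
def upsert(index, doc_id, doc, upsert_body):
    """Mimic Elasticsearch's _update-with-upsert semantics:
    if the id exists, merge `doc` into the stored document;
    otherwise index `upsert_body` as a new document."""
    if doc_id in index:
        index[doc_id].update(doc)
    else:
        index[doc_id] = dict(upsert_body)

index = {}
# first call: id "1" is new, so the upsert body (with createtime) is stored
upsert(index, "1",
       {"lastupdatetime": "2015-09-16T18:00:00"},
       {"createtime": "2015-09-16T18:00:00",
        "lastupdatetime": "2015-09-16T18:00:00"})
# second call: id "1" exists, so only lastupdatetime is overwritten
upsert(index, "1",
       {"lastupdatetime": "2015-09-17T09:00:00"},
       {"createtime": "2015-09-17T09:00:00",
        "lastupdatetime": "2015-09-17T09:00:00"})
```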

But in my Spark script, after setting "es.write.operation": "upsert", I don't know how to insert createtime at all. There is only es.update.script.* in the official documentation... So, can anyone give me an example?

UPDATE: In my case, I want to save information about Android devices from logs into one Elasticsearch type, and set a device's first appearance time as createtime. If the device appears again, I only update lastupdatetime but leave createtime as it was.

So the document id is the Android ID: if the id exists, update lastupdatetime; otherwise insert both createtime and lastupdatetime. The settings here are (in Python):

conf = {
    "es.resource.write": "stats-device/activation",
    "es.nodes": "NODE1:9200",
    "es.write.operation": "upsert",
    "es.mapping.id": "id"
    # ???
}

rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf
)

I just don't know how to insert a new field if the id not exist.
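One possible direction is the es.update.script.* settings mentioned above: with es.write.operation set to upsert, elasticsearch-hadoop sends the whole document as the upsert body, and an update script can restrict what gets touched when the id already exists. The sketch below is hedged: the exact setting names, the param syntax ("name:field"), and the script language differ between connector and Elasticsearch versions (older stacks use groovy and es.update.script rather than painless and es.update.script.inline), so verify them against the docs for your versions:

```python
# Hedged sketch of an upsert conf where the update path only rewrites
# lastupdatetime, while an insert indexes the full document (including
# createtime) as the upsert body. Setting names and the params syntax
# are assumptions to check against your elasticsearch-hadoop version.
conf = {
    "es.resource.write": "stats-device/activation",
    "es.nodes": "NODE1:9200",
    "es.write.operation": "upsert",
    "es.mapping.id": "id",
    # bind the document's lastupdatetime field to the script param "ts"
    "es.update.script.params": "ts:lastupdatetime",
    "es.update.script.lang": "painless",
    "es.update.script.inline": "ctx._source.lastupdatetime = params.ts",
}
```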

Terran asked Oct 19 '22
2 Answers

Without seeing your Spark script it is hard to give a detailed answer, but in general you will want to use elasticsearch-hadoop (so you'll need to add that dependency to your build.sbt file, for example), and then in your script you can:

import org.elasticsearch.spark._

val documents = sc.parallelize(Seq(
  Map("id" -> 1,
      "createtime" -> "2015-09-16T18:00:00",
      "lastupdatetime" -> "2015-09-16T18:00"),
  Map(<next document>), ...))

documents.saveToEs("test/type1", Map("es.mapping.id" -> "id"))

as per the official docs. The second argument to saveToEs specifies which key in your RDD of Maps to use as the Elasticsearch document id.

Of course, if you're doing this with Spark, it implies you've got more rows than you'd want to type out by hand, so for your case you'd need to transform your data into an RDD of key -> value Maps within your script. But without knowing the data source I can't go into much more detail.
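In the asker's Python setup the equivalent preparation step is mapping each raw log record to a (key, document-dict) pair before saveAsNewAPIHadoopFile. A minimal sketch of that transform, run on a plain list so it works without Spark (the record field names android_id and ts are hypothetical placeholders):

```python
def to_document(record):
    """Turn one raw log record into an Elasticsearch document.
    The key in the pair is ignored by EsOutputFormat; the "id"
    field is picked up via es.mapping.id."""
    ts = record["ts"]
    return (None, {
        "id": record["android_id"],
        "createtime": ts,
        "lastupdatetime": ts,
    })

# with Spark this would be: rdd.map(to_document).saveAsNewAPIHadoopFile(...)
docs = [to_document(r) for r in [
    {"android_id": "a1", "ts": "2015-09-16T18:00:00"},
]]
```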

Metropolis answered Oct 24 '22

Finally, I got a solution which is not perfect:

  1. add createtime to every source doc;
  2. save to ES with the create method, ignoring "already created" errors;
  3. remove the createtime field;
  4. save to ES again with the update method.

For now (2015-09-27), step 2 can be implemented by this patch.
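The two-pass idea above can be sketched in plain Python, with a dict standing in for the index: the create pass fails on existing ids (so createtime is only ever written once), and the update pass merges the remaining fields:

```python
def create(index, doc_id, doc):
    """Steps 1-2: index with the create op; raise if the id already exists."""
    if doc_id in index:
        raise KeyError("document already exists")
    index[doc_id] = dict(doc)

def update(index, doc_id, doc):
    """Step 4: partial update; merge fields into the stored document."""
    index[doc_id].update(doc)

def two_pass_upsert(index, records):
    # pass 1: create with createtime, ignoring "already created" errors
    for doc_id, doc in records:
        try:
            create(index, doc_id, doc)
        except KeyError:
            pass
    # pass 2: strip createtime (step 3) and update the rest
    for doc_id, doc in records:
        partial = {k: v for k, v in doc.items() if k != "createtime"}
        update(index, doc_id, partial)

index = {"a1": {"createtime": "2015-09-01T00:00:00",
                "lastupdatetime": "2015-09-01T00:00:00"}}
two_pass_upsert(index, [
    ("a1", {"createtime": "2015-09-27T12:00:00",     # existing device
            "lastupdatetime": "2015-09-27T12:00:00"}),
    ("b2", {"createtime": "2015-09-27T12:00:00",     # new device
            "lastupdatetime": "2015-09-27T12:00:00"}),
])
```

The imperfection the answer mentions is visible here too: every record is written twice, so the approach doubles the write load compared with a real upsert.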

Terran answered Oct 24 '22