With HTTP POST, the following request inserts a new field createtime when the document does not exist yet, or updates lastupdatetime when it does:
curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
  "doc": {
    "lastupdatetime": "2015-09-16T18:00:00"
  },
  "upsert": {
    "createtime": "2015-09-16T18:00:00",
    "lastupdatetime": "2015-09-16T18:00"
  }
}'
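To make the intended behaviour of that request concrete, here is a toy pure-Python model of it. It is only a sketch: `apply_update` and the `store` dict are hypothetical stand-ins for the index, not part of any Elasticsearch client, and the shallow merge is only adequate for flat documents like these.

```python
def apply_update(store, doc_id, doc, upsert):
    """Toy model of an update request carrying both "doc" and "upsert".

    `store` is a plain dict standing in for the index (illustrative only).
    """
    if doc_id in store:
        # The document exists: merge the partial "doc" into it.
        store[doc_id].update(doc)
    else:
        # The document is missing: the "upsert" body becomes the new document.
        store[doc_id] = dict(upsert)
    return store[doc_id]


store = {}

# First sighting of the device: the "upsert" body is inserted.
apply_update(
    store, "1",
    doc={"lastupdatetime": "2015-09-16T18:00:00"},
    upsert={"createtime": "2015-09-16T18:00:00",
            "lastupdatetime": "2015-09-16T18:00:00"},
)

# Second sighting: only the "doc" part is applied, createtime is untouched.
apply_update(
    store, "1",
    doc={"lastupdatetime": "2015-09-17T09:30:00"},
    upsert={"createtime": "2015-09-17T09:30:00",
            "lastupdatetime": "2015-09-17T09:30:00"},
)
```

After both calls, the stored document keeps the createtime of the first call and carries the lastupdatetime of the second, which is the behaviour wanted from the Spark job.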
But in a Spark script, after setting "es.write.operation": "upsert", I don't know how to insert createtime at all. The official documentation only mentions es.update.script.*. So, can anyone give me an example?
UPDATE: In my case, I want to save information about Android devices from logs into one Elasticsearch type, and store each device's first appearance time as createtime. If the device appears again, I only update lastupdatetime and leave createtime as it was.
So the document id is the Android ID: if the id exists, update lastupdatetime; otherwise insert createtime and lastupdatetime. The settings so far are (in Python):
conf = {
    "es.resource.write": "stats-device/activation",
    "es.nodes": "NODE1:9200",
    "es.write.operation": "upsert",
    "es.mapping.id": "id"
    # ???
}
rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf
)
I just don't know how to insert a new field if the id does not exist.
Without seeing your Spark script, it is hard to give a detailed answer. But in general you will want to use elasticsearch-hadoop (so you'll need to add that dependency, e.g. to your build.sbt file), and then in your script you can:
import org.elasticsearch.spark._

// Each Map is one document; its "id" entry becomes the document id.
val documents = sc.parallelize(Seq(
  Map(
    "id" -> 1,
    "createtime" -> "2015-09-16T18:00:00",
    "lastupdatetime" -> "2015-09-16T18:00")
  // , Map(<next document>), ...
))
documents.saveToEs("test/type1", Map("es.mapping.id" -> "id"))
as per the official docs. The second argument to saveToEs specifies which key in your RDD of Maps to use as the Elasticsearch document id.
Of course, if you're doing this with Spark it implies you've got more rows than you'll want to type out by hand, so for your case you'd need to transform your data into an RDD of Maps from key -> value within your script. But without knowing the data sources I can't go into much more detail.
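Since the question uses Python, here is a rough sketch of that transformation on the PySpark side, where each RDD element is a (key, dict) pair. The log format ("android_id,timestamp" per line) and the helper name `to_es_pair` are assumptions for illustration only; the real log layout is not given in the question.

```python
def to_es_pair(line):
    """Turn one log line into a (key, document) pair.

    Assumes a hypothetical "android_id,timestamp" line format.
    """
    android_id, seen_at = line.strip().split(",")
    doc = {
        "id": android_id,           # picked up through "es.mapping.id": "id"
        "createtime": seen_at,      # only meaningful on first insert
        "lastupdatetime": seen_at,
    }
    return (android_id, doc)


# Plain-Python stand-in for the RDD, so the sketch runs without Spark.
lines = [
    "a1b2,2015-09-16T18:00:00\n",
    "c3d4,2015-09-16T18:05:00\n",
]
pairs = [to_es_pair(line) for line in lines]

# With Spark this would be something like: rdd = sc.textFile(...).map(to_es_pair)
```

The resulting pairs have the shape the saveAsNewAPIHadoopFile call in the question expects; on its own this does not solve the createtime problem, it only prepares the documents.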
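Finally, I got a solution, which is not perfect:
1. Add createtime to all source docs;
2. Save to Elasticsearch with the create method and ignore the "already created" error;
3. Remove the createtime field;
4. Save to Elasticsearch again with the update method.
For now (2015-09-27), step 2 can be implemented by this patch.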
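The document preparation and the two configurations for this workaround (add createtime, write with create, drop createtime, write with update) might look roughly like the following in Python. This is a sketch under assumptions: the helper names are hypothetical, createtime is assumed to equal the document's lastupdatetime on first sight, and nothing here handles the "already created" error, which per the note above needs the patch.

```python
BASE_CONF = {
    "es.resource.write": "stats-device/activation",
    "es.nodes": "NODE1:9200",
    "es.mapping.id": "id",
}
# Pass one creates missing documents, pass two updates all of them.
CREATE_CONF = {**BASE_CONF, "es.write.operation": "create"}
UPDATE_CONF = {**BASE_CONF, "es.write.operation": "update"}


def with_createtime(doc):
    """Copy the document and add createtime (assumed: first seen = lastupdatetime)."""
    out = dict(doc)
    out["createtime"] = out["lastupdatetime"]
    return out


def without_createtime(doc):
    """Copy the document without createtime, so an update cannot overwrite it."""
    return {k: v for k, v in doc.items() if k != "createtime"}


device = {"id": "a1b2", "lastupdatetime": "2015-09-16T18:00:00"}
first_pass_doc = with_createtime(device)             # written with CREATE_CONF
second_pass_doc = without_createtime(first_pass_doc)  # written with UPDATE_CONF
```

With an RDD of (key, dict) pairs, each pass would map the corresponding helper over the values and call saveAsNewAPIHadoopFile with the matching conf, as in the question.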