What is the correct way, when using Elasticsearch with Spark, to update existing entities?
I wanted to do something like the following:
However, there are several issues:
If, for testing, I hardcode an existing _id in the map of new values, the following exception is thrown:
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest
How should the _id be retrieved, and how should it be passed back to Spark?
I've included the following code to better illustrate what I was trying to do:
JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, INDEX_NAME+"/"+TYPE_NAME,
        "?source=field1,field2").values();
Iterator<Map<String, Object>> iter = esRDD.toLocalIterator();
List<Map<String, Object>> listToPersist = new ArrayList<Map<String, Object>>();
while (iter.hasNext()) {
    Map<String, Object> map = iter.next();
    // Get existing values, and do transformation logic
    Map<String, Object> newMap = new HashMap<String, Object>();
    newMap.put("_id", ??????);
    newMap.put("field1", new_value);
    listToPersist.add(newMap);
}
JavaRDD javaRDD = jsc.parallelize(ImmutableList.copyOf(listToPersist));
JavaEsSpark.saveToEs(javaRDD, INDEX_NAME+"/"+TYPE_NAME);
Ideally, I would want to update the existing map in place, rather than create a new one.
Does anyone have any example code showing the correct way to update existing entities in Elasticsearch when using Spark?
Thanks
This is how I've done it (Scala/Spark 2.3/Elastic-Hadoop v6.5).
To read (id or other metadata):
import org.apache.spark.sql.functions.col

spark
  .read
  .format("org.elasticsearch.spark.sql")
  .option("es.read.metadata", true) // expose document metadata (_id, etc.) in a _metadata column
  .load("yourindex/yourtype")
  .select(col("_metadata._id").as("myId"), ...)
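If you stay on the RDD API from the question instead of DataFrames, the document _id is already available as the key of the pair RDD returned by JavaEsSpark.esRDD, so it can be read without the metadata option. A rough Java sketch, assuming the question's jsc context and a placeholder index/type name:

import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

// Key of each pair is the document _id, value is the _source as a Map
JavaPairRDD<String, Map<String, Object>> docsById =
        JavaEsSpark.esRDD(jsc, "yourindex/yourtype");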
To update particular columns in ES:
import org.elasticsearch.spark.sql._ // brings saveToEs into scope on DataFrames

myDataFrame
  .select("myId", "columnToUpdate")
  .saveToEs(
    "yourindex/yourtype",
    Map(
      "es.mapping.id" -> "myId",          // use this column as the document _id
      "es.write.operation" -> "update",   // important: change the operation to a partial update
      "es.mapping.exclude" -> "myId"      // don't write the id column into the document body
    )
  )
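The same settings can also be passed to the RDD API used in the question. A rough Java sketch of the save step, assuming the _id has been copied into an ordinary field (here called docId, a placeholder name) on each map in the question's javaRDD:

import java.util.Map;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

Map<String, String> cfg = ImmutableMap.of(
        "es.mapping.id", "docId",        // use the docId field as the document _id
        "es.write.operation", "update",  // partial update instead of a full index
        "es.mapping.exclude", "docId");  // don't write the id field into the document body
JavaEsSpark.saveToEs(javaRDD, "yourindex/yourtype", cfg);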