Elasticsearch and Spark: Updating existing entities

What is the correct way, when using Elasticsearch with Spark, to update existing entities?

I wanted to do something like the following:

  1. Get existing data as a map.
  2. Create a new map, and populate it with the updated fields.
  3. Persist the new map.

However, there are several issues:

  1. The list of returned fields cannot contain the _id, as it is not part of the source.
  2. If, for testing, I hardcode an existing _id in the map of new values, the following exception is thrown:

    org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest

How should the _id be retrieved, and how should it be passed back to Spark?

I've included the following code to better illustrate what I was trying to do:

JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, INDEX_NAME+"/"+TYPE_NAME,
    "?_source=field1,field2").values();

Iterator<Map<String, Object>> iter = esRDD.toLocalIterator();
List<Map<String, Object>> listToPersist = new ArrayList<Map<String, Object>>();
while(iter.hasNext()){
   Map<String, Object> map = iter.next();
   // Get existing values, and do transformation logic

   Map<String, Object> newMap = new HashMap<String, Object>();
   newMap.put("_id", ??????);
   newMap.put("field1", new_value);
   listToPersist.add(newMap);
}
JavaRDD<Map<String, Object>> javaRDD = jsc.parallelize(ImmutableList.copyOf(listToPersist));
JavaEsSpark.saveToEs(javaRDD, INDEX_NAME+"/"+TYPE_NAME); 

Ideally, I would want to update the existing map in place, rather than create a new one.

Does anyone have example code showing the correct way to update existing entities in Elasticsearch when using Spark?

Thanks

asked Nov 09 '22 by user1052610

1 Answer

This is how I've done it (Scala, Spark 2.3, Elasticsearch-Hadoop v6.5).

To read (id or other metadata):

import org.apache.spark.sql.functions.col

spark
    .read
    .format("org.elasticsearch.spark.sql")
    .option("es.read.metadata", true) // expose document metadata in the results
    .load("yourindex/yourtype")
    .select(col("_metadata._id").as("myId"), ...)
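With es.read.metadata enabled, the connector surfaces the document metadata (including _id) under an extra _metadata column, which is what makes the id selectable above even though it is not part of the source.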

To update particular columns in ES:

import org.elasticsearch.spark.sql._ // brings the saveToEs implicit into scope

myDataFrame
    .select("myId", "columnToUpdate")
    .saveToEs(
        "yourindex/yourtype",
        Map(
            "es.mapping.id" -> "myId",        // use this column as the document _id
            "es.write.operation" -> "update", // important: partial update, not a full index
            "es.mapping.exclude" -> "myId"    // keep the id column out of the stored _source
        )
    )
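For the Java RDD API used in the question, note that JavaEsSpark.esRDD actually returns a JavaPairRDD whose keys are the document ids, so the .values() call in the question is what discards them. A minimal Java sketch of the same partial-update approach might look like the following; docId is an arbitrary field name chosen to carry the id, and transform stands in for whatever transformation logic you need:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

// Read: the pair RDD key is the document _id, the value is the _source map
JavaPairRDD<String, Map<String, Object>> esRDD =
    JavaEsSpark.esRDD(jsc, INDEX_NAME + "/" + TYPE_NAME);

JavaRDD<Map<String, Object>> updates = esRDD.map(tuple -> {
    Map<String, Object> doc = new HashMap<>();
    doc.put("docId", tuple._1());                            // carry the _id along
    doc.put("field1", transform(tuple._2().get("field1"))); // placeholder logic
    return doc;
});

// Write: tell the connector which field holds the id, and to do a partial update
Map<String, String> cfg = new HashMap<>();
cfg.put("es.mapping.id", "docId");
cfg.put("es.write.operation", "update");
cfg.put("es.mapping.exclude", "docId"); // keep docId out of the stored _source
JavaEsSpark.saveToEs(updates, INDEX_NAME + "/" + TYPE_NAME, cfg);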
answered Nov 15 '22 by abdolence