Spark Dataframe upsert to Elasticsearch

I am using an Apache Spark DataFrame and I want to upsert data to Elasticsearch. I found I can overwrite it like this:

val df = spark.read.option("header","true").csv("/mnt/data/akc_breed_info.csv")

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port","443")
  .option("es.net.ssl","true")
  .option("es.nodes", esURL)
  .option("es.mapping.id", index)
  .mode("Overwrite")
  .save("index/dogs")

but what I noticed so far is that mode("Overwrite") actually deletes all existing matching documents and then inserts the new data.

Is there a way I can upsert the data instead of deleting and re-writing it? I need to query this data in near real time. Thanks in advance.

Daniel asked Jun 21 '18 07:06

2 Answers

The reason mode("Overwrite") was a problem is that overwriting deletes, in one shot, all existing data that matches the rows of your DataFrame, so the index can briefly appear empty. I figured out how to actually upsert instead.

Here is my code:

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.nodes.discovery", "false")
  .option("es.nodes.client.only", "false")
  .option("es.net.ssl","true")
  .option("es.mapping.id", index)
  .option("es.write.operation", "upsert")
  .option("es.nodes", esURL)
  .option("es.port", "443")
  .mode("append")
  .save(path)

Note that you have to set both .option("es.write.operation", "upsert") and .mode("append").
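To illustrate the difference the two modes make, here is a plain-Scala sketch (no Spark or Elasticsearch required) of the semantics: overwrite replaces the index contents wholesale, while upsert merges incoming documents into the existing ones by id. The breed values and ids are made up for illustration.

```scala
object UpsertSketch extends App {
  // Documents already in the index, keyed by the field named in es.mapping.id
  val existing = Map("1" -> "Beagle", "2" -> "Boxer")
  // Documents in the DataFrame being written
  val incoming = Map("2" -> "Boxer (updated)", "3" -> "Corgi")

  // mode("Overwrite"): previous contents are dropped, only the new rows remain
  val overwritten = incoming

  // es.write.operation = upsert with mode("append"): matching ids are updated,
  // new ids are inserted, and untouched ids survive
  val upserted = existing ++ incoming

  assert(!overwritten.contains("1"))          // doc 1 is lost on overwrite
  assert(upserted("1") == "Beagle")           // doc 1 survives an upsert
  assert(upserted("2") == "Boxer (updated)")  // doc 2 is updated in place
  assert(upserted("3") == "Corgi")            // doc 3 is newly inserted
}
```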

Daniel answered Sep 27 '22 03:09


Try setting:

es.write.operation = upsert

This should perform the required operation. You can find more details at https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
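Under the hood, this setting makes the elasticsearch-hadoop connector issue update requests with upsert semantics rather than plain index requests. The equivalent raw Elasticsearch update API request, shown here as a sketch with a made-up index name, doc id, and field, would be:

```
POST /dogs/_update/1
{
  "doc": { "breed": "Beagle" },
  "doc_as_upsert": true
}
```

With "doc_as_upsert": true, Elasticsearch merges the partial document into the existing one if the id exists, or indexes it as a new document if it does not.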

Constantine answered Sep 26 '22 03:09