I am using an Apache Spark DataFrame and I want to upsert data to Elasticsearch. I found that I can overwrite documents like this:
val df = spark.read.option("header", "true").csv("/mnt/data/akc_breed_info.csv")

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only", "true")
  .option("es.port", "443")
  .option("es.net.ssl", "true")
  .option("es.nodes", esURL)
  .option("es.mapping.id", index)  // column whose value becomes the document _id
  .mode("Overwrite")
  .save("index/dogs")
What I noticed, however, is that mode("Overwrite") actually deletes all of the existing matching data and then inserts the new data. Is there a way to upsert the documents rather than delete and re-write them? I need to query this data in near real time. Thanks in advance.
The reason mode("Overwrite") was a problem is that when you overwrite with the entire DataFrame, it first deletes all data that matches the DataFrame's rows, so the whole index looked empty to me while the write was in progress. I figured out how to actually upsert instead.
Here is my code:
df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only", "true")
  .option("es.nodes.discovery", "false")
  .option("es.nodes.client.only", "false")
  .option("es.net.ssl", "true")
  .option("es.mapping.id", index)            // column whose value becomes the document _id
  .option("es.write.operation", "upsert")    // update existing documents, insert new ones
  .option("es.nodes", esURL)
  .option("es.port", "443")
  .mode("append")                            // append, so existing documents are not deleted
  .save(path)
Note that you have to set "es.write.operation" to "upsert" and use .mode("append"), as sketched below.
Try setting:
es.write.operation = upsert
This should perform the required operation. You can find more details at https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
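If you would rather not repeat the option on every write, es.* properties can also be set once on the SparkConf, following the configuration pattern in the elasticsearch-hadoop docs. A minimal sketch (the app name is made up for the example):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// es.* properties set on the SparkConf are picked up by the connector,
// so individual writes should no longer need .option("es.write.operation", "upsert").
val conf = new SparkConf()
  .setAppName("es-upsert")                   // hypothetical app name
  .set("es.write.operation", "upsert")

val spark = SparkSession.builder().config(conf).getOrCreate()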