
Upsert to CosmosDB from Spark error

I am really new to Spark/Cosmos DB/Python, so I am going through code samples from the MS site and GitHub while trying to create something on my own. After a long fight with the Spark-Cosmos DB connector, I am able to read data from a Cosmos DB collection. Now I would like to do the opposite (upsert), but I have hit another obstacle. Here is the example I am referring to: the Writing to Cosmos DB section.

I am able to read from Cosmos DB and work with the data, but I am not able to write back to Cosmos DB. Below is my slightly modified code:

%%configure
{ "name":"Spark-to-Cosmos_DB_Connector", 
  "jars": ["wasb:///example/jars/1.0.0/azure-cosmosdb-spark_2.2.0_2.11-1.1.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-1.14.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-rx-0.9.0-rc2.jar", "wasb:///example/jars/1.0.0/json-20140107.jar", "wasb:///example/jars/1.0.0/rxjava-1.3.0.jar", "wasb:///example/jars/1.0.0/rxnetty-0.4.20.jar"],
  "conf": {
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
}

# Read Configuration
readConfig = {
  "Endpoint" : "https://doctorwho.documents.azure.com:443/",
  "Masterkey" : "SPSVkSfA7f6vMgMvnYdzc1MaWb65v4VQNcI2Tp1WfSP2vtgmAwGXEPcxoYra5QBHHyjDGYuHKSkguHIz1vvmWQ==",
  "Database" : "DepartureDelays",
  "preferredRegions" : "Central US;East US2",
  "Collection" : "flights_pcoll", 
  "SamplingRatio" : "1.0",
  "schema_samplesize" : "1000",
  "query_pagesize" : "2147483647",
  "query_custom" : "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
}

# Connect via azure-cosmosdb-spark to create Spark DataFrame
flights = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()
flights.count()

# Write configuration
writeConfig = {
 "Endpoint" : "https://doctorwho.documents.azure.com:443/",
 "Masterkey" : "SPSVkSfA7f6vMgMvnYdzc1MaWb65v4VQNcI2Tp1WfSP2vtgmAwGXEPcxoYra5QBHHyjDGYuHKSkguHIz1vvmWQ==",
 "Database" : "DepartureDelays",
 "Collection" : "flights_pcoll",
 "Upsert" : "true"
}

# Write to Cosmos DB from the flights DataFrame
flights.write.format("com.microsoft.azure.cosmosdb.spark").options(**writeConfig).save()

So, when I try to run this, I get:

An error occurred while calling o90.save.
: java.lang.UnsupportedOperationException: Writing in a non-empty collection.

After some quick googling, I tried adding mode("append") to my last line:

flights.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

Unfortunately, this leaves me with an error that I am not able to understand:

An error occurred while calling o127.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 4.0 failed 4 times, most recent failure: Lost task 2.3 in stage 4.0 (TID 90, wn2-MDMstr.zxmmgisclg5udfemnv0v3qva3e.ax.internal.cloudapp.net, executor 2): java.lang.NoClassDefFoundError: com/microsoft/azure/documentdb/bulkexecutor/DocumentBulkExecutor

Here is the full stack trace: error in pastebin

Can somebody help me out with this error? I have also received exactly the same error when working with my own Cosmos DB, not just the example one from the documentation.

I am using a Jupyter notebook with the PySpark3 kernel, Spark version 2.2, on an HDInsight 3.6 cluster.

EDIT: I didn't want to just sit waiting for a reply, so I tried the same thing with Scala. Guess what? Same error (or at least a very similar one): Scala error

Here is my Scala code for that:

%%configure
{ "name":"Spark-to-Cosmos_DB_Connector", 
  "jars": ["wasb:///example/jars/1.0.0/azure-cosmosdb-spark_2.2.0_2.11-1.1.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-1.14.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-rx-0.9.0-rc2.jar", "wasb:///example/jars/1.0.0/json-20140107.jar", "wasb:///example/jars/1.0.0/rxjava-1.3.0.jar", "wasb:///example/jars/1.0.0/rxnetty-0.4.20.jar"],
  "conf": {
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
}

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

val readConfig = Config(Map(
  "Endpoint" -> "https://$my_cosmos_db.documents.azure.com:443/",
  "Masterkey" -> "$my_key",
  "Database" -> "test",
  "PreferredRegions" -> "West Europe",
  "Collection" -> "$my_collection", 
  "SamplingRatio" -> "1.0"
))
val docs = spark.read.cosmosDB(readConfig)

docs.show()

val writeConfig = Config(Map(
  "Endpoint" -> "https://$my_cosmos_db.documents.azure.com:443/",
  "Masterkey" -> "$my_key",
  "Database" -> "test",
  "PreferredRegions" -> "West Europe",
  "Collection" -> "$my_collection", 
  "WritingBatchSize" -> "100"
))




val someData = Seq(
    Row(8, "bat"),
    Row(64, "mouse"),
    Row(-27, "test_name")
)

val someSchema = List(
  StructField("number", IntegerType, true),
  StructField("name", StringType, true)
)

val someDF = spark.createDataFrame(
  spark.sparkContext.parallelize(someData),
  StructType(someSchema)
)

someDF.show()

someDF.write.mode(SaveMode.Append).cosmosDB(writeConfig)

Maybe this would be helpful in troubleshooting.

Thanks!

asked by Jangcy


2 Answers

For the first issue when using Python, please note that you are using the doctorwho Azure Cosmos DB collection. This is a demo collection for which we provided a read-only key, but not a write key. Hence the error you are receiving is due to the lack of write access to the collection.
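
As a rough sketch, the same write should work against a Cosmos DB account you own, where the master key has write permission (the endpoint, key, database, and collection names below are placeholders, not real values):

# Hypothetical write config pointing at your own account (all values are placeholders)
writeConfig = {
  "Endpoint" : "https://<your-account>.documents.azure.com:443/",
  "Masterkey" : "<your-read-write-master-key>",
  "Database" : "<your-database>",
  "Collection" : "<your-collection>",
  "Upsert" : "true"
}

# Same save call as in the question, now against a writable collection
flights.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()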

For the second issue, the error from pastebin looks the same. That said, some quick observations:

  • Are you using HDI 3.6? If so, it runs Spark 2.1, while the JAR you are using is built for Spark 2.2. If you're using HDI 3.7, then it's on Spark 2.2 and you're using the correct JAR.
  • You may want to use Maven coordinates to get the latest version of the JARs; see azure-cosmosdb-spark > Using Jupyter Notebooks for more information. A sketch of what that could look like follows this list.
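
For illustration, a %%configure cell that pulls the connector by Maven coordinate instead of pointing at uploaded JARs could look roughly like this (the coordinate and version shown are only an example taken from later in this thread; check the repository for the current one):

%%configure
{ "name":"Spark-to-Cosmos_DB_Connector",
  "conf": {
    "spark.jars.packages": "com.microsoft.azure:azure-cosmosdb-spark_2.2.0_2.11:1.0.0",
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
  }
}
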
answered by Denny Lee


After communicating with Microsoft engineers and running a few tests myself, I found out that there are some issues with the Spark-Cosmos DB connector. Basically, the best version of the connector to use is 1.0.0, dated 15-Nov-2017 (for both Spark 2.1 and 2.2): link to the repository. Below are a few solutions/workarounds that worked for me. You may experiment with them to find the best solution for you.

1) If you work with Spark 2.1 or 2.2, use connector version 1.0.0 (link above). At the time I am writing this answer (18-May-2018), the latest version of the connector is 1.1.1, dated 23-Mar-2018 - it fails when writing a DataFrame to Cosmos DB, or when trying to count a DataFrame of more than 50k documents read from Cosmos DB (and what is 50k docs for a NoSQL db?).

2) If you use Spark 2.1 -> Jupyter will work with the 1.0.0 connector. If you use Spark 2.2 -> do not use a Jupyter notebook - it has some issues with external packages, especially on a Spark 2.2 installation. Please use a Zeppelin notebook instead (with the 1.0.0 connector). Once you have opened Zeppelin, click the user name in the top-right corner, then Interpreter. Go to the Livy interpreter settings, click edit, and add the package coordinate: com.microsoft.azure:azure-cosmosdb-spark_2.2.0_2.11:1.0.0

(screenshots: Zeppelin Interpreter menu and Livy interpreter settings with the package coordinate added)

Save and restart the interpreter. Then create a new notebook using the livy2 interpreter. One note: in every cell in Zeppelin you have to add the %pyspark magic command on the first line. The first cell takes 1-2 minutes to run because the whole app has to start.
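
As a minimal sketch, a first Zeppelin cell could then look like this (account details are placeholders; note the %pyspark magic on the first line):

%pyspark
# Read a few documents just to verify the connector loads (placeholders for account details)
readConfig = {
  "Endpoint" : "https://<your-account>.documents.azure.com:443/",
  "Masterkey" : "<your-key>",
  "Database" : "<your-database>",
  "Collection" : "<your-collection>",
  "SamplingRatio" : "1.0"
}
docs = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()
docs.show()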

3) Instead of using notebooks, you can use your cluster directly. Use PuTTY to SSH into your cluster with the sshuser and password provided when you created the cluster:

(screenshot: PuTTY session connecting to the cluster)

Then start pyspark, attaching the uber-jar file (you have to download the uber-jar from the repository and upload it to the blob storage connected to your cluster; in my case the file is in a folder called example, at the first level from the root of the container). Here I also used the 1.0.0 connector. Here is the command:

pyspark --master yarn --jars wasb:///example/azure-cosmosdb-spark_2.2.0_2.11-1.0.0-uber.jar

When Spark is ready, you can paste and run your commands and everything should work properly.
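
For example, you could rebuild the small test DataFrame from the Scala snippet above in Python and upsert it, reusing a writeConfig dictionary like the one in the question (pointed at your own collection):

# Small test DataFrame, mirroring the Scala example above
someDF = spark.createDataFrame([(8, "bat"), (64, "mouse"), (-27, "test_name")], ["number", "name"])
someDF.show()

# Upsert into Cosmos DB; writeConfig is a dict like the one in the question, with "Upsert": "true"
someDF.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()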

If you have any questions or anything is not clear, please let me know.

answered by Jangcy