Writing Parquet in Azure Blob Storage: "One of the request inputs is not valid"

I'm trying to write a simple DataFrame in Parquet format to Azure Blob Storage. Note that the following code snippets work locally, so my guess is that the problem is related to the Azure libraries. I also tried the Delta format and it works (even though it uses Parquet under the hood).

Using Spark 3.1.1, Scala 2.12.10, OpenJDK 1.8.0_292.

I set up my Spark session as usual, something like:

$SPARK_HOME/bin/spark-shell \
  (...cluster settings...) \
  --conf spark.hadoop.fs.azure.account.key.<account>.blob.core.windows.net="${AZURE_BLOB_STORAGE_KEY}" \
  --conf spark.hadoop.fs.AbstractFileSystem.wasb.impl=org.apache.hadoop.fs.azure.Wasb \
  --conf spark.hadoop.fs.wasb.impl=org.apache.hadoop.fs.azure.NativeAzureFileSystem \
  --conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.AzureLogStore \
  --packages org.apache.hadoop:hadoop-azure:2.7.0,com.azure:azure-storage-blob:12.8.0,com.azure:azure-storage-common:12.8.0,com.microsoft.azure:azure-storage:2.0.0,io.delta:delta-core_2.12:0.8.0 \
  (...other irrelevant settings...)
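
For completeness, the same settings can also be applied when building the session programmatically. This is only a sketch mirroring the flags above; the <account> placeholder and the AZURE_BLOB_STORAGE_KEY environment variable are the same assumptions as in the shell command:

import org.apache.spark.sql.SparkSession

// Sketch: programmatic equivalent of the spark-shell flags above.
// <account> is a placeholder; the key comes from the same env var.
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.azure.account.key.<account>.blob.core.windows.net",
          sys.env("AZURE_BLOB_STORAGE_KEY"))
  .config("spark.hadoop.fs.AbstractFileSystem.wasb.impl",
          "org.apache.hadoop.fs.azure.Wasb")
  .config("spark.hadoop.fs.wasb.impl",
          "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
  .config("spark.delta.logStore.class",
          "org.apache.spark.sql.delta.storage.AzureLogStore")
  .getOrCreate()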

I tried other versions of the azure-storage-blob, azure-storage-common and azure-storage packages, all resulting in the same problem.

To reproduce the problem, I create a simple DataFrame and write it to the storage:

val columns = Seq("language", "users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val rdd = spark.sparkContext.parallelize(data)
val df = spark.createDataFrame(rdd).toDF(columns: _*)
df.show
// +--------+-----------+
// |language|users_count|
// +--------+-----------+
// |    Java|      20000|
// |  Python|     100000|
// |   Scala|       3000|
// +--------+-----------+

df.write.parquet("wasb://<container>@<account>.blob.core.windows.net/<path>")

When writing in Parquet format I get a com.microsoft.azure.storage.StorageException: One of the request inputs is not valid exception:

21/09/21 13:38:14 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 83) (10.244.6.3 executor 6): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2482)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$FolderRenamePending.execute(NativeAzureFileSystem.java:424)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem.rename(NativeAzureFileSystem.java:1997)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:531)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:502)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:260)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:79)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:280)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
        ... 9 more
Caused by: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
        at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:162)
        at com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:307)
        at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:177)
        at com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(CloudBlob.java:764)
        at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
        ... 20 more
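
For reference, the Delta write mentioned at the top, which does succeed, is presumably along these lines (the exact call is reconstructed from the description; same placeholders as the Parquet write):

// Sketch: the Delta write that works, reconstructed from the description
// above. delta-core is already on the classpath via --packages.
df.write.format("delta").save("wasb://<container>@<account>.blob.core.windows.net/<path>")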

Any hints or ideas on what is causing it or what to do to make it work? Thank you!

Eric Ávila
1 Answer

Just change wasb to abfss and blob to dfs wherever they are used. Sample:

wasb://<container>@<account>.blob.core.windows.net/<path>

becomes

abfss://<container>@<account>.dfs.core.windows.net/<path>

This worked for me.
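
For context, a minimal sketch of the switch (this assumes the storage account is ADLS Gen2 with the hierarchical namespace enabled and that a hadoop-azure build shipping the ABFS driver, i.e. Hadoop 3.2+, is on the classpath; the account-key property follows the standard fs.azure.account.key.*.dfs.core.windows.net naming):

// Sketch only: assumes an ADLS Gen2 account and the ABFS driver from
// hadoop-azure 3.2+. <container>, <account> and <path> are placeholders.
spark.sparkContext.hadoopConfiguration.set(
  "fs.azure.account.key.<account>.dfs.core.windows.net",
  sys.env("AZURE_BLOB_STORAGE_KEY"))

df.write.parquet("abfss://<container>@<account>.dfs.core.windows.net/<path>")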

Nisha