# Create the SparkSession and configure S3 access for the Hadoop connector
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("XMLParser").getOrCreate()
sc = spark.sparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoop_conf.set("fs.s3n.awsAccessKeyId", aws_key)        # aws_key / aws_secret hold my AWS credentials
hadoop_conf.set("fs.s3n.awsSecretAccessKey", aws_secret)
Then I am able to read the file from my S3 bucket using the following code:
df = spark.read.format("xml").options(rootTag='returnResult', rowTag="query").load("s3n://bucketName/folder/file.xml")
But when I try to write the DataFrame back to S3 as a Delta Lake table (Parquet files) with this code:
df.write.format("delta").mode('overwrite').save("s3n://bucket/folder/file")
I get this error:
Py4JJavaError: An error occurred while calling o778.save.
: java.io.IOException: The error typically occurs when the default LogStore implementation, that
is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
In order to get the transactional ACID guarantees on table updates, you have to use the
correct implementation of LogStore that is appropriate for your storage system.
See https://docs.delta.io/latest/delta-storage.html " for details.
at org.apache.spark.sql.delta.DeltaErrors$.incorrectLogStoreImplementationException(DeltaErrors.scala:157)
at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:73)
at org.apache.spark.sql.delta.storage.HDFSLogStore.write(HDFSLogStore.scala:64)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$doCommit$1.apply$mcJ$sp(OptimisticTransaction.scala:434)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$doCommit$1.apply(OptimisticTransaction.scala:416)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$doCommit$1.apply(OptimisticTransaction.scala:416)
at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:152)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.doCommit(OptimisticTransaction.scala:415)
at org.apache.spark.sql.delta.OptimisticTransaction.doCommit(OptimisticTransaction.scala:80)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply$mcJ$sp(OptimisticTransaction.scala:326)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:284)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:284)
at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:77)
at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:80)
at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:103)
at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:80)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.commit(OptimisticTransaction.scala:284)
at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:80)
at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:67)
at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:134)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.s3n.impl=null: No AbstractFileSystem configured for scheme: s3n
at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:160)
at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:249)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:334)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:331)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:448)
at org.apache.spark.sql.delta.storage.HDFSLogStore.getFileContext(HDFSLogStore.scala:47)
at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:70)
... 53 more
I tried to follow the link given in the stack trace, but I am not able to figure out how to resolve this. Any help would be appreciated.
After creating the Spark session, you need to add the configuration provided by Databricks to enable S3 as a Delta store, like:
# Point Delta Lake at the S3-aware LogStore implementation
conf = spark.sparkContext._conf.setAll([
    ('spark.delta.logStore.class',
     'org.apache.spark.sql.delta.storage.S3SingleDriverLogStore')
])
spark.sparkContext._conf.getAll()  # check that the property is now present
As the name suggests, the S3SingleDriverLogStore implementation only works correctly when all concurrent writes originate from a single Spark driver. It is an application property: it must be set before starting the SparkContext and cannot be changed during the lifetime of the context.
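Because the property has to be in place before the SparkContext starts, the most reliable way to apply it is at session-build time. A minimal sketch, assuming the delta-core package is already on the classpath (the app name is just the one from the question):

from pyspark.sql import SparkSession

# Sketch: register the S3-aware LogStore before the SparkContext is created
spark = SparkSession.builder \
    .appName("XMLParser") \
    .config("spark.delta.logStore.class",
            "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
    .getOrCreate()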
For configuring the s3a path, access key, and secret key, see the Databricks documentation on S3 access.
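As a rough sketch of what that looks like (fs.s3a.access.key and fs.s3a.secret.key are the standard Hadoop S3A property names, the bucket path below is a placeholder, and the hadoop-aws S3A connector is assumed to be on the classpath), you set the credentials on the Hadoop configuration and use s3a:// paths instead of s3n://:

# Sketch: S3A credentials instead of the legacy s3n connector (placeholder bucket path)
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", aws_key)
hadoop_conf.set("fs.s3a.secret.key", aws_secret)

df.write.format("delta").mode("overwrite").save("s3a://bucket/folder/file")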