From the Spark Structured Streaming documentation:
"This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter
when starting a query."
And sure enough, setting the checkpoint to an S3 path throws:
17/01/31 21:23:56 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020
java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:652)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430)
at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51)
at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
at com.roku.dea.spark.streaming.FactDeviceLogsProcessor$.main(FactDeviceLogsProcessor.scala:133)
at com.roku.dea.spark.streaming.FactDeviceLogsProcessor.main(FactDeviceLogsProcessor.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
17/01/31 21:23:56 INFO SparkContext: Invoking stop() from shutdown hook
A couple of questions here:
With Amazon EMR release version 5.17.0 and later, you can use S3 Select with Spark on Amazon EMR. S3 Select allows applications to retrieve only a subset of data from an object.
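As a rough illustration (not from the original post), EMR exposes S3 Select to Spark as a data source; the format name below follows the EMR documentation, and the bucket, path, and column names are placeholders:

import org.apache.spark.sql.SparkSession

// Sketch: read a CSV object through S3 Select on EMR so only the selected
// subset of the data is transferred from S3. Paths and columns are placeholders.
val spark = SparkSession.builder().appName("s3-select-sketch").getOrCreate()

val df = spark.read
  .format("s3selectCSV")          // EMR's S3 Select CSV data source
  .option("header", "true")
  .load("s3://my-bucket/path/data.csv")

df.select("device_id", "event_ts").show()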
If any data is lost, recovery should be fast. Spark Streaming accomplishes this via checkpointing: checkpointing truncates the RDD lineage graph and periodically saves the application state to reliable storage (HDFS).
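For the classic DStream API, that looks roughly like this (a minimal sketch; the app name and HDFS path are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal DStream checkpointing sketch: state and lineage metadata are written
// to the checkpoint directory periodically, so recovery does not have to
// replay the whole lineage graph.
val conf = new SparkConf().setAppName("dstream-checkpoint-sketch")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs://namenode:8020/checkpoints/my-app") // placeholder path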
Apache Spark Streaming is a scalable, fault-tolerant stream-processing system that natively supports both batch and streaming workloads.
A checkpoint helps build fault-tolerant and resilient Spark applications. Spark Structured Streaming maintains intermediate state on HDFS-compatible file systems to recover from failures. To specify the checkpoint location for a streaming query, we use the checkpointLocation option.
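For example (a minimal sketch; the Kafka source, sink path, and checkpoint path are illustrative placeholders, not from the original post, and the Kafka source needs the spark-sql-kafka package on the classpath):

import org.apache.spark.sql.SparkSession

// Sketch: a Structured Streaming query with an explicit checkpoint location
// on HDFS, set via the checkpointLocation option on the DataStreamWriter.
val spark = SparkSession.builder().appName("structured-checkpoint").getOrCreate()

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()

val query = events.writeStream
  .format("parquet")
  .option("path", "hdfs://namenode:8020/warehouse/events")
  .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/events")
  .start()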
What makes a file system HDFS "compliant"? It is a file system with the behaviours specified in the Hadoop FS specification. The difference between an object store and a file system is covered there, with the key point being that "eventually consistent object stores without append or O(1) atomic renames are not compliant".
For S3 in particular: Spark Streaming checkpoints by writing everything to a temporary location and then renaming it into the checkpoint directory. On S3 a rename is implemented as a copy, which makes the time to checkpoint proportional to the time to copy the data, at roughly 6-10 MB/s.
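To make the cost concrete, here is a rough sketch of the write-then-rename commit pattern using the Hadoop FileSystem API (the paths and payload are placeholders; this is not Spark's actual internal code):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Write-then-rename commit pattern. On HDFS, rename is an O(1) atomic
// metadata operation; on S3 it is emulated as copy + delete, so this
// "commit" step takes time proportional to the size of the data.
val tmp = new Path("s3://bucket/checkpoints/query/.tmp-offset-42")
val dst = new Path("s3://bucket/checkpoints/query/offsets/42")

val fs  = FileSystem.get(dst.toUri, new Configuration())
val out = fs.create(tmp)
out.writeBytes("offset metadata...") // placeholder payload
out.close()
fs.rename(tmp, dst) // atomic on HDFS; slow copy + delete on S3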
The current streaming checkpoint code isn't suited to S3.
For now, the options are limited. If you are using EMR, you can pay the premium for the consistent, DynamoDB-backed view of S3 (EMRFS consistent view), which gives you better consistency. But the copy time is still the same, so checkpointing will be just as slow.
This is a known issue: https://issues.apache.org/jira/browse/SPARK-19407. It should be fixed in the next release. As a workaround, you can set the default file system to S3 with --conf spark.hadoop.fs.defaultFS=s3.
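Equivalently, assuming you would rather set it in code than on the command line (the bucket name is a placeholder), spark.hadoop.* properties propagate into the Hadoop configuration:

import org.apache.spark.sql.SparkSession

// Same workaround applied via the session builder instead of spark-submit:
// make relative and checkpoint paths resolve against S3 by default.
val spark = SparkSession.builder()
  .appName("s3-default-fs")
  .config("spark.hadoop.fs.defaultFS", "s3://my-bucket")
  .getOrCreate()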
This problem is fixed in https://issues.apache.org/jira/browse/SPARK-19407.
However, Structured Streaming checkpointing still doesn't work well on S3, because S3 is only eventually consistent. It's not a good idea to use S3 for checkpointing; see https://issues.apache.org/jira/browse/SPARK-19013.
Michael Armbrust has said that this won't be fixed in Spark, and the solution is to wait for S3Guard to be implemented. S3Guard is some time away.
Edit: two developments since this post was written: a) S3Guard support arrived via the Hadoop 3.x S3A connector that ships with Spark 3.0, and b) AWS made S3 strongly consistent (announced December 2020).