I have a custom reader for Spark Streaming that reads data from a WebSocket. I'm going to try Spark Structured Streaming.
How do I create a streaming data source in Spark Structured Streaming?
Use readStream.format("socket") on the SparkSession object to read data from a socket, and provide the host and port options for the endpoint you want to stream data from.
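For example, a minimal query against the built-in socket source could look like the sketch below (localhost and the port are placeholders for wherever your data is served):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("socket-demo").getOrCreate()

// Read lines of text from a TCP socket; host and port are placeholders
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()

// Echo the stream to the console to check that data is flowing
val query = lines.writeStream
  .format("console")
  .start()

query.awaitTermination()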
Sink is an extension of the BaseStreamingSink contract for streaming sinks that can add batches to an output. Sink is part of the Data Source API V1 and is used in Micro-Batch Stream Processing only.
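If you also need a custom sink, a bare-bones sketch against that V1 contract might look like the following (CountingSink is a hypothetical class, and the trait lives in Spark internals, so it differs between versions):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Hypothetical sink that just reports the size of every micro-batch
class CountingSink extends Sink {
  // Called once per micro-batch with the rows produced for that batch
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    println(s"Batch $batchId contains ${data.count()} rows")
  }
}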
Spark Structured Streaming uses readStream to monitor a folder and process files that arrive in the directory in real time, and writeStream to write out a DataFrame or Dataset. Structured Streaming is a scalable, high-throughput, fault-tolerant stream processing engine that supports both batch and streaming workloads.
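As a rough illustration of the file-monitoring case (the schema and the /tmp/incoming path are made up for the example):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val spark = SparkSession.builder().appName("file-stream-demo").getOrCreate()

// File sources need an explicit schema in a streaming query
val schema = new StructType().add("id", LongType).add("name", StringType)

val files = spark.readStream
  .schema(schema)
  .json("/tmp/incoming")   // new JSON files dropped here are picked up automatically

// Print each micro-batch to the console as new files arrive
val query = files.writeStream
  .format("console")
  .start()

query.awaitTermination()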
A streaming data source implements org.apache.spark.sql.execution.streaming.Source.
The scaladoc of org.apache.spark.sql.execution.streaming.Source should give you enough information to get started (just follow the types to develop a compilable Scala type).
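To give a feel for the shape of the contract, here is a hypothetical skeleton of such a Source for the WebSocket case; the class name, schema and the TODO bodies are placeholders, and the trait is internal Data Source API V1 (Spark 2.x), which has changed in later releases:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical skeleton of a custom streaming Source; WebSocket plumbing left as TODOs
class WebSocketSource(sqlContext: SQLContext) extends Source {

  // Fixed schema of the rows this source produces
  override def schema: StructType = new StructType().add("value", StringType)

  // Latest offset available so far, or None if no data has arrived yet
  override def getOffset: Option[Offset] = ???

  // Return the data between the two offsets as a DataFrame
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = ???

  // Close the WebSocket connection and drop any buffered data
  override def stop(): Unit = ???
}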
Once you have the Source, you have to register it so you can use it in the format method of a DataStreamReader. The trick to making the streaming source available via format is to create a DataSourceRegister for it. You can find examples in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProvider
That's the file that links the short name in format to the implementation.
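Putting it together, a provider that links a short name to the Source could look roughly like this (WebSocketSourceProvider and the "yourCustomSource" alias are made up, WebSocketSource is the hypothetical class sketched above, and the signatures are those of the V1 StreamSourceProvider in Spark 2.x):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical provider that ties the format alias to the custom Source
class WebSocketSourceProvider extends StreamSourceProvider with DataSourceRegister {

  // The short name end users pass to format(...)
  override def shortName(): String = "yourCustomSource"

  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    (shortName(), schema.getOrElse(new StructType().add("value", StringType)))

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source =
    new WebSocketSource(sqlContext)
}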
What I usually recommend people do during my Spark workshops is to start development from both sides:
1. Write the streaming query (with format), e.g.
val input = spark
.readStream
.format("yourCustomSource") // <-- your custom source here
.load
2. Implement the streaming Source and a corresponding DataSourceRegister (it could be the same class).
3. (optional) Register the DataSourceRegister by writing the fully-qualified class name, say com.mycompany.spark.MyDataSourceRegister, to META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:
$ cat META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
com.mycompany.spark.MyDataSourceRegister
The last step, where you register the DataSourceRegister implementation for your custom Source, is optional; it only registers the data source alias that your end users use in the DataFrameReader.format method.
format(source: String): DataFrameReader Specifies the input data source format.
Review the code of org.apache.spark.sql.execution.streaming.RateSourceProvider for a good head start.
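While your own source is under construction, the rate source that RateSourceProvider registers is also handy for checking the rest of your query wiring (assuming a SparkSession named spark as in the earlier snippets):

// The built-in rate source emits (timestamp, value) rows at a configurable rate
val rate = spark.readStream
  .format("rate")                   // short name registered by RateSourceProvider
  .option("rowsPerSecond", "5")
  .load()

rate.printSchema()                  // timestamp: timestamp, value: long

val query = rate.writeStream
  .format("console")
  .start()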