So far Spark Streaming doesn't create a DataFrame for streaming data, but when I am doing anomaly detection it is more convenient and faster to use a DataFrame for the analysis. I have the batch part done, but when I try to do real-time anomaly detection on streaming data, the problems appear. I have tried several ways and still cannot convert the DStream to a DataFrame, nor convert the RDDs inside the DStream into DataFrames.
Here's part of my latest version of the code:
import sys
import re
from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator

sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)
model_inputs = sys.argv[1]

def streamrdd_to_df(srdd):
    sdf = sqlContext.createDataFrame(srdd)
    sdf.show(n=2, truncate=False)
    return sdf

def main():
    indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
    inrdd = indata.map(lambda r: get_tuple(r))
    Features = Row('rawFeatures')
    features_rdd = inrdd.map(lambda r: Features(r))
    features_rdd.pprint(num=3)
    streaming_df = features_rdd.flatMap(streamrdd_to_df)
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
As you can see in the main() function, when I read the input streaming data using the ssc.socketTextStream() method, it generates a DStream. I then tried to convert each element of the DStream into a Row, hoping to convert the data into a DataFrame later.
If I use pprint() to print out features_rdd here, it works, which makes me think each element of features_rdd is a batch RDD while the whole features_rdd is a DStream.
Then I created the streamrdd_to_df() method, hoping to convert each batch RDD into a DataFrame, but it gives me this error:
ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
Any thoughts on how I can do DataFrame operations on Spark streaming data?
(This one refers to the Python streamz library rather than Spark.) You can convert from core Stream objects to Batch and DataFrame objects using the .to_batch and .to_dataframe methods. In each case it is assumed that the stream is a stream of batches (lists or tuples) or a stream of pandas DataFrames.
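A rough sketch of that streamz usage, assuming the streamz and pandas packages are installed and that to_dataframe takes an example pandas DataFrame describing the schema (the column name and values here are purely illustrative):

import pandas as pd
from streamz import Stream

# A core stream whose elements are pandas DataFrames
source = Stream()

# An example frame telling streamz what the data flowing through looks like
example = pd.DataFrame({'rawFeatures': pd.Series([], dtype='float64')})
sdf = source.to_dataframe(example)

# Feed a pandas DataFrame into the stream; sdf updates as data arrives
source.emit(pd.DataFrame({'rawFeatures': [1.0, 2.0, 3.0]}))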
Alternatively, a streaming DataFrame can be created through SparkSession.readStream() (in R, with the read.stream() method). Similar to the read interface for creating a static DataFrame, you can specify the details of the source: data format, schema, options, etc.
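For example, a sketch that reads the same kind of socket source the question uses (host and port are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").getOrCreate()

# Streaming DataFrame from a socket source; format, schema and options
# are set on the reader just as for a static DataFrame
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()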
Converting a Spark RDD to a DataFrame can be done using toDF(), createDataFrame(), or by transforming an RDD[Row] into a DataFrame.
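Applied to the question's code, one way to get a DataFrame per micro-batch is to replace the flatMap() call with foreachRDD(), which is an output operation (so it also clears the "No output operations registered" error) and hands you each micro-batch as an ordinary RDD of Rows. A sketch, with an empty-batch check added as a precaution:

def process_rdd(rdd):
    # Runs once per micro-batch on the driver; skip empty batches,
    # since createDataFrame() cannot infer a schema from an empty RDD
    if not rdd.isEmpty():
        df = sqlContext.createDataFrame(rdd)
        df.show(n=2, truncate=False)

# foreachRDD is an output operation, so the StreamingContext has work to execute
features_rdd.foreachRDD(process_rdd)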
Spark provides Structured Streaming, which solves exactly this kind of problem: it gives you streaming DataFrames, i.e. DataFrames that are continuously appended to. Please check the link below:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
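Continuing from the streaming DataFrame lines built with readStream above, a query is started with writeStream; a minimal sketch that counts distinct input lines and prints the running result to the console (the output mode and console sink are just for illustration):

# Running count of each distinct line received on the socket
counts = lines.groupBy("value").count()

query = counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()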