 

How to convert Spark Streaming data into Spark DataFrame

So far, Spark doesn't support DataFrames for streaming data, but for anomaly detection it is more convenient and faster to use DataFrames for the data analysis. I have done that part, but when I tried to do real-time anomaly detection on streaming data, problems appeared. I tried several approaches and still could not convert a DStream to a DataFrame, nor convert the RDDs inside the DStream into DataFrames.

Here's part of my latest version of the code:

import sys
import re

from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator


sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)

model_inputs = sys.argv[1]

def streamrdd_to_df(srdd):
    sdf = sqlContext.createDataFrame(srdd)
    sdf.show(n=2, truncate=False)
    return sdf

def main():
    indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
    inrdd = indata.map(lambda r: get_tuple(r))
    Features = Row('rawFeatures')
    features_rdd = inrdd.map(lambda r: Features(r))
    features_rdd.pprint(num=3)
    streaming_df = features_rdd.flatMap(streamrdd_to_df)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()

As you can see in the main() function, when I read the input streaming data with the ssc.socketTextStream() method, it generates a DStream. I then tried to convert each element of the DStream into a Row, hoping I could convert the data into a DataFrame later.

If I use pprint() to print out features_rdd here, it works, which makes me think that each element of features_rdd is a batch RDD, while features_rdd as a whole is a DStream.

Then I wrote the streamrdd_to_df() function, hoping to convert each batch RDD into a DataFrame, but it gives me this error:

ERROR StreamingContext: Error starting the context, marking it as stopped java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

Does anyone have ideas on how I can do DataFrame operations on Spark streaming data?
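A minimal sketch of one DStream-only workaround, assuming Spark 2.0+ (for SparkSession) and a socket that sends lines of comma-separated numbers (the question does not show the input format). `flatMap` is a transformation that calls its function on every individual record, on the executors, where no DataFrame can be built; `DStream.foreachRDD` is an output operation that passes each whole batch RDD to a function running on the driver. `parse_features()` is a hypothetical stand-in for the question's `get_tuple()`, which is not shown, and `process_batch()` and `run()` are illustrative names.

```python
# Sketch: build a DataFrame from every micro-batch of a DStream.
# Assumptions: Spark 2.0+ (SparkSession) and a socket that sends lines of
# comma-separated numbers; parse_features() is a hypothetical stand-in for
# the question's get_tuple().


def parse_features(line):
    """Hypothetical parser: '1.0,2.5' -> (1.0, 2.5)."""
    return tuple(float(x) for x in line.split(","))


def process_batch(batch_time, rdd):
    """Called by foreachRDD on the driver, once per batch interval."""
    # pyspark is imported here so this module loads without Spark installed.
    from pyspark.sql import Row, SparkSession

    if rdd.isEmpty():
        return  # no data arrived in this interval
    # getOrCreate() reuses the session backed by the running SparkContext.
    spark = SparkSession.builder.getOrCreate()
    rows = rdd.map(lambda line: Row(rawFeatures=list(parse_features(line))))
    df = spark.createDataFrame(rows)
    print("Batch at", batch_time)
    df.show(n=2, truncate=False)


def run(host, port, batch_seconds=5):
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="test")
    ssc = StreamingContext(sc, batch_seconds)
    lines = ssc.socketTextStream(host, port)
    # foreachRDD is an output operation: it registers work for the context
    # and hands the function whole batch RDDs, not individual records.
    lines.foreachRDD(process_batch)
    ssc.start()
    ssc.awaitTermination()
```

Calling `run(sys.argv[2], int(sys.argv[3]))` would take the place of the question's `main()`. Any DataFrame operation can go where `df.show()` is, since `df` is an ordinary (non-streaming) DataFrame for that batch.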

asked Feb 06 '16 at 19:02 by Cherry Wu



People also ask

Can stream be converted to DataFrame?

In the streamz library, you can convert core Stream objects to Batch and DataFrame objects with the .to_batch and .to_dataframe methods. In each case, the stream is assumed to be a stream of batches (lists or tuples) or a list of pandas DataFrames.

What method should be used to read streaming data into a DataFrame?

Use the DataStreamReader interface returned by SparkSession.readStream (in R, the read.stream() method). Similar to the read interface for creating a static DataFrame, you can specify the details of the source: data format, schema, options, etc.
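As a sketch of that reader interface, assuming Spark 2.3+ (which accepts DDL-string schemas), an existing SparkSession passed in as `spark`, and a hypothetical directory of CSV files with two hypothetical columns, the format, schema, and options are set on `spark.readStream` in the same builder style as `spark.read`:

```python
# Sketch: a streaming DataFrame read from CSV files as they land in a directory.
# Assumptions: Spark 2.3+ (DDL-string schema), hypothetical column names,
# and `spark` is an existing SparkSession passed in by the caller.

# File sources need an explicit schema unless
# spark.sql.streaming.schemaInference is enabled.
CSV_SCHEMA = "sensor STRING, reading DOUBLE"

# Source options: 'header' is a CSV option,
# 'maxFilesPerTrigger' is a streaming file-source option.
CSV_OPTIONS = {"header": "true", "maxFilesPerTrigger": "1"}


def read_csv_stream(spark, input_dir):
    """Return a streaming DataFrame over the CSV files in input_dir."""
    return (
        spark.readStream          # DataStreamReader
        .format("csv")            # data format
        .schema(CSV_SCHEMA)       # schema
        .options(**CSV_OPTIONS)   # source options
        .load(input_dir)          # directory to watch
    )
```

The returned DataFrame has `isStreaming` set to `True`; nothing is actually read until a query is started with `writeStream ... start()`.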

How do you convert a Spark RDD into a DataFrame?

A Spark RDD can be converted to a DataFrame with toDF() or createDataFrame(), or by first mapping it to an RDD[Row] and building the DataFrame from that.
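The three routes can be sketched as follows, assuming an existing SparkSession named `spark` (Spark 2.3+ for the DDL schema string). The data, the column names, and the `pair_to_fields()` helper are hypothetical.

```python
# Sketch: three ways to turn an RDD into a DataFrame.
# Assumptions: `spark` is an existing SparkSession (Spark 2.3+ for the DDL
# schema string); the data and column names are hypothetical.


def pair_to_fields(kv):
    """Hypothetical helper: ('a', 1) -> {'key': 'a', 'value': 1}."""
    return {"key": kv[0], "value": kv[1]}


def rdd_to_dataframes(spark):
    from pyspark.sql import Row

    pairs = spark.sparkContext.parallelize([("a", 1), ("b", 2)])

    # 1. toDF(): attached to RDDs once a SparkSession exists; names the columns.
    df1 = pairs.toDF(["key", "value"])

    # 2. createDataFrame() with an explicit schema.
    df2 = spark.createDataFrame(pairs, "key STRING, value INT")

    # 3. Map to an RDD of Row objects; column names come from the Row fields.
    rows = pairs.map(lambda kv: Row(**pair_to_fields(kv)))
    df3 = spark.createDataFrame(rows)

    return df1, df2, df3
```

Without an explicit schema (routes 1 and 3), Python ints are inferred as `bigint`; route 2 stores them as `int` because the schema says so.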


1 Answer

Spark now provides Structured Streaming, which can solve this kind of problem. It produces streaming DataFrames, i.e. DataFrames that are continuously appended to. Please see the link below:

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
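Applied to the question, a Structured Streaming version of the socket pipeline might look like the sketch below. It assumes Spark 2.x or later and that each socket line holds comma-separated numbers (the question does not show the format); `socket_options()`, `parse_columns()`, and `run()` are hypothetical names. The built-in socket source yields a single string column named `value` and is intended for testing only.

```python
# Sketch: the question's socket pipeline as a Structured Streaming query.
# Assumptions: Spark 2.x+, comma-separated numbers per line (hypothetical),
# and hypothetical helper names.


def socket_options(host, port):
    """Options for the built-in socket source (meant for testing only)."""
    return {"host": host, "port": str(port)}


def parse_columns(lines_df):
    """Turn the socket source's single string column 'value' into features."""
    from pyspark.sql.functions import col, split

    return lines_df.select(
        split(col("value"), ",").cast("array<double>").alias("rawFeatures")
    )


def run(host, port):
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("test").getOrCreate()
    # A streaming DataFrame: new lines are appended as they arrive.
    lines = (
        spark.readStream.format("socket")
        .options(**socket_options(host, port))
        .load()
    )
    features = parse_columns(lines)
    # Ordinary DataFrame operations apply; a sink must be started to run them.
    query = (
        features.writeStream
        .outputMode("append")
        .format("console")
        .option("truncate", "false")
        .start()
    )
    query.awaitTermination()
```

Here `run(sys.argv[2], int(sys.argv[3]))` would replace the DStream-based `main()`, and further `select`/`where`/`groupBy` calls can be chained on `features` before `writeStream`.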

answered Oct 05 '22 at 01:10 by OSK
