spark streaming fileStream

I'm programming with Spark Streaming but am having some trouble with Scala. I'm trying to use the function StreamingContext.fileStream.

The definition of this function is:

def fileStream[K, V, F <: InputFormat[K, V]](directory: String)(implicit arg0: ClassManifest[K], arg1: ClassManifest[V], arg2: ClassManifest[F]): DStream[(K, V)]

Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. File names starting with . are ignored.

K: Key type for reading HDFS file
V: Value type for reading HDFS file
F: Input format for reading HDFS file
directory: HDFS directory to monitor for new files

I don't know how to pass the Key and Value types. My Spark Streaming code:

val ssc = new StreamingContext(args(0), "StreamingReceiver", Seconds(1),
  System.getenv("SPARK_HOME"), Seq("/home/mesos/StreamingReceiver.jar"))

// Create a file stream that monitors the given directory for new files
val lines = ssc.fileStream("/home/sequenceFile")

The Java code that writes the Hadoop sequence file:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class MyDriver {

    private static final String[] DATA = { "One, two, buckle my shoe",
            "Three, four, shut the door", "Five, six, pick up sticks",
            "Seven, eight, lay them straight", "Nine, ten, a big fat hen" };

    public static void main(String[] args) throws IOException {
        // Write 100 (IntWritable, Text) records to a SequenceFile at the given URI
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),
                    value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
                        value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}
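
For reference, here is a minimal Scala sketch that reads such a file back to verify what was written (the file name data.seq is hypothetical; it assumes the same IntWritable/Text types and uses Hadoop's SequenceFile.Reader):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{IOUtils, IntWritable, SequenceFile, Text}

// Read the sequence file back and print each (key, value) record
val conf = new Configuration()
val path = new Path("/home/sequenceFile/data.seq") // hypothetical file name
val fs = FileSystem.get(URI.create(path.toString), conf)
val reader = new SequenceFile.Reader(fs, path, conf)
try {
  val key = new IntWritable()
  val value = new Text()
  while (reader.next(key, value)) {
    println(key + "\t" + value)
  }
} finally {
  IOUtils.closeStream(reader)
}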

asked May 15 '13 by user2384993



1 Answer

If you want to use fileStream, you have to supply all three type parameters when calling it: you need to know your Key, Value and InputFormat types up front. If your types were LongWritable, Text and TextInputFormat, you would call fileStream like so:

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/sequenceFile")
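
The resulting DStream[(LongWritable, Text)] pairs each line's byte offset (the key) with its text (the value); if you only want the strings, a typical follow-up is lines.map(_._2.toString).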

If those three types do happen to be your types, you might want to use textFileStream instead, since it requires no type parameters and delegates to fileStream using exactly those three types. Using it would look like this:

val lines = ssc.textFileStream("/home/sequenceFile")
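
Note that the Java writer in your question produces a SequenceFile of IntWritable/Text records, so for that directory the call would need different types. A sketch, assuming Hadoop's new-API SequenceFileInputFormat (from org.apache.hadoop.mapreduce.lib.input) is compatible with your Spark version:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

// Monitor /home/sequenceFile for new SequenceFiles of (IntWritable, Text) records
val pairs = ssc.fileStream[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]]("/home/sequenceFile")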
answered Oct 11 '22 by cmbaxter