I'm programming with Spark Streaming, but I'm having some trouble with Scala. I'm trying to use the function StreamingContext.fileStream, whose definition is:
def fileStream[K, V, F <: InputFormat[K, V]](directory: String)(implicit arg0: ClassManifest[K], arg1: ClassManifest[V], arg2: ClassManifest[F]): DStream[(K, V)]
Creates an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. File names starting with . are ignored.
K — key type for reading the HDFS file
V — value type for reading the HDFS file
F — input format for reading the HDFS file
directory — HDFS directory to monitor for new files
I don't know how to pass the types for Key and Value. My code in Spark Streaming:
val ssc = new StreamingContext(args(0), "StreamingReceiver", Seconds(1),
System.getenv("SPARK_HOME"), Seq("/home/mesos/StreamingReceiver.jar"))
// Create a DStream that monitors the directory for new files
val lines = ssc.fileStream("/home/sequenceFile")
Java code that writes the Hadoop SequenceFile:
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class MyDriver {

    private static final String[] DATA = { "One, two, buckle my shoe",
            "Three, four, shut the door", "Five, six, pick up sticks",
            "Seven, eight, lay them straight", "Nine, ten, a big fat hen" };

    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            // Write 100 (IntWritable, Text) records to the SequenceFile
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),
                    value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
                        value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}
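For reference, a file written this way can be read back in batch Spark to verify its contents. This is my own illustration, not part of the original question; it assumes an existing SparkContext named sc and the same path the driver wrote to:

import org.apache.hadoop.io.{IntWritable, Text}

// Read the SequenceFile back as (IntWritable, Text) pairs; convert the
// Writables to plain Scala types before doing anything with them.
val verified = sc
  .sequenceFile("/home/sequenceFile", classOf[IntWritable], classOf[Text])
  .map { case (k, v) => (k.get, v.toString) }

verified.take(5).foreach(println)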
If you want to use fileStream, you're going to have to supply all 3 type params to it when calling it. You need to know what your Key, Value and InputFormat types are before calling it. If your types were LongWritable, Text and TextInputFormat, you would call fileStream like so:
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/sequenceFile")
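Since the writer in the question produces a SequenceFile keyed by IntWritable with Text values, the matching call would presumably use Hadoop's new-API SequenceFileInputFormat. This sketch is my addition as an illustration, not part of the original answer:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

// Monitor the directory for new SequenceFiles of (IntWritable, Text) pairs
val pairs = ssc.fileStream[IntWritable, Text,
  SequenceFileInputFormat[IntWritable, Text]]("/home/sequenceFile")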
If those 3 types do happen to be your types, then you might want to use textFileStream instead, as it does not require any type params and delegates to fileStream using those 3 types I mentioned. Using that would look like this:
val lines = ssc.textFileStream("/home/sequenceFile")
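Either way, the stream still needs an output operation and the context must be started before anything is processed. A minimal sketch, assuming the ssc from above (awaitTermination exists in more recent Spark Streaming releases):

lines.print()           // print the first records of each batch
ssc.start()             // begin monitoring the directory
ssc.awaitTermination()  // block until the job is stopped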