How to create Spark RDD from an iterator?

To be clear, I am not looking to create an RDD from an array/list, like this:

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7); // sample
JavaRDD<Integer> rdd = new JavaSparkContext().parallelize(list);

How can I create a spark RDD from a java iterator without completely buffering it in memory?

Iterator<Integer> iterator = Arrays.asList(1, 2, 3, 4).iterator(); //sample iterator for illustration
JavaRDD<Integer> rdd = new JavaSparkContext().what("?", iterator); //the Question

Additional Question:

Is it a requirement for the source to be re-readable (i.e., readable multiple times) for an RDD to be resilient? In other words, since iterators are fundamentally read-once, is it even possible to create Resilient Distributed Datasets (RDDs) from iterators?
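Some background on the resilience part: Spark recovers a lost partition through lineage, i.e. by replaying the transformations that produced it, which (unless the RDD was checkpointed) ultimately means reading the original source again. The plain-Java sketch below involves no Spark at all; `ReadOnceDemo` and `drain` are hypothetical names used only to illustrate why a bare `Iterator` cannot be replayed, while a `Supplier` that opens a fresh iterator on every call can.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Local analogy only (no Spark): recovering lost data means reading the
// source again, which a bare Iterator cannot do.
final class ReadOnceDemo {

    // Consumes the iterator and collects everything it yields into a list.
    static <T> List<T> drain(Iterator<T> it) {
        List<T> out = new ArrayList<>();
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        Iterator<Integer> once = Arrays.asList(1, 2, 3, 4).iterator();
        System.out.println(drain(once)); // [1, 2, 3, 4]
        System.out.println(drain(once)); // [] : the iterator is already exhausted

        // A replayable "recipe": each call opens a fresh iterator over the
        // source, similar in spirit to recomputing a partition from its input.
        Supplier<Iterator<Integer>> replayable =
                () -> Arrays.asList(1, 2, 3, 4).iterator();
        System.out.println(drain(replayable.get())); // [1, 2, 3, 4]
        System.out.println(drain(replayable.get())); // [1, 2, 3, 4]
    }
}
```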

Thamme Gowda asked Jun 26 '15 12:06


People also ask

What are the two ways to create RDD in Spark?

There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

What are the three ways we can create an RDD?

There are three ways to create an RDD in Spark: parallelizing an existing collection in the driver program; referencing a dataset in an external storage system (e.g., HDFS, HBase, a shared file system); and creating an RDD from an existing RDD.

Can we create RDD from existing RDD?

Resilient Distributed Datasets (RDDs) are immutable (read-only). You cannot change an original RDD, but you can create new RDDs by applying coarse-grained operations, such as transformations, to an existing RDD.

How many RDDs can cogroup() work on at once?

cogroup() can be used for much more than just implementing joins. We can also use it to implement intersect by key. Additionally, cogroup() can work on three or more RDDs at once.
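What cogroup() computes, and how "intersect by key" falls out of it, can be illustrated without Spark. The following is a hypothetical in-memory Java sketch of the semantics only (`CogroupSketch`, `cogroup` and `intersectKeys` are made-up names, and real cogroup operates on distributed, partitioned data): for every key seen in any input it collects that key's values from each input, and intersect by key keeps only the keys whose groups are all non-empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory illustration of cogroup semantics over any number of keyed inputs.
// Not Spark code.
final class CogroupSketch {

    // For every key seen in any input, collect that key's values from each
    // input, using an empty list where an input has no such key.
    @SafeVarargs
    static <K extends Comparable<K>, V> Map<K, List<List<V>>> cogroup(
            List<Map.Entry<K, V>>... inputs) {
        Map<K, List<List<V>>> result = new TreeMap<>();
        for (int i = 0; i < inputs.length; i++) {
            for (Map.Entry<K, V> e : inputs[i]) {
                List<List<V>> groups = result.computeIfAbsent(e.getKey(), k -> {
                    List<List<V>> g = new ArrayList<>();
                    for (int j = 0; j < inputs.length; j++) {
                        g.add(new ArrayList<>());
                    }
                    return g;
                });
                groups.get(i).add(e.getValue());
            }
        }
        return result;
    }

    // "Intersect by key": keep keys that have values in every input.
    static <K, V> List<K> intersectKeys(Map<K, List<List<V>>> grouped) {
        List<K> keys = new ArrayList<>();
        for (Map.Entry<K, List<List<V>>> e : grouped.entrySet()) {
            if (e.getValue().stream().noneMatch(List::isEmpty)) {
                keys.add(e.getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> a = List.of(Map.entry("x", 1), Map.entry("y", 2));
        List<Map.Entry<String, Integer>> b = List.of(Map.entry("x", 3));
        List<Map.Entry<String, Integer>> c = List.of(Map.entry("x", 4), Map.entry("z", 5));
        Map<String, List<List<Integer>>> grouped = cogroup(a, b, c);
        System.out.println(grouped); // {x=[[1], [3], [4]], y=[[2], [], []], z=[[], [], [5]]}
        System.out.println(intersectKeys(grouped)); // [x]
    }
}
```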


1 Answer

As somebody else said, you could do something with Spark Streaming, but with plain Spark you can't, because what you're asking goes against Spark's model. Let me explain. To distribute and parallelize work, Spark has to divide it into chunks. When reading from HDFS, that chunking is done for Spark by HDFS, since HDFS files are organized in blocks; Spark will generally create one task per block. An iterator, on the other hand, only provides sequential access to your data, so it's impossible for Spark to organize it into chunks without reading it all into memory.
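To make the sequential-access point concrete: the only way to cut an `Iterator` into pieces is to walk it from the front, so chunk k cannot be produced until every earlier chunk has been read, and each chunk has to be materialized before it could be handed to anyone else. The plain-Java sketch below (no Spark; `IteratorChunker` and `chunks` are hypothetical names) does exactly that, holding one chunk in memory at a time.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.IntStream;

// Plain-Java illustration (no Spark): an Iterator can only be split into
// chunks by reading it front to back, materializing each chunk as a list.
final class IteratorChunker {

    // Lazily wraps a source iterator, yielding consecutive lists of at most
    // `size` elements. Only the chunk currently being built is held in memory.
    static <T> Iterator<List<T>> chunks(Iterator<T> source, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        return new Iterator<>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public List<T> next() {
                if (!source.hasNext()) {
                    throw new NoSuchElementException();
                }
                List<T> chunk = new ArrayList<>(size);
                while (chunk.size() < size && source.hasNext()) {
                    chunk.add(source.next());
                }
                return chunk;
            }
        };
    }

    public static void main(String[] args) {
        Iterator<Integer> source = IntStream.rangeClosed(1, 7).iterator();
        Iterator<List<Integer>> it = chunks(source, 3);
        while (it.hasNext()) {
            System.out.println(it.next()); // [1, 2, 3], then [4, 5, 6], then [7]
        }
    }
}
```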

It may be possible to build an RDD that has a single iterable partition, but even then, there is no way to know whether the implementation of the Iterable could be sent to workers. When you use sc.parallelize(), Spark creates partitions that implement Serializable, so each partition can be sent to a different worker. The iterable could be over a network connection or a file in the local FS, so it cannot be sent to the workers unless it is buffered in memory.
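Shipping data to a worker means serializing it. The sketch below uses plain `java.io` object serialization as a stand-in for that step (an assumption for illustration; Spark can also be configured with other serializers such as Kryo), and `SerializationCheck` / `isSerializable` are hypothetical names. It shows the gap the answer describes: a buffered `ArrayList` serializes fine, but an iterator over that very same list does not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java check (no Spark): a buffered collection can be serialized and
// shipped, but an Iterator over it generally cannot.
final class SerializationCheck {

    // Returns true if Java object serialization can write the object.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<Integer> buffered = new ArrayList<>(Arrays.asList(1, 2, 3, 4));
        Iterator<Integer> iterator = buffered.iterator();
        System.out.println(isSerializable(buffered)); // true
        System.out.println(isSerializable(iterator)); // false
    }
}
```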

Roberto Congiu answered Sep 18 '22 20:09