
Equivalent to getLines in Apache Spark RDD

I have a Scala program that works fine on a single computer. However, I'd like to get it working on multiple nodes.

The start of the program looks like this:

import scala.io.Source

val filename = Source.fromFile("file://...")

val lines = filename.getLines

val linesArray = lines.map(x => x.split("   ").slice(0, 3))

val mapAsStrings = linesArray.toList.groupBy(_(0)).mapValues(x => x.map(_.tail))

val mappedUsers = mapAsStrings map {case (k,v) => k -> v.map(x => x(0) -> x(1).toInt).toMap}
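To make the type at each step visible, here is a sketch of the same single-machine pipeline. It reads a few hypothetical in-memory lines instead of a file. The object name and sample data are assumptions; fields are separated by three spaces, as in the original split. It uses an explicit map over the pairs instead of mapValues, because Map.mapValues is deprecated as of Scala 2.13.

```scala
object LocalPipelineSketch {
  // Hypothetical sample input: "user   item   rating", fields separated by three spaces
  val sampleLines: List[String] = List(
    "alice   item1   5",
    "alice   item2   3",
    "bob   item1   4"
  )

  def run(lines: Iterator[String]): Map[String, Map[String, Int]] = {
    // Iterator[Array[String]]: the first three fields of each line
    val linesArray = lines.map(_.split("   ").slice(0, 3))
    // Map[String, List[Array[String]]]: rows grouped by user, user column dropped
    val mapAsStrings = linesArray.toList.groupBy(_(0)).map { case (k, rows) => k -> rows.map(_.tail) }
    // Map[String, Map[String, Int]]: item -> rating for each user
    mapAsStrings.map { case (k, rows) => k -> rows.map(r => r(0) -> r(1).toInt).toMap }
  }

  def main(args: Array[String]): Unit =
    println(run(sampleLines.iterator))
}
```

Everything here lives in one JVM. That single-JVM execution is exactly what changes when the data becomes an RDD.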

When trying to run the program with Spark, I know I need SparkConf and SparkContext objects, and that they are used to create the RDD.

So now I have:

import org.apache.spark.{SparkConf, SparkContext}

class myApp(filePath: String) {

  private val conf = new SparkConf().setAppName("myApp")
  private val sc = new SparkContext(conf)
  private val inputData = sc.textFile(filePath)

}

inputData is now an RDD. I assume its equivalent in the previous program was filename. However, the methods available on an RDD are different. So what is the equivalent of getLines? Or is there no equivalent? I'm having a hard time visualising what the RDD gives me to work with. For example, is inputData an Array[String] or something else?


monster asked Dec 10 '14 18:12


People also ask

What are the 4 ways provided to construct an RDD?

You can create an RDD in four ways: from a Seq or List (using parallelize), from a text file, from another RDD, or from an existing DataFrame or Dataset.
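A minimal Scala sketch of the four routes, assuming a local SparkSession. The object name, the temporary input file and the sample rows are all hypothetical. Each route yields an RDD, and the function returns their element counts so the results can be checked.

```scala
import java.nio.file.Files
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

object RddConstructionSketch {
  // Builds one RDD each way and returns their sizes.
  def build(spark: SparkSession): (Long, Long, Long, Long) = {
    val sc = spark.sparkContext
    import spark.implicits._

    // 1. From a local Seq or List
    val fromSeq: RDD[Int] = sc.parallelize(Seq(1, 2, 3))

    // 2. From a text file; a temporary two-line file stands in for real input
    val tmp = Files.createTempFile("lines", ".txt")
    Files.write(tmp, java.util.Arrays.asList("first line", "second line"))
    val fromFile: RDD[String] = sc.textFile(tmp.toString)

    // 3. From another RDD, through a transformation
    val fromRdd: RDD[Int] = fromSeq.filter(_ > 1)

    // 4. From an existing DataFrame; .rdd exposes its rows as an RDD[Row]
    val df = Seq(("a", 1), ("b", 2)).toDF("key", "value")
    val fromDf: RDD[Row] = df.rdd

    (fromSeq.count(), fromFile.count(), fromRdd.count(), fromDf.count())
  }

  def main(args: Array[String]): Unit = {
    // "local[*]" runs Spark inside this JVM; on a cluster the master usually comes from spark-submit
    val spark = SparkSession.builder().appName("rdd-construction").master("local[*]").getOrCreate()
    println(build(spark))
    spark.stop()
  }
}
```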

Which method is used to convert the list to RDD?

In PySpark, parallelize() is a method of SparkContext that creates an RDD from a local list. The Scala API offers the same method, SparkContext.parallelize.
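The same call in Scala, as a small sketch assuming a local master. The object name and list contents are hypothetical. The numSlices parameter controls how many partitions the local collection is split into.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ParallelizeSketch {
  // Distributes a local List across two partitions (slices).
  def fromList(sc: SparkContext): RDD[String] =
    sc.parallelize(List("a", "b", "c", "d"), numSlices = 2)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("parallelize").setMaster("local[*]"))
    val rdd = fromList(sc)
    println(rdd.getNumPartitions)        // 2
    println(rdd.collect().mkString(",")) // a,b,c,d
    sc.stop()
  }
}
```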

What is an RDD in Spark?

The RDD has been the primary user-facing API in Spark since its inception. At its core, an RDD is an immutable, distributed collection of the elements of your data. It is partitioned across the nodes of your cluster, and you can operate on it in parallel through a low-level API of transformations and actions.
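The split between transformations and actions can be sketched like this, assuming a local master and a hypothetical object name. The transformations only describe new, immutable RDDs. Nothing is computed until the action (reduce) runs.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TransformationsAndActionsSketch {
  def sumOfEvenSquares(sc: SparkContext): Int = {
    val nums = sc.parallelize(1 to 10)
    // Transformations: each returns a new RDD; nums itself is unchanged and no job runs yet
    val evens = nums.filter(_ % 2 == 0)
    val squares = evens.map(n => n * n)
    // Action: reduce triggers the computation across the partitions
    squares.reduce(_ + _)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lazy").setMaster("local[*]"))
    println(sumOfEvenSquares(sc)) // 220
    sc.stop()
  }
}
```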

Can we iterate over RDD?

Yes. The usual way to loop over an RDD is the map() transformation, which applies a function (a lambda) to every element. A PySpark DataFrame has no map() of its own, so to process every Row you go through its underlying RDD (df.rdd.map(...)).
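The Scala API offers the same options. Here is a sketch, assuming a local master and hypothetical sample lines. It contrasts map (a lazy transformation), foreach (an action that runs on the executors) and toLocalIterator. toLocalIterator streams the data back to the driver one partition at a time, which is arguably the closest analogue to looping over getLines on a single machine.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object IterateSketch {
  // map is a transformation: it describes per-element work and returns a new RDD
  def lengths(lines: RDD[String]): RDD[Int] = lines.map(_.length)

  // toLocalIterator brings one partition at a time back to the driver
  def onDriver(lines: RDD[String]): List[String] = lines.toLocalIterator.toList

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("iterate").setMaster("local[*]"))
    val lines = sc.parallelize(Seq("one", "three", "five"))
    // foreach is an action that runs on the executors; on a real cluster its
    // println output ends up in the executor logs, not on the driver console
    lengths(lines).foreach(n => println(n))
    onDriver(lines).foreach(println)
    sc.stop()
  }
}
```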

2 Answers

The documentation seems to answer this directly:

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] 

Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

So textFile is the equivalent of both fromFile and getLines. It returns an RDD in which each element is one line of the file. inputData is therefore the equivalent of lines (the result of getLines), not of linesArray, which already holds the split fields.
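A small sketch of what textFile hands back, assuming a local master. A temporary file with hypothetical content stands in for the question's elided path. Each line becomes one String element. count, first and take are actions that return ordinary local values to the driver.

```scala
import java.nio.file.{Files, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object TextFileSketch {
  // Writes a small hypothetical input file so the sketch is self-contained.
  def sampleFile(): Path = {
    val tmp = Files.createTempFile("users", ".txt")
    Files.write(tmp, java.util.Arrays.asList("alice   item1   5", "bob   item1   4"))
    tmp
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("textFile").setMaster("local[*]"))
    // One RDD element per line, playing the role of the Iterator[String] from getLines
    val inputData: RDD[String] = sc.textFile(sampleFile().toString)
    println(inputData.count()) // 2
    println(inputData.first()) // alice   item1   5
    // take(n) returns a local Array[String] on the driver
    inputData.take(2).foreach(println)
    sc.stop()
  }
}
```

On a cluster, a local path such as file://... has to exist at the same location on every node, as the quoted documentation notes.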

The Archetypal Paul answered Oct 31 '22 07:10


An RDD is a distributed collection, so conceptually it's not very different from a List, an Array or a Seq. Like them, it provides functional operations that let you transform the collection of elements. The main difference from the Scala collections is that an RDD is inherently distributed. When an RDD is created on a Spark cluster, the collection it represents is partitioned across some nodes of that cluster.
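To illustrate how close the two APIs are, this sketch runs the same map/distinct pipeline on a plain List and on an RDD built from it. It assumes a local master, and the object name and sample lines are hypothetical. The RDD result is sorted after collect() because distinct() involves a shuffle, so element order is not guaranteed.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SameCodeSketch {
  // Hypothetical sample lines, fields separated by three spaces
  val sample: List[String] = List("alice   item1   5", "bob   item1   4", "alice   item2   3")

  // Plain Scala collection: runs in this JVM
  def localUsers: List[String] = sample.map(_.split("   ")(0)).distinct.sorted

  // Spark RDD: the same map/distinct calls, executed partition by partition
  def sparkUsers(sc: SparkContext): List[String] =
    sc.parallelize(sample).map(_.split("   ")(0)).distinct().collect().toList.sorted

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("same-code").setMaster("local[*]"))
    println(localUsers)
    println(sparkUsers(sc))
    sc.stop()
  }
}
```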

sc.textFile(...) returns an RDD[String]. With a distributed file system, each worker loads a piece of that file into a 'partition', where further transformations and actions (in Spark lingo) can take place.
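The partitions of a textFile RDD follow the file's input splits, so their exact contents depend on the file. For a deterministic picture, this sketch instead uses parallelize with three slices under a local master (object name hypothetical). mapPartitionsWithIndex tags every element with the index of the partition that holds it.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PartitionsSketch {
  // Pairs each element with the index of its partition, then collects to the driver.
  def whereIsEachElement(sc: SparkContext): Array[(Int, Int)] =
    sc.parallelize(1 to 6, numSlices = 3)
      .mapPartitionsWithIndex((idx, it) => it.map(x => (idx, x)))
      .collect()

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partitions").setMaster("local[*]"))
    // Prints (0,1), (0,2), (1,3), (1,4), (2,5), (2,6) on separate lines
    whereIsEachElement(sc).foreach(println)
    sc.stop()
  }
}
```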

Because the Spark API closely resembles the Scala collections API, applying functional transformations to an RDD is very similar to applying them to a Scala collection.

Your Scala program can therefore be easily ported to Spark:

//val filename = Source.fromFile("file://...")
//val lines = filename.getLines
val rdd = sc.textFile("file://...")

//val linesArray = lines.map(x => x.split("   ").slice(0, 3))
val lines = rdd.map(x => x.split("   ").slice(0, 3))

//val mapAsStrings = linesArray.toList.groupBy(_(0)).mapValues(x => x.map(_.tail))
val mappedLines = lines.groupBy(_(0)).mapValues(x => x.map(_.tail))

//val mappedUsers = mapAsStrings map {case (k,v) => k -> v.map(x => x(0) -> x(1).toInt).toMap}
val mappedUsers = mappedLines.mapValues{v => v.map(x => x(0) -> x(1).toInt).toMap}

One important difference is that there is no RDD equivalent of an associative Map collection. Therefore mappedUsers is an RDD of key-value tuples, i.e. an RDD[(String, Map[String, Int])], rather than a Map.
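If you do need Map-like access, pair RDDs offer lookup (all values for one key) and collectAsMap (copies the whole RDD to the driver as a scala.collection.Map). The latter is only sensible when the result is small. The sketch below reruns the answer's pipeline on hypothetical in-memory lines under a local master, with a hypothetical object name.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object PairRddSketch {
  // Same steps as the ported program, on hypothetical lines instead of a file
  def mappedUsers(sc: SparkContext): RDD[(String, Map[String, Int])] =
    sc.parallelize(Seq("alice   item1   5", "alice   item2   3", "bob   item1   4"))
      .map(_.split("   ").slice(0, 3))
      .groupBy(_(0))
      .mapValues(rows => rows.map(_.tail).map(x => x(0) -> x(1).toInt).toMap)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("pair-rdd").setMaster("local[*]"))
    val users = mappedUsers(sc)
    // lookup runs a job and returns every value stored under the key
    println(users.lookup("bob"))
    // collectAsMap pulls the entire RDD onto the driver
    val local: scala.collection.Map[String, Map[String, Int]] = users.collectAsMap()
    println(local("alice"))
    sc.stop()
  }
}
```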

maasg answered Oct 31 '22 07:10
