I have a Scala program that works fine on a single computer. However, I'd like to get it working on multiple nodes.
The start of the program looks like this:
import scala.io.Source

val filename = Source.fromFile("file://...")
val lines = filename.getLines
val linesArray = lines.map(x => x.split(" ").slice(0, 3))
val mapAsStrings = linesArray.toList.groupBy(_(0)).mapValues(x => x.map(_.tail))
val mappedUsers = mapAsStrings map {case (k,v) => k -> v.map(x => x(0) -> x(1).toInt).toMap}
When trying to use Spark to run the program, I know I need a SparkContext and a SparkConf object, and that they are used to create the RDD.
So now I have:
import org.apache.spark.{SparkConf, SparkContext}

class myApp(filePath: String) {
private val conf = new SparkConf().setAppName("myApp")
private val sc = new SparkContext(conf)
private val inputData = sc.textFile(filePath)
inputData is now an RDD; its equivalent in the previous program was filename (I assume). For an RDD the methods are different. So, what is the equivalent of getLines? Or is there no equivalent? I'm having a hard time visualising what the RDD gives me to work with, e.g. is inputData an Array[String] or something else?
Thanks
The documentation seems to answer this directly:
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
So textFile is the equivalent of both fromFile and getLines, and returns an RDD where each entry is a line from the file. That makes inputData the equivalent of lines (not linesArray): each element is one raw, unsplit line.
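If it helps to see what that RDD actually gives you, you can pull a few elements back to the driver with take. A minimal sketch, assuming sc is an existing SparkContext and using a hypothetical path:

// Assumption: sc is an already-created SparkContext; the path below is only a placeholder
val inputData = sc.textFile("file:///path/to/input.txt")
// take(3) is an action: it returns the first three lines to the driver as an Array[String]
inputData.take(3).foreach(println)
// inputData itself is not an Array[String]; it is a distributed collection of String lines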
An RDD is a distributed collection, so conceptually it's not very different from a List, an Array or a Seq: it provides functional operations that let you transform the collection of elements. The main difference from the Scala collections is that an RDD is inherently distributed. Given a Spark cluster, when an RDD is created, the collection it represents is partitioned over some nodes of that cluster.
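As a quick illustration of that 'distributed collection' idea, here is a sketch assuming a live SparkContext named sc; the data is just an example:

// A plain local Scala collection...
val local = Seq("a 1", "b 2", "c 3")
// ...becomes a distributed one: Spark splits it into partitions spread over the cluster
val distributed = sc.parallelize(local)
// getNumPartitions shows into how many pieces the collection was split
println(distributed.getNumPartitions)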
sc.textFile(...) returns an RDD[String]. Given a distributed file system, each worker will load a piece of that file into a 'partition', where further transformations and actions (in Spark lingo) can take place.
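To make the transformation/action distinction concrete, here is a small sketch (again assuming sc is your SparkContext and the path is hypothetical):

// Hypothetical path; sc is assumed to be an existing SparkContext
val fileRdd = sc.textFile("file:///path/to/input.txt")
// map is a transformation: it is lazy and only records what should happen to each line
val firstTokens = fileRdd.map(line => line.split(" ")(0))
// count is an action: it triggers the distributed job across the partitions and returns a result
println(firstTokens.count())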
Given that the Spark API quite closely resembles the Scala collections API, once you have an RDD, applying functional transformations to it is quite similar to what you would do with a Scala collection.
Your Scala program can therefore be easily ported to Spark:
//val filename = Source.fromFile("file://...")
//val lines = filename.getLines
val rdd = sc.textFile("file://...")
//val linesArray = lines.map(x => x.split(" ").slice(0, 3))
val lines = rdd.map(x => x.split(" ").slice(0, 3))
//val mapAsStrings = linesArray.toList.groupBy(_(0)).mapValues(x => x.map(_.tail))
val mappedLines = lines.groupBy(_(0)).mapValues(x => x.map(_.tail))
//val mappedUsers = mapAsStrings map {case (k,v) => k -> v.map(x => x(0) -> x(1).toInt).toMap}
val mappedUsers = mappedLines.mapValues{v => v.map(x => x(0) -> x(1).toInt).toMap}
One important difference is that there is no associative 'Map' collection as an RDD. Therefore, mappedUsers is a collection of tuples of type (String, Map[String, Int]).
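If you eventually need an actual Scala Map on the driver (and the result is small enough to fit in driver memory), you can collect the pair RDD; a sketch of one way to do that:

// collectAsMap is an action on pair RDDs: it ships all (key, value) pairs back to the driver
// Assumption: the collected result comfortably fits in driver memory
val localUsers: scala.collection.Map[String, Map[String, Int]] = mappedUsers.collectAsMap()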