How do I convert a CSV file to an RDD?

I'm new to Spark. I want to perform some operations on particular data in a CSV record.

I'm trying to read a CSV file and convert it to an RDD. My further operations depend on the header row provided in the CSV file.

(From comments) This is my code so far:

final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) {
        return Arrays.asList(EOL.split(s));
    }
});
final String heading = lines.first().toString();

I can get the header values like this:

final String[] header = heading.split(" ");

I want to map these header values to each record in the CSV file.

In Java I'm using CSVReader's record.getColumnValue(columnHeader) to get a particular value. I need to do something similar to that here.
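A minimal sketch of that kind of header-keyed lookup over an RDD, assuming Spark's Java API with Java 8 lambdas (with Java 7 you would spell them out as anonymous Function classes, as in the code above). The class name, the file name, and the "user" column are illustrative, and the plain split(",") does not handle quoted fields:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CsvHeaderLookup {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "csv-header-lookup");
        JavaRDD<String> lines = sc.textFile("file.csv");

        // Read the header row once on the driver so it can be captured in closures.
        final String headerLine = lines.first();
        final String[] header = headerLine.split(",");

        // Drop the header row and turn every record into a columnName -> value map.
        JavaRDD<Map<String, String>> records = lines
            .filter(line -> !line.equals(headerLine))
            .map(line -> {
                String[] fields = line.split(",");
                Map<String, String> record = new HashMap<>();
                for (int i = 0; i < header.length && i < fields.length; i++) {
                    record.put(header[i].trim(), fields[i].trim());
                }
                return record;
            });

        // Rough equivalent of record.getColumnValue(columnHeader):
        records.foreach(record -> System.out.println(record.get("user")));
    }
}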

asked Jun 19 '14 by Ramya



2 Answers

A simplistic approach would be to have a way to preserve the header.

Let's say you have a file.csv like:

user, topic, hits
om, scala, 120
daniel, spark, 80
3754978, spark, 1

We can define a header class that uses a parsed version of the first row:

class SimpleCSVHeader(header: Array[String]) extends Serializable {
  val index = header.zipWithIndex.toMap
  def apply(array: Array[String], key: String): String = array(index(key))
}

Then we can use that header to address the data further down the road:

val csv = sc.textFile("file.csv")  // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) // lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line, "user") != "user") // filter the header out
val users = rows.map(row => header(row, "user"))
val usersByHits = rows.map(row => header(row, "user") -> header(row, "hits").toInt)
...

Note that the header is little more than a simple map from a mnemonic to the array index. Pretty much all of this could also be done using the ordinal position of each element in the array, like user = row(0)
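For reference, the same named-versus-ordinal trade-off sketched in Java, the question's language (the literal header and row values are just the sample data above):

import java.util.HashMap;
import java.util.Map;

public class HeaderIndexDemo {
    public static void main(String[] args) {
        String[] header = {"user", "topic", "hits"};
        String[] row = {"om", "scala", "120"};

        // Named access through a header index (the Java analogue of SimpleCSVHeader):
        Map<String, Integer> index = new HashMap<>();
        for (int i = 0; i < header.length; i++) {
            index.put(header[i], i);
        }
        System.out.println(row[index.get("user")]); // om

        // Ordinal access, when column positions are known and fixed:
        System.out.println(row[0]); // om
    }
}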

PS: Welcome to Scala :-)

answered Oct 04 '22 by maasg


You can use the spark-csv library: https://github.com/databricks/spark-csv

This is directly from the documentation:

import java.util.HashMap;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

SQLContext sqlContext = new SQLContext(sc);

HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", "cars.csv");

DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
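Once loaded, the DataFrame can be inspected and queried as usual. A short follow-up sketch (this part is not from the docs; the year and model column names assume the cars.csv sample used in the spark-csv README, so adjust them to your file):

// Check the schema and look at a few rows.
df.printSchema();
df.show();

// Select individual columns by name.
df.select("year", "model").show();

// Or register the DataFrame and query it with SQL.
df.registerTempTable("cars");
sqlContext.sql("SELECT year, model FROM cars").show();

Note that the spark-csv package has to be on the classpath, for example via spark-submit --packages com.databricks:spark-csv_2.10:1.5.0.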
answered Oct 04 '22 by Saman