I'm new to Spark. I want to perform some operations on particular data in a CSV record.
I'm trying to read a CSV file and convert it to an RDD. My further operations are based on the header provided in the CSV file.
(From comments) This is my code so far:
final JavaRDD<String> File = sc.textFile(Filename).cache();

final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) {
        return Arrays.asList(EOL.split(s));
    }
});

final String heading = lines.first().toString();
I can get the header values like this. I want to map this to each record in the CSV file.
final String[] header=heading.split(" ");
In Java I'm using CSVReader's record.getColumnValue(Column header) to get a particular value. I need to do something similar to that here.
There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
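For illustration, both options look roughly like this in Scala (a minimal sketch, assuming an existing SparkContext named sc as in the other snippets in this thread; the sample values and file name are placeholders):

// 1. Parallelize an existing collection in the driver program
val fromCollection = sc.parallelize(Seq("om,scala,120", "daniel,spark,80"))

// 2. Reference a dataset in external storage (local file, HDFS, S3, ...)
val fromFile = sc.textFile("file.csv")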
In this tutorial, I will explain how to load a CSV file into a Spark RDD using a Scala example. Using the textFile() method of the SparkContext class, we can read a single CSV file, multiple CSV files (based on pattern matching), or all files in a directory into an RDD[String] object.
Now read the data from the RDD using foreach. Since each element of the RDD is an array, we need to use an index to retrieve each field from the array. Note that the output also contains the header names from the CSV file, because the header row is treated as data like any other row in the RDD.
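Here is a minimal sketch of that approach, assuming an existing SparkContext named sc and the sample file.csv shown further below (the column positions are assumptions for illustration):

// Read every line of the CSV into an RDD[String]
val rdd = sc.textFile("file.csv")

// Split each line on commas into an RDD[Array[String]]
val fields = rdd.map(line => line.split(",").map(_.trim))

// Access fields by index; the header row prints like any other row
fields.foreach(row => println(row(0) + ", " + row(2)))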
A simplistic approach would be to have a way to preserve the header.
Let's say you have a file.csv like:
user, topic, hits
om, scala, 120
daniel, spark, 80
3754978, spark, 1
We can define a header class that uses a parsed version of the first row:
class SimpleCSVHeader(header: Array[String]) extends Serializable {
  val index = header.zipWithIndex.toMap
  def apply(array: Array[String], key: String): String = array(index(key))
}
We can then use that header to address the data further down the road:
val csv = sc.textFile("file.csv")  // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) // lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line, "user") != "user") // filter the header out
val users = rows.map(row => header(row, "user"))
val usersByHits = rows.map(row => header(row, "user") -> header(row, "hits").toInt)
...
Note that the header is not much more than a simple map from a mnemonic to the array index; pretty much all of this could be done using the ordinal position of each element in the array, like user = row(0).
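For comparison, here is the same filtering and lookup done purely by ordinal index (a small sketch; the column order user, topic, hits is taken from the sample file above):

// Address columns by position instead of by name
val rowsByIndex = data.filter(row => row(0) != "user")                   // drop the header row
val usersByHitsByIndex = rowsByIndex.map(row => row(0) -> row(2).toInt)  // user -> hits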
PS: Welcome to Scala :-)
You can use the spark-csv library: https://github.com/databricks/spark-csv
This is directly from the documentation:
import java.util.HashMap;
import org.apache.spark.sql.SQLContext;

SQLContext sqlContext = new SQLContext(sc);

HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", "cars.csv");

DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
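If you prefer Scala over Java, the equivalent call looks roughly like this (a sketch based on the same spark-csv documentation and the Spark 1.3-era SQLContext.load API; cars.csv is the same example file):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.load(
  "com.databricks.spark.csv",
  Map("path" -> "cars.csv", "header" -> "true"))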