
How do I skip a header from CSV files in Spark?

Suppose I give three file paths to a Spark context to read, and each file has a schema in its first row. How can we skip the schema lines from the headers?

val rdd = sc.textFile("file1,file2,file3")

Now, how can we skip header lines from this rdd?

asked Jan 09 '15 by Hafiz Mujadid




4 Answers

data = sc.textFile('path_to_data')
header = data.first()  # extract header
data = data.filter(lambda row: row != header)  # filter out the header line
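
Since the question is in Scala, a rough Scala equivalent of the same idea might look like this (the path is a placeholder):

val data = sc.textFile("path_to_data")          // placeholder path
val header = data.first()                       // take the first line as the header
val rows = data.filter(row => row != header)    // keep every line that is not the header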
answered by Jimmy


If there were just one header line in the first record, then the most efficient way to filter it out would be:

rdd.mapPartitionsWithIndex {
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

Of course, this doesn't help if there are many files, each with its own header line inside, since only the first line of partition 0 is dropped. In that case you can build one RDD per file this way and union the three of them, as in the sketch below.
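
For instance, with the three files from the question, a sketch under the assumption that each file carries its own header could read each file separately, drop its header, and union the results (file names are placeholders):

val files = Seq("file1", "file2", "file3")      // placeholder paths
val withoutHeaders = files
  .map { path =>
    sc.textFile(path).mapPartitionsWithIndex {
      (idx, iter) => if (idx == 0) iter.drop(1) else iter
    }
  }
  .reduce(_ union _)                            // combine the three header-free RDDs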

You could also just write a filter that matches only a line that could be a header. This is quite simple, but less efficient.
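
A minimal sketch of such a filter, assuming the header line starts with a known column name ("id," here is only an assumed example):

val noHeader = rdd.filter(line => !line.startsWith("id,"))   // assumed header prefix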

Python equivalent of the mapPartitionsWithIndex approach:

from itertools import islice

rdd.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it
)
answered by Sean Owen


In Spark 2.0, a CSV reader is built into Spark, so you can easily load a CSV file as follows:

spark.read.option("header","true").csv("filePath")
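
The same reader can also take several paths at once, so the three files from the question could be loaded in one call; the file names below are placeholders:

val df = spark.read
  .option("header", "true")
  .csv("file1", "file2", "file3")   // with header=true, the first line of each file is treated as its header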
answered by Sandeep Purohit


From Spark 2.0 onwards, you can use SparkSession to get this done as a one-liner:

val spark = SparkSession.builder.config(conf).getOrCreate()

and then as @SandeepPurohit said:

val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)

I hope this solves your question!

P.S.: SparkSession is the new entry point introduced in Spark 2.0 and can be found in the spark-sql package.
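
Putting the two steps together, a self-contained sketch might look like this (the application name and path are placeholders, and the config(conf) call from above is omitted for brevity):

import org.apache.spark.sql.SparkSession

val csvfilePath = "path/to/file.csv"   // placeholder path

val spark = SparkSession.builder
  .appName("SkipCsvHeaders")           // hypothetical application name
  .getOrCreate()

val dataFrame = spark.read
  .format("csv")
  .option("header", "true")
  .load(csvfilePath)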

answered by Shivansh