
How to skip lines while reading a CSV file as a dataFrame using PySpark?

I have a CSV file that is structured this way:

Header
Blank Row
"Col1","Col2"
"1,200","1,456"
"2,000","3,450"

I have two problems reading this file:

  1. I want to ignore the header and the blank row.
  2. The commas within the values are not separators.

Here is what I tried:

df = (sc.textFile("myFile.csv")
      .map(lambda line: line.split(","))    # split by comma
      .filter(lambda line: len(line) == 2)  # this helped me ignore the first two rows
      .collect())

However, this did not work, because the commas within the values were being treated as separators, so len(line) returned 4 instead of 2.

I tried an alternate approach:

data = sc.textFile("myFile.csv")
headers = data.take(2) #First two rows to be skipped

The idea was to then use filter to skip those first rows. But when I tried to print the headers, I got encoded values:

[\x00A\x00Y\x00 \x00J\x00u\x00l\x00y\x00 \x002\x000\x001\x006\x00]

What is the correct way to read a CSV file and skip the first two rows?

Asked by Rakesh Adhikesavan on May 19 '17


3 Answers

The answer by Zlidime had the right idea. The working solution is this:

from pyspark.sql.types import StructType, StructField, StringType
import csv

customSchema = StructType([
    StructField("Col1", StringType(), True),
    StructField("Col2", StringType(), True)])

df = (sc.textFile("file.csv")
      # strip the stray null bytes, then parse each partition with the csv module
      # so the commas inside the quoted values are not treated as separators
      .mapPartitions(lambda partition: csv.reader(
          [line.replace('\0', '') for line in partition],
          delimiter=',', quotechar='"'))
      # keep only the two-column data rows and drop the "Col1","Col2" row
      .filter(lambda line: len(line) == 2 and line[0] != 'Col1')
      .toDF(customSchema))
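
The \x00 characters shown in the question suggest the source file may be UTF-16 encoded (an assumption based on that output alone). If so, decoding the file while reading avoids the replace('\0', '') workaround; a minimal sketch using the charset option of the databricks spark-csv package:

# Sketch only, assuming the file really is UTF-16 and that spark-csv is on
# the classpath; the non-data rows still need to be filtered out afterwards.
df_raw = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("charset", "UTF-16") \
    .option("header", "false") \
    .option("quote", '"') \
    .load("file.csv")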
Answered by Rakesh Adhikesavan


For your first problem, just zip the lines in the RDD with zipWithIndex and filter out the lines you don't want. For the second problem, you could strip the leading and trailing double-quote characters from each line and then split it on '","'.

rdd = sc.textFile("myfile.csv")
df = (rdd.zipWithIndex()
      # indices 0-2 are the header text, the blank row and the "Col1","Col2" row
      .filter(lambda x: x[1] > 2)
      .map(lambda x: x[0])
      # strip the outer quotes and split on the quoted separator
      .map(lambda x: x.strip('"').split('","'))
      .toDF(["Col1", "Col2"]))

That said, if you're looking for a standard way to deal with CSV files in Spark, it's better to use the spark-csv package from Databricks, as sketched below.
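
For illustration, a minimal PySpark sketch of that approach (assuming a Spark 1.x sqlContext and the spark-csv package on the classpath; option names follow the spark-csv documentation):

# With quote set, "1,200" and "1,456" are parsed as single values.
df = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("header", "false") \
    .option("delimiter", ",") \
    .option("quote", '"') \
    .load("myFile.csv")
# The header text, blank row and "Col1","Col2" row still occupy the first
# three rows and can be dropped with the zipWithIndex approach above.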

Answered by zenofsahil


If the CSV file always has exactly two columns, this can be implemented in Scala as follows:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId}

val struct = StructType(
  StructField("firstCol", StringType, nullable = true) ::
  StructField("secondCol", StringType, nullable = true) :: Nil)

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("delimiter", ",")
  .option("quote", "\"")
  .schema(struct)
  .load("myFile.csv")

df.show(false)

// Tag each row with an index and drop the first three rows
// (header text, blank row and the "Col1","Col2" row).
val indexed = df.withColumn("index", monotonicallyIncreasingId())
val filtered = indexed.filter(col("index") > 2).drop("index")

filtered.show(false)

Result is:

+---------+---------+
|firstCol |secondCol|
+---------+---------+
|Header   |null     |
|Blank Row|null     |
|Col1     |Col2     |
|1,200    |1,456    |
|2,000    |3,450    |
+---------+---------+

+--------+---------+
|firstCol|secondCol|
+--------+---------+
|1,200   |1,456    |
|2,000   |3,450    |
+--------+---------+
Answered by pasha701