I have a CSV file that is structured this way:
Header
Blank Row
"Col1","Col2"
"1,200","1,456"
"2,000","3,450"
I have two problems in reading this file.
Here is what I tried:
df = (sc.textFile("myFile.csv")
      .map(lambda line: line.split(","))      # split by comma
      .filter(lambda line: len(line) == 2)    # intended to drop the first two rows
      .collect())
However, this did not work, because the commas inside the values were treated as separators, so len(line) returned 4 instead of 2.
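To make the failure concrete, here is a small plain-Python sketch (no Spark needed) that compares a naive split with the standard csv module on one of the sample rows. The variable names are only illustrative.

```python
import csv

line = '"1,200","1,456"'

# A naive split treats every comma as a separator, including the ones inside quotes.
naive = line.split(",")

# csv.reader honours the quote character, so quoted commas stay inside the field.
parsed = next(csv.reader([line], delimiter=",", quotechar='"'))

print(len(naive), naive)
print(len(parsed), parsed)
```

The naive split produces four fragments, while csv.reader returns the two intended fields.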
I tried an alternate approach:
data = sc.textFile("myFile.csv")
headers = data.take(2) #First two rows to be skipped
The idea was to then use filter to exclude the headers. But when I tried to print the headers, I got encoded values:
[\x00A\x00Y\x00 \x00J\x00u\x00l\x00y\x00 \x002\x000\x001\x006\x00]
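The \x00 characters between the letters suggest that the file is UTF-16 encoded rather than UTF-8. That is an inference from the output, not something confirmed in the post. A minimal plain-Python sketch, with a hypothetical sample string standing in for the real file, shows how such bytes look under the wrong codec and how decoding them as UTF-16 cleans them up:

```python
import csv
import io

# Hypothetical sample mirroring the layout in the question, encoded as UTF-16.
text = 'Header\n\n"Col1","Col2"\n"1,200","1,456"\n"2,000","3,450"\n'
raw = text.encode("utf-16")  # the "utf-16" codec writes a byte-order mark first

# Decoding with a single-byte codec leaves a NUL next to every character.
wrong = raw.decode("latin-1")

# Decoding as UTF-16 recovers the original text.
right = raw.decode("utf-16")

# Parse the clean text and skip the title row and the blank row.
rows = list(csv.reader(io.StringIO(right, newline="")))[2:]
print(rows)
```

If the real file has no byte-order mark, the codec would need to be given explicitly as "utf-16-le" or "utf-16-be".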
What is the correct way to read a CSV file and skip the first two rows?
The answer by Zlidime had the right idea. The working solution is this:
import csv
from pyspark.sql.types import StructType, StructField, StringType
customSchema = StructType([
    StructField("Col1", StringType(), True),
    StructField("Col2", StringType(), True)])
df = (sc.textFile("file.csv")
      # strip the NUL bytes, then let csv.reader handle the quoted commas
      .mapPartitions(lambda partition: csv.reader(
          [line.replace('\0', '') for line in partition],
          delimiter=',', quotechar='"'))
      # keep only two-field rows that are not the column-name row
      .filter(lambda line: len(line) == 2 and line[0] != 'Col1')
      .toDF(customSchema))
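The same per-partition logic can be tried without a cluster. The sketch below is plain Python: parse_partition and is_data_row are hypothetical helper names, and the list of strings stands in for one RDD partition. It assumes two columns, as in the sample file, so the length check is written as == 2.

```python
import csv

def parse_partition(lines):
    """Hypothetical helper: strip NUL bytes, then parse the lines as CSV."""
    cleaned = (line.replace("\0", "") for line in lines)
    return csv.reader(cleaned, delimiter=",", quotechar='"')

def is_data_row(row):
    """Keep two-field rows that are not the column-name row."""
    return len(row) == 2 and row[0] != "Col1"

# Stand-in for one partition; the first line carries NUL bytes like the real file.
partition = [
    "H\0e\0a\0d\0e\0r",
    "",
    '"Col1","Col2"',
    '"1,200","1,456"',
    '"2,000","3,450"',
]

rows = [row for row in parse_partition(partition) if is_data_row(row)]
print(rows)
```

The title row parses to a single field and the blank row to an empty list, so the length check alone removes both; the second condition removes the column names.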
For your first problem, zip the lines in the RDD with their indices using zipWithIndex and filter out the lines you don't want.
For the second problem, you could strip the leading and trailing double-quote characters from each line and then split the line on the three-character sequence '","'.
rdd = sc.textFile("myfile.csv")
df = (rdd.zipWithIndex()
      .filter(lambda x: x[1] > 2)    # indices are 0-based: drops the title, blank and column-name rows
      .map(lambda x: x[0])
      .map(lambda x: x.strip('"').split('","'))
      .toDF(["Col1", "Col2"]))
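For anyone who wants to check the index filter and the strip-and-split step without Spark, here is a rough plain-Python equivalent. enumerate plays the role of zipWithIndex, and the list of lines is a stand-in for the RDD.

```python
lines = [
    "Header",
    "",
    '"Col1","Col2"',
    '"1,200","1,456"',
    '"2,000","3,450"',
]

# Same (value, index) shape that zipWithIndex produces, with 0-based indices.
indexed = [(line, i) for i, line in enumerate(lines)]

# Keep everything after the title, blank and column-name rows.
kept = [line for line, i in indexed if i > 2]

# Drop the outer quotes, then split on the quote-comma-quote sequence.
rows = [line.strip('"').split('","') for line in kept]
print(rows)
```

This only works when every field is quoted and no field contains an escaped quote; a real CSV parser is safer for anything less regular.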
Although, if you're looking for a standard way to deal with CSV files in Spark, it's better to use the spark-csv package from Databricks.
If the CSV file always has two columns, this can be implemented in Scala as follows:
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId}
val struct = StructType(
  StructField("firstCol", StringType, nullable = true) ::
  StructField("secondCol", StringType, nullable = true) :: Nil)
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("delimiter", ",")
  .option("quote", "\"")
  .schema(struct)
  .load("myFile.csv")
df.show(false)
// Tag each row with an ID, then drop the first three rows (IDs 0, 1 and 2)
val indexed = df.withColumn("index", monotonicallyIncreasingId())
val filtered = indexed.filter(col("index") > 2).drop("index")
filtered.show(false)
The result is:
+---------+---------+
|firstCol |secondCol|
+---------+---------+
|Header   |null     |
|Blank Row|null     |
|Col1     |Col2     |
|1,200    |1,456    |
|2,000    |3,450    |
+---------+---------+

+--------+---------+
|firstCol|secondCol|
+--------+---------+
|1,200   |1,456    |
|2,000   |3,450    |
+--------+---------+
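One caveat about the index > 2 filter: the generated IDs are increasing and unique, but not consecutive across partitions. The Spark documentation describes the current implementation as putting the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The plain-Python sketch below only simulates that layout (monotonic_ids is a hypothetical helper, not a Spark API) to show when the filter drops exactly three rows.

```python
def monotonic_ids(partition_sizes):
    """Simulate the documented ID layout: partition index in the upper bits,
    record number within the partition in the lower 33 bits."""
    ids = []
    for partition_index, size in enumerate(partition_sizes):
        for record in range(size):
            ids.append((partition_index << 33) + record)
    return ids

# All five rows in one partition: IDs are 0..4, so "> 2" keeps the two data rows.
single = monotonic_ids([5])
kept_single = [i for i in single if i > 2]

# Rows split 2 + 3 across partitions: the second partition starts at 1 << 33,
# so "> 2" also keeps the column-name row.
split = monotonic_ids([2, 3])
kept_split = [i for i in split if i > 2]

print(len(kept_single), len(kept_split))
```

For a small file that loads into a single partition, the filter behaves as shown in the result above; for larger inputs it may be safer to filter on the row contents instead.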