 

Set schema in pyspark dataframe read.csv with null elements

I have a data set (example) that, when imported with

df = spark.read.csv(filename, header=True, inferSchema=True)
df.show()

will assign the column containing 'NA' values as StringType(), where I would like it to be IntegerType() (or ByteType()).

[screenshot: inferSchema result]
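For reference, a quick way to check what inferSchema decided is to print the schema after the read (a sketch; it should report the 'NA'-containing column as string):

df = spark.read.csv(filename, header=True, inferSchema=True)
df.printSchema()  # the column holding 'NA' comes back as string rather than integer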

I then tried to set the schema explicitly:

from pyspark.sql.types import StructType, StructField, IntegerType, DateType

schema = StructType([
    StructField("col_01", IntegerType()),
    StructField("col_02", DateType()),
    StructField("col_03", IntegerType())
])
df = spark.read.csv(filename, header=True, schema=schema)
df.show()

The output shows the entire row being set to null wherever 'col_03' is null (i.e. where the source value was 'NA').

[screenshot: entire row shown as null]
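This whole-row-null behaviour is how the CSV reader's default PERMISSIVE parse mode treats records whose fields do not match the declared schema. Below is a sketch for inspecting the raw offending records instead; mode and columnNameOfCorruptRecord are standard CSV reader options in recent Spark versions, and the extra StringType column added to the schema is an assumption needed to capture the raw line:

from pyspark.sql.types import StructType, StructField, IntegerType, DateType, StringType

schema_with_corrupt = StructType([
    StructField("col_01", IntegerType()),
    StructField("col_02", DateType()),
    StructField("col_03", IntegerType()),
    StructField("_corrupt_record", StringType())  # receives the raw line when parsing fails
])
df = spark.read.csv(filename, header=True, schema=schema_with_corrupt,
                    mode='PERMISSIVE', columnNameOfCorruptRecord='_corrupt_record')
df.filter(df['_corrupt_record'].isNotNull()).show(truncate=False)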

However, col_01 and col_02 return appropriate data when selected on their own:

df.select(['col_01','col_02']).show()

[screenshot: rows are actually not null]

I can work around this by casting the data type of col_03 after importing:

from pyspark.sql.types import IntegerType

df = spark.read.csv(filename, header=True, inferSchema=True)
df = df.withColumn('col_03', df['col_03'].cast(IntegerType()))
df.show()

[screenshot: import then cast result]
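The same post-cast workaround can be applied to several columns at once; a sketch, where int_cols is a hypothetical list of whichever columns should end up as integers:

from pyspark.sql.types import IntegerType

df = spark.read.csv(filename, header=True, inferSchema=True)
int_cols = ['col_01', 'col_03']  # hypothetical list of columns to cast
for c in int_cols:
    df = df.withColumn(c, df[c].cast(IntegerType()))
df.printSchema()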

However, I think this is not ideal; it would be much better if I could assign the data type for each column directly when setting the schema.

Would anyone be able to point out what I am doing incorrectly? Or is casting the data types after importing the only solution? Any comment regarding the performance of the two approaches (if assigning the schema can be made to work) is also welcome.

Thank you,

asked Feb 09 '18 by clumdee

2 Answers

You can set a custom null value in Spark's CSV loader using nullValue:

for a csv file looking like this:

col_01,col_02,col_03
111,2007-11-18,3
112,2002-12-03,4
113,2007-02-14,5
114,2003-04-16,NA
115,2011-08-24,2
116,2003-05-03,3
117,2001-06-11,4
118,2004-05-06,NA
119,2012-03-25,5
120,2006-10-13,4

and forcing the schema:

from pyspark.sql.types import StructType, StructField, IntegerType, DateType

schema = StructType([
    StructField("col_01", IntegerType()),
    StructField("col_02", DateType()),
    StructField("col_03", IntegerType())
])

reading with nullValue set to 'NA':

df = spark.read.csv(filename, header=True, nullValue='NA', schema=schema)
df.show()
df.printSchema()

you'll get:

    +------+----------+------+
    |col_01|    col_02|col_03|
    +------+----------+------+
    |   111|2007-11-18|     3|
    |   112|2002-12-03|     4|
    |   113|2007-02-14|     5|
    |   114|2003-04-16|  null|
    |   115|2011-08-24|     2|
    |   116|2003-05-03|     3|
    |   117|2001-06-11|     4|
    |   118|2004-05-06|  null|
    |   119|2012-03-25|     5|
    |   120|2006-10-13|     4|
    +------+----------+------+

    root
     |-- col_01: integer (nullable = true)
     |-- col_02: date (nullable = true)
     |-- col_03: integer (nullable = true)
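The same read can be expressed through the reader's .option()/.schema() chain instead of keyword arguments, if you prefer that style (a sketch with identical behaviour):

df = (spark.read
      .option("header", "true")
      .option("nullValue", "NA")
      .schema(schema)
      .csv(filename))
df.show()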
answered Sep 28 '22 by MaFF

Try this once (note that this will read every column as string type; you can then cast the types as per your requirement):

import csv
from pyspark.sql.types import IntegerType

# Read the CSV with the standard library; every value comes in as a string
data = []
with open('filename', 'r') as doc:
    reader = csv.DictReader(doc)
    for line in reader:
        data.append(line)

# Parallelize the list of row dicts into a DataFrame, then cast col_03 to integer
df = sc.parallelize(data).toDF()
df = df.withColumn("col_03", df["col_03"].cast(IntegerType()))
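If more columns need proper types, the same cast pattern can be repeated per column (a sketch; casting the 'yyyy-MM-dd' strings to DateType is assumed to behave as expected here):

from pyspark.sql.types import DateType

df = df.withColumn("col_01", df["col_01"].cast(IntegerType()))
df = df.withColumn("col_02", df["col_02"].cast(DateType()))
df.printSchema()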
answered Sep 28 '22 by Abhijeet