How do I read a text file & apply a schema with PySpark?

.txt file looks like this:

1234567813572468
1234567813572468
1234567813572468
1234567813572468
1234567813572468

When I read it in and split it into 3 distinct columns, I get this (perfect):

df = spark.read.option("header"     , "false")\
               .option("inferSchema", "true" )\
               .text( "fixed-width-2.txt"    )

sorted_df = df.select(
    df.value.substr(1, 4).alias('col1'),
    df.value.substr(5, 4).alias('col2'),
    df.value.substr(8, 4).alias('col3'),
)
sorted_df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
+----+----+----+

However, if I were to read it again, and apply a schema...

from pyspark.sql.types import *
schema = StructType([StructField('col1', IntegerType(), True),
                     StructField('col2', IntegerType(), True),
                     StructField('col3', IntegerType(), True)])
df_new = spark.read.csv("fixed-width-2.txt", schema=schema)
df_new.printSchema()
root
 |-- col1: integer (nullable = true)
 |-- col2: integer (nullable = true)
 |-- col3: integer (nullable = true)

The data from the file is gone:

df_new.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
+----+----+----+

So my question is, how can I read in this text file and apply a schema?

1 Answer

When you read with a schema that declares col1 as IntegerType, the value 1234567813572468 exceeds the maximum int value (2147483647), so the parse fails and you get an empty DataFrame. Read it with LongType instead:

schema = StructType([StructField('col1', LongType(), True)])
spark.read.csv("path", schema=schema).show()
#+----------------+
#|            col1|
#+----------------+
#|1234567813572468|
#|1234567813572468|
#|1234567813572468|
#|1234567813572468|
#|1234567813572468|
#+----------------+
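If you still want the three 4-character columns from that single long value, a minimal sketch (not part of the original answer; it assumes the question's file and the non-overlapping split used in the RDD section below) is to cast the long back to a string and slice it:

from pyspark.sql.functions import col

# read the whole line as one long column, as above
df_long = spark.read.csv("fixed-width-2.txt", schema=schema)

# cast to string, slice the fixed-width fields, cast each slice to int
df_three = df_long.select(
    col("col1").cast("string").substr(1, 4).cast("int").alias("col1"),
    col("col1").cast("string").substr(5, 4).cast("int").alias("col2"),
    col("col1").cast("string").substr(9, 4).cast("int").alias("col3"),
)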

Using the RDD API:

An easier way is to read the fixed-width file with .textFile() (which returns an RDD), apply the slicing transformations with .map(), and then convert to a DataFrame with the schema.


from pyspark.sql.types import *

schema = StructType([StructField('col1', IntegerType(), True),
                     StructField('col2', IntegerType(), True),
                     StructField('col3', IntegerType(), True)])

# slice each line into three 4-character fields and cast them to int
rdd = spark.sparkContext.textFile("fixed_width.csv") \
    .map(lambda x: (int(x[0:4]), int(x[4:8]), int(x[8:12])))

df = spark.createDataFrame(rdd, schema)

df.show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#+----+----+----+

df.printSchema()
#root
# |-- col1: integer (nullable = true)
# |-- col2: integer (nullable = true)
# |-- col3: integer (nullable = true)
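If the file can contain blank or short lines, a hedged variant (an assumption on my part; the original answer does not handle this) filters them out before the map so int() never sees a bad slice:

rdd = spark.sparkContext.textFile("fixed_width.csv") \
    .filter(lambda x: len(x) >= 12) \
    .map(lambda x: (int(x[0:4]), int(x[4:8]), int(x[8:12])))
df = spark.createDataFrame(rdd, schema)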

Using the DataFrame API:

from pyspark.sql.functions import col

df = spark.read.option("header"     , "false")\
               .option("inferSchema", "true" )\
               .text( "path")

sorted_df = df.select(
    df.value.substr(1, 4).alias('col1'),
    df.value.substr(5, 4).alias('col2'),
    df.value.substr(8, 4).alias('col3'),
)

# dynamically cast every column to int, keeping its name
# (Column.name is an alias for Column.alias)
casting = [col(col_name).cast("int").name(col_name) for col_name in sorted_df.columns]
sorted_df = sorted_df.select(casting)

#required dataframe
sorted_df.show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#+----+----+----+
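The same cast can also be written with selectExpr instead of the list comprehension above (an equivalent sketch, not in the original answer):

sorted_df = sorted_df.selectExpr("cast(col1 as int) as col1",
                                 "cast(col2 as int) as col2",
                                 "cast(col3 as int) as col3")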

# in case you want to change the types afterwards
schema = StructType([StructField('col1', IntegerType(), True),
                     StructField('col2', IntegerType(), True),
                     StructField('col3', IntegerType(), True)])

df = spark.createDataFrame(sorted_df.rdd, schema)
df.show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#+----+----+----+
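Note that going through sorted_df.rdd and back to a DataFrame bypasses the Catalyst optimizer for that step, so the select-based cast shown above is usually the cheaper option.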