My .txt file looks like this:
1234567813572468
1234567813572468
1234567813572468
1234567813572468
1234567813572468
When I read it in and split it into 3 distinct columns, I get this (perfect):
df = spark.read.option("header" , "false")\
.option("inferSchema", "true" )\
.text( "fixed-width-2.txt" )
sorted_df = df.select(
df.value.substr(1, 4).alias('col1'),
df.value.substr(5, 4).alias('col2'),
df.value.substr(8, 4).alias('col3'),
).show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
+----+----+----+
However, if I were to read it again, and apply a schema...
from pyspark.sql.types import *
schema = StructType([StructField('col1', IntegerType(), True),
StructField('col2', IntegerType(), True),
StructField('col3', IntegerType(), True)])
df_new = spark.read.csv("fixed-width-2.txt", schema=schema)
df_new.printSchema()
root
|-- col1: integer (nullable = true)
|-- col2: integer (nullable = true)
|-- col3: integer (nullable = true)
The data from the file is gone:
df_new.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
+----+----+----+
So my question is, how can I read in this text file and apply a schema?
When the file is read with a schema that declares col1 as int, the value 1234567813572468 exceeds the maximum int value (2147483647), so it cannot be parsed as an integer. Read it with LongType instead.
schema = StructType([StructField('col1', LongType(), True)])
spark.read.csv("path",schema=schema).show()
#+----------------+
#| col1|
#+----------------+
#|1234567813572468|
#|1234567813572468|
#|1234567813572468|
#|1234567813572468|
#|1234567813572468|
#+----------------+
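The range check behind that advice can be sketched in plain Python, without Spark. The bounds below are those of a 32-bit signed integer (IntegerType) and a 64-bit signed integer (LongType); the helper name `smallest_spark_type` is hypothetical and only illustrates the comparison.

```python
# Bounds of a 32-bit signed int (IntegerType) and a 64-bit signed int (LongType).
INT_MIN, INT_MAX = -2**31, 2**31 - 1
LONG_MIN, LONG_MAX = -2**63, 2**63 - 1


def smallest_spark_type(text):
    """Hypothetical helper: name the narrowest integral type that can hold `text`."""
    value = int(text)
    if INT_MIN <= value <= INT_MAX:
        return "IntegerType"
    if LONG_MIN <= value <= LONG_MAX:
        return "LongType"
    return "too large for LongType"


line = "1234567813572468"
print(smallest_spark_type(line))       # the whole 16-digit line -> LongType
print(smallest_spark_type(line[0:4]))  # only the first four digits -> IntegerType
```

The whole line needs LongType, while each 4-digit slice fits comfortably in IntegerType, which is why splitting the line before casting lets the original three-column schema work.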
Using RDD Api:
An easier way is to read the fixed-width file with .textFile (which returns an RDD), split each line into fields with .map, and then convert the result to a DataFrame using the schema.
from pyspark.sql.types import *
schema = StructType([StructField('col1', IntegerType(), True),
StructField('col2', IntegerType(), True),
StructField('col3', IntegerType(), True)])
df = spark.createDataFrame(
    spark.sparkContext.textFile("fixed_width.csv")
        .map(lambda x: (int(x[0:4]), int(x[4:8]), int(x[8:12]))),
    schema)
df.show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#+----+----+----+
df.printSchema()
#root
# |-- col1: integer (nullable = true)
# |-- col2: integer (nullable = true)
# |-- col3: integer (nullable = true)
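The slicing done inside the .map call can be tried out without a Spark session. This is a minimal sketch, assuming every field is a run of digits; `parse_fixed_width` is a hypothetical helper name, and the widths list is an assumption taken from the three 4-character columns used here.

```python
from itertools import accumulate


def parse_fixed_width(line, widths):
    """Split one fixed-width line into ints, one per width (hypothetical helper)."""
    ends = list(accumulate(widths))   # exclusive end offset of each field
    starts = [0] + ends[:-1]          # 0-based start offset of each field
    return tuple(int(line[s:e]) for s, e in zip(starts, ends))


row = parse_fixed_width("1234567813572468", [4, 4, 4])
print(row)  # (1234, 5678, 1357)
```

A function like this could be passed to the RDD in place of the lambda, for example `.map(lambda x: parse_fixed_width(x, [4, 4, 4]))`, so that the column widths live in one list instead of hand-written slice bounds.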
Using DataFrame Api:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType
# each line of the file becomes one row with a single string column named "value"
df = spark.read.text("path")
sorted_df = df.select(
    df.value.substr(1, 4).alias('col1'),
    df.value.substr(5, 4).alias('col2'),
    # substr is 1-based, so the third field starts at 9; substr(8, 4) would return 8135
    df.value.substr(9, 4).alias('col3'),
)
#dynamic cast expression
casting = [col(col_name).cast("int").alias(col_name) for col_name in sorted_df.columns]
sorted_df = sorted_df.select(casting)
#required dataframe
sorted_df.show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#+----+----+----+
#optionally, rebuild the dataframe against an explicit schema
schema = StructType([StructField('col1', IntegerType(), True),
                     StructField('col2', IntegerType(), True),
                     StructField('col3', IntegerType(), True)])
df = spark.createDataFrame(sorted_df.rdd, schema)
df.show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#+----+----+----+
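Because substr takes a 1-based start position and a length, the start of each field is easy to get wrong by hand. A minimal sketch of deriving the positions from a list of widths follows; `substr_positions` and `fixed_width_columns` are hypothetical helper names, and the second one imports pyspark only when it is called, so the position arithmetic can be checked on its own.

```python
def substr_positions(widths):
    """Return 1-based (start, length) pairs for consecutive fixed-width fields."""
    positions = []
    start = 1
    for width in widths:
        positions.append((start, width))
        start += width
    return positions


def fixed_width_columns(names, widths, source="value", dtype="int"):
    """Build one cast, aliased Column per field (pyspark is needed only when called)."""
    from pyspark.sql.functions import col
    return [
        col(source).substr(start, length).cast(dtype).alias(name)
        for name, (start, length) in zip(names, substr_positions(widths))
    ]


print(substr_positions([4, 4, 4]))  # [(1, 4), (5, 4), (9, 4)]
```

With a Spark session available, the columns could then be selected in one step, for example `spark.read.text("path").select(*fixed_width_columns(["col1", "col2", "col3"], [4, 4, 4]))`.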