I am reading a file in PySpark and forming an RDD from it. I then convert it to a normal DataFrame and then to a pandas DataFrame. The issue I am having is that my input file has a header row, and I want to use it as the header of the DataFrame columns as well, but it is read in as an additional row and not as the header. This is my current code:
def extract(line):
    return line

# rownum >= 0 is true for every line, so row 0 (the header) is kept as data
input_file = sc.textFile('file1.txt').zipWithIndex().filter(lambda pair: pair[1] >= 0).map(lambda pair: pair[0])
input_data = (input_file
    .map(lambda line: line.split(";"))
    .filter(lambda line: len(line) >= 0)  # always true, so nothing is filtered out
    .map(extract))  # identity map; each row stays a list of strings
df_normal = input_data.toDF()
df = df_normal.toPandas()
Now when I look at df, the header row of the text file becomes the first row of the DataFrame, and df gets an additional header with 0, 1, 2, ... as column labels. How can I make the first row the header?
Creating a DataFrame from a CSV file with a header row: when reading the data into a pandas DataFrame, header=0 tells pandas.read_csv() to use the first line as column labels, and column names can also be specified explicitly with its names parameter.
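A minimal pandas sketch, assuming the question's semicolon-delimited layout. The inline sample data and the replacement names are hypothetical. With header=0 the first line becomes the column labels. When names is also given, header=0 makes pandas discard the file's own header line instead of reading it as data.

```python
import io

import pandas as pd

# Hypothetical semicolon-delimited data with a header line, like the question's file1.txt.
text = "name;age;city\nalice;30;Paris\nbob;25;Rome\n"

# header=0: use the first line as the column labels.
df = pd.read_csv(io.StringIO(text), sep=";", header=0)

# names=...: supply your own labels; header=0 drops the file's header line instead of keeping it as a row.
df_renamed = pd.read_csv(io.StringIO(text), sep=";", header=0, names=["who", "years", "town"])

print(df)
print(df_renamed)
```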
In Spark/PySpark, you can use the show() action to display the top N (5, 10, 100, ...) rows of a DataFrame on the console or in a log. Other actions return rows to the driver instead. take(n) and head(n) return the first n rows as a list of Row objects (Array[Row] in Scala), and tail(n) returns the last n (Spark 3.0+). first() returns a single Row, and collect() returns all rows.
Use DataFrame.columns: you can set df.columns = df.iloc[0] to take the column labels from the first row, then drop that row with df = df.iloc[1:]. In pandas, positions start at 0, so iloc[0] is the first row.
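A minimal pandas sketch of promoting the first row to the header. The frame below is hypothetical and mimics what toPandas() gives when the header line was read as ordinary data. After the labels are assigned, the row itself is dropped so it does not also remain as data.

```python
import pandas as pd

# Hypothetical frame where the header line ended up as row 0 (labels 0, 1, 2).
df = pd.DataFrame([["name", "age", "city"],
                   ["alice", "30", "Paris"],
                   ["bob", "25", "Rome"]])

df.columns = df.iloc[0]                  # first row (position 0) becomes the column labels
df = df.iloc[1:].reset_index(drop=True)  # drop that row and renumber the index from 0
df.columns.name = None                   # clear the name (0) inherited from the row's index label

print(df)
```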
There are a couple of ways to do that, depending on the exact structure of your data. Since you do not give any details, I'll try to show it using a data file, nyctaxicab.csv, that you can download.
If your file is in CSV format, you should use the relevant spark-csv package, provided by Databricks (this applies to Spark 1.x; since Spark 2.0, CSV support is built in via spark.read.csv). No need to download it explicitly; just run pyspark as follows:
$ pyspark --packages com.databricks:spark-csv_2.10:1.3.0
and then
>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)
>>> df = sqlContext.read.load('file:///home/vagrant/data/nyctaxisub.csv',
...                           format='com.databricks.spark.csv',
...                           header='true',
...                           inferSchema='true')
>>> df.count()
249999
The file has 250,000 rows including the header, so 249,999 is the correct number of actual records. Here is the schema, as inferred automatically by the package:
>>> df.dtypes
[('_id', 'string'),
('_rev', 'string'),
('dropoff_datetime', 'string'),
('dropoff_latitude', 'double'),
('dropoff_longitude', 'double'),
('hack_license', 'string'),
('medallion', 'string'),
('passenger_count', 'int'),
('pickup_datetime', 'string'),
('pickup_latitude', 'double'),
('pickup_longitude', 'double'),
('rate_code', 'int'),
('store_and_fwd_flag', 'string'),
('trip_distance', 'double'),
('trip_time_in_secs', 'int'),
('vendor_id', 'string')]
You can see more details in my relevant blog post.
If, for whatever reason, you cannot use the spark-csv package, you'll have to subtract the first row from the data and then use it to construct your schema. Here is the general idea, and you can again find a full example with code details in another blog post of mine:
>>> taxiFile = sc.textFile("file:///home/ctsats/datasets/BDU_Spark/nyctaxisub.csv")
>>> taxiFile.count()
250000
>>> taxiFile.take(5)
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"',
u'"29b3f4a30dea6688d4c289c9672cb996","1-ddfdec8050c7ef4dc694eeeda6c4625e","2013-01-11 22:03:00",+4.07033460000000E+001,-7.40144200000000E+001,"A93D1F7F8998FFB75EEF477EB6077516","68BC16A99E915E44ADA7E639B4DD5F59",2,"2013-01-11 21:48:00",+4.06760670000000E+001,-7.39810790000000E+001,1,,+4.08000000000000E+000,900,"VTS"',
u'"2a80cfaa425dcec0861e02ae44354500","1-b72234b58a7b0018a1ec5d2ea0797e32","2013-01-11 04:28:00",+4.08190960000000E+001,-7.39467470000000E+001,"64CE1B03FDE343BB8DFB512123A525A4","60150AA39B2F654ED6F0C3AF8174A48A",1,"2013-01-11 04:07:00",+4.07280540000000E+001,-7.40020370000000E+001,1,,+8.53000000000000E+000,1260,"VTS"',
u'"29b3f4a30dea6688d4c289c96758d87e","1-387ec30eac5abda89d2abefdf947b2c1","2013-01-11 22:02:00",+4.07277180000000E+001,-7.39942860000000E+001,"2D73B0C44F1699C67AB8AE322433BDB7","6F907BC9A85B7034C8418A24A0A75489",5,"2013-01-11 21:46:00",+4.07577480000000E+001,-7.39649810000000E+001,1,,+3.01000000000000E+000,960,"VTS"',
u'"2a80cfaa425dcec0861e02ae446226e4","1-aa8b16d6ae44ad906a46cc6581ffea50","2013-01-11 10:03:00",+4.07643050000000E+001,-7.39544600000000E+001,"E90018250F0A009433F03BD1E4A4CE53","1AFFD48CC07161DA651625B562FE4D06",5,"2013-01-11 09:44:00",+4.07308080000000E+001,-7.39928280000000E+001,1,,+3.64000000000000E+000,1140,"VTS"']
# Construct the schema from the header
>>> header = taxiFile.first()
>>> header
u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"'
>>> schemaString = header.replace('"','') # get rid of the double-quotes
>>> schemaString
u'_id,_rev,dropoff_datetime,dropoff_latitude,dropoff_longitude,hack_license,medallion,passenger_count,pickup_datetime,pickup_latitude,pickup_longitude,rate_code,store_and_fwd_flag,trip_distance,trip_time_in_secs,vendor_id'
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
>>> schema = StructType(fields)
# Subtract header and use the above-constructed schema:
>>> taxiHeader = taxiFile.filter(lambda l: "_id" in l) # taxiHeader needs to be an RDD - the string we constructed above will not do the job
>>> taxiHeader.collect() # for inspection purposes only
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"']
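>>> taxiNoHeader = taxiFile.subtract(taxiHeader)
# Split each line into fields and strip the double quotes (assumes no field contains an embedded comma):
>>> taxiFields = taxiNoHeader.map(lambda l: [f.strip('"') for f in l.split(',')])
>>> taxi_df = taxiFields.toDF(schema) # Spark dataframe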
>>> import pandas as pd
>>> taxi_DF = taxi_df.toPandas() # pandas dataframe
For brevity, here all columns end up being of type string, but in the blog post I show in detail how you can further refine the desired data types (and names) for specific fields.
The simple answer is to set header='true'.
For example:
df = spark.read.csv('housing.csv', header='true')
or
df = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")