 

Pyspark read multiple csv files into a dataframe (OR RDD?)

I've got a Spark 2.0.2 cluster that I'm hitting via PySpark through Jupyter Notebook. I have multiple pipe-delimited txt files (loaded into HDFS, but also available in a local directory) that I need to load using spark-csv into three separate dataframes, depending on the name of the file.

I see three approaches I can take. First, I can use Python to somehow iterate through the HDFS directory (I haven't figured out how to do this yet), load each file, and then do a union.

I also know that there is some wildcard functionality (see here) in Spark that I can probably leverage.

Lastly, I could use pandas to load the vanilla csv files from disk as pandas dataframes and then create Spark dataframes from them. The downside here is that these files are large, and loading them into memory on a single node could take ~8 GB (that's why this is moving to a cluster in the first place).

Here is the code I have so far and some pseudo code for the three methods:

import findspark
findspark.init()
import os
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

sc = pyspark.SparkContext(appName = 'claims_analysis', master='spark://someIP:7077')

spark = SparkSession(sc)

#METHOD 1 - iterate over HDFS directory
for currFile in os.listdir(HDFS:///someDir//):
    if #filename contains 'claim':
        #create or unionAll to merge claim_df
    if #filename contains 'pharm':
        #create or unionAll to merge pharm_df
    if #filename contains 'service':
        #create or unionAll to merge service_df

#Method 2 - some kind of wildcard functionality
claim_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<claim>.csv')
pharm_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<pharm>.csv')
service_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<service>.csv')


#METHOD 3 - load to a pandas df and then convert to spark df
for currFile in os.listdir(HDFS:///someDir//):
    pd_df = pd.read_csv(currFile, sep = '|')
    df = spark.createDataFrame(pd_df)
    if #filename contains 'claim':
        #create or unionAll to merge claim_df
    if #filename contains 'pharm':
        #create or unionAll to merge pharm_df
    if #filename contains 'service':
        #create or unionAll to merge service_df

Does anyone know how to implement method 1 or 2? I haven't been able to figure these out. Also, I was surprised that there isn't a better way to get csv files loaded into a PySpark dataframe - using a third-party package for something that seems like it should be a native feature confused me (did I just miss the standard use case for loading csv files into a dataframe?).

Ultimately, I'm going to be writing a consolidated single dataframe back to HDFS (using .write.parquet()) so that I can then clear the memory and do some analytics using MLlib. If the approach I've highlighted isn't best practice, I would appreciate a push in the right direction!

asked Dec 13 '16 by flyingmeatball

1 Answer

Approach 1:

In Python you cannot directly refer to an HDFS location; you need the help of another library such as pydoop (Scala and Java have APIs for this). Even with pydoop, you would end up reading the files one by one, which is bad because it skips the parallel reading that Spark provides.
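
If you do want to enumerate the HDFS directory from Python, a minimal sketch with pydoop (the library mentioned above) could look like the following; the directory path and the 'claim' filter are placeholders, and handing the whole list of matching paths to a single spark.read call is what keeps the actual file reading parallel instead of unioning one dataframe per file:

import pydoop.hdfs as hdfs

# list the directory once, then filter file names on the driver
all_paths = hdfs.ls('hdfs:///someDir')
claim_paths = [p for p in all_paths if 'claim' in p.split('/')[-1]]

# one read over all matching paths, so Spark parallelizes the file reads
claim_df = spark.read.options(sep='|', header='true', nullValue='null').csv(claim_paths)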

Approach 2:

You should be able to point the reader at multiple files, either as a comma-separated list or with a wildcard. That way Spark takes care of reading the files and distributing them into partitions. But if you instead read each file dynamically and union the resulting data frames, there is an edge case: when you have a lot of files, the list can become huge at the driver and cause memory issues, mainly because that read loop is still being driven from the driver.

This option is better. Spark will read all the files matching the pattern and convert them into partitions. You get one RDD (or DataFrame) for all the wildcard matches, and from there you don't need to worry about unioning individual RDDs.

Sample code snippet:

distFile = sc.textFile("/hdfs/path/to/folder/fixed_file_name_*.csv")
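
For the DataFrame case in the question, the same wildcard idea might look roughly like this; the glob patterns are a guess at how the files are actually named, and Spark 2.0's built-in CSV reader is used in place of the external com.databricks.spark.csv package:

# one DataFrame per file group; the *claim*, *pharm*, *service* patterns are assumptions
opts = dict(sep='|', header='true', nullValue='null')

claim_df   = spark.read.options(**opts).csv('hdfs:///someDir/*claim*.csv')
pharm_df   = spark.read.options(**opts).csv('hdfs:///someDir/*pharm*.csv')
service_df = spark.read.options(**opts).csv('hdfs:///someDir/*service*.csv')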

Approach 3:

Unless you have a legacy application in Python that depends on pandas features, I would prefer the Spark-provided API.
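
Since the question ends by writing a consolidated dataframe back to HDFS for later MLlib work, the final step could be as simple as the following (output path assumed):

# persist the consolidated DataFrame as Parquet; the output path is a placeholder
claim_df.write.mode('overwrite').parquet('hdfs:///someDir/claims_parquet')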

answered Oct 21 '22 by Ramzy