
How to use PySpark to load a rolling window from daily files?

I have a large number of fairly large daily files stored in a blob storage engine (S3, Azure Data Lake, etc.): data1900-01-01.csv, data1900-01-02.csv, ..., data2017-04-27.csv. My goal is to perform a rolling N-day linear regression, but I am having trouble with the data loading aspect. I am not sure how to do this without nested RDDs. The schema of every .csv file is the same.

In other words, for every date d_t, I need the data x_t and need to join it with the data (x_t-1, x_t-2, ..., x_t-N).

How can I use PySpark to load an N-day Window of these daily files? All of the PySpark examples I can find seem to load from one very large file or data set.

Here's an example of my current code:

dates = [('1995-01-03', '1995-01-04', '1995-01-05'), ('1995-01-04', '1995-01-05', '1995-01-06')]

p = sc.parallelize(dates)
def test_run(date_range):
    dt0 = date_range[-1] #get the latest date
    s = '/daily/data{}.csv'
    df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORMED')
    file_list = [s.format(dt) for dt in date_range[:-1]] # Get a window of trailing dates
    df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORMED')
    return 1

p.filter(test_run) 

p.map(test_run) # fails with the same error as p.filter

I'm on PySpark version 2.1.0, running in a Jupyter notebook on an Azure HDInsight cluster.

spark here is of type <class 'pyspark.sql.session.SparkSession'>.

A smaller, more reproducible example is as follows:

p = sc.parallelize([1, 2, 3])
def foo(date_range):
    df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
    return 1
p.filter(foo).count()


1 Answer

You are better off using DataFrames rather than RDDs. The DataFrame read.csv API accepts a list of paths, like:

pathList = ['/path/to/data1900-01-01.csv','/path/to/data1900-01-02.csv']
df = spark.read.csv(pathList)

Have a look at the documentation for read.csv.

You can form the list of paths to your daily data files by doing some date arithmetic over a window of N days, e.g. "path/to/data" + datetime.today().strftime("%Y-%m-%d") + ".csv" (this only gives you today's file name, but the date calculation for N days is not hard to figure out; see the sketch below).
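For illustration, here is a minimal sketch of that date calculation (the helper name window_paths, the base path /daily/data{}.csv and the example dates are assumptions taken from the question, not part of the original answer):

from datetime import date, timedelta

def window_paths(end_date, n, pattern='/daily/data{}.csv'):
    # Build the trailing n-day window of file paths ending on end_date, oldest first
    days = [end_date - timedelta(days=i) for i in range(n)]
    return [pattern.format(d.strftime('%Y-%m-%d')) for d in reversed(days)]

window_paths(date(1995, 1, 5), 3)
# ['/daily/data1995-01-03.csv', '/daily/data1995-01-04.csv', '/daily/data1995-01-05.csv']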

However, keep in mind that the schema of all the daily CSVs must be the same for the above to work.

Edit: When you parallelize the list of dates, i.e. p, each date gets processed individually by a different executor, so the input to test_run wasn't really a list of dates; it was one individual string like 1995-01-01.

Try this instead and see if it works.

# Get the trailing window of N dates (window() is a placeholder for
# whatever produces that list, e.g. the date arithmetic shown above)
date_range = window(dates, N)
s = '/daily/data{}.csv'

dt0 = date_range[-1]  # most recent date
df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORMED')

# read the previous files in the window
file_list = [s.format(dt) for dt in date_range[:-1]]
df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORMED')

# computeLinearRegression is a placeholder for your own regression logic
r, resid = computeLinearRegression(df0, df1)
r.write.save('/daily/r{}.csv'.format(dt0))
resid.write.save('/daily/resid{}.csv'.format(dt0))
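
To make this rolling over every date, you can simply loop over the end dates on the driver (there is no need to parallelize the dates themselves). A rough sketch, assuming the hypothetical window_paths helper above, an example N and date range, and computeLinearRegression still standing in for your own regression code:

from datetime import date, timedelta

N = 30                                          # window length (example value)
d, end = date(1995, 2, 1), date(1995, 12, 29)   # example date range

while d <= end:
    paths = window_paths(d, N)                  # trailing N file paths ending on day d
    df0 = spark.read.csv(paths[-1], header=True, mode='DROPMALFORMED')   # latest day
    df1 = spark.read.csv(paths[:-1], header=True, mode='DROPMALFORMED')  # trailing days
    r, resid = computeLinearRegression(df0, df1)  # placeholder, as above
    r.write.save('/daily/r{}.csv'.format(d))
    resid.write.save('/daily/resid{}.csv'.format(d))
    d += timedelta(days=1)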