I have a large number of fairly large daily files stored in a blob storage engine (S3, Azure Data Lake, etc.): data1900-01-01.csv, data1900-01-02.csv, ..., data2017-04-27.csv. My goal is to perform a rolling N-day linear regression, but I am having trouble with the data-loading aspect. I am not sure how to do this without nested RDDs.
The schema for every .csv file is the same. In other words, for every date d_t, I need data x_t joined with the trailing data (x_{t-1}, x_{t-2}, ..., x_{t-N}).
How can I use PySpark to load an N-day window of these daily files? All of the PySpark examples I can find seem to load from one very large file or data set.
Here's an example of my current code:
dates = [('1995-01-03', '1995-01-04', '1995-01-05'), ('1995-01-04', '1995-01-05', '1995-01-06')]
p = sc.parallelize(dates)

def test_run(date_range):
    dt0 = date_range[-1]  # get the latest date
    s = '/daily/data{}.csv'
    df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORMED')
    file_list = [s.format(dt) for dt in date_range[:-1]]  # get the window of trailing dates
    df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORMED')
    return 1

p.filter(test_run)
p.map(test_run)  # fails with the same error as p.filter
I'm on PySpark version '2.1.0', running in a Jupyter notebook on an Azure HDInsight cluster. spark here is of type <class 'pyspark.sql.session.SparkSession'>.
A smaller, more reproducible example is as follows:

p = sc.parallelize([1, 2, 3])

def foo(date_range):
    df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
    return 1
p.filter(foo).count()
You are better off using DataFrames rather than RDDs. The DataFrame read.csv API accepts a list of paths, like:
pathList = ['/path/to/data1900-01-01.csv','/path/to/data1900-01-02.csv']
df = spark.read.csv(pathList)
Have a look at the documentation for read.csv.
You can form the list of paths to your data files by doing some date arithmetic over a window of N days, e.g. "path/to/data" + datetime.today().strftime("%Y-%m-%d") + ".csv" (this gives you today's file name only, but the date calculation for N days is not hard to figure out).
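For example, here is a minimal sketch of that calculation (window_paths is a hypothetical helper; the '/daily/data{}.csv' layout matches the question):

from datetime import date, timedelta

def window_paths(end_date, n, template='/daily/data{}.csv'):
    # Paths for the n days ending on end_date, oldest first.
    return [template.format((end_date - timedelta(days=i)).isoformat())
            for i in range(n - 1, -1, -1)]

paths = window_paths(date(1995, 1, 5), 3)
# ['/daily/data1995-01-03.csv', '/daily/data1995-01-04.csv', '/daily/data1995-01-05.csv']
df = spark.read.csv(paths, header=True, mode='DROPMALFORMED')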
However, keep in mind that the schema of all the date CSVs must be the same for the above to work.
Edit: When you parallelize the list of dates, i.e. p, each date gets processed individually by a different executor, so the input to test_run wasn't really a list of dates; it was one individual string like 1995-01-01.
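As a quick sanity check of that claim, you can inspect what each task actually receives (a sketch, assuming the dates are parallelized as plain strings):

p = sc.parallelize(['1995-01-01', '1995-01-02', '1995-01-03'])
p.map(type).collect()  # [<class 'str'>, <class 'str'>, <class 'str'>] -- each element is a single date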
Try this instead and see if it works:
# Get the list of dates (window() is a placeholder that returns the N dates of interest)
date_range = window(dates, N)
s = '/daily/data{}.csv'

dt0 = date_range[-1]  # most recent file
df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORMED')

# Read the previous files in the window
file_list = [s.format(dt) for dt in date_range[:-1]]
df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORMED')

r, resid = computeLinearRegression(df0, df1)
r.write.save('/daily/r{}.csv'.format(dt0))
resid.write.save('/daily/resid{}.csv'.format(dt0))
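Since the window construction is plain Python on the driver, you can extend this to the full rolling computation with an ordinary loop, one Spark job per window. A minimal sketch, assuming all_dates is the sorted list of available date strings and reusing computeLinearRegression from above:

s = '/daily/data{}.csv'
for i in range(N - 1, len(all_dates)):
    date_range = all_dates[i - N + 1:i + 1]  # N consecutive dates ending at all_dates[i]
    dt0 = date_range[-1]
    df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORMED')
    df1 = spark.read.csv([s.format(dt) for dt in date_range[:-1]],
                         header=True, mode='DROPMALFORMED')
    r, resid = computeLinearRegression(df0, df1)
    r.write.save('/daily/r{}.csv'.format(dt0))
    resid.write.save('/daily/resid{}.csv'.format(dt0))

No nested RDDs are involved: spark.read runs on the driver, and each regression is itself a distributed job.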