
How to do a nested for-each loop with PySpark

Imagine a large dataset (>40GB parquet file) containing value observations of thousands of variables as triples (variable, timestamp, value).

Now think of a query in which you are interested in only a subset of 500 variables, and you want to retrieve the observations (values --> time series) for those variables only within specific observation windows (timeframes), each having a start and end time.

Without distributed computing (Spark), you could code it like this:

for var_ in variables_of_interest:
    for incident in incidents:

        # one filtered frame per (variable, incident) pair
        var_df = df_all.filter(
            (df_all.Variable == var_)
            & (df_all.Time > incident.startTime)
            & (df_all.Time < incident.endTime))

My question is: how to do that with Spark/PySpark? I was thinking of:

  1. joining the incidents somehow with the variables and filtering the dataframe afterwards.
  2. broadcasting the incident dataframe and using it within a map function when filtering the variable observations (df_all) -- a rough sketch of this is shown right after this list.
  3. using RDD.cartesian or RDD.mapPartitions somehow (remark: the parquet file was saved partitioned by variable).
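
For what it's worth, idea #2 could look roughly like the sketch below. This is only a sketch with placeholder names (it assumes df_all has columns Variable, Time and Value, and that incidents is a plain Python list as in the loop above), not a tested solution:

from pyspark.sql import Row

# Broadcast the small lookup data (incident windows and variable names) to all executors.
windows_b = sc.broadcast([(i, inc.startTime, inc.endTime)
                          for i, inc in enumerate(incidents)])
vars_b = sc.broadcast(set(variables_of_interest))

def tag_with_incident(row):
    # Emit one copy of the observation for every incident window it falls into.
    if row.Variable not in vars_b.value:
        return []
    return [Row(incident_id=i, Variable=row.Variable, Time=row.Time, Value=row.Value)
            for (i, start, end) in windows_b.value
            if start < row.Time < end]

tagged = df_all.rdd.flatMap(tag_with_incident).toDF()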

The expected output should be:

incident1 --> dataframe 1
incident2 --> dataframe 2
...

Where dataframe 1 contains all variables and their observed values within the timeframe of incident 1 and dataframe 2 those values within the timeframe of incident 2.

I hope you got the idea.

UPDATE

I tried to code a solution based on idea #1 and the code from the answer given by zero323. It works quite well, but I wonder how to aggregate/group the result per incident in the final step. I tried adding a sequential number to each incident, but then I got errors in the last step (see the schematic sketch after the file list below). It would be great if you could review and/or complete the code. Therefore I uploaded sample data and the scripts. The environment is Spark 1.4 (PySpark):

  • Incidents: incidents.csv
  • Variable value observation data (77MB): parameters_sample.csv (put it on HDFS)
  • Jupyter Notebook: nested_for_loop_optimized.ipynb
  • Python Script: nested_for_loop_optimized.py
  • PDF export of Script: nested_for_loop_optimized.pdf
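
To make the intent of the sequential numbering clearer, this is schematically what I mean (placeholder code, not the actual notebook contents; the column names are made up):

# Schematic only: attach a running incident_id so the joined result can later
# be grouped back into one dataframe per incident.
incidents_df = sqlContext.createDataFrame(
    [(i, inc.startTime, inc.endTime) for i, inc in enumerate(incidents)],
    ["incident_id", "startTime", "endTime"])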
Asked Oct 29 '22 by Matthias


1 Answer

Generally speaking, only the first approach looks sensible to me. The exact joining strategy depends on the number of records and their distribution, but you can either create a top-level data frame:

ref = sc.parallelize([
    (var_, incident)
    for var_ in variables_of_interest
    for incident in incidents
]).toDF(["var_", "incident"])

and simply join

from pyspark.sql.functions import col

same_var = col("Variable") == col("var_")
same_time = col("Time").between(
    col("incident.startTime"),
    col("incident.endTime")
)

ref.join(df.alias("df"), same_var & same_time)
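
Regarding the grouping asked about in the question's update (this part is not from the original answer): if each incident struct carries an id field, the joined frame can be split back into one dataframe per incident, for example:

# Illustrative only: assumes each incident struct contains an incident_id field.
joined = ref.join(df.alias("df"), same_var & same_time)

per_incident = {
    i: joined.filter(col("incident.incident_id") == i)
    for i in range(len(incidents))
}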

Alternatively, you can perform joins against particular partitions:

incidents_ = sc.parallelize([
   (incident, ) for incident in incidents
]).toDF(["incident"])

for var_ in variables_of_interest:
    # read only the partition for this variable (the data is partitioned by Variable)
    df = sqlContext.read.parquet("/some/path/Variable={0}".format(var_))
    df.join(incidents_, same_time)

optionally marking one side as small enough to be broadcast.
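
For example (a sketch, not from the original answer; pyspark.sql.functions.broadcast ships with newer Spark releases than the 1.4 mentioned in the question, where the spark.sql.autoBroadcastJoinThreshold setting plays the same role):

from pyspark.sql.functions import broadcast

# Hint that the incidents frame is small enough to be shipped to every
# executor, so the join avoids shuffling the large observation data.
df.join(broadcast(incidents_), same_time)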

Answered Nov 02 '22 by zero323