Creating/accessing dataframe inside the transformation of another dataframe

I'm retrofitting some existing code to use Spark. I have multiple data frames that hold different data sets. While transforming my main dataframe (or my main data set), I need to use data from the other data frames to complete the transformation. I also have a situation (at least in the current structure) where I need to create new data frames inside the transformation function of another data frame.

I'm trying to determine the following:

  1. Can I access a data frame inside the transformation function of another data frame?
  2. Can a data frame be created on an executor inside the transformation function of a dataframe?

Pointers on how to deal with such a situation would be very helpful.

asked Sep 01 '17 by IceMan


1 Answer

The answer to both questions is NO:

DataFrames are driver-side abstractions of distributed collections. They cannot be used, created, or referenced in any executor-side transformation.

Why? DataFrames (like RDDs and Datasets) can only be used within the context of an active SparkSession - without it, the DataFrame cannot "point" to its partitions on the active executors. The SparkSession should be thought of as a live "connection" to the cluster of executors.

Now, if you try to use a DataFrame inside another transformation, that DataFrame would have to be serialized on the driver side, sent to the executor(s), and then deserialized there. But this deserialized instance (in a separate JVM) would necessarily lose its SparkSession - that "connection" was from the driver to the executors, not from this new executor we're now operating in.
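To make this concrete, here's a minimal sketch of the anti-pattern (the DataFrame names mainDf and lookupDf are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("sketch").getOrCreate()
import spark.implicits._

val mainDf   = spark.range(100).toDF("id")  // stand-in for the main data set
val lookupDf = spark.range(10).toDF("id")   // stand-in for another data set

// DON'T: lookupDf is captured in a closure that runs on the executors.
// The deserialized copy there has no live SparkSession, so this compiles
// but fails at runtime (typically with a NullPointerException).
val broken = mainDf.map { row =>
  lookupDf.filter($"id" === row.getAs[Long]("id")).count()
}
```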

So what should you do? You have a few options for referencing one DataFrame's data in another, and choosing the right one depends mostly on the amount of data that would have to be shuffled (i.e., transferred between executors):

  1. Collect one of the DataFrames (if you can guarantee it's small!), and then use the resulting local collection (either directly or using spark.broadcast) in any transformation; see the sketch after this list.

  2. Join the two DataFrames on some common fields. This is a very common solution, since the logic of using one DataFrame's data while transforming another usually amounts to a "lookup" of the right value based on some subset of the columns. This use case translates into a JOIN operation rather naturally.

  3. Use set operators like except, intersect and union, if they provide the logical operation you're after.
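Hedged sketches of all three options, reusing the hypothetical mainDf, lookupDf, and spark session from the earlier snippet:

```scala
// Option 1: collect a (guaranteed small!) DataFrame on the driver,
// broadcast it, and use the local collection inside a transformation.
val lookupIds: Set[Long] = lookupDf.as[Long].collect().toSet
val lookupBc = spark.sparkContext.broadcast(lookupIds)
val viaBroadcast = mainDf.filter(row => lookupBc.value.contains(row.getAs[Long]("id")))

// Option 2: express the "lookup" as a join on the common column.
val viaJoin = mainDf.join(lookupDf, Seq("id"))

// Option 3: set operators, when they match the logic you need.
val onlyInMain = mainDf.except(lookupDf)
```

Of these, the join is usually the default choice: it stays fully distributed and lets Spark's optimizer decide how to execute it, whereas the collect-and-broadcast route is only safe when one side is known to be small.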

answered Oct 26 '22 by Tzach Zohar