How to create a copy of a dataframe in pyspark?

I have a DataFrame from which I need to create a new DataFrame with a small change to the schema, using the following operation:

>>> from pyspark.sql.types import LongType
>>> X = spark.createDataFrame([[1, 2], [3, 4]], ['a', 'b'])
>>> schema_new = X.schema.add('id_col', LongType(), False)
>>> _X = X.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)

The problem is that in the above operation, the schema of X gets changed in place. So when I print X.columns I get

>>> X.columns
['a', 'b', 'id_col']

but the data in X is still the same:

>>> X.show()
+---+---+
|  a|  b|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

To avoid changing the schema of X, I tried creating a copy of X in three ways:

  • using the copy method from the copy module
  • using the deepcopy method from the copy module
  • simply assigning _X = X

Both copy methods failed and raised a

RecursionError: maximum recursion depth exceeded
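
For concreteness, the failing attempts, reconstructed from the description above, looked like this:

import copy

_X = copy.copy(X)      # raises RecursionError
_X = copy.deepcopy(X)  # raises RecursionError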

The assignment approach doesn't work either:

>>> _X = X
>>> id(_X) == id(X)
True

Since their ids are the same, creating a duplicate DataFrame this way doesn't really help, and operations done on _X are reflected in X.

So my question is really twofold:

  • how to change the schema out of place (that is, without making any changes to X)?

  • and, more importantly, how to create a duplicate of a PySpark DataFrame?

Note: This question is a follow-up to this post.

asked Sep 12 '18 by Clock Slave

4 Answers

.alias() is commonly used to rename columns, but it is also a DataFrame method and will give you what you want:

df2 = df.alias('df2')
id(df2) == id(df)  # False
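
Building on that, here is a hedged sketch of how the aliased copy could then be extended without touching the original X from the question. Note that monotonically_increasing_id() produces unique but not necessarily consecutive ids, unlike the zipWithIndex approach in the question:

import pyspark.sql.functions as F

_X = X.alias('_X')  # independent DataFrame handle on the same data
_X2 = _X.withColumn('id_col', F.monotonically_increasing_id())

X.columns    # ['a', 'b']            -- the original is untouched
_X2.columns  # ['a', 'b', 'id_col']  -- only the copy gained the column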
answered Nov 04 '22 by tozCSS

As explained in the answer to the other question, you can make a deep copy of the initial schema, then modify that copy and use it to initialize the new DataFrame _X:

from pyspark.sql.types import LongType
import copy

X = spark.createDataFrame([[1, 2], [3, 4]], ['a', 'b'])
_schema = copy.deepcopy(X.schema)
_schema.add('id_col', LongType(), False)  # add() modifies the (copied) schema in place
_X = X.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(_schema)

Now let's check:

print('Schema of X: ' + str(X.schema))
print('Schema of _X: ' + str(_X.schema))

Output:

Schema of X: StructType(List(StructField(a,LongType,true),StructField(b,LongType,true)))
Schema of _X: StructType(List(StructField(a,LongType,true),
                  StructField(b,LongType,true),StructField(id_col,LongType,false)))

Note that to copy a DataFrame you can just use _X = X. Whenever you add a new column with e.g. withColumn, the object is not altered in place, but a new copy is returned. Hope this helps!
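
A minimal sketch of that point (the column name new_col and the literal value are just placeholders):

import pyspark.sql.functions as F

_X = X                                  # same object: id(_X) == id(X) is True
Y = _X.withColumn('new_col', F.lit(0))  # returns a brand-new DataFrame
X.columns  # ['a', 'b']            -- X is untouched
Y.columns  # ['a', 'b', 'new_col'] -- only Y has the extra column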

answered Nov 04 '22 by Florian

If you need to create a copy of a PySpark DataFrame, you could potentially use Pandas. Note that toPandas() collects the entire DataFrame to the driver, so this is only practical when the data fits in driver memory:

schema = X.schema                                # keep the original schema
X_pd = X.toPandas()                              # collects all rows to the driver
_X = spark.createDataFrame(X_pd, schema=schema)  # rebuild with the same schema, avoiding re-inference
del X_pd                                         # free the driver-side copy
answered Nov 04 '22 by CheapMango

In Scala:

  1. X.schema.copy creates a new schema instance without modifying the old schema;
  2. Every DataFrame operation that returns a DataFrame (select, where, etc.) creates a new DataFrame without modifying the original, so the original can be used again and again. My guess is that duplication is not required in your case. Performance is a separate issue; persist can be used there (see the PySpark sketch after this list).
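
The same points hold in PySpark. A hedged sketch of both, assuming spark is an active SparkSession (copy.deepcopy stands in for Scala's schema.copy):

import copy

X = spark.createDataFrame([[1, 2], [3, 4]], ['a', 'b'])

_schema = copy.deepcopy(X.schema)  # new schema instance; X.schema is unchanged
Y = X.where(X.a > 1)               # new DataFrame; X itself is untouched
X.persist()                        # optional: cache X if it will be reused repeatedly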
answered Nov 04 '22 by pasha701