I have a DataFrame from which I need to create a new DataFrame with a small change to the schema, by doing the following operation.
>>> from pyspark.sql.types import LongType
>>> X = spark.createDataFrame([[1,2], [3,4]], ['a', 'b'])
>>> schema_new = X.schema.add('id_col', LongType(), False)
>>> _X = X.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)
The problem is that in the above operation, the schema of X gets changed in place. So when I print X.columns I get:
>>> X.columns
['a', 'b', 'id_col']
but the values in X are still the same:
>>> X.show()
+---+---+
|  a|  b|
+---+---+
|  1|  2|
|  3|  4|
+---+---+
To avoid changing the schema of X, I tried creating a copy of X in two ways:
- using the copy and deepcopy methods from the copy module
- simply using _X = X
The copy methods failed and raised a
RecursionError: maximum recursion depth exceeded
The assignment method also doesn't work:
>>> _X = X
>>> id(_X) == id(X)
True
Since their ids are the same, creating a duplicate DataFrame doesn't really help here, and operations done on _X are reflected in X.
So my question is really twofold:
how do I change the schema out of place (that is, without making any changes to X)?
and, more importantly, how do I create a duplicate of a PySpark DataFrame?
Note:
This question is a follow-up to this post.
.alias() is commonly used for renaming columns, but it is also a DataFrame method that will give you what you want:
df2 = df.alias('df2')
id(df2) == id(df)  # False
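For example, here is a minimal sketch (assuming an active spark session; the exact schema-caching behavior can vary across Spark versions) showing that the aliased copy is a separate object, so modifying its schema leaves the original untouched:
from pyspark.sql.types import LongType

X = spark.createDataFrame([[1, 2], [3, 4]], ['a', 'b'])
_X = X.alias('_X')  # a new DataFrame object backed by the same data

# .add() mutates only the schema cached on _X, not the one on X
schema_new = _X.schema.add('id_col', LongType(), False)

print(X.columns)   # ['a', 'b'] -- the original is untouched
print(_X.columns)  # ['a', 'b', 'id_col']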
As explained in the answer to the other question, you could make a deepcopy of your initial schema. We can then modify that copy and use it to initialize the new DataFrame _X:
from pyspark.sql.types import LongType
import copy

X = spark.createDataFrame([[1,2], [3,4]], ['a', 'b'])
_schema = copy.deepcopy(X.schema)
_schema.add('id_col', LongType(), False)  # modifies the copy in place, leaving X.schema intact
_X = X.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(_schema)
Now let's check:
print('Schema of X: ' + str(X.schema))
print('Schema of _X: ' + str(_X.schema))
Output:
Schema of X:  StructType(List(StructField(a,LongType,true),StructField(b,LongType,true)))
Schema of _X: StructType(List(StructField(a,LongType,true),StructField(b,LongType,true),StructField(id_col,LongType,false)))
Note that to copy a DataFrame you can often just use _X = X: whenever you add a new column with e.g. withColumn, the object is not altered in place, but a new DataFrame is returned.
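For instance, a quick sketch (assuming an active spark session) illustrating that withColumn leaves the original untouched:
import pyspark.sql.functions as F

X = spark.createDataFrame([[1, 2], [3, 4]], ['a', 'b'])
Y = X.withColumn('c', F.lit(0))  # transformations return a new DataFrame

print(X.columns)  # ['a', 'b'] -- X is unchanged
print(Y.columns)  # ['a', 'b', 'c']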
Hope this helps!
If you need to create a copy of a PySpark DataFrame, you could potentially use Pandas:
schema = X.schema
X_pd = X.toPandas()                              # collect the data to a Pandas DataFrame
_X = spark.createDataFrame(X_pd, schema=schema)  # rebuild it as an independent Spark DataFrame
del X_pd
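Keep in mind that toPandas() collects the entire DataFrame to the driver, so this route only suits data that fits in driver memory. As a sketch, the copy can then be fed through the original operation without affecting X (reusing the deepcopy idea from above):
import copy
from pyspark.sql.types import LongType

_schema = copy.deepcopy(_X.schema)
_schema.add('id_col', LongType(), False)  # extends only the copied schema
_X = _X.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(_schema)

print(X.columns)   # ['a', 'b'] -- the original is unaffected
print(_X.columns)  # ['a', 'b', 'id_col']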