In Scala Spark, I can easily add a column to an existing DataFrame by writing
val newDf = df.withColumn("date_min", anotherDf("date_min"))
Doing so in PySpark results in an AnalysisException.
Here is what I'm doing:
minDf.show(5)
maxDf.show(5)
+--------------------+
| date_min|
+--------------------+
|2016-11-01 10:50:...|
|2016-11-01 11:46:...|
|2016-11-01 19:23:...|
|2016-11-01 17:01:...|
|2016-11-01 09:00:...|
+--------------------+
only showing top 5 rows
+--------------------+
| date_max|
+--------------------+
|2016-11-01 10:50:...|
|2016-11-01 11:46:...|
|2016-11-01 19:23:...|
|2016-11-01 17:01:...|
|2016-11-01 09:00:...|
+--------------------+
only showing top 5 rows
And here is what results in an error:
newDf = minDf.withColumn("date_max", maxDf["date_max"])
AnalysisExceptionTraceback (most recent call last)
<ipython-input-13-7e19c841fa51> in <module>()
2 maxDf.show(5)
3
----> 4 newDf = minDf.withColumn("date_max", maxDf["date_max"])
/opt/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
1491 """
1492 assert isinstance(col, Column), "col should be Column"
-> 1493 return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
1494
1495 @ignore_unicode_prefix
/opt/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/opt/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'resolved attribute(s) date_max#67 missing from date_min#66 in operator !Project [date_min#66, date_max#67 AS date_max#106];;\n!Project [date_min#66, date_max#67 AS date_max#106]\n+- Project [date_min#66]\n +- Project [cast((cast(date_min#6L as double) / cast(1000 as double)) as timestamp) AS date_min#66, cast((cast(date_max#7L as double) / cast(1000 as double)) as timestamp) AS date_max#67]\n +- SubqueryAlias df, `df`\n +- LogicalRDD [idvisiteur#5, date_min#6L, date_max#7L, sales_sum#8, sales_count#9L]\n'
In PySpark, to add a new column with a constant value to a DataFrame, use the lit() function (from pyspark.sql.functions import lit). lit() takes the constant you want to add and returns a Column; if you want to add a NULL / None column, use lit(None).
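A minimal sketch of that, applied to the minDf above (the column name "source" and the timestamp cast are just for illustration):
from pyspark.sql.functions import lit

# Add a constant column and a NULL timestamp column (illustrative names)
withConstants = minDf.withColumn("source", lit("tracking")) \
                     .withColumn("date_max", lit(None).cast("timestamp"))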
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
minDf = sc.parallelize([['2016-11-01 10:50:00'], ['2016-11-01 11:46:00']]).toDF(["date_min"])
maxDf = sc.parallelize([['2016-11-01 10:50:00'], ['2016-11-01 11:46:00']]).toDF(["date_max"])

# Since there is no common column between these two DataFrames,
# add a row_index to each so that they can be joined positionally
minDf = minDf.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
maxDf = maxDf.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))

minDf = minDf.join(maxDf, on=["row_index"]).drop("row_index")
minDf.show()
Output is:
+-------------------+-------------------+
| date_min| date_max|
+-------------------+-------------------+
|2016-11-01 10:50:00|2016-11-01 10:50:00|
|2016-11-01 11:46:00|2016-11-01 11:46:00|
+-------------------+-------------------+
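If you need this in more than one place, the same row_index trick can be wrapped in a small helper. This is just a sketch (the names with_row_index and hstack are mine, not a Spark API), and it pairs rows by their current order, not by any key:
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

def with_row_index(df, name="row_index"):
    # Tag each row with a sequential index based on the current row order
    return df.withColumn(name, row_number().over(Window.orderBy(monotonically_increasing_id())))

def hstack(left, right):
    # Column-bind two equally sized DataFrames by joining on the generated index
    return with_row_index(left).join(with_row_index(right), on="row_index").drop("row_index")

newDf = hstack(minDf, maxDf)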
The short answer is that this is not supported by the Spark DataFrame API, at least not in Spark 2.x. However, you can write a helper function to achieve something similar.
First let's create some test data:
minDf = sc.parallelize(['2016-11-01','2016-11-02','2016-11-03']).map(lambda x: (x, )).toDF(['date_min'])
maxDf = sc.parallelize(['2016-12-01','2016-12-02','2016-12-03']).map(lambda x: (x, )).toDF(['date_max'])
You can then use zip to combine the two DataFrames, provided they are partitioned identically:
from pyspark.sql.types import StructType
def zip_df(l, r):
    # Pair rows by position, then rebuild a two-column DataFrame
    # from the first field of each input schema
    return (l.rdd.zip(r.rdd)
            .map(lambda x: (x[0][0], x[1][0]))
            .toDF(StructType([l.schema[0], r.schema[0]])))
combined = zip_df(minDf, maxDf.select('date_max'))
combined.show()
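Keep in mind that rdd.zip() requires both RDDs to have the same number of partitions and the same number of elements in each partition, otherwise it fails at runtime. A quick sanity check before calling zip_df (a sketch, not part of the original answer):
# zip() needs identical partitioning: same partition count
# and same number of elements per partition
assert minDf.rdd.getNumPartitions() == maxDf.rdd.getNumPartitions()
assert minDf.count() == maxDf.count()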