
Add a column from another DataFrame

In Scala Spark, I can easily add a column to an existing DataFrame by writing

val newDf = df.withColumn("date_min", anotherDf("date_min"))

Doing so in PySpark results in an AnalysisException.

Here is what I'm doing:

minDf.show(5)
maxDf.show(5)
+--------------------+
|            date_min|
+--------------------+
|2016-11-01 10:50:...|
|2016-11-01 11:46:...|
|2016-11-01 19:23:...|
|2016-11-01 17:01:...|
|2016-11-01 09:00:...|
+--------------------+
only showing top 5 rows

+--------------------+
|            date_max|
+--------------------+
|2016-11-01 10:50:...|
|2016-11-01 11:46:...|
|2016-11-01 19:23:...|
|2016-11-01 17:01:...|
|2016-11-01 09:00:...|
+--------------------+
only showing top 5 rows

And here is what results in an error:

newDf = minDf.withColumn("date_max", maxDf["date_max"])

AnalysisExceptionTraceback (most recent call last)
<ipython-input-13-7e19c841fa51> in <module>()
      2 maxDf.show(5)
      3 
----> 4 newDf = minDf.withColumn("date_max", maxDf["date_max"])

/opt/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
   1491         """
   1492         assert isinstance(col, Column), "col should be Column"
-> 1493         return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
   1494 
   1495     @ignore_unicode_prefix

/opt/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/opt/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: u'resolved attribute(s) date_max#67 missing from date_min#66 in operator !Project [date_min#66, date_max#67 AS date_max#106];;\n!Project [date_min#66, date_max#67 AS date_max#106]\n+- Project [date_min#66]\n   +- Project [cast((cast(date_min#6L as double) / cast(1000 as double)) as timestamp) AS date_min#66, cast((cast(date_max#7L as double) / cast(1000 as double)) as timestamp) AS date_max#67]\n      +- SubqueryAlias df, `df`\n         +- LogicalRDD [idvisiteur#5, date_min#6L, date_max#7L, sales_sum#8, sales_count#9L]\n'
asked Mar 17 '17 by Romain



2 Answers

from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

minDf = sc.parallelize([['2016-11-01 10:50:00'],['2016-11-01 11:46:00']]).toDF(["date_min"])
maxDf = sc.parallelize([['2016-11-01 10:50:00'],['2016-11-01 11:46:00']]).toDF(["date_max"])

# there is no common column between these two DataFrames, so add a row_index column to join on
minDf = minDf.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
maxDf = maxDf.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))

minDf = minDf.join(maxDf, on=["row_index"]).drop("row_index")
minDf.show()

Output is:

+-------------------+-------------------+
|           date_min|           date_max|
+-------------------+-------------------+
|2016-11-01 10:50:00|2016-11-01 10:50:00|
|2016-11-01 11:46:00|2016-11-01 11:46:00|
+-------------------+-------------------+
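
If this is needed in more than one place, the same row-index trick can be wrapped in a small helper. A minimal sketch, assuming both DataFrames have the same number of rows (with_column_from is a hypothetical name):

from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

def with_column_from(left, right, col_name):
    # give every row of both DataFrames a positional index,
    # join on that index, then drop it again
    w = Window.orderBy(monotonically_increasing_id())
    left_indexed = left.withColumn("row_index", row_number().over(w))
    right_indexed = right.select(col_name).withColumn("row_index", row_number().over(w))
    return left_indexed.join(right_indexed, on="row_index").drop("row_index")

Calling with_column_from(minDf, maxDf, "date_max") on the original (unjoined) DataFrames would produce the same result as above.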
answered Oct 16 '22 by 1.618


The short answer is that this is not supported by the Spark DataFrame API, at least not in Spark 2.x. However, you can write a helper function to achieve something similar.
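
The reason is that withColumn expects a Column that can be resolved against the DataFrame it is called on; a column taken from another DataFrame belongs to a different logical plan and cannot be resolved, which is what the AnalysisException above is complaining about. Roughly (a hypothetical contrast, reusing minDf and maxDf from the question):

minDf.withColumn("date_min_copy", minDf["date_min"])  # works: the column comes from minDf itself
minDf.withColumn("date_max", maxDf["date_max"])       # AnalysisException: date_max is not an attribute of minDf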

First let's create some test data:

minDf = sc.parallelize(['2016-11-01','2016-11-02','2016-11-03']).map(lambda x: (x, )).toDF(['date_min'])
maxDf = sc.parallelize(['2016-12-01','2016-12-02','2016-12-03']).map(lambda x: (x, )).toDF(['date_max'])

You can then use zip to combine the two DataFrames, provided they are partitioned identically:

from pyspark.sql.types import StructType

def zip_df(l, r):
    # zip the two RDDs row by row, then rebuild a DataFrame whose schema
    # is the first field of each input's schema
    return l.rdd.zip(r.rdd).map(lambda x: (x[0][0], x[1][0])).toDF(StructType([l.schema[0], r.schema[0]]))

combined = zip_df(minDf, maxDf.select('date_max'))
combined.show()
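
Note that RDD.zip requires the two RDDs to have the same number of partitions and the same number of elements in each partition, otherwise it fails at runtime. Assuming that holds for the test data above, the output would look like:

+----------+----------+
|  date_min|  date_max|
+----------+----------+
|2016-11-01|2016-12-01|
|2016-11-02|2016-12-02|
|2016-11-03|2016-12-03|
+----------+----------+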
answered Oct 16 '22 by Alex