Unable to merge spark dataframe columns with df.withColumn()

Im trying to merge two columns of different datatypes. In the code snippet below I pick the columns from the same data frame for simplicity.

from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from datetime import datetime


schemaString = "name time ROT SOG COG"
fields = [StructField(schemaString.split()[i], strtype[i],True) for i in range(0,len(strtype))]


| name|                time|ROT|  SOG| COG|
|ship1|2015-01-01 00:00:...|  2|  3.0| 4.0|
|ship1|2015-01-02 00:00:...|  4|  8.0| 9.0|
|ship1|2015-01-03 00:00:...|  5| 39.0|49.0|
|ship2|2015-01-04 00:00:...|  2|  3.0| 4.0|
|ship2|2015-01-05 00:00:...|  4|  4.0| 6.0|
|ship3|2015-01-15 00:00:...| 33| 56.0| 6.0|
|ship3|2015-01-12 00:00:...|  3|566.0|64.0|
|ship4|2015-01-05 00:00:...|  3|  3.0|null|

when I extract two columns from df, into new DataFrames and try to merge them with df.withColumn()


I get this:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-48-4845b5dc1c80> in <module>()
      4 c=aa.select("SOG")
----> 6 d=b.withColumn("SOG",c.SOG)

/hadoop-dist/spark/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
   1166         [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
   1167         """
-> 1168         return self.select('*', col.alias(colName))
   1170     @ignore_unicode_prefix

/hadoop-dist/spark/python/pyspark/sql/dataframe.pyc in select(self, *cols)
    719         [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
    720         """
--> 721         jdf = self._jdf.select(self._jcols(*cols))
    722         return DataFrame(jdf, self.sql_ctx)

/hadoop-dist/spark/python/lib/py4j- in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    540         for temp_arg in temp_args:

/hadoop-dist/spark/python/lib/py4j- in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o231.select.
: org.apache.spark.sql.AnalysisException: resolved attribute(s) SOG#3 missing from time#1 in operator !Project [time#1,SOG#3 AS SOG#34];
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
    at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
    at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

Im I doing something stupid, or is it some kind of bug? (I am able to merge columns of a similar datatype)

What your are trying to achieve here is simply not supported. When you use DataFrame.withColumn, column expression can reference only the columns from a given data frame. It is not possible to add a column based on the data from an another table.

If you want to merge columns from multiple data frames you have to use join. It means you need either a natural key which can be used to match corresponding or performed join based on a row numbers. The latter one is possible only if you can ensure that order of rows in both data frames is exactly the same.

