I'm trying to merge two columns of different datatypes. In the code snippet below I pick the columns from the same DataFrame for simplicity.
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from datetime import datetime
# sc and sqlContext are the objects created by the pyspark shell
a = sc.parallelize([
    ('ship1', datetime(2015, 1, 1), 2, 3., 4.),
    ('ship1', datetime(2015, 1, 2), 4, 8., 9.),
    ('ship1', datetime(2015, 1, 3), 5, 39., 49.),
    ('ship2', datetime(2015, 1, 4), 2, 3., 4.),
    ('ship2', datetime(2015, 1, 5), 4, 4., 6.),
    ('ship3', datetime(2015, 1, 15), 33, 56., 6.),
    ('ship3', datetime(2015, 1, 12), 3, 566., 64.),
    ('ship4', datetime(2015, 1, 5), 3, 3., None),
])
schemaString = "name time ROT SOG COG"
strtype=[StringType(),TimestampType(),IntegerType(),FloatType(),FloatType()]
fields = [StructField(schemaString.split()[i], strtype[i],True) for i in range(0,len(strtype))]
schema=StructType(fields)
df=sqlContext.createDataFrame(a,schema)
df.show()
+-----+--------------------+---+-----+----+
| name|                time|ROT|  SOG| COG|
+-----+--------------------+---+-----+----+
|ship1|2015-01-01 00:00:...|  2|  3.0| 4.0|
|ship1|2015-01-02 00:00:...|  4|  8.0| 9.0|
|ship1|2015-01-03 00:00:...|  5| 39.0|49.0|
|ship2|2015-01-04 00:00:...|  2|  3.0| 4.0|
|ship2|2015-01-05 00:00:...|  4|  4.0| 6.0|
|ship3|2015-01-15 00:00:...| 33| 56.0| 6.0|
|ship3|2015-01-12 00:00:...|  3|566.0|64.0|
|ship4|2015-01-05 00:00:...|  3|  3.0|null|
+-----+--------------------+---+-----+----+
When I extract two columns from df into new DataFrames and try to merge them with withColumn():
b=df.select("time")
c=df.select("SOG")
d=b.withColumn("SOG",c.SOG)
I get this:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-48-4845b5dc1c80> in <module>()
4 c=aa.select("SOG")
5
----> 6 d=b.withColumn("SOG",c.SOG)
/hadoop-dist/spark/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
1166 [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
1167 """
-> 1168 return self.select('*', col.alias(colName))
1169
1170 @ignore_unicode_prefix
/hadoop-dist/spark/python/pyspark/sql/dataframe.pyc in select(self, *cols)
719 [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
720 """
--> 721 jdf = self._jdf.select(self._jcols(*cols))
722 return DataFrame(jdf, self.sql_ctx)
723
/hadoop-dist/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/hadoop-dist/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling o231.select.
: org.apache.spark.sql.AnalysisException: resolved attribute(s) SOG#3 missing from time#1 in operator !Project [time#1,SOG#3 AS SOG#34];
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Am I doing something stupid, or is this some kind of bug? (I am able to merge columns of a similar datatype.)
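What you are trying to achieve here is simply not supported. When you use DataFrame.withColumn, the column expression can reference only columns from that same DataFrame. It is not possible to add a column based on data from another DataFrame. The data types play no role here: the AnalysisException says that the attribute SOG#3 is missing from the plan you are projecting, which contains only time#1.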
If you want to merge columns from multiple DataFrames, you have to use join. This means you need either a natural key that can be used to match corresponding rows, or you have to join on row numbers. The latter is possible only if you can ensure that the order of rows in both DataFrames is exactly the same.