I have a really strange error with Spark DataFrames that causes a string to be evaluated as a timestamp.
Here is my setup code:
from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
new_schema = StructType([StructField("item_id", StringType(), True),
                         StructField("date", TimestampType(), True),
                         StructField("description", StringType(), True)
                         ])
df = sqlContext.createDataFrame([Row(description='description', date=datetime.utcnow(), item_id='id_string')], new_schema)
This gives me the following error:
AttributeError                            Traceback (most recent call last)
 in ()
----> 1 df = sqlContext.createDataFrame([Row(description='hey', date=datetime.utcnow(), item_id='id_string')], new_schema)

/home/florian/spark/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    307         Py4JJavaError: ...
    308         """
--> 309         return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema)
    310
    311     @since(1.3)

/home/florian/spark/python/pyspark/sql/session.pyc in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    522             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    523         else:
--> 524             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    525         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    526         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/home/florian/spark/python/pyspark/sql/session.pyc in _createFromLocal(self, data, schema)
    397
    398         # convert python objects to sql data
--> 399         data = [schema.toInternal(row) for row in data]
    400         return self._sc.parallelize(data), schema
    401

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, obj)
    574                 return tuple(f.toInternal(obj.get(n)) for n, f in zip(self.names, self.fields))
    575             elif isinstance(obj, (tuple, list)):
--> 576                 return tuple(f.toInternal(v) for f, v in zip(self.fields, obj))
    577             elif hasattr(obj, "__dict__"):
    578                 d = obj.__dict__

/home/florian/spark/python/pyspark/sql/types.pyc in ((f, v))
    574                 return tuple(f.toInternal(obj.get(n)) for n, f in zip(self.names, self.fields))
    575             elif isinstance(obj, (tuple, list)):
--> 576                 return tuple(f.toInternal(v) for f, v in zip(self.fields, obj))
    577             elif hasattr(obj, "__dict__"):
    578                 d = obj.__dict__

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, obj)
    434
    435     def toInternal(self, obj):
--> 436         return self.dataType.toInternal(obj)
    437
    438     def fromInternal(self, obj):

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, dt)
    188     def toInternal(self, dt):
    189         if dt is not None:
--> 190             seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
    191                        else time.mktime(dt.timetuple()))
    192             return int(seconds * 1e6 + dt.microsecond)

AttributeError: 'str' object has no attribute 'tzinfo'
This looks as if a string was passed to TimestampType.toInternal().
The really weird thing is that this DataFrame produces the same error:
df = sqlContext.createDataFrame([Row(description='hey', date=None, item_id='id_string')], new_schema)
while this one works:
df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id='id_string')], new_schema)
and this one works as well:
df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id=None)], new_schema)
To me, this suggests that PySpark somehow puts the value from "item_id" into the column "date", which causes this error. Did I do something wrong? Is this a bug in DataFrames?
Info: I am using PySpark 2.0.1.
Edit:
df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id=None)], new_schema)
df.first()
Row(item_id=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2017,MONTH=1,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=3,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=1,HOUR=3,HOUR_OF_DAY=15,MINUTE=19,SECOND=30,MILLISECOND=85,ZONE_OFFSET=?,DST_OFFSET=?]', date=None, description=None)
When you create a Row object with keyword arguments, the fields are sorted alphabetically by name (http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.Row). So when you create Row(description, date, item_id), its fields are ordered as (date, description, item_id).
Your schema is ordered as StringType, TimestampType, StringType, and Spark matches the Row's values to the schema fields by position, not by name. So when you create a DataFrame with this Row and schema, Spark maps the value of date to a StringType, the value of description to a TimestampType, and the value of item_id to a StringType.
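The positional mismatch can be sketched in plain Python, without Spark. This is only an illustration of the behaviour described above for PySpark 2.x: sorted_row_values is a hypothetical stand-in for Row(**kwargs), the type names are plain strings, and "<datetime>" is a placeholder for the real datetime value.

```python
# Pure-Python sketch; no Spark needed. All names here are illustrative only.

def sorted_row_values(**fields):
    """Mimic Row(**kwargs) in PySpark 2.x: values ordered by sorted field name."""
    return tuple(fields[name] for name in sorted(fields))

# Schema field names and types, in the order declared in the question.
schema = [("item_id", "StringType"),
          ("date", "TimestampType"),
          ("description", "StringType")]

values = sorted_row_values(description="hey", date="<datetime>", item_id="id_string")

# Values are paired with schema fields by position, not by name.
pairing = [(name, type_name, value)
           for (name, type_name), value in zip(schema, values)]

for name, type_name, value in pairing:
    print("%-12s %-14s <- %r" % (name, type_name, value))
```

The string 'hey' ends up in the TimestampType slot, and the datetime ends up in item_id, which is consistent with the df.first() output in the question's edit.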
Passing a timestamp (a datetime object) to a StringType does not cause an error, but passing a string to a TimestampType does: the conversion reads the tzinfo attribute, which, as the error states, a str object does not have.
Also, the DataFrames that worked for you did so because None was being passed to the TimestampType in your schema, and None is an acceptable value.
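To see both cases side by side, here is a standalone sketch that copies the conversion logic from the TimestampType.toInternal lines shown in the traceback into a free function. The function name to_internal and the sample values are assumptions made for illustration; this is not the PySpark API itself.

```python
import calendar
import time
from datetime import datetime, timezone

def to_internal(dt):
    """Same logic as the TimestampType.toInternal lines in the traceback."""
    if dt is not None:
        seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                   else time.mktime(dt.timetuple()))
        return int(seconds * 1e6 + dt.microsecond)
    # For None the body is skipped and the function returns None.

aware = datetime(2017, 1, 3, 15, 19, 30, tzinfo=timezone.utc)

print(to_internal(None))    # None is accepted and passed through
print(to_internal(aware))   # microseconds since the epoch

try:
    to_internal("hey")      # a str has no tzinfo attribute
except AttributeError as exc:
    print("AttributeError:", exc)
```

This is why description=None made the misaligned rows succeed: the None landed in the timestamp slot, so the tzinfo lookup was never reached.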
Building on the answer above from @rafael-zanetti, you can sort the schema fields by name so that they match the Row's field order:
new_schema = [StructField("item_id", StringType(), True),
              StructField("date", TimestampType(), True),
              StructField("description", StringType(), True)]
new_schema = StructType(sorted(new_schema, key=lambda f: f.name))
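If you would rather keep the schema in its original order, another option is to build each row as a plain tuple whose values already follow the schema order. The sketch below is pure Python and only shows the reordering step; values_in_schema_order is a hypothetical helper, not part of PySpark, and the field list is typed out by hand.

```python
def values_in_schema_order(record, field_names):
    """Return the record's values as a tuple ordered like field_names.

    Keys missing from the record become None, which is fine for nullable fields.
    """
    return tuple(record.get(name) for name in field_names)

# Same order as the schema in the question.
field_names = ["item_id", "date", "description"]

record = {"description": "hey", "date": None, "item_id": "id_string"}
row = values_in_schema_order(record, field_names)
print(row)
```

With PySpark, the field list could presumably be taken from new_schema.names, and a list of such tuples passed to sqlContext.createDataFrame(rows, new_schema). That part is not run here, so treat it as an untested suggestion.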