
Why does a Python UDF return unexpected datetime objects, while the same function applied over an RDD gives proper datetime objects?

I am not sure if I am doing anything wrong, so pardon me if this looks naive. My problem is reproducible with the following data:

from pyspark.sql import Row
df = sc.parallelize([Row(C3=u'Dec  1 2013 12:00AM'),
 Row(C3=u'Dec  1 2013 12:00AM'),
 Row(C3=u'Dec  5 2013 12:00AM')]).toDF()

I have created a function to parse these date strings into datetime objects for further processing:

from datetime import datetime

def date_convert(date_str):
    date_format = '%b %d %Y %I:%M%p'
    try:
        dt = datetime.strptime(date_str, date_format)
    except ValueError as v:
        if len(v.args) > 0 and v.args[0].startswith('unconverted data remains: '):
            # Trim the trailing characters strptime could not consume, then retry.
            # (The original code sliced `dt` here, which is unbound in this branch;
            # it must slice `date_str`.)
            date_str = date_str[:-(len(v.args[0]) - 26)]
            dt = datetime.strptime(date_str, date_format)
        else:
            raise
    return dt
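As a quick sanity check outside Spark, the format string itself handles the double space in the sample values, since `strptime` treats whitespace in the format as matching one or more whitespace characters in the input:

```python
from datetime import datetime

# Plain-Python check of the format string, no Spark involved
date_format = '%b %d %Y %I:%M%p'
dt = datetime.strptime(u'Dec  1 2013 12:00AM', date_format)
print(dt)  # 2013-12-01 00:00:00
```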

Now, if I make a UDF out of this and apply it to my DataFrame, I get unexpected data:

from pyspark.sql.functions import udf
date_convert_udf = udf(date_convert)
df.select(date_convert_udf(df.C3).alias("datetime")).take(2)

The result looks like this:

Out[40]: 
[Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]'),
 Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]')]

but if I apply it after converting the DataFrame to an RDD, it returns a proper Python datetime object:

df.rdd.map(lambda row:date_convert(row.C3)).collect()
Out[42]: 
[datetime.datetime(2013, 12, 1, 0, 0),
 datetime.datetime(2013, 12, 1, 0, 0),
 datetime.datetime(2013, 12, 5, 0, 0)]

I want to achieve the same thing with the DataFrame. How can I do that, and what is wrong with this approach (a UDF over a DataFrame)?

Bg1850 asked Aug 25 '16 01:08



1 Answer

It's because you have to set the return data type of your UDF; by default, udf() uses StringType, so whatever your function returns gets coerced to a string. Since you are apparently trying to obtain timestamps, you have to declare the UDF like this:

from pyspark.sql.types import TimestampType
date_convert_udf = udf(date_convert, TimestampType())
Alberto Bonsanto answered Sep 30 '22 14:09