 

UDF with multiple rows as response pySpark

I want to apply splitUtlisation to each row of utilisationDataFarme, passing startTime and endTime as parameters. splitUtlisation returns multiple rows of data, so I want to create a new DataFrame with (Id, Day, Hour, Minute).

from dateutil import rrule

def splitUtlisation(onDateTime, offDateTime):
    yield onDateTime
    # Hourly boundaries (minute 0, second 0) between the on and off times.
    rule = rrule.rrule(rrule.HOURLY, byminute=0, bysecond=0, dtstart=onDateTime)
    for result in rule.between(onDateTime, offDateTime):
        yield result
    yield offDateTime
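
For reference, a quick sanity check of the generator on the second sample interval (the expected output is sketched in the comment):

from datetime import datetime

for ts in splitUtlisation(datetime(2017, 2, 13, 12, 6, 32),
                          datetime(2017, 2, 15, 16, 6, 32)):
    print(ts)
# 2017-02-13 12:06:32, 2017-02-13 13:00:00, 2017-02-13 14:00:00, ...
# ..., 2017-02-15 16:00:00, 2017-02-15 16:06:32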


from pyspark.sql.functions import col

utilisationDataFarme = (
    sc.parallelize([
        (10001, "2017-02-12 12:01:40", "2017-02-12 12:56:32"),
        (10001, "2017-02-13 12:06:32", "2017-02-15 16:06:32"),
        (10001, "2017-02-16 21:45:56", "2017-02-21 21:45:56"),
        (10001, "2017-02-21 22:32:41", "2017-02-25 00:52:50"),
    ]).toDF(["id", "startTime", "endTime"])
    .withColumn("startTime", col("startTime").cast("timestamp"))
    .withColumn("endTime", col("endTime").cast("timestamp"))
)

In core Python I did it like this:

import datetime
from datetime import timedelta

dayList = ['SUN', 'MON', 'TUE', 'WED', 'THR', 'FRI', 'SAT']
for result in hours_aligned(datetime.datetime.now(), datetime.datetime.now() + timedelta(hours=68)):
    print(dayList[datetime.datetime.weekday(result)], result.hour, 60 if result.minute == 0 else result.minute)

Result

THR 21 60
THR 22 60
THR 23 60
FRI 0 60
FRI 1 60
FRI 2 60
FRI 3 60

How can I do this in pySpark?

I tried to create a new schema and apply it:

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("Id", StringType(), False), StructField("Day", StringType(), False),
                     StructField("Hour", StringType(), False), StructField("Minute", StringType(), False)])
udf_splitUtlisation = udf(splitUtlisation, schema)
df = sqlContext.createDataFrame([], schema)

Still, I could not handle multiple rows in the response.

asked Mar 09 '18 by syv

People also ask

Why UDF are not recommended in Spark?

When we use UDFs we end up losing all the optimization Spark does on our DataFrame/Dataset: a UDF is a black box to Spark's optimizer. Consider, for example, predicate pushdown, a common optimization when reading data from a database or from columnar formats such as Parquet; it cannot be applied through a UDF.
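
A quick illustration (a sketch of mine, not from the original page; the path and column name are hypothetical): the same filter written natively can be pushed down into the Parquet scan, while the UDF version forces Spark to read and deserialize every row first:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

df = spark.read.parquet("/some/path")  # hypothetical path

# Native filter: Catalyst can push the predicate into the Parquet scan.
fast = df.filter(col("amount") > 100)

# UDF filter: opaque to the optimizer, so no pushdown happens.
is_large = udf(lambda x: x > 100, BooleanType())
slow = df.filter(is_large(col("amount")))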

Can a UDF return multiple columns?

A UDF can return only a single column at a time.
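
The usual workaround (a sketch of mine, assuming a hypothetical df with a "seconds" column) is to return a StructType from the UDF and then expand the struct into separate columns:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, IntegerType

pair_type = StructType([StructField("q", IntegerType()), StructField("r", IntegerType())])

# Hypothetical example: quotient and remainder from one input column.
divmod_udf = udf(lambda x: (x // 60, x % 60), pair_type)

df2 = df.withColumn("qr", divmod_udf(col("seconds"))).select("*", "qr.*").drop("qr")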

Can Pyspark UDF return DataFrame?

Conclusion: a PySpark UDF is a User Defined Function used to create a reusable function in Spark. Once a UDF is created, it can be re-used on multiple DataFrames and in SQL (after registering). The default return type of udf() is StringType.
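
For example (a minimal sketch; the view name is hypothetical):

from pyspark.sql.types import IntegerType

# Register once, then reuse from both the DataFrame API and SQL.
squared = spark.udf.register("squared", lambda x: x * x, IntegerType())

df.select(squared("id")).show()
df.createOrReplaceTempView("events")  # hypothetical view name
spark.sql("SELECT squared(id) FROM events").show()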

How do you select top 5 rows in Pyspark?

In Spark/PySpark, you can use the show() action to get the top/first N (5, 10, 100, ...) rows of the DataFrame and display them on a console or in a log. There are also several Spark actions like take(), tail(), collect(), head(), and first() that return the top or last n rows as a list of Rows (Array[Row] in Scala).
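
For instance, on the sample DataFrame from the question:

utilisationDataFarme.show(5)          # prints the first 5 rows to the console
top5 = utilisationDataFarme.take(5)   # returns the first 5 rows as a list of Row objects
first = utilisationDataFarme.first()  # returns the first Row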


1 Answer

You can use pyspark's explode to unpack a single row containing multiple values into multiple rows, once you have your udf defined correctly.

As far as I know you won't be able to use generators with yield as a udf. Instead, you need to return all values at once as an array (see return_type), which can then be exploded and expanded:

from pyspark.sql.functions import col, udf, explode
from pyspark.sql.types import ArrayType, StringType, MapType
import pandas as pd

# input data as given by OP
df = sc.parallelize([
    (10001, "2017-02-12 12:01:40" , "2017-02-12 12:56:32"),
    (10001, "2017-02-13 12:06:32" , "2017-02-15 16:06:32"),
    (10001, "2017-02-16 21:45:56" , "2017-02-21 21:45:56"),
    (10001, "2017-02-21 22:32:41" , "2017-02-25 00:52:50")])\
    .toDF(["id",  "startTime" ,  "endTime"])\
    .withColumn("startTime", col("startTime").cast("timestamp"))\
    .withColumn("endTime", col("endTime").cast("timestamp"))

return_type = ArrayType(MapType(StringType(), StringType()))

@udf(returnType=return_type)
def your_udf_func(start, end):
    """Insert your function to return whatever you like
    as a list of dictionaries.

    For example, I chose to return hourly values for
    day, hour and minute.

    """

    date_range = pd.date_range(start, end, freq="h")
    df = pd.DataFrame({"day": date_range.strftime("%a"),
                       "hour": date_range.hour,
                       "minute": date_range.minute})

    values = df.to_dict("index").values()

    return list(values)


extracted = your_udf_func("startTime", "endTime")
exploded = explode(extracted).alias("exploded")
expanded = [col("exploded").getItem(k).alias(k) for k in ["hour", "day", "minute"]]

result = df.select("id", exploded).select("id", *expanded)

And the result is:

result.show(5)

+-----+----+---+------+
|   id|hour|day|minute|
+-----+----+---+------+
|10001|  12|Sun|     1|
|10001|  12|Mon|     6|
|10001|  13|Mon|     6|
|10001|  14|Mon|     6|
|10001|  15|Mon|     6|
+-----+----+---+------+
only showing top 5 rows
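
One design note (my addition, not from the answer): because the return type is ArrayType(MapType(StringType(), StringType())), hour and minute come back as strings. If you want them typed, an ArrayType(StructType(...)) return type preserves integers, roughly like this:

from pyspark.sql.functions import explode, udf
from pyspark.sql.types import (ArrayType, IntegerType,
                               StringType, StructField, StructType)
import pandas as pd

typed_schema = ArrayType(StructType([
    StructField("day", StringType()),
    StructField("hour", IntegerType()),
    StructField("minute", IntegerType()),
]))

@udf(returnType=typed_schema)
def split_typed(start, end):
    rng = pd.date_range(start, end, freq="h")
    # One (day, hour, minute) tuple per hourly timestamp, matching the struct.
    return [(t.strftime("%a"), int(t.hour), int(t.minute)) for t in rng]

typed = df.select("id", explode(split_typed("startTime", "endTime")).alias("e"))
result_typed = typed.select("id", "e.day", "e.hour", "e.minute")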
answered Sep 20 '22 by pansen