PySpark: filtering a DataFrame by date field in range where date is string

My DataFrame contains one field which is a date, and it appears in string format, for example:

'2015-07-02T11:22:21.050Z'

I need to filter the DataFrame on this date to get only the records from the last week. So I was trying a map approach where I transform the string dates to datetime objects with strptime:

from datetime import datetime, timedelta

def map_to_datetime(row):
    format_string = '%Y-%m-%dT%H:%M:%S.%fZ'
    # parse the ISO string into a datetime object
    row.date = datetime.strptime(row.date, format_string)
    return row

df = df.map(map_to_datetime)

and then I would apply a filter as

df.filter(lambda row:
    row.date >= (datetime.today() - timedelta(days=7)))

I managed to get the mapping working, but the filter fails with:

TypeError: condition should be string or Column

Is there a way to make this filtering work, or should I change the approach, and if so, how?

asked Mar 20 '16 by mar tin
People also ask

How do I query a date in PySpark?

The default format of a PySpark date is yyyy-MM-dd. current_date() returns the current date as a date column, date_format() converts a date/timestamp/string to a string in the format given by its second argument, and to_date() converts a column into `DateType` by the casting rules to `DateType`.
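For illustration, a minimal sketch combining those three functions (a SparkSession named spark and Spark 2.2+ are assumed; the DataFrame and column name are made up):

from pyspark.sql.functions import current_date, date_format, to_date

df = spark.createDataFrame([('2015-07-02',)], ['d_str'])

df.select(
    current_date().alias('today'),                           # current date as a DateType column
    date_format('d_str', 'dd/MM/yyyy').alias('formatted'),   # date/string -> formatted string
    to_date('d_str', 'yyyy-MM-dd').alias('parsed')           # string -> DateType
).show()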

How do I convert a String column to a date in PySpark?

PySpark SQL provides the to_date() function to convert a String column of a DataFrame to a Date column. Note that Spark date functions support all Java date formats specified in DateTimeFormatter. to_date() formats a string (StringType) column to a date (DateType) column.
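As a sketch, applying to_date to the kind of string from the question (a SparkSession named spark is assumed; since the value is ISO 8601, the default casting rules are enough and no explicit format is passed):

from pyspark.sql.functions import col, to_date

df = spark.createDataFrame([('2015-07-02T11:22:21.050Z',)], ['d_str'])

# parse the ISO 8601 string into a DateType column using the default casting rules
df_dates = df.withColumn('dt', to_date(col('d_str')))
df_dates.printSchema()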

How do you select a range of rows from a DataFrame in PySpark?

By using a SQL query with the between() operator, we can get a range of rows.
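A hedged sketch of the same idea via the DataFrame API rather than a raw SQL query (the DataFrame, column name, and bounds are hypothetical):

from pyspark.sql.functions import col

# keep rows whose date falls inside an inclusive range
rows_in_range = df.where(col("dt").between("2015-07-01", "2015-07-31"))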


1 Answer

Spark >= 1.5

You can use INTERVAL

from pyspark.sql.functions import col, current_date, expr

# keep only rows whose date falls within the last 7 days
df_casted.where(col("dt") >= current_date() - expr("INTERVAL 7 days"))

Spark < 1.5

You can solve this without using worker-side Python code and without switching to RDDs. First of all, since you use ISO 8601 strings, your data can be cast directly to date or timestamp:

from pyspark.sql.functions import col

df = sc.parallelize([
    ('2015-07-02T11:22:21.050Z', ),
    ('2016-03-20T21:00:00.000Z', )
]).toDF(("d_str", ))

# cast the ISO 8601 string column to both date and timestamp columns
df_casted = df.select("*",
    col("d_str").cast("date").alias("dt"),
    col("d_str").cast("timestamp").alias("ts"))

This saves one roundtrip between the JVM and Python. There are also a few ways you can approach the second part. Date only:

from pyspark.sql.functions import current_date, datediff, unix_timestamp

# day-level comparison: keep rows whose date is less than 7 days old
df_casted.where(datediff(current_date(), col("dt")) < 7)

Timestamps:

def days(i: int) -> int:
    # number of seconds in i days
    return 60 * 60 * 24 * i

# second-level comparison: keep rows whose timestamp is within the last 7 days
df_casted.where(unix_timestamp() - col("ts").cast("long") < days(7))

You can also take a look at current_timestamp and date_sub.
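For instance, a rough sketch of the same filters using those functions (not part of the original answer):

from pyspark.sql.functions import col, current_date, current_timestamp, date_sub, expr

# date column: keep rows from the last 7 days
df_casted.where(col("dt") >= date_sub(current_date(), 7))

# timestamp column: compare against the current timestamp minus an interval
df_casted.where(col("ts") >= current_timestamp() - expr("INTERVAL 7 days"))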

Note: I would avoid using DataFrame.map. It is better to use DataFrame.rdd.map instead. It will save you some work when switching to 2.0+.
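If you do need Python-side parsing, a sketch of the RDD route (assumed, not from the answer) looks like this:

from datetime import datetime

fmt = '%Y-%m-%dT%H:%M:%S.%fZ'

# operate on the underlying RDD explicitly instead of calling DataFrame.map
parsed = df.rdd.map(lambda row: datetime.strptime(row.d_str, fmt))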

answered by zero323