My DataFrame contains a field which is a date, and it appears in string format, for example:
'2015-07-02T11:22:21.050Z'
I need to filter the DataFrame on the date to get only the records in the last week. So, I was trying a map approach where I transformed the string dates to datetime objects with strptime:
from datetime import datetime, timedelta

def map_to_datetime(row):
    format_string = '%Y-%m-%dT%H:%M:%S.%fZ'
    row.date = datetime.strptime(row.date, format_string)

df = df.map(map_to_datetime)
and then I would apply a filter as
df.filter(lambda row: row.date >= (datetime.today() - timedelta(days=7)))
I managed to get the mapping working, but the filter fails with
TypeError: condition should be string or Column
Is there a way to make the filtering work, or should I change my approach, and if so, how?
The default format of a PySpark date is yyyy-MM-dd. PySpark SQL provides current_date() to return the current date as a date column, date_format() to convert a date/timestamp/string into a string in a given format, and to_date() to convert a column into DateType by casting rules. In other words, to_date() is the function to turn a String (StringType) column into a Date (DateType) column; note that Spark date functions support all Java date formats specified in DateTimeFormatter. You can then combine this with the between() operator to select the range of rows you need.
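A minimal sketch of that to_date() / between() approach, assuming Spark 2.x+ with an existing SparkSession called spark; the sample values and the column names d_str and dt are purely illustrative:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, date_sub, to_date

spark = SparkSession.builder.getOrCreate()

# Sample data with ISO 8601 timestamp strings.
df = spark.createDataFrame(
    [('2015-07-02T11:22:21.050Z', ), ('2016-03-20T21:00:00.000Z', )],
    ['d_str'])

# to_date() turns the StringType column into DateType; ISO 8601 strings
# parse with the default casting rules. For other formats you can pass a
# Java DateTimeFormatter pattern as the second argument (Spark 2.2+).
with_date = df.withColumn('dt', to_date(col('d_str')))

# between() keeps rows whose date falls within the last 7 days (inclusive).
with_date.where(
    col('dt').between(date_sub(current_date(), 7), current_date())
).show()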
Spark >= 1.5
You can use INTERVAL (df_casted here is the DataFrame with the casted dt column built in the Spark < 1.5 section below):

from pyspark.sql.functions import col, current_date, expr

df_casted.where(col("dt") >= current_date() - expr("INTERVAL 7 days"))
Spark < 1.5
You can solve this without using worker-side Python code and without switching to RDDs. First of all, since you use ISO 8601 strings, your data can be cast directly to date or timestamp:
from pyspark.sql.functions import col

df = sc.parallelize([
    ('2015-07-02T11:22:21.050Z', ),
    ('2016-03-20T21:00:00.000Z', )
]).toDF(("d_str", ))

df_casted = df.select(
    "*",
    col("d_str").cast("date").alias("dt"),
    col("d_str").cast("timestamp").alias("ts"))
This will save one roundtrip between the JVM and Python. There are also a few ways you can approach the second part. Date only:
from pyspark.sql.functions import current_date, datediff, unix_timestamp
df_casted.where(datediff(current_date(), col("dt")) < 7)
Timestamps:
def days(i: int) -> int:
    return 60 * 60 * 24 * i

df_casted.where(unix_timestamp() - col("ts").cast("long") < days(7))
You can also take a look at current_timestamp and date_sub.
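For example, a sketch of what those could look like on the casted columns above, assuming Spark >= 1.5 (where current_timestamp and date_sub were introduced):

from pyspark.sql.functions import col, current_date, current_timestamp, date_sub, expr

# date_sub() subtracts a number of days from a date column.
df_casted.where(col("dt") >= date_sub(current_date(), 7))

# current_timestamp() returns the current timestamp; INTERVAL arithmetic
# works on timestamp columns as well.
df_casted.where(col("ts") >= current_timestamp() - expr("INTERVAL 7 days"))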
Note: I would avoid using DataFrame.map. It is better to use DataFrame.rdd.map instead; it will save you some work when switching to 2.0+.
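If you really do need Python-side processing, a minimal sketch of going through the RDD explicitly (the lambda is just an illustration):

# DataFrame.map is gone in PySpark 2.0+, but df.rdd.map keeps working.
dates = df_casted.rdd.map(lambda row: row.dt).collect()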