Filtering a Spark DataFrame based on date

I have a dataframe of

date, string, string

I want to select only the rows with a date before a certain day. I have tried the following with no luck:

 data.filter(data("date") < new java.sql.Date(format.parse("2015-03-14").getTime))

I'm getting an error stating the following:

org.apache.spark.sql.AnalysisException: resolved attribute(s) date#75 missing from date#72,uid#73,iid#74 in operator !Filter (date#75 < 16508);

As far as I can guess, the query is incorrect. Can anyone show me how the query should be formatted?

I checked that all entries in the DataFrame have values; they do.

asked Aug 13 '15 by Steve

People also ask

How do I filter rows in a Spark DataFrame?

Spark's filter() or where() function filters rows from a DataFrame or Dataset based on one or more conditions or a SQL expression. You can use where() instead of filter() if you are coming from a SQL background; both functions behave identically.
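
For example, a minimal sketch in Scala (the names spark and df here are hypothetical, for illustration only):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("filter-demo").master("local[*]").getOrCreate()
import spark.implicits._

// hypothetical sample data
val df = Seq(("a1", "PENDING"), ("a2", "COMPLETE")).toDF("id", "status")

// filter() and where() are interchangeable
df.filter($"status" === "PENDING").show()
df.where($"status" === "PENDING").show()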

How do I sort my Spark DataFrame?

In Spark, we can use either the sort() or orderBy() function of a DataFrame/Dataset to sort in ascending or descending order on single or multiple columns; you can also sort using Spark SQL sorting functions such as asc_nulls_first(), asc_nulls_last(), desc_nulls_first(), and desc_nulls_last().
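
A minimal sketch, reusing the hypothetical df from the previous snippet:

import org.apache.spark.sql.functions.{asc, desc}

// sort() and orderBy() are aliases of each other
df.sort(desc("status")).show()
df.orderBy(asc("id"), desc("status")).show()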

What is the function of filter() in Spark?

In Spark, the filter function returns a new Dataset formed by selecting those elements of the source on which the given function returns true, so it retrieves only the elements that satisfy the condition.
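
On a typed Dataset, the filter can be a plain Scala predicate. A sketch, again reusing the hypothetical df (the Order case class is made up for illustration):

// needs the spark.implicits._ import from the first sketch for the encoder
case class Order(id: String, status: String)

val ds = df.as[Order]
// keep only the elements for which the predicate returns true
ds.filter(order => order.status == "PENDING").show()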


6 Answers

The following solutions are applicable since Spark 1.5:

For lower than:

// filter data where the date is earlier than 2015-03-14
data.filter(data("date").lt(lit("2015-03-14")))      

For greater than:

// filter data where the date is greater than 2015-03-14
data.filter(data("date").gt(lit("2015-03-14"))) 

For equality, you can use either equalTo or ===:

data.filter(data("date") === lit("2015-03-14"))

If your DataFrame date column is of type StringType, you can convert it using the to_date function:

// filter data where the date is greater than 2015-03-14
data.filter(to_date(data("date")).gt(lit("2015-03-14"))) 

You can also filter according to the year using the year function:

// filter data where the year is greater than or equal to 2016
data.filter(year($"date").geq(lit(2016))) 
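
For reference, the snippets above rely on these imports (spark here is your SparkSession):

import org.apache.spark.sql.functions.{lit, to_date, year}
import spark.implicits._ // provides the $"date" column syntax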
answered Oct 21 '22 by eliasah

Don't use this, as suggested in other answers:

.filter(f.col("dateColumn") < f.lit('2017-11-01'))

But use this instead:

.filter(f.col("dateColumn") < f.unix_timestamp(f.lit('2017-11-01 00:00:00')).cast('timestamp'))

This compares against a TimestampType rather than a StringType, which is more performant in some cases; Parquet predicate pushdown, for example, only works with the typed comparison.

Edit: Both snippets assume this import:

from pyspark.sql import functions as f
answered Oct 21 '22 by Ruurtjan Pul


I find the most readable way to express this is using a SQL expression:

df.filter("my_date < date'2015-01-01'")

We can verify this works correctly by looking at the physical plan from .explain().
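
The plan below can be produced by a call like this (a sketch, assuming a DataFrame df with a my_date column):

df.filter("my_date < date'2015-01-01'").explain()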

+- *(1) Filter (isnotnull(my_date#22) && (my_date#22 < 16436))
answered Oct 21 '22 by RobinL


In PySpark (Python), one of the options is to have the column in unix_timestamp format. We can convert a string to a unix_timestamp and specify the format, as shown below. Note that we need to import the unix_timestamp, to_date, and lit functions.

from pyspark.sql.functions import unix_timestamp, to_date, lit

df_cast = df.withColumn("tx_date", to_date(unix_timestamp(df["date"], "MM/dd/yyyy").cast("timestamp")))

Now we can apply the filters:

df_cast.filter(df_cast["tx_date"] >= lit('2017-01-01')) \
       .filter(df_cast["tx_date"] <= lit('2017-01-31')).show()
answered Oct 21 '22 by Prathap Kudupu


df = df.filter(df["columnname"] >= '2020-01-13')
answered Oct 21 '22 by Prastuti Srivastava


We can also use a SQL-style expression inside filter:

Note: here I am showing two conditions and a date range, for future reference:


ordersDf.filter("order_status = 'PENDING_PAYMENT' AND order_date BETWEEN '2013-07-01' AND '2013-07-31' ")
answered Oct 21 '22 by Abhishek Sengupta