datetime range filter in PySpark SQL

What is the correct way to filter a data frame by a timestamp field?

I have tried different date formats and forms of filtering, but nothing helps: either PySpark returns 0 objects, or it throws an error saying it doesn't understand the datetime format.

Here is what I have so far:

    import datetime

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    from django.utils import timezone
    from django.conf import settings

    from myapp.models import Collection

    sc = SparkContext("local", "DjangoApp")
    sqlc = SQLContext(sc)
    url = "jdbc:postgresql://%(HOST)s/%(NAME)s?user=%(USER)s&password=%(PASSWORD)s" % settings.DATABASES['default']
    sf = sqlc.load(source="jdbc", url=url, dbtable='myapp_collection')

The range for the timestamp field:

    system_tz = timezone.pytz.timezone(settings.TIME_ZONE)
    date_from = datetime.datetime(2014, 4, 16, 18, 30, 0, 0, tzinfo=system_tz)
    date_to = datetime.datetime(2015, 6, 15, 18, 11, 59, 999999, tzinfo=system_tz)

Attempt 1

    date_filter = "my_col >= '%s' AND my_col <= '%s'" % (
        date_from.isoformat(), date_to.isoformat()
    )
    sf = sf.filter(date_filter)
    sf.count()

    Out[12]: 0

Attempt 2

    sf = sf.filter(sf.my_col >= date_from).filter(sf.my_col <= date_to)
    sf.count()

    ---------------------------------------------------------------------------
    Py4JJavaError: An error occurred while calling o63.count.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure:
    Lost task 0.0 in stage 4.0 (TID 3, localhost): org.postgresql.util.PSQLException:
    ERROR: syntax error at or near "18"
    #
    # oops.. JDBC doesn't understand 24h time format??

Attempt 3

    sf = sf.filter("my_col BETWEEN '%s' AND '%s'" %
                   (date_from.isoformat(), date_to.isoformat()))

    ---------------------------------------------------------------------------
    Py4JJavaError: An error occurred while calling o97.count.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 1 times, most recent failure:
    Lost task 0.0 in stage 17.0 (TID 13, localhost): org.postgresql.util.PSQLException:
    ERROR: syntax error at or near "18"

The data does exist in the table, though:

    django_filters = {
        'my_col__gte': date_from,
        'my_col__lte': date_to
    }
    Collection.objects.filter(**django_filters).count()

    Out[17]: 1093436

Or this way:

    django_range_filter = {'my_col__range': (date_from, date_to)}
    Collection.objects.filter(**django_range_filter).count()

    Out[19]: 1093436

2 Answers

Let's assume your data frame looks as follows:

    import datetime

    sf = sqlContext.createDataFrame([
        [datetime.datetime(2013, 6, 29, 11, 34, 29)],
        [datetime.datetime(2015, 7, 14, 11, 34, 27)],
        [datetime.datetime(2012, 3, 10, 19, 0, 11)],
        [datetime.datetime(2016, 2, 8, 12, 21)],
        [datetime.datetime(2014, 4, 4, 11, 28, 29)]
    ], ('my_col', ))

with schema:

    root
     |-- my_col: timestamp (nullable = true)

and you want to find dates in the following range:

    import datetime, time

    dates = ("2013-01-01 00:00:00", "2015-07-01 00:00:00")

    timestamps = (
        time.mktime(datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S").timetuple())
        for s in dates)

You can query using timestamps computed on the driver side:

    q1 = "CAST(my_col AS INT) BETWEEN {0} AND {1}".format(*timestamps)
    sf.where(q1).show()
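Note that time.mktime interprets the parsed value in the driver's local time zone, so the computed bounds shift with the machine's settings. Below is a minimal sketch, assuming the bounds are meant as UTC (the original does not say; whether UTC or local time is right depends on how my_col was written). It uses the standard library's calendar.timegm, and to_epoch_utc is a hypothetical helper name:

```python
import calendar
import datetime


def to_epoch_utc(s, fmt="%Y-%m-%d %H:%M:%S"):
    """Parse s as a naive timestamp and interpret it as UTC (an assumption)."""
    parsed = datetime.datetime.strptime(s, fmt)
    # timegm treats the time tuple as UTC, unlike time.mktime (local time)
    return calendar.timegm(parsed.timetuple())


dates = ("2013-01-01 00:00:00", "2015-07-01 00:00:00")
lower, upper = (to_epoch_utc(s) for s in dates)

# Same predicate shape as q1, but with integer bounds that do not depend
# on the driver's time zone
q1_utc = "CAST(my_col AS INT) BETWEEN {0} AND {1}".format(lower, upper)
print(q1_utc)
```

The resulting string can be passed to sf.where in the same way as q1.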

or using the unix_timestamp function:

    q2 = """CAST(my_col AS INT)
            BETWEEN unix_timestamp('{0}', 'yyyy-MM-dd HH:mm:ss')
            AND unix_timestamp('{1}', 'yyyy-MM-dd HH:mm:ss')""".format(*dates)

    sf.where(q2).show()
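The pattern 'yyyy-MM-dd HH:mm:ss' in q2 expects a space between date and time and no UTC offset, whereas isoformat() on a timezone-aware datetime produces a 'T' separator and an offset suffix. Here is a sketch of formatting the question's bounds to match the pattern. The fixed UTC+3 offset is purely an illustrative assumption, and dropping the offset means the string is read in whatever time zone Spark is running with:

```python
import datetime

# Illustrative assumption: a fixed UTC+3 zone stands in for the real settings
tz = datetime.timezone(datetime.timedelta(hours=3))
date_from = datetime.datetime(2014, 4, 16, 18, 30, 0, 0, tzinfo=tz)
date_to = datetime.datetime(2015, 6, 15, 18, 11, 59, 999999, tzinfo=tz)

# isoformat() keeps the 'T' separator, the microseconds and the offset
iso_from = date_from.isoformat()

# strftime with an explicit format yields the plain 'yyyy-MM-dd HH:mm:ss' shape
fmt = "%Y-%m-%d %H:%M:%S"
plain_from = date_from.strftime(fmt)
plain_to = date_to.strftime(fmt)

q2_plain = (
    "CAST(my_col AS INT) "
    "BETWEEN unix_timestamp('{0}', 'yyyy-MM-dd HH:mm:ss') "
    "AND unix_timestamp('{1}', 'yyyy-MM-dd HH:mm:ss')"
).format(plain_from, plain_to)

print(iso_from)
print(plain_from, plain_to)
```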

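It is also possible to use a UDF, in a way similar to the one I described in another answer.

If you use raw SQL, you can extract individual elements of the timestamp using functions such as year, date, etc.

    # Assumes sf has already been registered as a temporary table named "sf",
    # e.g. with sf.registerTempTable("sf")
    sqlContext.sql("""SELECT * FROM sf
        WHERE YEAR(my_col) BETWEEN 2014 AND 2015""").show()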


Since Spark 1.5 you can use built-in functions:

    from pyspark.sql.functions import lit, to_date
    from pyspark.sql.types import TimestampType

    dates = ("2013-01-01", "2015-07-01")
    date_from, date_to = [to_date(lit(s)).cast(TimestampType()) for s in dates]

    sf.where((sf.my_col > date_from) & (sf.my_col < date_to))

You can also use pyspark.sql.Column.between, which is inclusive of the bounds:

    from pyspark.sql.functions import col

    sf.where(col('my_col').between(*dates)).show(truncate=False)
    #+---------------------+
    #|my_col               |
    #+---------------------+
    #|2013-06-29 11:34:29.0|
    #|2014-04-04 11:28:29.0|
    #+---------------------+
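
To see what inclusive bounds mean for this sample data, here is a plain-Python sketch (no Spark involved) that mirrors the comparison. It assumes the date-only string bounds are read as midnight timestamps, and the between function below is a hypothetical stand-in, not the Spark method:

```python
import datetime

# Same sample values as the data frame above
rows = [
    datetime.datetime(2013, 6, 29, 11, 34, 29),
    datetime.datetime(2015, 7, 14, 11, 34, 27),
    datetime.datetime(2012, 3, 10, 19, 0, 11),
    datetime.datetime(2016, 2, 8, 12, 21),
    datetime.datetime(2014, 4, 4, 11, 28, 29),
]


def between(value, lower, upper):
    """Inclusive range check: lower <= value <= upper."""
    return lower <= value <= upper


# Assumption: a date-only bound is treated as midnight of that day
lower = datetime.datetime.strptime("2013-01-01", "%Y-%m-%d")
upper = datetime.datetime.strptime("2015-07-01", "%Y-%m-%d")

matches = [r for r in rows if between(r, lower, upper)]
for r in matches:
    print(r)
```

Under that assumption, a value of exactly 2015-07-01 00:00:00 is kept, while one a second later is dropped. If the whole last day should be included, the upper bound needs to be the end of that day.
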
How about something like this:

    import pyspark.sql.functions as func

    df = df.select(func.to_date(df.my_col).alias("time"))
    sf = df.filter(df.time > date_from).filter(df.time < date_to)
