What is the correct way to filter a data frame by a timestamp field?
I have tried different date formats and forms of filtering, but nothing helps: PySpark either returns 0 objects or throws an error saying it doesn't understand the datetime format.
Here is what I have so far:
from pyspark import SparkContext
from pyspark.sql import SQLContext

from django.utils import timezone
from django.conf import settings

from myapp.models import Collection

sc = SparkContext("local", "DjangoApp")
sqlc = SQLContext(sc)
url = "jdbc:postgresql://%(HOST)s/%(NAME)s?user=%(USER)s&password=%(PASSWORD)s" % settings.DATABASES['default']
sf = sqlc.load(source="jdbc", url=url, dbtable='myapp_collection')

The range for the timestamp field:
import datetime

system_tz = timezone.pytz.timezone(settings.TIME_ZONE)
date_from = datetime.datetime(2014, 4, 16, 18, 30, 0, 0, tzinfo=system_tz)
date_to = datetime.datetime(2015, 6, 15, 18, 11, 59, 999999, tzinfo=system_tz)

Attempt 1:
date_filter = "my_col >= '%s' AND my_col <= '%s'" % (
    date_from.isoformat(), date_to.isoformat()
)
sf = sf.filter(date_filter)
sf.count()

Out[12]: 0

Attempt 2:
sf = sf.filter(sf.my_col >= date_from).filter(sf.my_col <= date_to)
sf.count()

---------------------------------------------------------------------------
Py4JJavaError: An error occurred while calling o63.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 4.0 (TID 3, localhost): org.postgresql.util.PSQLException:
ERROR: syntax error at or near "18"

# oops... doesn't JDBC understand the 24h time format?

Attempt 3:
sf = sf.filter("my_col BETWEEN '%s' AND '%s'" % \
     (date_from.isoformat(), date_to.isoformat())
     )

---------------------------------------------------------------------------
Py4JJavaError: An error occurred while calling o97.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 1 times, most recent failure: Lost task 0.0 in stage 17.0 (TID 13, localhost): org.postgresql.util.PSQLException: ERROR: syntax error at or near "18"

The data do exist in the table, though:
django_filters = {
    'my_col__gte': date_from,
    'my_col__lte': date_to
}
Collection.objects.filter(**django_filters).count()

Out[17]: 1093436

Or this way:
django_range_filter = {'my_col__range': (date_from, date_to)}
Collection.objects.filter(**django_range_filter).count()

Out[19]: 1093436
A PySpark timestamp (TimestampType) holds a value in the format yyyy-MM-dd HH:mm:ss.SSSS, while a date (DateType) uses the format yyyy-MM-dd. Use the to_date() function to truncate the time from a timestamp, i.e. to convert a timestamp column of a DataFrame to a date column.
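For illustration only (not from the question or the answers below), a minimal sketch of to_date() on a DataFrame like the one in the question might look as follows; sf_with_date and my_date are just hypothetical names:

from pyspark.sql.functions import to_date

# assumes sf has a TimestampType column named my_col, as in the question
sf_with_date = sf.withColumn("my_date", to_date(sf.my_col))
sf_with_date.printSchema()  # my_date should show up as a date column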
The between() function in PySpark selects the values within a specified range. It can be used with select() to produce a boolean column, or with filter()/where() to keep only matching rows: it returns true for values inside the specified range and false for values outside it.
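Again purely as a hedged illustration, assuming the same sf DataFrame with a my_col timestamp column, between() can be used both ways:

from pyspark.sql.functions import col

# as a boolean column: true inside the range, false outside it
sf.select(sf.my_col, col("my_col").between("2013-01-01", "2015-07-01").alias("in_range")).show()

# as a filter predicate: keep only the rows for which it is true
sf.where(col("my_col").between("2013-01-01", "2015-07-01")).show()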
Let's assume your data frame looks as follows:
sf = sqlContext.createDataFrame([
    [datetime.datetime(2013, 6, 29, 11, 34, 29)],
    [datetime.datetime(2015, 7, 14, 11, 34, 27)],
    [datetime.datetime(2012, 3, 10, 19, 00, 11)],
    [datetime.datetime(2016, 2, 8, 12, 21)],
    [datetime.datetime(2014, 4, 4, 11, 28, 29)]
], ('my_col', ))

with the schema:
root
 |-- my_col: timestamp (nullable = true)

and you want to find dates in the following range:
import datetime, time

dates = ("2013-01-01 00:00:00", "2015-07-01 00:00:00")

timestamps = (
    time.mktime(datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S").timetuple())
    for s in dates)

It is possible to query using timestamps either computed on the driver side:
q1 = "CAST(my_col AS INT) BETWEEN {0} AND {1}".format(*timestamps)
sf.where(q1).show()

or using the unix_timestamp function:
q2 = """CAST(my_col AS INT)
        BETWEEN unix_timestamp('{0}', 'yyyy-MM-dd HH:mm:ss')
        AND unix_timestamp('{1}', 'yyyy-MM-dd HH:mm:ss')""".format(*dates)

sf.where(q2).show()

It is also possible to use a udf in a similar way, as I described in another answer.
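That other answer is not reproduced here, but a rough sketch of the UDF approach might look like the following (a hand-rolled predicate, so typically slower than the built-in expressions above; the in_range name is just an illustrative choice):

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
import datetime

date_from = datetime.datetime(2013, 1, 1)
date_to = datetime.datetime(2015, 7, 1)

# plain Python predicate wrapped as a boolean UDF; None-safe
in_range = udf(
    lambda ts: ts is not None and date_from <= ts <= date_to,
    BooleanType())

sf.where(in_range(sf.my_col)).show()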
If you use raw SQL, it is possible to extract different elements of a timestamp using year, date, etc.:
sqlContext.sql("""SELECT * FROM sf
    WHERE YEAR(my_col) BETWEEN 2014 AND 2015""").show()

EDIT:
Since Spark 1.5 you can use built-in functions:
from pyspark.sql.functions import to_date, lit
from pyspark.sql.types import TimestampType

dates = ("2013-01-01", "2015-07-01")
date_from, date_to = [to_date(lit(s)).cast(TimestampType()) for s in dates]

sf.where((sf.my_col > date_from) & (sf.my_col < date_to))

You can also use pyspark.sql.Column.between, which is inclusive of the bounds:
from pyspark.sql.functions import col

sf.where(col('my_col').between(*dates)).show(truncate=False)
#+---------------------+
#|my_col               |
#+---------------------+
#|2013-06-29 11:34:29.0|
#|2014-04-04 11:28:29.0|
#+---------------------+
How about something like this:

import pyspark.sql.functions as func

df = df.select(func.to_date(df.my_col).alias("time"))
sf = df.filter(df.time > date_from).filter(df.time < date_to)