What is the correct way to filter a data frame by a timestamp field?
I have tried different date formats and forms of filtering, and nothing helps: either PySpark returns 0 objects, or it throws an error that it doesn't understand the datetime format.
Here is what I have got so far:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from django.utils import timezone
from django.conf import settings
from myapp.models import Collection

sc = SparkContext("local", "DjangoApp")
sqlc = SQLContext(sc)
url = "jdbc:postgresql://%(HOST)s/%(NAME)s?user=%(USER)s&password=%(PASSWORD)s" % settings.DATABASES['default']
sf = sqlc.load(source="jdbc", url=url, dbtable='myapp_collection')
The range for the timestamp field:
import datetime

system_tz = timezone.pytz.timezone(settings.TIME_ZONE)
date_from = datetime.datetime(2014, 4, 16, 18, 30, 0, 0, tzinfo=system_tz)
date_to = datetime.datetime(2015, 6, 15, 18, 11, 59, 999999, tzinfo=system_tz)
Attempt 1:
date_filter = "my_col >= '%s' AND my_col <= '%s'" % (
    date_from.isoformat(), date_to.isoformat()
)
sf = sf.filter(date_filter)
sf.count()
Out[12]: 0
Attempt 2:
sf = sf.filter(sf.my_col >= date_from).filter(sf.my_col <= date_to)
sf.count()
---------------------------------------------------------------------------
Py4JJavaError: An error occurred while calling o63.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0
failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 3, localhost):
org.postgresql.util.PSQLException: ERROR: syntax error at or near "18"

# Oops... JDBC doesn't understand the 24h time format??
Attempt 3:
sf = sf.filter("my_col BETWEEN '%s' AND '%s'" % \ (date_from.isoformat(), date_to.isoformat()) ) --------------------------------------------------------------------------- Py4JJavaError: An error occurred while calling o97.count. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 1 times, most recent failure: Lost task 0.0 in stage 17.0 (TID 13, localhost): org.postgresql.util.PSQLException: ERROR: syntax error at or near "18"
The data does exist in the table, though:
django_filters = {
    'my_col__gte': date_from,
    'my_col__lte': date_to
}
Collection.objects.filter(**django_filters).count()
Out[17]: 1093436
Or this way:
django_range_filter = {'my_col__range': (date_from, date_to)}
Collection.objects.filter(**django_range_filter).count()
Out[19]: 1093436
A PySpark timestamp (TimestampType) holds values in the format yyyy-MM-dd HH:mm:ss.SSSS, while a date (DateType) has the format yyyy-MM-dd. The to_date() function truncates the time portion of a timestamp, i.e. converts a timestamp column to a date column. The between() function selects values within a specified range: it returns true for values inside the range (bounds included) and false otherwise, and can be used with select() or where().
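For illustration, here is a minimal sketch of both, using the same sqlContext as the examples below and a tiny throwaway DataFrame (the demo name and its values are purely hypothetical):

from pyspark.sql.functions import to_date, col
import datetime

# hypothetical one-row DataFrame, only for demonstration
demo = sqlContext.createDataFrame(
    [[datetime.datetime(2014, 4, 16, 18, 30, 0)]], ('my_col', ))

# truncate the timestamp to a date
demo.select(to_date(col('my_col')).alias('my_date')).show()

# or keep the timestamp and test membership in an inclusive range
demo.where(col('my_col').between('2014-01-01', '2015-01-01')).show()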
Let's assume your data frame looks as follows:
sf = sqlContext.createDataFrame([
    [datetime.datetime(2013, 6, 29, 11, 34, 29)],
    [datetime.datetime(2015, 7, 14, 11, 34, 27)],
    [datetime.datetime(2012, 3, 10, 19, 00, 11)],
    [datetime.datetime(2016, 2, 8, 12, 21)],
    [datetime.datetime(2014, 4, 4, 11, 28, 29)]
], ('my_col', ))
with schema:
root
 |-- my_col: timestamp (nullable = true)
and you want to find dates in the following range:
import datetime, time

dates = ("2013-01-01 00:00:00", "2015-07-01 00:00:00")

timestamps = (
    time.mktime(datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S").timetuple())
    for s in dates)
It is possible to query using timestamps either computed on the driver side:
q1 = "CAST(my_col AS INT) BETWEEN {0} AND {1}".format(*timestamps) sf.where(q1).show()
or using the unix_timestamp function:
q2 = """CAST(my_col AS INT) BETWEEN unix_timestamp('{0}', 'yyyy-MM-dd HH:mm:ss') AND unix_timestamp('{1}', 'yyyy-MM-dd HH:mm:ss')""".format(*dates) sf.where(q2).show()
It is also possible to use a UDF in a similar way to the one I described in another answer.
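As a rough sketch of that approach (the names in_range, d_from and d_to are illustrative, not taken from that answer), a plain Python predicate can be wrapped as a boolean UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
import datetime

d_from = datetime.datetime(2013, 1, 1)
d_to = datetime.datetime(2015, 7, 1)

# each timestamp value arrives in the UDF as a Python datetime
in_range = udf(lambda ts: ts is not None and d_from <= ts <= d_to, BooleanType())

sf.where(in_range(sf.my_col)).show()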
If you use raw SQL, it is possible to extract different elements of a timestamp using YEAR, DATE, etc.:
sf.registerTempTable("sf")  # register the DataFrame so it can be queried by name
sqlContext.sql(
    "SELECT * FROM sf WHERE YEAR(my_col) BETWEEN 2014 AND 2015").show()
EDIT:
Since Spark 1.5 you can use built-in functions:
dates = ("2013-01-01", "2015-07-01") date_from, date_to = [to_date(lit(s)).cast(TimestampType()) for s in dates] sf.where((sf.my_col > date_from) & (sf.my_col < date_to))
You can also use pyspark.sql.Column.between, which is inclusive of the bounds:
from pyspark.sql.functions import col

sf.where(col('my_col').between(*dates)).show(truncate=False)
#+---------------------+
#|my_col               |
#+---------------------+
#|2013-06-29 11:34:29.0|
#|2014-04-04 11:28:29.0|
#+---------------------+
How about something like this:
import pyspark.sql.functions as func

df = df.select(func.to_date(df.my_col).alias("time"))
sf = df.filter(df.time > date_from).filter(df.time < date_to)