datetime range filter in PySpark SQL

What is the correct way to filter a data frame by a timestamp field?

I have tried different date formats and forms of filtering, but nothing helps: either PySpark returns 0 objects, or it throws an error saying it doesn't understand the datetime format.

Here is what I have so far:

from pyspark import SparkContext
from pyspark.sql import SQLContext

from django.utils import timezone
from django.conf import settings

from myapp.models import Collection

sc = SparkContext("local", "DjangoApp")
sqlc = SQLContext(sc)
url = "jdbc:postgresql://%(HOST)s/%(NAME)s?user=%(USER)s&password=%(PASSWORD)s" % settings.DATABASES['default']
sf = sqlc.load(source="jdbc", url=url, dbtable='myapp_collection')

The range for the timestamp field:

import datetime

system_tz = timezone.pytz.timezone(settings.TIME_ZONE)
date_from = datetime.datetime(2014, 4, 16, 18, 30, 0, 0, tzinfo=system_tz)
date_to = datetime.datetime(2015, 6, 15, 18, 11, 59, 999999, tzinfo=system_tz)

attempt 1

date_filter = "my_col >= '%s' AND my_col <= '%s'" % (
    date_from.isoformat(), date_to.isoformat()
)
sf = sf.filter(date_filter)
sf.count()

Out[12]: 0

attempt 2

sf = sf.filter(sf.my_col >= date_from).filter(sf.my_col <= date_to)
sf.count()

---------------------------------------------------------------------------
Py4JJavaError: An error occurred while calling o63.count.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 4.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 4.0 (TID 3, localhost): org.postgresql.util.PSQLException:
ERROR: syntax error at or near "18"

# oops... JDBC doesn't understand the 24h time format??

attempt 3

sf = sf.filter("my_col BETWEEN '%s' AND '%s'" % \      (date_from.isoformat(), date_to.isoformat())      ) --------------------------------------------------------------------------- Py4JJavaError: An error occurred while calling o97.count. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 1 times, most recent failure: Lost task 0.0 in stage 17.0 (TID 13, localhost): org.postgresql.util.PSQLException: ERROR: syntax error at or near "18" 

The data does exist in the table, though:

django_filters = {
    'my_col__gte': date_from,
    'my_col__lte': date_to
}
Collection.objects.filter(**django_filters).count()

Out[17]: 1093436

Or this way:

django_range_filter = {'my_col__range': (date_from, date_to)}
Collection.objects.filter(**django_range_filter).count()

Out[19]: 1093436
asked Jul 14 '15 by funkifunki


People also ask

How do you use a range in PySpark?

sc.range() creates a new RDD of int containing elements from start to end (exclusive), increased by step for each element. It can be called the same way as Python's built-in range() function. If called with a single argument, the argument is interpreted as end, and start is set to 0.
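A minimal sketch of that, assuming a Spark version that provides SparkContext.range (1.5+) and an active SparkContext named sc as in the question's setup:

# sc.range(start, end, step) builds an RDD of ints; end is exclusive.
rdd = sc.range(0, 10, 2)
print(rdd.collect())          # [0, 2, 4, 6, 8]

# With a single argument it behaves like range(end):
print(sc.range(3).collect())  # [0, 1, 2]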

How do I use datetime in PySpark?

A PySpark timestamp (TimestampType) holds values in the format yyyy-MM-dd HH:mm:ss.SSSS, while a date (DateType) uses the format yyyy-MM-dd. Use the to_date() function to truncate the time from a timestamp, i.e. to convert a timestamp column to a date column on a DataFrame.
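For illustration, a small sketch (the DataFrame df and its timestamp column ts are hypothetical, not taken from the question):

from pyspark.sql.functions import to_date, col

# to_date() drops the time-of-day part: "2015-07-14 11:34:27" becomes "2015-07-14".
df_dates = df.withColumn("just_date", to_date(col("ts")))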

How do I use between in PySpark SQL?

The between() method in PySpark selects the values within a specified range. It can be used with the select() method: it returns true for values inside the range and false for values outside it. Note that both bounds are inclusive.
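A short sketch of both usages (again with a hypothetical DataFrame df and column my_col):

# As a projected boolean column:
df.select("my_col", df.my_col.between('2013-01-01', '2015-07-01').alias("in_range")).show()

# Or directly as a filter predicate:
df.where(df.my_col.between('2013-01-01', '2015-07-01')).show()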


2 Answers

Let's assume your data frame looks as follows:

sf = sqlContext.createDataFrame([
    [datetime.datetime(2013, 6, 29, 11, 34, 29)],
    [datetime.datetime(2015, 7, 14, 11, 34, 27)],
    [datetime.datetime(2012, 3, 10, 19, 00, 11)],
    [datetime.datetime(2016, 2, 8, 12, 21)],
    [datetime.datetime(2014, 4, 4, 11, 28, 29)]
], ('my_col', ))

with schema:

root
 |-- my_col: timestamp (nullable = true)

and you want to find dates in the following range:

import datetime, time

dates = ("2013-01-01 00:00:00", "2015-07-01 00:00:00")

timestamps = (
    time.mktime(datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S").timetuple())
    for s in dates)

You can query using timestamps either computed on the driver side:

q1 = "CAST(my_col AS INT) BETWEEN {0} AND {1}".format(*timestamps) sf.where(q1).show() 

or using the unix_timestamp function:

q2 = """CAST(my_col AS INT)         BETWEEN unix_timestamp('{0}', 'yyyy-MM-dd HH:mm:ss')         AND unix_timestamp('{1}', 'yyyy-MM-dd HH:mm:ss')""".format(*dates)  sf.where(q2).show() 

It is also possible to use a UDF in a similar way to the one I described in another answer.
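For example, a minimal sketch of such a UDF-based filter (the bounds below are illustrative Python datetimes, not the ones from the question):

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

date_from = datetime.datetime(2013, 1, 1)
date_to = datetime.datetime(2015, 7, 1)

# The UDF returns True only when the timestamp falls inside the chosen range.
in_range = udf(lambda ts: ts is not None and date_from <= ts <= date_to, BooleanType())

sf.where(in_range(sf.my_col)).show()

Keep in mind that a Python UDF is usually slower than the built-in expressions above, since every value has to be shipped to a Python worker.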

If you use raw SQL, it is possible to extract different elements of the timestamp using year, date, etc.:

sf.registerTempTable("sf")  # register the data frame so it is visible to raw SQL

sqlContext.sql("""SELECT * FROM sf
    WHERE YEAR(my_col) BETWEEN 2014 AND 2015""").show()

EDIT:

Since Spark 1.5, you can use built-in functions:

dates = ("2013-01-01",  "2015-07-01") date_from, date_to = [to_date(lit(s)).cast(TimestampType()) for s in dates]  sf.where((sf.my_col > date_from) & (sf.my_col < date_to)) 

You can also use pyspark.sql.Column.between, which is inclusive of the bounds:

from pyspark.sql.functions import col

sf.where(col('my_col').between(*dates)).show(truncate=False)
#+---------------------+
#|my_col               |
#+---------------------+
#|2013-06-29 11:34:29.0|
#|2014-04-04 11:28:29.0|
#+---------------------+
answered Sep 20 '22 by zero323


How about something like this:

import pyspark.sql.functions as func

df = df.select(func.to_date(df.my_col).alias("time"))
sf = df.filter(df.time > date_from).filter(df.time < date_to)
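One caveat: to_date() truncates the time of day, so this compares at day granularity. If sub-day precision matters, a sketch of an alternative (assuming the original data frame still has the my_col timestamp column, and reusing the string bounds from the accepted answer) is to filter the timestamp directly with between(), which is inclusive of both ends:

import pyspark.sql.functions as func

# Filter the full timestamp instead of the truncated date.
sf = df.filter(func.col("my_col").between("2013-01-01 00:00:00", "2015-07-01 00:00:00"))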
answered Sep 17 '22 by Sean