I'm trying to use the DataFrame.hint() method to add a Range Join hint to my join.
I have two tables: minutes and events.
The minutes table has minute_start and minute_end columns, which hold times in seconds since a fixed moment. Naturally, their values are multiples of 60.
The events table has analogous event_start and event_end columns, except that events can start and end at any second.
For each event, I need to find all the minutes it overlaps with.
I'm trying this on Databricks (Runtime 5.1, Python 3.5):
from pyspark.sql.types import StructType, StructField, IntegerType

minutes = spark.sparkContext\
    .parallelize(((0, 60),
                  (60, 120)))\
    .toDF(StructType([
        StructField('minute_start', IntegerType()),
        StructField('minute_end', IntegerType())
    ]))

events = spark.sparkContext\
    .parallelize(((12, 33),
                  (0, 120),
                  (33, 72),
                  (65, 178)))\
    .toDF(StructType([
        StructField('event_start', IntegerType()),
        StructField('event_end', IntegerType())
    ]))
events.hint("range_join", "60")\
    .join(minutes,
          on=[events.event_start < minutes.minute_end,
              minutes.minute_start < events.event_end])\
    .orderBy(events.event_start,
             events.event_end,
             minutes.minute_start)\
    .show()
Without the hint call, the result is as expected:
+-----------+---------+------------+----------+
|event_start|event_end|minute_start|minute_end|
+-----------+---------+------------+----------+
| 0| 120| 0| 60|
| 0| 120| 60| 120|
| 12| 33| 0| 60|
| 33| 72| 0| 60|
| 33| 72| 60| 120|
| 65| 178| 60| 120|
+-----------+---------+------------+----------+
With the hint, I get this exception:
AnalysisException: 'Range join hint: invalid arguments Buffer(60);'
When I tried passing the 60 in the hint as a number rather than a string, it complained that hint parameters must be strings.
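For completeness, this is roughly the failing variant (a minimal sketch; the exact TypeError text I saw is quoted in UPDATE 1 below):

# Passing the bin size as a plain int instead of a str:
events.hint("range_join", 60)
# TypeError: all parameters should be str, got 60 of type <class 'int'>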
I'm not on Azure, but I expect the outcome would be the same.
Has anyone had a similar issue and found a solution or knows where I'm making a mistake?
UPDATE 1
(Currently, I'm trying it on Databricks Runtime 6.1, Python 3.7.3, Spark 2.4.4)
I thought I had missed that the parameters are expected as an iterable, so I tried again with events.hint("range_join", [60]). Same complaint about the argument not being a string: TypeError: all parameters should be str, got 60 of type <class 'int'>.
I'm wondering if Databricks' version of Spark is behind.
This is in the Spark source code on GitHub:
def hint(self, name, *parameters):
    ...  # no checks on `parameters` up to here
    allowed_types = (basestring, list, float, int)
    for p in parameters:
        if not isinstance(p, allowed_types):
            raise TypeError(
                "all parameters should be in {0}, got {1} of type {2}".format(
                    allowed_types, p, type(p)))
    ...  # no checks beyond this point
so a list of ints should be allowed. What I'm getting is "all parameters should be str", but the GitHub version would say "all parameters should be in (basestring, list, float, int)" if I passed a parameter of a wrong type.
UPDATE 2
hint("skew", "col_name")
appears to be working.
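For contrast, this kind of call goes through on the same runtime (event_start here just stands in for col_name):

# A hint whose parameter is a plain string passes hint()'s type check,
# so the join below runs without a TypeError:
events.hint("skew", "event_start")\
    .join(minutes,
          on=[events.event_start < minutes.minute_end,
              minutes.minute_start < events.event_end])\
    .show()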
I checked the Spark source code on GitHub. Version 2.4.4 has this:
def hint(self, name, *parameters):
    ...  # no checks on `parameters` up to here
    for p in parameters:
        if not isinstance(p, str):
            raise TypeError(
                "all parameters should be str, got {0} of type {1}".format(p, type(p)))
    ...  # no checks beyond here
But from version 3.0.0-preview-rc1 on, the source has this:
def hint(self, name, *parameters):
    ...  # no checks on `parameters` up to here
    allowed_types = (basestring, list, float, int)
    for p in parameters:
        if not isinstance(p, allowed_types):
            raise TypeError(
                "all parameters should be in {0}, got {1} of type {2}".format(
                    allowed_types, p, type(p)))
    ...  # no checks beyond here
So it seems that version 2.4.4 had a bug that was fixed starting with 3.0.0-preview-rc1.
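Until then, a possible workaround (an untested sketch, assuming the RANGE_JOIN SQL hint syntax from the Databricks docs applies on this runtime) is to express the hint in SQL, which never touches hint()'s Python type check:

minutes.createOrReplaceTempView("minutes")
events.createOrReplaceTempView("events")

# The bin size is embedded in the SQL hint comment, so no Python-side
# parameter validation is involved:
spark.sql("""
    SELECT /*+ RANGE_JOIN(minutes, 60) */ *
    FROM events
    JOIN minutes
      ON events.event_start < minutes.minute_end
     AND minutes.minute_start < events.event_end
""").show()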