 

Why am I getting an exception when using a Range Join hint?

I'm trying to use the DataFrame.hint() method to add a Range Join hint to my join.

I have two tables: minutes and events.

The minutes table has minute_start and minute_end columns, which are times in seconds since a fixed reference point. Naturally, their values are multiples of 60.

The events table has similar event_start and event_end columns, except that events can start and end at any second.

For each event, I need to find all the minutes it overlaps with.

I'm trying this on Databricks (runtime 5.1, Python 3.5):

from pyspark.sql.types import StructType, StructField, IntegerType

minutes = spark.sparkContext\
               .parallelize(((0,  60),
                             (60, 120)))\
               .toDF(StructType([
                         StructField('minute_start', IntegerType()),
                         StructField('minute_end', IntegerType())
                       ]))

events = spark.sparkContext\
              .parallelize(((12, 33),
                            (0,  120),
                            (33, 72),
                            (65, 178)))\
              .toDF(StructType([
                        StructField('event_start', IntegerType()),
                        StructField('event_end', IntegerType())
                      ]))

events.hint("range_join", "60")\
      .join(minutes,
            on=[events.event_start   < minutes.minute_end,
                minutes.minute_start < events.event_end])\
      .orderBy(events.event_start,
               events.event_end,
               minutes.minute_start)\
      .show()

Without the hint call, the result is as expected:

+-----------+---------+------------+----------+
|event_start|event_end|minute_start|minute_end|
+-----------+---------+------------+----------+
|          0|      120|           0|        60|
|          0|      120|          60|       120|
|         12|       33|           0|        60|
|         33|       72|           0|        60|
|         33|       72|          60|       120|
|         65|      178|          60|       120|
+-----------+---------+------------+----------+

With the hint, I get the exception:

AnalysisException: 'Range join hint: invalid arguments Buffer(60);'

When I tried passing 60 to the hint as a number rather than a string, it complained that hint parameters must be strings.
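For reference, these are the call shapes I had tried up to this point (a minimal recap sketch; the error for the integer case is paraphrased from the runtime's message):

# A string parameter gets past the client-side check,
# but the analyzer then rejects it:
events.hint("range_join", "60")
# -> AnalysisException: 'Range join hint: invalid arguments Buffer(60);'

# An integer parameter never reaches the analyzer;
# it is rejected by the client-side type check:
events.hint("range_join", 60)
# -> TypeError complaining that hint parameters must be strings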

I'm not on Azure, but I expect the outcome would be the same.

Has anyone had a similar issue and found a solution or knows where I'm making a mistake?

UPDATE 1

(Currently, I'm trying it on Databricks Runtime 6.1, Python 3.7.3, Spark 2.4.4)

I thought I might have missed that the parameters are expected as an iterable, so I tried again with events.hint("range_join", [60]). Same complaint about the argument not being a string: TypeError: all parameters should be str, got 60 of type <class 'int'>.

I'm wondering if Databricks' version of Spark is behind.
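One quick way to check which version of the check is actually installed on the cluster (just a diagnostic sketch, run in a notebook cell):

import inspect

import pyspark
from pyspark.sql import DataFrame

# Which PySpark version the runtime bundles.
print(pyspark.__version__)

# The source of DataFrame.hint as installed on the cluster,
# including the isinstance() check it applies to the parameters.
print(inspect.getsource(DataFrame.hint))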

This is in the Spark source code on GitHub:

def hint(self, name, *parameters):
    ...  # no checks on `parameters` up to here

    allowed_types = (basestring, list, float, int)
    for p in parameters:
        if not isinstance(p, allowed_types):
            raise TypeError(
                "all parameters should be in {0}, got {1} of type {2}".format(
                        allowed_types, p, type(p)))

    ...  # no checks beyond this point

so a list of ints should be allowed.
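To spell out why that check should let my arguments through (a small sketch; I'm assuming basestring is aliased to str here, as PySpark's Python 3 compatibility shim does):

allowed_types = (str, list, float, int)  # basestring is str on Python 3

# Both of the parameters I tried would satisfy the newer check:
print(isinstance(60, allowed_types))    # True
print(isinstance([60], allowed_types))  # True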

What I'm getting is "all parameters should be str", but the GitHub version would say "all parameters should be in (basestring, list, float, int)" if I passed a parameter of the wrong type.

UPDATE 2

hint("skew", "col_name") appears to be working.


1 Answer

I checked the Spark source code on GitHub.

Version 2.4.4 has this:

def hint(self, name, *parameters):
    ...  # no checks on `parameters` up to here

    for p in parameters:
        if not isinstance(p, str):
            raise TypeError(
                "all parameters should be str, got {0} of type {1}".format(p, type(p)))

    ...  # no checks beyond here

But from version 3.0.0-preview-rc1 on, the source has this:

def hint(self, name, *parameters):
    ...  # no checks on `parameters` up to here

    allowed_types = (basestring, list, float, int)
    for p in parameters:
        if not isinstance(p, allowed_types):
            raise TypeError(
                "all parameters should be in {0}, got {1} of type {2}".format(
                    allowed_types, p, type(p)))

    ...  # no checks beyond here

So it seems that version 2.4.4 had a bug that has been fixed starting with 3.0.0-preview-rc1.
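If you are stuck on a runtime that ships Spark 2.4.x, one possible workaround is to express the hint in SQL rather than through DataFrame.hint(), since the blocker is purely the client-side type check in PySpark. This is only a sketch, assuming the RANGE_JOIN SQL hint described in the Databricks documentation is available on your runtime; check the docs for your exact version:

# Register the DataFrames as temporary views so the SQL can reference them.
events.createOrReplaceTempView("events")
minutes.createOrReplaceTempView("minutes")

# The bin size (60) goes directly into the SQL hint,
# bypassing the PySpark parameter type check entirely.
spark.sql("""
    SELECT /*+ RANGE_JOIN(events, 60) */ *
    FROM events
    JOIN minutes
      ON events.event_start < minutes.minute_end
     AND minutes.minute_start < events.event_end
    ORDER BY events.event_start, events.event_end, minutes.minute_start
""").show()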
