Using JodaTime in Spark's groupByKey and countByKey

I have a very simple Spark program (using Flambo in Clojure, but it should be easy to follow). Everything here is a plain object on the JVM. I'm testing on a local instance (although I would guess that Spark still serialises and deserialises).

(let [dt (t/date-time 2014)
      input (f/parallelize sc [{:the-date dt :x "A"}
                               {:the-date dt :x "B"}
                               {:the-date dt :x "C"}
                               {:the-date dt :x "D"}])
      by-date (f/map input (f/fn [{the-date :the-date x :x}] [the-date x]))]
  ;; ... rest of the program elided
  )

input is an RDD of four maps, each containing the same date object. The f/map step produces a key-value RDD of date => x.

The content of by-date is, as expected:

=> (f/foreach by-date prn)
[#<DateTime 2014-01-01T00:00:00.000Z> "A"]
[#<DateTime 2014-01-01T00:00:00.000Z> "B"]
[#<DateTime 2014-01-01T00:00:00.000Z> "C"]
[#<DateTime 2014-01-01T00:00:00.000Z> "D"]

Just to be clear, equality and .hashCode work on the date object:

=> (= dt dt)
true
=> (.hashCode dt)
1260848926
=> (.hashCode dt)
1260848926

They are instances of JodaTime's DateTime, which implements equals as expected.

When I try countByKey, I get the expected result:

=> (f/count-by-key by-date)
{#<DateTime 2014-01-01T00:00:00.000Z> 4}

But when I groupByKey, it doesn't seem to work.

=> (f/foreach (f/group-by-key by-date) prn)
[#<DateTime 2014-01-01T00:00:00.000Z> ["A"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["B"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["C"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["D"]]

The keys are all identical, so I would expect the result to be a single entry with the date as the key and ["A", "B", "C", "D"] as the value. The grouping is clearly doing something, because each value has been wrapped in a list, yet the keys were never merged.

Somehow groupByKey isn't correctly equating the keys, but countByKey is. What's the difference between the two? How can I make them behave the same?

Any ideas?

Asked by Joe

1 Answer

I'm getting closer to an answer. I think this belongs in the answer section rather than the question section.

This groups by key, collects the result locally, and extracts the first item (the date) from each pair:

=> (def result-dates (map first (f/collect (f/group-by-key by-date))))
=> result-dates
(#<DateTime 2014-01-01T00:00:00.000Z>
 #<DateTime 2014-01-01T00:00:00.000Z>
 #<DateTime 2014-01-01T00:00:00.000Z>
 #<DateTime 2014-01-01T00:00:00.000Z>)

The hashcodes are all the same:

=> (map #(.hashCode %) result-dates)
(1260848926
 1260848926
 1260848926 
 1260848926)

The milliseconds are all the same:

=> (map #(.getMillis %) result-dates)
(1388534400000
 1388534400000
 1388534400000
 1388534400000)

equals fails, but isEqual succeeds:

=> (.isEqual (first result-dates) (second result-dates))
true

=> (.equals (first result-dates) (second result-dates))
false

The documentation for .equals says:

Compares this object with the specified object for equality based on the millisecond instant and the Chronology
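
In Clojure terms that is roughly the following (my paraphrase of AbstractInstant.equals, not the actual Joda source):

;; Rough paraphrase: two instants are equal iff BOTH the millisecond
;; instant and the Chronology compare equal.
(defn joda-equal? [^org.joda.time.ReadableInstant a
                   ^org.joda.time.ReadableInstant b]
  (and (= (.getMillis a) (.getMillis b))
       (.equals (.getChronology a) (.getChronology b))))

So if the Chronologies don't compare equal, the whole comparison fails, whatever the millis say.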

Their milliseconds are all equal, and their Chronologies appear to be as well:

=> (map #(.getChronology %) result-dates)
(#<ISOChronology ISOChronology[UTC]>
 #<ISOChronology ISOChronology[UTC]>
 #<ISOChronology ISOChronology[UTC]>
 #<ISOChronology ISOChronology[UTC]>)

However, the Chronologies don't compare as equal:

=> (def a (first result-dates))
=> (def b (second result-dates))

=> (= (.getChronology a) (.getChronology b))
false

Although their hashcodes do:

=> (= (.hashCode (.getChronology a)) (.hashCode (.getChronology b)))
true

But org.joda.time.Chronology doesn't provide its own equals method; it inherits Object's, which only uses reference equality.
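
Given that, the false from the = comparison above already implies these are two physically distinct Chronology instances, since = delegates to .equals here; a direct reference check should confirm it:

=> (identical? (.getChronology a) (.getChronology b))
false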

My theory is that these dates are each being deserialized with their own freshly constructed Chronology object. JodaTime's built-in Java serialization probably deals with this by resolving back to the canonical instance, but Kryo (which Flambo uses for Spark serialization) would bypass that. Maybe a custom Kryo serializer would help in this regard.
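
A sketch of what that could look like (untested; it assumes the third-party de.javakaffee/kryo-serializers artifact, which ships a JodaDateTimeSerializer, and the class/namespace names here are hypothetical):

;; Untested sketch. JodaDateTimeSerializer writes the millis plus a
;; chronology/zone identifier and reconstructs the DateTime against the
;; canonical Chronology on read, so deserialized keys should compare equal.
;; Needs AOT compilation so Spark can load the class by name.
(ns my.joda-registrator
  (:import [com.esotericsoftware.kryo Kryo]
           [org.joda.time DateTime]
           [de.javakaffee.kryoserializers.jodatime JodaDateTimeSerializer])
  (:gen-class
   :name my.JodaRegistrator
   :implements [org.apache.spark.serializer.KryoRegistrator]))

(defn -registerClasses [this ^Kryo kryo]
  (.register kryo DateTime (JodaDateTimeSerializer.)))

;; ...then point Spark at it (flambo.conf aliased as conf):
;; (-> (conf/spark-conf)
;;     (conf/set "spark.serializer" "org.apache.spark.serializer.KryoSerializer")
;;     (conf/set "spark.kryo.registrator" "my.JodaRegistrator"))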

For now, my solution for using JodaTime in Spark is to use an org.joda.time.Instant (by calling toInstant), or a java.util.Date, rather than an org.joda.time.DateTime (see the sketch below).
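
For reference, a hypothetical adaptation of the map step above, keyed by the Instant:

;; Instant stores only the UTC millis and computes its Chronology as the
;; per-JVM UTC ISOChronology singleton, so its equals should survive the
;; serialization round-trip.
(f/map input (f/fn [{the-date :the-date x :x}]
               [(.toInstant the-date) x]))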

Both involve throwing away timezone information, which isn't ideal, so if anyone has more info it would be very welcome!

Answered by Joe