I have a very simple Spark program (using Flambo in Clojure, but it should be easy to follow). These are all objects on the JVM. I'm testing on a local instance (although I would guess that Spark still serialises and deserialises).
(let [dt (t/date-time 2014)
      input (f/parallelize sc [{:the-date dt :x "A"}
                               {:the-date dt :x "B"}
                               {:the-date dt :x "C"}
                               {:the-date dt :x "D"}])
      by-date (f/map input (f/fn [{the-date :the-date x :x}] [the-date x]))])
Input is an RDD of four maps, each with the same date object. The f/map produces a key-value RDD of date => x.
The content of input is, as expected:
=> (f/foreach input prn)
[#<DateTime 2014-01-01T00:00:00.000Z> "A"]
[#<DateTime 2014-01-01T00:00:00.000Z> "B"]
[#<DateTime 2014-01-01T00:00:00.000Z> "C"]
[#<DateTime 2014-01-01T00:00:00.000Z> "D"]
Just to be clear, equality and .hashCode work on the date object:
=> (= dt dt)
true
=> (.hashCode dt)
1260848926
=> (.hashCode dt)
1260848926
They are instances of JodaTime's DateTime, which implements equals as expected.
When I try countByKey, I get the expected result:
=> (f/count-by-key by-date)
{#<DateTime 2014-01-01T00:00:00.000Z> 4}
But when I call groupByKey, it doesn't seem to work:
=> (f/foreach (f/group-by-key by-date) prn)
[#<DateTime 2014-01-01T00:00:00.000Z> ["A"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["B"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["C"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["D"]]
The keys are all identical, so I would expect the result to be a single entry with the date as the key and ["A", "B", "C", "D"] as the value. Some grouping is clearly happening, since each value is wrapped in a list, but every key ends up in its own group.
Somehow groupByKey isn't correctly equating the keys, but countByKey is. What's the difference between the two? How can I make them behave the same?
Any ideas?
I'm getting closer to an answer. I think this belongs in the answer section rather than the question section.
This groups by key, collects the result locally, and extracts the first item (the date) from each pair.
=> (def result-dates (map first (f/collect (f/group-by-key by-date))))
=> result-dates
(#<DateTime 2014-01-01T00:00:00.000Z>
#<DateTime 2014-01-01T00:00:00.000Z>
#<DateTime 2014-01-01T00:00:00.000Z>
#<DateTime 2014-01-01T00:00:00.000Z>)
The hashcodes are all the same:
=> (map #(.hashCode %) result-dates)
(1260848926
1260848926
1260848926
1260848926)
The milliseconds are all the same:
=> (map #(.getMillis %) result-dates)
(1388534400000
1388534400000
1388534400000
1388534400000)
.equals fails, but .isEqual succeeds:
=> (.isEqual (first result-dates) (second result-dates))
true
=> (.equals (first result-dates) (second result-dates))
false
The documentation for .equals says:
Compares this object with the specified object for equality based on the millisecond instant and the Chronology
Their milliseconds are all equal, and their Chronologies appear to be the same:
=> (map #(.getChronology %) result-dates)
(#<ISOChronology ISOChronology[UTC]>
#<ISOChronology ISOChronology[UTC]>
#<ISOChronology ISOChronology[UTC]>
#<ISOChronology ISOChronology[UTC]>)
However, the Chronologies aren't equal:
=> (def a (first result-dates))
=> (def b (second result-dates))
=> (= (.getChronology a) (.getChronology b))
false
Although their hashcodes do match:
=> (= (.hashCode (.getChronology a)) (.hashCode (.getChronology b)))
true
But org.joda.time.Chronology doesn't provide its own equals method; it inherits equals from Object, which only uses reference equality.
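If that's right, the two keys must hold distinct Chronology instances. Since Clojure's = falls back to .equals for Java objects, a reference-equality check should come out the same way (I didn't capture this in my session, but it follows from the result above):
=> (identical? (.getChronology a) (.getChronology b))
false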
My theory is that each of these dates is being deserialized with its own, freshly constructed Chronology object, whereas JodaTime's own serialization machinery probably resolves them back to the cached instances. Maybe a custom Kryo serializer would help in this regard.
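Something along these lines might work; this is only a sketch, not something I've tested. It assumes the de.javakaffee/kryo-serializers library (which ships a JodaDateTimeSerializer) is on the classpath, and the my.kryo.JodaRegistrator class name is just an example:

;; Registrator that tells Kryo how to (de)serialize Joda DateTime keys.
;; Needs AOT compilation of this namespace so the class exists at runtime.
(ns my.kryo
  (:gen-class
   :name my.kryo.JodaRegistrator
   :implements [org.apache.spark.serializer.KryoRegistrator])
  (:import [org.joda.time DateTime]
           [de.javakaffee.kryoserializers.jodatime JodaDateTimeSerializer]))

(defn -registerClasses [this kryo]
  ;; Register DateTime with a serializer that preserves the cached Chronology.
  (.register kryo DateTime (JodaDateTimeSerializer.)))

Then point Spark at it when building the context:

(doto (org.apache.spark.SparkConf.)
  (.set "spark.serializer" "org.apache.spark.serializer.KryoSerializer")
  (.set "spark.kryo.registrator" "my.kryo.JodaRegistrator"))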
For now, my solution to using JodaTime in Spark is to use an org.joda.time.Instant (by calling toInstant) or a java.util.Date rather than an org.joda.time.DateTime.
Both involve throwing away timezone information, which isn't ideal, so if anyone has more info it would be very welcome!
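As a sketch of that workaround (reusing the same sc and the f/ alias from above), keying by the instant instead of the DateTime looks like this:

;; .toInstant returns an org.joda.time.Instant; use .toDate for java.util.Date.
(def by-instant
  (f/map input (f/fn [{the-date :the-date x :x}]
                 [(.toInstant the-date) x])))

With instants as keys, group-by-key collapses the four entries into one as expected.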