I've got a trivial Spark program. I've trimmed the input down to one file with one line in it, so I'm confident this isn't traditional memory pressure.
Exception in thread "main" com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 32749568, required: 34359296
at com.esotericsoftware.kryo.io.Output.require(Output.java:138)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at carbonite.serializer$write_map.invoke(serializer.clj:69)
I can set spark.kryoserializer.buffer.mb, but I think I'm only postponing the problem. I'd like to understand it.
I don't think there's anything non-standard about the program. If I remove a single line (seemingly at random) the error goes away.
It looks like I'm hitting some kind of fixed limit. But given that my input file is very small and the only operations I'm doing are predictable maps and reduceByKeys, I suspect something else is going on.
I'm using the Flambo Clojure 0.4.0 library (but I don't think that's causing it) and Spark Core 2.10.
Here's the minimum working example. Sorry it's a bit cryptic, but I've removed everything extraneous.
(ns mytest.core
  (:require [flambo.conf :as conf])
  (:require [flambo.api :as f]))

(def sc (f/spark-context (-> (conf/spark-conf)
                             (conf/master "local")
                             (conf/app-name "test")
                             (conf/set "spark.driver.memory" "1g")
                             (conf/set "spark.executor.memory" "1g"))))
(defn -main
  [& args]
  (let [logfile (f/text-file sc "file://tmp/one-line-file")
        a (f/map logfile (f/fn [u] nil))
        b (f/map logfile (f/fn [u] nil))
        c (f/map logfile (f/fn [u] nil))
        d (f/map logfile (f/fn [u] nil))
        e (f/map logfile (f/fn [u] nil))
        g (f/map logfile (f/fn [u] nil))
        h (f/map logfile (f/fn [u] nil))
        i (f/map logfile (f/fn [u] nil))
        j (f/map logfile (f/fn [u] nil))
        k (f/map logfile (f/fn [u] nil))
        l (f/map logfile (f/fn [u] nil))
        m (f/map logfile (f/fn [u] nil))
        n (f/map logfile (f/fn [u] nil))
        o (f/map logfile (f/fn [u] nil))
        p (f/map logfile (f/fn [u] nil))
        q (f/map logfile (f/fn [u] nil))
        r (f/map logfile (f/fn [u] nil))
        s (f/map logfile (f/fn [u] nil))
        t (f/map logfile (f/fn [u] nil))]))
EDIT
If I split this into two chunks and re-create the lazy file stream, it works:
(defn get-inputs []
  (f/text-file sc "file://tmp/one-line-file"))

(defn -main
  [& args]
  (let [logfile (get-inputs)
        a (f/map logfile (f/fn [u] nil))
        b (f/map logfile (f/fn [u] nil))
        c (f/map logfile (f/fn [u] nil))
        d (f/map logfile (f/fn [u] nil))
        e (f/map logfile (f/fn [u] nil))
        g (f/map logfile (f/fn [u] nil))
        h (f/map logfile (f/fn [u] nil))
        i (f/map logfile (f/fn [u] nil))])
  (let [logfile (get-inputs)
        j (f/map logfile (f/fn [u] nil))
        k (f/map logfile (f/fn [u] nil))
        l (f/map logfile (f/fn [u] nil))
        m (f/map logfile (f/fn [u] nil))
        n (f/map logfile (f/fn [u] nil))
        o (f/map logfile (f/fn [u] nil))
        p (f/map logfile (f/fn [u] nil))
        q (f/map logfile (f/fn [u] nil))
        r (f/map logfile (f/fn [u] nil))
        s (f/map logfile (f/fn [u] nil))
        t (f/map logfile (f/fn [u] nil))]))
In Java this would be the equivalent of creating two local scopes (e.g. two separate methods), and get-inputs is just a method that returns a newly constructed text file object. I thought that the textFile method would create a lazy stream that can be (re)read multiple times, so the two examples shouldn't be much different.
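For example, this is the kind of reuse I had in mind (a sketch only; it assumes flambo.api exposes a count wrapper over the underlying RDD action, which isn't used anywhere else in this post):

(let [logfile (f/text-file sc "file://tmp/one-line-file")]
  (f/count logfile)  ;; first action: the file is read lazily when this runs
  (f/count logfile)) ;; same RDD handle, evaluated again from the source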
Add this to your Spark context conf:
conf.set("spark.kryoserializer.buffer.mb","128")