KryoException: Buffer overflow with very small input

Tags: apache-spark

I've got a trivial Spark program. I've trimmed the input down to a single file with a single line in it, so I'm confident this isn't traditional memory pressure.

Exception in thread "main" com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 32749568, required: 34359296
    at com.esotericsoftware.kryo.io.Output.require(Output.java:138)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at carbonite.serializer$write_map.invoke(serializer.clj:69)

I can set spark.kryoserializer.buffer.mb, but I think I'm only postponing the problem. I'd like to understand it.

I don't think there's anything non-standard about the program. If I remove a single line (seemingly any one, at random), the error goes away.

It looks like I'm hitting some kind of fixed limit. But given that my input file is very small and the only operations I'm doing are predictable maps and reduceByKeys, I suspect something else is up.

I'm using the Flambo Clojure library, version 0.4.0 (but I don't think that's causing it), and Spark Core 2.10.

Here's a minimal working example. Sorry it's a bit cryptic, but I've removed everything extraneous.

(ns mytest.core
  (:require [flambo.conf :as conf])
  (:require [flambo.api :as f]))

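;; Local-mode SparkContext with 1 GB driver and executor heaps.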
(def sc (f/spark-context (-> (conf/spark-conf)
           (conf/master "local")
           (conf/app-name "test")
           (conf/set "spark.driver.memory" "1g")
           (conf/set "spark.executor.memory" "1g"))))

(defn -main
  [& args]
  (let [logfile (f/text-file sc "file://tmp/one-line-file")
        a (f/map logfile (f/fn [u] nil))
        b (f/map logfile (f/fn [u] nil))
        c (f/map logfile (f/fn [u] nil))
        d (f/map logfile (f/fn [u] nil))
        e (f/map logfile (f/fn [u] nil))
        g (f/map logfile (f/fn [u] nil))
        h (f/map logfile (f/fn [u] nil))
        i (f/map logfile (f/fn [u] nil))
        j (f/map logfile (f/fn [u] nil))
        k (f/map logfile (f/fn [u] nil))
        l (f/map logfile (f/fn [u] nil))
        m (f/map logfile (f/fn [u] nil))
        n (f/map logfile (f/fn [u] nil))  
        o (f/map logfile (f/fn [u] nil))
        p (f/map logfile (f/fn [u] nil))
        q (f/map logfile (f/fn [u] nil))
        r (f/map logfile (f/fn [u] nil))
        s (f/map logfile (f/fn [u] nil))
        t (f/map logfile (f/fn [u] nil))
]))

EDIT

If I split this into two chunks and re-create the lazy file stream, it works:

(defn get-inputs []
  (f/text-file sc "file://tmp/one-line-file"))

(defn -main
  [& args]
  (let [logfile (get-inputs)
        a (f/map logfile (f/fn [u] nil))
        b (f/map logfile (f/fn [u] nil))
        c (f/map logfile (f/fn [u] nil))
        d (f/map logfile (f/fn [u] nil))
        e (f/map logfile (f/fn [u] nil))
        g (f/map logfile (f/fn [u] nil))
        h (f/map logfile (f/fn [u] nil))
        i (f/map logfile (f/fn [u] nil))])

  (let [logfile (get-inputs)
        j (f/map logfile (f/fn [u] nil))
        k (f/map logfile (f/fn [u] nil))
        l (f/map logfile (f/fn [u] nil))
        m (f/map logfile (f/fn [u] nil))
        n (f/map logfile (f/fn [u] nil))  
        o (f/map logfile (f/fn [u] nil))
        p (f/map logfile (f/fn [u] nil))
        q (f/map logfile (f/fn [u] nil))
        r (f/map logfile (f/fn [u] nil))
        s (f/map logfile (f/fn [u] nil))
        t (f/map logfile (f/fn [u] nil))]))

In Java this would be the equivalent of creating two local scopes (e.g. two separate methods), and get-inputs is just a function that returns a newly constructed text-file RDD.

I thought that the textFile method would create a lazy stream that can be (re)read multiple times, so the two examples shouldn't behave much differently.
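In other words (a sketch in the same Flambo terms as above), I'd expect these two fragments to be equivalent:

;; Under the lazy-RDD assumption, reusing one handle...
(let [one (f/text-file sc "file://tmp/one-line-file")
      x   (f/map one (f/fn [u] nil))
      y   (f/map one (f/fn [u] nil))])

;; ...should be equivalent to constructing a fresh RDD each time:
(let [x (f/map (f/text-file sc "file://tmp/one-line-file") (f/fn [u] nil))
      y (f/map (f/text-file sc "file://tmp/one-line-file") (f/fn [u] nil))])

Yet only the second form avoids the error.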

Asked Dec 10 '14 by Joe

1 Answer

Add this to your Spark context conf:

conf.set("spark.kryoserializer.buffer.mb","128")
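In the Flambo conf threading used in the question, the same setting would slot in like this (a sketch; 128 MB is the answer's suggested value, not a tuned one):

(def sc (f/spark-context (-> (conf/spark-conf)
           (conf/master "local")
           (conf/app-name "test")
           ;; Enlarge Kryo's serialization buffer to 128 MB.
           (conf/set "spark.kryoserializer.buffer.mb" "128"))))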
Answered Nov 01 '22 by Oussama