
Where is the memory leak when mapcat breaks backpressure in core.async?

I wrote some core.async code in Clojure, and when I ran it, it consumed all available memory and failed with an error. It appears that using mapcat in a core.async pipeline breaks backpressure. (Which is unfortunate for reasons beyond the scope of this question.)

Here is some code that demonstrates the problem by counting :xs in and out of a mapcatting transducer:

(ns mapcat.core
  (:require [clojure.core.async :as async]))

(defn test-backpressure [n length]
  (let [message (repeat length :x)
        input (async/chan)
        transform (async/chan 1 (mapcat seq))
        output (async/chan)
        sent (atom 0)]
    (async/pipe input transform)
    (async/pipe transform output)
    (async/go
      (dotimes [_ n]
        (async/>! input message)
        (swap! sent inc))
      (async/close! input))
    (async/go-loop [x 0]
      (when (= 0 (mod x (/ (* n length) 10)))
        (println "in:" (* @sent length) "out:" x))
      (when-let [_ (async/<! output)]
        (recur (inc x))))))

=> (test-backpressure 1000 10)
in: 10 out: 0
in: 2680 out: 1000
in: 7410 out: 2000
in: 10000 out: 3000 ; Where are the other 7000 characters?
in: 10000 out: 4000
in: 10000 out: 5000
in: 10000 out: 6000
in: 10000 out: 7000
in: 10000 out: 8000
in: 10000 out: 9000
in: 10000 out: 10000

The producer races far ahead of the consumer.

It appears that I'm not the first person to discover this. But the explanation given here doesn't quite seem to cover it. (Although it does provide an adequate workaround.) Conceptually, I would expect the producer to be ahead, but only by the length of the few messages that might be buffered in the channels.

My question is, where are all the other messages? By the fourth line of output, 7000 :xs are unaccounted for.

Rachel K. Westmacott asked Jun 21 '16 19:06


1 Answer

UPDATE 2020-01-14: The memory leak is now fixed.

There are two possible interpretations of the question "Where is the memory leak?"

Firstly, where is the data held? The answer seems to be in the channel buffer immediately downstream of the expanding transform.

Channels by default use a FixedBuffer (clojure.core.async.impl.buffers/FixedBuffer) which can tell if it is full but does not object to being overfull.
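The overfull buffer can be observed through the public API alone. The following is a minimal sketch, not part of the original answer: overfill-demo is a hypothetical helper name, and the example assumes a core.async version with transducer support. It puts a single 10-element message into a channel whose buffer has size 1, closes the channel, and counts what comes out.

```clojure
(require '[clojure.core.async :as async])

(defn overfill-demo
  "Hypothetical helper: puts one 10-element message into a channel with a
  buffer of size 1 and an expanding transducer, then returns the number of
  items the channel yields after being closed."
  []
  (let [ch (async/chan 1 (mapcat seq))]
    ;; The buffer is empty, so the put is accepted at once; the transducer
    ;; then adds all ten expanded items to the size-1 buffer.
    (async/>!! ch (repeat 10 :x))
    ;; Closing the channel still lets already-buffered items be taken.
    (async/close! ch)
    (count (async/<!! (async/into [] ch)))))

(overfill-demo)
;; => 10
```

A single put cannot be split across the expansion, so the buffer has to accept all ten items even though its nominal size is 1.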

Secondly, which passage of code causes the buffer to be overfull? This (correct me if I am wrong) appears to be in the take! method of ManyToManyChannel (clojure.core.async.impl.channels/ManyToManyChannel) where the first call to add! on the buffer occurs before any calls to full? have taken place.

It seems that take! assumes that it can add at least one item to the buffer for every item it removes. In the case of long running expanding transducers such as mapcat this is not always a safe assumption.

By changing this line to (when (and (.hasNext iter) (not (impl/full? buf))) in a local copy of core.async I can make the code in the question behave as expected. (N.B. My understanding of core.async is insufficient for me to guarantee that this is a robust solution for your use case.)
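To make the arithmetic concrete, here is a toy model of the behaviour described above. It is a sketch under stated assumptions, not core.async code: simulate-takes and its option keys are hypothetical names, and the model assumes that a put is pending at every take, which overstates how quickly the real pipeline fills up. With :check-full? false, each take removes one item and admits one whole expanded message regardless of buffer fullness; with :check-full? true, a message is admitted only when the buffer is below capacity, which mirrors the guarded condition.

```clojure
(defn simulate-takes
  "Toy model (hypothetical, not core.async internals). Starts with one
  expanded message already in the buffer. On every take, one item is
  removed and, if a message is pending, one whole message of `length`
  items may be admitted. Returns the final counts and the peak buffer size."
  [{:keys [capacity length pending takes check-full?]}]
  (loop [buffered length          ; first message is already buffered
         pending  (dec pending)   ; messages still waiting to be put
         taken    0
         peak     length]
    (if (or (= taken takes) (zero? buffered))
      {:taken taken :buffered buffered :pending pending :peak peak}
      (let [buffered (dec buffered)
            ;; Unguarded: admit whenever a message is pending.
            ;; Guarded: admit only if the buffer is below capacity.
            admit?   (and (pos? pending)
                          (or (not check-full?) (< buffered capacity)))
            buffered (if admit? (+ buffered length) buffered)
            pending  (if admit? (dec pending) pending)]
        (recur buffered pending (inc taken) (max peak buffered))))))

(simulate-takes {:capacity 1 :length 10 :pending 1000 :takes 1000
                 :check-full? false})
;; => {:taken 1000, :buffered 9000, :pending 0, :peak 9001}

(simulate-takes {:capacity 1 :length 10 :pending 1000 :takes 1000
                 :check-full? true})
;; => {:taken 1000, :buffered 10, :pending 899, :peak 10}
```

In the unguarded run the buffer grows by nine items per take, so all 1000 messages end up inside the buffer after only 1000 takes. In the guarded run the buffer never holds more than one expanded message and the producer stays blocked on the remaining messages.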

UPDATE 2016-09-17: there is now an issue for this: http://dev.clojure.org/jira/browse/ASYNC-178

UPDATE 2020-01-14: this is now fixed as of: https://clojure.atlassian.net/browse/ASYNC-210 (although the earlier ticket was closed as 'Declined')

Rachel K. Westmacott answered Sep 24 '22 21:09