I wrote some core.async code in Clojure and when I ran it, it consumed all available memory and failed with an error. It appears that using mapcat in a core.async pipeline breaks back pressure. (Which is unfortunate for reasons beyond the scope of this question.)
Here is some code that demonstrates the problem by counting :xs in and out of a mapcatting transducer:
(ns mapcat.core
  (:require [clojure.core.async :as async]))

(defn test-backpressure [n length]
  (let [message   (repeat length :x)
        input     (async/chan)
        transform (async/chan 1 (mapcat seq))
        output    (async/chan)
        sent      (atom 0)]
    (async/pipe input transform)
    (async/pipe transform output)
    (async/go
      (dotimes [_ n]
        (async/>! input message)
        (swap! sent inc))
      (async/close! input))
    (async/go-loop [x 0]
      (when (= 0 (mod x (/ (* n length) 10)))
        (println "in:" (* @sent length) "out:" x))
      (when-let [_ (async/<! output)]
        (recur (inc x))))))
=> (test-backpressure 1000 10)
in: 10 out: 0
in: 2680 out: 1000
in: 7410 out: 2000
in: 10000 out: 3000 ; Where are the other 7000 characters?
in: 10000 out: 4000
in: 10000 out: 5000
in: 10000 out: 6000
in: 10000 out: 7000
in: 10000 out: 8000
in: 10000 out: 9000
in: 10000 out: 10000
The producer races far ahead of the consumer.
It appears that I'm not the first person to discover this. But the explanation given here doesn't quite seem to cover it. (Although it does provide an adequate workaround.) Conceptually, I would expect the producer to be ahead, but only by the length of the few messages that might be buffered in the channels.
My question is, where are all the other messages? By the fourth line of output, 7000 :xs are unaccounted for.
UPDATE 2020-01-14: The memory leak is now fixed.
There are two possible interpretations of the question "Where is the memory leak?"
Firstly, where is the data held? The answer seems to be in the channel buffer immediately downstream of the expanding transform.
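A minimal sketch of how to observe this using only the public API, assuming a core.async version that provides poll! (the names c, accepted? and drained are only for illustration). A single put of a 1000-element collection onto a channel with a nominal buffer size of 1 is accepted without blocking, and the expanded items can then be taken without any further puts:

```clojure
(require '[clojure.core.async :as async])

;; A channel with a nominal buffer size of 1 and an expanding transducer.
(def c (async/chan 1 (mapcat seq)))

;; One put. The buffer is empty at this point, so the put is accepted
;; immediately, and the transducer expands the whole collection into the buffer.
(def accepted? (async/>!! c (range 1000)))

;; Count how many items can be taken without blocking and without further puts.
(def drained
  (loop [n 0]
    (if (some? (async/poll! c))
      (recur (inc n))
      n)))

(println "accepted?" accepted? "drained:" drained)
```

I would expect drained to come out at 1000 rather than 1, which suggests the whole expansion sits in the buffer of the channel that carries the transducer.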
Channels by default use a FixedBuffer (clojure.core.async.impl.buffers/FixedBuffer), which can tell if it is full but does not object to being overfull.
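The following sketch pokes at a fixed buffer directly to illustrate the point. It relies on the internal clojure.core.async.impl.protocols namespace (add! and full?), which is an implementation detail and may differ between core.async versions, so treat it as an assumption rather than a supported API:

```clojure
(require '[clojure.core.async :as async]
         '[clojure.core.async.impl.protocols :as impl])

;; async/buffer creates a fixed buffer with the given nominal capacity.
(def buf (async/buffer 1))

;; Add five items to a buffer whose capacity is 1.
;; In the versions I have looked at, nothing stops the buffer growing.
(dotimes [i 5]
  (impl/add! buf i))

(println "count:" (count buf) "full?" (impl/full? buf))
```

The buffer reports that it is full, but it is left to the caller to check full? before adding.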
Secondly, which passage of code causes the buffer to be overfull? This (correct me if I am wrong) appears to be in the take! method of ManyToManyChannel (clojure.core.async.impl.channels/ManyToManyChannel), where the first call to add! on the buffer occurs before any calls to full? have taken place.
It seems that take! assumes that it can add at least one item to the buffer for every item it removes. In the case of long running expanding transducers such as mapcat this is not always a safe assumption.
By changing this line to (when (and (.hasNext iter) (not (impl/full? buf))) in a local copy of core.async I can make the code in the question behave as expected. (N.B. My understanding of core.async is insufficient for me to guarantee that this is a robust solution for your use case.)
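If patching a local copy is not an option, one alternative is to do the expansion in an ordinary go-loop instead of a transducer, so that every expanded item goes through its own parking put. This is only a sketch under my own assumptions: expand-into is a hypothetical helper, not part of core.async, and it assumes the collections contain no nil items (channels cannot carry nil).

```clojure
(require '[clojure.core.async :as async])

(defn expand-into
  "Hypothetical helper: takes collections from `in` and puts their items
  one at a time onto `out`, closing `out` when `in` is closed."
  [in out]
  (async/go-loop []
    (if-some [coll (async/<! in)]
      (do
        ;; Each item is put individually, so a slow consumer on `out`
        ;; parks this loop before the next item is produced.
        (loop [xs (seq coll)]
          (when xs
            (async/>! out (first xs))
            (recur (next xs))))
        (recur))
      (async/close! out))))

;; Usage: feed three collections in and collect the flattened items.
(def result
  (let [in  (async/chan)
        out (async/chan)]
    (expand-into in out)
    (async/thread
      (doseq [m [[1 2] [3] [4 5]]]
        (async/>!! in m))
      (async/close! in))
    (async/<!! (async/into [] out))))

(println result)
```

With unbuffered channels on both sides, the producer should only be able to get ahead of the consumer by the collection currently being expanded.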
UPDATE 2016-09-17: there is now an issue for this: http://dev.clojure.org/jira/browse/ASYNC-178
UPDATE 2020-01-14: this is now fixed as of: https://clojure.atlassian.net/browse/ASYNC-210 (although the earlier ticket was closed as 'Declined')