Here is the code:
(ns typedclj.async
(:require [clojure.core.async
:as a
:refer [>! <! >!! <!!
go chan buffer
close! thread
alts! alts!! timeout]]
[clj-http.client :as -cc]))
(time (dorun
(let [c (chan)]
(doseq [i (range 10 1e4)]
(go (>! c i))))))
And I got an error:
Exception in thread "async-dispatch-12" java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)
at clojure.core.async.impl.channels.ManyToManyChannel.put_BANG_(channels.clj:150)
at clojure.core.async.impl.ioc_macros$put_BANG_.invoke(ioc_macros.clj:959)
at typedclj.async$eval11807$fn__11816$state_machine__6185__auto____11817$fn__11819.invoke(async.clj:19)
at typedclj.async$eval11807$fn__11816$state_machine__6185__auto____11817.invoke(async.clj:19)
at clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:940)
at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:944)
at typedclj.async$eval11807$fn__11816.invoke(async.clj:19)
at clojure.lang.AFn.run(AFn.java:22)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)...
According to http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io/
... This will break the 1 job = 1 thread knot, thus this thread parking will allow us to scale the number of jobs way beyond any thread limit on the platform (usually around 1000 on the JVM).
core.async gives (blocking) channels and a new (unbounded) thread pool when using 'thread'. This (in effect) is just some sugar over using java threads (or clojure futures) and BlockingQueues from java.util.concurrent. The main feature is go blocks in which threads can be parked and resumed on the (potentially) blocking calls dealing with core.async's channels...
Is 1e4 jobs already too many? What is the upper limit then?
I don't usually rant like this so I hope you will forgive me this one transgression:
In a more prefect world every programmer would repeat to themselves "there is no such thing as an unbounded queue" five times before sleeping and first thing upon waking. This mode of thinking requires firguring out how backpressure will be handled in your system so when there is a slowdown somewhere in the process the parts before that have a way to find out about it and slow themselves down in response. In core.async the default back pressure is immediate because the default buffer size is zero. No go block succeeds in putting something into a chan until someone is ready to consume it.
chans look basically like this:
"queue of pending puts" --> buffer --> "queue of pending takes"
The putter and taker queues are intended to allow time for the two processes that are communicating via this pipe to schedule themselves so progress can be made. Without these there would be no room for threads to schedule and deadlocks would happen. They are NOT intended to be used as the buffer. thats what the buffer in the middle is for, and this was the design behind making that the only one that has a explicit size. explicitly set the buffer size for your system by setting the size of the buffer in the chan:
user> (time (dorun
(let [c (chan 1e6)]
(doseq [i (range 10 1e4)]
(go (>! c i))))))
"Elapsed time: 83.526679 msecs"
nil
In this case I have "calculated" that my system as a whole will be in a good state if there are up to a million waiting jobs. Of course your real world expierences will be different, and very much unique to your situation.
Thanks for your patience,
The limit of unconsumed puts is the size of the channels buffer plus the size of the queue.
The queue size in core.async is limited to 1024 but one should not rely on that.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With