I have a stream of inputs and I want to make 2 HTTPS network requests for each before passing the result on to another part of the program. The typical throughput is 50 per second.
for each input:
    HTTP request A
    HTTP request B
    pass event on with (A.body and B.body)
I am using the http-kit client, which is asynchronous by default. It returns a promise, and can also take a callback. http-kit uses Java NIO.
The speed of requests coming in, combined with the time to make a request, is high enough that this needs to be done asynchronously.
I've tried 3 approaches:
1. go routines pulling off the channel. Each makes requests that 'block' the go block by deref'ing promises from HTTP requests. This doesn't work because I don't think that the promise plays well with threads.
2. A future, which 'blocks' on the async promises. This results in very high CPU usage, plus starvation of network resources somehow.
3. An http-kit request made immediately for request A, passing in a callback which makes request B, passing a callback that passes the event on. This led to an out of memory error after a few hours.
These all work and handle the capacity for a while. They all crash eventually. The most recent crash, after about 12 hours:
Mar 10, 2016 2:05:59 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run
WARNING: com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@1bc8a7f5 -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending tasks!
Mar 10, 2016 3:38:38 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run
WARNING: com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@1bc8a7f5 -- APPARENT DEADLOCK!!! Complete Status:
Managed Threads: 3
Active Threads: 1
Active Tasks:
com.mchange.v2.resourcepool.BasicResourcePool$1DestroyResourceTask@65d8b232 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0)
Pending Tasks:
com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@359acb0d
Pool thread stack traces:
Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0,5,main]
com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:560)
Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#1,5,main]
java.lang.Object.wait(Native Method)
com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534)
Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#2,5,main]
java.lang.Object.wait(Native Method)
com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534)
Thu Mar 10 04:38:34 UTC 2016 [client-loop] ERROR - select exception, should not happen
java.lang.OutOfMemoryError: Java heap space
at java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:77)
at sun.security.ssl.OutputRecord.<init>(OutputRecord.java:76)
at sun.security.ssl.EngineOutputRecord.<init>(EngineOutputRecord.java:65)
at sun.security.ssl.HandshakeOutStream.<init>(HandshakeOutStream.java:63)
at sun.security.ssl.Handshaker.activate(Handshaker.java:514)
at sun.security.ssl.SSLEngineImpl.kickstartHandshake(SSLEngineImpl.java:717)
at sun.security.ssl.SSLEngineImpl.beginHandshake(SSLEngineImpl.java:743)
at org.httpkit.client.HttpClient.finishConnect(HttpClient.java:310)
at org.httpkit.client.HttpClient.run(HttpClient.java:375)
at java.lang.Thread.run(Thread.java:745)
Mar 10, 2016 4:56:34 AM baleen.events invoke
SEVERE: Thread error: Java heap space
java.lang.OutOfMemoryError: Java heap space
Mar 10, 2016 5:00:43 AM baleen.events invoke
SEVERE: Thread error: Java heap space
java.lang.OutOfMemoryError: Java heap space
Mar 10, 2016 4:58:25 AM baleen.events invoke
SEVERE: Thread error: Java heap space
java.lang.OutOfMemoryError: Java heap space
I don't know what the cause of failure is. It might be that there are too many closures being held on to, or gradual resource leakage, or thread starvation.
Questions
Does making 50 HTTP requests per second, each of which might take 200ms, meaning that there might be 100 requests in flight at any given time, sound like an excessive burden?
How do I do this in a way that handles the throughput and is robust?
EDIT
YourKit profiler tells me that I have about 2GB of char[]s via org.httpkit.client.Handlers via java.util.concurrent.FutureTasks, which suggests that references to old Handlers (i.e. requests) are being retained somehow. The whole reason for trying to use callbacks was to avoid this (although they might somehow be caught up in closures).
- Does making 50 HTTP requests per second, each of which might take 200ms, meaning that there might be 100 requests in flight at any given time, sound like an excessive burden?
This is definitely not excessive on modern hardware.
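As a rough check, Little's law estimates the average number of requests in flight as the request rate multiplied by the time each request takes. The sketch below plugs in the figures from the question (50 inputs per second, two requests per input, about 200 ms per request); the function name `in-flight` is made up for illustration.

```clojure
(defn in-flight
  "Little's law: average requests in flight = requests/sec * latency.
  Latency is taken in milliseconds so the integer arithmetic stays exact."
  [requests-per-sec latency-ms]
  (/ (* requests-per-sec latency-ms) 1000))

;; 50 inputs/sec, two requests each, ~200 ms per request
(in-flight (* 50 2) 200)
;; => 20
```

That is around 20 concurrent requests under these assumptions, comfortably below the 100 the question allows for.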
- How do I do this in a way that handles the throughput and is robust?
You can combine core.async pipelines and http-kit's callbacks to achieve this. You don't really need to create a go routine for each request (although that shouldn't hurt), because you can use the asynchronous put! from the http-kit callback.
Use bounded buffers for each step of the pipeline to limit the number of active connections, which will (at least) be constrained by the number of ephemeral TCP ports available on your system.
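To see how a bounded buffer caps the amount of queued work, here is a minimal standalone sketch using `offer!`, which never blocks and reports whether the value was accepted. The buffer size of 2 and the var names are arbitrary choices for illustration.

```clojure
(require '[clojure.core.async :as async])

;; A channel with a fixed buffer of 2 and no consumer attached.
(def bounded (async/chan 2))

;; offer! puts only if there is room right now; it returns true on
;; success and nil when the buffer is full.
(def results (mapv #(async/offer! bounded %) [:a :b :c]))
;; results is [true true nil]: the third value was refused.

;; Taking one value frees a slot, so the next offer succeeds.
(def taken (async/<!! bounded))              ; :a
(def retried (async/offer! bounded :c))      ; true
```

Inside a go block, `>!` parks on a full buffer instead of refusing the value, and that parking is what holds a fast producer back to the pace of the slower steps downstream.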
Here's an example of a small program that does something similar to what you described. It reads "events" from a channel (in this case, each event is the ID "1") and looks up those IDs on an HTTP service. It takes the response from that first call, looks up the JSON key "next", and enqueues that as the URL for step 2. Finally, when this lookup is complete, it adds an event to the out channel, which a go routine monitors to report statistics.
(ns concur-req.core
  (:require [clojure.core.async :as async]
            [cheshire.core :refer [decode]]
            [org.httpkit.client :as http]))
(defn url-of
  [id]
  ;; this service responds within 100-200ms
  (str "http://localhost:28080/" id ".json"))
(defn retrieve-json-async
  [url c]
  (http/get url nil
            (fn [{body :body status :status :as resp}]
              (if (= 200 status)
                (async/put! c (decode body true))
                (println "ERROR:" resp))
              (async/close! c))))
(defn run [parallelism stop-chan]
  (let [;; allocate half of the parallelism to each step
        step1-n (int (max (/ parallelism 2) 1))
        step2-n step1-n
        ;; buffer to take ids, transform them into urls
        step1-chan (async/chan step1-n (map url-of))
        ;; buffer for result of pulling urls from step1, xform by extracting :next url
        step2-chan (async/chan step2-n (map :next))
        ;; buffer to count completed results
        out-chan (async/chan 1 (map (constantly 1)))
        ;; for delivering the final result
        final-chan (async/chan)
        start-time (System/currentTimeMillis)]
    ;; process URLs from step1 and put the result in step2
    (async/pipeline-async step1-n step2-chan retrieve-json-async step1-chan)
    ;; process URLs from step2 and put the result in out
    (async/pipeline-async step2-n out-chan retrieve-json-async step2-chan)
    ;; keep the input channel full until stop-chan is closed.
    (async/go-loop []
      (let [[v c] (async/alts! [stop-chan [step1-chan "1"]])]
        (if (= c stop-chan)
          (async/close! step1-chan)
          (recur))))
    ;; count messages on out-chan until the pipeline is closed, printing
    ;; status message every second
    (async/go-loop [status-timer (async/timeout 1000) subt 0 accu 0]
      (let [[v c] (async/alts! [status-timer out-chan])]
        (cond (= c status-timer)
              (do (println subt "records...")
                  (recur (async/timeout 1000) 0 (+ subt accu)))
              (nil? v)
              (async/>! final-chan (+ subt accu))
              :else
              (recur status-timer (+ v subt) accu))))
    ;; block until done, then emit final report.
    (let [final-total (async/<!! final-chan)
          elapsed-ms (- (System/currentTimeMillis) start-time)
          elapsed-s (/ elapsed-ms 1000.0)]
      (print (format "Processed %d records with parallelism %d in %.3f seconds (%d/sec)\n"
                     final-total parallelism elapsed-s
                     (int (/ final-total elapsed-s)))))))
(defn run-for
  [seconds parallelism]
  (let [stop-chan (async/chan)]
    (future
      (Thread/sleep (* seconds 1000))
      (async/close! stop-chan))
    (run parallelism stop-chan)))
(do
  ;; Warm up the connection pool, avoid somaxconn problems...
  (doseq [p (map #(* 20 (inc %)) (range 25))]
    (run-for 1 p))
  (run-for (* 60 60 6) 500))
To test this, I set up an HTTP service that responds only after sleeping for a random time between 100 and 200ms. Then I ran this program for 6 hours on my MacBook Pro.
With parallelism set to 500, I got 1155 completed transactions per second on average (2310 completed HTTP requests per second). I'm sure this could be much higher with some tuning (and especially by moving the HTTP service onto a different machine). The JVM memory crept up to 1.5 GB within the first 30 minutes and then stayed at that size. I'm using Oracle's 64-bit 1.8 JVM.
An alternative is a variation on your method A (deref'ing the futures that http-kit returns inside a go block), done in a way that doesn't block the core.async handler threads on the future. You can do that by combining http-kit's callbacks with core.async:
(defn handle-event
  "Return a core.async channel that will contain the result of making both HTTP call A and B."
  [event-data]
  (let [event-a-chan (clojure.core.async/chan)
        event-b-chan (clojure.core.async/chan)
        return-chan (clojure.core.async/chan)]
    ;; http-kit reads query string parameters from :query-params
    (org.httpkit.client/request {:url "https://event-a-call"
                                 :method :get
                                 :query-params {"param1-k" "param1-v"}}
                                (fn [resp]
                                  (clojure.core.async/put! event-a-chan resp)))
    (org.httpkit.client/request {:url "https://event-b-call"
                                 :method :get
                                 :query-params {"param1-k" "param1-v"}}
                                (fn [resp]
                                  (clojure.core.async/put! event-b-chan resp)))
    ;; park (not block) until both callbacks have delivered a response
    (clojure.core.async/go
      (clojure.core.async/>! return-chan {:event-a-response (clojure.core.async/<! event-a-chan)
                                          :event-b-response (clojure.core.async/<! event-b-chan)}))
    return-chan))
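The same callback-to-channel wiring can be tried without a network. In this sketch `fake-request` is a hypothetical stand-in for `org.httpkit.client/request` (it simply invokes its callback after a short delay), and `fetch-both` and the delays are likewise made up for illustration.

```clojure
(require '[clojure.core.async :as async])

(defn fake-request
  "Hypothetical stand-in for an async HTTP client: invokes `callback`
  with a response map after `delay-ms`, without blocking the caller."
  [url delay-ms callback]
  (async/go
    (async/<! (async/timeout delay-ms))
    (callback {:status 200 :body (str "body of " url)})))

(defn fetch-both
  "Returns a channel that yields one map holding both response bodies."
  [url-a url-b]
  (let [a-chan (async/chan 1)
        b-chan (async/chan 1)]
    ;; Both requests start immediately; put! never blocks the callback.
    (fake-request url-a 50 #(async/put! a-chan %))
    (fake-request url-b 20 #(async/put! b-chan %))
    ;; A go block returns a channel carrying its result, so no separate
    ;; return channel is needed. <! parks instead of blocking a thread.
    (async/go
      (let [a (async/<! a-chan)
            b (async/<! b-chan)]
        {:a (:body a) :b (:body b)}))))

;; Blocking take: fine at the REPL or in a test, not inside a go block.
(def result (async/<!! (fetch-both "A" "B")))
;; result is {:a "body of A", :b "body of B"}
```

Because both requests are started before either result is awaited, the total wait is roughly the slower of the two rather than their sum.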