I have an ArrayList which has billions of records, I iterate over each record and post this to a server. The method called as follows in each iteration:
public void sendNotification(String url, String accountId, String accountPwd, String jsonPayLoad,
int maxConnections) {
notificationService = Executors.newFixedThreadPool(maxConnections);
notificationService.submit(new SendPushNotification(url, accountId, accountPwd, jsonPayLoad));
notificationService.shutdown();
}
My SendPushNotification class is as follows:
public class SendPushNotification implements Runnable {
String url;
String accountId;
String accountPwd;
String jsonPayLoad;
public SendPushNotification(String url, String accountId, String accountPwd, String jsonPayLoad) {
this.url = url;
this.accountId = accountId;
this.accountPwd = accountPwd;
this.jsonPayLoad = jsonPayLoad;
}
public void run() {
HttpsURLConnection conn = null;
try {
StringBuffer response;
URL url1 = new URL(url);
conn = (HttpsURLConnection) url1.openConnection();
// conn.setReadTimeout(20000);
// conn.setConnectTimeout(30000);
conn.setRequestProperty("X-Account-Id", accountId);
conn.setRequestProperty("X-Passcode", accountPwd);
conn.setRequestProperty("Content-Type", "application/json");
conn.setDoOutput(true);
conn.setRequestMethod("POST");
OutputStream out = new BufferedOutputStream(conn.getOutputStream());
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, "UTF-8"));
writer.write(jsonPayLoad);
writer.close();
out.close();
int responseCode = conn.getResponseCode();
System.out.println(String.valueOf(responseCode));
switch (responseCode) {
case 200:
BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
String inputLine;
response = new StringBuffer();
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
}
in.close();
System.out.println(response.toString());
}
} catch (IOException ez) {
ez.printStackTrace();
} finally {
if (conn != null) {
try {
conn.disconnect();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
So whats going wrong here, I suspect I have to run this on a decent system configuration. Basically I want to understand if there is anything wrong with my code?
Wrong approach! Looping over:
notificationService = Executors.newFixedThreadPool(maxConnections);
is a bad idea! Why do you intend to create billions of ThreadPools; to submit one task to then shut it down?! That's like buying a new Ferrari every time the ash tray is full ...
Please understand: your simple code creates quite a bunch of objects; and all of them go away after one loop iteration. Meaning: they become eligible for garbage collection. In other words: you are constantly creating garbage on a very high rate. And you are really surprised that doing so pushes you into "out of memory"?
Instead, use one ThreadPool and submit your billions of requests into that!
And beyond that, even that isn't exactly a great idea. Opening one network connection to your server per entry will simply not scale up to billions of entries: a real solution requires you to step back and think up something that works in a "reasonable way" end to end. For example you should consider to create some kind of "bulk" or "streaming" interface on your server. Iterating a billions-entry file on the client, and making billions of connections to the server is sorry, insane!
So instead of doing:
loop:
open connection / push ONE item / close connection
You better go for:
open connection / push all items / close connections
Or beyond that, you might even look into transmitting compressed, binary data. Meaning: compress your file on the client side, send it as blob; and extract / process it on the server side.
A lot of option space here; but be assured: your current "out of memory" exception is just one symptom caused by an "inappropriate" design.
EDIT: given your comment, my (personal) advice:
ExecutorService giving out of memory error
As @GhostCat pointed out you should create one ExecutorService at the top of notification class at the start of your program. As you get new requests in, you submit all of your notification requests to the same ExecutorService which will create threads as necessary. Creating a new pool each time through is a bad pattern.
// at top of class not in the sending loop
private final ExecutorService notificationThreadPool =
Executors.newFixedThreadPool(MAX_CONNECTIONS);
However it's important to realize that you might still run out of memory because the thread-pool uses an unbounded queue. The newFixedThreadPool(...) method under the covers uses new LinkedBlockingQueue<Runnable>() so if your sender is not keeping up with demand you will still run out of memory.
Then if increasing the number of threads is not an option, you are going to have to either slow down the producer somehow or write the requests to disk or other short-term storage.
If you need to use a bounded queue to detect when things are filling up then you should do something like:
private final ExecutorService notificationThreadPool =
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(MAX_QUEUED_REQUESTS);
By default, if all of the threads are busy and the queue is full when you submit a SendPushNotification to the pool it will throw RejectedExecutionException if it is full giving you the opportunity to handle that somehow. You can also use notificationThreadPool.setRejectedExecutionHandler(...) to set a RejectedExecutionHandler that writes the rejected notification into temporary storage until the threads catch up.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With