I have to iterate over 130 Data Transfer Objects, and each one generates a JSON file to be uploaded to AWS S3.
With no improvements, it takes around 90 seconds to complete the whole process. I tried with a lambda and without a lambda, with the same results for both:
for (AbstractDTO dto : dtos) {
    try {
        processDTO(dealerCode, yearPeriod, monthPeriod, dto);
    } catch (FileAlreadyExistsInS3Exception e) {
        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
    }
}
dtos.stream().forEach(dto -> {
    try {
        processDTO(dealerCode, yearPeriod, monthPeriod, dto);
    } catch (FileAlreadyExistsInS3Exception e) {
        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
    }
});
After some investigation, I concluded that the method processDTO takes around 0.65 seconds (650 ms) per item to run, which accounts for the total: 130 × ~0.65 s ≈ 85 s.
My first attempt was to use parallel streams, and the results were pretty good, taking around 15 seconds to complete the whole process:
dtos.parallelStream().forEach(dto -> {
    try {
        processDTO(dealerCode, yearPeriod, monthPeriod, dto);
    } catch (FileAlreadyExistsInS3Exception e) {
        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
    }
});
But I still need to decrease that time. I researched how to improve parallel streams and discovered the custom ForkJoinPool trick:
ForkJoinPool forkJoinPool = new ForkJoinPool(PARALLELISM_NUMBER);
forkJoinPool.submit(() ->
    dtos.parallelStream().forEach(dto -> {
        try {
            processDTO(dealerCode, yearPeriod, monthPeriod, dto);
        } catch (FileAlreadyExistsInS3Exception e) {
            failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
        }
    })).get();
forkJoinPool.shutdown();
Unfortunately, the results were a bit confusing for me.
All tests were done using Postman requests, calling the controller method that ends up iterating over the 130 items.
I'm satisfied with 5 seconds, using 32 as PARALLELISM_NUMBER, but I'm worried about the consequences.
I'm running on a Mac with a 2.2 GHz i7:
sysctl hw.physicalcpu hw.logicalcpu
hw.physicalcpu: 4
hw.logicalcpu: 8
Here's what processDTO does:
private void processDTO(int dealerCode, int yearPeriod, int monthPeriod, AbstractDTO dto) throws FileAlreadyExistsInS3Exception {
    String flatJson = JsonFlattener.flatten(new JSONObject(dto).toString());
    String jsonFileName = dto.fileName() + JSON_TYPE;
    String jsonFilePath = buildFilePathNew(dto.endpoint(), dealerCode, yearPeriod, monthPeriod, AWS_S3_JSON_ROOT_FOLDER);
    uploadFileToS3(jsonFilePath + jsonFileName, flatJson);
}
public void uploadFileToS3(String fileName, String fileContent) throws FileAlreadyExistsInS3Exception {
    if (s3client.doesObjectExist(bucketName, fileName)) {
        throw new FileAlreadyExistsInS3Exception(ErrorMessages.FILE_ALREADY_EXISTS_IN_S3.getMessage());
    }
    s3client.putObject(bucketName, fileName, fileContent);
}
The parallelism parameter decides how many threads will be used by the ForkJoinPool. That is why, by default, the parallelism value is the available CPU core count:
Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors())
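If it helps to see the defaults on your machine, here is a minimal sketch (standard JDK APIs only) that prints the number of processors the JVM sees and the parallelism of the common pool that parallelStream() uses by default; the common pool can also be resized globally with the -Djava.util.concurrent.ForkJoinPool.common.parallelism system property.
import java.util.concurrent.ForkJoinPool;

public class ParallelismInfo {
    public static void main(String[] args) {
        // Logical cores visible to the JVM (8 on your machine).
        System.out.println("Available processors: " + Runtime.getRuntime().availableProcessors());
        // Target parallelism of the common pool used by parallelStream()
        // (by default one less than the processor count).
        System.out.println("Common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
    }
}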
In your case the bottleneck should be checking that a file exists and uploading it to S3. The time here will depend on at least a few factors: CPU, network card and driver, operating system, and others. It seems that the S3 network operation time is not CPU-bound in your case, since you observe an improvement by creating more simultaneous worker threads; perhaps the network requests are queued by the operating system.
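One rough way to confirm this, assuming the same s3client and bucketName fields used in your uploadFileToS3 (AWS SDK for Java v1), is to time the two S3 round trips each DTO pays for; the method name here is only for illustration.
private void uploadFileToS3Timed(String fileName, String fileContent) {
    long t0 = System.nanoTime();
    boolean exists = s3client.doesObjectExist(bucketName, fileName); // first network round trip
    long t1 = System.nanoTime();
    if (!exists) {
        s3client.putObject(bucketName, fileName, fileContent);       // second network round trip
    }
    long t2 = System.nanoTime();
    System.out.printf("doesObjectExist: %d ms, putObject: %d ms%n",
            (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000);
}
If most of the ~650 ms per item shows up in these two calls, the work is network latency rather than CPU, which is consistent with the speed-up you see from adding threads.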
The right value for parallelism varies from one workload type to another. A CPU-bound workload is better off with the default parallelism equal to the CPU core count, due to the negative impact of context switching. A non-CPU-bound workload like yours can be sped up with more worker threads, assuming the workload won't block the CPU, e.g. by busy waiting.
There is no single ideal value for parallelism in ForkJoinPool.
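As an alternative sketch (one option among several, not the only way): for an I/O-bound job like this you can skip the parallelStream-inside-ForkJoinPool trick and submit the uploads to a dedicated fixed-size executor, which keeps the common pool free for other work. PARALLELISM_NUMBER, AbstractDTO, processDTO and FileAlreadyExistsInS3Exception are the names from your code; failedToUploadDTOs is assumed here to be a thread-safe collection (e.g. ConcurrentLinkedQueue), since several threads add to it.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

private void processAllDTOs(List<AbstractDTO> dtos, int dealerCode, int yearPeriod, int monthPeriod)
        throws InterruptedException, ExecutionException {
    ExecutorService executor = Executors.newFixedThreadPool(PARALLELISM_NUMBER);
    try {
        List<Future<?>> futures = new ArrayList<>();
        for (AbstractDTO dto : dtos) {
            futures.add(executor.submit(() -> {
                try {
                    processDTO(dealerCode, yearPeriod, monthPeriod, dto);
                } catch (FileAlreadyExistsInS3Exception e) {
                    failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
                }
            }));
        }
        for (Future<?> f : futures) {
            f.get(); // wait for every upload to finish and surface unexpected errors
        }
    } finally {
        executor.shutdown();
    }
}
Because the threads spend most of their time waiting on the network, a pool larger than the core count (such as your 32) is a reasonable choice here, and the sizing decision stays local to this one pool.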