I am trying to read a large file from S3 in chunks, without cutting any line, for parallel processing.
Let me explain by example: there is a 1 GB file on S3. I want to divide this file into chunks of 64 MB. That part is easy; I can do it like this:
S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));
InputStream stream = s3object.getObjectContent();
byte[] content = new byte[64*1024*1024];
while (stream.read(content) != -1) {
    // process content here
}
The problem is that a chunk may contain, say, 100 complete lines and one incomplete line. I can't process an incomplete line, but I don't want to discard it either.
Is there any way to handle this situation, so that every chunk contains only whole lines?
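Roughly, what I have in mind is something like the sketch below: read a fixed-size block, then keep reading single bytes until the next newline so the chunk ends on a line boundary (processChunk is just a placeholder for the parallel-processing step). I don't know if this is the right approach or if there is something better:
// Rough idea: extend each ~64 MB block to the next '\n' so no line is split.
InputStream stream = s3.getObject(new GetObjectRequest(bucketName, key)).getObjectContent();
ByteArrayOutputStream chunk = new ByteArrayOutputStream();
byte[] block = new byte[64 * 1024 * 1024];

int read;
while ((read = stream.read(block)) != -1) {
    chunk.write(block, 0, read);

    // keep reading until the chunk ends on a newline (or the stream ends)
    int b;
    while ((b = stream.read()) != -1) {
        chunk.write(b);
        if (b == '\n') {
            break;
        }
    }

    processChunk(chunk.toByteArray()); // placeholder for handing the chunk to a worker
    chunk.reset();
}

stream.close();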
My usual approach (InputStream -> BufferedReader.lines() -> batches of lines -> CompletableFuture) won't work here because the underlying S3ObjectInputStream eventually times out for huge files.
So I created a new class, S3InputStream, which doesn't care how long it's open for and reads byte blocks on demand using short-lived AWS SDK calls. You provide a byte[] that will be reused; new byte[1 << 24] (16 MB) appears to work well.
package org.harrison;
import java.io.IOException;
import java.io.InputStream;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
/**
* An {@link InputStream} for S3 files that does not care how big the file is.
*
* @author stephen harrison
*/
public class S3InputStream extends InputStream {
    private static class LazyHolder {
        private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
    }

    private final String bucket;
    private final String file;
    private final byte[] buffer;
    private long lastByteOffset;

    private long offset = 0;
    private int next = 0;
    private int length = 0;

    public S3InputStream(final String bucket, final String file, final byte[] buffer) {
        this.bucket = bucket;
        this.file = file;
        this.buffer = buffer;
        this.lastByteOffset = LazyHolder.S3.getObjectMetadata(bucket, file).getContentLength() - 1;
    }

    @Override
    public int read() throws IOException {
        if (next >= length) {
            fill();

            if (length <= 0) {
                return -1;
            }

            next = 0;
        }

        if (next >= length) {
            return -1;
        }

        // Return the byte as an unsigned value (0-255), per the InputStream contract,
        // so bytes >= 0x80 are not mistaken for end-of-stream (-1).
        return buffer[this.next++] & 0xFF;
    }

    public void fill() throws IOException {
        // Use > rather than >= so the byte at lastByteOffset itself is still read.
        if (offset > lastByteOffset) {
            length = -1;
        } else {
            // Short-lived ranged GET: each call fetches at most buffer.length bytes.
            try (final InputStream inputStream = s3Object()) {
                length = 0;
                int b;

                while ((b = inputStream.read()) != -1) {
                    buffer[length++] = (byte) b;
                }

                if (length > 0) {
                    offset += length;
                }
            }
        }
    }

    private InputStream s3Object() {
        final GetObjectRequest request = new GetObjectRequest(bucket, file).withRange(offset,
                offset + buffer.length - 1);

        return LazyHolder.S3.getObject(request).getObjectContent();
    }
}
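A quick usage sketch (the bucket and key below are placeholders): wrap it in a BufferedReader so you always read complete lines, regardless of how the underlying ranged reads fall:
byte[] buffer = new byte[1 << 24]; // 16 MB, reused across the ranged GETs

try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(new S3InputStream("my-bucket", "path/to/big-file", buffer)))) {
    String line;

    while ((line = reader.readLine()) != null) {
        // process the line, or collect lines into batches for parallel processing
    }
}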
The aws-java-sdk already provides streaming functionality for your S3 objects. You have to call "getObject" and the result will be an InputStream.
1) AmazonS3Client.getObject(GetObjectRequest getObjectRequest) -> S3Object
2) S3Object.getObjectContent()
Note: The method is a simple getter and does not actually create a stream. If you retrieve an S3Object, you should close this input stream as soon as possible, because the object contents aren't buffered in memory and stream directly from Amazon S3. Further, failure to close this stream can cause the request pool to become blocked.
(AWS Java SDK docs)
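For example (reusing the s3, bucketName, and key variables from the question), try-with-resources makes sure the stream is closed promptly:
S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));

try (InputStream stream = s3object.getObjectContent()) {
    // read from the stream here; closing it releases the connection back to the pool
}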
"100 complete line and one incomplete"
Do you mean you need to read the stream line by line? If so, instead of using an InputStream directly, try reading the S3 object stream with a BufferedReader so you can read it line by line, though I think this will be a little slower than reading by chunk.
S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));
BufferedReader in = new BufferedReader(new InputStreamReader(s3object.getObjectContent()));

String line;
while ((line = in.readLine()) != null) {
    // process line here
}
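If you still need batches for parallel processing, you could collect lines into groups instead of handling each one inside the loop above, and hand each group to a worker. A rough sketch (the batch size, thread count, and process method are arbitrary placeholders, not part of the SDK):
ExecutorService pool = Executors.newFixedThreadPool(4);
List<String> batch = new ArrayList<>();

String line;
while ((line = in.readLine()) != null) {
    batch.add(line);

    if (batch.size() == 100_000) { // arbitrary batch size
        final List<String> toProcess = batch;
        CompletableFuture.runAsync(() -> process(toProcess), pool);
        batch = new ArrayList<>();
    }
}

if (!batch.isEmpty()) {
    final List<String> toProcess = batch;
    CompletableFuture.runAsync(() -> process(toProcess), pool);
}

pool.shutdown();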