Upload ZipOutputStream to S3 without saving zip file (large) temporary to disk using AWS S3 Java

I have a requirement to download photos (not all in the same directory) from S3, ZIP them, and upload the ZIP back to S3 using the AWS S3 Java SDK. The ZIP file can be several GB in size. I am currently using AWS Lambda, which limits temporary storage to 500 MB, so I don't want to save the ZIP file to disk. Instead, I want to stream the ZIP (which is created dynamically from the photos downloaded from S3) directly to S3. I need to do this with the AWS S3 Java SDK.

asked Mar 17 '19 by pankaj

2 Answers

The problem is that the AWS Java SDK for S3 does not offer a way to stream writes through an OutputStream. The following snippet implements an S3OutputStream, which extends OutputStream and automatically performs either a single 'putObject' or a multipart upload ('initiateMultipartUpload' plus 'uploadPart'), depending on how much data is written. This lets you pass the S3OutputStream to the constructor of ZipOutputStream, e.g. new ZipOutputStream(new S3OutputStream(s3Client, "my_bucket", "path")).

import java.io.ByteArrayInputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3OutputStream extends OutputStream {

    private static final Logger LOG = LoggerFactory.getLogger(S3OutputStream.class);

    /** Default chunk size is 10MB */
    protected static final int BUFFER_SIZE = 10000000;

    /** The bucket-name on Amazon S3 */
    private final String bucket;

    /** The path (key) name within the bucket */
    private final String path;

    /** The temporary buffer used for storing the chunks */
    private final byte[] buf;

    /** The position in the buffer */
    private int position;

    /** Amazon S3 client. TODO: support KMS */
    private final AmazonS3 s3Client;

    /** The unique id for this upload */
    private String uploadId;

    /** Collection of the etags for the parts that have been uploaded */
    private final List<PartETag> etags;

    /** indicates whether the stream is still open / valid */
    private boolean open;

    /**
     * Creates a new S3 OutputStream
     * @param s3Client the AmazonS3 client
     * @param bucket name of the bucket
     * @param path path within the bucket
     */
    public S3OutputStream(AmazonS3 s3Client, String bucket, String path) {
        this.s3Client = s3Client;
        this.bucket = bucket;
        this.path = path;
        this.buf = new byte[BUFFER_SIZE];
        this.position = 0;
        this.etags = new ArrayList<>();
        this.open = true;
    }

    /**
     * Write an array to the S3 output stream.
     *
     * @param b the byte-array to append
     */
    @Override
    public void write(byte[] b) {
        write(b,0,b.length);
    }

    /**
     * Writes an array to the S3 Output Stream
     *
     * @param byteArray the array to write
     * @param o the offset into the array
     * @param l the number of bytes to write
     */
    @Override
    public void write(final byte[] byteArray, final int o, final int l) {
        this.assertOpen();
        int ofs = o, len = l;
        int size;
        while (len > (size = this.buf.length - position)) {
            System.arraycopy(byteArray, ofs, this.buf, this.position, size);
            this.position += size;
            flushBufferAndRewind();
            ofs += size;
            len -= size;
        }
        System.arraycopy(byteArray, ofs, this.buf, this.position, len);
        this.position += len;
    }

    /**
     * Flushing is not required here: parts are uploaded automatically
     * when the internal buffer fills up or when the stream is closed.
     */
    @Override
    public synchronized void flush() {
        this.assertOpen();
        LOG.debug("Flush was called");
    }

    protected void flushBufferAndRewind() {
        if (uploadId == null) {
            LOG.debug("Starting a multipart upload for {}/{}",this.bucket,this.path);
            final InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(this.bucket, this.path)
                    .withCannedACL(CannedAccessControlList.BucketOwnerFullControl);
            InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(request);
            this.uploadId = initResponse.getUploadId();
        }
        uploadPart();
        this.position = 0;
    }

    protected void uploadPart() {
        LOG.debug("Uploading part {}",this.etags.size());
        UploadPartResult uploadResult = this.s3Client.uploadPart(new UploadPartRequest()
                .withBucketName(this.bucket)
                .withKey(this.path)
                .withUploadId(this.uploadId)
                .withInputStream(new ByteArrayInputStream(buf,0,this.position))
                .withPartNumber(this.etags.size() + 1)
                .withPartSize(this.position));
        this.etags.add(uploadResult.getPartETag());
    }

    @Override
    public void close() {
        if (this.open) {
            this.open = false;
            if (this.uploadId != null) {
                if (this.position > 0) {
                    uploadPart();
                }
                LOG.debug("Completing multipart");
                this.s3Client.completeMultipartUpload(new CompleteMultipartUploadRequest(bucket, path, uploadId, etags));
            }
            else {
                LOG.debug("Uploading object at once to {}/{}",this.bucket,this.path);
                final ObjectMetadata metadata = new ObjectMetadata();
                metadata.setContentLength(this.position);
                final PutObjectRequest request = new PutObjectRequest(this.bucket, this.path, new ByteArrayInputStream(this.buf, 0, this.position), metadata)
                        .withCannedAcl(CannedAccessControlList.BucketOwnerFullControl);
                this.s3Client.putObject(request);
            }
        }
    }

    public void cancel() {
        this.open = false;
        if (this.uploadId != null) {
            LOG.debug("Aborting multipart upload");
            this.s3Client.abortMultipartUpload(new AbortMultipartUploadRequest(this.bucket, this.path, this.uploadId));
        }
    }

    @Override
    public void write(int b) {
        this.assertOpen();
        if (position >= this.buf.length) {
            flushBufferAndRewind();
        }
        this.buf[position++] = (byte)b;
    }

    private void assertOpen() {
        if (!this.open) {
            throw new IllegalStateException("Closed");
        }
    }
}
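For the original use case (zipping photos that already live in S3 and streaming the archive back to S3), a minimal usage sketch could look like the following. The bucket names, keys and the photoKeys list are placeholders and imports are omitted; the important point is that closing the ZipOutputStream closes the S3OutputStream, which completes the upload.

// Hypothetical usage sketch: stream a ZIP of existing S3 objects back to S3 without touching disk.
List<String> photoKeys = Arrays.asList("photos/a.png", "photos/b.png"); // placeholder keys

try (ZipOutputStream zos = new ZipOutputStream(new S3OutputStream(s3Client, "my_bucket", "archives/photos.zip"))) {
    for (String key : photoKeys) {
        zos.putNextEntry(new ZipEntry(key));
        try (InputStream in = s3Client.getObject("my_bucket", key).getObjectContent()) {
            byte[] chunk = new byte[8192];
            int n;
            while ((n = in.read(chunk)) != -1) {
                zos.write(chunk, 0, n);
            }
        }
        zos.closeEntry();
    }
} // closing the ZipOutputStream flushes and closes the S3OutputStream, finishing the upload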

answered Nov 11 '22 by blagerweij


The basic idea is to use streaming operations. This way you don't wait for the ZIP to be generated on a filesystem, but start uploading as soon as the ZIP algorithm produces any data. Obviously, some data will be buffered in memory, but there is still no need to wait for the whole ZIP to be written to disk. We'll also use stream composition and a PipedInputStream / PipedOutputStream pair across two threads: one reads the objects from S3 and zips them into the pipe, the other reads from the pipe and uploads to S3.

Here is a version for aws-java-sdk (v1):

import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.UUID;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.iterable.S3Objects;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.util.IOUtils; // or org.apache.commons.io.IOUtils

final AmazonS3 client = AmazonS3ClientBuilder.defaultClient();

final PipedOutputStream pipedOutputStream = new PipedOutputStream();
final PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);

final Thread s3In = new Thread(() -> {
    try (final ZipOutputStream zipOutputStream = new ZipOutputStream(pipedOutputStream)) {
        S3Objects
                // Just a convenient way to list all the objects. Replace with your own logic.
                .inBucket(client, "bucket")
                .forEach((S3ObjectSummary objectSummary) -> {
                    try {
                        if (objectSummary.getKey().endsWith(".png")) {
                            System.out.println("Processing " + objectSummary.getKey());

                            final ZipEntry entry = new ZipEntry(
                                    UUID.randomUUID().toString() + ".png" // I'm too lazy to extract file name from the
                                    // objectSummary
                            );

                            zipOutputStream.putNextEntry(entry);

                            IOUtils.copy(
                                    client.getObject(
                                            objectSummary.getBucketName(),
                                            objectSummary.getKey()
                                    ).getObjectContent(),
                                    zipOutputStream
                            );

                            zipOutputStream.closeEntry();
                        }
                    } catch (final Exception all) {
                        all.printStackTrace();
                    }
                });
    } catch (final Exception all) {
        all.printStackTrace();
    }
});
final Thread s3Out = new Thread(() -> {
    try {
        client.putObject(
                "another-bucket",
                "previews.zip",
                pipedInputStream,
                new ObjectMetadata()
        );

        pipedInputStream.close();
    } catch (final Exception all) {
        all.printStackTrace();
    }
});

s3In.start();
s3Out.start();

s3In.join();
s3Out.join();

However, note that it will print a warning:

WARNING: No content length specified for stream data.  Stream contents will be buffered in memory and could result in out of memory errors.

That's because S3 needs to know the size of the data in advance, before the upload, and it's impossible to know the size of the resulting ZIP in advance. You can probably try your luck with multipart uploads, but the code will be trickier. The idea is similar, though: one thread reads the objects and writes them into the ZIP stream, while the other thread reads the zipped bytes from the pipe and uploads them as parts. Once all parts are uploaded, the multipart upload is completed. A rough sketch of that approach follows.
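For reference, here is a rough, untested sketch of that multipart variant for aws-java-sdk (v1). It would replace the putObject call inside the s3Out thread above (so the surrounding try/catch still handles the IOExceptions): it fills parts of at least 5 MB (the S3 minimum part size, except for the last part) from pipedInputStream and uploads them one by one. Bucket and key names are placeholders, and imports and error handling (abortMultipartUpload on failure) are omitted.

// Hypothetical sketch (v1 SDK): read zipped bytes from pipedInputStream and upload them as multipart parts.
final InitiateMultipartUploadResult init = client.initiateMultipartUpload(
        new InitiateMultipartUploadRequest("another-bucket", "previews.zip"));
final String uploadId = init.getUploadId();
final List<PartETag> etags = new ArrayList<>();

final byte[] partBuffer = new byte[5 * 1024 * 1024]; // S3 minimum part size (the last part may be smaller)
int partNumber = 1;
int filled = 0;
int read = 0;
while (read != -1) {
    // Fill the buffer as far as possible before uploading it as one part.
    while (filled < partBuffer.length
            && (read = pipedInputStream.read(partBuffer, filled, partBuffer.length - filled)) != -1) {
        filled += read;
    }
    if (filled > 0) {
        final UploadPartResult result = client.uploadPart(new UploadPartRequest()
                .withBucketName("another-bucket")
                .withKey("previews.zip")
                .withUploadId(uploadId)
                .withPartNumber(partNumber++)
                .withInputStream(new ByteArrayInputStream(partBuffer, 0, filled))
                .withPartSize(filled));
        etags.add(result.getPartETag());
        filled = 0;
    }
}
client.completeMultipartUpload(
        new CompleteMultipartUploadRequest("another-bucket", "previews.zip", uploadId, etags));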

Here is an example for aws-java-sdk-2.x:

import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.UUID;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import org.apache.commons.io.IOUtils; // assuming Apache commons-io for toByteArray

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Object;

final S3Client client = S3Client.create();

final PipedOutputStream pipedOutputStream = new PipedOutputStream();
final PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);

final Thread s3In = new Thread(() -> {
    try (final ZipOutputStream zipOutputStream = new ZipOutputStream(pipedOutputStream)) {
        client.listObjectsV2Paginator(
                ListObjectsV2Request
                        .builder()
                        .bucket("bucket")
                        .build()
        )
                .contents()
                .forEach((S3Object object) -> {
                    try {
                        if (object.key().endsWith(".png")) {
                            System.out.println("Processing " + object.key());

                            final ZipEntry entry = new ZipEntry(
                                    UUID.randomUUID().toString() + ".png" // I'm too lazy to extract file name from the object
                            );

                            zipOutputStream.putNextEntry(entry);

                            client.getObject(
                                    GetObjectRequest
                                            .builder()
                                            .bucket("bucket")
                                            .key(object.key())
                                            .build(),
                                    ResponseTransformer.toOutputStream(zipOutputStream)
                            );

                            zipOutputStream.closeEntry();
                        }
                    } catch (final Exception all) {
                        all.printStackTrace();
                    }
                });
    } catch (final Exception all) {
        all.printStackTrace();
    }
});
final Thread s3Out = new Thread(() -> {
    try {
        client.putObject(
                PutObjectRequest
                        .builder()
                        .bucket("another-bucket")
                        .key("previews.zip")
                        .build(),
                RequestBody.fromBytes(
                        IOUtils.toByteArray(pipedInputStream)
                )
        );
    } catch (final Exception all) {
        all.printStackTrace();
    }
});

s3In.start();
s3Out.start();

s3In.join();
s3Out.join();

It suffers from the same plague: the whole ZIP is buffered in memory (via IOUtils.toByteArray) before the upload. A multipart sketch for the 2.x SDK is below.
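And here is the analogous rough, untested sketch for aws-java-sdk-2.x, again replacing the putObject call inside the s3Out thread: it reads the zipped bytes from pipedInputStream in 5 MB parts and uploads them through the low-level multipart API. Bucket and key are placeholders, and imports (the multipart model classes from software.amazon.awssdk.services.s3.model, plus java.util.Arrays) and error handling are omitted.

// Hypothetical sketch (v2 SDK): multipart upload of the piped ZIP stream.
final CreateMultipartUploadResponse created = client.createMultipartUpload(
        CreateMultipartUploadRequest.builder().bucket("another-bucket").key("previews.zip").build());
final String uploadId = created.uploadId();
final List<CompletedPart> parts = new ArrayList<>();

final byte[] partBuffer = new byte[5 * 1024 * 1024]; // S3 minimum part size (the last part may be smaller)
int partNumber = 1;
int filled = 0;
int read = 0;
while (read != -1) {
    // Fill the buffer as far as possible before uploading it as one part.
    while (filled < partBuffer.length
            && (read = pipedInputStream.read(partBuffer, filled, partBuffer.length - filled)) != -1) {
        filled += read;
    }
    if (filled > 0) {
        final UploadPartResponse response = client.uploadPart(
                UploadPartRequest.builder()
                        .bucket("another-bucket")
                        .key("previews.zip")
                        .uploadId(uploadId)
                        .partNumber(partNumber)
                        .build(),
                RequestBody.fromBytes(Arrays.copyOf(partBuffer, filled)));
        parts.add(CompletedPart.builder().partNumber(partNumber).eTag(response.eTag()).build());
        partNumber++;
        filled = 0;
    }
}
client.completeMultipartUpload(
        CompleteMultipartUploadRequest.builder()
                .bucket("another-bucket")
                .key("previews.zip")
                .uploadId(uploadId)
                .multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
                .build());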

If you're interested, I've prepared a demo project, so you can play with the code.

answered Nov 11 '22 by madhead - StandWithUkraine