How to use google-cloud-storage directly in an Apache Beam project

We are working on an Apache Beam project (version 2.4.0) in which we also want to work with a bucket directly through the google-cloud-storage API. However, combining some of the Beam dependencies with cloud storage leads to a hard-to-solve dependency problem.

We saw that Beam 2.4.0 depends on google-cloud-storage 1.22.0, which is why we use that version below. We had the same issues with 1.27.0. The following pom.xml specifies the four Beam dependencies we use in our project, of which the last two lead to issues.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bol</groupId>
    <artifactId>beam-plus-storage</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <beam.version>2.4.0</beam.version>
    </properties>

    <dependencies>
        <!-- These first two dependencies do not clash -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-extensions-join-library</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <!-- This one leads to java.lang.ClassNotFoundException: com.google.api.gax.rpc.HeaderProvider -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <!-- This one leads to java.lang.NoSuchMethodError: com.google.api.services.storage.Storage$Objects$List.setUserProject(...) -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
            <version>${beam.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-storage</artifactId>
            <version>1.22.0</version>
        </dependency>

    </dependencies>
</project>

Below is a minimal usage of the storage API (working or broken, depending on the dependencies above), listing files from a public bucket.

import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CloudStorageReader {

    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        Page<Blob> list = storage.list(
                "gcp-public-data-landsat",
                Storage.BlobListOption.currentDirectory(),
                Storage.BlobListOption.prefix("LC08/PRE/044/034/LC80440342016259LGN00/"));
        for (Blob blob : list.getValues()) {
            System.out.println(blob);
        }
    }
}

When removing the last two dependencies, listing the bucket's content works fine. With the java-io Beam dependency, the HeaderProvider class is not found. With the Dataflow dependency, the setUserProject method is not found. See the comments in the pom for the full error messages.

We spent quite some time trying to fix the HeaderProvider error, which is the one that appears when all four Beam dependencies are imported. We added explicit dependencies for the clashing transitive artifacts, and added exclusions on the Beam dependencies as well (see the sketch below). Every time we added an explicit dependency, another related issue popped up. We attempted Maven shading, which is not that practical due to our project's packaging, so we never got it to work.
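For illustration, the exclusions we tried followed this pattern (the excluded artifact here is just an example of the approach, not a confirmed fix):

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
    <exclusions>
        <!-- Illustrative: exclude a transitive artifact that clashes with google-cloud-storage -->
        <exclusion>
            <groupId>com.google.api</groupId>
            <artifactId>gax-grpc</artifactId>
        </exclusion>
    </exclusions>
</dependency>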

In the end, we resorted to creating a separate sub-module + jar for the cloud-storage interaction, introducing more complexity to our packaging/running.

As a final note, we had the same issue when trying to use the BigQuery API; we worked around that by re-using package-private Beam code.

It would be awesome if someone had a (relatively simple) way to get these libraries working together, or could confirm that this really is a challenging dependency issue that may need to be addressed in Apache Beam.

Asked Apr 25 '18 by Ivan Plantevin



1 Answer

Instead of including a separate dependency for Cloud Storage, you can use Beam's bundled FileSystems API to list buckets, read/write files, and delete objects on Cloud Storage. Below is an example that lists all files under a bucket, reads one of those files into a string, and writes a new file.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.MimeTypes;

import com.google.common.io.ByteStreams;

// Set the default pipeline options so the various filesystems are
// loaded into the registry. This shouldn't be necessary if used
// within a pipeline.
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

// List Bucket
MatchResult listResult = FileSystems.match("gs://filesystems-demo/**/*");
listResult
    .metadata()
    .forEach(
        metadata -> {
          ResourceId resourceId = metadata.resourceId();
          System.out.println(resourceId.toString());
        });


// Read file
ResourceId existingFileResourceId = FileSystems
    .matchSingleFileSpec("gs://filesystems-demo/test-file1.csv")
    .resourceId();

try (ByteArrayOutputStream out = new ByteArrayOutputStream();
    ReadableByteChannel readerChannel = FileSystems.open(existingFileResourceId);
    WritableByteChannel writerChannel = Channels.newChannel(out)) {
  ByteStreams.copy(readerChannel, writerChannel);

  System.out.println("File contents: \n" + out.toString());
}


// Write file
String contentToWrite = "Laces out Dan!";

ResourceId newFileResourceId = FileSystems
    .matchNewResource("gs://filesystems-demo/new-file.txt", false);

try (ByteArrayInputStream in = new ByteArrayInputStream(contentToWrite.getBytes());
    ReadableByteChannel readerChannel = Channels.newChannel(in);
    WritableByteChannel writerChannel = FileSystems.create(newFileResourceId, MimeTypes.TEXT)) {

  ByteStreams.copy(readerChannel, writerChannel);
}
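
For completeness, deleting an object works through the same API. A minimal sketch, assuming an additional java.util.Collections import (the object path is a placeholder):

// Delete file
ResourceId fileToDelete = FileSystems
    .matchSingleFileSpec("gs://filesystems-demo/new-file.txt")
    .resourceId();

// FileSystems.delete expects a collection, so wrap the single ResourceId in a list
FileSystems.delete(Collections.singletonList(fileToDelete));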
Answered Sep 30 '22 by Ryan McDowell