Cross account GCS access using Spark on Dataproc

I am trying to ingest data in GCS of account A to BigQuery of account B using Spark running on Dataproc in account B.

I tried setting GOOGLE_APPLICATION_CREDENTIALS to a service account key file that grants access to the necessary bucket in account A, but when I start spark-shell I get the following error.

Exception in thread "main" java.io.IOException: Error accessing Bucket dataproc-40222d04-2c40-42f9-a5de-413a123f949d-asia-south1

As I understand it, setting the environment variable switches the access from account B to account A rather than adding to it.

Is there a way to have both accesses within Spark, i.e. default access to account B plus additional access to account A?

Update: I tried running spark-shell with the configuration from Igor's answer, but the error persists. Here is the command I tried and the stack trace.

$ spark-shell --conf spark.hadoop.fs.gs.auth.service.account.json.keyfile=/home/shasank/watchful-origin-299914-fa29998bad08.json --jars gs://hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar
Exception in thread "main" java.io.IOException: Error accessing Bucket dataproc-40999d04-2b99-99f9-a5de-999ad23f949d-asia-south1
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getBucket(GoogleCloudStorageImpl.java:1895)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfo(GoogleCloudStorageImpl.java:1846)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfoInternal(GoogleCloudStorageFileSystem.java:1125)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfo(GoogleCloudStorageFileSystem.java:1116)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.exists(GoogleCloudStorageFileSystem.java:440)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configureBuckets(GoogleHadoopFileSystemBase.java:1738)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.configureBuckets(GoogleHadoopFileSystem.java:76)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1659)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:683)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:646)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3242)
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:121)
  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3291)
  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3259)
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:470)
  at org.apache.spark.deploy.DependencyUtils$.org$apache$spark$deploy$DependencyUtils$$resolveGlobPath(DependencyUtils.scala:165)
  at org.apache.spark.deploy.DependencyUtils$$anonfun$resolveGlobPaths$2.apply(DependencyUtils.scala:146)
  at org.apache.spark.deploy.DependencyUtils$$anonfun$resolveGlobPaths$2.apply(DependencyUtils.scala:144)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
  at org.apache.spark.deploy.DependencyUtils$.resolveGlobPaths(DependencyUtils.scala:144)
  at org.apache.spark.deploy.SparkSubmit$$anonfun$doPrepareSubmitEnvironment$3.apply(SparkSubmit.scala:403)
  at org.apache.spark.deploy.SparkSubmit$$anonfun$doPrepareSubmitEnvironment$3.apply(SparkSubmit.scala:403)
  at scala.Option.map(Option.scala:146)
  at org.apache.spark.deploy.SparkSubmit$.doPrepareSubmitEnvironment(SparkSubmit.scala:403)
  at org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:250)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:171)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by:
com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException:
  403 Forbidden {
    "code" : 403,
    "errors" : [ {
      "domain" : "global",
      "message" : "[email protected] does not have storage.buckets.get access to dataproc-40999d04-2b99-99f9-a5de-999ad23f949d-asia-south1.",
      "reason" : "forbidden" } ],
    "message" : "[email protected] does not have storage.buckets.get access to  dataproc-40999d04-2b99-99f9-a5de-999ad23f949d-asia-south1." }
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:150)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:401)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1097)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:499)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:549)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getBucket(GoogleCloudStorageImpl.java:1889)
  ... 32 more
Asked by Shasankar on Oct 16 '22

1 Answer

To achieve this you need to re-configure the GCS and BQ connectors to use different service accounts for authentication; by default both of them use the GCE VM service account.

To do so, please refer to Method 2 in the GCS connector configuration manual.

The same configuration applies to the Hadoop BQ connector, but you need to replace the fs.gs. prefix in the property names with the bq.mapred. prefix:

spark.hadoop.fs.gs.auth.service.account.json.keyfile=/path/to/local/gcs/key/file.json
spark.hadoop.bq.mapred.auth.service.account.json.keyfile=/path/to/local/bq/key/file.json
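
For example, the same per-connector key files can also be set programmatically on the Hadoop configuration inside spark-shell. The sketch below is only a minimal illustration: the key file paths and the account-a-bucket name are placeholders, and the properties should be set before the first gs:// path is accessed, since Hadoop caches initialized FileSystem instances.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val hadoopConf = spark.sparkContext.hadoopConfiguration

// GCS connector: authenticate gs:// access with account A's key (placeholder path)
hadoopConf.set("fs.gs.auth.service.account.enable", "true")
hadoopConf.set("fs.gs.auth.service.account.json.keyfile", "/path/to/account-a-key.json")

// BQ connector: keep account B's key for BigQuery (placeholder path)
hadoopConf.set("bq.mapred.auth.service.account.json.keyfile", "/path/to/account-b-key.json")

// Reads from account A's bucket now use the key configured above
val df = spark.read.json("gs://account-a-bucket/path/to/data/*.json")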

Update:

To disable the Dataproc staging bucket check during GCS connector initialization, you need to use the latest GCS connector version (1.9.17 at the moment) and set the GCS connector system bucket property to an empty string:

spark.hadoop.fs.gs.system.bucket=

Note that this system bucket functionality is removed in the upcoming GCS connector 2.0, so it will not be an issue going forward.
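
Putting it together with the per-connector key files above, a spark-shell invocation might look like the following; the key file paths are placeholders, and the --jars URI is the one from the question:

$ spark-shell \
    --conf spark.hadoop.fs.gs.auth.service.account.json.keyfile=/path/to/account-a-key.json \
    --conf spark.hadoop.bq.mapred.auth.service.account.json.keyfile=/path/to/account-b-key.json \
    --conf spark.hadoop.fs.gs.system.bucket= \
    --jars gs://hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar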

Answered by Igor Dvorzhak on Nov 15 '22