I have a Google App Engine application where data is stored in the Google Cloud Datastore. I want to use Dataflow to put part of that data into BigQuery, but I thought I'd start with just getting some information from the Datastore and writing it to Google Cloud Storage. My code looks like this:
public class DatastorePipeline {
private static final Logger LOG = LoggerFactory.getLogger(DatastorePipeline.class);
static class GetEmailFn extends DoFn<Entity, String> {
@Override
public void processElement(ProcessContext c) throws Exception {
Map<String, Value> properties = DatastoreHelper.getPropertyMap(c.element());
Value value = properties.get("email_address");
if(value != null) {
c.output(DatastoreHelper.getString(value));
}
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
Query.Builder q = Query.newBuilder();
q.addKindBuilder().setName("User");
Query query = q.build();
DatastoreIO.Source source = DatastoreIO.source()
.withDataset("my-project-id")
.withQuery(query);
p.apply("ReadUsersFromDatastore", Read.from(source))
.apply(ParDo.named("GetEmailAddress").of(new GetEmailFn()))
.apply(TextIO.Write.to("gs://dataflow-output-bucket/emails.txt"));
p.run();
}
}
However, when I try to run this, I get 403 errors on making the Datastore query:
Request failed with code 403, will NOT retry: https://www.googleapis.com/datastore/v1beta2/datasets/my-project-id/runQuery
I'm running this from Eclipse with the Google Cloud Dataflow plugin. Running dataflow jobs which don't have Datastore reads in them work just fine. I did a
gcloud auth login
before running the job, as described in the tutorial. What am I doing wrong?
edit: Here is the full stacktrace.
Oct 11, 2015, 12:03:13 PM (b6119cca307b4d9a): com.google.api.services.datastore.client.DatastoreException: Unauthorized. at com.google.api.services.datastore.client.RemoteRpc.makeException(RemoteRpc.java:115) at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:81) at com.google.api.services.datastore.client.BaseDatastoreFactory$RemoteRpc.call(BaseDatastoreFactory.java:41) at com.google.api.services.datastore.client.Datastore.runQuery(Datastore.java:109) at com.google.api.services.datastore.client.QuerySplitterImpl.getScatterKeys(QuerySplitterImpl.java:189) at com.google.api.services.datastore.client.QuerySplitterImpl.getSplits(QuerySplitterImpl.java:75) at com.google.cloud.dataflow.sdk.io.DatastoreIO$Source.getSplitQueries(DatastoreIO.java:427) at com.google.cloud.dataflow.sdk.io.DatastoreIO$Source.splitIntoBundles(DatastoreIO.java:306) at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat.performSplit(BasicSerializableSourceFormat.java:318) at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat.performSourceOperation(BasicSerializableSourceFormat.java:167) at com.google.cloud.dataflow.sdk.runners.worker.SourceOperationExecutor.execute(SourceOperationExecutor.java:80) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:257) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:193) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:146) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:164) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:145) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: com.google.api.client.http.HttpResponseException: 403 Forbidden Unauthorized. at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1061) at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:78) ... 19 more
Answered: It turned out that the problem was that my project was restricting access based on my company's domain, which was preventing the service account from connecting. Thanks Dan for helping me work through it!
It looks like the permissions for your Datastore are not configured correctly.
Here are two generic pieces of advice:
In your case, however, you are hitting the following bug:
Is the associated AppEngine project locked down to all users of a specific domain? If so, there is an issue in the current beta of Cloud Datastore that prevents the Dataflow service account (email ending in, e.g., @cloudservices.gserviceaccount.com) from accessing the data.
There is a temporary workaround we can apply, at a minor cost if you are using the OAuth API. The workaround would no longer enforce that the user comes from your app's domain. If that's an important requirement for you, you can do the domain enforcement in your code. (The regular Users API is not affected.)
To request that we apply the temporary workaround, you can email us at [email protected] referencing this issue and including your numeric project ID.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With