The Beam 2.1 pipeline uses ValueState in a stateful DoFn. It runs fine with a single worker, but when scaling is enabled it fails with "Unable to read value from state" and the root exception below. Any ideas what could cause this?
Caused by: java.util.concurrent.ExecutionException: com.google.cloud.dataflow.worker.KeyTokenInvalidException: Unable to fetch data due to token mismatch for key ��
at com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:500)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:459)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:62)
at com.google.cloud.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:309)
at com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillValue.read(WindmillStateInternals.java:384)
... 16 more
Caused by: com.google.cloud.dataflow.worker.KeyTokenInvalidException: Unable to fetch data due to token mismatch for key ��
at com.google.cloud.dataflow.worker.WindmillStateReader.consumeResponse(WindmillStateReader.java:469)
at com.google.cloud.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:411)
at com.google.cloud.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:306)
... 17 more
I believe that exception should just be rethrown. The state mechanism throws it to indicate that no additional work should be performed on that key, and the Dataflow runner will automatically retry the work.
These exceptions typically indicate that the work for that key should be performed on a different worker, so proceeding on the current one wouldn't be helpful.
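A minimal, stdlib-only sketch of the difference between swallowing and rethrowing, assuming the state read is wrapped in a `Supplier` purely for illustration. `StateReadException`, `addSwallowing` and `addRethrowing` are hypothetical names, not Beam or Dataflow API; in a real stateful DoFn the supplier would correspond to a call such as `ValueState.read()`.

```java
import java.util.function.Supplier;

public class RethrowStateErrors {

    // Hypothetical stand-in for the exception a runner throws from a state read
    // (on Dataflow, KeyTokenInvalidException). Not a real Beam or Dataflow class.
    static class StateReadException extends RuntimeException {
        StateReadException(String message) {
            super(message);
        }
    }

    // Anti-pattern: a catch-all around the state read hides the runner's signal
    // and lets processing continue with a made-up value.
    static int addSwallowing(int increment, Supplier<Integer> readState) {
        int current;
        try {
            Integer stored = readState.get();
            current = stored == null ? 0 : stored;
        } catch (Exception e) {
            current = 0; // wrong: this work should have been abandoned
        }
        return current + increment;
    }

    // Preferred: let the exception propagate (or rethrow it after logging)
    // so the runner can abandon this work item and retry it.
    static int addRethrowing(int increment, Supplier<Integer> readState) {
        Integer stored = readState.get();
        return (stored == null ? 0 : stored) + increment;
    }

    public static void main(String[] args) {
        Supplier<Integer> failing = () -> {
            throw new StateReadException("token mismatch");
        };
        System.out.println(addSwallowing(5, failing)); // prints 5
        try {
            addRethrowing(5, failing);
        } catch (StateReadException e) {
            System.out.println("propagated: " + e.getMessage()); // prints "propagated: token mismatch"
        }
    }
}
```

If you need to log the failure, catch it, log it, and rethrow it rather than substituting a default value.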
It is also possible that misusing state (storing the state object from one key and attempting to use it on a different key) could lead to these errors. If that is the case, you may find more diagnostic messages in the worker or shuffler logs in Stackdriver Logging.
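To make the "one key, one state object" rule concrete, here is a toy, stdlib-only model. It is not how Beam or Dataflow implement state: `PerKeyStateModel`, `Handle` and `processElement` are hypothetical names, and the handle invalidation is an assumption of this model. Each element gets a fresh handle scoped to its key, the correct user code only touches the handle it was given, and the incorrect code caches a handle in a variable and reuses it for another key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class PerKeyStateModel {

    // Toy stand-in for a key-scoped handle such as ValueState<Integer>.
    // In this model a handle is only valid while its own element is processed.
    static final class Handle {
        private final Map<String, Integer> store;
        private final String key;
        private boolean valid = true;

        Handle(Map<String, Integer> store, String key) {
            this.store = store;
            this.key = key;
        }

        private void check() {
            if (!valid) {
                throw new IllegalStateException("handle for key " + key + " used outside its element");
            }
        }

        Integer read() {
            check();
            return store.get(key);
        }

        void write(int value) {
            check();
            store.put(key, value);
        }
    }

    private final Map<String, Integer> store = new HashMap<>();

    // Toy runner: creates a fresh handle for the element's key, runs the
    // user code with it, then invalidates the handle.
    void processElement(String key, Consumer<Handle> userCode) {
        Handle handle = new Handle(store, key);
        try {
            userCode.accept(handle);
        } finally {
            handle.valid = false;
        }
    }

    Integer peek(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        PerKeyStateModel runner = new PerKeyStateModel();

        // Correct: use only the handle passed in for the current element.
        Consumer<Handle> countPerKey = state -> {
            Integer n = state.read();
            state.write(n == null ? 1 : n + 1);
        };
        runner.processElement("a", countPerKey);
        runner.processElement("a", countPerKey);
        runner.processElement("b", countPerKey);
        System.out.println(runner.peek("a") + " " + runner.peek("b")); // prints "2 1"

        // Incorrect: caching the handle from key "a" and reusing it while
        // processing key "b" fails instead of silently touching the wrong key.
        Handle[] cached = new Handle[1];
        runner.processElement("a", state -> cached[0] = state);
        try {
            runner.processElement("b", state -> cached[0].write(99));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In an actual DoFn, the equivalent of "use only the handle passed in" is reading and writing the `ValueState` parameter injected into your `@ProcessElement` method and never storing it in a field.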
If neither retrying nor reviewing the logs and how you use the state objects helps, please provide a job ID that demonstrates the problem.