Why do I get this EOFException when I execute the following code?
I have successfully used GroupByKey in simpler situations. I think what triggers the error is using a custom coder (for JSON objects). Can anyone explain why this is happening?
Here's the error:
com.google.cloud.dataflow.sdk.Pipeline$PipelineExecutionException: com.google.cloud.dataflow.sdk.coders.CoderException: java.io.EOFException
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:186)
at com.google.cloud.dataflow.sdk.testing.TestPipeline.run(TestPipeline.java:106)
at com.example.dataflow.TestGroupByKeyCustomCoder.testPipeline(TestGroupByKeyCustomCoder.java:85)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: com.google.cloud.dataflow.sdk.coders.CoderException: java.io.EOFException
at com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:62)
at com.google.cloud.dataflow.sdk.coders.InstantCoder.decode(InstantCoder.java:83)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:553)
at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:98)
at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:42)
at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:157)
at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:140)
at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:134)
at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:107)
at com.google.cloud.dataflow.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readLong(DataInputStream.java:416)
at com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:58)
at com.google.cloud.dataflow.sdk.coders.InstantCoder.decode(InstantCoder.java:83)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:553)
at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:98)
at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:42)
at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:157)
at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:140)
at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:134)
at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:107)
at com.google.cloud.dataflow.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.output(ParDo.java:1303)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449)
at com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn.processElement(ReifyTimestampAndWindowsDoFn.java:38)
at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138)
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1229)
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:1098)
at com.google.cloud.dataflow.sdk.transforms.ParDo.access$300(ParDo.java:457)
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1084)
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1079)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:858)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
at com.google.cloud.dataflow.sdk.testing.TestPipeline.run(TestPipeline.java:106)
at com.example.dataflow.TestGroupByKeyCustomCoder.testPipeline(TestGroupByKeyCustomCoder.java:85)
Here is the code:
package com.example.dataflow;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.coders.CustomCoder;
import com.google.cloud.dataflow.sdk.testing.CoderProperties;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Parses each input string into a Jackson JsonNode.
class ParseJson extends DoFn<String, JsonNode> {
    private static final long serialVersionUID = 1L;
    private transient ObjectMapper om;
    { init(); }
    private void init() {
        om = new ObjectMapper();
    }
    // Rebuild the transient ObjectMapper after the DoFn is deserialized.
    private void readObject(java.io.ObjectInputStream in)
            throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        init();
    }
    @Override
    public void processElement(ProcessContext c) throws Exception {
        JsonNode node = om.readTree(c.element());
        c.output(node);
    }
}

// Custom coder for JsonNode. Note that encode/decode never look at `context`.
class JsonNodeCoder extends CustomCoder<JsonNode> {
    private static final long serialVersionUID = 1L;
    private ObjectMapper mapper = new ObjectMapper();
    private static final JsonNodeCoder INSTANCE = new JsonNodeCoder();
    public static JsonNodeCoder of() {
        return INSTANCE;
    }
    @Override
    public void encode(JsonNode value, OutputStream outStream, Context context) throws IOException {
        mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false).writeValue(outStream, value);
    }
    @Override
    public JsonNode decode(InputStream inStream, Context context) throws IOException {
        return mapper.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false).readTree(inStream);
    }
}

public class TestGroupByKeyCustomCoder {
    @Test // original code that produces the error
    public void testPipeline() throws IOException {
        TestPipeline p = TestPipeline.create();
        p.getCoderRegistry().registerCoder(JsonNode.class, JsonNodeCoder.class);
        p.apply(Create.of("{}"))
            .apply(ParDo.of(new ParseJson()))
            .apply(WithKeys.of("foo"))
            .apply("GroupByAction", GroupByKey.create());
        p.run();
    }

    // Test as per Kenn Knowles' suggestion;
    // this throws the same error.
    @Test
    public void testCustomCoder() throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode value = mapper.readTree("{}");
        WindowedValue.FullWindowedValueCoder<JsonNode> windowedValueCoder
            = WindowedValue.FullWindowedValueCoder
                .of(JsonNodeCoder.of(), GlobalWindow.Coder.INSTANCE);
        WindowedValue<JsonNode> x = WindowedValue.of(
            value, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING);
        CoderProperties.coderDecodeEncodeEqual(windowedValueCoder, x);
    }
}
This issue seems to be caused by readTree consuming too much input and thereby swallowing the timestamp that Dataflow expects to read next:
@Test
public void testJackson() throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    ByteArrayInputStream bis = new ByteArrayInputStream("{}1".getBytes());
    mapper.readTree(bis);
    // The trailing "1" should still be unread, but readTree has already consumed it.
    Assert.assertNotEquals(-1, bis.read()); // assertion fails
}
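Jackson's buffering happens inside the library and isn't shown in the test above, but the general effect can be reproduced with plain JDK classes: a reader that wraps the stream in a buffer pulls as many bytes as it can, not just the bytes it logically needs. The sketch below uses BufferedInputStream as a stand-in for a parser's internal buffer; that is an assumption about the mechanism, not Jackson's actual code, and the class name BufferedOverRead is hypothetical.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// JDK-only illustration: a buffering reader drains more of its source than it
// logically consumes. BufferedInputStream stands in for a parser's internal buffer.
class BufferedOverRead {

    /** Reads ONE byte through a buffer and reports how many bytes remain in the source. */
    static int bytesLeftAfterOneBufferedRead(byte[] data) {
        ByteArrayInputStream source = new ByteArrayInputStream(data);
        BufferedInputStream buffered = new BufferedInputStream(source);
        try {
            buffered.read(); // logically consumes a single byte...
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return source.available(); // ...but the buffer has already pulled the rest
    }

    public static void main(String[] args) {
        byte[] data = "{}1".getBytes(StandardCharsets.UTF_8);
        System.out.println(bytesLeftAfterOneBufferedRead(data)); // prints 0
    }
}
```

Only small inputs are fully drained this way; bytes beyond the buffer's capacity would stay in the source. Either way, whatever follows the value in the stream is no longer available to the next decoder.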
The stack trace indicates that the end of the stream is reached while the big-endian long holding the timestamp is being decoded.
The encoding used by WindowedValue.FullWindowedValueCoder is your encoded value, followed by the timestamp, followed by the windows and finally the pane metadata. This implies that your JsonNodeCoder is consuming too many bytes from the input stream (possibly all of them), so decoding the timestamp hits the end of the stream.
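To see the failure mode in isolation, here is a simplified, hypothetical model of that layout using only the JDK: the value bytes are written with no length prefix, followed by an 8-byte big-endian timestamp, and then read back with a "greedy" value decoder that reads until the stream ends. The windows and pane metadata are deliberately left out, and the class name GreedyDecodeDemo is made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Simplified, hypothetical model of "value bytes, then 8-byte timestamp".
// The real FullWindowedValueCoder also writes windows and pane metadata.
class GreedyDecodeDemo {

    static byte[] encodeValueThenTimestamp(String json, long timestampMillis) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        try {
            out.write(json.getBytes(StandardCharsets.UTF_8)); // no length prefix
            out.writeLong(timestampMillis);                   // big-endian long
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Decodes with a value decoder that reads to end of stream, then tries the timestamp. */
    static boolean greedyDecodeHitsEof(byte[] encoded) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
        try {
            // Greedy "value" decoder: swallows everything, timestamp bytes included.
            while (in.read() != -1) { }
            in.readLong(); // nothing left, so this throws EOFException
            return false;
        } catch (EOFException e) {
            return true; // same exception DataInputStream.readLong raises in the trace
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] encoded = encodeValueThenTimestamp("{}", 1_000L);
        System.out.println(encoded.length);               // prints 10
        System.out.println(greedyDecodeHitsEof(encoded)); // prints true
    }
}
```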
The SDK provides a number of utilities for testing coders in CoderProperties. You can test this exact case, which is in the global window, by running CoderProperties#coderDecodeEncodeEqual with the coder WindowedValue.FullWindowedValueCoder.of(JsonNodeCoder.of(), GlobalWindow.Coder.INSTANCE).
There is a flag passed to encode and decode that you'll need to take into account: the Coder.Context.
Coder.Context.OUTER indicates that your Coder is the outermost Coder and owns the whole stream. In this case, when encoding you can rely on the end-of-stream signal and leave out metadata such as a length prefix or parentheses, and when decoding it is fine to consume as much as you like.
Coder.Context.NESTED indicates that your Coder is encoding just part of the value, so it needs to write enough metadata that it can consume exactly the bytes of its own encoding and no more. The JsonNodeCoder above ignores the context argument, so it behaves as if it owned the whole stream even when it is nested inside another coder.
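To make the OUTER/NESTED distinction concrete, here is a minimal JDK-only sketch of a length-prefixed JSON "coder" operating on JSON text. The nested Context enum is a hypothetical stand-in for Dataflow's Coder.Context, the class and method names are made up, and the fixed 4-byte length prefix is a simplification chosen for readability.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class LengthPrefixedJsonCoder {

    // Hypothetical stand-in for Dataflow's Coder.Context.
    enum Context { OUTER, NESTED }

    static void encode(String json, OutputStream os, Context ctx) throws IOException {
        byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
        if (ctx == Context.NESTED) {
            // Nested: prefix the length so the decoder knows where to stop.
            new DataOutputStream(os).writeInt(bytes.length);
        }
        os.write(bytes); // Outer: the end of the stream marks the end of the value.
    }

    static String decode(InputStream is, Context ctx) throws IOException {
        byte[] bytes;
        if (ctx == Context.NESTED) {
            DataInputStream in = new DataInputStream(is);
            bytes = new byte[in.readInt()];
            in.readFully(bytes); // consume exactly our own bytes, nothing more
        } else {
            ByteArrayOutputStream rest = new ByteArrayOutputStream();
            int b;
            while ((b = is.read()) != -1) {
                rest.write(b); // we own the stream, so reading to EOF is fine
            }
            bytes = rest.toByteArray();
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    static int encodedLength(String json, Context ctx) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            encode(json, bytes, ctx);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Nested value followed by a timestamp, decoded back in the same order. */
    static boolean roundTripsWithTimestamp(String json, long timestampMillis) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            encode(json, out, Context.NESTED);
            out.writeLong(timestampMillis);
            out.flush();

            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            String value = decode(in, Context.NESTED);
            long timestamp = in.readLong(); // still there: the value decoder stopped in time
            return value.equals(json) && timestamp == timestampMillis && in.read() == -1;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTripsWithTimestamp("{}", 1_000L)); // prints true
    }
}
```

For the JsonNodeCoder in the question, one option that should give the same behavior without hand-written framing is to serialize the node to a byte[] (for example with ObjectMapper#writeValueAsBytes) and delegate to the SDK's ByteArrayCoder, whose documented encoding prefixes the bytes with their length in a nested context and writes them bare in the outer context. Check that against the SDK version you are using.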