CoderException: java.io.EOFException when performing GroupByKey on Json values encoded with a CustomCoder using Jackson

Why do I get this EOFException when I execute the following code?

I have successfully used GroupByKey in simpler situations. What seems to trigger the error is using a custom coder (for JSON objects). Can anyone explain why this is happening?

Here's the error:

com.google.cloud.dataflow.sdk.Pipeline$PipelineExecutionException: com.google.cloud.dataflow.sdk.coders.CoderException: java.io.EOFException

    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:186)
    at com.google.cloud.dataflow.sdk.testing.TestPipeline.run(TestPipeline.java:106)
    at com.example.dataflow.TestGroupByKeyCustomCoder.testPipeline(TestGroupByKeyCustomCoder.java:85)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: com.google.cloud.dataflow.sdk.coders.CoderException: java.io.EOFException
    at com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:62)
    at com.google.cloud.dataflow.sdk.coders.InstantCoder.decode(InstantCoder.java:83)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:553)
    at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:98)
    at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:42)
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:157)
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:140)
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:134)
    at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:107)
    at com.google.cloud.dataflow.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readLong(DataInputStream.java:416)
    at com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:58)
    at com.google.cloud.dataflow.sdk.coders.InstantCoder.decode(InstantCoder.java:83)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:553)
    at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:98)
    at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:42)
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:157)
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:140)
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:134)
    at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:107)
    at com.google.cloud.dataflow.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.output(ParDo.java:1303)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449)
    at com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn.processElement(ReifyTimestampAndWindowsDoFn.java:38)
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1229)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:1098)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.access$300(ParDo.java:457)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1084)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1079)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:858)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
    at com.google.cloud.dataflow.sdk.testing.TestPipeline.run(TestPipeline.java:106)
    at com.example.dataflow.TestGroupByKeyCustomCoder.testPipeline(TestGroupByKeyCustomCoder.java:85)

Here is the code:

package com.example.dataflow;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.coders.CustomCoder;
import com.google.cloud.dataflow.sdk.testing.CoderProperties;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;


class ParseJson extends DoFn<String, JsonNode> {

    private static final long serialVersionUID = 1L;
    private transient ObjectMapper om;

    { init(); }

    private void init() {
        om = new ObjectMapper();
    }

    private void readObject(java.io.ObjectInputStream in)
            throws IOException, ClassNotFoundException {
        init();
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
        JsonNode node = om.readTree(c.element());
        c.output(node);
    }
}

class JsonNodeCoder extends CustomCoder<JsonNode> {

    private static final long serialVersionUID = 1L;

    private ObjectMapper mapper = new ObjectMapper();

    private static final JsonNodeCoder INSTANCE = new JsonNodeCoder();

    public static JsonNodeCoder of() {
        return INSTANCE;
    }

    @Override
    public void encode(JsonNode value, OutputStream outStream, Context context) throws IOException {
        mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false).writeValue(outStream, value);
    }

    @Override
    public JsonNode decode(InputStream inStream, Context context) throws IOException {
        return mapper.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false).readTree(inStream);
    }
}

public class TestGroupByKeyCustomCoder {

    @Test // original code that produces the error
    public void testPipeline() throws IOException {

        TestPipeline p = TestPipeline.create();

        p.getCoderRegistry().registerCoder(JsonNode.class, JsonNodeCoder.class);

        p.apply(Create.of("{}"))
                .apply(ParDo.of(new ParseJson()))
                .apply(WithKeys.of("foo"))
                .apply("GroupByAction", GroupByKey.create());

        p.run();
    }

    // Test as per Kenn Knowles' suggestion
    // this throws the same error
    @Test
    public void testCustomCoder() throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode value = mapper.readTree("{}");

        WindowedValue.FullWindowedValueCoder<JsonNode> windowedValueCoder
                = WindowedValue.FullWindowedValueCoder
                    .of(JsonNodeCoder.of(), GlobalWindow.Coder.INSTANCE);

        WindowedValue<JsonNode> x = WindowedValue.of(
                value, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING);
        CoderProperties.coderDecodeEncodeEqual(windowedValueCoder, x);
    }
}

This issue seems to be caused by readTree consuming too much input and consequently swallowing the timestamp that Dataflow expects to find next:

@Test
public void testJackson() throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    ByteArrayInputStream bis = new ByteArrayInputStream("{}1".getBytes());
    mapper.readTree(bis);
    Assert.assertNotEquals(-1, bis.read()); // assertion fails
}
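A stdlib-only sketch of why the trailing byte disappears. Assumption: Jackson's stream-based parser reads its input in blocks into an internal buffer rather than byte by byte, so bytes after the JSON value end up in that buffer instead of staying in the stream. Here BufferedInputStream stands in for the parser, and ReadAheadDemo is an illustrative name:

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class ReadAheadDemo {
    /**
     * Reads a single byte through a buffering wrapper and reports how many
     * bytes are still left in the underlying stream afterwards.
     */
    static int bytesLeftAfterOneBufferedRead(String input) {
        ByteArrayInputStream underlying =
                new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
        // Assumption: BufferedInputStream stands in for a parser that reads ahead.
        InputStream buffered = new BufferedInputStream(underlying, 8192);
        try {
            buffered.read(); // the caller only asked for one byte ('{')...
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return underlying.available(); // ...but the buffer fill pulled in everything
    }

    public static void main(String[] args) {
        System.out.println(bytesLeftAfterOneBufferedRead("{}1")); // 0
    }
}
```

Any reader that fills a buffer this way will leave the underlying stream positioned past data it never returned to the caller, which matches what testJackson observes.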
asked May 18 '26 01:05 by Frank Wilson


1 Answer

The stack trace indicates that the end of the stream is reached while the big-endian long holding the timestamp is being parsed.

The encoding used by WindowedValue.FullWindowedValueCoder is your encoded value, followed by the timestamp, followed by the windows, and finally the pane metadata. This implies that JsonNodeCoder is consuming too many bytes from the input stream (possibly all of them), so decoding the timestamp hits the end of the stream.
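To make this failure mode concrete, here is a stdlib-only sketch (no Dataflow or Jackson; GreedyDecodeDemo and its methods are illustrative names). As a simplification it writes the timestamp as a plain 8-byte big-endian long via DataOutputStream.writeLong; the SDK's InstantCoder may differ in detail. A value decoder that reads to end of stream leaves nothing behind, so the following readLong fails with EOFException, as at the bottom of the stack trace:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class GreedyDecodeDemo {
    /** Value bytes with no length prefix, followed by an 8-byte timestamp. */
    static byte[] encodeValueThenTimestamp(String json, long timestampMillis) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.write(json.getBytes(StandardCharsets.UTF_8));
            // Simplification: plain big-endian long, not necessarily InstantCoder's format.
            out.writeLong(timestampMillis);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** A "greedy" value decoder: it keeps reading until end of stream. */
    static byte[] readToEnd(InputStream in) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        for (int n; (n = in.read(buf)) != -1; ) {
            sink.write(buf, 0, n);
        }
        return sink.toByteArray();
    }

    /** True if decoding the timestamp after a greedy value decode runs into EOF. */
    static boolean timestampReadHitsEof(String json, long timestampMillis) {
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(encodeValueThenTimestamp(json, timestampMillis)));
        try {
            readToEnd(in); // the value decoder swallows the timestamp bytes too
            in.readLong(); // the DataInputStream.readLong call seen in the stack trace
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(timestampReadHitsEof("{}", 1_000L)); // true
    }
}
```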

The SDK provides many utilities for testing coders in CoderProperties. You can test this exact case (an element in the global window) directly by running CoderProperties#coderDecodeEncodeEqual with the coder WindowedValue.FullWindowedValueCoder.of(JsonNodeCoder.of(), GlobalWindow.Coder.INSTANCE).

There is a flag passed to encode and decode that you'll probably have to be aware of: the Coder.Context.

  • Coder.Context.OUTER indicates that your Coder is the outermost Coder and owns the whole stream. In this case, when encoding you can rely on the end-of-stream signal and leave out metadata such as a length prefix or parentheses, and when decoding it is fine to consume as much as you like.
  • Coder.Context.NESTED indicates that your Coder is encoding just part of the value, so it must write enough metadata that, when decoding, it can consume exactly the bytes of its own encoding and no more.
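A minimal stdlib sketch of the OUTER/NESTED distinction, under stated assumptions: the JSON is handled as plain text rather than a Jackson JsonNode, the Context enum is a hypothetical stand-in for the SDK's Coder.Context, and the length prefix is a fixed 4-byte int rather than whatever encoding the SDK itself uses. ContextAwareJsonTextCoder is an illustrative name, not an SDK class. In the nested case the decoder reads exactly its own bytes, so a timestamp written after it can still be decoded:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only; this is not the Dataflow SDK's Coder API.
final class ContextAwareJsonTextCoder {
    /** Hypothetical stand-in for the SDK's Coder.Context (not the real class). */
    enum Context { OUTER, NESTED }

    static void encode(String json, OutputStream out, Context context) throws IOException {
        byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
        if (context == Context.NESTED) {
            // Assumption: a fixed 4-byte length prefix; other data may follow.
            new DataOutputStream(out).writeInt(bytes.length);
        }
        // In the OUTER case this coder owns the stream, so EOF marks the end.
        out.write(bytes);
    }

    static String decode(InputStream in, Context context) throws IOException {
        if (context == Context.NESTED) {
            DataInputStream data = new DataInputStream(in);
            byte[] bytes = new byte[data.readInt()];
            data.readFully(bytes); // consume exactly this value's bytes, nothing more
            return new String(bytes, StandardCharsets.UTF_8);
        }
        // OUTER: safe to read everything up to end of stream.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        for (int n; (n = in.read(buf)) != -1; ) {
            sink.write(buf, 0, n);
        }
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }

    /** Decoded value plus the timestamp that followed it in the stream. */
    static final class Decoded {
        final String json;
        final long timestampMillis;

        Decoded(String json, long timestampMillis) {
            this.json = json;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Mimics "value, then timestamp": encode NESTED, append a long, decode both. */
    static Decoded nestedRoundTrip(String json, long timestampMillis) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            encode(json, bytes, Context.NESTED);
            new DataOutputStream(bytes).writeLong(timestampMillis);
            DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            String decoded = decode(in, Context.NESTED);
            return new Decoded(decoded, in.readLong()); // timestamp bytes are still there
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Decoded d = nestedRoundTrip("{\"a\":1}", 42L);
        System.out.println(d.json + " @ " + d.timestampMillis); // {"a":1} @ 42
    }
}
```

For the real JsonNodeCoder, one option (not tested here) would be to serialize with ObjectMapper.writeValueAsBytes and delegate to the SDK's ByteArrayCoder.of(), which, as far as I know, adds a length prefix only in nested contexts; decode would then call ObjectMapper.readTree on the byte[] it returns.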
answered May 20 '26 13:05 by Kenn Knowles