
How do I fix Dataflow being unable to serialize my DoFn?

When I run my Dataflow pipeline, I get the exception below complaining that my DoFn can't be serialized. How do I fix this?

Here's the stack trace:

Caused by: java.lang.IllegalArgumentException: unable to serialize contrail.dataflow.AvroMRTransforms$AvroReducerDoFn@bba0fc2
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:81)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.ensureSerializable(DirectPipelineRunner.java:784)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1025)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:963)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.access$000(ParDo.java:441)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:951)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:946)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:611)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:200)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196)
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:109)
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:204)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:584)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:328)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:70)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:145)
    at contrail.stages.DataflowStage.stageMain(DataflowStage.java:51)
    at contrail.stages.NonMRStage.execute(NonMRStage.java:130)
    at contrail.stages.NonMRStage.run(NonMRStage.java:157)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at contrail.stages.ValidateGraphDataflow.main(ValidateGraphDataflow.java:139)
    ... 6 more
Caused by: java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:47)
    ... 27 more
Jeremy asked Jan 19 '15 19:01



2 Answers

To add to what Jeremy says...

Another common cause of serialization issues is using an anonymous DoFn in a non-static context. An anonymous inner class holds an implicit reference to its enclosing instance, so serializing the DoFn also tries to serialize the enclosing object, and that fails if the enclosing class isn't serializable.
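Here is a minimal sketch of that effect using only java.io, not the Dataflow SDK. The names Fn, Stage, AppendFn and canSerialize are hypothetical stand-ins made up for illustration; Fn plays the role of a DoFn and Stage plays the role of the non-serializable class that builds the pipeline.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class AnonymousCaptureDemo {

    /** Hypothetical stand-in for a DoFn: just a Serializable function. */
    public interface Fn extends Serializable {
        String apply(String input);
    }

    /** Stand-in for a pipeline-building class. Note that it is NOT Serializable. */
    public static class Stage {
        private final String suffix;

        public Stage(String suffix) {
            this.suffix = suffix;
        }

        /** Anonymous class created in an instance method: it keeps a hidden
         *  reference to this Stage (used here to read suffix). */
        public Fn anonymousFn() {
            return new Fn() {
                @Override
                public String apply(String input) {
                    return input + suffix;
                }
            };
        }

        /** Static nested class: the value it needs is copied in, so no
         *  reference to the Stage is kept. */
        public Fn standaloneFn() {
            return new AppendFn(suffix);
        }
    }

    /** Static nested class with no link to any enclosing instance. */
    public static class AppendFn implements Fn {
        private final String suffix;

        public AppendFn(String suffix) {
            this.suffix = suffix;
        }

        @Override
        public String apply(String input) {
            return input + suffix;
        }
    }

    /** Returns true if Java serialization accepts the object, false otherwise. */
    public static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        Stage stage = new Stage("!");
        System.out.println("anonymous:  " + canSerialize(stage.anonymousFn()));
        System.out.println("standalone: " + canSerialize(stage.standaloneFn()));
    }
}
```

The anonymous version fails because writing it also tries to write the enclosing Stage. Moving the logic into a static nested (or top-level) class and passing in only the values it needs avoids dragging the enclosing object along.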

Frances answered Sep 21 '22 14:09



If you scroll through the stack trace, the last "Caused by" entry names the class that isn't serializable:

Caused by: java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf

The problem was that my DoFn took a JobConf instance in its constructor and stored it in an instance variable. I had assumed JobConf was serializable, but it isn't.

To solve this, I did the following:

  • I marked the JobConf member variable as transient so that it wouldn't be serialized.
  • I added a separate member variable of type byte[] to hold a serialized copy of the JobConf.
  • In my constructor, I serialized the JobConf to a byte[] and stored it in that variable.
  • I overrode startBundle and deserialized the JobConf from the byte[] there.
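The steps above can be sketched roughly as follows. This is not the code from the gist; it is a self-contained illustration that uses only java.io. FakeConf is a hypothetical stand-in for JobConf, and its write/readFields methods mirror Hadoop's Writable interface, which Configuration (JobConf's parent class) implements. ReducerFn stands in for the DoFn, and its startBundle here is a plain method rather than the SDK override.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

public class TransientConfigDemo {

    /** Hypothetical stand-in for JobConf: not Serializable, but able to
     *  write itself to and read itself from a data stream. */
    public static class FakeConf {
        private final Map<String, String> props = new TreeMap<>();

        public void set(String key, String value) { props.put(key, value); }
        public String get(String key) { return props.get(key); }

        public void write(DataOutput out) throws IOException {
            out.writeInt(props.size());
            for (Map.Entry<String, String> e : props.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        }

        public void readFields(DataInput in) throws IOException {
            props.clear();
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                String key = in.readUTF();
                String value = in.readUTF();
                props.put(key, value);
            }
        }
    }

    /** Stand-in for the DoFn. */
    public static class ReducerFn implements Serializable {
        private transient FakeConf conf;   // step 1: skipped by Java serialization
        private final byte[] confBytes;    // step 2: serialized copy that travels with the object

        public ReducerFn(FakeConf conf) {
            this.conf = conf;
            this.confBytes = toBytes(conf); // step 3: serialize in the constructor
        }

        /** Step 4: rebuild the transient field before processing any elements. */
        public void startBundle() {
            try {
                FakeConf restored = new FakeConf();
                restored.readFields(new DataInputStream(new ByteArrayInputStream(confBytes)));
                conf = restored;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        public String lookup(String key) { return conf.get(key); }

        private static byte[] toBytes(FakeConf conf) {
            try {
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                conf.write(new DataOutputStream(buf));
                return buf.toByteArray();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /** Simulates shipping the function to a worker via Java serialization. */
    public static ReducerFn roundTrip(ReducerFn fn) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
                return (ReducerFn) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FakeConf conf = new FakeConf();
        conf.set("mapred.job.name", "validate-graph");
        ReducerFn copy = roundTrip(new ReducerFn(conf));
        copy.startBundle();
        System.out.println(copy.lookup("mapred.job.name"));
    }
}
```

After the round trip the transient field is null, so any use of it has to come after startBundle has rebuilt it from the bytes.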

Here's a gist with my DoFn.

Jeremy answered Sep 19 '22 14:09
