Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I use joda.time in flink (or how do I use typeutils.runtime.kryo)

In a flink project I use a case class click.

case class click( date: LocalDateTime, stbId:String, channelId :Int)

This class populated DataSets and it worked fine with the date being a java 8 java.time.LocalDateTime. After switching to org.joda (version2.9) in a java 7 environment calls to the click Objects in the DataSets did not perform as previously. Access to certain functions of the date field of a click Object threw NullPointerExceptions. Example of those functions are getHourOfDay toString, etc. I was able to assure that the date field of the click class was not null. I suspect that the joda time library does not interact well with kryo serialization. See joda DateTime format cause null pointer error in spark RDD functions or NPE in spark with Joda DateTime In the Flink API there is org.apache.flink.api.java.typeutils.runtime.kryo.Serializers with the static method registerJodaTime. That seems to be relevant. I simple mindedly tried

import  org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(new ExecutionConfig)

That did not suffice. Am I right in this? How do I use java.typeutils.runtime.kryo?

Version used Flink: 0.9.1. scala : 2.10 joda.time 2.9

Follow up: Here is the exact added code as proposed (thanks to Fabian and Robert)

val env = ExecutionEnvironment.getExecutionEnvironment
//import  org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(env.getConfig)

In the log files of an embedded execution I could find the following relevant parts:

16:44:53,998 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 2 registered types and 0 default Kryo serializers
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializers types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers Classes 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered POJO types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Static code analysis mode: DISABLE
16:44:54,545 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
16:44:54,560 DEBUG akka.event.EventStream                                        - logger log1-Slf4jLogger started
....
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor             - class org.joda.time.LocalDateTime does not contain a getter for field iLocalMillis
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor             - class org.joda.time.LocalDateTime does not contain a setter for field iLocalMillis
16:44:57,103 INFO  org.apache.flink.api.java.typeutils.TypeExtractor                 - class org.joda.time.LocalDateTime is not a valid POJO type
16:44:57,275 DEBUG org.apache.flink.api.scala.ClosureCleaner$                        - accessedFields: Map()
16:44:57,369 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 2 registered types and 0 default Kryo serializers
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializers types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers Classes 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered POJO types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Static code analysis mode: DISABLE

Nevertheless I witnessed the following

Exception in thread "main" java.lang.NullPointerException
    at org.joda.time.LocalDateTime.isSupported(LocalDateTime.java:625)
    at org.joda.time.format.DateTimeFormatterBuilder$PaddedNumber.printTo(DateTimeFormatterBuilder.java:1435)
    at org.joda.time.format.DateTimeFormatterBuilder$Composite.printTo(DateTimeFormatterBuilder.java:2474)
    at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:655)
    at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:709)
    at org.joda.time.LocalDateTime.toString(LocalDateTime.java:2087)
    at java.lang.String.valueOf(Unknown Source)
    at scala.runtime.StringAdd$.$plus$extension(StringAdd.scala:13)
    at myflink.click.toString(Ingestor.scala:20)
    ...
like image 842
Spyros Komninos Avatar asked Mar 15 '23 04:03

Spyros Komninos


1 Answers

Flink is using Kryo for types it can not serialize. LocalDateTime is such a class.

Sadly, Kryo is also not able to properly serialize it, so we have to tell Kryo how to do it by giving it a specialized serializer for this class.

  1. Add de.javakaffee:kryo-serializers as a dependency:
<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.30</version>
</dependency>

(please note that adding this dependency might cause problems using Flink on a cluster. Please let me know)

  1. Register the new serializer with the ExecutionEnvironment:
val env = ExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer])

I hope that helps (I'm keeping the old answer as a reference)


Some general remarks for debugging Kryo/Serializer issues in Flink:

When executing the job locally (should also work in the ./bin/flink frontend, but then the output is probably in the log/ directory), you should see something like:

14:05:52,863 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 15 registered types and 2 default Kryo serializers 
14:05:52,943 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Starting FlinkMiniCluster. 
14:05:53,150 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started

With the number of registered types and Kryo serializers higher than 0.

With DEBUG log level (replace INFO with DEBUG in the log4j.properties) you can actually get even more detailed information about the registered serializers:

14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo types: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializers types: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializer Classes types: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers Classes 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered POJO types: 
like image 167
Robert Metzger Avatar answered Apr 20 '23 10:04

Robert Metzger