Apache Flink 1.0.0: Event Time related migration problems

I tried to migrate a simple task to Flink 1.0.0, but it fails with the following exception:

java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

The code consists of two separate tasks connected via a Kafka topic: one task is a simple message generator, and the other is a simple message consumer that uses timeWindowAll to calculate the per-minute message arrival rate.

Similar code worked with version 0.10.2 without any problems, but now it looks like the system wrongly interprets some event timestamps as Long.MIN_VALUE, which causes the task to fail.

The question is: am I doing something wrong, or is this a bug that will be fixed in a future release?

The main Task:

public class Test1_0_0 {
    // Max Time lag between events time to current System time
    static final Time maxTimeLag = Time.of(3, TimeUnit.SECONDS);

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
        // Setting Event Time usage
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setBufferTimeout(1);
        // Properties initialization
        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        // Overwrites the default properties by one provided by command line
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        for(Map.Entry<String, String> e: parameterTool.toMap().entrySet()) {
            properties.setProperty(e.getKey(),e.getValue());
        }
        //properties.setProperty("auto.offset.reset", "smallest");
        System.out.println("Properties: " + properties);
        DataStream<Message> stream = env
        .addSource(new FlinkKafkaConsumer09<Message>("test", new MessageSDSchema(), properties)).filter(message -> message != null);
        // The call to rebalance() causes data to be re-partitioned so that all machines receive messages
        // (for example, when the number of Kafka partitions is fewer than the number of Flink parallel instances).
        stream.rebalance()
        .assignTimestampsAndWatermarks(new MessageTimestampExtractor(maxTimeLag));
        // Counts messages
        stream.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Message, String, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable<Message> values, Collector<String> collector) throws Exception {
                Integer count = 0;
                if (values.iterator().hasNext()) {
                    for (Message value : values) {
                        count++;
                    }
                    collector.collect("Arrived last minute: " + count);
                }
            }
        }).print();
        // execute program
        env.execute("Messages Counting");
    }
 }

The timestamp extractor:

public class MessageTimestampExtractor implements AssignerWithPeriodicWatermarks<Message>, Serializable {

    private static final long serialVersionUID = 7526472295622776147L;
    // Maximum lag between the current processing time and the timestamp of an event
    long maxTimeLag = 0L;
    long currentWatermarkTimestamp = 0L;

    public MessageTimestampExtractor() {
    }

    public MessageTimestampExtractor(Time maxTimeLag) {
        this.maxTimeLag = maxTimeLag.toMilliseconds();
    }


    /**
     * Assigns a timestamp to an element, in milliseconds since the Epoch.
     *
     * <p>The method is passed the previously assigned timestamp of the element.
     * That previous timestamp may have been assigned from a previous assigner,
     * by ingestion time. If the element did not carry a timestamp before, this value is
     * {@code Long.MIN_VALUE}.
     *
     * @param message The element that the timestamp will be assigned to.
     * @param previousElementTimestamp The previous internal timestamp of the element,
     *                                 or a negative value, if no timestamp has been assigned, yet.
     * @return The new timestamp.
     */
    @Override
    public long extractTimestamp(Message message, long previousElementTimestamp) {
        long timestamp = message.getTimestamp();
        currentWatermarkTimestamp = Math.max(timestamp, currentWatermarkTimestamp);
        return timestamp;
    }


    /**
     * Returns the current watermark. This method is periodically called by the
     * system to retrieve the current watermark. The method may return null to
     * indicate that no new Watermark is available.
     *
     * <p>The returned watermark will be emitted only if it is non-null and larger
     * than the previously emitted watermark. If the current watermark is still
     * identical to the previous one, no progress in event time has happened since
     * the previous call to this method.
     *
     * <p>If this method returns a value that is smaller than the previously returned watermark,
     * then the implementation does not properly handle the event stream timestamps.
     * In that case, the returned watermark will not be emitted (to preserve the contract of
     * ascending watermarks), and the violation will be logged and registered in the metrics.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     * @see ExecutionConfig#getAutoWatermarkInterval()
     */
    @Override
    public Watermark getCurrentWatermark() {
        if(currentWatermarkTimestamp <= 0) {
            return new Watermark(Long.MIN_VALUE);
        }
        return new Watermark(currentWatermarkTimestamp - maxTimeLag);
    }

    public long getMaxTimeLag() {
        return maxTimeLag;
    }

    public void setMaxTimeLag(long maxTimeLag) {
        this.maxTimeLag = maxTimeLag;
    }
}
Alex Volmir asked Oct 19 '22 13:10



1 Answer

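The problem is that assignTimestampsAndWatermarks does not modify the stream it is called on. It returns a new DataStream that uses the timestamp extractor. In your code the returned stream is discarded and timeWindowAll is applied to the original stream, whose records carry no timestamps, hence the Long.MIN_VALUE exception. You have to perform the subsequent operations on the returned DataStream: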

DataStream<Message> timestampStream = stream.rebalance()
        .assignTimestampsAndWatermarks(new MessageTimestampExtractor(maxTimeLag));
// Counts messages
timestampStream.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Message, String, TimeWindow>() {
    @Override
    public void apply(TimeWindow timeWindow, Iterable<Message> values, Collector<String> collector) throws Exception {
        Integer count = 0;
        if (values.iterator().hasNext()) {
            for (Message value : values) {
                count++;
            }
            collector.collect("Arrived last minute: " + count);
        }
    }
}).print();
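
To see why the discarded return value matters, here is a minimal plain-Java sketch of the same pattern. It does not use Flink at all: SketchStream, assignTimestamps and timestampOf are hypothetical names invented for this illustration, assuming only that a stream object is immutable and that assigning timestamps yields a new object.

```java
import java.util.function.ToLongFunction;

public class ReturnedStreamSketch {

    // Hypothetical stand-in for a DataStream: immutable, optionally carrying a timestamp assigner.
    static final class SketchStream<T> {
        private final ToLongFunction<T> assigner; // null means "no timestamps assigned"

        SketchStream(ToLongFunction<T> assigner) {
            this.assigner = assigner;
        }

        // Mirrors the behaviour described above: this object is left untouched
        // and a new stream carrying the assigner is returned.
        SketchStream<T> assignTimestamps(ToLongFunction<T> newAssigner) {
            return new SketchStream<>(newAssigner);
        }

        // The timestamp a downstream window operator would see for an element.
        long timestampOf(T element) {
            return assigner == null ? Long.MIN_VALUE : assigner.applyAsLong(element);
        }
    }

    public static void main(String[] args) {
        SketchStream<Long> stream = new SketchStream<>(null);

        // Result discarded, as in the question: 'stream' still has no timestamps.
        stream.assignTimestamps(Long::longValue);
        System.out.println(stream.timestampOf(42L) == Long.MIN_VALUE); // prints "true"

        // Result kept, as in the fix: the returned stream carries timestamps.
        SketchStream<Long> timestamped = stream.assignTimestamps(Long::longValue);
        System.out.println(timestamped.timestampOf(42L)); // prints "42"
    }
}
```

The same reasoning applies to rebalance(), filter() and the other DataStream transformations: each returns a new stream, so keep chaining on the returned value.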
Till Rohrmann answered Jan 04 '23 07:01
