
Apache Beam KinesisIO Java - Consume the data in a Kinesis stream from where it left off

First, I want to say that I am totally new to the Beam world. I'm working on an Apache Beam task whose main data source is a Kinesis stream. While consuming the streaming data, I noticed that the same set of data arrives again whenever I restart the program (my consumer application). This is my code:

    String awsStreamName = KinesisStream.getProperty("stream.name");
    String awsAccessKey = KinesisStream.getProperty("access.key");
    String awsSecretKey = KinesisStream.getProperty("secret.key");
    String awsRegion = KinesisStream.getProperty("aws.region");
    Regions region = Regions.fromName(awsRegion);

    return KinesisIO.read()
            .withStreamName(awsStreamName)
            .withInitialPositionInStream(InitialPositionInStream.LATEST)
            .withAWSClientsProvider(awsAccessKey, awsSecretKey, region);

Simply put, I need to resume reading the data from where I left off. I would really appreciate it if someone could point me to some resources as well.

I also found a similar question, but it did not help me: Apache Beam KinesisIO Java processing pipeline - application state, error handling & fault-tolerance?

Prasad asked Oct 18 '25 06:10


1 Answer

UnboundedSources in Beam, such as KinesisIO.read(), support checkpointing via CheckpointMarks, so the pipeline can resume from the latest checkpoint when your application restarts.

These checkpoints have to be persisted to durable storage. However, the specifics of how this is done depend on the Beam runner you are using, e.g. Dataflow, Apache Flink or Apache Spark.
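To make the idea concrete, here is a toy model of checkpoint-and-resume in plain Java. It is not Beam or Kinesis code: the class `CheckpointResumeSketch`, the `consume` method and the temp file standing in for "durable storage" are all hypothetical names made up for illustration. A real runner persists the source's CheckpointMark for you; this sketch only shows why a persisted position lets a restarted consumer continue instead of re-reading.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Toy model of checkpoint-and-resume; hypothetical names, not Beam code. */
public class CheckpointResumeSketch {

    /** Reads up to maxRecords records after the last persisted position. */
    public static List<String> consume(List<String> stream, Path checkpointFile, int maxRecords)
            throws IOException {
        // Resume from the stored position, or from the start if no checkpoint exists yet.
        int position = Files.exists(checkpointFile)
                ? Integer.parseInt(Files.readString(checkpointFile, StandardCharsets.UTF_8).trim())
                : 0;
        List<String> consumed = new ArrayList<>();
        while (position < stream.size() && consumed.size() < maxRecords) {
            consumed.add(stream.get(position));
            position++;
            // Persist progress so that a restart continues after this record.
            Files.writeString(checkpointFile, Integer.toString(position), StandardCharsets.UTF_8);
        }
        return consumed;
    }

    public static void main(String[] args) throws IOException {
        List<String> stream = List.of("r1", "r2", "r3", "r4", "r5");
        Path checkpointFile = Files.createTempFile("checkpoint", ".txt");
        Files.delete(checkpointFile); // begin with no checkpoint at all

        System.out.println(consume(stream, checkpointFile, 2));  // first run: [r1, r2]
        System.out.println(consume(stream, checkpointFile, 10)); // after "restart": [r3, r4, r5]
        Files.deleteIfExists(checkpointFile);
    }
}
```

Without the checkpoint file, every call would start again at position 0, which is the behaviour described in the question.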

I recommend reading your runner's documentation on checkpointing and checking the pipeline options of the corresponding Beam runner.

For example, with Apache Flink you have to enable checkpointing via checkpointingInterval (in FlinkPipelineOptions) and additionally configure checkpointing in Flink itself.
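A minimal sketch of the Flink case, assuming the Beam Flink runner artifact for your Flink version is on the classpath. The class name `FlinkCheckpointingSketch` and the 60-second interval are illustrative choices, not recommendations. Where the checkpoints are stored (state backend, checkpoint directory) still has to be configured on the Flink side.

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

/** Hypothetical entry point showing only the checkpointing-related options. */
public class FlinkCheckpointingSketch {
    public static void main(String[] args) {
        FlinkPipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);

        // Interval in milliseconds; 60 seconds is an arbitrary example value.
        options.setCheckpointingInterval(60_000L);

        Pipeline pipeline = Pipeline.create(options);
        // Apply KinesisIO.read() and the rest of the pipeline here,
        // then call pipeline.run().
    }
}
```

The same option can also be passed on the command line as `--checkpointingInterval=60000`, since `PipelineOptionsFactory.fromArgs` maps arguments onto the option getters and setters.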

Moritz answered Oct 19 '25 20:10