First, I want to say that I am totally new to the Beam world. I'm working on an Apache Beam focused task, and my main data source is a Kinesis stream. While consuming the streaming data, I noticed that the same set of data comes through again whenever I restart the program (my consumer application). This is my code:
String awsStreamName = KinesisStream.getProperty("stream.name");
String awsAccessKey = KinesisStream.getProperty("access.key");
String awsSecretKey = KinesisStream.getProperty("secret.key");
String awsRegion = KinesisStream.getProperty("aws.region");
Regions region = Regions.fromName(awsRegion);

return KinesisIO.read()
        .withStreamName(awsStreamName)
        .withInitialPositionInStream(InitialPositionInStream.LATEST)
        .withAWSClientsProvider(awsAccessKey, awsSecretKey, region);
Simply put, I want to resume reading from where I left off. I would really appreciate it if someone could provide some resources as well.
Also, I found a similar question, but it did not help me: Apache Beam KinesisIO Java processing pipeline - application state, error handling & fault-tolerance?
UnboundedSources in Beam, such as KinesisIO.read(), support checkpointing using CheckpointMarks, which allow the source to resume from the latest checkpoint when your application restarts.
These checkpoints have to be persisted to durable storage. However, the specifics of how this is done depend on the Beam runner you are using, e.g. Dataflow, Apache Flink, or Apache Spark.
I recommend reading your runtime's documentation on checkpointing and checking the pipeline options of the corresponding Beam runner.
For example, in the case of Apache Flink you will have to enable checkpointing via the checkpointingInterval pipeline option (FlinkPipelineOptions) and additionally configure checkpointing in Flink itself.
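A minimal sketch of what this looks like on the Flink runner, assuming your pipeline is launched from a main method; the option names come from Beam's FlinkPipelineOptions, but verify them against your Beam version:

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CheckpointedPipeline {
    public static void main(String[] args) {
        // Enable Flink checkpointing so that the KinesisIO source's
        // CheckpointMarks are persisted and can be restored on restart.
        FlinkPipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        // Checkpoint interval in milliseconds (here: every 60 seconds).
        options.setCheckpointingInterval(60_000L);

        Pipeline pipeline = Pipeline.create(options);
        // ... attach the KinesisIO.read() source and the rest of the pipeline here ...
        pipeline.run();
    }
}
```

Note that enabling the interval alone is not enough: you also need a durable state backend / checkpoint directory configured on the Flink cluster, and you must restart the job from a savepoint or retained checkpoint; a fresh submission starts with empty state and will read from the configured initial position again.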