Acknowledge Google Pub/Sub message on Apache Beam

I'm trying to read from Pub/Sub with the following code:

Read<String> pubsub = PubsubIO.<String>read()
    .topic("projects/<projectId>/topics/<topic>")
    .subscription("projects/<projectId>/subscriptions/<subscription>")
    .withCoder(StringUtf8Coder.of())
    .withAttributes(new SimpleFunction<PubsubMessage, String>() {
        @Override
        public String apply(PubsubMessage input) {
            LOG.info("hola " + input.getAttributeMap());
            return new String(input.getMessage());
        }
    });
PCollection<String> pps = p.apply(pubsub)
        .apply(
                Window.<String>into(
                    FixedWindows.of(Duration.standardSeconds(15))));
pps.apply("printdata",ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        LOG.info("hola amigo "+c.element());
        c.output(c.element());
    }
  }));

Compared to what I receive in Node.js, here I only get the message payload, i.e. what would be contained in the data field. How can I get the ackId field (which I could later use to acknowledge the message)? The attribute map that I'm printing is null. Is there some other way to acknowledge all messages without having to figure out the ackId?

njLT asked May 16 '17 13:05

1 Answer

The PubsubIO reader is responsible for acknowledging messages. It is tied to the checkpointing behavior of the runner. Specifically, the source will only acknowledge messages when the resulting elements have been checkpointed.
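To make the "acknowledge only after checkpointing" idea concrete, here is a minimal, simplified model in plain Java. It is not Beam's actual implementation: the class `CheckpointAckSketch` and its methods are hypothetical names invented for illustration. In Beam itself the corresponding hook is, as far as I know, `UnboundedSource.CheckpointMark.finalizeCheckpoint()`, which the runner invokes once a checkpoint is durable.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of a source that defers acks until a checkpoint.
// It only illustrates the ordering; it does not talk to Pub/Sub.
class CheckpointAckSketch {
    // ackIds of messages handed to the pipeline but not yet covered by a checkpoint
    private final List<String> pending = new ArrayList<>();
    // ackIds that have been acknowledged back to the service
    private final List<String> acked = new ArrayList<>();

    // The source pulls a message and emits only its payload downstream;
    // the ackId stays inside the source, which is why user code never sees it.
    String read(String ackId, String payload) {
        pending.add(ackId);
        return payload;
    }

    // Called by the runner once the checkpoint is durable: only now are messages acked.
    void finalizeCheckpoint() {
        acked.addAll(pending);
        pending.clear();
    }

    List<String> pending() { return new ArrayList<>(pending); }

    List<String> acked() { return new ArrayList<>(acked); }

    public static void main(String[] args) {
        CheckpointAckSketch source = new CheckpointAckSketch();
        source.read("ack-1", "hello");
        source.read("ack-2", "world");
        System.out.println("acked before checkpoint: " + source.acked()); // []
        source.finalizeCheckpoint();
        System.out.println("acked after checkpoint: " + source.acked());  // [ack-1, ack-2]
    }
}
```

The point of the sketch is that the ackId never leaves the source, so there is nothing for a downstream DoFn to acknowledge; if the runner never checkpoints, the messages stay outstanding and Pub/Sub will redeliver them.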

In this case you should look at when the Flink runner checkpoints the state of that source. I believe this is related to the Flink configuration for checkpoint frequency.
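As a rough sketch of where that setting lives, assuming the pipeline is launched with Beam's Flink runner and its `FlinkPipelineOptions`: the class name `FlinkCheckpointConfig` and the 10-second interval below are illustrative assumptions, not values from the question. It needs the Beam SDK and Flink runner artifacts on the classpath, and the exact defaults may differ between Beam versions.

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FlinkCheckpointConfig {
    public static void main(String[] args) {
        // Parse command-line flags into the Flink-specific options view.
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);

        // Checkpoint interval in milliseconds (assumed value). Without periodic
        // checkpoints the source has no point at which it can acknowledge messages.
        options.setCheckpointingInterval(10_000L);

        Pipeline p = Pipeline.create(options);
        // Apply the PubsubIO read and the rest of the transforms to p here,
        // then call p.run().
    }
}
```

The same option can, I believe, also be passed on the command line as `--checkpointingInterval=10000` rather than being set in code.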

Ben Chambers answered Oct 20 '22 00:10
