I'm trying to read from Pub/Sub with the following code:
Read<String> pubsub = PubsubIO.<String>read()
    .topic("projects/<projectId>/topics/<topic>")
    .subscription("projects/<projectId>/subscriptions/<subscription>")
    .withCoder(StringUtf8Coder.of())
    .withAttributes(new SimpleFunction<PubsubMessage, String>() {
        @Override
        public String apply(PubsubMessage input) {
            // Log the attribute map, then return the payload as a String.
            LOG.info("hola " + input.getAttributeMap());
            return new String(input.getMessage());
        }
    });

// Group the incoming elements into 15-second fixed windows.
PCollection<String> pps = p.apply(pubsub)
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(15))));

// Log each element and pass it through unchanged.
pps.apply("printdata", ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        LOG.info("hola amigo " + c.element());
        c.output(c.element());
    }
}));
Compared to what I receive on Node.js, I get the message that would be contained in the data field. How can I get the ackId field (which I can later use to acknowledge the message)? The attribute map that I'm printing is null.
Is there some other way to acknowledge all messages without having to figure out the ackId?
If a message is sent out for delivery and a subscriber is yet to acknowledge it, the message is called outstanding. Pub/Sub repeatedly attempts to deliver any message that is not yet acknowledged. However, Pub/Sub tries not to deliver an outstanding message to any other subscriber on the same subscription.
In the console, go to the Pub/Sub subscriptions page. Click the subscription ID. Click the Messages tab. Click Pull.
The PubsubIO reader is responsible for acknowledging messages, so you do not need the ackId yourself. Acknowledgement is tied to the checkpointing behavior of the runner. Specifically, the source will only acknowledge messages when the resulting elements have been checkpointed.
In this case you should look at when the Flink runner checkpoints the state of that source. I believe this is related to the Flink configuration for checkpoint frequency.
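To make the ack-on-checkpoint idea concrete, here is a minimal toy model in plain Java. It is not Beam or Pub/Sub code: the class `CheckpointAckSketch` and its methods are hypothetical names invented for illustration, and the real PubsubIO source is more involved. It only shows the principle described above, namely that messages stay outstanding after being read and are acknowledged only once a checkpoint has been finalized.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical, not Beam code): acks are deferred until a checkpoint completes. */
public class CheckpointAckSketch {
    // Messages handed to the pipeline but not yet acknowledged ("outstanding"), keyed by ackId.
    private final Map<String, String> outstanding = new LinkedHashMap<>();
    // ackIds that have been acknowledged so far, in acknowledgement order.
    private final List<String> acked = new ArrayList<>();

    /** The reader receives a message and emits its data downstream, without acknowledging it. */
    public String read(String ackId, String data) {
        outstanding.put(ackId, data);
        return data;
    }

    /** The runner finished a checkpoint: everything read so far is now acknowledged. */
    public void finalizeCheckpoint() {
        acked.addAll(outstanding.keySet());
        outstanding.clear();
    }

    public int outstandingCount() {
        return outstanding.size();
    }

    public List<String> ackedIds() {
        return acked;
    }

    public static void main(String[] args) {
        CheckpointAckSketch source = new CheckpointAckSketch();
        source.read("ack-1", "first");
        source.read("ack-2", "second");
        // Both messages are still outstanding because no checkpoint has completed.
        System.out.println("outstanding before checkpoint: " + source.outstandingCount());
        source.finalizeCheckpoint();
        System.out.println("outstanding after checkpoint: " + source.outstandingCount());
        System.out.println("acked: " + source.ackedIds());
    }
}
```

In this model, the longer the interval between checkpoints, the longer messages remain unacknowledged, which is why the runner's checkpoint frequency is the setting to look at.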