
How to extract Google PubSub publish time in Apache Beam

I want to access the publish time of a Pub/Sub message, as recorded and set by Google Pub/Sub, in Apache Beam (Dataflow).

    PCollection<PubsubMessage> pubsubMsg
            = pipeline.apply("Read Messages From PubSub",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription(pocOptions.getInputSubscription()));

The message does not seem to contain the publish time as an attribute. I have also tried

 .withTimestampAttribute("publish_time")

No luck either. What am I missing? Is it possible to extract the Google Pub/Sub publish time in Dataflow?

Evgeny Minkevich asked Mar 27 '19 04:03



1 Answer

Java version:

PubsubIO will read the message from Pub/Sub and assign the message publish time to the element as the record timestamp. Therefore, you can access it using ProcessContext.timestamp(). As an example:

p
    .apply("Read Messages", PubsubIO.readStrings().fromSubscription(subscription))
    .apply("Log Publish Time", ParDo.of(new DoFn<String, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            LOG.info("Message: " + c.element());
            // Element timestamp: set by PubsubIO to the Pub/Sub publish time.
            LOG.info("Publish time: " + c.timestamp().toString());
            // Current wall-clock time, for comparison (Instant is org.joda.time.Instant).
            LOG.info("Processing time: " + Instant.now().toString());
        }
    }));

I published a message a few minutes beforehand (so that event time and processing time differ noticeably), and the output with DirectRunner was:

Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Message: I published this message a little bit before
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Publish time: 2019-03-27T09:57:07.005Z
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Processing time: 2019-03-27T10:03:08.229Z

Minimal code here
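As a quick check of the gap between the two timestamps in the log output above, the difference can be worked out with the standard library alone. This is a minimal standalone sketch, not part of the pipeline; the helper names `parse_utc` and `lag_seconds` are made up for illustration, and the two timestamp strings are copied from the output shown.

```python
from datetime import datetime, timezone

# Timestamps copied from the DirectRunner log output above.
PUBLISH_TIME = "2019-03-27T09:57:07.005Z"
PROCESSING_TIME = "2019-03-27T10:03:08.229Z"


def parse_utc(text):
    """Parse an ISO-8601 UTC timestamp with fractional seconds and a trailing 'Z'."""
    parsed = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S.%fZ")
    return parsed.replace(tzinfo=timezone.utc)


def lag_seconds(publish_time, processing_time):
    """Return processing time minus publish time, in seconds."""
    delta = parse_utc(processing_time) - parse_utc(publish_time)
    return delta.total_seconds()


if __name__ == "__main__":
    # Roughly six minutes between publishing and processing.
    print(f"Lag: {lag_seconds(PUBLISH_TIME, PROCESSING_TIME):.3f} s")
```

For the values above this comes to about 361.224 seconds, which matches the message having been published roughly six minutes before the pipeline processed it.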


Python version:

In Python, the element timestamp can be accessed by declaring a parameter with the default value beam.DoFn.TimestampParam in the process method (docs):

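import datetime
import logging

import apache_beam as beam


class GetTimestampFn(beam.DoFn):
  """Logs the element timestamp (the Pub/Sub publish time) and passes the element on."""
  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    # float(timestamp) is seconds since the Unix epoch; convert it to a UTC datetime.
    timestamp_utc = datetime.datetime.fromtimestamp(float(timestamp), tz=datetime.timezone.utc)
    logging.info(">>> Element timestamp: %s", timestamp_utc.strftime("%Y-%m-%d %H:%M:%S"))
    yield element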

Note: date parsing thanks to this answer.

Output:

INFO:root:>>> Element timestamp: 2019-08-12 20:16:53

Full code
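If the Python log line should use the same ISO-8601 form as the Java output (for example `2019-03-27T09:57:07.005Z`), one option is a small formatting helper. The sketch below is stdlib-only and hypothetical: `to_iso_utc` is an illustrative name, and the epoch value in the usage line was chosen to correspond to the timestamp in the output above. It assumes the value passed in can be converted with `float()`, as the DoFn above already does with the Beam timestamp.

```python
from datetime import datetime, timezone


def to_iso_utc(timestamp):
    """Format seconds since the Unix epoch as an ISO-8601 UTC string.

    `timestamp` may be any value that float() accepts.
    """
    moment = datetime.fromtimestamp(float(timestamp), tz=timezone.utc)
    # isoformat() ends with "+00:00" for UTC; swap it for the shorter "Z" suffix.
    return moment.isoformat(timespec="milliseconds").replace("+00:00", "Z")


if __name__ == "__main__":
    print(to_iso_utc(1565641013))  # prints 2019-08-12T20:16:53.000Z
```

Inside the DoFn, the `strftime` call could then be replaced by `to_iso_utc(timestamp)`.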

Guillem Xercavins answered Sep 27 '22 03:09