Issues with Dynamic Destinations in Dataflow

I have a Dataflow job that reads data from Pub/Sub and, based on the timestamp and filename, writes the contents to GCS with a folder path derived from the date (YYYY/MM/DD). This allows files to be generated in date-based folders, using Apache Beam's FileIO with dynamic destinations.

About two weeks ago, I noticed an unusual buildup of unacknowledged messages. Upon restarting the Dataflow job, the issue disappeared and new files were being written to GCS again.

After a couple of days, writing stopped again, except this time there were errors claiming that processing was stuck. After some trusty SO research, I found out that this was likely caused by a deadlock issue in pre-2.9.0 Beam, which used the Conscrypt library as the default security provider. So, I upgraded from Beam 2.8 to Beam 2.11.

Once again, it worked, until it didn't. I looked more closely at the error and noticed that it involved a SimpleDateFormat object, which isn't thread-safe. So, I switched to java.time and DateTimeFormatter, which is thread-safe. It worked until it didn't. However, this time the error was slightly different and didn't point to anything in my code; it is provided below.

Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
  at org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:202)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:409)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:311)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$BagPagingIterable$1.computeNext(WindmillStateReader.java:700)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.MultitransformedIterator.hasNext(MultitransformedIterator.java:47)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:701)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)

This error started occurring approximately 5 hours after the job was deployed, and at an increasing rate over time. Writing slowed significantly within 24 hours. I have 60 workers, and I suspect that one worker fails every time this error occurs, which eventually kills the job.
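
For context, the thread-safety change mentioned above boiled down to replacing a shared SimpleDateFormat with a DateTimeFormatter, roughly like the sketch below (the pattern string here is illustrative, not my exact one):

import java.time.format.DateTimeFormatter;

// Before: SimpleDateFormat (java.text) is mutable and not thread-safe, so sharing
// one instance across worker threads can silently produce corrupted dates.
// private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy/MM/dd/HH");

// After: DateTimeFormatter is immutable and thread-safe, so a single shared
// instance can safely be used by all threads.
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH");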

In my writer, I parse each line for certain keywords (which may not be the best way) to determine which folder it belongs in, and then write the file to GCS under the resulting path. The partition function is the following:

@SuppressWarnings("serial")
public static class datePartition implements SerializableFunction<String, String> {     
    private String filename;

    public datePartition(String filename) {
        this.filename = filename;
    }

    @Override
    public String apply(String input) {

        String folder_name = "NaN";             
        String date_dtf    = "NaN";     
        String date_literal = "NaN";
        try {
            Matcher foldernames = Pattern.compile("\"foldername\":\"(.*?)\"").matcher(input);
            if(foldernames.find()) {
                folder_name = foldernames.group(1);
            }
            else {
                Matcher folderid = Pattern.compile("\"folderid\":\"(.*?)\"").matcher(input);
                if(folderid.find()) {
                    folder_name = folderid.group(1);
                }   
            }

            Matcher date_long = Pattern.compile("\"timestamp\":\"(.*?)\"").matcher(input);
            if(date_long.find()) {
                date_literal = date_long.group(1);
                if(Utilities.isNumeric(date_literal)) {
                    LocalDateTime date = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.valueOf(date_literal)), ZoneId.systemDefault());
                    date_dtf = date.format(dtf);                        
                }
                else {
                    date_dtf = date_literal.split(":")[0].replace("-", "/").replace("T", "/");
                }
            }
            return folder_name + "/" + date_dtf + "h/" + filename;
        }

        catch(Exception e) {
            LOG.error("ERROR with either foldername or date");
            LOG.error("Line : " + input);
            LOG.error("folder : " + folder_name);
            LOG.error("Date : " + date_dtf);

            return folder_name + "/" + date_dtf + "h/" + filename;
        }           
    }
}
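
To illustrate, for an input line containing "foldername":"logs" and "timestamp":"2019-04-18T10:30:00" (made-up values), the non-numeric branch produces the destination key logs/2019/04/18/10h/<output filename prefix>, which FileIO then turns into the corresponding folder path in GCS.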

And here is where the pipeline is built, deployed, and run:

public void streamData() {

    Pipeline pipeline = Pipeline.create(options);

    pipeline.apply("Read PubSub Events",
                PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
            // Fixed windows of the configured duration, firing at the watermark.
            .apply(options.getWindowDuration() + " Window",
                Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
                      .triggering(AfterWatermark.pastEndOfWindow())
                      .discardingFiredPanes()
                      .withAllowedLateness(parseDuration("24h")))
            // Convert each PubsubMessage to a String (helper defined elsewhere).
            .apply(new GenericFunctions.extractMsg())
            // Dynamic destinations: datePartition maps each line to its folder path.
            .apply(FileIO.<String, String>writeDynamic()
                .by(new datePartition(options.getOutputFilenamePrefix()))
                .via(TextIO.sink())
                .withNumShards(options.getNumShards())
                .to(options.getOutputDirectory())
                .withNaming(type -> FileIO.Write.defaultNaming(type, ".txt"))
                .withDestinationCoder(StringUtf8Coder.of()));

    pipeline.run();
}
asked Apr 18 '19 by Scicrazed

1 Answer

The error 'Processing stuck ...' indicates that a particular operation took longer than 5 minutes, not that the job is permanently stuck. However, since the stuck step is FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles and the job ends up cancelled/killed, I would suspect an issue while the job is writing temporary files.

I found the BEAM-7689 issue, which is related to the second-granularity timestamp (yyyy-MM-dd_HH-mm-ss) used to name temporary file directories. Because of this, several concurrent jobs can end up sharing the same temporary directory, and one job can delete it before the other(s) finish.
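
Since the root cause is concurrent jobs colliding on the same temporary directory, a possible workaround (my suggestion, not something stated in BEAM-7689) is to give each job its own temp directory; if I remember correctly, FileIO.Write exposes withTempDirectory for this. A rough sketch, reusing the names from your pipeline:

.apply(FileIO.<String, String>writeDynamic()
        .by(new datePartition(options.getOutputFilenamePrefix()))
        .via(TextIO.sink())
        .withNumShards(options.getNumShards())
        .to(options.getOutputDirectory())
        // Assumption: a per-job temp directory, e.g. keyed by a job name option,
        // so concurrent jobs cannot delete each other's temporary files.
        .withTempDirectory(options.getOutputDirectory() + "/.temp/" + options.getJobName())
        .withNaming(type -> FileIO.Write.defaultNaming(type, ".txt"))
        .withDestinationCoder(StringUtf8Coder.of()));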

According to the linked issue, upgrading to SDK 2.14 mitigates the problem. Please let us know if the error is gone after upgrading.

answered Nov 12 '22 by rsantiago