Watching for new files matching a filepattern in Apache Beam

I have a directory on GCS or another supported filesystem to which new files are being written by an external process.

I would like to write an Apache Beam streaming pipeline that continuously watches this directory for new files and reads and processes each new file as it arrives. Is this possible?

asked Dec 19 '17 by jkff

2 Answers

This is possible starting with Apache Beam 2.2.0. Several APIs support this use case:

If you're using TextIO or AvroIO, they support this explicitly via TextIO.read().watchForNewFiles(), and likewise via readAll(); for example:

PCollection<String> lines = p.apply(TextIO.read()
    .from("gs://path/to/files/*")
    .watchForNewFiles(
        // Check for new files every 30 seconds
        Duration.standardSeconds(30),
        // Never stop checking for new files
        Watch.Growth.<String>never()));

If you're using a different file format, you may use FileIO.match().continuously() and FileIO.matchAll().continuously() which support the same API, in combination with FileIO.readMatches().
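For example, a minimal sketch (the path and poll interval are illustrative) that continuously matches a filepattern and yields readable files for a format-specific parser:

PCollection<FileIO.ReadableFile> files = p
    .apply(FileIO.match()
        .filepattern("gs://path/to/files/*")
        // Check for new files every 30 seconds, and never stop watching
        .continuously(Duration.standardSeconds(30), Watch.Growth.never()))
    .apply(FileIO.readMatches());

The resulting ReadableFile elements can then be parsed in a DoFn appropriate to the file format.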

The APIs support specifying how often to check for new files, and when to stop checking (supported conditions are e.g. "if no new output appears within a given time", "after observing N outputs", "after a given time since starting to check" and their combinations).
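For instance, a sketch (durations are illustrative) combining two conditions via the Watch.Growth API, stopping at whichever comes first: no new files for an hour, or a day since watching began:

PCollection<String> lines = p.apply(TextIO.read()
    .from("gs://path/to/files/*")
    .watchForNewFiles(
        Duration.standardSeconds(30),
        // Stop on whichever condition is satisfied first
        Watch.Growth.eitherOf(
            Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1)),
            Watch.Growth.afterTotalOf(Duration.standardHours(24)))));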

Note that this feature currently works only in the Direct runner and the Dataflow runner, and only in the Java SDK. In general, it will work in any runner that supports Splittable DoFn (see the capability matrix).

answered by jkff

To add to Eugene's excellent answer: beyond the watchForNewFiles options there are a couple of other choices, depending on your latency requirements. As of SDK 2.9.0:

Option 1: Continuous read mode

Java: FileIO, TextIO, and several other IO sources support continuous reading of the source for new files.

The FileIO class supports watching a single filepattern continuously. This example matches a filepattern every 30 seconds, continuously returns new matched files as an unbounded PCollection<Metadata>, and stops if no new files appear for one hour.

PCollection<Metadata> matches = p.apply(FileIO.match()
    .filepattern("...")
    .continuously(
        // Check for new files every 30 seconds
        Duration.standardSeconds(30),
        // Stop if no new files appear within 1 hour
        Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));

The TextIO class supports streaming new-file matching via watchForNewFiles.

PCollection<String> lines = p.apply(TextIO.read()
    .from("/local/path/to/files/*")
    .watchForNewFiles(
        // Check for new files every minute
        Duration.standardMinutes(1),
        // Stop watching the filepattern if no new files appear within an hour
        Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));

It is important to note that the matched-file list is not retained across restarts of the pipeline. To deal with that scenario, you can move processed files out of the watched directory, either through a process downstream of the pipeline or as part of the pipeline itself. Another option is to store processed file names externally and de-duplicate against that list at the next transform.
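As an illustration of the de-duplication idea, here is a hedged sketch (class and state names are hypothetical) using a stateful DoFn keyed by file name; note that Beam state is scoped per key and window:

class DedupFileNames extends DoFn<KV<String, Metadata>, Metadata> {
  @StateId("seen")
  private final StateSpec<ValueState<Boolean>> seenSpec = StateSpecs.value(BooleanCoder.of());

  @ProcessElement
  public void process(
      @Element KV<String, Metadata> e,
      @StateId("seen") ValueState<Boolean> seen,
      OutputReceiver<Metadata> out) {
    // Emit each file name only the first time it is observed
    if (seen.read() == null) {
      seen.write(true);
      out.output(e.getValue());
    }
  }
}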

Python: the continuously option is not available as of SDK 2.9.0.

Option 2: Stream processing triggered from an external source

You can run a Beam pipeline in streaming mode with an unbounded source, for example Pub/Sub. When new files arrive, a process external to Beam detects the arrival and publishes a Pub/Sub message whose payload is the file's URI. In a DoFn downstream of the Pub/Sub source you can then use that URI to read and process the file (see the sketch after the notes below).

Java: use an unbounded source IO (PubsubIO, KafkaIO, etc.)

Python: use an unbounded source IO (PubSub, etc.)
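A minimal Java sketch of this pattern (the topic name is hypothetical): each Pub/Sub message carries a file URI or filepattern, which is expanded and read downstream:

PCollection<String> lines = p
    // Each message's payload is a file URI or filepattern
    .apply(PubsubIO.readStrings().fromTopic("projects/my-project/topics/new-files"))
    // Expand each pattern into concrete files and read them as text
    .apply(FileIO.matchAll())
    .apply(FileIO.readMatches())
    .apply(TextIO.readFiles());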

Option 3: Batch mode processing triggered from an external source

This approach introduces latency over Options 1 and 2, because the pipeline needs to start up before processing can begin. Here you can have a triggering event from your source file system that schedules or immediately starts a Dataflow job (a sketch follows). This option is best suited for low-frequency, large-file-size updates.
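One common way to wire this up is a templated batch pipeline whose input path is supplied at launch time, for example by a Cloud Function reacting to a file-arrival notification. A hedged sketch (the option name is hypothetical):

public interface MyOptions extends PipelineOptions {
  // Supplied at launch time, e.g. --inputFile=gs://bucket/path/new-file.csv
  ValueProvider<String> getInputFile();
  void setInputFile(ValueProvider<String> value);
}

MyOptions options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<String> lines = p.apply(TextIO.read().from(options.getInputFile()));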

answered by Reza Rokni