
In Flink, how to write a DataStream to a single file?

The writeAsText and writeAsCsv methods of a DataStream write as many files as there are worker threads. As far as I can see, the methods only let you specify the path to these files and some formatting.

For debugging and testing purposes, it would be really useful to be able to print everything to a single file, without having to change the setup to use a single worker thread.

Is there a reasonably simple way to achieve this? I suspect it should be possible by implementing a custom SinkFunction, but I'm not sure about that (besides, it feels like a hassle for something that seems relatively simple).

houcros asked Aug 16 '16 14:08




2 Answers

You can achieve this by setting the parallelism of the sink to 1. This way, a single sink task does all the writing, so only one file is produced.

dataStream.writeAsText(path).setParallelism(1);
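
If lowering the sink parallelism is not an option, another possibility is to leave the job as it is and merge the part files once it has finished. The following is only a minimal plain-Java sketch of that idea, not part of the Flink API: the class name PartFileMerger and the method mergeParts are hypothetical, and it assumes the output path is a directory containing one part file per sink subtask, with the merged file written somewhere outside that directory.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not a Flink class: merges the part files of a finished job.
public class PartFileMerger {

    // Concatenates every visible regular file in partsDir into target.
    // Files are appended in file-name order only to make the result deterministic;
    // the order of records across part files carries no meaning.
    public static void mergeParts(Path partsDir, Path target) throws IOException {
        List<Path> parts;
        try (Stream<Path> listing = Files.list(partsDir)) {
            parts = listing
                    .filter(Files::isRegularFile)
                    // Skip hidden files (assumed to be temporary or in-progress output).
                    .filter(p -> !p.getFileName().toString().startsWith("."))
                    .sorted()
                    .collect(Collectors.toList());
        }
        try (OutputStream out = Files.newOutputStream(target,
                StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING,
                StandardOpenOption.WRITE)) {
            for (Path part : parts) {
                Files.copy(part, out); // append the raw bytes of this part file
            }
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: PartFileMerger <partsDir> <targetFile>");
            return;
        }
        mergeParts(Paths.get(args[0]), Paths.get(args[1]));
    }
}
```

This only makes sense for bounded test runs, since the part files must be complete before they are merged. For a quick debugging session, setParallelism(1) on the sink remains the simpler choice.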
Robert Metzger answered Sep 27 '22 18:09


In Flink 1.13 this is no longer done with the writeAsText function, as it is deprecated.

Instead, the StreamingFileSink class and the addSink operation should be used. Setting the parallelism to 1 is also done differently here: it is set on the StreamExecutionEnvironment itself, with its setParallelism method.

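import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

// outPath and dataStream are assumed to be defined elsewhere in the job
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) // applies to every operator in the job, including the sink

val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8"))
  .build()

dataStream.map(_.toString).addSink(sink)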
eseuteo answered Sep 27 '22 18:09