The writeAsText or writeAsCsv methods of a DataStream write as many files as there are worker threads. As far as I can tell, these methods only let you specify the path to those files and some formatting options.
For debugging and testing purposes, it would be really useful to be able to print everything to a single file, without having to change the setup to a single worker thread.
Is there a non-overly-complicated way to achieve this? I suspect it could be done by implementing a custom SinkFunction, but I'm not sure about that (besides, it feels like a hassle for something that seems relatively simple).
You can achieve this by setting the parallelism of the sink to 1. That way, the writing happens on only one machine:
writeAsText(path).setParallelism(1);
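A minimal end-to-end sketch of this approach, assuming Flink's Scala DataStream API; the sample elements and the output path "/tmp/out.txt" are placeholders for your real source and destination:

import org.apache.flink.streaming.api.scala._

object SingleFileWriteJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Hypothetical sample data; replace with your actual source.
    val data: DataStream[String] = env.fromElements("a", "b", "c")
    // Setting parallelism to 1 on the sink alone is enough to get a
    // single output file, while the rest of the job stays parallel.
    data.writeAsText("/tmp/out.txt").setParallelism(1)
    env.execute("single-file-write")
  }
}

Note that only the sink runs with parallelism 1 here; upstream operators keep the default parallelism, so you don't lose parallel processing just to get a single file.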
In Flink 1.13 this is no longer done with the writeAsText function, as it has been deprecated. As can be seen here, the StreamingFileSink class together with the addSink operation should be used instead. Setting the parallelism to 1 is also done differently: it is set on the StreamExecutionEnvironment with the setParallelism method.
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8"))
  .build()

dataStream.map(_.toString).addSink(sink)