JUnit cannot delete @TempDir with file created by Spark Structured Streaming

I created an integration test for my pipeline to check if the right CSV file is generated:

class CsvBatchSinkTest {

    @RegisterExtension
    static SparkExtension spark = new SparkExtension();

    @TempDir
    static Path directory;

    //this checks if the file is already available
    static boolean isFileWithSuffixAvailable(File directory, String suffix) throws IOException {
        return Files.walk(directory.toPath()).anyMatch(f -> f.toString().endsWith(suffix));
    }

    //this gets content of file
    static List<String> extractFileWithSuffixContent(File file, String suffix) throws IOException {
        return Files.readAllLines(
                Files.walk(file.toPath())
                        .filter(f -> f.toString().endsWith(suffix))
                        .findFirst()
                        .orElseThrow(AssertionException::new));
    }

    @Test
    @DisplayName("When correct dataset is sent to sink, then correct csv file should be generated.")
    void testWrite() throws IOException, InterruptedException {

        File file = new File(directory.toFile(), "output");


        List<Row> data =
                asList(RowFactory.create("value1", "value2"), RowFactory.create("value3", "value4"));

        Dataset<Row> dataset =
                spark.session().createDataFrame(data, CommonTestSchemas.SCHEMA_2_STRING_FIELDS);

        dataset.coalesce(1)
                .write()
                .option("header", "true")
                .option("delimiter", ";")
                .csv(file.getAbsolutePath());

        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> isFileWithSuffixAvailable(file, ".csv"));

        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .untilAsserted(
                        () ->
                                assertThat(extractFileWithSuffixContent(file, ".csv"))
                                        .containsExactlyInAnyOrder("field1;field2", "value1;value2", "value3;value4"));
    }
}

The real code looks a little different; this is just a reproducible example.

The Spark extension just starts a local Spark session before every test and closes it afterwards.

The test passes, but when JUnit then tries to clean up the @TempDir, the following exception is thrown:

Failed to delete temp directory C:\Users\RK03GJ\AppData\Local\Temp\junit596680345801656194. The following paths could not be deleted

[image not available]

Can I somehow fix this error? I tried waiting for Spark to stop using Awaitility, but it didn't really help.

Maybe I can somehow ignore this error?

Krzysztof Atłasik asked May 24 '19 10:05


1 Answer

Quick guess: you need to close the stream returned by Files.walk. Quote from the docs:

If timely disposal of file system resources is required, the try-with-resources construct should be used to ensure that the stream's close method is invoked after the stream operations are completed.

-- https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#walk-java.nio.file.Path-java.nio.file.FileVisitOption...-

To fix this, wrap the Files.walk call in a try-with-resources statement in the isFileWithSuffixAvailable method:

static boolean isFileWithSuffixAvailable(File directory, String suffix) throws IOException {
    try (Stream<Path> walk = Files.walk(directory.toPath())) {
        return walk.anyMatch(f -> f.toString().endsWith(suffix));
    }
}
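
The question's other helper, extractFileWithSuffixContent, also calls Files.walk without closing the stream, so the same change most likely applies there too. Below is a minimal sketch of that, not the asker's actual code: the enclosing class name CsvFiles is hypothetical, and AssertionError stands in for the question's AssertionException, whose origin the question does not show.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical holder class, used only to make the sketch self-contained.
final class CsvFiles {

    // Returns the lines of the first file under 'file' whose path ends with 'suffix'.
    static List<String> extractFileWithSuffixContent(File file, String suffix) throws IOException {
        // try-with-resources closes the walk stream, and with it the open
        // directory handles, even though findFirst() short-circuits the traversal.
        try (Stream<Path> walk = Files.walk(file.toPath())) {
            Path match = walk
                    .filter(f -> f.toString().endsWith(suffix))
                    .findFirst()
                    // Assumption: AssertionError replaces the question's AssertionException.
                    .orElseThrow(() -> new AssertionError("No file ending with " + suffix));
            return Files.readAllLines(match);
        }
    }
}
```

Since both helpers run repeatedly inside the Awaitility polling loops, each unclosed walk may leave another directory handle open, which would explain why waiting longer did not help.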
jannis answered Oct 25 '22 19:10