I am currently having Spark parse a large number of small CSV files into one large dataframe. Something along the lines of
df = spark.read.format("csv").load("file*.csv")
Because of how the data set being parsed is structured, I need, for every row in df, the line number it had within its source CSV file. Is there some simple way of achieving this (preferably without resorting to reconstructing the numbers afterward by grouping on input_file_name() and using zipWithIndex(), as sketched after the example below)?
For example if
# file1.csv
col1, col2
A, B
C, D
and
# file2.csv
col1, col2
E, F
G, H
I need a resulting data frame equivalent to
row, col1, col2
1, A, B
2, C, D
1, E, F
2, G, H
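For reference, the reconstruction I would rather avoid looks roughly like this. It is only a sketch, untested; it assumes the files are read with a header row (as in the example) and that zipWithIndex follows the order in which the lines were read:

from pyspark.sql.functions import input_file_name, row_number
from pyspark.sql.window import Window

# Tag every row with the file it came from.
df = (spark.read.format("csv")
      .option("header", "true")
      .load("file*.csv")
      .withColumn("file", input_file_name()))

# zipWithIndex attaches a global index that follows the read order;
# it can then be used to renumber the rows per source file.
indexed = (df.rdd.zipWithIndex()
           .map(lambda pair: pair[0] + (pair[1],))
           .toDF(df.columns + ["global_idx"]))

w = Window.partitionBy("file").orderBy("global_idx")
result = (indexed.withColumn("row", row_number().over(w))
          .drop("global_idx", "file"))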
If an arbitrary ordering of the row numbers is acceptable (rather than the exact line numbers within each file), you could use one of the following alternatives.
One alternative is to use the monotonically_increasing_id function if you are using Spark 2.x.
Something like this:

import org.apache.spark.sql.functions.monotonically_increasing_id

val df = spark.read.format("csv").load("file*.csv")
  .withColumn("rowId", monotonically_increasing_id())

Note that the generated ids are guaranteed to be unique and monotonically increasing, but not consecutive: the partition id is encoded in the upper bits of each value, so the numbers jump between partitions.
The other alternative would be to use row_number. But that only works if you have a column to partition the dataframe by, and row_number additionally requires an ordering within each window.
Something like:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val df = spark.read.format("csv").load("file*.csv")
  .withColumn("rowId", row_number().over(Window.partitionBy("col1").orderBy("col2")))

This populates the row number per partition, restarting at 1 for each value of col1.
However, if you require the exact original ordering, I am afraid there is no "Sparky" way to do it. The reason is that once you read the data as a dataframe, it loses the ordering with which it was persisted.
You could merge the CSV files with a small Java program on a single machine and add the row numbers as you go.
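A minimal sketch of that idea follows, written in Python here rather than Java to match the question's snippets; the output file name merged.csv, the sorted file order, and the per-file header handling are assumptions based on the example files:

import csv
import glob

# Merge all input CSVs into one file, prepending a per-file line number
# that restarts at 1 for every source file.
with open("merged.csv", "w", newline="") as out:
    writer = csv.writer(out)
    writer.writerow(["row", "col1", "col2"])
    for path in sorted(glob.glob("file*.csv")):  # hypothetical file layout
        with open(path, newline="") as f:
            reader = csv.reader(f)
            next(reader)  # skip the header row of each source file
            for line_number, fields in enumerate(reader, start=1):
                writer.writerow([line_number] + fields)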