The problem - I have a 150 GB CSV file with headers and the same number of columns in every row. I need just the first column, minus the header, and only the unique items. The CSV cannot be on my local machine since I do not have the space; it is on an Apple AirPort. I will try to connect with a USB cable.
I have been scouring the internet for a solution for about 3 days now and have heard a couple of approaches, but am not sure which is best. Which one is best, and why?
Shell: I hear I can do this with a shell one-liner, but I have no shell-scripting experience in this realm.
Python script: I did write a script, but gave up after it had been running for 4 hours. This may have been because I was accessing the file over Wi-Fi.
Elixir: I am currently learning Elixir and have been told Flow would be a good choice to partition the work across my CPU cores while reading in new data. Comparing Stream with Flow on 1 million rows of similar data, it took 8 seconds with Stream and 2 seconds with Flow to get all unique items in the file.
def stream_parse(file_path, chunk_size) do
  file_path
  |> File.stream!()
  |> Stream.drop(1)
  |> Stream.map(&(&1 |> String.split(",") |> List.first()))
  |> Stream.chunk_every(chunk_size, chunk_size, [])
  |> Stream.map(&MapSet.new/1)
  |> Enum.to_list()
end
def flow_parse(file_path, chunk_size) do
  file_path
  |> File.stream!(read_ahead: chunk_size)
  |> Stream.drop(1)
  |> Flow.from_enumerable()
  |> Flow.map(&(&1 |> String.split(",") |> List.first()))
  |> Flow.partition()
  |> Flow.uniq()
  |> Enum.to_list()
end
I do not have a particular issue with the Stream solution, although it has high memory usage, uses one thread, and runs on one core.
The Flow solution is multithreaded and uses multiple cores, but has the issue that everything is finally collected into one Enum.to_list that could end up being who knows how long.
What is the best approach, and is there an even better solution? Besides writing it in C.
I was able to finish the Stream and Flow examples in Elixir. I was also given a shell script that accomplishes the needed result. So far the shell script and the Stream version run at the same speed, with Flow winning. However, since the file is not local to my machine, Flow makes no real difference, because I am I/O bound.
def stream_parse(file_path, chunk_size, output_file) do
  file_path
  |> File.stream!(read_ahead: chunk_size)
  |> Stream.drop(1)
  |> Stream.map(&(&1 |> String.split(",") |> List.first()))
  |> Stream.uniq()
  |> Stream.map(&"#{&1}\n")
  |> Stream.into(File.stream!(output_file, [:write, :utf8]))
  |> Stream.run()
end
However, this lacks the ability to write a results file per chunk, and Stream.uniq stores the unique items for the entire 150 GB in memory (not an option).
The shell script (which also stores all unique items in memory):
tail -n +2 my.csv | cut -d , -f 1 | sort -u > OUTPUT.csv
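An alternative one-liner is an awk hash filter: it still keeps one entry per distinct value in memory, but it streams the file in a single pass with no sorting, and emits each value the first time it is seen, preserving input order. This is a sketch, assuming the same `my.csv` layout as above:

```shell
# Drop the header, take column 1, and print each value only the first
# time it appears; awk's associative array grows with the number of
# unique items, not with the total file size.
tail -n +2 my.csv | cut -d , -f 1 | awk '!seen[$0]++' > OUTPUT.csv
```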
After finally searching many forums and the Elixir Slack channel, we came up with a solution. The first step was to split the file up; since there is already a shell command for this, there is no need to overcomplicate the Elixir code. I broke everything up into functions to better explain what is going on.
Break the file into 10-million-line sub-files:
$ mkdir split-files
$ split -a 8 -l 10000000 big_file.csv ./split-files/
$ cd split-files
$ for f in *; do mv "$f" "$f.csv"; done
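On GNU coreutils, the rename loop can be avoided by letting split append the extension itself. This is a sketch assuming GNU split (`--additional-suffix` is a GNU-only flag; the portable loop above works everywhere):

```shell
# Split into 10-million-line pieces named split-files/aaaaaaaa.csv, etc.
mkdir split-files
split -a 8 -l 10000000 --additional-suffix=.csv big_file.csv ./split-files/
```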
Next, we needed to get the unique items from each file and write a unique_ output file for each. I can actually use Flow.uniq here, since each chunk is 10 million lines, which fits into memory.
def flow_parse_dir(path, chunk_size) do
  Path.wildcard(path <> "/*.csv")
  |> Flow.from_enumerable()
  |> Flow.map(fn filename ->
    [dir, file] = String.split(filename, "/")
    flow_parse(filename, chunk_size, dir <> "/unique_" <> file)
  end)
  |> Flow.run()
end
def flow_parse(file_path, chunk_size, output_file) do
  file_path
  |> File.stream!(read_ahead: chunk_size)
  # Note: only the first split file actually contains the header row,
  # so drop(1) discards one data line in every other file.
  |> Stream.drop(1)
  |> Flow.from_enumerable()
  |> Flow.map(&(&1 |> String.split(",") |> List.first()))
  |> Flow.partition()
  |> Flow.uniq()
  |> Flow.map(&"#{&1}\n")
  |> Stream.into(File.stream!(output_file, [:write, :utf8]))
  |> Stream.run()
end
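For comparison, the same per-file pass can be sketched in shell, assuming the file layout produced by the split step above (each sub-file small enough to sort in memory):

```shell
cd split-files
for f in *.csv; do
  # Take column 1 of each sub-file and drop duplicates within it;
  # the header survives in the first sub-file's output as one stray value.
  cut -d , -f 1 "$f" | sort -u > "unique_$f"
done
```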
After all the unique files were created, we needed to create a total unique file.
def concat_files(path, total_unique_file_name) do
  sum_file = File.open!(path <> "/" <> total_unique_file_name, [:read, :utf8, :write])

  Path.wildcard(path <> "/*.csv")
  |> Stream.map(fn filename ->
    [_, file] = String.split(filename, "/")

    if String.contains?(file, "unique") do
      write_concat_of_unique_files(file, path, sum_file)
    end
  end)
  |> Stream.run()

  File.close(sum_file)
end
def write_concat_of_unique_files(file, path, sum_file) do
  # Read the file line by line and append each line to the open sum file.
  (path <> "/" <> file)
  |> File.stream!()
  |> Stream.map(&String.trim(&1, "\n"))
  |> Stream.map(fn line ->
    IO.puts(sum_file, line)
  end)
  |> Stream.run()
end
Finally, a function that processes the concatenated file and finishes the job.
def unique_column(file_path, chunk_size, output) do
  total_file = File.open!(output, [:read, :utf8, :write])

  file_path
  |> File.stream!(read_ahead: chunk_size)
  |> Stream.map(&String.trim(&1, "\n"))
  |> Stream.chunk_every(chunk_size, chunk_size, [])
  |> Flow.from_enumerable()
  |> Flow.map(fn chunk ->
    chunk
    |> MapSet.new()
    |> MapSet.to_list()
  end)
  |> Flow.partition()
  |> Flow.map(fn lines ->
    Enum.map(lines, fn item ->
      IO.puts(total_file, item)
    end)
  end)
  |> Flow.run()

  File.close(total_file)
end
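The concatenate-and-dedupe step can also be sketched as one shell line, since the per-file unique lists are already small (output name `total_unique.csv` is just an example):

```shell
# Merge every per-file unique list and dedupe across them in one pass;
# sort spills to temporary files on disk if the input exceeds memory.
cat split-files/unique_*.csv | sort -u > total_unique.csv
```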
Check that the final file is completely unique. The number of unique items from the previous files turned out not to be too large and fits completely into memory. If the contents are all unique, you will get the list back as the return value; if you get a MatchError, the file was not unique.
def check_unique(file_path) do
  original_length =
    file_path
    |> File.stream!()
    |> Enum.to_list()

  unique_length =
    file_path
    |> File.stream!()
    |> Stream.uniq()
    |> Enum.to_list()

  ^unique_length = original_length
end
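The same check can be done from the shell: `uniq -d` prints only duplicated lines, so empty output means every line is unique. A sketch, assuming the final file is named `total_unique.csv`:

```shell
# Sort first, because uniq only detects adjacent duplicates;
# empty output from uniq -d means the file is fully unique.
test -z "$(sort total_unique.csv | uniq -d)" && echo "unique"
```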