I have a CSV file for which a) each row first needs to be converted to XML, and b) the converted XML is then sent to the Rails side for a database write operation.
Below is my Flow code for the same.
flow = csv_rows
|> Flow.from_enumerable()
|> Flow.partition
|> Flow.map(&(CSV.generate_xml(&1)))
|> Flow.map(&(CSV.save_to_rails_databse(&1)))
|> Flow.run
Everything works fine for a small CSV file, but when the CSV file is very large (say 20,000 records), the second operation (writing to the database on the Rails side) tries to insert too many records at the same time. Since Elixir sends too many requests to the Rails side concurrently, the database hits its peak limit.
Would it be good to process the events in batches of 50, and would min_demand and max_demand be useful in this case?
You can use Flow.map_state/2 to receive the whole state for a particular stage (in your case, since you are mapping, the state will be the events in that batch).
You will want to use three parameters here, all given to from_enumerable:
min_demand: the minimum demand a stage keeps outstanding; once pending events drop to this value, the stage asks the producer for more
max_demand: the maximum number of events a stage asks for at once; together with min_demand this effectively sets the batch size
stages: how many stages process the data concurrently; here it bounds how many batches are being written to the database at the same time
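To make that concrete, here is an untested sketch of how those options and map_state/2 could be wired together. The batch size, stage count and the count-based window (Flow.Window.count/1) are illustrative choices, and CSV.save_to_batch_rails_database/1 is assumed to take a list of XML documents, as in the snippet further down:
batch_size = 50

csv_rows
|> Flow.from_enumerable(stages: 4, min_demand: div(batch_size, 2), max_demand: batch_size)
|> Flow.map(&CSV.generate_xml/1)
# repartition into fixed-size windows so each stage accumulates a whole batch
|> Flow.partition(window: Flow.Window.count(batch_size), stages: 4)
|> Flow.reduce(fn -> [] end, fn xml, batch -> [xml | batch] end)
# map_state/2 receives the accumulated batch, so the Rails side gets one
# request per batch instead of one per row
|> Flow.map_state(&CSV.save_to_batch_rails_database/1)
|> Flow.run()
Note that recent Flow releases replaced map_state/2 with Flow.on_trigger/2; the idea is the same.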
A few other considerations:
When we worked on Flow, we were able to take some of its lessons and apply them back to Elixir. One of those lessons resulted in Task.async_stream/3, which is useful when you want to map over a collection without a reduce stage; that is exactly what you have:
batch_size = 100
# 8 tasks running at the same time and we don't care about the results order
async_options = [max_concurrency: 8, ordered: false]
csv_rows
|> Stream.chunk_every(batch_size)
|> Task.async_stream(fn batch ->
batch
|> Enum.map(&CSV.generate_xml/1)
|> CSV.save_to_batch_rails_database()
end, async_options)
|> Stream.run()
I haven't tested the code but it should provide enough guidance. It should be as fast as Flow but without an extra dependency.