How to process events in batches with elixir flow

Tags:

elixir

I have a csv_file in which a) each row first needs to be converted to XML, and b) the converted XML is then sent to the Rails side for a database write operation.

Below is my Flow code for this:

flow =
  csv_rows
  |> Flow.from_enumerable()
  |> Flow.partition()
  |> Flow.map(&CSV.generate_xml(&1))
  |> Flow.map(&CSV.save_to_rails_databse(&1))
  |> Flow.run()

Everything works fine for a small csv file, but when the csv_file is very large (say 20,000 records), the second operation (writing to the database on the Rails side) tries to insert too many records at the same time. Since Elixir sends too many requests to the Rails side simultaneously, the database reaches its peak limit.

Would it be good to process the events in batches of 50, and would min_demand and max_demand be useful in this case?

asked Dec 24 '22 by chandradot99

1 Answer

You can use Flow.map_state/2 to receive the whole state for a particular stage (in your case, since you are only mapping, the state will be the events in that batch).

You will want to use three parameters here, all given to from_enumerable (there is a sketch after the list below):

  • min_demand: this will effectively be the batch size
  • max_demand: the maximum number of rows that will be in flux between stages
  • stages: the number of concurrent stages processing the data; in your case, how many batches are processed at the same time
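
As a rough illustration only (the option values are made up, CSV.generate_xml/1 comes from the question, and CSV.save_to_batch_rails_database/1 is the batch-saving function used further below; newer Flow versions renamed Flow.map_state/2 to Flow.on_trigger/2), this is roughly what that configuration could look like:

batch_size = 50

csv_rows
|> Flow.from_enumerable(min_demand: batch_size, max_demand: batch_size * 2, stages: 4)
|> Flow.map(&CSV.generate_xml/1)
# the state received here is the batch of mapped events for this stage
|> Flow.map_state(fn batch ->
  CSV.save_to_batch_rails_database(batch)
  batch
end)
|> Flow.run()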

A few other considerations:

  • You don't need partitioning, since you are not doing any grouping
  • Consider using NimbleCSV, which allows the CSV to be consumed as a stream; this helps with memory usage if the CSV is very large (see the sketch after this list)
  • You likely don't need Flow at all in this example; Task.async_stream/3 should suffice
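
For example, a minimal NimbleCSV sketch using its bundled RFC 4180 parser (the "data.csv" path is only a placeholder):

alias NimbleCSV.RFC4180, as: CSVParser

# rows are read and parsed lazily, only as downstream stages demand them
csv_rows =
  "data.csv"
  |> File.stream!()
  |> CSVParser.parse_stream()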

When we worked on Flow, we were able to take some of Flow's lessons and apply them back to Elixir. One of those lessons resulted in Task.async_stream/3, which is useful when you want to map over a collection without a reduce stage, which is exactly what you have:

batch_size = 100

# 8 tasks running at the same time and we don't care about the results order
async_options = [max_concurrency: 8, ordered: false]

csv_rows
# group the rows into batches of batch_size rows each
|> Stream.chunk_every(batch_size)
|> Task.async_stream(fn batch ->
  batch
  |> Enum.map(&CSV.generate_xml/1)
  |> CSV.save_to_batch_rails_database()
end, async_options)
|> Stream.run()

I haven't tested the code but it should provide enough guidance. It should be as fast as Flow but without an extra dependency.

answered Dec 31 '22 by José Valim