I have a problem with a long-running migration that I want to run in parallel (it can safely be run in parallel). The migration takes every record in the database and performs a time- and resource-consuming operation on each of them.
Sometimes the migration of an individual record hangs, so I give each one 10 minutes to finish. If it hasn't finished by then, I want it to shut down gracefully without raising an exception (see below).
I'm also using the poolboy Erlang package to limit the parallelism, since the migration consumes not only time but resources too. The problem is that I don't know how to handle the error when a timeout happens and the code is about to crash. My migration, including the supervision tree, is:
defmodule MyReelty.Repo.Migrations.MoveVideosFromVimeoToB2 do
  use Ecto.Migration

  alias MyReelty.Repo
  alias MyReelty.Repo.Migrations.MoveVideosFromVimeoToB2.Migrator

  # parallel nature of migration force us to disable transaction
  @disable_ddl_transaction true

  @migrator_waiting_time 10 * 60 * 1000 # timeout
  @poolboy_waiting_time @migrator_waiting_time + 10 * 1000 # give a time for graceful shutdown

  @pool_name :migrator
  @pool_size 3
  @pool_config [
    { :name, { :local, @pool_name }},
    { :worker_module, Migrator },
    { :size, @pool_size },
    { :max_overflow, 0 },
    { :strategy, :fifo }
  ]

  def up do
    children = [
      :poolboy.child_spec(@pool_name, @pool_config)
    ]
    opts = [strategy: :one_for_one, name: MyReelty.Supervisor]
    Supervisor.start_link(children, opts)

    rows = Review |> Repo.all
    IO.puts "Total amount of reviews is: #{length(rows)}"
    parallel_migrations(rows)
  end

  def parallel_migrations(rows) do
    Enum.map(rows, fn(row) ->
      pooled_migration(@pool_name, row)
    end)
  end

  def pooled_migration(pool, x) do
    :poolboy.transaction(
      pool,
      (fn(pid) -> Migrator.move(pid, { x, @migrator_waiting_time }) end),
      @poolboy_waiting_time
    )
  end

  defmodule Migrator do
    alias MyReelty.Repo
    alias MyReelty.Review
    use GenServer

    def start_link(_) do
      GenServer.start_link(__MODULE__, nil, [])
    end

    def move(server, { params, waiting_time }) do
      GenServer.call(server, { :move, params }, waiting_time)
    end

    def handle_call({ :move, result }, _from, state) do
      big_time_and_resource_consuming_task_here
      {:reply, %{}, state}
    end
  end
end
The problem is that if the migration of some record in the database takes more than 10 minutes, I get this kind of exception:
20:18:16.917 [error] Task #PID<0.282.0> started from #PID<0.70.0> terminating
** (stop) exited in: GenServer.call(#PID<0.278.0>, {:move, [2, "/videos/164064419", "w 35th st Springfield United States Illinois 60020"]}, 60000)
** (EXIT) time out
(elixir) lib/gen_server.ex:604: GenServer.call/3
(poolboy) src/poolboy.erl:76: :poolboy.transaction/3
(elixir) lib/task/supervised.ex:94: Task.Supervised.do_apply/2
(elixir) lib/task/supervised.ex:45: Task.Supervised.reply/5
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: #Function<5.53617785/0 in MyReelty.Repo.Migrations.MoveVideosFromVimeoToB2.parallel_migrations/1>
Args: []
20:18:16.918 [error] GenServer MyReelty.Repo terminating
** (stop) exited in: GenServer.call(#PID<0.278.0>, {:move, [2, "/videos/164064419", "w 35th st Springfield United States Illinois 60020"]}, 60000)
** (EXIT) time out
Last message: {:EXIT, #PID<0.70.0>, {:timeout, {GenServer, :call, [#PID<0.278.0>, {:move, [2, "/videos/164064419", "w 35th st Springfield United States Illinois 60020"]}, 60000]}}}
State: {:state, {:local, MyReelty.Repo}, :one_for_one, [{:child, #PID<0.231.0>, DBConnection.Poolboy, {:poolboy, :start_link, [[name: {:local, MyReelty.Repo.Pool}, strategy: :fifo, size: 1, max_overflow: 0, worker_module: DBConnection.Poolboy.Worker], {Postgrex.Protocol, [types: true, username: "adik", types: true, name: MyReelty.Repo.Pool, otp_app: :my_reelty, repo: MyReelty.Repo, adapter: Ecto.Adapters.Postgres, database: "my_reelty_dev", hostname: "localhost", extensions: [{Geo.PostGIS.Extension, [library: Geo]}, {Ecto.Adapters.Postgres.DateTime, []}, {Postgrex.Extensions.JSON, [library: Poison]}], pool_size: 1, pool_timeout: 5000, timeout: 15000, adapter: Ecto.Adapters.Postgres, database: "my_dev", hostname: "localhost", pool_size: 10, pool: DBConnection.Poolboy, port: 5432]}]}, :permanent, 5000, :worker, [:poolboy]}], :undefined, 3, 5, [], 0, Ecto.Repo.Supervisor, {MyReelty.Repo, :my_reelty, Ecto.Adapters.Postgres, [otp_app: :my_reelty, repo: MyReelty.Repo, adapter: Ecto.Adapters.Postgres, database: "my_reelty_dev", hostname: "localhost", extensions: [{Geo.PostGIS.Extension, [library: Geo]}], pool_size: 1]}}
I tried adding terminate/2 or handle_info/2 to Migrator and playing with them, but these functions are never even invoked. How can I handle timeouts and prevent them from breaking my migration?
UPDATED
I used @johlo's hint, but I'm still getting the timeout. My function is:
def init(_) do
  Process.flag(:trap_exit, true)
  {:ok, %{}}
end
When the Migrator.move/2 (i.e. the GenServer.call) function times out, it crashes the entire MoveVideosFromVimeoToB2 process, since that is the process that actually makes the GenServer call.
The solution here is to catch the timeout in the anonymous function in pooled_migration, something like this (I'm not very familiar with Elixir syntax, so it might not compile, but you should get the idea):
def pooled_migration(pool, x) do
  :poolboy.transaction(
    pool,
    (fn(pid) ->
      try do
        Migrator.move(pid, { x, @migrator_waiting_time })
      catch
        # GenServer.call signals a timeout by exiting the calling process
        :exit, _reason ->
          # Ignore error, log it or something else
          :ok
      end
    end),
    @poolboy_waiting_time
  )
end
It's not the Migrator process that times out; it's the GenServer call to the Migrator that does, and that call is what we need to wrap in try/catch.
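If you would rather not swallow every exit, one option is to match only on the timeout reason, which has the same shape as the one in the error log from the question. Here is a minimal, self-contained sketch of that idea; SlowWorker and SafeCall are hypothetical names made up for the illustration, and the sleep stands in for the real migration work:

```elixir
defmodule SlowWorker do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil, [])

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:sleep, ms}, _from, state) do
    # Stand-in for the time-consuming task
    Process.sleep(ms)
    {:reply, :done, state}
  end
end

defmodule SafeCall do
  # Hypothetical helper: returns {:ok, reply} or {:error, :timeout}.
  # Exits with any other reason (e.g. the worker crashed) still propagate.
  def call(server, request, timeout) do
    {:ok, GenServer.call(server, request, timeout)}
  catch
    :exit, {:timeout, {GenServer, :call, _args}} -> {:error, :timeout}
  end
end

{:ok, pid} = SlowWorker.start_link(nil)

# Finishes well within the timeout
fast = SafeCall.call(pid, {:sleep, 10}, 1_000)

# Takes longer than the timeout, so the caller gets a tagged error instead of crashing
slow = SafeCall.call(pid, {:sleep, 500}, 50)

IO.inspect({fast, slow})
```

In the migration, the body of the anonymous function passed to :poolboy.transaction could use the same catch clause around Migrator.move/2.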
Also note that the Migrator process isn't killed; it is still running. See the timeouts section in the GenServer.call documentation.
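You can check this for yourself with a small experiment. The sketch below is only an illustration: NotifyingWorker and the :work message are hypothetical, and the worker reports back with a plain message so that we can see it finishing its job after the caller has already given up.

```elixir
defmodule NotifyingWorker do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil, [])

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:work, ms, notify}, _from, state) do
    # Stand-in for the long-running migration step
    Process.sleep(ms)
    # The worker reaches this line even though the caller stopped waiting
    send(notify, :work_finished)
    {:reply, :done, state}
  end
end

{:ok, pid} = NotifyingWorker.start_link(nil)

# The call gives up after 50 ms while the worker needs 300 ms
call_result =
  try do
    GenServer.call(pid, {:work, 300, self()}, 50)
  catch
    :exit, {:timeout, _} -> :timed_out
  end

# The worker process was not killed by the caller's timeout
alive_after_timeout = Process.alive?(pid)

# Wait for the worker to report that it completed the job anyway
finished =
  receive do
    :work_finished -> true
  after
    2_000 -> false
  end

IO.inspect({call_result, alive_after_timeout, finished})
```

For the migration this means that a worker whose call timed out is handed back to the pool while it is still busy, so the next record given to that worker waits in its message queue until the previous job completes.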
UPDATE:
As @asiniy mentions in the comments, @poolboy_waiting_time should be set to :infinity so that the :poolboy.transaction function doesn't exit with a timeout while waiting for a free Migrator worker process. Since the call to the Migrator always returns or times out eventually, this is safe.