
Quickly adding multiple items (1000/sec) to a sidekiq queue?

I realize Sidekiq has a push_bulk option, but I'm currently limited by latency to redis, so even pushing multiple items via push_bulk isn't fast enough (only about 50 jobs/sec).

I've tried to increase the number of redis connections like so:

redis_conn = proc {
  Redis.new(url: Rails.configuration.redis.url)
}

Sidekiq.configure_client do |config|
  # Pool of 50 client connections to redis
  config.redis = ConnectionPool.new(size: 50, &redis_conn)
  config.client_middleware do |chain|
    chain.add Sidekiq::Status::ClientMiddleware
  end
end

I then fire off separate threads (Thread.new) that each call perform_async on their share of the objects. What's odd is that jobs enqueued from any thread other than the first never show up in the sidekiq queue; it's as if they're ignored entirely.
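
For reference, a rough sketch of that threaded version (the chunk size and thread count here are arbitrary):

user_ids = User.need_scraping.pluck(:id)

# One thread per chunk, each enqueuing its jobs one at a time
threads = user_ids.each_slice(5_000).map do |chunk|
  Thread.new do
    chunk.each { |id| ScrapeUser.perform_async(id) }
  end
end
threads.each(&:join)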

Does anyone know of a better way to do this?

Edit: Here is the push_bulk version I was trying, which was actually slower:

  user_ids = User.need_scraping.pluck(:id)
  bar = ProgressBar.new(user_ids.count)
  user_ids.in_groups_of(10000, false).each do |user_id_group|
    Sidekiq::Client.push_bulk(
      'args'  => user_id_group.map{ |user_id| [user_id] },
      'class' => ScrapeUser,
      'queue' => 'scrape_user',
      'retry' => true
    )
  end

Thanks!

asked Dec 18 '13 by Geesu

2 Answers

You DO want to use push_bulk. You're limited by the latency/round-trip time of writing elements to the redis queue backing Sidekiq.

You're using multiple threads/connections to overcome a slow network, when you should really be removing extra network roundtrips.

Assuming you're trying to enqueue 20k UserWorker jobs that each take a user_id:

You would enqueue a single job via:

UserWorker.perform_async(user_id)

... which maps to:

Sidekiq::Client.push('class' => UserWorker, 'args' => [user_id] )

So the push_bulk version for 20k user_ids is:

# This example takes the 20k user ids, chunks them into groups of 1000
# (find_in_batches' default batch size), and sends each group to redis
# in a single call.
User.need_scraping.select('id').find_in_batches do |user_group|
  sidekiq_items = user_group.map { |user| { 'class' => UserWorker, 'args' => [user.id] } }
  Sidekiq::Client.push_bulk(sidekiq_items)
end

This turns 20k redis calls into 20. At an optimistic average round trip of 5ms, that's roughly 0.1 seconds instead of 100 seconds. Your mileage may vary.
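
As a back-of-envelope check (the 5ms round trip is an assumption; measure your own):

# Estimated enqueue time, assuming 5ms per redis round trip
jobs       = 20_000
batch_size = 1_000   # find_in_batches default
rtt        = 0.005   # seconds per round trip

puts "one push per job:  #{jobs * rtt} s"                # => 100.0 s
puts "push_bulk batches: #{(jobs / batch_size) * rtt} s" # => 0.1 s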

EDIT: Commenters seem confused about the behavior of the Sidekiq/Redis client for bulk enqueuing data.

The Sidekiq::Client.push_bulk() method takes an array of jobs to be enqueued. It translates these into Sidekiq job payload hashes and then calls Sidekiq::Client.raw_push() to deliver those payloads to redis. See the source: https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/client.rb#L158

Sidekiq::Client.raw_push() takes a list of Sidekiq payload hashes, converts them to JSON, and then executes a redis MULTI command combining two operations: first it adds the targeted queue to the list of active queues (redis SADD), then it pushes all of the job payloads onto that queue's redis list (redis LPUSH). The two commands execute together as a single atomic group, in one network round trip.
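
For illustration, here is a sketch approximating that MULTI block with the plain redis-rb client (key names follow Sidekiq's conventions; sidekiq_items is the array of job hashes from the example above, serialized here by hand; this is not Sidekiq's actual code):

require 'redis'
require 'json'

redis    = Redis.new
payloads = sidekiq_items.map { |job| JSON.generate(job) }

redis.multi do |tx|
  tx.sadd('queues', 'scrape_user')         # register the queue (SADD)
  tx.lpush('queue:scrape_user', payloads)  # push all payloads in one LPUSH
end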

If this is still slow, you likely have other problems (slow network, overloaded redis server, etc).
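
(A quick way to check the network piece is redis-cli's built-in latency sampler: redis-cli -h your.redis.host --latency)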

answered by Winfield


@Winfield's answer is correct, and he's absolutely right about latency. However, the correct syntax is actually as follows:

User.need_scraping.select('id').find_in_batches do |user_group|
  Sidekiq::Client.push_bulk({ 'class' => UserWorker, 'args' => user_group.map {|user| [user.id] } })
end

Maybe it changed in the most recent Sidekiq (I was too lazy to check), but this is the correct syntax now.

answered by Michael Y.