I realize there is a push_bulk option for sidekiq but I'm currently being limited by latency to redis, so passing multiple items via push_bulk still isn't going quickly enough (only about 50/s).
I've tried to increase the number of redis connections like so:
redis_conn = proc {
  Redis.new({ :url => Rails.configuration.redis.url })
}

Sidekiq.configure_client do |config|
  config.redis = ConnectionPool.new(size: 50, &redis_conn)

  config.client_middleware do |chain|
    chain.add Sidekiq::Status::ClientMiddleware
  end
end
I then fire off separate threads (Thread.new) to actually call perform_async on the various objects. What's interesting is that jobs from any thread other than the first NEVER make it into the Sidekiq queue; it's as if they're ignored entirely.
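The threaded enqueue looks roughly like this (a simplified sketch; the slice size of 5,000 is arbitrary and the real code also tracks progress):

user_ids = User.need_scraping.pluck(:id)

# Fan out across threads, one slice of ids per thread
threads = user_ids.each_slice(5_000).map do |ids|
  Thread.new do
    ids.each { |user_id| ScrapeUser.perform_async(user_id) }
  end
end
threads.each(&:join)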
Does anyone know of a better way to do this?
Edit: Here is the push_bulk version I was trying, which is actually slower:
user_ids = User.need_scraping.pluck(:id)
bar = ProgressBar.new(user_ids.count)

user_ids.in_groups_of(10000, false).each do |user_id_group|
  Sidekiq::Client.push_bulk(
    'args'  => user_id_group.map { |user_id| [user_id] },
    'class' => ScrapeUser,
    'queue' => 'scrape_user',
    'retry' => true
  )
end
Thanks!
You DO want to use push_bulk. You're limited by the latency/round-trip time to write elements to the redis queue backing Sidekiq.

You're using multiple threads/connections to overcome a slow network, when you should really be removing extra network round trips.
Assuming you're trying to enqueue 20k UserWorker jobs that take a user_id:
You would enqueue a single job via:
UserWorker.perform_async(user_id)
... which maps to:
Sidekiq::Client.push('class' => UserWorker, 'args' => [user_id])
So the push_bulk version for 20k user_ids is:
# This example takes 20k user_ids, chunks them into groups of 1000 ids
# (find_in_batches' default batch size), and batch-sends each group to redis.
User.need_scraping.select('id').find_in_batches do |user_group|
  sidekiq_items = user_group.map { |user| { 'class' => UserWorker, 'args' => [user.id] } }
  Sidekiq::Client.push_bulk(sidekiq_items)
end
This turns 20k redis calls into 20 redis calls. With an average round-trip time of 5ms (optimistic), that's roughly 0.1 seconds vs. 100 seconds spent on round trips alone. Your mileage may vary.
EDIT: Commenters seem confused about the behavior of the Sidekiq/Redis client for bulk enqueuing data.
The Sidekiq::Client.push_bulk() method takes an array of jobs to be enqueued. It translates these into Sidekiq job payload hashes, and then calls Sidekiq::Client.raw_push() to deliver those payloads to redis. See source: https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/client.rb#L158

Sidekiq::Client.raw_push() takes a list of Sidekiq hash payloads, converts them to JSON, and then executes a redis MULTI command combining two redis commands. First, it adds the targeted queue to the set of active queues (redis SADD), then it pushes all of the job payloads onto the targeted queue's redis list (redis LPUSH). The two commands execute together as a single atomic redis transaction.
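To make that concrete, here's a rough sketch of the equivalent redis operations using the redis-rb gem directly (simplified: real Sidekiq payloads also carry jid, created_at, retry options, etc., and the connection URL here is a placeholder):

require 'redis'
require 'json'

redis = Redis.new(url: 'redis://localhost:6379/0') # placeholder URL

payloads = [{ 'class' => 'UserWorker', 'args' => [42], 'queue' => 'default' }]

redis.multi do |tx|
  tx.sadd('queues', 'default')                                 # register the queue
  tx.lpush('queue:default', payloads.map { |p| JSON.dump(p) }) # push every job in one command
end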
If this is still slow, you likely have other problems (slow network, overloaded redis server, etc.).
@Winfield's answer is correct, and he's absolutely right about latency. However, the correct syntax is actually as follows:
User.need_scraping.select('id').find_in_batches do |user_group|
  Sidekiq::Client.push_bulk({ 'class' => UserWorker, 'args' => user_group.map { |user| [user.id] } })
end
Maybe it changed in the most recent Sidekiq (I was too lazy to check), but this is the correct syntax now.
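If you want to verify the latency difference on your own setup, here's a hypothetical benchmark using Ruby's standard Benchmark module (ScrapeUser and the batch size of 1,000 are borrowed from the question; note this enqueues every job twice, so run it against a throwaway queue):

require 'benchmark'

ids = User.need_scraping.pluck(:id)

# One redis round trip per job
per_job = Benchmark.realtime do
  ids.each { |id| ScrapeUser.perform_async(id) }
end

# One redis round trip per 1,000 jobs, using the push_bulk syntax above
batched = Benchmark.realtime do
  ids.each_slice(1_000) do |slice|
    Sidekiq::Client.push_bulk('class' => ScrapeUser, 'args' => slice.map { |id| [id] })
  end
end

puts "per-job: #{per_job.round(2)}s, batched: #{batched.round(2)}s"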