class Sidekiq::Manager
The Manager is the central coordination point in Sidekiq, controlling the lifecycle of the Processors.
Tasks:
- start: spin up the Processors.
- #processor_died: handle a job failure, throw away the dead Processor, and create a new one.
- quiet: shut down idle Processors.
- stop: hard-stop the Processors by the deadline.
Note that only the last task requires its own Thread since it has to monitor the shutdown process. The other tasks are performed by other threads.
Constants
- PAUSE_TIME: hack for a quicker development / testing environment (#2774)
Attributes
options[R]
workers[R]
Public Class Methods
new(options={})
click to toggle source
# File lib/sidekiq/manager.rb, line 29
# Builds a manager together with its initial pool of Processors.
#
# options - Hash of settings. It is stored as given; :concurrency gives the
#           number of Processors to create (25 when it is not set).
#
# Raises ArgumentError when the resulting concurrency is less than 1.
def initialize(options={})
  logger.debug { options.inspect }

  @options = options
  @count = options[:concurrency] || 25
  raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

  # @done becomes true in #quiet; until then dead Processors are replaced.
  @done = false
  @workers = Set.new
  @count.times do
    @workers << Processor.new(self)
  end

  # Guards the changes to @workers made in #processor_stopped and
  # #processor_died, and the snapshot taken in #hard_shutdown.
  @plock = Mutex.new
end
Public Instance Methods
processor_died(processor, reason)
click to toggle source
# File lib/sidekiq/manager.rb, line 89
# Removes a Processor that died from the worker set and, unless the manager
# is already done, creates and starts a new Processor in its place.
#
# processor - the Processor to remove.
# reason    - accepted for the caller's benefit; not used in this method.
def processor_died(processor, reason)
  @plock.synchronize do
    @workers.delete(processor)

    # Once @done is set no replacement is created, so the pool only shrinks.
    unless @done
      replacement = Processor.new(self)
      @workers << replacement
      replacement.start
    end
  end
end
processor_stopped(processor)
click to toggle source
# File lib/sidekiq/manager.rb, line 83
# Removes a Processor from the worker set while holding the processor lock.
# No replacement is created.
#
# processor - the Processor to remove.
def processor_stopped(processor)
  @plock.synchronize do
    @workers.delete(processor)
  end
end
quiet()
click to toggle source
# File lib/sidekiq/manager.rb, line 49
# Marks the manager as done, calls #terminate on every worker and fires the
# :quiet event. Does nothing when the manager is already done, so repeated
# calls have no further effect.
def quiet
  return if @done
  @done = true

  logger.info { "Terminating quiet workers" }
  @workers.each(&:terminate)
  fire_event(:quiet, reverse: true)
end
start()
click to toggle source
# File lib/sidekiq/manager.rb, line 43
# Calls #start on every worker currently in the pool.
def start
  @workers.each(&:start)
end
stop(deadline)
click to toggle source
# File lib/sidekiq/manager.rb, line 61
# Shuts the manager down: quiets the workers, fires the :shutdown event, then
# waits in PAUSE_TIME steps for the worker set to drain. If workers are still
# present when less than PAUSE_TIME remains before the deadline,
# #hard_shutdown is invoked.
#
# deadline - the moment by which waiting must end; Time.now is subtracted
#            from it to compute the remaining wait.
def stop(deadline)
  quiet
  fire_event(:shutdown, reverse: true)

  # some of the shutdown events can be async,
  # we don't have any way to know when they're done but
  # give them a little time to take effect
  sleep PAUSE_TIME
  return if @workers.empty?

  logger.info { "Pausing to allow workers to finish..." }
  remaining = deadline - Time.now
  while remaining > PAUSE_TIME
    return if @workers.empty?
    sleep PAUSE_TIME
    remaining = deadline - Time.now
  end
  # Last check: the workers may have finished during the final pause.
  return if @workers.empty?

  hard_shutdown
end
stopped?()
click to toggle source
# File lib/sidekiq/manager.rb, line 100
# Returns the manager's done flag: false after construction, true once
# #quiet has run.
def stopped?
  @done
end
Private Instance Methods
hard_shutdown()
click to toggle source
# File lib/sidekiq/manager.rb, line 106
# Forcibly ends every worker that is still in the pool: re-enqueues the jobs
# they report as in progress through the fetch strategy, then calls #kill on
# each of them.
def hard_shutdown
  # We've reached the timeout and we still have busy workers.
  # They must die but their jobs shall live on.
  # Work on a snapshot taken under the lock; the rest runs on the copy.
  cleanup = @plock.synchronize { @workers.dup }

  unless cleanup.empty?
    # Workers without a current job report nil and are dropped by compact.
    jobs = cleanup.map(&:job).compact

    logger.warn { "Terminating #{cleanup.size} busy worker threads" }
    logger.warn { "Work still in progress #{jobs.inspect}" }

    # Re-enqueue unfinished jobs
    # NOTE: You may notice that we may push a job back to redis before
    # the worker thread is terminated. This is ok because Sidekiq's
    # contract says that jobs are run AT LEAST once. Process termination
    # is delayed until we're certain the jobs are back in Redis because
    # it is worse to lose a job than to run it twice.
    strategy = @options[:fetch] || Sidekiq::BasicFetch
    strategy.bulk_requeue(jobs, @options)
  end

  cleanup.each(&:kill)
end