class Sidekiq::Launcher

The Launcher is a very simple Actor whose job is to start, monitor and stop the core Actors in Sidekiq. If any of these actors die, the Sidekiq process exits immediately.

Attributes

fetcher[RW]
manager[RW]
poller[RW]

Public Class Methods

new(options) click to toggle source
# File lib/sidekiq/launcher.rb, line 15
def initialize(options)
  @manager = Sidekiq::Manager.new(options)
  @poller = Sidekiq::Scheduled::Poller.new
  @done = false
  @options = options
end

Public Instance Methods

quiet() click to toggle source

Stops this instance from processing any more jobs,

# File lib/sidekiq/launcher.rb, line 30
def quiet
  @done = true
  @manager.quiet
  @poller.terminate
end
run() click to toggle source
# File lib/sidekiq/launcher.rb, line 22
def run
  @thread = safe_thread("heartbeat", &method(:start_heartbeat))
  @poller.start
  @manager.start
end
stop() click to toggle source

Shuts down the process. This method does not return until all work is complete and cleaned up. It can take up to the timeout to complete.

# File lib/sidekiq/launcher.rb, line 39
def stop
  deadline = Time.now + @options[:timeout]

  @done = true
  @manager.quiet
  @poller.terminate

  @manager.stop(deadline)

  # Requeue everything in case there was a worker who grabbed work while stopped
  # This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
  strategy = (@options[:fetch] || Sidekiq::BasicFetch)
  strategy.bulk_requeue([], @options)

  clear_heartbeat
end
stopping?() click to toggle source
# File lib/sidekiq/launcher.rb, line 56
def stopping?
  @done
end

Private Instance Methods

clear_heartbeat() click to toggle source
# File lib/sidekiq/launcher.rb, line 150
def clear_heartbeat
  # Remove record from Redis since we are shutting down.
  # Note we don't stop the heartbeat thread; if the process
  # doesn't actually exit, it'll reappear in the Web UI.
  Sidekiq.redis do |conn|
    conn.pipelined do
      conn.srem('processes', identity)
      conn.del("#{identity}:workers")
    end
  end
rescue
  # best effort, ignore network errors
end
heartbeat() click to toggle source
# File lib/sidekiq/launcher.rb, line 62
def heartbeat
  results = Sidekiq::CLI::PROCTITLES.map {|x| x.(self, to_data) }
  results.compact!
  $0 = results.join(' ')

  
end
start_heartbeat() click to toggle source
# File lib/sidekiq/launcher.rb, line 119
def start_heartbeat
  while true
    heartbeat
    sleep 5
  end
  Sidekiq.logger.info("Heartbeat stopping...")
end
to_data() click to toggle source
# File lib/sidekiq/launcher.rb, line 127
def to_data
  @data ||= begin
    {
      'hostname' => hostname,
      'started_at' => Time.now.to_f,
      'pid' => $$,
      'tag' => @options[:tag] || '',
      'concurrency' => @options[:concurrency],
      'queues' => @options[:queues].uniq,
      'labels' => @options[:labels],
      'identity' => identity,
    }
  end
end
to_json() click to toggle source
# File lib/sidekiq/launcher.rb, line 142
def to_json
  @json ||= begin
    # this data changes infrequently so dump it to a string
    # now so we don't need to dump it every heartbeat.
    Sidekiq.dump_json(to_data)
  end
end
() click to toggle source
# File lib/sidekiq/launcher.rb, line 70
def 
  key = identity
  fails = procd = 0
  begin
    Processor::FAILURE.update {|curr| fails = curr; 0 }
    Processor::PROCESSED.update {|curr| procd = curr; 0 }

    workers_key = "#{key}:workers"
    nowdate = Time.now.utc.strftime("%Y-%m-%d")
    Sidekiq.redis do |conn|
      conn.multi do
        conn.incrby("stat:processed", procd)
        conn.incrby("stat:processed:#{nowdate}", procd)
        conn.incrby("stat:failed", fails)
        conn.incrby("stat:failed:#{nowdate}", fails)
        conn.del(workers_key)
        Processor::WORKER_STATE.each_pair do |tid, hash|
          conn.hset(workers_key, tid, Sidekiq.dump_json(hash))
        end
        conn.expire(workers_key, 60)
      end
    end
    fails = procd = 0

    _, exists, _, _, msg = Sidekiq.redis do |conn|
      conn.multi do
        conn.sadd('processes', key)
        conn.exists(key)
        conn.hmset(key, 'info', to_json, 'busy', Processor::WORKER_STATE.size, 'beat', Time.now.to_f, 'quiet', @done)
        conn.expire(key, 60)
        conn.rpop("#{key}-signals")
      end
    end

    # first heartbeat or recovering from an outage and need to reestablish our heartbeat
    fire_event(:heartbeat) if !exists

    return unless msg

    ::Process.kill(msg, $$)
  rescue => e
    # ignore all redis/network issues
    logger.error("heartbeat: #{e.message}")
    # don't lose the counts if there was a network issue
    Processor::PROCESSED.increment(procd)
    Processor::FAILURE.increment(fails)
  end
end