class Sidekiq::Cron::Job

Constants

REMEMBER_THRESHOLD

how long we would like to store informations about previous enqueues

Attributes

args[RW]
cron[RW]
description[RW]
fetch_missing_args[R]
klass[RW]
last_enqueue_time[R]
message[RW]
name[RW]

Public Class Methods

all() click to toggle source

get all cron jobs

# File lib/sidekiq/cron/job.rb, line 202
def self.all
  job_hashes = nil
  Sidekiq.redis do |conn|
    set_members = conn.smembers(jobs_key)
    job_hashes = conn.pipelined do
      set_members.each do |key|
        conn.hgetall(key)
      end
    end
  end
  job_hashes.compact.reject(&:empty?).collect do |h|
    # no need to fetch missing args from redis since we just got this hash from there
    Sidekiq::Cron::Job.new(h.merge(fetch_missing_args: false))
  end
end
count() click to toggle source
# File lib/sidekiq/cron/job.rb, line 218
def self.count
  out = 0
  Sidekiq.redis do |conn|
    out = conn.scard(jobs_key)
  end
  out
end
create(hash) click to toggle source

create new instance of cron job

# File lib/sidekiq/cron/job.rb, line 240
def self.create hash
  new(hash).save
end
destroy(name) click to toggle source

destroy job by name

# File lib/sidekiq/cron/job.rb, line 245
def self.destroy name
  #if name is hash try to get name from it
  name = name[:name] || name['name'] if name.is_a?(Hash)

  if job = find(name)
    job.destroy
  else
    false
  end
end
destroy_all!() click to toggle source

remove all job from cron

# File lib/sidekiq/cron/job.rb, line 481
def self.destroy_all!
  all.each do |job|
    job.destroy
  end
  logger.info { "Cron Jobs - deleted all jobs" }
end
destroy_removed_jobs(new_job_names) click to toggle source

remove “removed jobs” between current jobs and new jobs

# File lib/sidekiq/cron/job.rb, line 489
def self.destroy_removed_jobs new_job_names
  current_job_names = Sidekiq::Cron::Job.all.map(&:name)
  removed_job_names = current_job_names - new_job_names
  removed_job_names.each { |j| Sidekiq::Cron::Job.destroy(j) }
  removed_job_names
end
exists?(name) click to toggle source
# File lib/sidekiq/cron/job.rb, line 510
def self.exists? name
  out = false
  Sidekiq.redis do |conn|
    out = conn.exists redis_key name
  end
  out
end
find(name) click to toggle source
# File lib/sidekiq/cron/job.rb, line 226
def self.find name
  #if name is hash try to get name from it
  name = name[:name] || name['name'] if name.is_a?(Hash)

  output = nil
  Sidekiq.redis do |conn|
    if exists? name
      output = Job.new conn.hgetall( redis_key(name) )
    end
  end
  output
end
load_from_array(array) click to toggle source

load cron jobs from Array input structure should look like: [

{
  'name'        => 'name_of_job',
  'class'       => 'MyClass',
  'cron'        => '1 * * * *',
  'args'        => '(OPTIONAL) [Array or Hash]',
  'description' => '(OPTIONAL) Description of job'
},
{
  'name'  => 'Cool Job for Second Class',
  'class' => 'SecondClass',
  'cron'  => '*/5 * * * *'
}

]

# File lib/sidekiq/cron/job.rb, line 184
def self.load_from_array array
  errors = {}
  array.each do |job_data|
    job = new(job_data)
    errors[job.name] = job.errors unless job.save
  end
  errors
end
load_from_array!(array) click to toggle source

like to {#load_from_array} If exists old jobs in redis but removed from args, destroy old jobs

# File lib/sidekiq/cron/job.rb, line 195
def self.load_from_array! array
  job_names = array.map { |job| job["name"] }
  destroy_removed_jobs(job_names)
  load_from_array(array)
end
load_from_hash(hash) click to toggle source

load cron jobs from Hash input structure should look like: {

'name_of_job' => {
  'class'       => 'MyClass',
  'cron'        => '1 * * * *',
  'args'        => '(OPTIONAL) [Array or Hash]',
  'description' => '(OPTIONAL) Description of job'
},
'My super iber cool job' => {
  'class' => 'SecondClass',
  'cron'  => '*/5 * * * *'
}

}

# File lib/sidekiq/cron/job.rb, line 152
def self.load_from_hash hash
  array = hash.inject([]) do |out,(key, job)|
    job['name'] = key
    out << job
  end
  load_from_array array
end
load_from_hash!(hash) click to toggle source

like to {#load_from_hash} If exists old jobs in redis but removed from args, destroy old jobs

# File lib/sidekiq/cron/job.rb, line 162
def self.load_from_hash! hash
  destroy_removed_jobs(hash.keys)
  load_from_hash(hash)
end
new(input_args = {}) click to toggle source
# File lib/sidekiq/cron/job.rb, line 259
def initialize input_args = {}
  args = Hash[input_args.map{ |k, v| [k.to_s, v] }]
  @fetch_missing_args = args.delete('fetch_missing_args')
  @fetch_missing_args = true if @fetch_missing_args.nil?

  @name = args["name"]
  @cron = args["cron"]
  @description = args["description"] if args["description"]

  #get class from klass or class
  @klass = args["klass"] || args["class"]

  #set status of job
  @status = args['status'] || status_from_redis

  #set last enqueue time - from args or from existing job
  if args['last_enqueue_time'] && !args['last_enqueue_time'].empty?
    @last_enqueue_time = Time.parse(args['last_enqueue_time'])
  else
    @last_enqueue_time = last_enqueue_time_from_redis
  end

  #get right arguments for job
  @args = args["args"].nil? ? [] : parse_args( args["args"] )

  @active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
  @active_job_queue_name_prefix = args["queue_name_prefix"]
  @active_job_queue_name_delimiter = args["queue_name_delimiter"]

  if args["message"]
    @message = args["message"]
    message_data = Sidekiq.load_json(@message) || {}
    @queue = message_data['queue'] || "default"
  elsif @klass
    message_data = {
      "class" => @klass.to_s,
      "args"  => @args,
    }

    #get right data for message
    #only if message wasn't specified before
    klass_data = case @klass
      when Class
        @klass.get_sidekiq_options
      when String
        begin
          Sidekiq::Cron::Support.constantize(@klass).get_sidekiq_options
        rescue Exception => e
          #Unknown class
          {"queue"=>"default"}
        end
    end

    message_data = klass_data.merge(message_data)
    #override queue if setted in config
    #only if message is hash - can be string (dumped JSON)
    if args['queue']
      @queue = message_data['queue'] = args['queue']
    else
      @queue = message_data['queue'] || "default"
    end

    #dump message as json
    @message = message_data
  end

  @queue_name_with_prefix = queue_name_with_prefix
end

Private Class Methods

job_enqueued_key(name) click to toggle source

Redis key for storing one cron job run times (when poller added job to queue)

# File lib/sidekiq/cron/job.rb, line 576
def self.job_enqueued_key name
  "cron_job:#{name}:enqueued"
end
jobs_key() click to toggle source

Redis key for set of all cron jobs

# File lib/sidekiq/cron/job.rb, line 560
def self.jobs_key
  "cron_jobs"
end
redis_key(name) click to toggle source

Redis key for storing one cron job

# File lib/sidekiq/cron/job.rb, line 565
def self.redis_key name
  "cron_job:#{name}"
end

Public Instance Methods

active_job_message() click to toggle source

active job has different structure how it is loading data from sidekiq queue, it createaswrapper arround job

# File lib/sidekiq/cron/job.rb, line 123
def active_job_message
  {
    'class'        => 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper',
    'queue'        => @queue_name_with_prefix,
    'description'  => @description,
    'args'         => [{
      'job_class'  => @klass,
      'job_id'     => SecureRandom.uuid,
      'queue_name' => @queue_name_with_prefix,
      'arguments'  => @args
    }]
  }
end
destroy() click to toggle source

remove job from cron jobs by name input:

first arg: name (string) - name of job (must be same - case sensitive)
# File lib/sidekiq/cron/job.rb, line 466
def destroy
  Sidekiq.redis do |conn|
    #delete from set
    conn.srem self.class.jobs_key, redis_key

    #delete runned timestamps
    conn.del job_enqueued_key

    #delete main job
    conn.del redis_key
  end
  logger.info { "Cron Jobs - deleted job with name: #{@name}" }
end
disable!() click to toggle source
# File lib/sidekiq/cron/job.rb, line 332
def disable!
  @status = "disabled"
  save
end
disabled?() click to toggle source
# File lib/sidekiq/cron/job.rb, line 346
def disabled?
  !enabled?
end
enable!() click to toggle source
# File lib/sidekiq/cron/job.rb, line 337
def enable!
  @status = "enabled"
  save
end
enabled?() click to toggle source
# File lib/sidekiq/cron/job.rb, line 342
def enabled?
  @status == "enabled"
end
enque!(time = Time.now.utc) click to toggle source

enque cron job to queue

# File lib/sidekiq/cron/job.rb, line 48
def enque! time = Time.now.utc
  @last_enqueue_time = time

  klass_const =
      begin
        Sidekiq::Cron::Support.constantize(@klass.to_s)
      rescue NameError
        nil
      end

  if klass_const
    if defined?(ActiveJob::Base) && klass_const < ActiveJob::Base
      enqueue_active_job(klass_const)
    else
      enqueue_sidekiq_worker(klass_const)
    end
  else
    if @active_job
      Sidekiq::Client.push(active_job_message)
    else
      Sidekiq::Client.push(sidekiq_worker_message)
    end
  end

  save_last_enqueue_time
  logger.debug { "enqueued #{@name}: #{@message}" }
end
enqueue_active_job(klass_const) click to toggle source
# File lib/sidekiq/cron/job.rb, line 82
def enqueue_active_job(klass_const)
  klass_const.set(queue: @queue).perform_later(*@args)

  true
end
enqueue_sidekiq_worker(klass_const) click to toggle source
# File lib/sidekiq/cron/job.rb, line 88
def enqueue_sidekiq_worker(klass_const)
  klass_const.set(queue: queue_name_with_prefix).perform_async(*@args)

  true
end
errors() click to toggle source
# File lib/sidekiq/cron/job.rb, line 388
def errors
  @errors ||= []
end
exists?() click to toggle source
# File lib/sidekiq/cron/job.rb, line 518
def exists?
  self.class.exists? @name
end
formated_enqueue_time(now = Time.now.utc) click to toggle source
# File lib/sidekiq/cron/job.rb, line 502
def formated_enqueue_time now = Time.now.utc
  last_time(now).getutc.to_f.to_s
end
formated_last_time(now = Time.now.utc) click to toggle source
# File lib/sidekiq/cron/job.rb, line 506
def formated_last_time now = Time.now.utc
  last_time(now).getutc.iso8601
end
is_active_job?() click to toggle source
# File lib/sidekiq/cron/job.rb, line 76
def is_active_job?
  @active_job || defined?(ActiveJob::Base) && Sidekiq::Cron::Support.constantize(@klass.to_s) < ActiveJob::Base
rescue NameError
  false
end
klass_valid() click to toggle source
# File lib/sidekiq/cron/job.rb, line 418
def klass_valid
  case @klass
    when Class
      true
    when String
      @klass.size > 0
    else
  end
end
last_enqueue_time_from_redis() click to toggle source
# File lib/sidekiq/cron/job.rb, line 361
def last_enqueue_time_from_redis
  out = nil
  if fetch_missing_args
    Sidekiq.redis do |conn|
      out = Time.parse(conn.hget(redis_key, "last_enqueue_time")) rescue nil
    end
  end
  out
end
last_time(now = Time.now.utc) click to toggle source

Parse cron specification '* * * * *' and returns time when last run should be performed

# File lib/sidekiq/cron/job.rb, line 498
def last_time now = Time.now.utc
  Rufus::Scheduler::CronLine.new(@cron).previous_time(now.utc).utc
end
queue_name_with_prefix() click to toggle source
# File lib/sidekiq/cron/job.rb, line 99
def queue_name_with_prefix
  return @queue unless is_active_job?

  if !"#{@active_job_queue_name_delimiter}".empty?
    queue_name_delimiter = @active_job_queue_name_delimiter
  elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_delimiter) && !ActiveJob::Base.queue_name_delimiter.empty?
    queue_name_delimiter = ActiveJob::Base.queue_name_delimiter
  else
    queue_name_delimiter = '_'
  end

  if !"#{@active_job_queue_name_prefix}".empty?
    queue_name = "#{@active_job_queue_name_prefix}#{queue_name_delimiter}#{@queue}"
  elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_prefix) && !"#{ActiveJob::Base.queue_name_prefix}".empty?
    queue_name = "#{ActiveJob::Base.queue_name_prefix}#{queue_name_delimiter}#{@queue}"
  else
    queue_name = @queue
  end

  queue_name
end
remove_previous_enques(time) click to toggle source

remove previous informations about run times this will clear redis and make sure that redis will not overflow with memory

# File lib/sidekiq/cron/job.rb, line 31
def remove_previous_enques time
  Sidekiq.redis do |conn|
    conn.zremrangebyscore(job_enqueued_key, 0, "(#{(time.to_f - REMEMBER_THRESHOLD).to_s}")
  end
end
save() click to toggle source

add job to cron jobs input:

name: (string) - name of job
cron: (string: '* * * * *' - cron specification when to run job
class: (string|class) - which class to perform

optional input:

queue: (string) - which queue to use for enquing (will override class queue)
args: (array|hash|nil) - arguments for permorm method
# File lib/sidekiq/cron/job.rb, line 437
def save
  #if job is invalid return false
  return false unless valid?

  Sidekiq.redis do |conn|

    #add to set of all jobs
    conn.sadd self.class.jobs_key, redis_key

    #add informations for this job!
    conn.hmset redis_key, *hash_to_redis(to_hash)

    #add information about last time! - don't enque right after scheduler poller starts!
    time = Time.now.utc
    conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless conn.exists(job_enqueued_key)
  end
  logger.info { "Cron Jobs - add job with name: #{@name}" }
end
save_last_enqueue_time() click to toggle source
# File lib/sidekiq/cron/job.rb, line 456
def save_last_enqueue_time
  Sidekiq.redis do |conn|
    # update last enqueue time
    conn.hset redis_key, 'last_enqueue_time', @last_enqueue_time
  end
end
should_enque?(time) click to toggle source

crucial part of whole enquing job

# File lib/sidekiq/cron/job.rb, line 17
def should_enque? time
  enqueue = false
  enqueue = Sidekiq.redis do |conn|
    status == "enabled" &&
      not_past_scheduled_time?(time) &&
      not_enqueued_after?(time) &&
      conn.zadd(job_enqueued_key, formated_enqueue_time(time), formated_last_time(time))
  end
  enqueue
end
sidekiq_worker_message() click to toggle source

siodekiq worker message

# File lib/sidekiq/cron/job.rb, line 95
def sidekiq_worker_message
  @message.is_a?(String) ? Sidekiq.load_json(@message) : @message
end
sort_name() click to toggle source
# File lib/sidekiq/cron/job.rb, line 522
def sort_name
  "#{status == "enabled" ? 0 : 1}_#{name}".downcase
end
status() click to toggle source
# File lib/sidekiq/cron/job.rb, line 328
def status
  @status
end
status_from_redis() click to toggle source
# File lib/sidekiq/cron/job.rb, line 350
def status_from_redis
  out = "enabled"
  if fetch_missing_args
    Sidekiq.redis do |conn|
      status = conn.hget redis_key, "status"
      out = status if status
    end
  end
  out
end
test_and_enque_for_time!(time) click to toggle source

test if job should be enqued If yes add it to queue

# File lib/sidekiq/cron/job.rb, line 38
def test_and_enque_for_time! time
  #should this job be enqued?
  if should_enque?(time)
    enque!

    remove_previous_enques(time)
  end
end
to_hash() click to toggle source

export job data to hash

# File lib/sidekiq/cron/job.rb, line 372
def to_hash
  {
    name: @name,
    klass: @klass,
    cron: @cron,
    description: @description,
    args: @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || []),
    message: @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {}),
    status: @status,
    active_job: @active_job,
    queue_name_prefix: @active_job_queue_name_prefix,
    queue_name_delimiter: @active_job_queue_name_delimiter,
    last_enqueue_time: @last_enqueue_time,
  }
end
valid?() click to toggle source
# File lib/sidekiq/cron/job.rb, line 392
def valid?
  #clear previos errors
  @errors = []

  errors << "'name' must be set" if @name.nil? || @name.size == 0
  if @cron.nil? || @cron.size == 0
    errors << "'cron' must be set"
  else
    begin
      cron = Rufus::Scheduler::CronLine.new(@cron)
      cron.next_time(Time.now.utc).utc
    rescue Exception => e
      #fix for different versions of cron-parser
      if e.message == "Bad Vixie-style specification bad"
        errors << "'cron' -> #{@cron}: not a valid cronline"
      else
        errors << "'cron' -> #{@cron}: #{e.message}"
      end
    end
  end

  errors << "'klass' (or class) must be set" unless klass_valid

  !errors.any?
end

Private Instance Methods

hash_to_redis(hash) click to toggle source

Give Hash returns array for using it for redis.hmset

# File lib/sidekiq/cron/job.rb, line 588
def hash_to_redis hash
  hash.inject([]){ |arr,kv| arr + [kv[0], kv[1]] }
end
job_enqueued_key() click to toggle source

Redis key for storing one cron job run times (when poller added job to queue)

# File lib/sidekiq/cron/job.rb, line 582
def job_enqueued_key
  self.class.job_enqueued_key @name
end
not_enqueued_after?(time) click to toggle source
# File lib/sidekiq/cron/job.rb, line 528
def not_enqueued_after?(time)
  @last_enqueue_time.nil? || @last_enqueue_time.to_i < last_time(time).to_i
end
not_past_scheduled_time?(current_time) click to toggle source
# File lib/sidekiq/cron/job.rb, line 553
def not_past_scheduled_time?(current_time)
  last_cron_time = Rufus::Scheduler::CronLine.new(@cron).previous_time(current_time).utc
  return false if (current_time.to_i - last_cron_time.to_i) > 60
  true
end
parse_args(args) click to toggle source

Try parsing inbound args into an array. args from Redis will be encoded JSON; try to load JSON, then failover to string array.

# File lib/sidekiq/cron/job.rb, line 536
def parse_args(args)
  case args
  when String
    begin
      Sidekiq.load_json(args)
    rescue JSON::ParserError
      [*args]   # cast to string array
    end
  when Hash
    [args]      # just put hash into array
  when Array
    args        # do nothing, already array
  else
    [*args]     # cast to string array
  end
end
redis_key() click to toggle source

Redis key for storing one cron job

# File lib/sidekiq/cron/job.rb, line 570
def redis_key
  self.class.redis_key @name
end