class Sidekiq::Cron::Job
Constants
- REMEMBER_THRESHOLD
How long we would like to store information about previous enqueues
Attributes
Public Class Methods
get all cron jobs
# File lib/sidekiq/cron/job.rb, line 202
# Load every cron job registered in Redis.
#
# Reads the members of the jobs set, fetches each member's hash inside one
# pipelined call, discards nil/empty hashes and builds a Job from each
# remaining hash.
def self.all
  raw_hashes = Sidekiq.redis do |conn|
    keys = conn.smembers(jobs_key)
    conn.pipelined do
      keys.each { |key| conn.hgetall(key) }
    end
  end

  raw_hashes.compact.reject(&:empty?).map do |attrs|
    # the hash was just read from redis, so there is nothing missing to fetch
    Sidekiq::Cron::Job.new(attrs.merge(fetch_missing_args: false))
  end
end
# File lib/sidekiq/cron/job.rb, line 218
# Number of cron jobs, taken as the cardinality of the jobs set in Redis.
def self.count
  Sidekiq.redis { |conn| conn.scard(jobs_key) }
end
create new instance of cron job
# File lib/sidekiq/cron/job.rb, line 240
# Build a job from +hash+ and persist it.
# Returns whatever #save returns for the new instance.
def self.create(hash)
  job = new(hash)
  job.save
end
destroy job by name
# File lib/sidekiq/cron/job.rb, line 245
# Destroy the job identified by +name+.
#
# +name+ may also be a Hash, in which case the name is taken from its
# :name or 'name' entry. Returns false when no such job is found,
# otherwise the result of the job's #destroy.
def self.destroy(name)
  name = name[:name] || name['name'] if name.is_a?(Hash)

  job = find(name)
  return false unless job

  job.destroy
end
remove all jobs from cron
# File lib/sidekiq/cron/job.rb, line 481
# Destroy every job returned by .all, then log that all jobs were deleted.
def self.destroy_all!
  all.each(&:destroy)
  logger.info { "Cron Jobs - deleted all jobs" }
end
remove “removed jobs” between current jobs and new jobs
# File lib/sidekiq/cron/job.rb, line 489
# Destroy every stored job whose name is not listed in +new_job_names+.
#
# The given names are compared as strings, so symbol names (for example
# the keys of a symbol-keyed hash) match the stored names instead of
# causing every existing job to be destroyed and recreated.
#
# Returns the array of names that were removed.
def self.destroy_removed_jobs(new_job_names)
  names_to_keep = new_job_names.map(&:to_s)
  removed_job_names = Sidekiq::Cron::Job.all.map(&:name) - names_to_keep
  removed_job_names.each { |job_name| Sidekiq::Cron::Job.destroy(job_name) }
  removed_job_names
end
# File lib/sidekiq/cron/job.rb, line 510
# Whether a job hash is stored under the redis key for +name+.
#
# Always returns true or false. Some redis client versions answer
# `exists` with an Integer count; 0 is truthy in Ruby, so the raw reply
# is normalised here. The boolean `exists?` command is preferred when
# the connection offers it.
def self.exists?(name)
  key = redis_key(name)
  reply = Sidekiq.redis do |conn|
    conn.respond_to?(:exists?) ? conn.exists?(key) : conn.exists(key)
  end
  reply == true || (reply.is_a?(Integer) && reply > 0)
end
# File lib/sidekiq/cron/job.rb, line 226
# Look up a job by +name+.
#
# +name+ may also be a Hash, in which case the name is taken from its
# :name or 'name' entry. Returns a Job built from the stored hash, or
# nil when exists? reports no job for that name.
def self.find(name)
  name = name[:name] || name['name'] if name.is_a?(Hash)

  Sidekiq.redis do |conn|
    Job.new(conn.hgetall(redis_key(name))) if exists?(name)
  end
end
load cron jobs from Array input structure should look like: [
{ 'name' => 'name_of_job', 'class' => 'MyClass', 'cron' => '1 * * * *', 'args' => '(OPTIONAL) [Array or Hash]', 'description' => '(OPTIONAL) Description of job' }, { 'name' => 'Cool Job for Second Class', 'class' => 'SecondClass', 'cron' => '*/5 * * * *' }
]
# File lib/sidekiq/cron/job.rb, line 184
# Build and save one job per entry of +array+.
#
# Returns a Hash mapping job name => errors for every job whose #save
# returned a falsy value; the hash is empty when all jobs were saved.
def self.load_from_array(array)
  array.each_with_object({}) do |job_data, failures|
    job = new(job_data)
    failures[job.name] = job.errors unless job.save
  end
end
Like {#load_from_array}, but if old jobs exist in Redis that are no longer present in the given array, those old jobs are destroyed.
# File lib/sidekiq/cron/job.rb, line 195
# Like load_from_array, but first destroys stored jobs that are absent
# from +array+.
#
# The job name is read from either the "name" or the :name key, matching
# what find/destroy accept. Reading only "name" produced nil names for
# symbol-keyed job hashes, which made every stored job look removed.
def self.load_from_array!(array)
  job_names = array.map { |job| job["name"] || job[:name] }
  destroy_removed_jobs(job_names)
  load_from_array(array)
end
load cron jobs from Hash input structure should look like: {
'name_of_job' => { 'class' => 'MyClass', 'cron' => '1 * * * *', 'args' => '(OPTIONAL) [Array or Hash]', 'description' => '(OPTIONAL) Description of job' }, 'My super iber cool job' => { 'class' => 'SecondClass', 'cron' => '*/5 * * * *' }
}
# File lib/sidekiq/cron/job.rb, line 152
# Load jobs from a Hash of job name => job attributes.
#
# Each attributes hash is copied with a 'name' entry set to its key and
# the resulting array is handed to load_from_array. The caller's hashes
# are left untouched (the name used to be written into them in place).
#
# Returns the result of load_from_array.
def self.load_from_hash(hash)
  jobs = hash.map { |job_name, job| job.merge('name' => job_name) }
  load_from_array(jobs)
end
Like {#load_from_hash}, but if old jobs exist in Redis that are no longer present in the given hash, those old jobs are destroyed.
# File lib/sidekiq/cron/job.rb, line 162
# Like load_from_hash, but first destroys stored jobs whose names are
# not among the keys of +hash+.
def self.load_from_hash!(hash)
  wanted_names = hash.keys
  destroy_removed_jobs(wanted_names)
  load_from_hash(hash)
end
# File lib/sidekiq/cron/job.rb, line 259
# Build a job from a Hash of attributes (string or symbol keys).
#
# Recognised keys: name, cron, description, klass/class, status,
# last_enqueue_time, args, active_job, queue_name_prefix,
# queue_name_delimiter, message, queue and fetch_missing_args.
# When status or last_enqueue_time are not supplied they are looked up
# through status_from_redis / last_enqueue_time_from_redis.
def initialize input_args = {}
  args = Hash[input_args.map{ |k, v| [k.to_s, v] }]
  @fetch_missing_args = args.delete('fetch_missing_args')
  @fetch_missing_args = true if @fetch_missing_args.nil?

  @name = args["name"]
  @cron = args["cron"]
  @description = args["description"] if args["description"]

  # get class from klass or class
  @klass = args["klass"] || args["class"]

  # set status of job
  @status = args['status'] || status_from_redis

  # set last enqueue time - from args or from existing job
  raw_last_enqueue_time = args['last_enqueue_time']
  @last_enqueue_time =
    if raw_last_enqueue_time.is_a?(Time)
      # already a Time (e.g. taken from another job's to_hash); Time has no
      # #empty?, so it must not go through the string branch below
      raw_last_enqueue_time
    elsif raw_last_enqueue_time && !raw_last_enqueue_time.empty?
      Time.parse(raw_last_enqueue_time)
    else
      last_enqueue_time_from_redis
    end

  # get right arguments for job
  @args = args["args"].nil? ? [] : parse_args( args["args"] )

  @active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
  @active_job_queue_name_prefix = args["queue_name_prefix"]
  @active_job_queue_name_delimiter = args["queue_name_delimiter"]

  if args["message"]
    @message = args["message"]
    message_data = Sidekiq.load_json(@message) || {}
    @queue = message_data['queue'] || "default"
  elsif @klass
    message_data = {
      "class" => @klass.to_s,
      "args"  => @args,
    }

    # get right data for message
    # only if message wasn't specified before
    klass_data = case @klass
      when Class
        @klass.get_sidekiq_options
      when String
        begin
          Sidekiq::Cron::Support.constantize(@klass).get_sidekiq_options
        rescue StandardError, ScriptError
          # Unknown or unloadable class. Signals, exit requests and
          # out-of-memory conditions are deliberately not swallowed here.
          {"queue"=>"default"}
        end
      else
        # klass is neither a Class nor a String: fall back to the same
        # defaults as an unknown class instead of calling #merge on nil;
        # valid? reports the bad klass later.
        {"queue"=>"default"}
    end

    message_data = klass_data.merge(message_data)
    # override queue if set in config
    # only if message is hash - can be string (dumped JSON)
    if args['queue']
      @queue = message_data['queue'] = args['queue']
    else
      @queue = message_data['queue'] || "default"
    end

    # the message is kept as a Hash here; to_hash dumps it to JSON
    @message = message_data
  end

  @queue_name_with_prefix = queue_name_with_prefix
end
Private Class Methods
Redis key for storing one cron job run times (when poller added job to queue)
# File lib/sidekiq/cron/job.rb, line 576
# Redis key holding the enqueue times recorded for the job called +name+.
def self.job_enqueued_key(name)
  "cron_job:#{name}:enqueued"
end
Redis key for set of all cron jobs
# File lib/sidekiq/cron/job.rb, line 560
# Redis key of the set that lists all cron jobs.
def self.jobs_key
  "cron_jobs"
end
Redis key for storing one cron job
# File lib/sidekiq/cron/job.rb, line 565
# Redis key under which the job called +name+ is stored.
def self.redis_key(name)
  "cron_job:#{name}"
end
Public Instance Methods
Active Job has a different structure for how it loads data from the Sidekiq queue: it creates a wrapper around the job
# File lib/sidekiq/cron/job.rb, line 123
# Build the message Hash used when the job is pushed as an Active Job.
#
# The outer hash names the Sidekiq adapter's JobWrapper class; its single
# argument carries the real job class, a fresh random job id, the
# prefixed queue name and the job arguments.
def active_job_message
  wrapped_job = {
    'job_class'  => @klass,
    'job_id'     => SecureRandom.uuid,
    'queue_name' => @queue_name_with_prefix,
    'arguments'  => @args
  }

  {
    'class'       => 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper',
    'queue'       => @queue_name_with_prefix,
    'description' => @description,
    'args'        => [wrapped_job]
  }
end
remove job from cron jobs by name input:
first arg: name (string) - name of job (must be same - case sensitive)
# File lib/sidekiq/cron/job.rb, line 466
# Remove this job from Redis and log the deletion.
def destroy
  Sidekiq.redis do |conn|
    conn.srem(self.class.jobs_key, redis_key) # drop it from the set of all jobs
    conn.del(job_enqueued_key)                # drop the recorded enqueue timestamps
    conn.del(redis_key)                       # drop the job hash itself
  end

  logger.info { "Cron Jobs - deleted job with name: #{@name}" }
end
# File lib/sidekiq/cron/job.rb, line 332
# Mark the job as disabled and persist it; returns the result of #save.
def disable!
  @status = "disabled"
  save
end
# File lib/sidekiq/cron/job.rb, line 346
# True unless the job's status is exactly "enabled".
def disabled?
  @status != "enabled"
end
# File lib/sidekiq/cron/job.rb, line 337
# Mark the job as enabled and persist it; returns the result of #save.
def enable!
  @status = "enabled"
  save
end
# File lib/sidekiq/cron/job.rb, line 342
# True when the job's status is exactly "enabled".
def enabled?
  @status == "enabled"
end
enque cron job to queue
# File lib/sidekiq/cron/job.rb, line 48
# Enqueue the job now and record +time+ as its last enqueue time.
#
# When the job class can be resolved, it is enqueued through the class
# itself (Active Job or Sidekiq worker). When the class name cannot be
# resolved (NameError), a raw message is pushed through Sidekiq::Client.
def enque!(time = Time.now.utc)
  @last_enqueue_time = time

  worker_class = begin
    Sidekiq::Cron::Support.constantize(@klass.to_s)
  rescue NameError
    nil
  end

  if !worker_class
    # class is not loadable here: push a prepared message instead
    payload = @active_job ? active_job_message : sidekiq_worker_message
    Sidekiq::Client.push(payload)
  elsif defined?(ActiveJob::Base) && worker_class < ActiveJob::Base
    enqueue_active_job(worker_class)
  else
    enqueue_sidekiq_worker(worker_class)
  end

  save_last_enqueue_time
  logger.debug { "enqueued #{@name}: #{@message}" }
end
# File lib/sidekiq/cron/job.rb, line 82
# Enqueue through the job class: set the queue to @queue, then call
# perform_later with the job arguments. Always returns true.
def enqueue_active_job(klass_const)
  configured_job = klass_const.set(queue: @queue)
  configured_job.perform_later(*@args)

  true
end
# File lib/sidekiq/cron/job.rb, line 88
# Enqueue through the worker class: set the queue to the result of
# queue_name_with_prefix, then call perform_async with the job arguments.
# Always returns true.
def enqueue_sidekiq_worker(klass_const)
  configured_worker = klass_const.set(queue: queue_name_with_prefix)
  configured_worker.perform_async(*@args)

  true
end
# File lib/sidekiq/cron/job.rb, line 388
# Validation errors collected so far; lazily initialised to an empty Array.
def errors
  @errors ||= []
end
# File lib/sidekiq/cron/job.rb, line 518
# Whether a job with this job's name is stored; delegates to the
# class-level exists?.
def exists?
  self.class.exists?(@name)
end
# File lib/sidekiq/cron/job.rb, line 502
# The previous scheduled run time relative to +now+, in UTC, as a
# String of float epoch seconds.
def formated_enqueue_time(now = Time.now.utc)
  previous_run = last_time(now)
  previous_run.getutc.to_f.to_s
end
# File lib/sidekiq/cron/job.rb, line 506
# The previous scheduled run time relative to +now+, in UTC, as an
# ISO 8601 String.
def formated_last_time(now = Time.now.utc)
  previous_run = last_time(now)
  previous_run.getutc.iso8601
end
# File lib/sidekiq/cron/job.rb, line 76
# Truthy when the job is flagged as an Active Job, or when ActiveJob::Base
# is defined and the job class is a subclass of it. Returns false when
# the class name cannot be resolved (NameError).
def is_active_job?
  return @active_job if @active_job

  defined?(ActiveJob::Base) &&
    Sidekiq::Cron::Support.constantize(@klass.to_s) < ActiveJob::Base
rescue NameError
  false
end
# File lib/sidekiq/cron/job.rb, line 418
# Whether @klass is usable as a job class reference.
#
# A Class is always accepted; a String is accepted when it is not empty.
# Anything else (including nil) is rejected with an explicit false
# rather than the implicit nil of an empty else branch.
def klass_valid
  case @klass
  when Class  then true
  when String then !@klass.empty?
  else false
  end
end
# File lib/sidekiq/cron/job.rb, line 361
# Read the job's "last_enqueue_time" field from Redis and parse it.
#
# Returns nil when fetch_missing_args is falsy, when the field is absent
# or empty, or when its value cannot be parsed as a time. Only the
# parsing failure is rescued; other errors (for example from the Redis
# call) are no longer silently turned into nil.
def last_enqueue_time_from_redis
  return nil unless fetch_missing_args

  raw = Sidekiq.redis { |conn| conn.hget(redis_key, "last_enqueue_time") }
  return nil if raw.nil? || raw.to_s.empty?

  begin
    Time.parse(raw.to_s)
  rescue ArgumentError
    # stored value is not a parseable time
    nil
  end
end
Parse cron specification '* * * * *' and returns time when last run should be performed
# File lib/sidekiq/cron/job.rb, line 498
# Parse the job's cron line and return, in UTC, the previous time it
# matched relative to +now+.
def last_time(now = Time.now.utc)
  cron_line = Rufus::Scheduler::CronLine.new(@cron)
  cron_line.previous_time(now.utc).utc
end
# File lib/sidekiq/cron/job.rb, line 99
# Queue name to enqueue to, including an Active Job queue prefix if any.
#
# Non-Active-Job jobs use @queue unchanged. Otherwise the delimiter and
# prefix each come from the job's own setting when non-empty, else from
# ActiveJob::Base when it defines a non-empty one; the delimiter finally
# falls back to '_' and a missing prefix leaves @queue unchanged.
def queue_name_with_prefix
  return @queue unless is_active_job?

  delimiter =
    if !"#{@active_job_queue_name_delimiter}".empty?
      @active_job_queue_name_delimiter
    elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_delimiter) && !ActiveJob::Base.queue_name_delimiter.empty?
      ActiveJob::Base.queue_name_delimiter
    else
      '_'
    end

  if !"#{@active_job_queue_name_prefix}".empty?
    "#{@active_job_queue_name_prefix}#{delimiter}#{@queue}"
  elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_prefix) && !"#{ActiveJob::Base.queue_name_prefix}".empty?
    "#{ActiveJob::Base.queue_name_prefix}#{delimiter}#{@queue}"
  else
    @queue
  end
end
Remove previous information about run times. This clears old entries from Redis and makes sure that Redis will not overflow with memory.
# File lib/sidekiq/cron/job.rb, line 31
# Delete recorded enqueue entries whose score lies between 0 and
# (time - REMEMBER_THRESHOLD) from the job's sorted set.
def remove_previous_enques(time)
  cutoff = (time.to_f - REMEMBER_THRESHOLD).to_s

  Sidekiq.redis do |conn|
    # NOTE(review): the leading "(" presumably marks the bound as exclusive
    # in Redis range syntax - confirm against the ZREMRANGEBYSCORE docs
    conn.zremrangebyscore(job_enqueued_key, 0, "(#{cutoff}")
  end
end
add job to cron jobs input:
name: (string) - name of job cron: (string: '* * * * *') - cron specification when to run job class: (string|class) - which class to perform
optional input:
queue: (string) - which queue to use for enqueuing (will override class queue) args: (array|hash|nil) - arguments for perform method
# File lib/sidekiq/cron/job.rb, line 437
# Persist the job to Redis.
#
# Returns false without touching Redis when the job is invalid, true
# once it has been stored (independent of what the logger returns).
def save
  # if job is invalid return false
  return false unless valid?

  Sidekiq.redis do |conn|
    # add to set of all jobs
    conn.sadd self.class.jobs_key, redis_key

    # add information for this job
    conn.hmset redis_key, *hash_to_redis(to_hash)

    # Seed the enqueue history with the last scheduled time so the job is
    # not enqueued right after the scheduler poller starts - but only when
    # no history exists yet. Some redis client versions answer `exists`
    # with an Integer, and 0 is truthy in Ruby, so normalise the reply.
    time = Time.now.utc
    reply = conn.respond_to?(:exists?) ? conn.exists?(job_enqueued_key) : conn.exists(job_enqueued_key)
    history_present = reply == true || (reply.is_a?(Integer) && reply > 0)
    conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless history_present
  end

  logger.info { "Cron Jobs - add job with name: #{@name}" }
  true
end
# File lib/sidekiq/cron/job.rb, line 456
# Write @last_enqueue_time into the 'last_enqueue_time' field of the
# job's hash in Redis.
def save_last_enqueue_time
  Sidekiq.redis do |conn|
    conn.hset(redis_key, 'last_enqueue_time', @last_enqueue_time)
  end
end
crucial part of the whole job-enqueuing process
# File lib/sidekiq/cron/job.rb, line 17
# Decide whether the job should be enqueued for +time+.
#
# The checks short-circuit in order: the job must be enabled, the
# scheduled time must not be past, the job must not already have been
# enqueued after the last scheduled time, and finally the zadd that
# records this run in the job's enqueue history must return a truthy
# value. The result of that chain is returned.
def should_enque?(time)
  Sidekiq.redis do |conn|
    status == "enabled" &&
      not_past_scheduled_time?(time) &&
      not_enqueued_after?(time) &&
      conn.zadd(job_enqueued_key, formated_enqueue_time(time), formated_last_time(time))
  end
end
sidekiq worker message
# File lib/sidekiq/cron/job.rb, line 95
# The job message: parsed with Sidekiq.load_json when it is stored as a
# String, otherwise returned as it is.
def sidekiq_worker_message
  return @message unless @message.is_a?(String)

  Sidekiq.load_json(@message)
end
# File lib/sidekiq/cron/job.rb, line 522
# Sorting key: "0_" for enabled jobs and "1_" for all others, followed
# by the job name, lower-cased.
def sort_name
  rank = status == "enabled" ? 0 : 1
  "#{rank}_#{name}".downcase
end
# File lib/sidekiq/cron/job.rb, line 328
# Current status of the job (the value of @status).
def status
  @status
end
# File lib/sidekiq/cron/job.rb, line 350
# Status stored in the "status" field of the job's hash in Redis.
# Defaults to "enabled" when fetch_missing_args is falsy or nothing is
# stored.
def status_from_redis
  return "enabled" unless fetch_missing_args

  stored = Sidekiq.redis { |conn| conn.hget(redis_key, "status") }
  stored || "enabled"
end
Test if the job should be enqueued. If yes, add it to the queue.
# File lib/sidekiq/cron/job.rb, line 38
# Enqueue the job if should_enque? allows it for +time+, then remove
# old enqueue records. Returns nil when nothing was enqueued.
def test_and_enque_for_time!(time)
  return unless should_enque?(time)

  enque!
  remove_previous_enques(time)
end
export job data to hash
# File lib/sidekiq/cron/job.rb, line 372
# Export the job's attributes as a symbol-keyed Hash.
#
# args and message are passed through when they are already Strings and
# dumped with Sidekiq.dump_json otherwise (nil becomes [] / {}).
def to_hash
  args_json    = @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || [])
  message_json = @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {})

  {
    name: @name,
    klass: @klass,
    cron: @cron,
    description: @description,
    args: args_json,
    message: message_json,
    status: @status,
    active_job: @active_job,
    queue_name_prefix: @active_job_queue_name_prefix,
    queue_name_delimiter: @active_job_queue_name_delimiter,
    last_enqueue_time: @last_enqueue_time,
  }
end
# File lib/sidekiq/cron/job.rb, line 392
# Validate the job and rebuild the errors list.
#
# Checks that a name is set, that the cron line is set and can be parsed
# and evaluated, and that klass_valid accepts the class. Returns true
# when no errors were collected.
def valid?
  # clear previous errors
  @errors = []

  errors << "'name' must be set" if @name.nil? || @name.size == 0

  if @cron.nil? || @cron.size == 0
    errors << "'cron' must be set"
  else
    begin
      cron = Rufus::Scheduler::CronLine.new(@cron)
      cron.next_time(Time.now.utc).utc
    rescue StandardError => e
      # StandardError rather than Exception: a bad cron line must be
      # reported, but signals and exit requests must not be swallowed.
      # The special case below papers over different cron-parser versions.
      if e.message == "Bad Vixie-style specification bad"
        errors << "'cron' -> #{@cron}: not a valid cronline"
      else
        errors << "'cron' -> #{@cron}: #{e.message}"
      end
    end
  end

  errors << "'klass' (or class) must be set" unless klass_valid

  errors.empty?
end
Private Instance Methods
Given a Hash, returns an array suitable for redis.hmset
# File lib/sidekiq/cron/job.rb, line 588
# Flatten +hash+ into [key1, value1, key2, value2, ...], the argument
# layout used with redis hmset. Only one level is flattened, so Array
# values stay intact. Built in a single pass instead of allocating a new
# array for every pair.
def hash_to_redis(hash)
  hash.flat_map { |key, value| [key, value] }
end
Redis key for storing one cron job run times (when poller added job to queue)
# File lib/sidekiq/cron/job.rb, line 582
# Redis key of this job's enqueue history; delegates to the class-level
# job_enqueued_key with the job's name.
def job_enqueued_key
  self.class.job_enqueued_key(@name)
end
# File lib/sidekiq/cron/job.rb, line 528
# True when the job has never been enqueued, or when its last enqueue
# (whole seconds) is earlier than the last scheduled time for +time+.
def not_enqueued_after?(time)
  return true if @last_enqueue_time.nil?

  @last_enqueue_time.to_i < last_time(time).to_i
end
# File lib/sidekiq/cron/job.rb, line 553
# True when the previous scheduled time of the cron line is at most 60
# whole seconds before +current_time+.
def not_past_scheduled_time?(current_time)
  cron_line = Rufus::Scheduler::CronLine.new(@cron)
  last_cron_time = cron_line.previous_time(current_time).utc
  seconds_late = current_time.to_i - last_cron_time.to_i

  seconds_late <= 60
end
Try parsing inbound args into an array. args from Redis will be encoded JSON; try to load JSON, then failover to string array.
# File lib/sidekiq/cron/job.rb, line 536
# Normalise inbound job arguments.
#
# A Hash is wrapped in an Array, an Array is returned untouched, a
# String is parsed with Sidekiq.load_json (falling back to a one-element
# Array holding the string when it is not valid JSON), and anything else
# is splatted into an Array.
def parse_args(args)
  return [args] if args.is_a?(Hash)
  return args if args.is_a?(Array)
  return [*args] unless args.is_a?(String)

  begin
    Sidekiq.load_json(args)
  rescue JSON::ParserError
    [*args] # not JSON: keep the raw string as the only argument
  end
end
Redis key for storing one cron job
# File lib/sidekiq/cron/job.rb, line 570
# Redis key under which this job is stored; delegates to the class-level
# redis_key with the job's name.
def redis_key
  self.class.redis_key(@name)
end