class Rufus::Scheduler
Constants
- EoTime
- MAX_WORK_THREADS
MIN_WORK_THREADS = 3
- VERSION
Attributes
attr_accessor :min_work_threads
Public Class Methods
Produces an hour/min/sec/microsecond string representation of a Time instance
# File lib/rufus/scheduler/util.rb, line 186
# Formats +t+ as "HH:MM:SS.uuuuuu", the microseconds being zero-padded
# to six digits.
def h_to_s(t=Time.now)
  clock = t.strftime('%H:%M:%S')
  micros = format('%06d', t.usec)
  "#{clock}.#{micros}"
end
# File lib/rufus/scheduler.rb, line 55
# Sets up the scheduler state from +opts+ and starts the scheduler, unless
# the scheduler lock cannot be grabbed.
#
# Options read here: :frequency (defaults to 0.300), :max_work_threads,
# :lockfile, :scheduler_lock and :trigger_lock.
def initialize(opts={})
  @opts = opts

  @started_at = nil
  @paused = false

  @jobs = JobArray.new
  @mutexes = {}
  @work_queue = Queue.new

  @frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300)

  #@min_work_threads = opts[:min_work_threads] || MIN_WORK_THREADS
  @max_work_threads = opts[:max_work_threads] || MAX_WORK_THREADS

  @stderr = $stderr
  @thread_key = "rufus_scheduler_#{self.object_id}"

  # a :lockfile takes precedence over an explicit :scheduler_lock
  lockfile = opts[:lockfile]
  @scheduler_lock =
    if lockfile
      Rufus::Scheduler::FileLock.new(lockfile)
    else
      opts[:scheduler_lock] || Rufus::Scheduler::NullLock.new
    end

  @trigger_lock = opts[:trigger_lock] || Rufus::Scheduler::NullLock.new

  # If we can't grab the @scheduler_lock, don't run.
  start if lock
end
# File lib/rufus/scheduler/util.rb, line 12
# Parses +o+ by trying, in order, a cron string, a duration ("in"/"every")
# and a point in time ("at"). Returns the first successful result.
#
# The individual parsers are called with :no_error set so that they return
# nil instead of raising; the flag is set on a copy, the caller's +opts+
# hash is left untouched (and may thus be frozen).
#
# Fails with an ArgumentError when none of the parsers understands +o+.
def parse(o, opts={})
  quiet_opts = opts.merge(:no_error => true)

  parse_cron(o, quiet_opts) ||
  parse_in(o, quiet_opts) || # covers 'every' schedule strings
  parse_at(o, quiet_opts) ||
  fail(ArgumentError.new("couldn't parse #{o.inspect} (#{o.class})"))
end
# File lib/rufus/scheduler/util.rb, line 42
# Turns +o+ into an EoTime: an EoTime is returned as is, a Time goes through
# EoTime.make, anything else is handed to EoTime.parse.
#
# Any StandardError is re-raised, unless opts[:no_error] is set, in which
# case nil is returned.
def parse_at(o, opts={})
  case o
  when EoTime then o
  when Time then EoTime.make(o)
  else EoTime.parse(o, opts)
  end
rescue StandardError => err
  raise err unless opts[:no_error]
  nil
end
# File lib/rufus/scheduler/util.rb, line 22
# Delegates to Fugit.parse_cron. +opts+ is accepted but not used.
def parse_cron(o, opts)
  cron = Fugit.parse_cron(o)
  cron
end
Turns a string like '1m10s' into a float like 70.0. More formally, turns a time duration expressed as a string into a Float instance (a count of seconds).
w -> week d -> day h -> hour m -> minute s -> second M -> month y -> year 'nada' -> millisecond
Some examples:
Rufus::Scheduler.parse_duration "0.5" # => 0.5 Rufus::Scheduler.parse_duration "500" # => 0.5 Rufus::Scheduler.parse_duration "1000" # => 1.0 Rufus::Scheduler.parse_duration "1h" # => 3600.0 Rufus::Scheduler.parse_duration "1h10s" # => 3610.0 Rufus::Scheduler.parse_duration "1w2d" # => 777600.0
Negative time strings are OK (Thanks Danny Fullerton):
Rufus::Scheduler.parse_duration "-0.5" # => -0.5 Rufus::Scheduler.parse_duration "-1h" # => -3600.0
# File lib/rufus/scheduler/util.rb, line 81
# Parses the duration string +str+ via Fugit::Duration and returns its
# #to_sec value, or nil when the parser yields nothing.
#
# With opts[:no_error], Fugit::Duration.parse is used, else
# Fugit::Duration.do_parse.
def parse_duration(str, opts={})
  duration =
    if opts[:no_error]
      Fugit::Duration.parse(str, opts)
    else
      Fugit::Duration.do_parse(str, opts)
    end

  return nil unless duration

  duration.to_sec
end
# File lib/rufus/scheduler/util.rb, line 27
# Strings go through parse_duration, Numerics are returned untouched,
# anything else is an ArgumentError.
#
# An ArgumentError is turned into a nil return value when opts[:no_error]
# is set.
def parse_in(o, opts={})
  case o
  when String then parse_duration(o, opts)
  when Numeric then o
  else fail ArgumentError.new("couldn't parse time point in #{o.inspect}")
  end
rescue ArgumentError => err
  raise err unless opts[:no_error]
  nil
end
Alias for ::singleton
# File lib/rufus/scheduler.rb, line 100
# Shorthand for .singleton, +opts+ is passed along as is.
def self.s(opts={})
  singleton(opts)
end
Returns a singleton Rufus::Scheduler instance
# File lib/rufus/scheduler.rb, line 93
# Returns the memoized scheduler, building it with +opts+ on first call.
# +opts+ is ignored once the instance exists.
def self.singleton(opts={})
  return @singleton if @singleton

  @singleton = Rufus::Scheduler.new(opts)
end
Releasing the gem would probably require redirecting .start_new to .new and emit a simple deprecation message.
For now, let's assume the people pointing at rufus-scheduler/master on GitHub know what they do…
# File lib/rufus/scheduler.rb, line 108
# Always fails (RuntimeError): .start_new is not available in
# rufus-scheduler 3.x, .new is to be used instead.
def self.start_new
  raise RuntimeError, "this is rufus-scheduler 3.x, use .new instead of .start_new"
end
Turns a number of seconds into a time string
Rufus.to_duration 0 # => '0s' Rufus.to_duration 60 # => '1m' Rufus.to_duration 3661 # => '1h1m1s' Rufus.to_duration 7 * 24 * 3600 # => '1w' Rufus.to_duration 30 * 24 * 3600 + 1 # => "4w2d1s"
It goes from seconds to the year. Months are not counted (as they are of variable length). Weeks are counted.
For 30 days months to be counted, the second parameter of this method can be set to true.
Rufus.to_duration 30 * 24 * 3600 + 1, true # => "1M1s"
If a Float value is passed, milliseconds will be displayed without 'marker'
Rufus.to_duration 0.051 # => "51" Rufus.to_duration 7.051 # => "7s51" Rufus.to_duration 0.120 + 30 * 24 * 3600 + 1 # => "4w2d1s120"
(this behaviour mirrors the one found for parse_time_string()).
Options are :
-
:months, if set to true, months (M) of 30 days will be taken into account when building up the result
-
:drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
# File lib/rufus/scheduler/util.rb, line 124
# Builds the duration via to_fugit_duration and returns its #to_rufus_s
# string form.
def to_duration(seconds, options={})
  duration = to_fugit_duration(seconds, options)
  duration.to_rufus_s
end
Turns a number of seconds (integer or Float) into a hash like in :
Rufus.to_duration_hash 0.051 # => { :s => 0.051 } Rufus.to_duration_hash 7.051 # => { :s => 7.051 } Rufus.to_duration_hash 0.120 + 30 * 24 * 3600 + 1 # => { :w => 4, :d => 2, :s => 1.120 }
This method is used by ::to_duration behind the scenes.
Options are :
-
:months, if set to true, months (M) of 30 days will be taken into account when building up the result
-
:drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
# File lib/rufus/scheduler/util.rb, line 152
# Builds the duration via to_fugit_duration and returns its #to_rufus_h
# hash form.
def to_duration_hash(seconds, options={})
  duration = to_fugit_duration(seconds, options)
  duration.to_rufus_h
end
Used by both .to_duration and .to_duration_hash
# File lib/rufus/scheduler/util.rb, line 159
# Parses +seconds+ into a deflated Fugit::Duration, then applies the
# :drop_seconds and :months options, in that order, when they are set.
def to_fugit_duration(seconds, options={})
  duration = Fugit::Duration.parse(seconds, options).deflate

  if options[:drop_seconds]
    duration = duration.drop_seconds
  end
  if options[:months]
    duration = duration.deflate(:month => options[:months])
  end

  duration
end
Produces the UTC string representation of a Time instance
like “2009-11-23 11:11:50.947109 UTC”
# File lib/rufus/scheduler/util.rb, line 179
# Formats +t+ as "YYYY-MM-DD HH:MM:SS.uuuuuu UTC".
#
# Time#utc converts its receiver in place, which would silently switch the
# caller's Time instance to UTC. #getutc (which returns a new instance) is
# used when available, #utc remains the fallback for time-like objects
# that do not provide it.
def utc_to_s(t=Time.now)
  utc = t.respond_to?(:getutc) ? t.getutc : t.utc

  "#{utc.strftime('%Y-%m-%d %H:%M:%S')}.#{sprintf('%06d', t.usec)} UTC"
end
Public Instance Methods
# File lib/rufus/scheduler.rb, line 182
# Schedules a :once job for +time+ through do_schedule. opts[:job] is
# passed as the "return the job instance" flag.
def at(time, callable=nil, opts={}, &block)
  return_job_instance = opts[:job]
  do_schedule(:once, time, callable, opts, return_job_instance, block)
end
# File lib/rufus/scheduler.rb, line 293
# Returns the jobs (as filtered by +opts+, see #jobs) that are AtJob
# instances.
def at_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::AtJob)
end
Callback called when a job is triggered. If the lock cannot be acquired, the job won't run (though it'll still be scheduled to run again if necessary).
# File lib/rufus/scheduler.rb, line 361
# Returns the result of calling #lock on the trigger lock.
def confirm_lock
  trigger_lock = @trigger_lock
  trigger_lock.lock
end
# File lib/rufus/scheduler.rb, line 222
# Schedules a :cron job for +cronline+ through do_schedule. opts[:job] is
# passed as the "return the job instance" flag.
def cron(cronline, callable=nil, opts={}, &block)
  return_job_instance = opts[:job]
  do_schedule(:cron, cronline, callable, opts, return_job_instance, block)
end
# File lib/rufus/scheduler.rb, line 313
# Returns the jobs (as filtered by +opts+, see #jobs) that are CronJob
# instances.
def cron_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::CronJob)
end
# File lib/rufus/scheduler.rb, line 153
# Returns true when no start time is recorded (@started_at is nil or
# false), false otherwise.
def down?
  @started_at ? false : true
end
# File lib/rufus/scheduler.rb, line 202
# Schedules an :every job for +duration+ through do_schedule. opts[:job]
# is passed as the "return the job instance" flag.
def every(duration, callable=nil, opts={}, &block)
  return_job_instance = opts[:job]
  do_schedule(:every, duration, callable, opts, return_job_instance, block)
end
# File lib/rufus/scheduler.rb, line 303
# Returns the jobs (as filtered by +opts+, see #jobs) that are EveryJob
# instances.
def every_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::EveryJob)
end
# File lib/rufus/scheduler.rb, line 192
# Schedules a :once job after +duration+ through do_schedule. opts[:job]
# is passed as the "return the job instance" flag.
def in(duration, callable=nil, opts={}, &block)
  return_job_instance = opts[:job]
  do_schedule(:once, duration, callable, opts, return_job_instance, block)
end
# File lib/rufus/scheduler.rb, line 298
# Returns the jobs (as filtered by +opts+, see #jobs) that are InJob
# instances.
def in_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::InJob)
end
# File lib/rufus/scheduler.rb, line 212
# Schedules an :interval job for +duration+ through do_schedule.
# opts[:job] is passed as the "return the job instance" flag.
def interval(duration, callable=nil, opts={}, &block)
  return_job_instance = opts[:job]
  do_schedule(:interval, duration, callable, opts, return_job_instance, block)
end
# File lib/rufus/scheduler.rb, line 308
# Returns the jobs (as filtered by +opts+, see #jobs) that are IntervalJob
# instances.
def interval_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::IntervalJob)
end
# File lib/rufus/scheduler.rb, line 318
# Looks +job_id+ up in the scheduler's job collection (@jobs#[]).
def job(job_id)
  found = @jobs[job_id]
  found
end
Returns all the scheduled jobs (even those right before re-schedule).
# File lib/rufus/scheduler.rb, line 275
# Lists jobs. +opts+ may be a Hash or a single Symbol (:all or :running,
# taken as { symbol => true }).
#
# * :running => only the jobs answering true to #running?
# * :all => no filtering on the scheduling state
# * neither => jobs without a next_time or with an unscheduled_at are left out
#
# On top of that, :tag / :tags restricts the result to the jobs carrying
# all the given tags (compared as strings).
def jobs(opts={})
  opts = { opts => true } if opts.is_a?(Symbol)

  candidates = @jobs.to_a

  candidates =
    if opts[:running]
      candidates.select { |job| job.running? }
    elsif opts[:all]
      candidates
    else
      candidates.select { |job| !job.next_time.nil? && !job.unscheduled_at }
    end

  wanted_tags = Array(opts[:tag] || opts[:tags]).map(&:to_s)

  candidates.select { |job| wanted_tags.all? { |tag| job.tags.include?(tag) } }
end
# File lib/rufus/scheduler.rb, line 144
# Joins the scheduler thread. Fails with a NotRunningError when there is
# no scheduler thread.
def join
  if @thread
    @thread.join
  else
    fail NotRunningError.new('cannot join scheduler that is not running')
  end
end
Returns true if the scheduler has acquired the [exclusive] lock and thus may run.
Most of the time, a scheduler is run alone and this method should return true. It is useful in cases where among a group of applications only one of them should run the scheduler. For schedulers that should not run, the method should return false.
Out of the box, rufus-scheduler proposes the :lockfile => 'path/to/lock/file' scheduler start option. It makes it easy for schedulers on the same machine to determine which should run (the first to write the lockfile and lock it). It uses “man 2 flock” so it probably won't work reliably on distributed file systems.
If one needs to use a special/different locking mechanism, the scheduler accepts :scheduler_lock => lock_object. lock_object only needs to respond to lock and unlock, and both of these methods should be idempotent.
Look at rufus/scheduler/locks.rb for an example.
# File lib/rufus/scheduler.rb, line 344
# Returns the result of calling #lock on the scheduler lock.
def lock
  scheduler_lock = @scheduler_lock
  scheduler_lock.lock
end
# File lib/rufus/scheduler.rb, line 414
# Collects, for each job, its occurrences between +time0+ and +time1+.
#
# By default the result is a Hash { job => occurrences }, jobs without any
# occurrence being left out. With +format+ set to :timeline, the result is
# an Array of [ time, job ] pairs sorted by time.
def occurrences(time0, time1, format=:per_job)
  per_job =
    jobs.each_with_object({}) do |job, acc|
      times = job.occurrences(time0, time1)
      acc[job] = times if times.any?
    end

  return per_job unless format == :timeline

  per_job
    .flat_map { |job, times| times.map { |time| [ time, job ] } }
    .sort_by { |pair| pair.first }
end
# File lib/rufus/scheduler.rb, line 437
# Writes a report about +err+, raised while handling +job+, to the
# scheduler's stderr: the job, the error and its backtrace, time zone
# information and the scheduler state.
#
# Every line carries the error's object_id as a prefix. A failure while
# reporting is itself written to stderr. stderr is flushed in all cases.
def on_error(job, err)
  tag = err.object_id.to_s

  lock_states = {}
  mutexes.each { |key, mutex| lock_states[key] = mutex.locked? }

  # emits one prefixed report line
  say = lambda { |text| stderr.puts(" #{tag} #{text}") }

  stderr.puts("{ #{tag} rufus-scheduler intercepted an error:")
  say.call("job:")
  say.call("#{job.class} #{job.original.inspect} #{job.opts.inspect}")
  # TODO: eventually use a Job#detail or something like that
  say.call("error:")
  say.call("#{err.object_id}")
  say.call("#{err.class}")
  say.call("#{err}")
  err.backtrace.each { |line| say.call("#{line}") }
  say.call("tz:")
  say.call("ENV['TZ']: #{ENV['TZ']}")
  say.call("Time.now: #{Time.now}")
  say.call("local_tzone: #{EoTime.local_tzone.inspect}")
  say.call("et-orbi:")
  say.call("#{EoTime.platform_info}")
  say.call("scheduler:")
  say.call("object_id: #{object_id}")
  say.call("opts:")
  say.call("#{@opts.inspect}")
  say.call("frequency: #{self.frequency}")
  say.call("scheduler_lock: #{@scheduler_lock.inspect}")
  say.call("trigger_lock: #{@trigger_lock.inspect}")
  say.call("uptime: #{uptime} (#{uptime_s})")
  say.call("down?: #{down?}")
  say.call("threads: #{self.threads.size}")
  say.call("thread: #{self.thread}")
  say.call("thread_key: #{self.thread_key}")
  say.call("work_threads: #{work_threads.size}")
  say.call("active: #{work_threads(:active).size}")
  say.call("vacant: #{work_threads(:vacant).size}")
  say.call("max_work_threads: #{max_work_threads}")
  say.call("mutexes: #{lock_states.inspect}")
  say.call("jobs: #{jobs.size}")
  say.call("at_jobs: #{at_jobs.size}")
  say.call("in_jobs: #{in_jobs.size}")
  say.call("every_jobs: #{every_jobs.size}")
  say.call("interval_jobs: #{interval_jobs.size}")
  say.call("cron_jobs: #{cron_jobs.size}")
  say.call("running_jobs: #{running_jobs.size}")
  say.call("work_queue: #{work_queue.size}")
  stderr.puts("} #{tag} .")

rescue => failure

  stderr.puts("failure in #on_error itself:")
  stderr.puts(failure.inspect)
  stderr.puts(failure.backtrace)

ensure

  stderr.flush
end
# File lib/rufus/scheduler.rb, line 168
# Raises the paused flag (the scheduler loop in #start skips job
# triggering while it is set). Returns true.
def pause
  @paused = true
end
# File lib/rufus/scheduler.rb, line 163
# Returns the current value of the paused flag.
def paused?
  @paused
end
# File lib/rufus/scheduler.rb, line 246
# Schedules a repeating job: +arg+ is parsed, a cron result goes to
# schedule_cron, anything else goes to schedule_every.
#
# A Hash passed in the +callable+ position is taken as the options. The
# parsed value is stored under :_t in a copy of the options.
def repeat(arg, callable=nil, opts={}, &block)
  if callable.is_a?(Hash)
    opts = callable
    callable = nil
  end

  opts = opts.dup
  parsed = Scheduler.parse(arg, opts)
  opts[:_t] = parsed

  if parsed.is_a?(::Fugit::Cron)
    schedule_cron(arg, callable, opts, &block)
  else
    schedule_every(arg, callable, opts, &block)
  end
end
# File lib/rufus/scheduler.rb, line 173
# Clears the paused flag. Returns false.
def resume
  @paused = false
end
# File lib/rufus/scheduler.rb, line 409
# Same as #jobs, with :running => true forced into (a copy of) +opts+.
def running_jobs(opts={})
  running_opts = opts.merge(:running => true)
  jobs(running_opts)
end
# File lib/rufus/scheduler.rb, line 232
# Parses +arg+ and dispatches on the result: a cron goes to schedule_cron,
# a point in time (EoTime or Time) to schedule_at, anything else to
# schedule_in.
#
# A Hash passed in the +callable+ position is taken as the options. The
# parsed value is stored under :_t in a copy of the options.
def schedule(arg, callable=nil, opts={}, &block)
  if callable.is_a?(Hash)
    opts = callable
    callable = nil
  end

  opts = opts.dup
  parsed = Scheduler.parse(arg, opts)
  opts[:_t] = parsed

  if parsed.is_a?(::Fugit::Cron)
    schedule_cron(arg, callable, opts, &block)
  elsif parsed.is_a?(::EtOrbi::EoTime) || parsed.is_a?(Time)
    schedule_at(arg, callable, opts, &block)
  else
    schedule_in(arg, callable, opts, &block)
  end
end
# File lib/rufus/scheduler.rb, line 187
# Like #at, but always asks do_schedule for the job instance.
def schedule_at(time, callable=nil, opts={}, &block)
  return_job_instance = true
  do_schedule(:once, time, callable, opts, return_job_instance, block)
end
# File lib/rufus/scheduler.rb, line 227
# Like #cron, but always asks do_schedule for the job instance.
def schedule_cron(cronline, callable=nil, opts={}, &block)
  return_job_instance = true
  do_schedule(:cron, cronline, callable, opts, return_job_instance, block)
end
# File lib/rufus/scheduler.rb, line 207
# Like #every, but always asks do_schedule for the job instance.
def schedule_every(duration, callable=nil, opts={}, &block)
  return_job_instance = true
  do_schedule(:every, duration, callable, opts, return_job_instance, block)
end
# File lib/rufus/scheduler.rb, line 197
# Like #in, but always asks do_schedule for the job instance.
def schedule_in(duration, callable=nil, opts={}, &block)
  return_job_instance = true
  do_schedule(:once, duration, callable, opts, return_job_instance, block)
end
# File lib/rufus/scheduler.rb, line 217
# Like #interval, but always asks do_schedule for the job instance.
def schedule_interval(duration, callable=nil, opts={}, &block)
  return_job_instance = true
  do_schedule(:interval, duration, callable, opts, return_job_instance, block)
end
Returns true if this job is currently scheduled.
Takes extra care to answer true if the job is a repeat job currently firing.
# File lib/rufus/scheduler.rb, line 371
# Accepts a job or a job id. Answers true when the job is found, has no
# unscheduled_at and has a next_time; false in all the other cases.
def scheduled?(job_or_job_id)
  job = fetch(job_or_job_id).first

  return false unless job
  return false unless job.unscheduled_at.nil?

  !!(job.next_time != nil)
end
# File lib/rufus/scheduler.rb, line 113
# Stops the scheduler: clears the start time (which ends the loop run by
# #start), unschedules all the jobs, empties the work queue and finally
# releases the locks.
#
# +opt+ may be :wait (join all the work threads) or :kill (kill all the
# work threads); with any other value the work threads are left alone.
def shutdown(opt=nil)
  @started_at = nil

  # going through #jobs here instead of @jobs.array
  # provokes https://github.com/jmettraux/rufus-scheduler/issues/98
  @jobs.array.each(&:unschedule)

  @work_queue.clear

  if opt == :wait
    join_all_work_threads
  elsif opt == :kill
    kill_all_work_threads
  end

  unlock
end
Lists all the threads associated with this scheduler.
# File lib/rufus/scheduler.rb, line 380
# Selects, among Thread.list, the threads holding a truthy thread-local
# value under this scheduler's thread_key.
def threads
  Thread.list.select do |thread|
    thread[thread_key]
  end
end
# File lib/rufus/scheduler.rb, line 432
# Shortcut for #occurrences with the :timeline format.
def timeline(time0, time1)
  format = :timeline
  occurrences(time0, time1, format)
end
Sister method to lock, is called when the scheduler shuts down.
# File lib/rufus/scheduler.rb, line 351
# Unlocks the trigger lock, then the scheduler lock. Returns the result of
# the latter.
def unlock
  @trigger_lock.unlock

  @scheduler_lock.unlock
end
# File lib/rufus/scheduler.rb, line 259
# Unschedules the given job (a job or a job id is accepted). Fails with an
# ArgumentError when no such job can be found.
def unschedule(job_or_job_id)
  job, job_id = fetch(job_or_job_id)

  if job
    job.unschedule
  else
    fail ArgumentError.new("no job found with id '#{job_id}'")
  end
end
# File lib/rufus/scheduler.rb, line 158
# Returns true when a start time is recorded (@started_at is truthy),
# false otherwise.
def up?
  @started_at ? true : false
end
# File lib/rufus/scheduler.rb, line 134
# Returns EoTime.now minus the start time, or nil when no start time is
# recorded.
def uptime
  return nil unless @started_at

  EoTime.now - @started_at
end
# File lib/rufus/scheduler.rb, line 139
# Returns the uptime run through the class-level to_duration, or an empty
# string when there is no uptime.
def uptime_s
  return '' unless uptime

  self.class.to_duration(uptime)
end
Lists all the work threads (the ones actually running the scheduled block code)
Accepts a query option, which can be set to:
-
:all (default), returns all the threads that are work threads or are currently running a job
-
:active, returns all threads that are currently running a job
-
:vacant, returns the threads that are not running a job
If, thanks to :blocking => true, a job is scheduled to monopolize the main scheduler thread, that thread will get returned when :active or :all.
# File lib/rufus/scheduler.rb, line 398
# Narrows #threads down to those flagged with :rufus_scheduler_work_thread,
# then applies +query+:
#
# * :active => threads with a :rufus_scheduler_job thread-local set
# * :vacant => threads without it
# * anything else => all of them
def work_threads(query=:all)
  workers = threads.select { |thread| thread[:rufus_scheduler_work_thread] }

  case query
  when :active
    workers.select { |thread| thread[:rufus_scheduler_job] }
  when :vacant
    workers.reject { |thread| thread[:rufus_scheduler_job] }
  else
    workers
  end
end
Protected Instance Methods
# File lib/rufus/scheduler.rb, line 593
# Common implementation behind the scheduling methods.
#
# Fails with a NotRunningError when the scheduler has no start time.
# A Hash passed in the +callable+ position is taken as the options. The
# options are copied, unless they already carry a :_t entry.
#
# The job class derives from +job_type+; for :once, the parsed time (:_t,
# computed here if absent) decides: InJob when it is Numeric, else AtJob.
#
# Returns the job when +return_job_instance+ or opts[:job] is truthy, the
# job id otherwise.
def do_schedule(job_type, t, callable, opts, return_job_instance, block)
  if @started_at.nil?
    fail NotRunningError.new(
      'cannot schedule, scheduler is down or shutting down')
  end

  if callable.is_a?(Hash)
    opts = callable
    callable = nil
  end
  opts = opts.dup unless opts.has_key?(:_t)

  return_job_instance ||= opts[:job]

  job_class =
    case job_type
    when :once
      opts[:_t] ||= Rufus::Scheduler.parse(t, opts)
      opts[:_t].is_a?(Numeric) ? InJob : AtJob
    when :every then EveryJob
    when :interval then IntervalJob
    when :cron then CronJob
    end

  job = job_class.new(self, t, opts, block || callable)
  job.check_frequency

  @jobs.push(job)

  return job if return_job_instance

  job.job_id
end
Returns [ job, job_id ]
# File lib/rufus/scheduler.rb, line 502
# Normalizes its argument to a [ job, job_id ] pair. An object responding
# to #job_id is taken as the job itself, anything else is taken as a job
# id and looked up via #job (the job may then be nil).
def fetch(job_or_job_id)
  unless job_or_job_id.respond_to?(:job_id)
    return [ job(job_or_job_id), job_or_job_id ]
  end

  [ job_or_job_id, job_or_job_id.job_id ]
end
# File lib/rufus/scheduler.rb, line 518
# Pushes one :sayonara message per work thread onto the work queue, joins
# every work thread, then empties the queue.
def join_all_work_threads
  work_threads.size.times do
    @work_queue << :sayonara
  end

  work_threads.each(&:join)

  @work_queue.clear
end
# File lib/rufus/scheduler.rb, line 527
# Calls #kill on every work thread.
def kill_all_work_threads
  work_threads.each(&:kill)
end
# Raises KillSignal in every work thread.
def free_all_work_threads
  work_threads.each do |thread|
    thread.raise(KillSignal)
  end
end
# File lib/rufus/scheduler.rb, line 537
# Records the start time and spawns the scheduler thread.
#
# As long as a start time is set, the thread removes the unscheduled jobs,
# triggers jobs (unless paused), enforces job timeouts and then sleeps for
# @frequency.
#
# The thread is then tagged with the scheduler's thread key, the scheduler
# itself and a name (opts[:thread_name] or "<thread_key>_scheduler").
def start
  @started_at = EoTime.now

  @thread =
    Thread.new do
      while @started_at
        unschedule_jobs
        trigger_jobs unless @paused
        timeout_jobs

        sleep(@frequency)
      end
    end

  @thread[@thread_key] = true
  @thread[:rufus_scheduler] = self
  @thread[:name] = @opts[:thread_name] || "#{@thread_key}_scheduler"
end
# File lib/rufus/scheduler.rb, line 511
# Unschedules every job, then polls (every 0.01s) until no job is running
# anymore.
def terminate_all_jobs
  jobs.each(&:unschedule)

  sleep(0.01) until running_jobs.size <= 0
end
# File lib/rufus/scheduler.rb, line 574
# Goes over the active work threads and raises a
# Rufus::Scheduler::TimeoutError in those whose job has passed its timeout.
#
# The timeout thread-local is either an EoTime (absolute deadline) or a
# value added to the thread's :rufus_scheduler_time to obtain the deadline.
def timeout_jobs
  work_threads(:active).each do |thread|
    job = thread[:rufus_scheduler_job]
    timeout = thread[:rufus_scheduler_timeout]
    started = thread[:rufus_scheduler_time]

    # thread might just have become inactive (job -> nil)
    next unless job && timeout && started

    deadline = timeout.is_a?(EoTime) ? timeout : started + timeout

    thread.raise(Rufus::Scheduler::TimeoutError) unless deadline > EoTime.now
  end
end
# File lib/rufus/scheduler.rb, line 564
# Takes the current time once and triggers, with that same time, every job
# yielded by @jobs.each(now).
def trigger_jobs
  now = EoTime.now

  @jobs.each(now) { |job| job.trigger(now) }
end
# File lib/rufus/scheduler.rb, line 559
# Asks the job collection to drop its unscheduled jobs
# (@jobs#delete_unscheduled).
def unschedule_jobs
  job_array = @jobs
  job_array.delete_unscheduled
end