class GRPC::Pool
Pool is a simple thread pool.
Constants
- DEFAULT_KEEP_ALIVE
Default keep alive period is 1s
Private Class Methods
new(size, keep_alive: DEFAULT_KEEP_ALIVE)
click to toggle source
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 27 def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) fail 'pool size must be positive' unless size > 0 @jobs = Queue.new @size = size @stopped = false @stop_mutex = Mutex.new # needs to be held when accessing @stopped @stop_cond = ConditionVariable.new @workers = [] @keep_alive = keep_alive # Each worker thread has its own queue to push and pull jobs # these queues are put into @ready_queues when that worker is idle @ready_workers = Queue.new end
Private Instance Methods
forcibly_stop_workers()
click to toggle source
Forcibly shutdown any threads that are still alive.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 111 def forcibly_stop_workers return unless @workers.size > 0 GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)") @workers.each do |t| next unless t.alive? begin t.exit rescue StandardError => e GRPC.logger.warn('error while terminating a worker') GRPC.logger.warn(e) end end end
jobs_waiting()
click to toggle source
Returns the number of jobs waiting
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 43 def jobs_waiting @jobs.size end
loop_execute_jobs(worker_queue)
click to toggle source
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 134 def loop_execute_jobs(worker_queue) loop do begin blk, args = worker_queue.pop blk.call(*args) rescue StandardError => e GRPC.logger.warn('Error in worker thread') GRPC.logger.warn(e) end # there shouldn't be any work given to this thread while its busy fail('received a task while busy') unless worker_queue.empty? @stop_mutex.synchronize do return if @stopped @ready_workers << worker_queue end end end
ready_for_work?()
click to toggle source
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 47 def ready_for_work? # Busy worker threads are either doing work, or have a single job # waiting on them. Workers that are idle with no jobs waiting # have their "queues" in @ready_workers !@ready_workers.empty? end
remove_current_thread()
click to toggle source
removes the threads from workers, and signal when all the threads are complete.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 127 def remove_current_thread @stop_mutex.synchronize do @workers.delete(Thread.current) @stop_cond.signal if @workers.size.zero? end end
schedule(*args, &blk)
click to toggle source
Runs the given block on the queue with the provided args.
@param args the args passed blk when it is called @param blk the block to call
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 58 def schedule(*args, &blk) return if blk.nil? @stop_mutex.synchronize do if @stopped GRPC.logger.warn('did not schedule job, already stopped') return end GRPC.logger.info('schedule another job') fail 'No worker threads available' if @ready_workers.empty? worker_queue = @ready_workers.pop fail 'worker already has a task waiting' unless worker_queue.empty? worker_queue << [blk, args] end end
start()
click to toggle source
Starts running the jobs in the thread pool.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 75 def start @stop_mutex.synchronize do fail 'already stopped' if @stopped end until @workers.size == @size.to_i new_worker_queue = Queue.new @ready_workers << new_worker_queue next_thread = Thread.new(new_worker_queue) do |jobs| catch(:exit) do # allows { throw :exit } to kill a thread loop_execute_jobs(jobs) end remove_current_thread end @workers << next_thread end end
stop()
click to toggle source
Stops the jobs in the pool
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 93 def stop GRPC.logger.info('stopping, will wait for all the workers to exit') @stop_mutex.synchronize do # wait @keep_alive seconds for workers to stop @stopped = true loop do break unless ready_for_work? worker_queue = @ready_workers.pop worker_queue << [proc { throw :exit }, []] end @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 end forcibly_stop_workers GRPC.logger.info('stopped, all workers are shutdown') end