class GRPC::RpcDesc

RpcDesc is a Descriptor of an RPC method.

Private Instance Methods

arity_error(mth, want, msg) click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 191
# Builds the message reported when a handler method's arity does not match
# what the RPC shape requires.
#
# @param [Method] mth the handler method being checked
# @param [Integer] want the arity that was expected
# @param [String] msg a hint describing the expected signature
# @return [String] the formatted error message
def arity_error(mth, want, msg)
  handler_name = mth.name
  actual = mth.arity
  "##{handler_name}: bad arg count; got:#{actual}, want:#{want}, #{msg}"
end
assert_arity_matches(mth) click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 156
# Verifies that a handler method takes the number of arguments this RPC's
# shape requires, failing with an arity_error message when it does not.
#
# - bidi streamer: (req, call) or (req); the call object is optional and
#   gives access to metadata, cancelling, etc.
# - request/response and server streamer: (req, call)
# - anything else (client streamer): (call)
#
# @param [Method] mth the handler method to check
# @return [nil] when the arity is acceptable
def assert_arity_matches(mth)
  if bidi_streamer?
    return if [1, 2].include?(mth.arity)

    hint = "should be #{mth.name}(req, call) or #{mth.name}(req)"
    fail arity_error(mth, 2, hint)
  end

  if request_response? || server_streamer?
    return if mth.arity == 2

    fail arity_error(mth, 2, "should be #{mth.name}(req, call)")
  end

  return if mth.arity == 1

  fail arity_error(mth, 1, "should be #{mth.name}(call)")
end
bidi_streamer?() click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 187
# @return [Boolean] true when both the input and the output are Streams
def bidi_streamer?
  return false unless input.is_a?(Stream)

  output.is_a?(Stream)
end
client_streamer?() click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 179
# @return [Boolean] true when the input is a Stream and the output is not
def client_streamer?
  return false unless input.is_a?(Stream)

  !output.is_a?(Stream)
end
handle_bidi_streamer(active_call, mth, inter_ctx) click to toggle source

@param [GRPC::ActiveCall] active_call
@param [Method] mth
@param [GRPC::InterceptionContext] inter_ctx

# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 105
# Runs a bidi-streaming handler through the active call, then reports an OK
# status carrying the call's output metadata.
#
# @param [GRPC::ActiveCall] active_call the call being served
# @param [Method] mth the handler method
# @param [GRPC::InterceptionContext] inter_ctx forwarded to run_server_bidi
def handle_bidi_streamer(active_call, mth, inter_ctx)
  active_call.run_server_bidi(mth, inter_ctx)
  # Read the metadata only after the handler has run.
  trailing = active_call.output_metadata
  send_status(active_call, OK, 'OK', trailing)
end
handle_client_streamer(active_call, mth, inter_ctx) click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 68
# Serves a client-streaming RPC: the handler receives the call's
# multi-request view and its return value is sent back as the single
# response, with the call's output metadata as trailing metadata. The
# handler invocation is wrapped in the :client_streamer interception.
#
# @param [GRPC::ActiveCall] active_call the call being served
# @param [Method] mth the handler method
# @param [GRPC::InterceptionContext] inter_ctx the interception context
def handle_client_streamer(active_call, mth, inter_ctx)
  view = active_call.multi_req_view
  inter_ctx.intercept!(:client_streamer, method: mth, call: view) do
    reply = mth.call(view)
    # Output metadata is read after the handler has run.
    active_call.server_unary_response(reply, trailing_metadata: active_call.output_metadata)
  end
end
handle_request_response(active_call, mth, inter_ctx) click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 50
# Serves a unary RPC: reads the single request, invokes the handler with
# the request and the call's single-request view, and sends the handler's
# return value as the response with the call's output metadata as trailing
# metadata. The handler invocation is wrapped in the :request_response
# interception.
#
# @param [GRPC::ActiveCall] active_call the call being served
# @param [Method] mth the handler method
# @param [GRPC::InterceptionContext] inter_ctx the interception context
def handle_request_response(active_call, mth, inter_ctx)
  request = active_call.read_unary_request
  view = active_call.single_req_view
  inter_ctx.intercept!(:request_response, method: mth, call: view, request: request) do
    reply = mth.call(request, view)
    # Output metadata is read after the handler has run.
    active_call.server_unary_response(reply, trailing_metadata: active_call.output_metadata)
  end
end
handle_server_streamer(active_call, mth, inter_ctx) click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 84
# Serves a server-streaming RPC: reads the single request, invokes the
# handler with the request and the call's single-request view, sends each
# element the handler returns, then reports an OK status with the call's
# output metadata. The work is wrapped in the :server_streamer interception.
#
# @param [GRPC::ActiveCall] active_call the call being served
# @param [Method] mth the handler method; its result must respond to #each
# @param [GRPC::InterceptionContext] inter_ctx the interception context
def handle_server_streamer(active_call, mth, inter_ctx)
  request = active_call.read_unary_request
  view = active_call.single_req_view
  inter_ctx.intercept!(:server_streamer, method: mth, call: view, request: request) do
    mth.call(request, view).each do |reply|
      active_call.remote_send(reply)
    end
    send_status(active_call, OK, 'OK', active_call.output_metadata)
  end
end
marshal_proc() click to toggle source

@return [Proc] { |instance| marshalled(instance) }

# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 34
# Builds a Proc that marshals an instance by calling the class-level method
# named by marshal_method on the instance's own class and converting the
# result to a String.
#
# @return [Proc] { |instance| marshalled(instance) }
def marshal_proc
  proc do |instance|
    encoder = instance.class.method(marshal_method)
    encoder.call(instance).to_s
  end
end
request_response?() click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 175
# @return [Boolean] true when neither the input nor the output is a Stream
def request_response?
  return false if input.is_a?(Stream)

  !output.is_a?(Stream)
end
run_server_method(active_call, mth, inter_ctx = InterceptionContext.new) click to toggle source

@param [GRPC::ActiveCall] active_call The current active call object for the request
@param [Method] mth The current RPC method being called
@param [GRPC::InterceptionContext] inter_ctx The interception context being executed
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 117
# Dispatches a server call to the handler routine matching this RPC's shape
# and translates failures raised while it runs into a status for the client.
#
# @param [GRPC::ActiveCall] active_call the active call for the request
# @param [Method] mth the RPC handler method being invoked
# @param [GRPC::InterceptionContext] inter_ctx the interception context to
#   run the handler under
def run_server_method(active_call, mth, inter_ctx = InterceptionContext.new)
  # A running server method may be cancelled, may hit its deadline, may
  # raise an unexpected error, or (if well-behaved) may raise a status error;
  # each case is handled by a rescue clause below.
  return handle_request_response(active_call, mth, inter_ctx) if request_response?
  return handle_client_streamer(active_call, mth, inter_ctx) if client_streamer?
  return handle_server_streamer(active_call, mth, inter_ctx) if server_streamer?

  # Only the bidi-stream shape remains.
  handle_bidi_streamer(active_call, mth, inter_ctx)
rescue BadStatus => status_err
  # Handlers raise this to have GRPC send an application error code, a
  # detail message and any app-specific metadata.
  GRPC.logger.debug("app err:#{active_call}, status:#{status_err.code}:#{status_err.details}")
  send_status(active_call, status_err.code, status_err.details, status_err.metadata)
rescue Core::CallError => call_err
  # Raised by GRPC internals; expected to be rare. It is logged, but the
  # other endpoint is not notified.
  GRPC.logger.warn("failed call: #{active_call}\n#{call_err}")
rescue Core::OutOfTime
  # The call ran past its deadline: report DEADLINE_EXCEEDED.
  GRPC.logger.warn("late call: #{active_call}")
  send_status(active_call, DEADLINE_EXCEEDED, 'late')
rescue StandardError, NotImplementedError => err
  # Usually an unhandled error in the handler code: report UNKNOWN.
  #
  # NotImplementedError is deliberately not mapped to UNIMPLEMENTED, since it
  # is meant for low-level OS interaction (e.g. syscalls) that the current OS
  # does not support.
  GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
  GRPC.logger.warn(err)
  send_status(active_call, UNKNOWN, "#{err.class}: #{err.message}")
end
send_status(active_client, code, details, metadata = {}) click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 195
# Sends a status on the given call, logging instead of raising when the
# send fails with a StandardError.
#
# @param active_client the call object on which #send_status is invoked
# @param code the status code; compared against OK to set the third argument
# @param [String, nil] details the status detail; nil is replaced by a
#   placeholder message
# @param [Hash] metadata passed through as the metadata: keyword
def send_status(active_client, code, details, metadata = {})
  details = details.nil? ? 'Not sure why' : details
  GRPC.logger.debug("Sending status  #{code}:#{details}")
  succeeded = code == OK
  active_client.send_status(code, details, succeeded, metadata: metadata)
rescue StandardError => err
  # Failing to deliver a status is logged but not propagated.
  GRPC.logger.warn("Could not send status #{code}:#{details}")
  GRPC.logger.warn(err)
end
server_streamer?() click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 183
# @return [Boolean] true when the output is a Stream and the input is not
def server_streamer?
  return false if input.is_a?(Stream)

  output.is_a?(Stream)
end
unmarshal_proc(target) click to toggle source

@param [:input, :output] target determines whether to produce an
unmarshal Proc for the rpc input parameter or
its output parameter

@return [Proc] An unmarshal proc { |marshalled(instance)| instance }

# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 43
# Builds a Proc that unmarshals data for either the input or the output side
# of this RPC. When that side is a Stream, the Stream's element type is used.
#
# @param [:input, :output] target which side to build the Proc for
# @return [Proc] { |marshalled(instance)| instance }
# @raise [ArgumentError] if target is neither :input nor :output
def unmarshal_proc(target)
  fail ArgumentError unless %i[input output].include?(target)

  decoded_type = method(target).call
  decoded_type = decoded_type.type if decoded_type.is_a?(Stream)
  proc do |marshalled|
    decoder = decoded_type.method(unmarshal_method)
    decoder.call(marshalled)
  end
end