class Object
Constants
- ALGORITHMS
Names of supported compression algorithms
- AUTH_ENV
- ActiveCall
The ActiveCall class provides simple methods for sending marshallable data to a call
- Args
Args is used to hold the command line info.
- BidiStub
- COMPRESS_LEVELS
Names of valid supported compression levels
- Call
- CallCredentials
- CallError
- CallOps
- ChannelCredentials
- CheckCallAfterFinishedServiceStub
- CheckerStub
- Creds
- Dsl
- EchoStub
- FailingStub
- GenericService
Provides behaviour used to implement schema-derived service classes.
Is intended to be used to support both client and server IDL-schema-derived servers.
- GoogleRpcStatusTestStub
- HCReq
- HCResp
- HEADER_DIRS
- INCLUDEDIR
- INTERNAL
- LIBDIR
- LIB_DIRS
- NoProtoStub
- NoStatusDetailsBinTestServiceStub
- OK
- Pool
Pool is a simple thread pool.
- RpcDesc
RpcDesc is a Descriptor of an RPC method.
- RpcServer
RpcServer hosts a number of services and makes them available on the network.
- Server
- ServingStatus
- SlowStub
- SslTestServiceStub
- StatusCodes
- Stream
- SynchronizedCancellationStub
- TEST_WRITE_FLAGS
- TimeConsts
TimeConsts is a module from the C extension.
Here it's re-opened to add a utility func.
- UNKNOWN
- WriteFlags
Public Instance Methods
# File src/ruby/bin/apis/pubsub_demo.rb, line 226 def _check_args(args) %w(host port action).each do |a| if args[a].nil? raise OptionParser::MissingArgument.new("please specify --#{a}") end end args end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 361 def arg_error_msg(error = nil) error ||= ArgumentError.new('other error') "#{error.class}: #{error.message}" end
Fails with AssertionError if the block does evaluate to true
# File src/ruby/pb/test/client.rb, line 66 def assert(msg = 'unknown cause') fail 'No assertion block provided' unless block_given? fail AssertionError, msg unless yield end
Builds the metadata authentication update proc.
# File src/ruby/bin/apis/pubsub_demo.rb, line 48 def auth_proc(opts) auth_creds = Google::Auth.get_application_default return auth_creds.updater_proc end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 337 def bad_status(_req, _call) fail GRPC::BadStatus.new(@bs_code, 'NOK') end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 345 def bad_status_alt(_call) fail GRPC::BadStatus.new(@bs_code, 'NOK') end
# File src/ruby/spec/pb/duplicate/codegen_spec.rb, line 18 def can_run_codegen_check system('which grpc_ruby_plugin') && system('which protoc') end
# File src/ruby/spec/generic/rpc_server_spec.rb, line 22 def check_md(wanted_md, received_md) wanted_md.zip(received_md).each do |w, r| w.each do |key, value| expect(r[key]).to eq(value) end end end
# File src/ruby/spec/generic/rpc_server_spec.rb, line 646 def check_multi_req_view_of_finished_call(call) common_check_of_finished_server_call(call) expect do call.each_remote_read.each { |r| p r } end.to raise_error(GRPC::Core::CallError) end
check that methods on a finished/closed call t crash
# File src/ruby/spec/generic/client_stub_spec.rb, line 40 def check_op_view_of_finished_client_call(op_view, expected_metadata, expected_trailing_metadata) # use read_response_stream to try to iterate through # possible response stream fail('need something to attempt reads') unless block_given? expect do resp = op_view.execute yield resp end.to raise_error(GRPC::Core::CallError) expect { op_view.start_call }.to raise_error(RuntimeError) sanity_check_values_of_accessors(op_view, expected_metadata, expected_trailing_metadata) expect do op_view.wait op_view.cancel op_view.write_flag = 1 end.to_not raise_error end
check that the server-side call is still in a usable state even after it has finished
# File src/ruby/spec/generic/rpc_server_spec.rb, line 639 def check_single_req_view_of_finished_call(call) common_check_of_finished_server_call(call) expect(call.peer).to be_a(String) expect(call.peer_cert).to be(nil) end
# File src/ruby/spec/client_server_spec.rb, line 277 def client_cancel_test(cancel_proc, expected_code, expected_details) call = new_client_call server_call = nil server_thread = Thread.new do server_call = server_allows_client_to_proceed end client_ops = { CallOps::SEND_INITIAL_METADATA => {}, CallOps::RECV_INITIAL_METADATA => nil } client_batch = call.run_batch(client_ops) expect(client_batch.send_metadata).to be true expect(client_batch.metadata).to eq({}) cancel_proc.call(call) server_thread.join server_ops = { CallOps::RECV_CLOSE_ON_SERVER => nil } server_batch = server_call.run_batch(server_ops) expect(server_batch.send_close).to be true client_ops = { CallOps::RECV_STATUS_ON_CLIENT => {} } client_batch = call.run_batch(client_ops) expect(client_batch.status.code).to be expected_code expect(client_batch.status.details).to eq expected_details end
# File src/ruby/spec/client_auth_spec.rb, line 24 def client_cert test_root = File.join(File.dirname(__FILE__), 'testdata') cert = File.open(File.join(test_root, 'client.pem')).read fail unless cert.is_a?(String) cert end
# File src/ruby/spec/generic/client_stub_spec.rb, line 86 def close_active_server_call(active_server_call) active_server_call.send(:set_input_stream_done) active_server_call.send(:set_output_stream_done) end
# File src/ruby/spec/generic/rpc_server_spec.rb, line 654 def common_check_of_finished_server_call(call) expect do call.merge_metadata_to_send({}) end.to raise_error(RuntimeError) expect do call.send_initial_metadata end.to_not raise_error expect(call.cancelled?).to be(false) expect(call.metadata).to be_a(Hash) expect(call.metadata['user-agent']).to be_a(String) expect(call.metadata_sent).to be(true) expect(call.output_metadata).to eq({}) expect(call.metadata_to_send).to eq({}) expect(call.deadline.is_a?(Time)).to be(true) end
# File src/ruby/spec/channel_spec.rb, line 85 def construct_with_args(a) proc { GRPC::Core::Channel.new('dummy_host', a, create_test_cert) } end
# File src/ruby/spec/client_auth_spec.rb, line 17 def create_channel_creds test_root = File.join(File.dirname(__FILE__), 'testdata') files = ['ca.pem', 'client.key', 'client.pem'] creds = files.map { |f| File.open(File.join(test_root, f)).read } GRPC::Core::ChannelCredentials.new(creds[0], creds[1], creds[2]) end
# File src/ruby/spec/generic/client_stub_spec.rb, line 1043 def create_secure_test_server certs = load_test_certs secure_credentials = GRPC::Core::ServerCredentials.new( nil, [{ private_key: certs[1], cert_chain: certs[2] }], false) @server = new_core_server_for_testing(nil) @server.add_http2_port('0.0.0.0:0', secure_credentials) end
# File src/ruby/spec/client_auth_spec.rb, line 31 def create_server_creds test_root = File.join(File.dirname(__FILE__), 'testdata') p "test root: #{test_root}" files = ['ca.pem', 'server1.key', 'server1.pem'] creds = files.map { |f| File.open(File.join(test_root, f)).read } GRPC::Core::ServerCredentials.new( creds[0], [{ private_key: creds[1], cert_chain: creds[2] }], true) # force client auth end
creates a test stub that accesses host:port securely.
# File src/ruby/pb/test/client.rb, line 97 def create_stub(opts) address = "#{opts.host}:#{opts.port}" # Provide channel args that request compression by default # for compression interop tests if ['client_compressed_unary', 'client_compressed_streaming'].include?(opts.test_case) compression_options = GRPC::Core::CompressionOptions.new(default_algorithm: :gzip) compression_channel_args = compression_options.to_channel_arg_hash else compression_channel_args = {} end if opts.secure creds = ssl_creds(opts.use_test_ca) stub_opts = { channel_args: { GRPC::Core::Channel::SSL_TARGET => opts.host_override } } # Add service account creds if specified wants_creds = %w(all compute_engine_creds service_account_creds) if wants_creds.include?(opts.test_case) unless opts.oauth_scope.nil? auth_creds = Google::Auth.get_application_default(opts.oauth_scope) call_creds = GRPC::Core::CallCredentials.new(auth_creds.updater_proc) creds = creds.compose call_creds end end if opts.test_case == 'oauth2_auth_token' auth_creds = Google::Auth.get_application_default(opts.oauth_scope) kw = auth_creds.updater_proc.call({}) # gives as an auth token # use a metadata update proc that just adds the auth token. call_creds = GRPC::Core::CallCredentials.new(proc { |md| md.merge(kw) }) creds = creds.compose call_creds end if opts.test_case == 'jwt_token_creds' # don't use a scope auth_creds = Google::Auth.get_application_default call_creds = GRPC::Core::CallCredentials.new(auth_creds.updater_proc) creds = creds.compose call_creds end GRPC.logger.info("... connecting securely to #{address}") stub_opts[:channel_args].merge!(compression_channel_args) if opts.test_case == "unimplemented_service" Grpc::Testing::UnimplementedService::Stub.new(address, creds, **stub_opts) else Grpc::Testing::TestService::Stub.new(address, creds, **stub_opts) end else GRPC.logger.info("... connecting insecurely to #{address}") if opts.test_case == "unimplemented_service" Grpc::Testing::UnimplementedService::Stub.new( address, :this_channel_is_insecure, channel_args: compression_channel_args ) else Grpc::Testing::TestService::Stub.new( address, :this_channel_is_insecure, channel_args: compression_channel_args ) end end end
# File src/ruby/spec/channel_spec.rb, line 26 def create_test_cert GRPC::Core::ChannelCredentials.new(load_test_certs[0]) end
# File src/ruby/spec/generic/client_stub_spec.rb, line 1052 def create_test_server @server = new_core_server_for_testing(nil) @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure) end
# File src/ruby/spec/client_server_spec.rb, line 597 def credentials_update_test(creds_update_md) auth_proc = proc { creds_update_md } call_creds = GRPC::Core::CallCredentials.new(auth_proc) initial_md_key = 'k2' initial_md_val = 'v2' initial_md = { initial_md_key => initial_md_val } expected_md = creds_update_md.clone fail 'bad test param' unless expected_md[initial_md_key].nil? expected_md[initial_md_key] = initial_md_val recvd_rpc = nil rcv_thread = Thread.new do recvd_rpc = @server.request_call end call = new_client_call call.set_credentials! call_creds client_batch = call.run_batch( CallOps::SEND_INITIAL_METADATA => initial_md, CallOps::SEND_CLOSE_FROM_CLIENT => nil) expect(client_batch.send_metadata).to be true expect(client_batch.send_close).to be true # confirm the server can receive the client metadata rcv_thread.join expect(recvd_rpc).to_not eq nil recvd_md = recvd_rpc.metadata replace_symbols = Hash[expected_md.each_pair.collect { |x, y| [x.to_s, y] }] expect(recvd_md).to eq(recvd_md.merge(replace_symbols)) credentials_update_test_finish_call(call, recvd_rpc.call) end
# File src/ruby/spec/client_server_spec.rb, line 632 def credentials_update_test_finish_call(client_call, server_call) final_server_batch = server_call.run_batch( CallOps::RECV_CLOSE_ON_SERVER => nil, CallOps::SEND_INITIAL_METADATA => nil, CallOps::SEND_STATUS_FROM_SERVER => ok_status) expect(final_server_batch.send_close).to be(true) expect(final_server_batch.send_metadata).to be(true) expect(final_server_batch.send_status).to be(true) final_client_batch = client_call.run_batch( CallOps::RECV_INITIAL_METADATA => nil, CallOps::RECV_STATUS_ON_CLIENT => nil) expect(final_client_batch.metadata).to eq({}) expect(final_client_batch.status.code).to eq(0) end
# File src/ruby/spec/call_spec.rb, line 177 def deadline Time.now + 2 # in 2 seconds; arbitrary end
# File src/ruby/spec/pb/package_with_underscore/checker_spec.rb, line 18 def debug_mode? !ENV['CONFIG'].nil? && ENV['CONFIG'] == 'dbg' end
# File src/ruby/bin/math_client.rb, line 33 def do_div(stub) GRPC.logger.info('request_response') GRPC.logger.info('----------------') req = Math::DivArgs.new(dividend: 7, divisor: 3) GRPC.logger.info("div(7/3): req=#{req.inspect}") resp = stub.div(req, timeout: INFINITE_FUTURE) GRPC.logger.info("Answer: #{resp.inspect}") GRPC.logger.info('----------------') end
# File src/ruby/bin/math_client.rb, line 66 def do_div_many(stub) GRPC.logger.info('bidi_streamer') GRPC.logger.info('-------------') reqs = [] reqs << Math::DivArgs.new(dividend: 7, divisor: 3) reqs << Math::DivArgs.new(dividend: 5, divisor: 2) reqs << Math::DivArgs.new(dividend: 7, divisor: 2) GRPC.logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}") resp = stub.div_many(reqs, timeout: INFINITE_FUTURE) resp.each do |r| GRPC.logger.info("Answer: #{r.inspect}") end GRPC.logger.info('----------------') end
# File src/ruby/bin/math_client.rb, line 54 def do_fib(stub) GRPC.logger.info('server_streamer') GRPC.logger.info('----------------') req = Math::FibArgs.new(limit: 11) GRPC.logger.info("fib(11): req=#{req.inspect}") resp = stub.fib(req, timeout: INFINITE_FUTURE) resp.each do |r| GRPC.logger.info("Answer: #{r.inspect}") end GRPC.logger.info('----------------') end
# File src/ruby/bin/math_client.rb, line 43 def do_sum(stub) # to make client streaming requests, pass an enumerable of the inputs GRPC.logger.info('client_streamer') GRPC.logger.info('---------------') reqs = [1, 2, 3, 4, 5].map { |x| Math::Num.new(num: x) } GRPC.logger.info("sum(1, 2, 3, 4, 5): reqs=#{reqs.inspect}") resp = stub.sum(reqs) # reqs.is_a?(Enumerable) GRPC.logger.info("Answer: #{resp.inspect}") GRPC.logger.info('---------------') end
# File src/ruby/spec/generic/active_call_spec.rb, line 656 def expect_server_to_be_invoked(**kw) recvd_rpc = @server.request_call expect(recvd_rpc).to_not eq nil recvd_call = recvd_rpc.call recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => kw) ActiveCall.new(recvd_call, @pass_through, @pass_through, deadline, metadata_received: true, started: true) end
# File src/ruby/spec/generic/active_call_spec.rb, line 650 def expect_server_to_receive(sent_text, **kw) c = expect_server_to_be_invoked(**kw) expect(c.remote_read).to eq(sent_text) c end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 333 def fake_bidistream(an_array) an_array end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 204 def fake_clstream(_arg) end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 321 def fake_reqresp(_req, _call) @ok_response end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 207 def fake_svstream(_arg1, _arg2) end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 210 def fake_three_args(_arg1, _arg2, _arg3) end
# File src/ruby/spec/generic/client_stub_spec.rb, line 295 def get_response(stub, credentials: nil) puts credentials.inspect stub.request_response(@method, @sent_msg, noop, noop, metadata: @metadata, credentials: credentials) end
# File src/ruby/spec/generic/client_stub_spec.rb, line 534 def get_responses(stub, unmarshal: noop) e = stub.server_streamer(@method, @sent_msg, noop, unmarshal, metadata: @metadata) expect(e).to be_a(Enumerator) e end
# File src/ruby/spec/generic/active_call_spec.rb, line 36 def inner_call_of_active_call(active_call) active_call.instance_variable_get(:@call) end
# File src/ruby/bin/math_client.rb, line 81 def load_test_certs this_dir = File.expand_path(File.dirname(__FILE__)) data_dir = File.join(File.dirname(this_dir), 'spec/testdata') files = ['ca.pem', 'server1.key', 'server1.pem'] files.map { |f| File.open(File.join(data_dir, f)).read } end
# File src/ruby/bin/apis/pubsub_demo.rb, line 235 def main args = parse_args pub, sub = publisher_stub(args), subscriber_stub(args) NamedActions.new(pub, sub, args).method(args.action).call end
# File src/ruby/spec/call_spec.rb, line 173 def make_test_call @ch.create_call(nil, nil, 'dummy_method', nil, deadline) end
# File src/ruby/pb/test/server.rb, line 117 def maybe_echo_metadata(_call) # these are consistent for all interop tests initial_metadata_key = "x-grpc-test-echo-initial" trailing_metadata_key = "x-grpc-test-echo-trailing-bin" if _call.metadata.has_key?(initial_metadata_key) _call.metadata_to_send[initial_metadata_key] = _call.metadata[initial_metadata_key] end if _call.metadata.has_key?(trailing_metadata_key) _call.output_metadata[trailing_metadata_key] = _call.metadata[trailing_metadata_key] end end
# File src/ruby/pb/test/server.rb, line 131 def maybe_echo_status_and_message(req) unless req.response_status.nil? fail GRPC::BadStatus.new_status_exception( req.response_status.code, req.response_status.message) end end
# File src/ruby/spec/generic/client_stub_spec.rb, line 181 def metadata_test(md) server_port = create_test_server host = "localhost:#{server_port}" th = run_request_response(@sent_msg, @resp, @pass, expected_metadata: md) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) @metadata = md expect(get_response(stub)).to eq(@resp) th.join end
# File src/ruby/spec/client_server_spec.rb, line 37 def new_client_call @ch.create_call(nil, nil, '/method', nil, deadline) end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 201 def no_arg end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 353 def not_implemented(_req, _call) fail not_implemented_error end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 357 def not_implemented_alt(_call) fail not_implemented_error end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 366 def not_implemented_error NotImplementedError.new('some OS feature not implemented') end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 370 def not_implemented_error_msg(error = nil) error ||= not_implemented_error "#{error.class}: #{error.message}" end
produces a string of null chars (0) of length l.
# File src/ruby/pb/test/client.rb, line 170 def nulls(l) fail 'requires #{l} to be +ve' if l < 0 [].pack('x' * l).force_encoding('ascii-8bit') end
# File src/ruby/spec/client_server_spec.rb, line 41 def ok_status Struct::Status.new(StatusCodes::OK, 'OK') end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 341 def other_error(_req, _call) fail(ArgumentError, 'other error') end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 349 def other_error_alt(_call) fail(ArgumentError, 'other error') end
validates the command line options, returning them as an Arg.
# File src/ruby/bin/apis/pubsub_demo.rb, line 197 def parse_args args = Args.new('pubsub-staging.googleapis.com', 443, 'list_some_topics', 'stoked-keyword-656') OptionParser.new do |opts| opts.on('--server_host SERVER_HOST', 'server hostname') do |v| args.host = v end opts.on('--server_port SERVER_PORT', 'server port') do |v| args.port = v end # instance_methods(false) gives only the methods defined in that class. scenes = NamedActions.instance_methods(false).map { |t| t.to_s } scene_list = scenes.join(',') opts.on("--action CODE", scenes, {}, 'pick a demo action', " (#{scene_list})") do |v| args.action = v end # Set the remaining values. %w(project_id topic_name sub_name).each do |o| opts.on("--#{o} VALUE", "#{o}") do |v| args[o] = v end end end.parse! _check_args(args) end
validates the command line options, returning them as a Hash.
# File src/ruby/pb/test/server.rb, line 215 def parse_options options = { 'port' => nil, 'secure' => false } OptionParser.new do |opts| opts.banner = 'Usage: --port port' opts.on('--port PORT', 'server port') do |v| options['port'] = v end opts.on('--use_tls USE_TLS', ['false', 'true'], 'require a secure connection?') do |v| options['secure'] = v == 'true' end end.parse! if options['port'].nil? fail(OptionParser::MissingArgument, 'please specify --port') end options end
creates SSL Credentials from the production certificates.
# File src/ruby/pb/test/client.rb, line 86 def prod_creds GRPC::Core::ChannelCredentials.new() end
Creates a stub for accessing the publisher service.
# File src/ruby/bin/apis/pubsub_demo.rb, line 54 def publisher_stub(opts) address = "#{opts.host}:#{opts.port}" stub_clz = Tech::Pubsub::PublisherService::Stub # shorter GRPC.logger.info("... access PublisherService at #{address}") call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts)) combined_creds = ssl_creds.compose(call_creds) stub_clz.new(address, creds: combined_creds, GRPC::Core::Channel::SSL_TARGET => opts.host) end
# File src/ruby/spec/generic/client_stub_spec.rb, line 979 def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts, expected_metadata: {}, server_initial_md: {}, server_trailing_md: {}) wanted_metadata = expected_metadata.clone wakey_thread do |notifier| c = expect_server_to_be_invoked( notifier, metadata_to_send: server_initial_md) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end expected_inputs.each do |i| if client_starts expect(c.remote_read).to eq(i) c.remote_send(i) else c.remote_send(i) expect(c.remote_read).to eq(i) end end c.send_status(status, status == @pass ? 'OK' : 'NOK', true, metadata: server_trailing_md) close_active_server_call(c) end end
# File src/ruby/spec/generic/client_stub_spec.rb, line 968 def run_bidi_streamer_handle_inputs_first(expected_inputs, replys, status) wakey_thread do |notifier| c = expect_server_to_be_invoked(notifier) expected_inputs.each { |i| expect(c.remote_read).to eq(i) } replys.each { |r| c.remote_send(r) } c.send_status(status, status == @pass ? 'OK' : 'NOK', true) close_active_server_call(c) end end
# File src/ruby/spec/generic/client_stub_spec.rb, line 1005 def run_client_streamer(expected_inputs, resp, status, expected_metadata: {}, server_initial_md: {}, server_trailing_md: {}) wanted_metadata = expected_metadata.clone wakey_thread do |notifier| c = expect_server_to_be_invoked( notifier, metadata_to_send: server_initial_md) expected_inputs.each { |i| expect(c.remote_read).to eq(i) } wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) c.send_status(status, status == @pass ? 'OK' : 'NOK', true, metadata: server_trailing_md) close_active_server_call(c) end end
# File src/ruby/spec/generic/client_stub_spec.rb, line 717 def run_error_in_client_request_stream_test(requests_to_push, expected_error_message) # start a server that waits on a read indefinitely - it should # see a cancellation and be able to break out th = Thread.new { run_server_bidi_send_one_then_read_indefinitely } stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) request_queue = Queue.new @sent_msgs = BidiErrorTestingEnumerateForeverQueue.new(request_queue) verify_error_from_write_thread(stub, requests_to_push, request_queue, expected_error_message) # the write loop errror should cancel the call and end the # server's request stream th.join end
# File src/ruby/spec/generic/client_stub_spec.rb, line 324 def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } th = run_request_response( @sent_msg, @resp, @pass, expected_metadata: @metadata, server_initial_md: @server_initial_md, server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) expect( get_response(stub, run_start_call_first: run_start_call_first)).to eq(@resp) th.join end
# File src/ruby/spec/generic/client_stub_spec.rb, line 1024 def run_request_response(expected_input, resp, status, expected_metadata: {}, server_initial_md: {}, server_trailing_md: {}) wanted_metadata = expected_metadata.clone wakey_thread do |notifier| c = expect_server_to_be_invoked( notifier, metadata_to_send: server_initial_md) expect(c.remote_read).to eq(expected_input) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) c.send_status(status, status == @pass ? 'OK' : 'NOK', true, metadata: server_trailing_md) close_active_server_call(c) end end
# File src/ruby/spec/generic/client_stub_spec.rb, line 898 def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback) @server.start recvd_rpc = @server.request_call recvd_call = recvd_rpc.call server_call = GRPC::ActiveCall.new( recvd_call, noop, noop, INFINITE_FUTURE, metadata_received: true, started: false) server_call.send_initial_metadata server_call.remote_send('server call received') wait_for_shutdown_ok_callback.call # since the client is cancelling the call, # we should be able to shut down cleanly @server.shutdown_and_notify(nil) @server.close end
# File src/ruby/spec/generic/client_stub_spec.rb, line 665 def run_server_bidi_send_one_then_read_indefinitely @server.start recvd_rpc = @server.request_call recvd_call = recvd_rpc.call server_call = GRPC::ActiveCall.new( recvd_call, noop, noop, INFINITE_FUTURE, metadata_received: true, started: false) server_call.send_initial_metadata server_call.remote_send('server response') loop do m = server_call.remote_read break if m.nil? end # can't fail since initial metadata already sent server_call.send_status(@pass, 'OK', true) close_active_server_call(server_call) end
# File src/ruby/spec/generic/client_stub_spec.rb, line 770 def run_server_bidi_shutdown_after_one_read @server.start recvd_rpc = @server.request_call recvd_call = recvd_rpc.call server_call = GRPC::ActiveCall.new( recvd_call, noop, noop, INFINITE_FUTURE, metadata_received: true, started: false) expect(server_call.remote_read).to eq('first message') @server.shutdown_and_notify(from_relative_time(0)) @server.close end
# File src/ruby/spec/generic/client_stub_spec.rb, line 808 def run_server_bidi_shutdown_after_one_write @server.start recvd_rpc = @server.request_call recvd_call = recvd_rpc.call server_call = GRPC::ActiveCall.new( recvd_call, noop, noop, INFINITE_FUTURE, metadata_received: true, started: false) server_call.send_initial_metadata server_call.remote_send('message') @server.shutdown_and_notify(from_relative_time(0)) @server.close end
# File src/ruby/spec/generic/client_stub_spec.rb, line 949 def run_server_streamer(expected_input, replys, status, expected_metadata: {}, server_initial_md: {}, server_trailing_md: {}) wanted_metadata = expected_metadata.clone wakey_thread do |notifier| c = expect_server_to_be_invoked( notifier, metadata_to_send: server_initial_md) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end expect(c.remote_read).to eq(expected_input) replys.each { |r| c.remote_send(r) } c.send_status(status, status == @pass ? 'OK' : 'NOK', true, metadata: server_trailing_md) close_active_server_call(c) end end
# File src/ruby/spec/generic/client_stub_spec.rb, line 497 def run_server_streamer_against_client_with_unmarshal_error( expected_input, replys) wakey_thread do |notifier| c = expect_server_to_be_invoked(notifier) expect(c.remote_read).to eq(expected_input) begin replys.each { |r| c.remote_send(r) } rescue GRPC::Core::CallError # An attempt to write to the client might fail. This is ok # because the client call is expected to fail when # unmarshalling the first response, and to cancel the call, # and there is a race as for when the server-side call will # start to fail. p 'remote_send failed (allowed because call expected to cancel)' ensure c.send_status(OK, 'OK', true) close_active_server_call(c) end end end
# File src/ruby/spec/generic/client_stub_spec.rb, line 64 def sanity_check_values_of_accessors(op_view, expected_metadata, expected_trailing_metadata) expected_status = Struct::Status.new expected_status.code = 0 expected_status.details = 'OK' expected_status.metadata = expected_trailing_metadata expect(op_view.status).to eq(expected_status) expect(op_view.metadata).to eq(expected_metadata) expect(op_view.trailing_metadata).to eq(expected_trailing_metadata) expect(op_view.cancelled?).to be(false) expect(op_view.write_flag).to be(nil) # The deadline attribute of a call can be either # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive. # TODO: fix so that the accessor always returns the same type. expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) || op_view.deadline.is_a?(Time)).to be(true) end
# File src/ruby/spec/generic/active_call_spec.rb, line 29 def send_and_receive_close_and_status(client_call, server_call) client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) server_call.run_batch(CallOps::RECV_CLOSE_ON_SERVER => nil, CallOps::SEND_STATUS_FROM_SERVER => ok_status) client_call.run_batch(CallOps::RECV_STATUS_ON_CLIENT => nil) end
# File src/ruby/spec/client_server_spec.rb, line 27 def server_allows_client_to_proceed(metadata = {}) recvd_rpc = @server.request_call expect(recvd_rpc).to_not eq nil server_call = recvd_rpc.call ops = { CallOps::SEND_INITIAL_METADATA => metadata } server_batch = server_call.run_batch(ops) expect(server_batch.send_metadata).to be true server_call end
creates a SSL Credentials from the production certificates.
# File src/ruby/bin/apis/pubsub_demo.rb, line 43 def ssl_creds GRPC::Core::ChannelCredentials.new() end
# File src/ruby/spec/server_spec.rb, line 203 def start_a_server s = new_core_server_for_testing(nil) s.add_http2_port('0.0.0.0:0', :this_port_is_insecure) s.start s end
# File src/ruby/spec/channel_connection_spec.rb, line 21 def start_server(port = 0) @srv = new_rpc_server_for_testing(pool_size: 1) server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure) @srv.handle(EchoService) @server_thd = Thread.new { @srv.run } @srv.wait_till_running server_port end
# File src/ruby/spec/channel_connection_spec.rb, line 30 def stop_server expect(@srv.stopped?).to be(false) @srv.stop @server_thd.join expect(@srv.stopped?).to be(true) end
Creates a stub for accessing the subscriber service.
# File src/ruby/bin/apis/pubsub_demo.rb, line 65 def subscriber_stub(opts) address = "#{opts.host}:#{opts.port}" stub_clz = Tech::Pubsub::SubscriberService::Stub # shorter GRPC.logger.info("... access SubscriberService at #{address}") call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts)) combined_creds = ssl_creds.compose(call_creds) stub_clz.new(address, creds: combined_creds, GRPC::Core::Channel::SSL_TARGET => opts.host) end
# File src/ruby/bin/math_client.rb, line 88 def test_creds certs = load_test_certs GRPC::Core::ChannelCredentials.new(certs[0]) end
# File src/ruby/bin/math_server.rb, line 157 def test_server_creds certs = load_test_certs GRPC::Core::ServerCredentials.new( nil, [{ private_key: certs[1], cert_chain: certs[2] }], false) end
convert upper snake-case to camel case. e.g., DEADLINE_EXCEEDED -> DeadlineExceeded
# File src/ruby/spec/error_sanity_spec.rb, line 22 def upper_snake_to_camel(name) name.to_s.split('_').map(&:downcase).map(&:capitalize).join('') end
# File src/ruby/spec/generic/client_stub_spec.rb, line 683 def verify_error_from_write_thread(stub, requests_to_push, request_queue, expected_description) # TODO: an improvement might be to raise the original exception from # bidi call write loops instead of only cancelling the call failing_marshal_proc = proc do |req| fail req if req.is_a?(StandardError) req end begin e = get_responses(stub, marshal_proc: failing_marshal_proc) first_response = e.next expect(first_response).to eq('server response') requests_to_push.each { |req| request_queue.push(req) } e.collect { |r| r } rescue GRPC::Unknown => e exception = e end expect(exception.message.include?(expected_description)).to be(true) end
# File src/ruby/spec/generic/client_stub_spec.rb, line 19 def wakey_thread(&blk) n = GRPC::Notifier.new t = Thread.new do blk.call(n) end t.abort_on_exception = true n.wait t end