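Hello, I'm Minami (@south37), an engineer on the Infrastructure Team at Wantedly.
Today I'm publishing the chapter "gRPC Internal" from WANTEDLY TECH BOOK 6 as a blog post.
As described in "WANTEDLY TECH BOOK 1-7を一挙大公開", Wantedly has decided to distribute the electronic editions of WANTEDLY TECH BOOK for free, except for the latest volume. We also plan to publish the contents of past articles on the Wantedly Engineer Blog one by one, and this post is part of that effort.
This generates lib/echo_pb.rb. The message class Echo::EchoRequest defined in it can be used to serialize data to a binary format and to deserialize it again. As you can see, Protocol Buffers can also be used on its own, with steps similar to those for gRPC.
Listing 8.15 lib/echo_pb.rb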
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: echo.proto

require 'google/protobuf'

Google::Protobuf::DescriptorPool.generated_pool.build do
  add_message "echo.EchoRequest" do
    optional :message, :string, 1
  end
end

module Echo
  EchoRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("echo.EchoRequest").msgclass
end
# lib/echo_services_pb.rb
# Generated by the protocol buffer compiler. DO NOT EDIT!
# Source: echo.proto for package 'echo'

require 'grpc'
require 'echo_pb'

module Echo
  module Echo
    class Service

      include GRPC::GenericService

      self.marshal_class_method = :encode
      self.unmarshal_class_method = :decode
      self.service_name = 'echo.Echo'

      rpc :echo, EchoRequest, EchoResponse
    end

    Stub = Service.rpc_stub_class
  end
end
Listing 8.19 Reprinted: lib/echo_client.rb from "How to use gRPC"
# lib/echo_client.rb
def main
  stub = Echo::Echo::Stub.new('localhost:50051', :this_channel_is_insecure)
  m = ARGV.size > 0 ? ARGV[0] : 'hello'
  message = stub.echo(Echo::EchoRequest.new(message: m)).message
  print "echo response: \"#{message}\"\n"
end

main
The client uses the Stub class created by the Service.rpc_stub_class method. Inside Service, the Service.rpc method defines a service method named echo. Let's first look at Service.rpc and Service.rpc_stub_class.
The Service class includes GRPC::GenericService, and both .rpc and .rpc_stub_class are defined there (strictly speaking, they are defined in a module named Dsl, which is extended into the class at include time). .rpc is defined as follows. The line to focus on is line 95: it creates an object of the RpcDesc class, which bundles the name, input, and output information, and stores it in a hash called rpc_descs. RpcDesc is an important class that is used in both client-side and server-side processing. It is described in more detail later.
Listing 8.20 lib/grpc/generic/service.rb
79 # Adds an RPC spec.
80 #
81 # Takes the RPC name and the classes representing the types to be
82 # serialized, and adds them to the including classes rpc_desc hash.
83 #
84 # input and output should both have the methods #marshal and #unmarshal
85 # that are responsible for writing and reading an object instance from a
86 # byte buffer respectively.
87 #
88 # @param name [String] the name of the rpc
89 # @param input [Object] the input parameter's class
90 # @param output [Object] the output parameter's class
91 def rpc(name, input, output)
92 fail(DuplicateRpcName, name) if rpc_descs.key? name
93 assert_can_marshal(input)
94 assert_can_marshal(output)
95 rpc_descs[name] = RpcDesc.new(name, input, output,
96 marshal_class_method,
97 unmarshal_class_method)
98 define_method(GenericService.underscore(name.to_s).to_sym) do |*|
99 fail GRPC::BadStatus.new_status_exception(
100 GRPC::Core::StatusCodes::UNIMPLEMENTED)
101 end
102 end
Listing 8.21 lib/grpc/generic/service.rb
# the RpcDescs defined for this GenericService, keyed by name.
def rpc_descs
  @rpc_descs ||= {}
end
Apart from that, .rpc does little more than validate its arguments, so there is nothing else of particular note. It is worth knowing, however, that line 98 uses define_method to provide a default implementation of the service method. This is used on the server side. As explained in "How to use gRPC", when implementing a gRPC server you create the server class by inheriting from a Service class that includes GRPC::GenericService. You then implement the service method by overriding this default implementation. As a result, if a client sends a request to a service method that is declared in the .proto file but not yet implemented, the error code GRPC::Core::StatusCodes::UNIMPLEMENTED on line 100 is returned.
Listing 8.22 Reprinted: lib/echo_server.rb from "How to use gRPC"
# EchoServer is simple server that implements the Echo::Echo service.
class EchoServer < Echo::Echo::Service
  EchoLogger = Logger.new(STDOUT)

  def echo(echo_req, _unused_call) # overrides the `echo` method defined in `Echo::Echo::Service`
    EchoLogger.info("echo \"#{echo_req.message}\"")
    Echo::EchoResponse.new(message: echo_req.message)
  end
end
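The combination of a class-level DSL, a default implementation created with define_method, and an override in a subclass can be sketched in plain Ruby as follows. This is a simplified illustration rather than the grpc gem's code: MiniService, UnimplementedError, GreeterService, and GreeterServer are all hypothetical names, and the descriptor is reduced to a plain hash.

```ruby
# Hypothetical error class standing in for an UNIMPLEMENTED status.
class UnimplementedError < StandardError; end

module MiniService
  # Class-level DSL. It is extended into the including class.
  module Dsl
    # Registered RPC descriptions, keyed by name.
    def rpc_descs
      @rpc_descs ||= {}
    end

    def rpc(name, input, output)
      raise ArgumentError, "duplicate rpc: #{name}" if rpc_descs.key?(name)
      rpc_descs[name] = { input: input, output: output }
      # Default implementation, meant to be overridden by a subclass.
      define_method(name) do |*|
        raise UnimplementedError, "#{name} is not implemented"
      end
    end
  end

  # Called by Ruby when a class includes MiniService.
  def self.included(base)
    base.extend(Dsl)
  end
end

class GreeterService
  include MiniService
  rpc :hello, String, String
  rpc :bye, String, String
end

# Only `hello` is implemented; `bye` keeps the default implementation.
class GreeterServer < GreeterService
  def hello(req, _call)
    "hello, #{req}"
  end
end
```

Calling GreeterServer.new.hello("bob", nil) runs the override, while GreeterServer.new.bye("bob", nil) falls through to the default implementation and raises UnimplementedError.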
# Creates a rpc client class with methods for accessing the methods
# currently in rpc_descs.
def rpc_stub_class
  descs = rpc_descs
  route_prefix = service_name
  Class.new(ClientStub) do
    # @param host [String] the host the stub connects to
    # @param creds [Core::ChannelCredentials|Symbol] The channel
    #     credentials to use, or :this_channel_is_insecure otherwise
    # @param kw [KeywordArgs] the channel arguments, plus any optional
    #     args for configuring the client's channel
    def initialize(host, creds, **kw)
      super(host, creds, **kw)
    end

    # Used define_method to add a method for each rpc_desc. Each method
    # calls the base class method for the given descriptor.
    descs.each_pair do |name, desc|
      mth_name = GenericService.underscore(name.to_s).to_sym
      marshal = desc.marshal_proc
      unmarshal = desc.unmarshal_proc(:output)
      route = "/#{route_prefix}/#{name}"
      if desc.request_response?
        define_method(mth_name) do |req, metadata = {}|
          GRPC.logger.debug("calling #{@host}:#{route}")
          request_response(route, req, marshal, unmarshal, metadata)
        end
      elsif desc.client_streamer?
        define_method(mth_name) do |reqs, metadata = {}|
          GRPC.logger.debug("calling #{@host}:#{route}")
          client_streamer(route, reqs, marshal, unmarshal, metadata)
        end
      elsif desc.server_streamer?
        define_method(mth_name) do |req, metadata = {}, &blk|
          GRPC.logger.debug("calling #{@host}:#{route}")
          server_streamer(route, req, marshal, unmarshal, metadata, &blk)
        end
      else # is a bidi_stream
        define_method(mth_name) do |reqs, metadata = {}, &blk|
          GRPC.logger.debug("calling #{@host}:#{route}")
          bidi_streamer(route, reqs, marshal, unmarshal, metadata, &blk)
        end
      end
    end
  end
end
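The core idea of rpc_stub_class, building an anonymous subclass with Class.new and adding one method per registered RPC with define_method, can be sketched without the grpc gem. In this sketch MiniClientStub and build_stub_class are hypothetical names, and the base class only formats a string instead of performing a network call.

```ruby
# Hypothetical base class standing in for a client stub.
# It only records what would be sent instead of talking to a server.
class MiniClientStub
  def initialize(host)
    @host = host
  end

  def request_response(route, req)
    "#{@host}#{route} <- #{req}"
  end
end

# Builds a new stub class that has one instance method per RPC name.
def build_stub_class(service_name, rpc_names)
  Class.new(MiniClientStub) do
    rpc_names.each do |name|
      # The route is computed once and captured by the closure below.
      route = "/#{service_name}/#{name}"
      define_method(name) do |req|
        request_response(route, req)
      end
    end
  end
end

EchoStub = build_stub_class('echo.Echo', [:echo])
stub = EchoStub.new('localhost:50051')
puts stub.echo('hi')
```

Because the block passed to Class.new is a closure, values computed while the class is being built (here, route) stay available inside each generated method, which is the same trick the listing above relies on for route, marshal, and unmarshal.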
# request_response sends a request to a GRPC server, and returns the
# response.
#
# == Flow Control ==
# This is a blocking call.
#
# * it does not return until a response is received.
#
# * the requests is sent only when GRPC cores flow control allows it to
#   be sent.
#
# == Errors ==
# An RuntimeError is raised if
#
# * the server responds with a non-OK status
#
# * the deadline is exceeded
#
# == Return Value ==
#
# If return_op is false, the call returns the response
#
# If return_op is true, the call returns an Operation, calling execute
# on the Operation returns the response.
#
# @param method [String] the RPC method to call on the GRPC server
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Time] (optional) the time the request should complete
# @param return_op [true|false] return an Operation if true
# @param parent [Core::Call] a prior call whose reserved metadata
#   will be propagated by this one.
# @param credentials [Core::CallCredentials] credentials to use when making
#   the call
# @param metadata [Hash] metadata to be sent to the server
# @return [Object] the response received from the server
def request_response(method, req, marshal, unmarshal,
                     deadline: nil,
                     return_op: false,
                     parent: nil,
                     credentials: nil,
                     metadata: {})
  c = new_active_call(method, marshal, unmarshal,
                      deadline: deadline,
                      parent: parent,
                      credentials: credentials)
  interception_context = @interceptors.build_context
  intercept_args = {
    method: method,
    request: req,
    call: c.interceptable,
    metadata: metadata
  }
  if return_op
    # return the operation view of the active_call; define #execute as a
    # new method for this instance that invokes #request_response.
    c.merge_metadata_to_send(metadata)
    op = c.operation
    op.define_singleton_method(:execute) do
      interception_context.intercept!(:request_response, intercept_args) do
        c.request_response(req, metadata: metadata)
      end
    end
    op
  else
    interception_context.intercept!(:request_response, intercept_args) do
      c.request_response(req, metadata: metadata)
    end
  end
end
# Creates a new active stub
#
# @param method [string] the method being called.
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param parent [Grpc::Call] a parent call, available when calls are
#   made from server
# @param credentials [Core::CallCredentials] credentials to use when making
#   the call
def new_active_call(method, marshal, unmarshal,
                    deadline: nil,
                    parent: nil,
                    credentials: nil)
  deadline = from_relative_time(@timeout) if deadline.nil?
  # Provide each new client call with its own completion queue
  call = @ch.create_call(parent, # parent call
                         @propagate_mask, # propagation options
                         method,
                         nil, # host use nil,
                         deadline)
  call.set_credentials! credentials unless credentials.nil?
  ActiveCall.new(call, marshal, unmarshal, deadline,
                 started: false)
end
Two new classes have appeared: Channel and Call. These two are in fact very important in gRPC. The gRPC C-core provides its functionality as C structs and functions, and Channel and Call wrap the C structs grpc_channel and grpc_call provided by C-core, respectively. For that reason, both are defined as native extensions of the gem.
/** The Channel interface allows creation of Call objects. */
typedef struct grpc_channel grpc_channel;
.
.
.
/** A Call represents an RPC. When created, it is in a configuration state
    allowing properties to be set until it is invoked. After invoke, the Call
    can have messages written to it and read from it. */
typedef struct grpc_call grpc_call;
# request_response sends a request to a GRPC server, and returns the
# response.
#
# @param req [Object] the request sent to the server
# @param metadata [Hash] metadata to be sent to the server. If a value is
#   a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def request_response(req, metadata: {})
  raise_error_if_already_executed
  ops = {
    SEND_MESSAGE => @marshal.call(req),
    SEND_CLOSE_FROM_CLIENT => nil,
    RECV_INITIAL_METADATA => nil,
    RECV_MESSAGE => nil,
    RECV_STATUS_ON_CLIENT => nil
  }
  @send_initial_md_mutex.synchronize do
    # Metadata might have already been sent if this is an operation view
    unless @metadata_sent
      ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
    end
    @metadata_sent = true
  end

  begin
    batch_result = @call.run_batch(ops)
    # no need to check for cancellation after a CallError because this
    # batch contains a RECV_STATUS op
  ensure
    set_input_stream_done
    set_output_stream_done
  end

  @call.metadata = batch_result.metadata
  attach_status_results_and_complete_call(batch_result)
  get_message_from_batch_result(batch_result)
end
typedef enum {
  /** Send initial metadata: one and only one instance MUST be sent for each
      call, unless the call was cancelled - in which case this can be skipped.
      This op completes after all bytes of metadata have been accepted by
      outgoing flow control. */
  GRPC_OP_SEND_INITIAL_METADATA = 0,
  /** Send a message: 0 or more of these operations can occur for each call.
      This op completes after all bytes for the message have been accepted by
      outgoing flow control. */
  GRPC_OP_SEND_MESSAGE,
  /** Send a close from the client: one and only one instance MUST be sent from
      the client, unless the call was cancelled - in which case this can be
      skipped. This op completes after all bytes for the call
      (including the close) have passed outgoing flow control. */
  GRPC_OP_SEND_CLOSE_FROM_CLIENT,
  /** Send status from the server: one and only one instance MUST be sent from
      the server unless the call was cancelled - in which case this can be
      skipped. This op completes after all bytes for the call
      (including the status) have passed outgoing flow control. */
  GRPC_OP_SEND_STATUS_FROM_SERVER,
  /** Receive initial metadata: one and only one MUST be made on the client,
      must not be made on the server.
      This op completes after all initial metadata has been read from the
      peer. */
  GRPC_OP_RECV_INITIAL_METADATA,
  /** Receive a message: 0 or more of these operations can occur for each call.
      This op completes after all bytes of the received message have been
      read, or after a half-close has been received on this call. */
  GRPC_OP_RECV_MESSAGE,
  /** Receive status on the client: one and only one must be made on the client.
      This operation always succeeds, meaning ops paired with this operation
      will also appear to succeed, even though they may not have. In that case
      the status will indicate some failure.
      This op completes after all activity on the call has completed. */
  GRPC_OP_RECV_STATUS_ON_CLIENT,
  /** Receive close on the server: one and only one must be made on the
      server. This op completes after the close has been received by the
      server. This operation always succeeds, meaning ops paired with
      this operation will also appear to succeed, even though they may not
      have. */
  GRPC_OP_RECV_CLOSE_ON_SERVER
} grpc_op_type;
Let's also take a brief look at Call#run_batch, which is where the ops are actually sent. It is defined in ext/grpc/rb_call.c as a C function named grpc_rb_call_run_batch.
As the comment on lines 795-811 explains, it posts a batch of operations and waits for them to complete. The function is somewhat long, but the only parts worth focusing on are the call to grpc_call_start_batch on line 843 and the call to rb_completion_queue_pluck on line 852. grpc_call_start_batch is a function provided by C-core that starts executing the ops. rb_completion_queue_pluck is defined inside the grpc gem, but internally it calls grpc_completion_queue_pluck, a function provided by C-core.
Here the term completion queue appears for the first time. It is the interface that the gRPC C-core provides for receiving results. A struct named grpc_completion_queue serves as the queue, and when an operation completes, a notification is posted to it. grpc_rb_call_run_batch calls rb_completion_queue_pluck to wait for the ops to complete. It then builds the result of the ops on line 859 and returns that result on line 862.
Listing 8.30 ext/grpc/rb_call.c
795 /* call-seq:
796 ops = {
797 GRPC::Core::CallOps::SEND_INITIAL_METADATA => <op_value>,
798 GRPC::Core::CallOps::SEND_MESSAGE => <op_value>,
799 ...
800 }
801 tag = Object.new
802 timeout = 10
803 call.start_batch(tag, timeout, ops)
804
805 Start a batch of operations defined in the array ops; when complete, post a
806 completion of type 'tag' to the completion queue bound to the call.
807
808 Also waits for the batch to complete, until timeout is reached.
809 The order of ops specified in the batch has no significance.
810 Only one operation of each type can be active at once in any given
811 batch */
812 static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
813 run_batch_stack* st = NULL;
814 grpc_rb_call* call = NULL;
815 grpc_event ev;
816 grpc_call_error err;
817 VALUE result = Qnil;
818 VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
819 unsigned write_flag = 0;
820 void* tag = (void*)&st;
821
822 grpc_ruby_fork_guard();
823 if (RTYPEDDATA_DATA(self) == NULL) {
824 rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call");
825 return Qnil;
826 }
827 TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
828
829 /* Validate the ops args, adding them to a ruby array */
830 if (TYPE(ops_hash) != T_HASH) {
831 rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash");
832 return Qnil;
833 }
834 if (rb_write_flag != Qnil) {
835 write_flag = NUM2UINT(rb_write_flag);
836 }
837 st = gpr_malloc(sizeof(run_batch_stack));
838 grpc_run_batch_stack_init(st, write_flag);
839 grpc_run_batch_stack_fill_ops(st, ops_hash);
840
841 /* call grpc_call_start_batch, then wait for it to complete using
842 * pluck_event */
843 err = grpc_call_start_batch(call->wrapped, st->ops, st->op_num, tag, NULL);
844 if (err != GRPC_CALL_OK) {
845 grpc_run_batch_stack_cleanup(st);
846 gpr_free(st);
847 rb_raise(grpc_rb_eCallError,
848 "grpc_call_start_batch failed with %s (code=%d)",
849 grpc_call_error_detail_of(err), err);
850 return Qnil;
851 }
852 ev = rb_completion_queue_pluck(call->queue, tag,
853 gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
854 if (!ev.success) {
855 rb_raise(grpc_rb_eCallError, "call#run_batch failed somehow");
856 }
857 /* Build and return the BatchResult struct result,
858 if there is an error, it's reflected in the status */
859 result = grpc_run_batch_stack_build_result(st);
860 grpc_run_batch_stack_cleanup(st);
861 gpr_free(st);
862 return result;
863 }
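The "start a batch, then pluck its completion by tag" flow can be modeled in a few lines of plain Ruby. This is only a toy model under stated assumptions: MiniCompletionQueue, start_batch, and run_batch are hypothetical names, a background thread stands in for C-core, and each op simply completes with a marker string.

```ruby
# Toy completion queue: completions are posted with a tag, and a caller
# can block until the completion for one specific tag has arrived.
class MiniCompletionQueue
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @completed = {}.compare_by_identity # tags are compared by object identity
  end

  # Called by the "core" when a batch has finished.
  def post(tag, result)
    @mutex.synchronize do
      @completed[tag] = result
      @cond.broadcast
    end
  end

  # Blocks until the completion for +tag+ is available, then returns it.
  def pluck(tag)
    @mutex.synchronize do
      @cond.wait(@mutex) until @completed.key?(tag)
      @completed.delete(tag)
    end
  end
end

# Starts the ops asynchronously and posts the result under +tag+ when done.
def start_batch(queue, ops, tag)
  Thread.new do
    result = ops.map { |op, value| [op, "done:#{value}"] }.to_h
    queue.post(tag, result)
  end
end

# Synchronous wrapper: start the batch, then wait for its completion.
def run_batch(queue, ops)
  tag = Object.new
  start_batch(queue, ops, tag)
  queue.pluck(tag)
end
```

From the caller's point of view run_batch looks like an ordinary blocking call, even though the work itself is started asynchronously and its completion is delivered through the queue.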
def get_message_from_batch_result(recv_message_batch_result)
  unless recv_message_batch_result.nil? ||
         recv_message_batch_result.message.nil?
    return @unmarshal.call(recv_message_batch_result.message)
  end
  GRPC.logger.debug('found nil; the final response has been sent')
  nil
end
class EchoServer < Echo::Echo::Service
  EchoLogger = Logger.new(STDOUT)

  def echo(echo_req, _unused_call)
    EchoLogger.info("echo \"#{echo_req.message}\"")
    Echo::EchoResponse.new(message: echo_req.message)
  end
end

# main starts an RpcServer that receives requests to EchoServer at the sample
# server port.
def main
  s = GRPC::RpcServer.new
  s.add_http2_port('0.0.0.0:50051', :this_port_is_insecure)
  logger = Logger.new(STDOUT)
  logger.info("... running insecurely on 0.0.0.0:50051")
  s.handle(EchoServer)
  # Runs the server with SIGHUP, SIGINT and SIGQUIT signal handlers to
  # gracefully shutdown.
  s.run_till_terminated_or_interrupted(['SIGHUP', 'SIGINT', 'SIGQUIT'])
end

main
# handle registration of classes
#
# service is either a class that includes GRPC::GenericService and whose
# #new function can be called without argument or any instance of such a
# class.
#
# E.g, after
#
# class Divider
#   include GRPC::GenericService
#   rpc :div DivArgs, DivReply # single request, single response
#   def initialize(optional_arg='default option') # no args
#     ...
#   end
#
# srv = GRPC::RpcServer.new(...)
#
# # Either of these works
#
# srv.handle(Divider)
#
# # or
#
# srv.handle(Divider.new('replace optional arg'))
#
# It raises RuntimeError:
# - if service is not valid service class or object
# - its handler methods are already registered
# - if the server is already running
#
# @param service [Object|Class] a service class or object as described
#   above
def handle(service)
  @run_mutex.synchronize do
    unless @running_state == :not_started
      fail 'cannot add services if the server has been started'
    end
    cls = service.is_a?(Class) ? service : service.class
    assert_valid_service_class(cls)
    add_rpc_descs_for(service)
  end
end
#add_rpc_descs_for is implemented as follows. First, look at lines 537 and 541, where RpcDesc objects are stored in specs (the @rpc_descs hash). This RpcDesc class is the same one that appeared in the client implementation.
Next, look at lines 543-547, where the methods defined on the service class are registered as handlers.
What is interesting is the use of the method method. It creates a Method object from a method of an existing object, and a Method object, like a Proc, can invoke its underlying logic via #call. In other words, the handler stored in handlers[route] here is called at the appropriate time, and that is where the service method defined by the gRPC user (the developer) gets invoked. Also, as lines 543-547 of #add_rpc_descs_for show, GRPC::RpcServer#handle can in fact be given an object instead of a class. Make use of this when you need it.
Listing 8.35 lib/grpc/generic/rpc_server.rb
534 # This should be called while holding @run_mutex
535 def add_rpc_descs_for(service)
536 cls = service.is_a?(Class) ? service : service.class
537 specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})
538 cls.rpc_descs.each_pair do |name, spec|
539 route = "/#{cls.service_name}/#{name}".to_sym
540 fail "already registered: rpc #{route} from #{spec}" if specs.key? route
541 specs[route] = spec
542 rpc_name = GenericService.underscore(name.to_s).to_sym
543 if service.is_a?(Class)
544 handlers[route] = cls.new.method(rpc_name)
545 else
546 handlers[route] = service.method(rpc_name)
547 end
548 GRPC.logger.info("handling #{route} with #{handlers[route]}")
549 end
550 end
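The handler registration above relies on Object#method, which is part of core Ruby. A minimal sketch of the same idea, using a hypothetical EchoHandler class and a hypothetical register helper, looks like this:

```ruby
# Hypothetical service implementation used only for this illustration.
class EchoHandler
  def echo(req, _call)
    "echo: #{req}"
  end
end

# Stores a Method object for +rpc_name+ under +route+.
# Accepts either a class (instantiated with no arguments) or an instance.
def register(handlers, service, route, rpc_name)
  instance = service.is_a?(Class) ? service.new : service
  handlers[route] = instance.method(rpc_name)
end

handlers = {}
register(handlers, EchoHandler, :'/echo.Echo/echo', :echo)

# Later, the dispatcher only needs the route to invoke the user's method.
puts handlers[:'/echo.Echo/echo'].call('hi', nil)
```

A Method object keeps a reference to its receiver, so the dispatcher does not need to know which object or class implements the RPC. It only needs to look up the route and invoke #call.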
We now know how a service class gets registered as a handler. Next, let's look at how GRPC::RpcServer#run_till_terminated_or_interrupted puts the server into the running state.
GRPC::RpcServer#run_till_terminated_or_interrupted is implemented as follows. After registering signal handlers on lines 392-411, it calls #run on line 413, which puts the server into the running state.
Listing 8.37 lib/grpc/generic/rpc_server.rb
364 # runs the server with signal handlers
365 # @param signals
366 # List of String, Integer or both representing signals that the user
367 # would like to send to the server for graceful shutdown
368 # @param wait_interval (optional)
369 # Integer seconds that user would like stop_server_thread to poll
370 # stop_server
371 def run_till_terminated_or_interrupted(signals, wait_interval = 60)
372 @stop_server = false
373 @stop_server_mu = Mutex.new
374 @stop_server_cv = ConditionVariable.new
375
376 @stop_server_thread = Thread.new do
377 loop do
378 break if @stop_server
379 @stop_server_mu.synchronize do
380 @stop_server_cv.wait(@stop_server_mu, wait_interval)
381 end
382 end
383
384 # stop is surrounded by mutex, should handle multiple calls to stop
385 # correctly
386 stop
387 end
388
389 valid_signals = Signal.list
390
391 # register signal handlers
392 signals.each do |sig|
393 # input validation
394 if sig.class == String
395 sig.upcase!
396 if sig.start_with?('SIG')
397 # cut out the SIG prefix to see if valid signal
398 sig = sig[3..-1]
399 end
400 end
401
402 # register signal traps for all valid signals
403 if valid_signals.value?(sig) || valid_signals.key?(sig)
404 Signal.trap(sig) do
405 @stop_server = true
406 @stop_server_cv.broadcast
407 end
408 else
409 fail "#{sig} not a valid signal"
410 end
411 end
412
413 run
414
415 @stop_server_thread.join
416 end
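The shutdown mechanism in this listing, a watcher thread that sleeps on a condition variable and wakes up when a flag is set, can be sketched in isolation. MiniServer and its methods are hypothetical names, and request_stop plays the role that the Signal.trap block plays in the listing.

```ruby
class MiniServer
  attr_reader :stopped

  def initialize
    @stop_requested = false
    @stopped = false
    @mu = Mutex.new
    @cv = ConditionVariable.new
  end

  # Stands in for the body of the signal trap: set the flag, wake the watcher.
  def request_stop
    @stop_requested = true
    @cv.broadcast
  end

  # Starts a thread that waits until a stop is requested.
  # The timeout makes the loop re-check the flag periodically, so a
  # broadcast that arrives just before the wait is not lost forever.
  def start_stop_watcher(wait_interval = 1)
    Thread.new do
      loop do
        break if @stop_requested
        @mu.synchronize { @cv.wait(@mu, wait_interval) }
      end
      @stopped = true # stands in for calling #stop on the real server
    end
  end
end

server = MiniServer.new
watcher = server.start_stop_watcher(0.1)
server.request_stop
watcher.join
puts server.stopped
```

Keeping the work done at signal time down to setting a flag and broadcasting, and leaving the actual shutdown to a normal thread, avoids running complex logic inside a signal handler.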
# runs the server
#
# - if no rpc_descs are registered, this exits immediately, otherwise it
#   continues running permanently and does not return until program exit.
#
# - #running? returns true after this is called, until #stop cause the
#   the server to stop.
def run
  @run_mutex.synchronize do
    fail 'cannot run without registering services' if rpc_descs.size.zero?
    @pool.start
    @server.start
    transition_running_state(:running)
    @run_cond.broadcast
  end
  loop_handle_server_calls
end
# handles calls to the server
def loop_handle_server_calls
  fail 'not started' if running_state == :not_started
  while running_state == :running
    begin
      an_rpc = @server.request_call
      break if (!an_rpc.nil?) && an_rpc.call.nil?
      active_call = new_active_server_call(an_rpc)
      unless active_call.nil?
        @pool.schedule(active_call) do |ac|
          c, mth = ac
          begin
            rpc_descs[mth].run_server_method(
              c,
              rpc_handlers[mth],
              @interceptors.build_context
            )
          rescue StandardError
            c.send_status(GRPC::Core::StatusCodes::INTERNAL,
                          'Server handler failed')
          end
        end
      end
    rescue Core::CallError, RuntimeError => e
      # these might happen for various reasons. The correct behavior of
      # the server is to log them and continue, if it's not shutting down.
      if running_state == :running
        GRPC.logger.warn("server call failed: #{e}")
      end
      next
    end
  end
  # @running_state should be :stopping here
  @run_mutex.synchronize do
    transition_running_state(:stopped)
    GRPC.logger.info("stopped: #{self}")
    @server.close
  end
end
##
# @param [GRPC::ActiveCall] active_call The current active call object
#   for the request
# @param [Method] mth The current RPC method being called
# @param [GRPC::InterceptionContext] inter_ctx The interception context
#   being executed
#
def run_server_method(active_call, mth, inter_ctx = InterceptionContext.new)
  # While a server method is running, it might be cancelled, its deadline
  # might be reached, the handler could throw an unknown error, or a
  # well-behaved handler could throw a StatusError.
  if request_response?
    handle_request_response(active_call, mth, inter_ctx)
  elsif client_streamer?
    handle_client_streamer(active_call, mth, inter_ctx)
  elsif server_streamer?
    handle_server_streamer(active_call, mth, inter_ctx)
  else # is a bidi_stream
    handle_bidi_streamer(active_call, mth, inter_ctx)
  end
rescue BadStatus => e
  # this is raised by handlers that want GRPC to send an application error
  # code and detail message and some additional app-specific metadata.
  GRPC.logger.debug("app err:#{active_call}, status:#{e.code}:#{e.details}")
  send_status(active_call, e.code, e.details, e.metadata)
rescue Core::CallError => e
  # This is raised by GRPC internals but should rarely, if ever happen.
  # Log it, but don't notify the other endpoint..
  GRPC.logger.warn("failed call: #{active_call}\n#{e}")
rescue Core::OutOfTime
  # This is raised when active_call#method.call exceeds the deadline
  # event. Send a status of deadline exceeded
  GRPC.logger.warn("late call: #{active_call}")
  send_status(active_call, DEADLINE_EXCEEDED, 'late')
rescue StandardError, NotImplementedError => e
  # This will usually be an unhandled error in the handling code.
  # Send back a UNKNOWN status to the client
  #
  # Note: this intentionally does not map NotImplementedError to
  # UNIMPLEMENTED because NotImplementedError is intended for low-level
  # OS interaction (e.g. syscalls) not supported by the current OS.
  GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
  GRPC.logger.warn(e)
  send_status(active_call, UNKNOWN, "#{e.class}: #{e.message}")
end
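The way run_server_method turns exceptions into statuses can be illustrated with a small stand-alone sketch. MiniBadStatus, the Status module, and run_handler are hypothetical names. The numeric values follow the standard gRPC status codes (UNKNOWN = 2, NOT_FOUND = 5), and the return value is a plain array instead of a status sent over the wire.

```ruby
# Subset of the standard gRPC status codes.
module Status
  UNKNOWN = 2
  NOT_FOUND = 5
end

# Hypothetical exception a handler raises to request a specific status.
class MiniBadStatus < StandardError
  attr_reader :code, :details

  def initialize(code, details)
    super("#{code}:#{details}")
    @code = code
    @details = details
  end
end

# Runs the handler and maps its outcome to a response or a status.
def run_handler(handler, request)
  [:ok, handler.call(request)]
rescue MiniBadStatus => e
  # The handler asked for a specific application-level status.
  [:status, e.code, e.details]
rescue StandardError => e
  # Anything else is an unexpected failure and is reported as UNKNOWN.
  [:status, Status::UNKNOWN, "#{e.class}: #{e.message}"]
end

p run_handler(->(req) { req.upcase }, 'hi')
p run_handler(->(_) { raise MiniBadStatus.new(Status::NOT_FOUND, 'no such user') }, 'x')
p run_handler(->(_) { raise ArgumentError, 'boom' }, 'x')
```

The order of the rescue clauses matters: the more specific exception class has to come before StandardError, just as BadStatus comes before StandardError in the listing above.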
A channel filter defines how operations on a channel are implemented. Channel filters are chained together to create full channels, and if those chains are linear, then channel stacks provide a mechanism to minimize allocations for that chain. Call stacks are created by channel stacks and represent the per-call data for that stack.
A gRPC transport must implement several functions as members of a struct named grpc_transport_vtable (this struct is a container that holds function pointers). The most important one is a function called perform_stream_op. Its function signature is as follows.
A transport implementation is required to handle each of these stream operations appropriately.
In addition, a transport defines the following functions. It also appears to be expected to handle things such as cancellations properly.
perform_transport_op
init_stream
destroy_stream, destroy_transport
set_pollset, set_pollset_set, get_endpoint
So far we have surveyed what gRPC requires of a transport implementation. gRPC defines the behavior it expects from a transport as the grpc_transport_vtable struct in src/core/lib/transport/transport_impl.h, and a transport implementation can be swapped out as long as it implements that behavior.
You should now have a picture of how gRPC's pluggable transport is realized.
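The idea of a table of function pointers that makes an implementation swappable can be mimicked in Ruby with a Struct that holds callables. This is only an analogy under stated assumptions: TransportVtable, in_memory_transport, and send_message are hypothetical names, the argument lists are invented for the illustration, and only the member names init_stream, perform_stream_op, and destroy_stream are borrowed from the list above.

```ruby
# A "vtable": a container whose members are callables.
TransportVtable = Struct.new(:init_stream, :perform_stream_op, :destroy_stream,
                             keyword_init: true)

# One possible transport: it only appends what it is asked to do to a log.
def in_memory_transport(log)
  TransportVtable.new(
    init_stream: ->(id) { log << "init #{id}" },
    perform_stream_op: ->(id, op) { log << "stream #{id}: #{op}" },
    destroy_stream: ->(id) { log << "destroy #{id}" }
  )
end

# Upper-layer code talks to the transport only through the vtable members,
# so any object that provides the same members can be plugged in.
def send_message(transport, id, message)
  transport.init_stream.call(id)
  transport.perform_stream_op.call(id, "send #{message}")
  transport.destroy_stream.call(id)
end

log = []
send_message(in_memory_transport(log), 1, 'hi')
puts log
```

Swapping the transport then means passing a different TransportVtable to send_message, and the calling code stays unchanged.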