blob: da0f6503db373cf1bd9568098b986bf62fde0b85 [file] [log] [blame]
Craig Tiller6169d5f2016-03-31 07:46:18 -07001# Copyright 2015, Google Inc.
nnoble097ef9b2014-12-01 17:06:10 -08002# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright
9# notice, this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above
11# copyright notice, this list of conditions and the following disclaimer
12# in the documentation and/or other materials provided with the
13# distribution.
14# * Neither the name of Google Inc. nor the names of its
15# contributors may be used to endorse or promote products derived from
16# this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
murgatroid9979108d02016-04-22 13:41:00 -070030require_relative '../grpc'
31require_relative 'active_call'
32require_relative 'service'
nnoble097ef9b2014-12-01 17:06:10 -080033require 'thread'
Alexander Polcyn9c744872016-08-12 14:58:10 -070034require 'concurrent'
nnoble097ef9b2014-12-01 17:06:10 -080035
Tim Emiola7e7911f2015-02-17 18:28:23 -080036# GRPC contains the General RPC module.
37module GRPC
38 # RpcServer hosts a number of services and makes them available on the
39 # network.
40 class RpcServer
Tim Emiolab22a21e2015-04-02 21:45:26 -070041 include Core::CallOps
Tim Emiola7e7911f2015-02-17 18:28:23 -080042 include Core::TimeConsts
43 extend ::Forwardable
nnoble097ef9b2014-12-01 17:06:10 -080044
Tim Emiola7e7911f2015-02-17 18:28:23 -080045 def_delegators :@server, :add_http2_port
nnoble097ef9b2014-12-01 17:06:10 -080046
Alexander Polcyn9c744872016-08-12 14:58:10 -070047 # Default max size of the thread pool size is 100
48 DEFAULT_MAX_POOL_SIZE = 100
nnoble097ef9b2014-12-01 17:06:10 -080049
Alexander Polcyn9c744872016-08-12 14:58:10 -070050 # Default minimum size of the thread pool is 5
51 DEFAULT_MIN_POOL_SIZE = 5
52
53 # Default max_waiting_requests size is 60
54 DEFAULT_MAX_WAITING_REQUESTS = 60
nnoble097ef9b2014-12-01 17:06:10 -080055
Tim Emiola95584602015-04-16 12:53:07 -070056 # Default poll period is 1s
57 DEFAULT_POLL_PERIOD = 1
58
59 # Signal check period is 0.25s
60 SIGNAL_CHECK_PERIOD = 0.25
61
Tim Emiola3fd2be22015-04-16 17:43:59 -070062 # setup_connect_md_proc is used by #initialize to validate the
63 # connect_md_proc.
64 def self.setup_connect_md_proc(a_proc)
65 return nil if a_proc.nil?
66 fail(TypeError, '!Proc') unless a_proc.is_a? Proc
67 a_proc
68 end
69
Tim Emiola7e7911f2015-02-17 18:28:23 -080070 # Creates a new RpcServer.
71 #
72 # The RPC server is configured using keyword arguments.
73 #
74 # There are some specific keyword args used to configure the RpcServer
murgatroid99580a64a2016-07-12 13:54:09 -070075 # instance.
Tim Emiola7e7911f2015-02-17 18:28:23 -080076 #
Alexander Polcyn9c744872016-08-12 14:58:10 -070077 # * pool_size: the maximum size of the thread pool that the server's
78 # thread pool can reach.
Tim Emiola7e7911f2015-02-17 18:28:23 -080079 #
Tim Emiola7e7911f2015-02-17 18:28:23 -080080 # * max_waiting_requests: the maximum number of requests that are not
81 # being handled to allow. When this limit is exceeded, the server responds
82 # with not available to new requests
Tim Emiola3fd2be22015-04-16 17:43:59 -070083 #
murgatroid99580a64a2016-07-12 13:54:09 -070084 # * poll_period: when present, the server polls for new events with this
85 # period
86 #
Tim Emiola3fd2be22015-04-16 17:43:59 -070087 # * connect_md_proc:
88 # when non-nil is a proc for determining metadata to to send back the client
89 # on receiving an invocation req. The proc signature is:
90 # {key: val, ..} func(method_name, {key: val, ...})
murgatroid99b19f1812016-05-16 12:21:39 -070091 #
92 # * server_args:
93 # A server arguments hash to be passed down to the underlying core server
Alexander Polcyn9c744872016-08-12 14:58:10 -070094 def initialize(pool_size:DEFAULT_MAX_POOL_SIZE,
95 min_pool_size:DEFAULT_MIN_POOL_SIZE,
Tim Emiola7e7911f2015-02-17 18:28:23 -080096 max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
Tim Emiola95584602015-04-16 12:53:07 -070097 poll_period:DEFAULT_POLL_PERIOD,
Tim Emiola3fd2be22015-04-16 17:43:59 -070098 connect_md_proc:nil,
murgatroid99b19f1812016-05-16 12:21:39 -070099 server_args:{})
Tim Emiola3fd2be22015-04-16 17:43:59 -0700100 @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
Tim Emiola7e7911f2015-02-17 18:28:23 -0800101 @max_waiting_requests = max_waiting_requests
102 @poll_period = poll_period
Alexander Polcyn9c744872016-08-12 14:58:10 -0700103
104 @pool = Concurrent::ThreadPoolExecutor.new(
105 min_threads: [min_pool_size, pool_size].min,
106 max_threads: pool_size,
107 max_queue: max_waiting_requests,
108 fallback_policy: :discard)
Tim Emiola4aba3562015-05-22 17:25:49 -0700109 @run_cond = ConditionVariable.new
110 @run_mutex = Mutex.new
murgatroid99d48d84d2016-03-09 11:10:20 -0800111 # running_state can take 4 values: :not_started, :running, :stopping, and
112 # :stopped. State transitions can only proceed in that order.
113 @running_state = :not_started
murgatroid995ea4a992016-06-13 10:36:41 -0700114 @server = Core::Server.new(server_args)
Tim Emiola7e7911f2015-02-17 18:28:23 -0800115 end
116
117 # stops a running server
118 #
119 # the call has no impact if the server is already stopped, otherwise
120 # server's current call loop is it's last.
121 def stop
murgatroid99d48d84d2016-03-09 11:10:20 -0800122 @run_mutex.synchronize do
123 fail 'Cannot stop before starting' if @running_state == :not_started
124 return if @running_state != :running
murgatroid991d685202016-03-09 17:30:37 -0800125 transition_running_state(:stopping)
Tim Emiola4aba3562015-05-22 17:25:49 -0700126 end
Tim Emiolab1fa5d42015-06-11 09:35:06 -0700127 deadline = from_relative_time(@poll_period)
murgatroid995ea4a992016-06-13 10:36:41 -0700128 @server.close(deadline)
Alexander Polcyn9c744872016-08-12 14:58:10 -0700129 @pool.shutdown
130 @pool.wait_for_termination
Tim Emiola4aba3562015-05-22 17:25:49 -0700131 end
Tim Emiola95584602015-04-16 12:53:07 -0700132
murgatroid99d48d84d2016-03-09 11:10:20 -0800133 def running_state
134 @run_mutex.synchronize do
135 return @running_state
Tim Emiola4aba3562015-05-22 17:25:49 -0700136 end
Tim Emiola7e7911f2015-02-17 18:28:23 -0800137 end
138
murgatroid991d685202016-03-09 17:30:37 -0800139 # Can only be called while holding @run_mutex
140 def transition_running_state(target_state)
141 state_transitions = {
142 not_started: :running,
143 running: :stopping,
144 stopping: :stopped
145 }
146 if state_transitions[@running_state] == target_state
147 @running_state = target_state
148 else
149 fail "Bad server state transition: #{@running_state}->#{target_state}"
150 end
151 end
152
Tim Emiola7e7911f2015-02-17 18:28:23 -0800153 def running?
murgatroid99d48d84d2016-03-09 11:10:20 -0800154 running_state == :running
155 end
156
157 def stopped?
158 running_state == :stopped
Tim Emiola7e7911f2015-02-17 18:28:23 -0800159 end
160
161 # Is called from other threads to wait for #run to start up the server.
162 #
163 # If run has not been called, this returns immediately.
164 #
165 # @param timeout [Numeric] number of seconds to wait
166 # @result [true, false] true if the server is running, false otherwise
murgatroid99d48d84d2016-03-09 11:10:20 -0800167 def wait_till_running(timeout = nil)
168 @run_mutex.synchronize do
169 @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
170 return @running_state == :running
Tim Emiola7e7911f2015-02-17 18:28:23 -0800171 end
Tim Emiola7e7911f2015-02-17 18:28:23 -0800172 end
173
Tim Emiola7e7911f2015-02-17 18:28:23 -0800174 # handle registration of classes
175 #
176 # service is either a class that includes GRPC::GenericService and whose
177 # #new function can be called without argument or any instance of such a
178 # class.
179 #
180 # E.g, after
181 #
182 # class Divider
183 # include GRPC::GenericService
184 # rpc :div DivArgs, DivReply # single request, single response
185 # def initialize(optional_arg='default option') # no args
186 # ...
187 # end
188 #
189 # srv = GRPC::RpcServer.new(...)
190 #
191 # # Either of these works
192 #
193 # srv.handle(Divider)
194 #
195 # # or
196 #
197 # srv.handle(Divider.new('replace optional arg'))
198 #
199 # It raises RuntimeError:
200 # - if service is not valid service class or object
201 # - its handler methods are already registered
202 # - if the server is already running
203 #
204 # @param service [Object|Class] a service class or object as described
205 # above
206 def handle(service)
murgatroid99d48d84d2016-03-09 11:10:20 -0800207 @run_mutex.synchronize do
208 unless @running_state == :not_started
209 fail 'cannot add services if the server has been started'
210 end
211 cls = service.is_a?(Class) ? service : service.class
212 assert_valid_service_class(cls)
murgatroid991d685202016-03-09 17:30:37 -0800213 add_rpc_descs_for(service)
murgatroid99d48d84d2016-03-09 11:10:20 -0800214 end
Tim Emiola7e7911f2015-02-17 18:28:23 -0800215 end
216
217 # runs the server
218 #
219 # - if no rpc_descs are registered, this exits immediately, otherwise it
220 # continues running permanently and does not return until program exit.
221 #
222 # - #running? returns true after this is called, until #stop cause the
223 # the server to stop.
224 def run
Tim Emiola7e7911f2015-02-17 18:28:23 -0800225 @run_mutex.synchronize do
murgatroid99d48d84d2016-03-09 11:10:20 -0800226 fail 'cannot run without registering services' if rpc_descs.size.zero?
murgatroid99d48d84d2016-03-09 11:10:20 -0800227 @server.start
murgatroid991d685202016-03-09 17:30:37 -0800228 transition_running_state(:running)
murgatroid99d48d84d2016-03-09 11:10:20 -0800229 @run_cond.broadcast
Tim Emiola7e7911f2015-02-17 18:28:23 -0800230 end
Tim Emiolaf9e77b32015-04-16 14:50:11 -0700231 loop_handle_server_calls
Tim Emiola7e7911f2015-02-17 18:28:23 -0800232 end
Tim Emiolae2860c52015-01-16 02:58:41 -0800233
murgatroid996bbe3692016-05-06 11:43:22 -0700234 alias_method :run_till_terminated, :run
235
murgatroid99895c1112016-04-04 11:27:51 -0700236 # Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
Tim Emiolab22a21e2015-04-02 21:45:26 -0700237 def available?(an_rpc)
Alexander Polcyn9c744872016-08-12 14:58:10 -0700238 jobs_count, max = @pool.queue_length, @pool.max_queue
Nick Gauthierf233d962015-05-20 14:02:50 -0400239 GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
Alexander Polcyn9c744872016-08-12 14:58:10 -0700240
241 # remaining capacity for ThreadPoolExecutors is -1 if unbounded
242 return an_rpc if @pool.remaining_capacity != 0
Nick Gauthierf233d962015-05-20 14:02:50 -0400243 GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
Tim Emiolab22a21e2015-04-02 21:45:26 -0700244 noop = proc { |x| x }
Alex Polcynd9892bd2016-07-04 00:52:39 -0700245
246 # Create a new active call that knows that metadata hasn't been
247 # sent yet
murgatroid995ea4a992016-06-13 10:36:41 -0700248 c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
Alex Polcynd9892bd2016-07-04 00:52:39 -0700249 metadata_received: true, started: false)
murgatroid9938281cf2016-05-03 10:51:49 -0700250 c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '')
Tim Emiolab22a21e2015-04-02 21:45:26 -0700251 nil
252 end
nnoble097ef9b2014-12-01 17:06:10 -0800253
murgatroid993d6d0582015-07-08 12:47:13 -0700254 # Sends UNIMPLEMENTED if the method is not implemented by this server
murgatroid99f88eecd2015-07-10 09:58:15 -0700255 def implemented?(an_rpc)
Tim Emiolab22a21e2015-04-02 21:45:26 -0700256 mth = an_rpc.method.to_sym
257 return an_rpc if rpc_descs.key?(mth)
murgatroid993d6d0582015-07-08 12:47:13 -0700258 GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
Tim Emiolab22a21e2015-04-02 21:45:26 -0700259 noop = proc { |x| x }
Alex Polcynd9892bd2016-07-04 00:52:39 -0700260
261 # Create a new active call that knows that
262 # metadata hasn't been sent yet
murgatroid995ea4a992016-06-13 10:36:41 -0700263 c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
Alex Polcynd9892bd2016-07-04 00:52:39 -0700264 metadata_received: true, started: false)
murgatroid9938281cf2016-05-03 10:51:49 -0700265 c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
Tim Emiolab22a21e2015-04-02 21:45:26 -0700266 nil
267 end
268
Tim Emiolaf9e77b32015-04-16 14:50:11 -0700269 # handles calls to the server
270 def loop_handle_server_calls
murgatroid99d48d84d2016-03-09 11:10:20 -0800271 fail 'not started' if running_state == :not_started
murgatroid99d48d84d2016-03-09 11:10:20 -0800272 while running_state == :running
Tim Emiola4aba3562015-05-22 17:25:49 -0700273 begin
murgatroid995ea4a992016-06-13 10:36:41 -0700274 an_rpc = @server.request_call
murgatroid993c09a642015-10-14 17:25:49 -0700275 break if (!an_rpc.nil?) && an_rpc.call.nil?
Tim Emiola7d21c042015-11-10 13:15:36 -0800276 active_call = new_active_server_call(an_rpc)
277 unless active_call.nil?
Alexander Polcyn9c744872016-08-12 14:58:10 -0700278 @pool.post(active_call) do |ac|
Tim Emiola7d21c042015-11-10 13:15:36 -0800279 c, mth = ac
murgatroid9938281cf2016-05-03 10:51:49 -0700280 begin
281 rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
murgatroid9959dfee82016-05-03 11:33:25 -0700282 rescue StandardError
283 c.send_status(GRPC::Core::StatusCodes::INTERNAL,
284 'Server handler failed')
murgatroid9938281cf2016-05-03 10:51:49 -0700285 end
Tim Emiola391664a2015-08-31 15:38:43 -0700286 end
287 end
Tim Emiola4aba3562015-05-22 17:25:49 -0700288 rescue Core::CallError, RuntimeError => e
Tim Emiolab1fa5d42015-06-11 09:35:06 -0700289 # these might happen for various reasonse. The correct behaviour of
Tim Emiola81d950a2015-08-28 16:57:28 -0700290 # the server is to log them and continue, if it's not shutting down.
murgatroid99d48d84d2016-03-09 11:10:20 -0800291 if running_state == :running
292 GRPC.logger.warn("server call failed: #{e}")
293 end
Tim Emiola4aba3562015-05-22 17:25:49 -0700294 next
295 end
Tim Emiolaf9e77b32015-04-16 14:50:11 -0700296 end
murgatroid99d48d84d2016-03-09 11:10:20 -0800297 # @running_state should be :stopping here
murgatroid991d685202016-03-09 17:30:37 -0800298 @run_mutex.synchronize { transition_running_state(:stopped) }
Tim Emiola81d950a2015-08-28 16:57:28 -0700299 GRPC.logger.info("stopped: #{self}")
Tim Emiolaf9e77b32015-04-16 14:50:11 -0700300 end
301
Tim Emiolab22a21e2015-04-02 21:45:26 -0700302 def new_active_server_call(an_rpc)
Tim Emiolab22a21e2015-04-02 21:45:26 -0700303 return nil if an_rpc.nil? || an_rpc.call.nil?
304
305 # allow the metadata to be accessed from the call
Tim Emiola3fd2be22015-04-16 17:43:59 -0700306 an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
Tim Emiola7d21c042015-11-10 13:15:36 -0800307 GRPC.logger.debug("call md is #{an_rpc.metadata}")
Tim Emiola3fd2be22015-04-16 17:43:59 -0700308 connect_md = nil
309 unless @connect_md_proc.nil?
310 connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
311 end
Ken Paysondce1ee62016-05-20 10:29:34 -0700312
Tim Emiolab22a21e2015-04-02 21:45:26 -0700313 return nil unless available?(an_rpc)
murgatroid99f88eecd2015-07-10 09:58:15 -0700314 return nil unless implemented?(an_rpc)
nnoble097ef9b2014-12-01 17:06:10 -0800315
Alex Polcynd9892bd2016-07-04 00:52:39 -0700316 # Create the ActiveCall. Indicate that metadata hasnt been sent yet.
Nick Gauthierf233d962015-05-20 14:02:50 -0400317 GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
Tim Emiolab22a21e2015-04-02 21:45:26 -0700318 rpc_desc = rpc_descs[an_rpc.method.to_sym]
Alex Polcynd9892bd2016-07-04 00:52:39 -0700319 c = ActiveCall.new(an_rpc.call,
320 rpc_desc.marshal_proc,
321 rpc_desc.unmarshal_proc(:input),
322 an_rpc.deadline,
323 metadata_received: true,
324 started: false,
325 metadata_to_send: connect_md)
Tim Emiola7d21c042015-11-10 13:15:36 -0800326 mth = an_rpc.method.to_sym
327 [c, mth]
Tim Emiola7e7911f2015-02-17 18:28:23 -0800328 end
329
Tim Emiola7e7911f2015-02-17 18:28:23 -0800330 protected
331
332 def rpc_descs
murgatroid991d685202016-03-09 17:30:37 -0800333 @rpc_descs ||= {}
Tim Emiola7e7911f2015-02-17 18:28:23 -0800334 end
335
336 def rpc_handlers
murgatroid991d685202016-03-09 17:30:37 -0800337 @rpc_handlers ||= {}
Tim Emiola7e7911f2015-02-17 18:28:23 -0800338 end
339
Tim Emiola7e7911f2015-02-17 18:28:23 -0800340 def assert_valid_service_class(cls)
341 unless cls.include?(GenericService)
Tim Emiolaf9e77b32015-04-16 14:50:11 -0700342 fail "#{cls} must 'include GenericService'"
Tim Emiola7e7911f2015-02-17 18:28:23 -0800343 end
murgatroid99cf239e72016-05-06 14:48:21 -0700344 fail "#{cls} should specify some rpc descriptions" if
345 cls.rpc_descs.size.zero?
Tim Emiola7e7911f2015-02-17 18:28:23 -0800346 end
347
murgatroid991d685202016-03-09 17:30:37 -0800348 # This should be called while holding @run_mutex
Tim Emiola7e7911f2015-02-17 18:28:23 -0800349 def add_rpc_descs_for(service)
350 cls = service.is_a?(Class) ? service : service.class
murgatroid99d48d84d2016-03-09 11:10:20 -0800351 specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})
Tim Emiola7e7911f2015-02-17 18:28:23 -0800352 cls.rpc_descs.each_pair do |name, spec|
353 route = "/#{cls.service_name}/#{name}".to_sym
Tim Emiolaf9e77b32015-04-16 14:50:11 -0700354 fail "already registered: rpc #{route} from #{spec}" if specs.key? route
355 specs[route] = spec
Tim Emiolabae3a612015-05-07 14:24:23 -0700356 rpc_name = GenericService.underscore(name.to_s).to_sym
Tim Emiolaf9e77b32015-04-16 14:50:11 -0700357 if service.is_a?(Class)
Tim Emiolabae3a612015-05-07 14:24:23 -0700358 handlers[route] = cls.new.method(rpc_name)
Tim Emiola7e7911f2015-02-17 18:28:23 -0800359 else
Tim Emiolabae3a612015-05-07 14:24:23 -0700360 handlers[route] = service.method(rpc_name)
nnoble097ef9b2014-12-01 17:06:10 -0800361 end
Nick Gauthierf233d962015-05-20 14:02:50 -0400362 GRPC.logger.info("handling #{route} with #{handlers[route]}")
nnoble097ef9b2014-12-01 17:06:10 -0800363 end
364 end
365 end
nnoble097ef9b2014-12-01 17:06:10 -0800366end