# Copyright 2014, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc/grpc'
require 'grpc/generic/active_call'
require 'grpc/generic/service'
require 'thread'
require 'xray/thread_dump_signal_handler'

module Google::RPC
  # RpcServer hosts a number of services and makes them available on the
  # network.
  class RpcServer
    include Core::CompletionType
    include Core::TimeConsts
    extend ::Forwardable

    def_delegators :@server, :add_http2_port

    # Default thread pool size is 3
    DEFAULT_POOL_SIZE = 3

    # Default max_waiting_requests size is 20
    DEFAULT_MAX_WAITING_REQUESTS = 20

    # Creates a new RpcServer.
    #
    # The RPC server is configured using keyword arguments.
    #
    # There are some specific keyword args used to configure the RpcServer
    # instance; other arbitrary keyword args are also allowed, and when
    # present are used to configure the listening connection set up by the
    # RpcServer.
    #
    # * server_override: which if passed must be a [GRPC::Core::Server]; when
    # present, it is used as the server instead of creating a new one
    #
    # * poll_period: when present, the server polls for new events with this
    # period
    #
    # * pool_size: the size of the thread pool the server uses to run its
    # handlers
    #
    # * completion_queue_override: when supplied, this will be used as the
    # completion_queue that the server uses to receive network events,
    # otherwise it creates a new instance itself
    #
    # * creds: [GRPC::Core::ServerCredentials]
    # the credentials used to secure the server
    #
    # * max_waiting_requests: the maximum number of requests that may wait
    # for a handler. When this limit is exceeded, the server responds to new
    # requests with UNAVAILABLE
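    #
    # For example, a server might be set up as in this sketch (the address is
    # hypothetical; add_http2_port is delegated to the underlying
    # Core::Server):
    #
    #   srv = GRPC::RpcServer.new(pool_size: 5, poll_period: 1)
    #   srv.add_http2_port('localhost:50051')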
    def initialize(pool_size:DEFAULT_POOL_SIZE,
                   max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
                   poll_period:INFINITE_FUTURE,
                   completion_queue_override:nil,
                   creds:nil,
                   server_override:nil,
                   **kw)
      if !completion_queue_override.nil?
        cq = completion_queue_override
        if !cq.is_a?(Core::CompletionQueue)
          raise ArgumentError.new('not a CompletionQueue')
        end
      else
        cq = Core::CompletionQueue.new
      end
      @cq = cq

      if !server_override.nil?
        srv = server_override
        raise ArgumentError.new('not a Server') unless srv.is_a?(Core::Server)
      elsif creds.nil?
        srv = Core::Server.new(@cq, kw)
      elsif !creds.is_a?(Core::ServerCredentials)
        raise ArgumentError.new('not a ServerCredentials')
      else
        srv = Core::Server.new(@cq, kw, creds)
      end
      @server = srv

      @pool_size = pool_size
      @max_waiting_requests = max_waiting_requests
      @poll_period = poll_period
      @run_mutex = Mutex.new
      @run_cond = ConditionVariable.new
      @pool = Pool.new(@pool_size)
    end

    # stops a running server
    #
    # the call has no impact if the server is already stopped; otherwise the
    # server's current call loop is its last.
    def stop
      if @running
        @stopped = true
        @pool.stop
      end
    end

    # determines if the server is currently running
    def running?
      @running ||= false
    end

    # Is called from other threads to wait for #run to start up the server.
    #
    # It blocks until the server has started running, returning false if the
    # server is still not running once the timeout is reached.
    #
    # @param timeout [Numeric] number of seconds to wait
    # @return [true, false] true if the server is running, false otherwise
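    #
    # A typical use is to start #run on another thread and wait here before
    # issuing calls (a sketch; srv is an already configured server):
    #
    #   t = Thread.new { srv.run }
    #   srv.wait_till_running(5)  # true once the server is up
    #   ...                       # issue calls, etc
    #   srv.stop
    #   t.join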
    def wait_till_running(timeout=0.1)
      end_time, sleep_period = Time.now + timeout, (1.0 * timeout)/100
      while Time.now < end_time
        if !running?
          @run_mutex.synchronize { @run_cond.wait(@run_mutex) }
        end
        sleep(sleep_period)
      end
      return running?
    end

    # determines if the server is currently stopped
    def stopped?
      @stopped ||= false
    end

    # handle registration of classes
    #
    # service is either a class that includes GRPC::GenericService and whose
    # #new function can be called without argument, or any instance of such a
    # class.
    #
    # E.g., after
    #
    #   class Divider
    #     include GRPC::GenericService
    #     rpc :div, DivArgs, DivReply  # single request, single response
    #     def initialize(optional_arg='default option')  # no args needed
    #       ...
    #     end
    #   end
    #
    #   srv = GRPC::RpcServer.new(...)
    #
    #   # Either of these works
    #
    #   srv.handle(Divider)
    #
    #   # or
    #
    #   srv.handle(Divider.new('replace optional arg'))
    #
    # It raises RuntimeError:
    # - if service is not a valid service class or object
    # - if it is a valid service, but the handler methods are already
    #   registered
    # - if the server is already running
    #
    # @param service [Object|Class] a service class or object as described
    #   above
    def handle(service)
      raise 'cannot add services if the server is running' if running?
      raise 'cannot add services if the server is stopped' if stopped?
      cls = service.is_a?(Class) ? service : service.class
      assert_valid_service_class(cls)
      add_rpc_descs_for(service)
    end

    # runs the server
    #
    # - if no rpc_descs are registered, this exits immediately, otherwise it
    #   continues running permanently and does not return until program exit.
    #
    # - #running? returns true after this is called, until #stop causes the
    #   server to stop.
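    #
    # A minimal blocking lifecycle might look like this sketch (the address
    # is hypothetical; Divider is the example service described under
    # #handle):
    #
    #   srv = GRPC::RpcServer.new
    #   srv.add_http2_port('0.0.0.0:50051')
    #   srv.handle(Divider)
    #   srv.run  # does not return until #stop is called from another thread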
    def run
      if rpc_descs.size == 0
        logger.warn('did not run as no services were present')
        return
      end
      @run_mutex.synchronize do
        @running = true
        @run_cond.signal
      end
      @pool.start
      @server.start
      server_tag = Object.new
      while !stopped?
        @server.request_call(server_tag)
        ev = @cq.pluck(server_tag, @poll_period)
        next if ev.nil?
        if ev.type != SERVER_RPC_NEW
          logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}")
          ev.close
          next
        end
        c = new_active_server_call(ev.call, ev.result)
        if !c.nil?
          mth = ev.result.method.to_sym
          ev.close
          @pool.schedule(c) do |call|
            rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
          end
        end
      end
      @running = false
    end

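    # Sets up an ActiveCall for a call just received from the network.
    #
    # It accepts the call, then returns nil (after sending an UNAVAILABLE or
    # NOT_FOUND status) if the server has too many waiting requests or the
    # requested method is unknown; otherwise it returns an ActiveCall ready
    # to be scheduled on the thread pool.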
    def new_active_server_call(call, new_server_rpc)
      # TODO(temiola): perhaps reuse the main server completion queue here,
      # but for now, create a new completion queue per call, pending best
      # practice usage advice from the c core.

      # Accept the call. This is necessary even if a status is to be sent
      # back immediately
      finished_tag = Object.new
      call_queue = Core::CompletionQueue.new
      # store the metadata on the call
      call.metadata = new_server_rpc.metadata
      call.server_accept(call_queue, finished_tag)
      call.server_end_initial_metadata()

      # Send UNAVAILABLE if there are too many unprocessed jobs
      jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
      logger.info("waiting: #{jobs_count}, max: #{max}")
      if @pool.jobs_waiting > @max_waiting_requests
        logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}")
        noop = Proc.new { |x| x }
        c = ActiveCall.new(call, call_queue, noop, noop,
                           new_server_rpc.deadline,
                           finished_tag: finished_tag)
        c.send_status(StatusCodes::UNAVAILABLE, '')
        return nil
      end

      # Send NOT_FOUND if the method does not exist
      mth = new_server_rpc.method.to_sym
      if !rpc_descs.has_key?(mth)
        logger.warn("NOT_FOUND: #{new_server_rpc}")
        noop = Proc.new { |x| x }
        c = ActiveCall.new(call, call_queue, noop, noop,
                           new_server_rpc.deadline,
                           finished_tag: finished_tag)
        c.send_status(StatusCodes::NOT_FOUND, '')
        return nil
      end

      # Create the ActiveCall
      rpc_desc = rpc_descs[mth]
      logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})")
      ActiveCall.new(call, call_queue,
                     rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
                     new_server_rpc.deadline, finished_tag: finished_tag)
    end

    # Pool is a simple thread pool for running server requests.
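    #
    # A usage sketch (do_work is a hypothetical method):
    #
    #   pool = Pool.new(3)
    #   pool.start
    #   pool.schedule(a_value) { |v| do_work(v) }
    #   pool.stop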
    class Pool
      def initialize(size)
        raise 'pool size must be positive' unless size > 0
        @jobs = Queue.new
        @size = size
        @stopped = false
        @stop_mutex = Mutex.new
        @stop_cond = ConditionVariable.new
        @workers = []
      end

      # Returns the number of jobs waiting
      def jobs_waiting
        @jobs.size
      end

      # Runs the given block on the queue with the provided args.
      #
      # @param args the args passed to blk when it is called
      # @param blk the block to call
      def schedule(*args, &blk)
        raise 'already stopped' if @stopped
        return if blk.nil?
        logger.info('schedule another job')
        @jobs << [blk, args]
      end

      # Starts running the jobs in the thread pool.
      def start
        raise 'already stopped' if @stopped
        until @workers.size == @size.to_i
          next_thread = Thread.new do
            catch(:exit) do  # allows { throw :exit } to kill a thread
              loop do
                begin
                  blk, args = @jobs.pop
                  blk.call(*args)
                rescue StandardError => e
                  logger.warn('Error in worker thread')
                  logger.warn(e)
                end
              end
            end

            # removes the thread from workers, and signals once all the
            # threads are complete.
            @stop_mutex.synchronize do
              @workers.delete(Thread.current)
              if @workers.size == 0
                @stop_cond.signal
              end
            end
          end
          @workers << next_thread
        end
      end

      # Stops the jobs in the pool
      def stop
        logger.info('stopping, will wait for all the workers to exit')
        @workers.size.times { schedule { throw :exit } }
        @stopped = true

        # TODO(temiola): allow configuration of the keepalive period
        keep_alive = 5
        @stop_mutex.synchronize do
          if @workers.size > 0
            @stop_cond.wait(@stop_mutex, keep_alive)
          end
        end

        # Forcibly shutdown any threads that are still alive.
        if @workers.size > 0
          logger.warn("forcibly terminating #{@workers.size} worker(s)")
          @workers.each do |t|
            next unless t.alive?
            begin
              t.exit
            rescue StandardError => e
              logger.warn('error while terminating a worker')
              logger.warn(e)
            end
          end
        end

        logger.info('stopped, all workers are shutdown')
      end
    end

    protected

    def rpc_descs
      @rpc_descs ||= {}
    end

    def rpc_handlers
      @rpc_handlers ||= {}
    end

    private

    def assert_valid_service_class(cls)
      if !cls.include?(GenericService)
        raise "#{cls} should 'include GenericService'"
      end
      if cls.rpc_descs.size == 0
        raise "#{cls} should specify some rpc descriptions"
      end
      cls.assert_rpc_descs_have_methods
    end

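    # Registers each rpc desc of service (a class or instance) under a route
    # of the form "/<service_name>/<rpc_name>"; e.g. a hypothetical Divider
    # service's :Div rpc would be routed as "/Divider/Div".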
    def add_rpc_descs_for(service)
      cls = service.is_a?(Class) ? service : service.class
      specs = rpc_descs
      handlers = rpc_handlers
      cls.rpc_descs.each_pair do |name, spec|
        route = "/#{cls.service_name}/#{name}".to_sym
        if specs.has_key?(route)
          raise "Cannot add rpc #{route} from #{spec}, already registered"
        else
          specs[route] = spec
          if service.is_a?(Class)
            handlers[route] = cls.new.method(name.to_s.underscore.to_sym)
          else
            handlers[route] = service.method(name.to_s.underscore.to_sym)
          end
          logger.info("handling #{route} with #{handlers[route]}")
        end
      end
    end
  end
end