# blob: b16c8f85632fd5e1674c77d86533c05dc9b38081 [file] [log] [blame]
# Copyright 2014, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'forwardable'
require 'grpc/generic/bidi_call'
# Verifies that an event plucked from a completion queue has the wanted type.
#
# @param ev [Object, nil] the event returned by the completion queue
# @param want [Object] the event type the caller is waiting for
# @raise [OutOfTime] if ev is nil
# @raise [RuntimeError] if the type of ev differs from want
def assert_event_type(ev, want)
  raise OutOfTime if ev.nil?

  actual = ev.type
  return if actual == want

  raise format('Unexpected rpc event: got %s, want %s', actual, want)
end
module Google::RPC
# The ActiveCall class provides simple methods for sending marshallable
# data to a call
class ActiveCall
include Core::CompletionType
include Core::StatusCodes
include Core::TimeConsts
attr_reader(:deadline)
# client_start_invoke begins a client invocation and waits for it to be
# accepted.
#
# Flow Control note: this blocks until the completion queue delivers the
# event for the invoke-accepted tag.
#
# NOTE: the deadline argument is accepted but is not used in the body; the
# wait on the completion queue uses INFINITE_FUTURE.
#
# == Keyword Arguments ==
# any keyword arguments are passed to call.add_metadata, i.e. they are
# treated as metadata to be sent to the server
#
# @param call [Core::Call] a call on which to start an invocation
# @param q [Core::CompletionQueue] used to wait for INVOKE_ACCEPTED
# @param deadline [Fixnum,TimeSpec] the deadline (currently unused here)
# @return [Array<Object>] the pair [finished_tag, client_metadata_read_tag]
# @raise [ArgumentError] if call or q is not of the expected class
# @raise [OutOfTime] if the completion queue returns no event
def self.client_start_invoke(call, q, deadline, **kw)
  raise ArgumentError, 'not a call' unless call.is_a?(Core::Call)
  raise ArgumentError, 'not a CompletionQueue' unless q.is_a?(Core::CompletionQueue)

  call.add_metadata(kw) unless kw.empty?

  # each tag is a unique object used to pick the matching event off the queue
  accepted_tag = Object.new
  metadata_tag = Object.new
  done_tag = Object.new
  call.start_invoke(q, accepted_tag, metadata_tag, done_tag)

  # wait for the invocation to be accepted
  raise OutOfTime if q.pluck(accepted_tag, INFINITE_FUTURE).nil?

  [done_tag, metadata_tag]
end
# Creates an ActiveCall.
#
# ActiveCall should only be created after a call is accepted. That means
# different things on a client and a server. On the client, the call is
# accepted after call.start_invoke followed by receipt of the
# corresponding INVOKE_ACCEPTED. On the server, this is after
# call.accept.
#
# #initialize cannot determine if the call is accepted or not; so if a
# call that's not accepted is used here, the error won't be visible until
# the ActiveCall methods are called.
#
# deadline is the absolute deadline for the call.
#
# @param call [Core::Call] the call used by the ActiveCall
# @param q [Core::CompletionQueue] the completion queue used to accept
#   the call
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
# @param finished_tag [Object] the object used as the call's finish tag,
#   if the call has begun
# @param read_metadata_tag [Object] the object used as the call's read
#   metadata tag, if the call has begun
# @param started [true|false] (default true) indicates if the call has begun
# @raise [ArgumentError] if call or q is not of the expected class
def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil,
               read_metadata_tag: nil, started: true)
  raise ArgumentError, 'not a call' unless call.is_a?(Core::Call)
  raise ArgumentError, 'not a CompletionQueue' unless q.is_a?(Core::CompletionQueue)

  @call, @cq, @deadline = call, q, deadline
  @finished_tag, @read_metadata_tag = finished_tag, read_metadata_tag
  @marshal, @started, @unmarshal = marshal, started, unmarshal
end
# Obtains the status of the call by delegating to the wrapped call object.
#
# this value is nil until the call completes
# @return this call's status, as reported by @call.status
def status
@call.status
end
# Obtains the metadata of the call by delegating to the wrapped call object.
#
# At the start of the call this will be nil. During the call this gets
# some values as soon as the other end of the connection acknowledges the
# request.
#
# @return this call's metadata, as reported by @call.metadata
def metadata
@call.metadata
end
# Cancels the call by delegating to the wrapped call object.
#
# The call does not return any result, but once #cancel has been called,
# the call should eventually terminate. Due to potential races between the
# execution of the cancel and the in-flight request, the result of the call
# after calling #cancel is indeterminate:
#
# - the call may terminate with a BadStatus exception, with code=CANCELLED
# - the call may terminate with OK Status, and return a response
# - the call may terminate with a different BadStatus exception if that was
#   happening
#
# NOTE(review): this method does not assign @cancelled, so #cancelled is not
# affected by it — confirm that is intended.
def cancel
@call.cancel
end
# Indicates if the call is shutdown.
#
# Returns @shutdown when it holds a truthy value; otherwise stores false in
# @shutdown and returns false.
#
# NOTE(review): nothing in the visible part of this class assigns @shutdown,
# so this looks like it always returns false — confirm.
#
# @return [Object, false] the stored flag, or false when it is unset
def shutdown
  @shutdown || (@shutdown = false)
end
# Indicates if the call is cancelled.
#
# Returns @cancelled when it holds a truthy value; otherwise stores false in
# @cancelled and returns false.
#
# NOTE(review): nothing in the visible part of this class assigns
# @cancelled (not even #cancel), so this looks like it always returns
# false — confirm.
#
# @return [Object, false] the stored flag, or false when it is unset
def cancelled
  @cancelled || (@cancelled = false)
end
# multi_req_view provides a restricted view of this ActiveCall for use
# in a server client-streaming handler.
#
# @return [MultiReqView] a new view object wrapping this ActiveCall
def multi_req_view
MultiReqView.new(self)
end
# single_req_view provides a restricted view of this ActiveCall for use in
# a server request-response handler.
#
# @return [SingleReqView] a new view object wrapping this ActiveCall
def single_req_view
SingleReqView.new(self)
end
# operation provides a restricted view of this ActiveCall for use as
# an Operation.
#
# @return [Operation] a new view object wrapping this ActiveCall
def operation
Operation.new(self)
end
# writes_done indicates that all writes are completed.
#
# It first waits for the FINISH_ACCEPTED event for this call. Unless
# assert_finished is set to false, it then also blocks until an event for
# the finished tag is returned. Any calls to #remote_send after this call
# will fail.
#
# @param assert_finished [true, false] when true(default), waits for
#   the finished event.
# @return the call's status when assert_finished is set, otherwise nil
# @raise [RuntimeError] if no event is returned for the finished tag
def writes_done(assert_finished=true)
  @call.writes_done(self)
  accepted = @cq.pluck(self, INFINITE_FUTURE)
  assert_event_type(accepted, FINISH_ACCEPTED)
  logger.debug("Writes done: waiting for finish? #{assert_finished}")
  return nil unless assert_finished

  done = @cq.pluck(@finished_tag, INFINITE_FUTURE)
  raise "unexpected event: #{done.inspect}" if done.nil?

  @call.status
end
# finished waits until the call is completed.
#
# It blocks until the completion queue returns the event for the finished
# tag, records the metadata carried by that event on the call, and checks
# the resulting status code.
#
# @return the result of the FINISHED event when its code is OK
# @raise [RuntimeError] if no event, or an event of another type, is returned
# @raise [BadStatus] if the result's code is not OK
def finished
  ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
  # Check for a missing event before reading its type, so that a nil event is
  # reported with the same error as in #writes_done instead of failing with
  # a NoMethodError on nil.
  raise "unexpected event: #{ev.inspect}" if ev.nil? || ev.type != FINISHED

  res = ev.result
  if @call.metadata.nil?
    @call.metadata = res.metadata
  else
    @call.metadata.merge!(res.metadata)
  end
  raise BadStatus.new(res.code, res.details) if res.code != Core::StatusCodes::OK

  # NOTE(temiola): This is necessary to allow the C call struct wrapped
  # within the active_call to be GCed; this is necessary so that other
  # C-level destructors get called in the required order.
  ev = nil # allow the event to be GCed
  res
end
# remote_send sends a request to the remote endpoint.
#
# It blocks until the completion queue returns a WRITE_ACCEPTED event for
# this call. req can be marshalled already.
#
# @param req [Object, String] the object to send or its marshalled form.
# @param marshalled [false, true] indicates if the object is already
#   marshalled.
# @return [nil]
def remote_send(req, marshalled=false)
  assert_queue_is_ready
  logger.debug("sending payload #{req.inspect}, marshalled? #{marshalled}")
  payload = marshalled ? req : @marshal.call(req)
  @call.start_write(Core::ByteBuffer.new(payload), self)

  # call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
  # until the flow control allows another send on this call.
  write_ev = @cq.pluck(self, INFINITE_FUTURE)
  assert_event_type(write_ev, WRITE_ACCEPTED)
  nil
end
# send_status sends a status to the remote endpoint.
#
# It blocks until the completion queue returns a FINISH_ACCEPTED event for
# this call.
#
# @param code [int] the status code to send (default OK)
# @param details [String] details
# @param assert_finished [true, false] when true, also waits for the call
#   to finish by calling #finished; the default is false.
# @return the result of #finished when assert_finished is set, otherwise nil
def send_status(code=OK, details='', assert_finished=false)
  assert_queue_is_ready
  @call.start_write_status(code, details, self)
  accepted = @cq.pluck(self, INFINITE_FUTURE)
  assert_event_type(accepted, FINISH_ACCEPTED)
  logger.debug("Status sent: #{code}:'#{details}'")
  assert_finished ? finished : nil
end
# remote_read reads a response from the remote endpoint.
#
# If the call has no metadata yet and a read-metadata tag is set, it first
# waits for the CLIENT_METADATA_READ event and stores its result as the
# call's metadata. It then starts a read and blocks until the completion
# queue returns the READ event for this call.
#
# @return the unmarshalled response, or nil when the READ event carries no
#   result, i.e. the final response has already been sent
def remote_read
  if @call.metadata.nil? && !@read_metadata_tag.nil?
    md_ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE)
    assert_event_type(md_ev, CLIENT_METADATA_READ)
    @call.metadata = md_ev.result
    # the metadata is only read once
    @read_metadata_tag = nil
  end

  @call.start_read(self)
  read_ev = @cq.pluck(self, INFINITE_FUTURE)
  assert_event_type(read_ev, READ)
  logger.debug("received req: #{read_ev.result.inspect}")
  if read_ev.result.nil?
    logger.debug('found nil; the final response has been sent')
    return nil
  end

  logger.debug("received req.to_s: #{read_ev.result.to_s}")
  unmarshalled = @unmarshal.call(read_ev.result.to_s)
  logger.debug("received_req (unmarshalled): #{unmarshalled.inspect}")
  unmarshalled
end
# each_remote_read passes each response to the given block or returns an
# enumerator of the responses if no block is given.
#
# == Enumerator ==
#
# * each step of the enumeration calls #remote_read, which blocks until a
#   response is available
# * the enumeration ends when #remote_read returns nil or a Struct::Status
# * errors raised by #remote_read are not rescued here
#
# == Block ==
#
# * if provided it is executed for each response
# * the call blocks until no more responses are provided
#
# @return [Enumerator] if no block was given
def each_remote_read
  return enum_for(:each_remote_read) unless block_given?

  loop do
    reply = remote_read
    # a status or a nil both mean that the last response was received
    break if reply.is_a?(Struct::Status) || reply.nil?

    yield reply
  end
end
# each_remote_read_then_finish passes each response to the given block or
# returns an enumerator of the responses if no block is given.
#
# It is like each_remote_read, but when #remote_read returns nil it calls
# #finished before ending the enumeration.
#
# == Enumerator ==
#
# * each step of the enumeration calls #remote_read, which blocks until a
#   response is available
# * the enumeration ends when #remote_read returns a Struct::Status, or
#   returns nil (in which case #finished is called first)
# * errors raised by #remote_read or #finished are not rescued here
#
# == Block ==
#
# * if provided it is executed for each response
# * the call blocks until no more responses are provided
#
# @return [Enumerator] if no block was given
def each_remote_read_then_finish
  return enum_for(:each_remote_read_then_finish) unless block_given?

  loop do
    reply = remote_read
    break if reply.is_a?(Struct::Status) # a status means the call is over

    unless reply.nil?
      yield reply
      next
    end

    # the last response was received, but the call is not finished yet
    finished
    break
  end
end
# request_response sends a request to a GRPC server, and returns the
# response.
#
# The call is started first if it has not been started yet.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for its key are sent
#
# @param req [Object] the request sent to the server
# @return [Object] the response received from the server
def request_response(req, **kw)
  start_call(**kw) unless @started
  remote_send(req)
  writes_done(false)
  reply = remote_read
  # finish if status not yet received
  finished unless reply.is_a?(Struct::Status)
  reply
end
# client_streamer sends a stream of requests to a GRPC server, and
# returns a single response.
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in the typical case it will be an
# Enumerable that allows dynamic construction of the marshallable objects.
#
# The call is started first if it has not been started yet.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for its key are sent
#
# @param requests [Object] an Enumerable of requests to send
# @return [Object] the response received from the server
def client_streamer(requests, **kw)
  start_call(**kw) unless @started
  requests.each { |request| remote_send(request) }
  writes_done(false)
  reply = remote_read
  # finish if status not yet received
  finished unless reply.is_a?(Struct::Status)
  reply
end
# server_streamer sends one request to the GRPC server, which yields a
# stream of responses.
#
# responses provides an enumerator over the streamed responses, i.e. it
# follows Ruby's #each iteration protocol. The enumerator blocks while
# waiting for each response, and stops when the server signals that no
# further responses will be supplied. If the implicit block is provided,
# it is executed with each response as the argument instead.
#
# The call is started first if it has not been started yet.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for its key are sent
#
# @param req [Object] the request sent to the server
# @return [Enumerator|nil] a response Enumerator if no block was given
def server_streamer(req, **kw)
  start_call(**kw) unless @started
  remote_send(req)
  writes_done(false)
  return enum_for(:each_remote_read_then_finish) unless block_given?

  each_remote_read_then_finish { |reply| yield reply }
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
# a stream of responses.
#
# This method takes an Enumerable of requests, and returns an enumerable
# of responses. The work is delegated to BidiCall#run_on_client, after the
# call is started if it has not been started yet.
#
# == requests ==
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's #each
# enumeration protocol. In the simplest case, requests will be an array of
# marshallable objects; in the typical case it will be an Enumerable that
# allows dynamic construction of the marshallable objects.
#
# == responses ==
#
# This is an enumerator of responses. I.e, its #next method blocks
# waiting for the next response. Also, if at any point the block needs
# to consume all the remaining responses, this can be done using #each or
# #collect. Calling #each or #collect should only be done if
# the_call#writes_done has been called, otherwise the block will loop
# forever.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for its key are sent
#
# @param requests [Object] an Enumerable of requests to send
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, **kw, &blk)
  start_call(**kw) unless @started
  BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline, @finished_tag)
          .run_on_client(requests, &blk)
end
# run_server_bidi orchestrates a BiDi stream processing on a server.
#
# The work is delegated to BidiCall#run_on_server.
#
# N.B. gen_each_reply is a func(Enumerable<Requests>)
#
# It takes an enumerable of requests as an arg, in case there is a
# relationship between the stream of requests and the stream of replies.
#
# This does not mean that there must necessarily be one. E.g, the replies
# produced by gen_each_reply could ignore the received_msgs
#
# @param gen_each_reply [Proc] generates the BiDi stream replies
def run_server_bidi(gen_each_reply)
  BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline, @finished_tag)
          .run_on_server(gen_each_reply)
end
private
# Starts the client invocation and records the tags that
# ActiveCall.client_start_invoke returns.
#
# @param kw [Hash] keyword arguments forwarded to client_start_invoke
# @return [true]
def start_call(**kw)
  @finished_tag, @read_metadata_tag =
    ActiveCall.client_start_invoke(@call, @cq, @deadline, **kw)
  @started = true
end
# Builds an anonymous class whose instances wrap another object and expose
# only the given methods, each one forwarded to the wrapped object.
#
# @param visible_methods [Array<Symbol>] the names of the forwarded methods
# @return [Class] the new view class; its constructor takes the object to
#   wrap
def self.view_class(*visible_methods)
  view = Class.new
  view.extend(::Forwardable)
  view.def_delegators(:@wrapped, *visible_methods)
  view.class_eval do
    # @param wrapped [ActiveCall] the call whose methods are shielded
    def initialize(wrapped)
      @wrapped = wrapped
    end
  end
  view
end
# SingleReqView limits access to an ActiveCall's methods for use in server
# handlers that receive just one request. It exposes only #cancelled and
# #deadline.
SingleReqView = view_class(:cancelled, :deadline)
# MultiReqView limits access to an ActiveCall's methods for use in
# server client_streamer handlers.
#
# NOTE(review): :each_queued_msg is forwarded here, but no method of that
# name is defined in the visible part of this class — confirm it exists,
# otherwise calling it on the view will fail.
MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg,
:each_remote_read)
# Operation limits access to an ActiveCall's methods for use as
# an Operation on the client.
#
# NOTE(review): :execute is forwarded here, but no method of that name is
# defined in the visible part of this class — presumably it is supplied
# elsewhere; confirm.
Operation = view_class(:cancel, :cancelled, :deadline, :execute, :metadata,
:status)
# Confirms that no events are enqueued for this call, and that the queue is
# not shutdown.
#
# It polls the completion queue for this call's tag with a ZERO deadline.
#
# @return [nil] when no event is pending
# @raise [RuntimeError] if an event is already waiting on the queue
def assert_queue_is_ready
  pending = @cq.pluck(self, ZERO)
  raise "unexpected event #{pending.inspect}" unless pending.nil?
rescue OutOfTime
  # expected, nothing should be on the queue and the deadline was ZERO,
  # except things using another tag
end
end
end