blob: 238f409a1d6019a16f5f49d71f6bc2904fa38a4f [file] [log] [blame]
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'forwardable'
require_relative '../grpc'
# GRPC contains the General RPC module.
module GRPC
# The BiDiCall class orchestrates exection of a BiDi stream on a client or
# server.
class BidiCall
include Core::CallOps
include Core::StatusCodes
include Core::TimeConsts
# Creates a BidiCall.
#
# BidiCall should only be created after a call is accepted. That means
# different things on a client and a server. On the client, the call is
# accepted after call.invoke. On the server, this is after call.accept.
#
# #initialize cannot determine if the call is accepted or not; so if a
# call that's not accepted is used here, the error won't be visible until
# the BidiCall#run is called.
#
# deadline is the absolute deadline for the call.
#
# @param call [Call] the call used by the ActiveCall
# @param q [CompletionQueue] the completion queue used to accept
# the call
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param metadata_tag [Object] tag object used to collect metadata
def initialize(call, q, marshal, unmarshal, metadata_tag: nil)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
@call = call
@cq = q
@marshal = marshal
@op_notifier = nil # signals completion on clients
@readq = Queue.new
@unmarshal = unmarshal
@metadata_tag = metadata_tag
@reads_complete = false
@writes_complete = false
@complete = false
@done_mutex = Mutex.new
end
# Begins orchestration of the Bidi stream for a client sending requests.
#
# The method either returns an Enumerator of the responses, or accepts a
# block that can be invoked with each response.
#
# @param requests the Enumerable of requests to send
# @op_notifier a Notifier used to signal completion
# @return an Enumerator of requests to yield
def run_on_client(requests, op_notifier, &blk)
@op_notifier = op_notifier
@enq_th = Thread.new { write_loop(requests) }
@loop_th = start_read_loop
each_queued_msg(&blk)
end
# Begins orchestration of the Bidi stream for a server generating replies.
#
# N.B. gen_each_reply is a func(Enumerable<Requests>)
#
# It takes an enumerable of requests as an arg, in case there is a
# relationship between the stream of requests and the stream of replies.
#
# This does not mean that must necessarily be one. E.g, the replies
# produced by gen_each_reply could ignore the received_msgs
#
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
@loop_th = start_read_loop(is_client: false)
write_loop(replys, is_client: false)
end
private
END_OF_READS = :end_of_reads
END_OF_WRITES = :end_of_writes
# signals that bidi operation is complete
def notify_done
return unless @op_notifier
GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}")
@op_notifier.notify(self)
end
# signals that a bidi operation is complete (read + write)
def finished
@done_mutex.synchronize do
return unless @reads_complete && @writes_complete && !@complete
@call.close
@cq.close
@complete = true
end
end
# performs a read using @call.run_batch, ensures metadata is set up
def read_using_run_batch
ops = { RECV_MESSAGE => nil }
ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil?
batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
unless @metadata_tag.nil?
@call.metadata = batch_result.metadata
@metadata_tag = nil
end
batch_result
end
# each_queued_msg yields each message on this instances readq
#
# - messages are added to the readq by #read_loop
# - iteration ends when the instance itself is added
def each_queued_msg
return enum_for(:each_queued_msg) unless block_given?
count = 0
loop do
GRPC.logger.debug("each_queued_msg: waiting##{count}")
count += 1
req = @readq.pop
GRPC.logger.debug("each_queued_msg: req = #{req}")
fail req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
end
end
def write_loop(requests, is_client: true)
GRPC.logger.debug('bidi-write-loop: starting')
write_tag = Object.new
count = 0
requests.each do |req|
GRPC.logger.debug("bidi-write-loop: #{count}")
count += 1
payload = @marshal.call(req)
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_MESSAGE => payload)
end
GRPC.logger.debug("bidi-write-loop: #{count} writes done")
if is_client
GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_CLOSE_FROM_CLIENT => nil)
GRPC.logger.debug('bidi-write-loop: done')
notify_done
@writes_complete = true
finished
end
GRPC.logger.debug('bidi-write-loop: finished')
rescue StandardError => e
GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e)
notify_done
@writes_complete = true
finished
raise e
end
# starts the read loop
def start_read_loop(is_client: true)
Thread.new do
GRPC.logger.debug('bidi-read-loop: starting')
begin
read_tag = Object.new
count = 0
# queue the initial read before beginning the loop
loop do
GRPC.logger.debug("bidi-read-loop: #{count}")
count += 1
batch_result = read_using_run_batch
# handle the next message
if batch_result.message.nil?
GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
if is_client
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
RECV_STATUS_ON_CLIENT => nil)
@call.status = batch_result.status
batch_result.check_status
GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
end
@readq.push(END_OF_READS)
GRPC.logger.debug('bidi-read-loop: done reading!')
break
end
# push the latest read onto the queue and continue reading
res = @unmarshal.call(batch_result.message)
@readq.push(res)
end
rescue StandardError => e
GRPC.logger.warn('bidi: read-loop failed')
GRPC.logger.warn(e)
@readq.push(e) # let each_queued_msg terminate with this error
end
GRPC.logger.debug('bidi-read-loop: finished')
@reads_complete = true
finished
end
end
end
end