Don't wait for GC to destroy calls on the Ruby server

Track completion of the input and output streams explicitly in
ActiveCall and BidiCall, and close the underlying call as soon as both
streams are done, instead of leaving cleanup to garbage collection.
Server-side unary and server-streaming handlers now use
read_unary_request so the input stream is marked done as soon as the
single expected request has been read.
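
The gist of the new lifecycle handling is sketched below. This is a
minimal, self-contained Ruby illustration distilled from the ActiveCall
changes in this diff; the StreamDoneTracker wrapper class is hypothetical
(the real logic lives directly in GRPC::ActiveCall), while the field names
and method bodies mirror the added code:

    # Hypothetical wrapper used only for illustration.
    class StreamDoneTracker
      def initialize(call)
        @call = call                  # any object responding to #close
        @input_stream_done = false
        @output_stream_done = false
        @call_finished = false
        @call_finished_mu = Mutex.new
      end

      # Idempotent: mark the input stream (reads) as fully consumed.
      def set_input_stream_done
        @call_finished_mu.synchronize do
          @input_stream_done = true
          maybe_finish_and_close_call_locked
        end
      end

      # Idempotent: mark the output stream (writes) as fully sent.
      def set_output_stream_done
        @call_finished_mu.synchronize do
          @output_stream_done = true
          maybe_finish_and_close_call_locked
        end
      end

      private

      # Close the call exactly once, as soon as both directions are done.
      def maybe_finish_and_close_call_locked
        return unless @input_stream_done && @output_stream_done
        return if @call_finished
        @call_finished = true
        @call.close
      end
    end

Both setters can safely be invoked from ensure blocks on either the read
or the write thread; the shared mutex guarantees the call is closed
exactly once.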
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index cb407d2..96c773a 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -40,7 +40,7 @@
 module GRPC
   # The ActiveCall class provides simple methods for sending marshallable
   # data to a call
-  class ActiveCall
+  class ActiveCall # rubocop:disable Metrics/ClassLength
     include Core::TimeConsts
     include Core::CallOps
     extend Forwardable
@@ -100,6 +100,11 @@
       fail(ArgumentError, 'Already sent md') if started && metadata_to_send
       @metadata_to_send = metadata_to_send || {} unless started
       @send_initial_md_mutex = Mutex.new
+
+      @output_stream_done = false
+      @input_stream_done = false
+      @call_finished = false
+      @call_finished_mu = Mutex.new
     end
 
     # Sends the initial metadata that has yet to be sent.
@@ -142,11 +147,9 @@
       Operation.new(self)
     end
 
-    # finished waits until a client call is completed.
-    #
-    # It blocks until the remote endpoint acknowledges by sending a status.
-    def finished
+    def receive_and_check_status
       batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
+      set_input_stream_done
       attach_status_results_and_complete_call(batch_result)
     end
 
@@ -155,8 +158,6 @@
         @call.trailing_metadata = recv_status_batch_result.status.metadata
       end
       @call.status = recv_status_batch_result.status
-      @call.close
-      op_is_done
 
       # The RECV_STATUS in run_batch always succeeds
       # Check the status for a bad status or failed run batch
@@ -193,9 +194,19 @@
       }
       ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
       @call.run_batch(ops)
+      set_output_stream_done
+
       nil
     end
 
+    # Intended for use on server-side calls when a single request from
+    # the client is expected (i.e., unary and server-streaming RPC types).
+    def read_unary_request
+      req = remote_read
+      set_input_stream_done
+      req
+    end
+
     def server_unary_response(req, trailing_metadata: {},
                               code: Core::StatusCodes::OK, details: 'OK')
       ops = {}
@@ -211,6 +222,7 @@
       ops[RECV_CLOSE_ON_SERVER] = nil
 
       @call.run_batch(ops)
+      set_output_stream_done
     end
 
     # remote_read reads a response from the remote endpoint.
@@ -241,6 +253,8 @@
 
     # each_remote_read passes each response to the given block or returns an
     # enumerator the responses if no block is given.
+    # Used to generate the request enumerable for
+    # server-side client-streaming RPCs.
     #
     # == Enumerator ==
     #
@@ -258,10 +272,14 @@
     # @return [Enumerator] if no block was given
     def each_remote_read
       return enum_for(:each_remote_read) unless block_given?
-      loop do
-        resp = remote_read
-        break if resp.nil?  # the last response was received
-        yield resp
+      begin
+        loop do
+          resp = remote_read
+          break if resp.nil?  # the last response was received
+          yield resp
+        end
+      ensure
+        set_input_stream_done
       end
     end
 
@@ -287,13 +305,17 @@
     # @return [Enumerator] if no block was given
     def each_remote_read_then_finish
       return enum_for(:each_remote_read_then_finish) unless block_given?
-      loop do
-        resp = remote_read
-        if resp.nil?  # the last response was received, but not finished yet
-          finished
-          break
+      begin
+        loop do
+          resp = remote_read
+          if resp.nil?  # the last response was received
+            receive_and_check_status
+            break
+          end
+          yield resp
         end
-        yield resp
+      ensure
+        set_input_stream_done
       end
     end
 
@@ -319,7 +341,15 @@
         end
         @metadata_sent = true
       end
-      batch_result = @call.run_batch(ops)
+
+      begin
+        batch_result = @call.run_batch(ops)
+        # no need to check for cancellation after a CallError because this
+        # batch contains a RECV_STATUS op
+      ensure
+        set_input_stream_done
+        set_output_stream_done
+      end
 
       @call.metadata = batch_result.metadata
       attach_status_results_and_complete_call(batch_result)
@@ -339,10 +369,19 @@
     # a list, multiple metadata for its key are sent
     # @return [Object] the response received from the server
     def client_streamer(requests, metadata: {})
-      # Metadata might have already been sent if this is an operation view
-      merge_metadata_and_send_if_not_already_sent(metadata)
+      begin
+        merge_metadata_and_send_if_not_already_sent(metadata)
+        requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
+      rescue GRPC::Core::CallError => e
+        receive_and_check_status # check for Cancelled
+        raise e
+      rescue => e
+        set_input_stream_done
+        raise e
+      ensure
+        set_output_stream_done
+      end
 
-      requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
       batch_result = @call.run_batch(
         SEND_CLOSE_FROM_CLIENT => nil,
         RECV_INITIAL_METADATA => nil,
@@ -350,12 +389,11 @@
         RECV_STATUS_ON_CLIENT => nil
       )
 
+      set_input_stream_done
+
       @call.metadata = batch_result.metadata
       attach_status_results_and_complete_call(batch_result)
       get_message_from_batch_result(batch_result)
-    rescue GRPC::Core::CallError => e
-      finished  # checks for Cancelled
-      raise e
     end
 
     # server_streamer sends one request to the GRPC server, which yields a
@@ -384,13 +422,22 @@
         end
         @metadata_sent = true
       end
-      @call.run_batch(ops)
+
+      begin
+        @call.run_batch(ops)
+      rescue GRPC::Core::CallError => e
+        receive_and_check_status # checks for Cancelled
+        raise e
+      rescue => e
+        set_input_stream_done
+        raise e
+      ensure
+        set_output_stream_done
+      end
+
       replies = enum_for(:each_remote_read_then_finish)
       return replies unless block_given?
       replies.each { |r| yield r }
-    rescue GRPC::Core::CallError => e
-      finished  # checks for Cancelled
-      raise e
     end
 
     # bidi_streamer sends a stream of requests to the GRPC server, and yields
@@ -428,7 +475,10 @@
                         @unmarshal,
                         metadata_received: @metadata_received)
 
-      bd.run_on_client(requests, @op_notifier, &blk)
+      bd.run_on_client(requests,
+                       proc { set_input_stream_done },
+                       proc { set_output_stream_done },
+                       &blk)
     end
 
     # run_server_bidi orchestrates a BiDi stream processing on a server.
@@ -449,7 +499,7 @@
                         metadata_received: @metadata_received,
                         req_view: MultiReqView.new(self))
 
-      bd.run_on_server(gen_each_reply)
+      bd.run_on_server(gen_each_reply, proc { set_input_stream_done })
     end
 
     # Waits till an operation completes
@@ -459,7 +509,8 @@
       @op_notifier.wait
     end
 
-    # Signals that an operation is done
+    # Signals that an operation is done.
+    # Only relevant on the client side (this is a no-op on the server side).
     def op_is_done
       return if @op_notifier.nil?
       @op_notifier.notify(self)
@@ -486,6 +537,34 @@
 
     private
 
+    # To be called once the "input stream" has been completely
+    # read through (i.e., done reading from the client or the status
+    # has been received). Note that this method is idempotent.
+    def set_input_stream_done
+      @call_finished_mu.synchronize do
+        @input_stream_done = true
+        maybe_finish_and_close_call_locked
+      end
+    end
+
+    # To be called once the "output stream" has been completely
+    # sent through (i.e., done sending from the client or the status
+    # has been sent). Note that this method is idempotent.
+    def set_output_stream_done
+      @call_finished_mu.synchronize do
+        @output_stream_done = true
+        maybe_finish_and_close_call_locked
+      end
+    end
+
+    def maybe_finish_and_close_call_locked
+      return unless @output_stream_done && @input_stream_done
+      return if @call_finished
+      @call_finished = true
+      op_is_done
+      @call.close
+    end
+
     # Starts the call if not already started
     # @param metadata [Hash] metadata to be sent to the server. If a value is
     # a list, multiple metadata for its key are sent
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index e54cf78..9e125cd 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -62,12 +62,19 @@
     # block that can be invoked with each response.
     #
     # @param requests the Enumerable of requests to send
-    # @param op_notifier a Notifier used to signal completion
+    # @param set_input_stream_done [Proc] called back when we're done
+    #   reading the input stream
+    # @param set_output_stream_done [Proc] called back when we're done
+    #   sending data on the output stream
     # @return an Enumerator of requests to yield
-    def run_on_client(requests, op_notifier, &blk)
-      @op_notifier = op_notifier
-      @enq_th = Thread.new { write_loop(requests) }
-      read_loop(&blk)
+    def run_on_client(requests,
+                      set_input_stream_done,
+                      set_output_stream_done,
+                      &blk)
+      @enq_th = Thread.new do
+        write_loop(requests, set_output_stream_done: set_output_stream_done)
+      end
+      read_loop(set_input_stream_done, &blk)
     end
 
     # Begins orchestration of the Bidi stream for a server generating replies.
@@ -81,12 +88,17 @@
     # produced by gen_each_reply could ignore the received_msgs
     #
     # @param gen_each_reply [Proc] generates the BiDi stream replies.
-    def run_on_server(gen_each_reply)
+    # @param set_input_stream_done [Proc] called when the input stream
+    #   has been completely read through.
+    def run_on_server(gen_each_reply, set_input_stream_done)
       # Pass in the optional call object parameter if possible
       if gen_each_reply.arity == 1
-        replys = gen_each_reply.call(read_loop(is_client: false))
+        replys = gen_each_reply.call(
+          read_loop(set_input_stream_done, is_client: false))
       elsif gen_each_reply.arity == 2
-        replys = gen_each_reply.call(read_loop(is_client: false), @req_view)
+        replys = gen_each_reply.call(
+          read_loop(set_input_stream_done, is_client: false),
+          @req_view)
       else
         fail 'Illegal arity of reply generator'
       end
@@ -99,22 +111,6 @@
     END_OF_READS = :end_of_reads
     END_OF_WRITES = :end_of_writes
 
-    # signals that bidi operation is complete
-    def notify_done
-      return unless @op_notifier
-      GRPC.logger.debug("bidi-notify-done: notifying  #{@op_notifier}")
-      @op_notifier.notify(self)
-    end
-
-    # signals that a bidi operation is complete (read + write)
-    def finished
-      @done_mutex.synchronize do
-        return unless @reads_complete && @writes_complete && !@complete
-        @call.close
-        @complete = true
-      end
-    end
-
     # performs a read using @call.run_batch, ensures metadata is set up
     def read_using_run_batch
       ops = { RECV_MESSAGE => nil }
@@ -127,7 +123,8 @@
       batch_result
     end
 
-    def write_loop(requests, is_client: true)
+    # set_output_stream_done is only relevant on the client side
+    def write_loop(requests, is_client: true, set_output_stream_done: nil)
       GRPC.logger.debug('bidi-write-loop: starting')
       count = 0
       requests.each do |req|
@@ -151,23 +148,20 @@
         GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
         @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
         GRPC.logger.debug('bidi-write-loop: done')
-        notify_done
-        @writes_complete = true
-        finished
       end
       GRPC.logger.debug('bidi-write-loop: finished')
     rescue StandardError => e
       GRPC.logger.warn('bidi-write-loop: failed')
       GRPC.logger.warn(e)
-      notify_done
-      @writes_complete = true
-      finished
       raise e
+    ensure
+      set_output_stream_done.call if is_client
     end
 
     # Provides an enumerator that yields results of remote reads
-    def read_loop(is_client: true)
+    def read_loop(set_input_stream_done, is_client: true)
       return enum_for(:read_loop,
+                      set_input_stream_done,
                       is_client: is_client) unless block_given?
       GRPC.logger.debug('bidi-read-loop: starting')
       begin
@@ -201,10 +195,10 @@
         GRPC.logger.warn('bidi: read-loop failed')
         GRPC.logger.warn(e)
         raise e
+      ensure
+        set_input_stream_done.call
       end
       GRPC.logger.debug('bidi-read-loop: finished')
-      @reads_complete = true
-      finished
       # Make sure that the write loop is done done before finishing the call.
       # Note that blocking is ok at this point because we've already received
       # a status
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index ce00975..89cf8ff 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -48,7 +48,7 @@
     end
 
     def handle_request_response(active_call, mth)
-      req = active_call.remote_read
+      req = active_call.read_unary_request
       resp = mth.call(req, active_call.single_req_view)
       active_call.server_unary_response(
         resp, trailing_metadata: active_call.output_metadata)
@@ -61,7 +61,7 @@
     end
 
     def handle_server_streamer(active_call, mth)
-      req = active_call.remote_read
+      req = active_call.read_unary_request
       replys = mth.call(req, active_call.single_req_view)
       replys.each { |r| active_call.remote_send(r) }
       send_status(active_call, OK, 'OK', active_call.output_metadata)
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 72e55eb..ec0c294 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -473,7 +473,7 @@
       server_call.remote_send('server_response')
       expect(client_call.remote_read).to eq('server_response')
       server_call.send_status(OK, 'status code is OK')
-      expect { client_call.finished }.to_not raise_error
+      expect { client_call.receive_and_check_status }.to_not raise_error
     end
 
     it 'finishes ok if the server sends an early status response' do
@@ -490,7 +490,7 @@
       expect do
         call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
       end.to_not raise_error
-      expect { client_call.finished }.to_not raise_error
+      expect { client_call.receive_and_check_status }.to_not raise_error
     end
 
     it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 09b88c7..3b8f72e 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -45,6 +45,7 @@
     @method = 'an_rpc_method'
     @pass = OK
     @fail = INTERNAL
+    @metadata = { k1: 'v1', k2: 'v2' }
   end
 
   after(:each) do
@@ -107,7 +108,7 @@
     end
   end
 
-  describe '#request_response' do
+  describe '#request_response', request_response: true do
     before(:each) do
       @sent_msg, @resp = 'a_msg', 'a_reply'
     end
@@ -187,13 +188,24 @@
         # Kill the server thread so tests can complete
         th.kill
       end
+
+      it 'should raise ArgumentError if metadata contains invalid values' do
+        @metadata.merge!(k3: 3)
+        server_port = create_test_server
+        host = "localhost:#{server_port}"
+        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+        expect do
+          get_response(stub)
+        end.to raise_error(ArgumentError,
+                           /Header values must be of type string or array/)
+      end
     end
 
     describe 'without a call operation' do
       def get_response(stub, credentials: nil)
         puts credentials.inspect
         stub.request_response(@method, @sent_msg, noop, noop,
-                              metadata: { k1: 'v1', k2: 'v2' },
+                              metadata: @metadata,
                               credentials: credentials)
       end
 
@@ -201,16 +213,19 @@
     end
 
     describe 'via a call operation' do
+      after(:each) do
+        # make sure op.wait doesn't hang, even if there's a bad status
+        @op.wait
+      end
       def get_response(stub, run_start_call_first: false, credentials: nil)
-        op = stub.request_response(@method, @sent_msg, noop, noop,
-                                   return_op: true,
-                                   metadata: { k1: 'v1', k2: 'v2' },
-                                   deadline: from_relative_time(2),
-                                   credentials: credentials)
-        expect(op).to be_a(GRPC::ActiveCall::Operation)
-        op.start_call if run_start_call_first
-        result = op.execute
-        op.wait # make sure wait doesn't hang
+        @op = stub.request_response(@method, @sent_msg, noop, noop,
+                                    return_op: true,
+                                    metadata: @metadata,
+                                    deadline: from_relative_time(2),
+                                    credentials: credentials)
+        expect(@op).to be_a(GRPC::ActiveCall::Operation)
+        @op.start_call if run_start_call_first
+        result = @op.execute
         result
       end
 
@@ -228,13 +243,12 @@
     end
   end
 
-  describe '#client_streamer' do
+  describe '#client_streamer', client_streamer: true do
     before(:each) do
       Thread.abort_on_exception = true
       server_port = create_test_server
       host = "localhost:#{server_port}"
       @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
-      @metadata = { k1: 'v1', k2: 'v2' }
       @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
       @resp = 'a_reply'
     end
@@ -278,13 +292,16 @@
     end
 
     describe 'via a call operation' do
+      after(:each) do
+        # make sure op.wait doesn't hang, even if there's a bad status
+        @op.wait
+      end
       def get_response(stub, run_start_call_first: false)
-        op = stub.client_streamer(@method, @sent_msgs, noop, noop,
-                                  return_op: true, metadata: @metadata)
-        expect(op).to be_a(GRPC::ActiveCall::Operation)
-        op.start_call if run_start_call_first
-        result = op.execute
-        op.wait # make sure wait doesn't hang
+        @op = stub.client_streamer(@method, @sent_msgs, noop, noop,
+                                   return_op: true, metadata: @metadata)
+        expect(@op).to be_a(GRPC::ActiveCall::Operation)
+        @op.start_call if run_start_call_first
+        result = @op.execute
         result
       end
 
@@ -298,7 +315,7 @@
     end
   end
 
-  describe '#server_streamer' do
+  describe '#server_streamer', server_streamer: true do
     before(:each) do
       @sent_msg = 'a_msg'
       @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
@@ -334,12 +351,36 @@
         expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
         th.join
       end
+
+      it 'should raise ArgumentError if metadata contains invalid values' do
+        @metadata.merge!(k3: 3)
+        server_port = create_test_server
+        host = "localhost:#{server_port}"
+        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+        expect do
+          get_responses(stub)
+        end.to raise_error(ArgumentError,
+                           /Header values must be of type string or array/)
+      end
+
+      it 'terminates the call when there is an unmarshalling error' do
+        server_port = create_test_server
+        host = "localhost:#{server_port}"
+        th = run_server_streamer(@sent_msg, @replys, @pass)
+        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+
+        unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') }
+        expect do
+          get_responses(stub, unmarshal: unmarshal).collect { |r| r }
+        end.to raise_error(ArgumentError, 'test unmarshalling error')
+        th.join
+      end
     end
 
-    describe 'without a call operation' do
-      def get_responses(stub)
-        e = stub.server_streamer(@method, @sent_msg, noop, noop,
-                                 metadata: { k1: 'v1', k2: 'v2' })
+    describe 'without a call operation', test2: true do
+      def get_responses(stub, unmarshal: noop)
+        e = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
+                                 metadata: @metadata)
         expect(e).to be_a(Enumerator)
         e
       end
@@ -351,10 +392,10 @@
       after(:each) do
         @op.wait # make sure wait doesn't hang
       end
-      def get_responses(stub, run_start_call_first: false)
-        @op = stub.server_streamer(@method, @sent_msg, noop, noop,
+      def get_responses(stub, run_start_call_first: false, unmarshal: noop)
+        @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
                                    return_op: true,
-                                   metadata: { k1: 'v1', k2: 'v2' })
+                                   metadata: @metadata)
         expect(@op).to be_a(GRPC::ActiveCall::Operation)
         @op.start_call if run_start_call_first
         e = @op.execute
@@ -377,7 +418,7 @@
     end
   end
 
-  describe '#bidi_streamer' do
+  describe '#bidi_streamer', bidi: true do
     before(:each) do
       @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
       @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
@@ -386,7 +427,7 @@
     end
 
     shared_examples 'bidi streaming' do
-      it 'supports sending all the requests first', bidi: true do
+      it 'supports sending all the requests first' do
         th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
                                                    @pass)
         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
@@ -395,7 +436,7 @@
         th.join
       end
 
-      it 'supports client-initiated ping pong', bidi: true do
+      it 'supports client-initiated ping pong' do
         th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
         e = get_responses(stub)
@@ -403,18 +444,39 @@
         th.join
       end
 
-      it 'supports a server-initiated ping pong', bidi: true do
+      it 'supports a server-initiated ping pong' do
         th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
         e = get_responses(stub)
         expect(e.collect { |r| r }).to eq(@sent_msgs)
         th.join
       end
+
+      it 'should raise an error if the status is not ok' do
+        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false)
+        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+        e = get_responses(stub)
+        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
+        th.join
+      end
+
+      # TODO: add test for metadata-related ArgumentError in a bidi call once
+      # issue mentioned in https://github.com/grpc/grpc/issues/10526 is fixed
+
+      it 'should send metadata to the server ok' do
+        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
+                                              **@metadata)
+        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+        e = get_responses(stub)
+        expect(e.collect { |r| r }).to eq(@sent_msgs)
+        th.join
+      end
     end
 
     describe 'without a call operation' do
       def get_responses(stub)
-        e = stub.bidi_streamer(@method, @sent_msgs, noop, noop)
+        e = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
+                               metadata: @metadata)
         expect(e).to be_a(Enumerator)
         e
       end
@@ -428,7 +490,8 @@
       end
       def get_responses(stub, run_start_call_first: false)
         @op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
-                                 return_op: true)
+                                 return_op: true,
+                                 metadata: @metadata)
         expect(@op).to be_a(GRPC::ActiveCall::Operation)
         @op.start_call if run_start_call_first
         e = @op.execute
@@ -472,9 +535,14 @@
     end
   end
 
-  def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts)
+  def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts,
+                                       **kw)
+    wanted_metadata = kw.clone
     wakey_thread do |notifier|
       c = expect_server_to_be_invoked(notifier)
+      wanted_metadata.each do |k, v|
+        expect(c.metadata[k.to_s]).to eq(v)
+      end
       expected_inputs.each do |i|
         if client_starts
           expect(c.remote_read).to eq(i)
diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb
index 100e9e8..be578c4 100644
--- a/src/ruby/spec/generic/rpc_desc_spec.rb
+++ b/src/ruby/spec/generic/rpc_desc_spec.rb
@@ -38,14 +38,14 @@
 
   shared_examples 'it handles errors' do
     it 'sends the specified status if BadStatus is raised' do
-      expect(@call).to receive(:remote_read).once.and_return(Object.new)
+      expect(@call).to receive(:read_unary_request).once.and_return(Object.new)
       expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false,
                                                        metadata: {})
       this_desc.run_server_method(@call, method(:bad_status))
     end
 
     it 'sends status UNKNOWN if other StandardErrors are raised' do
-      expect(@call).to receive(:remote_read).once.and_return(Object.new)
+      expect(@call).to receive(:read_unary_request).once.and_return(Object.new)
       expect(@call).to receive(:send_status).once.with(UNKNOWN,
                                                        arg_error_msg,
                                                        false, metadata: {})
@@ -53,7 +53,7 @@
     end
 
     it 'absorbs CallError with no further action' do
-      expect(@call).to receive(:remote_read).once.and_raise(CallError)
+      expect(@call).to receive(:read_unary_request).once.and_raise(CallError)
       blk = proc do
         this_desc.run_server_method(@call, method(:fake_reqresp))
       end
@@ -75,7 +75,7 @@
 
       it 'sends a response and closes the stream if there no errors' do
         req = Object.new
-        expect(@call).to receive(:remote_read).once.and_return(req)
+        expect(@call).to receive(:read_unary_request).once.and_return(req)
         expect(@call).to receive(:output_metadata).once.and_return(fake_md)
         expect(@call).to receive(:server_unary_response).once
           .with(@ok_response, trailing_metadata: fake_md)
@@ -133,7 +133,7 @@
 
       it 'sends a response and closes the stream if there no errors' do
         req = Object.new
-        expect(@call).to receive(:remote_read).once.and_return(req)
+        expect(@call).to receive(:read_unary_request).once.and_return(req)
         expect(@call).to receive(:remote_send).twice.with(@ok_response)
         expect(@call).to receive(:output_metadata).and_return(fake_md)
         expect(@call).to receive(:send_status).once.with(OK, 'OK', true,