add missing fields on server call context and improve robustness of finished calls
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 4a748a4..84c8887 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -44,9 +44,9 @@
     include Core::TimeConsts
     include Core::CallOps
     extend Forwardable
-    attr_reader :deadline, :metadata_sent, :metadata_to_send
+    attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert
     def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
-                   :peer, :peer_cert, :trailing_metadata, :status
+                   :trailing_metadata, :status
 
     # client_invoke begins a client invocation.
     #
@@ -105,8 +105,13 @@
       @input_stream_done = false
       @call_finished = false
       @call_finished_mu = Mutex.new
+
       @client_call_executed = false
       @client_call_executed_mu = Mutex.new
+
+      # set the peer now so that the accessor can still function
+      # after the server closes the call
+      @peer = call.peer
     end
 
     # Sends the initial metadata that has yet to be sent.
@@ -541,6 +546,10 @@
       end
     end
 
+    def attach_peer_cert(peer_cert)
+      @peer_cert = peer_cert
+    end
+
     private
 
     # To be called once the "input stream" has been completelly
@@ -612,6 +621,7 @@
     # server client_streamer handlers.
     MultiReqView = view_class(:cancelled?, :deadline, :each_queued_msg,
                               :each_remote_read, :metadata, :output_metadata,
+                              :peer, :peer_cert,
                               :send_initial_metadata,
                               :metadata_to_send,
                               :merge_metadata_to_send,
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index ef2cc0c..33b3cea 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -418,6 +418,7 @@
                          metadata_received: true,
                          started: false,
                          metadata_to_send: connect_md)
+      c.attach_peer_cert(an_rpc.call.peer_cert)
       mth = an_rpc.method.to_sym
       [c, mth]
     end
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 7b5e6a9..a8653e7 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -37,7 +37,9 @@
 include GRPC::Core::CallOps
 
 # check that methods on a finished/closed call t crash
-def check_op_view_of_finished_client_call_is_robust(op_view)
+def check_op_view_of_finished_client_call(op_view,
+                                          expected_metadata,
+                                          expected_trailing_metadata)
   # use read_response_stream to try to iterate through
   # possible response stream
   fail('need something to attempt reads') unless block_given?
@@ -48,21 +50,39 @@
 
   expect { op_view.start_call }.to raise_error(RuntimeError)
 
+  sanity_check_values_of_accessors(op_view,
+                                   expected_metadata,
+                                   expected_trailing_metadata)
+
   expect do
     op_view.wait
     op_view.cancel
-
-    op_view.metadata
-    op_view.trailing_metadata
-    op_view.status
-
-    op_view.cancelled?
-    op_view.deadline
-    op_view.write_flag
     op_view.write_flag = 1
   end.to_not raise_error
 end
 
+def sanity_check_values_of_accessors(op_view,
+                                     expected_metadata,
+                                     expected_trailing_metadata)
+  expected_status = Struct::Status.new
+  expected_status.code = 0
+  expected_status.details = 'OK'
+  expected_status.metadata = expected_trailing_metadata
+
+  expect(op_view.status).to eq(expected_status)
+  expect(op_view.metadata).to eq(expected_metadata)
+  expect(op_view.trailing_metadata).to eq(expected_trailing_metadata)
+
+  expect(op_view.cancelled?).to be(false)
+  expect(op_view.write_flag).to be(nil)
+
+  # The deadline attribute of a call can be either
+  # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive.
+  # TODO: fix so that the accessor always returns the same type.
+  expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) ||
+         op_view.deadline.is_a?(Time)).to be(true)
+end
+
 describe 'ClientStub' do
   let(:noop) { proc { |x| x } }
 
@@ -154,7 +174,7 @@
         server_port = create_test_server
         host = "localhost:#{server_port}"
         th = run_request_response(@sent_msg, @resp, @pass,
-                                  k1: 'v1', k2: 'v2')
+                                  expected_metadata: { k1: 'v1', k2: 'v2' })
         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
         expect(get_response(stub)).to eq(@resp)
         th.join
@@ -261,8 +281,14 @@
       def run_op_view_metadata_test(run_start_call_first)
         server_port = create_test_server
         host = "localhost:#{server_port}"
-        th = run_request_response(@sent_msg, @resp, @pass,
-                                  k1: 'v1', k2: 'v2')
+
+        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+        th = run_request_response(
+          @sent_msg, @resp, @pass,
+          expected_metadata: @metadata,
+          server_initial_md: @server_initial_md,
+          server_trailing_md: @server_trailing_md)
         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
         expect(
           get_response(stub,
@@ -272,12 +298,14 @@
 
       it 'sends metadata to the server ok when running start_call first' do
         run_op_view_metadata_test(true)
-        check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) { |r| p r }
       end
 
       it 'does not crash when used after the call has been finished' do
         run_op_view_metadata_test(false)
-        check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) { |r| p r }
       end
     end
   end
@@ -300,7 +328,8 @@
       end
 
       it 'should send metadata to the server ok' do
-        th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
+        th = run_client_streamer(@sent_msgs, @resp, @pass,
+                                 expected_metadata: @metadata)
         expect(get_response(@stub)).to eq(@resp)
         th.join
       end
@@ -347,7 +376,13 @@
       it_behaves_like 'client streaming'
 
       def run_op_view_metadata_test(run_start_call_first)
-        th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
+        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+        th = run_client_streamer(
+          @sent_msgs, @resp, @pass,
+          expected_metadata: @metadata,
+          server_initial_md: @server_initial_md,
+          server_trailing_md: @server_trailing_md)
         expect(
           get_response(@stub,
                        run_start_call_first: run_start_call_first)).to eq(@resp)
@@ -356,12 +391,14 @@
 
       it 'sends metadata to the server ok when running start_call first' do
         run_op_view_metadata_test(true)
-        check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) { |r| p r }
       end
 
       it 'does not crash when used after the call has been finished' do
         run_op_view_metadata_test(false)
-        check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) { |r| p r }
       end
     end
   end
@@ -396,7 +433,7 @@
         server_port = create_test_server
         host = "localhost:#{server_port}"
         th = run_server_streamer(@sent_msg, @replys, @fail,
-                                 k1: 'v1', k2: 'v2')
+                                 expected_metadata: { k1: 'v1', k2: 'v2' })
         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
         e = get_responses(stub)
         expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
@@ -459,24 +496,31 @@
       def run_op_view_metadata_test(run_start_call_first)
         server_port = create_test_server
         host = "localhost:#{server_port}"
-        th = run_server_streamer(@sent_msg, @replys, @fail,
-                                 k1: 'v1', k2: 'v2')
+        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+        th = run_server_streamer(
+          @sent_msg, @replys, @pass,
+          expected_metadata: @metadata,
+          server_initial_md: @server_initial_md,
+          server_trailing_md: @server_trailing_md)
         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
         e = get_responses(stub, run_start_call_first: run_start_call_first)
-        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
+        expect(e.collect { |r| r }).to eq(@replys)
         th.join
       end
 
       it 'should send metadata to the server ok when start_call is run first' do
         run_op_view_metadata_test(true)
-        check_op_view_of_finished_client_call_is_robust(@op) do |responses|
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) do |responses|
           responses.each { |r| p r }
         end
       end
 
       it 'does not crash when used after the call has been finished' do
         run_op_view_metadata_test(false)
-        check_op_view_of_finished_client_call_is_robust(@op) do |responses|
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) do |responses|
           responses.each { |r| p r }
         end
       end
@@ -530,7 +574,7 @@
 
       it 'should send metadata to the server ok' do
         th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
-                                              **@metadata)
+                                              expected_metadata: @metadata)
         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
         e = get_responses(stub)
         expect(e.collect { |r| r }).to eq(@sent_msgs)
@@ -567,40 +611,52 @@
       it_behaves_like 'bidi streaming'
 
       def run_op_view_metadata_test(run_start_call_first)
-        th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
-                                                   @pass)
+        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+        th = run_bidi_streamer_echo_ping_pong(
+          @sent_msgs, @pass, true,
+          expected_metadata: @metadata,
+          server_initial_md: @server_initial_md,
+          server_trailing_md: @server_trailing_md)
         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
         e = get_responses(stub, run_start_call_first: run_start_call_first)
-        expect(e.collect { |r| r }).to eq(@replys)
+        expect(e.collect { |r| r }).to eq(@sent_msgs)
         th.join
       end
 
       it 'can run start_call before executing the call' do
         run_op_view_metadata_test(true)
-        check_op_view_of_finished_client_call_is_robust(@op) do |responses|
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) do |responses|
           responses.each { |r| p r }
         end
       end
 
       it 'doesnt crash when op_view used after call has finished' do
         run_op_view_metadata_test(false)
-        check_op_view_of_finished_client_call_is_robust(@op) do |responses|
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) do |responses|
           responses.each { |r| p r }
         end
       end
     end
   end
 
-  def run_server_streamer(expected_input, replys, status, **kw)
-    wanted_metadata = kw.clone
+  def run_server_streamer(expected_input, replys, status,
+                          expected_metadata: {},
+                          server_initial_md: {},
+                          server_trailing_md: {})
+    wanted_metadata = expected_metadata.clone
     wakey_thread do |notifier|
-      c = expect_server_to_be_invoked(notifier)
+      c = expect_server_to_be_invoked(
+        notifier, metadata_to_send: server_initial_md)
       wanted_metadata.each do |k, v|
         expect(c.metadata[k.to_s]).to eq(v)
       end
       expect(c.remote_read).to eq(expected_input)
       replys.each { |r| c.remote_send(r) }
-      c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+                    metadata: server_trailing_md)
     end
   end
 
@@ -615,10 +671,13 @@
   end
 
   def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts,
-                                       **kw)
-    wanted_metadata = kw.clone
+                                       expected_metadata: {},
+                                       server_initial_md: {},
+                                       server_trailing_md: {})
+    wanted_metadata = expected_metadata.clone
     wakey_thread do |notifier|
-      c = expect_server_to_be_invoked(notifier)
+      c = expect_server_to_be_invoked(
+        notifier, metadata_to_send: server_initial_md)
       wanted_metadata.each do |k, v|
         expect(c.metadata[k.to_s]).to eq(v)
       end
@@ -631,33 +690,44 @@
           expect(c.remote_read).to eq(i)
         end
       end
-      c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+                    metadata: server_trailing_md)
     end
   end
 
-  def run_client_streamer(expected_inputs, resp, status, **kw)
-    wanted_metadata = kw.clone
+  def run_client_streamer(expected_inputs, resp, status,
+                          expected_metadata: {},
+                          server_initial_md: {},
+                          server_trailing_md: {})
+    wanted_metadata = expected_metadata.clone
     wakey_thread do |notifier|
-      c = expect_server_to_be_invoked(notifier)
+      c = expect_server_to_be_invoked(
+        notifier, metadata_to_send: server_initial_md)
       expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
       wanted_metadata.each do |k, v|
         expect(c.metadata[k.to_s]).to eq(v)
       end
       c.remote_send(resp)
-      c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+                    metadata: server_trailing_md)
     end
   end
 
-  def run_request_response(expected_input, resp, status, **kw)
-    wanted_metadata = kw.clone
+  def run_request_response(expected_input, resp, status,
+                           expected_metadata: {},
+                           server_initial_md: {},
+                           server_trailing_md: {})
+    wanted_metadata = expected_metadata.clone
     wakey_thread do |notifier|
-      c = expect_server_to_be_invoked(notifier)
+      c = expect_server_to_be_invoked(
+        notifier, metadata_to_send: server_initial_md)
       expect(c.remote_read).to eq(expected_input)
       wanted_metadata.each do |k, v|
         expect(c.metadata[k.to_s]).to eq(v)
       end
       c.remote_send(resp)
-      c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+                    metadata: server_trailing_md)
     end
   end
 
@@ -675,13 +745,13 @@
     @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
   end
 
-  def expect_server_to_be_invoked(notifier)
+  def expect_server_to_be_invoked(notifier, metadata_to_send: nil)
     @server.start
     notifier.notify(nil)
     recvd_rpc = @server.request_call
     recvd_call = recvd_rpc.call
     recvd_call.metadata = recvd_rpc.metadata
-    recvd_call.run_batch(SEND_INITIAL_METADATA => nil)
+    recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send)
     GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE,
                          metadata_received: true)
   end
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 9633a82..4258d59 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -111,6 +111,47 @@
 
 SlowStub = SlowService.rpc_stub_class
 
+# a test service that hangs onto call objects
+# and uses them after the server-side call has been
+# finished
+class CheckCallAfterFinishedService
+  include GRPC::GenericService
+  rpc :an_rpc, EchoMsg, EchoMsg
+  rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
+  rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
+  rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
+  attr_reader :server_side_call
+
+  def an_rpc(req, call)
+    fail 'shouldnt reuse service' unless @call.nil?
+    @server_side_call = call
+    req
+  end
+
+  def a_client_streaming_rpc(call)
+    fail 'shouldnt reuse service' unless @call.nil?
+    @server_side_call = call
+    # iterate through requests so call can complete
+    call.each_remote_read.each { |r| p r }
+    EchoMsg.new
+  end
+
+  def a_server_streaming_rpc(_, call)
+    fail 'shouldnt reuse service' unless @call.nil?
+    @server_side_call = call
+    [EchoMsg.new, EchoMsg.new]
+  end
+
+  def a_bidi_rpc(requests, call)
+    fail 'shouldnt reuse service' unless @call.nil?
+    @server_side_call = call
+    requests.each { |r| p r }
+    [EchoMsg.new, EchoMsg.new]
+  end
+end
+
+CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class
+
 describe GRPC::RpcServer do
   RpcServer = GRPC::RpcServer
   StatusCodes = GRPC::Core::StatusCodes
@@ -505,5 +546,109 @@
         t.join
       end
     end
+
+    context 'when call objects are used after calls have completed' do
+      before(:each) do
+        server_opts = {
+          poll_period: 1
+        }
+        @srv = RpcServer.new(**server_opts)
+        alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
+        @alt_host = "0.0.0.0:#{alt_port}"
+
+        @service = CheckCallAfterFinishedService.new
+        @srv.handle(@service)
+        @srv_thd  = Thread.new { @srv.run }
+        @srv.wait_till_running
+      end
+
+      # check that the server-side call is still in a usable state even
+      # after it has finished
+      def check_single_req_view_of_finished_call(call)
+        common_check_of_finished_server_call(call)
+
+        expect(call.peer).to be_a(String)
+        expect(call.peer_cert).to be(nil)
+      end
+
+      def check_multi_req_view_of_finished_call(call)
+        common_check_of_finished_server_call(call)
+
+        expect do
+          call.each_remote_read.each { |r| p r }
+        end.to raise_error(GRPC::Core::CallError)
+      end
+
+      def common_check_of_finished_server_call(call)
+        expect do
+          call.merge_metadata_to_send({})
+        end.to raise_error(RuntimeError)
+
+        expect do
+          call.send_initial_metadata
+        end.to_not raise_error
+
+        expect(call.cancelled?).to be(false)
+        expect(call.metadata).to be_a(Hash)
+        expect(call.metadata['user-agent']).to be_a(String)
+
+        expect(call.metadata_sent).to be(true)
+        expect(call.output_metadata).to eq({})
+        expect(call.metadata_to_send).to eq({})
+        expect(call.deadline.is_a?(Time)).to be(true)
+      end
+
+      it 'should not crash when call used after an unary call is finished' do
+        req = EchoMsg.new
+        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+                                                     :this_channel_is_insecure)
+        resp = stub.an_rpc(req)
+        expect(resp).to be_a(EchoMsg)
+        @srv.stop
+        @srv_thd.join
+
+        check_single_req_view_of_finished_call(@service.server_side_call)
+      end
+
+      it 'should not crash when call used after client streaming finished' do
+        requests = [EchoMsg.new, EchoMsg.new]
+        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+                                                     :this_channel_is_insecure)
+        resp = stub.a_client_streaming_rpc(requests)
+        expect(resp).to be_a(EchoMsg)
+        @srv.stop
+        @srv_thd.join
+
+        check_multi_req_view_of_finished_call(@service.server_side_call)
+      end
+
+      it 'should not crash when call used after server streaming finished' do
+        req = EchoMsg.new
+        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+                                                     :this_channel_is_insecure)
+        responses = stub.a_server_streaming_rpc(req)
+        responses.each do |r|
+          expect(r).to be_a(EchoMsg)
+        end
+        @srv.stop
+        @srv_thd.join
+
+        check_single_req_view_of_finished_call(@service.server_side_call)
+      end
+
+      it 'should not crash when call used after a bidi call is finished' do
+        requests = [EchoMsg.new, EchoMsg.new]
+        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+                                                     :this_channel_is_insecure)
+        responses = stub.a_bidi_rpc(requests)
+        responses.each do |r|
+          expect(r).to be_a(EchoMsg)
+        end
+        @srv.stop
+        @srv_thd.join
+
+        check_multi_req_view_of_finished_call(@service.server_side_call)
+      end
+    end
   end
 end