Merge pull request #12105 from jiangtaoli2016/revert-12101-revert-11977-tsi_grpc

Roll forward "Add TSI zero-copy frame protector"
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index aabe7b4..8976686 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -1788,9 +1788,8 @@
     bool pending_data = s->pending_byte_stream ||
                         s->unprocessed_incoming_frames_buffer.length > 0;
     if (s->stream_compression_recv_enabled && s->read_closed &&
-        s->frame_storage.length > 0 &&
-        s->unprocessed_incoming_frames_buffer.length == 0 && !pending_data &&
-        !s->seen_error && s->recv_trailing_metadata_finished != NULL) {
+        s->frame_storage.length > 0 && !pending_data && !s->seen_error &&
+        s->recv_trailing_metadata_finished != NULL) {
       /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
        * maybe decompress the next 5 bytes in the stream. */
       bool end_of_context;
@@ -1817,7 +1816,6 @@
       }
     }
     if (s->read_closed && s->frame_storage.length == 0 &&
-        s->unprocessed_incoming_frames_buffer.length == 0 &&
         (!pending_data || s->seen_error) &&
         s->recv_trailing_metadata_finished != NULL) {
       grpc_chttp2_incoming_metadata_buffer_publish(
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
index b9da6e1..fc9c9f9 100644
--- a/src/core/lib/security/transport/security_handshaker.c
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -261,7 +261,7 @@
     grpc_exec_ctx *exec_ctx, security_handshaker *h,
     const unsigned char *bytes_received, size_t bytes_received_size) {
   // Invoke TSI handshaker.
-  unsigned char *bytes_to_send = NULL;
+  const unsigned char *bytes_to_send = NULL;
   size_t bytes_to_send_size = 0;
   tsi_handshaker_result *handshaker_result = NULL;
   tsi_result result = tsi_handshaker_next(
diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c
index de16b35..967126e 100644
--- a/src/core/tsi/fake_transport_security.c
+++ b/src/core/tsi/fake_transport_security.c
@@ -532,7 +532,7 @@
 
 static tsi_result fake_handshaker_next(
     tsi_handshaker *self, const unsigned char *received_bytes,
-    size_t received_bytes_size, unsigned char **bytes_to_send,
+    size_t received_bytes_size, const unsigned char **bytes_to_send,
     size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result,
     tsi_handshaker_on_next_done_cb cb, void *user_data) {
   /* Sanity check the arguments. */
diff --git a/src/core/tsi/transport_security.c b/src/core/tsi/transport_security.c
index 3637f3c..7621307 100644
--- a/src/core/tsi/transport_security.c
+++ b/src/core/tsi/transport_security.c
@@ -186,7 +186,7 @@
 
 tsi_result tsi_handshaker_next(
     tsi_handshaker *self, const unsigned char *received_bytes,
-    size_t received_bytes_size, unsigned char **bytes_to_send,
+    size_t received_bytes_size, const unsigned char **bytes_to_send,
     size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result,
     tsi_handshaker_on_next_done_cb cb, void *user_data) {
   if (self == NULL || self->vtable == NULL) return TSI_INVALID_ARGUMENT;
diff --git a/src/core/tsi/transport_security.h b/src/core/tsi/transport_security.h
index dde48a6..b0d7039 100644
--- a/src/core/tsi/transport_security.h
+++ b/src/core/tsi/transport_security.h
@@ -70,7 +70,8 @@
                                        tsi_frame_protector **protector);
   void (*destroy)(tsi_handshaker *self);
   tsi_result (*next)(tsi_handshaker *self, const unsigned char *received_bytes,
-                     size_t received_bytes_size, unsigned char **bytes_to_send,
+                     size_t received_bytes_size,
+                     const unsigned char **bytes_to_send,
                      size_t *bytes_to_send_size,
                      tsi_handshaker_result **handshaker_result,
                      tsi_handshaker_on_next_done_cb cb, void *user_data);
diff --git a/src/core/tsi/transport_security_adapter.c b/src/core/tsi/transport_security_adapter.c
index 3b388af..1c2a57b 100644
--- a/src/core/tsi/transport_security_adapter.c
+++ b/src/core/tsi/transport_security_adapter.c
@@ -143,7 +143,7 @@
 
 static tsi_result adapter_next(
     tsi_handshaker *self, const unsigned char *received_bytes,
-    size_t received_bytes_size, unsigned char **bytes_to_send,
+    size_t received_bytes_size, const unsigned char **bytes_to_send,
     size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result,
     tsi_handshaker_on_next_done_cb cb, void *user_data) {
   /* Input sanity check.  */
diff --git a/src/core/tsi/transport_security_interface.h b/src/core/tsi/transport_security_interface.h
index 414c786..80c426b 100644
--- a/src/core/tsi/transport_security_interface.h
+++ b/src/core/tsi/transport_security_interface.h
@@ -438,7 +438,7 @@
    tsi_handshaker object.  */
 tsi_result tsi_handshaker_next(
     tsi_handshaker *self, const unsigned char *received_bytes,
-    size_t received_bytes_size, unsigned char **bytes_to_send,
+    size_t received_bytes_size, const unsigned char **bytes_to_send,
     size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result,
     tsi_handshaker_on_next_done_cb cb, void *user_data);
 
diff --git a/src/objective-c/README.md b/src/objective-c/README.md
index 3624475..e76ee17 100644
--- a/src/objective-c/README.md
+++ b/src/objective-c/README.md
@@ -112,7 +112,7 @@
 ```ruby
   s.prepare_command = <<-CMD
     ...
-        #{src}/*.proto #{src}/**/*.proto
+        `find . -name *.proto -print | xargs`
   CMD
   ...
     ms.source_files = "#{dir}/*.pbobjc.{h,m}", "#{dir}/**/*.pbobjc.{h,m}"
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index b999548..74f189e 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -179,6 +179,38 @@
   return Qnil;
 }
 
+/* TODO: expose this as part of the surface API if needed.
+ * This is meant for internal usage by the "write thread" of grpc-ruby
+ * client-side bidi calls. It provides a way for the background write-thread
+ * to propogate failures to the main read-thread and give the user an error
+ * message. */
+static VALUE grpc_rb_call_cancel_with_status(VALUE self, VALUE status_code,
+                                             VALUE details) {
+  grpc_rb_call *call = NULL;
+  grpc_call_error err;
+  if (RTYPEDDATA_DATA(self) == NULL) {
+    // This call has been closed
+    return Qnil;
+  }
+
+  if (TYPE(details) != T_STRING || TYPE(status_code) != T_FIXNUM) {
+    rb_raise(rb_eTypeError,
+             "Bad parameter type error for cancel with status. Want Fixnum, "
+             "String.");
+    return Qnil;
+  }
+
+  TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
+  err = grpc_call_cancel_with_status(call->wrapped, NUM2LONG(status_code),
+                                     StringValueCStr(details), NULL);
+  if (err != GRPC_CALL_OK) {
+    rb_raise(grpc_rb_eCallError, "cancel with status failed: %s (code=%d)",
+             grpc_call_error_detail_of(err), err);
+  }
+
+  return Qnil;
+}
+
 /* Releases the c-level resources associated with a call
    Once a call has been closed, no further requests can be
    processed.
@@ -949,6 +981,8 @@
   /* Add ruby analogues of the Call methods. */
   rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 1);
   rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0);
+  rb_define_method(grpc_rb_cCall, "cancel_with_status",
+                   grpc_rb_call_cancel_with_status, 2);
   rb_define_method(grpc_rb_cCall, "close", grpc_rb_call_close, 0);
   rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0);
   rb_define_method(grpc_rb_cCall, "peer_cert", grpc_rb_call_get_peer_cert, 0);
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 9e125cd..c2239d0 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -153,7 +153,12 @@
     rescue StandardError => e
       GRPC.logger.warn('bidi-write-loop: failed')
       GRPC.logger.warn(e)
-      raise e
+      if is_client
+        @call.cancel_with_status(GRPC::Core::StatusCodes::UNKNOWN,
+                                 "GRPC bidi call error: #{e.inspect}")
+      else
+        raise e
+      end
     ensure
       set_output_stream_done.call if is_client
     end
@@ -180,8 +185,8 @@
               batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
               @call.status = batch_result.status
               @call.trailing_metadata = @call.status.metadata if @call.status
-              batch_result.check_status
               GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
+              batch_result.check_status
             end
 
             GRPC.logger.debug('bidi-read-loop: done reading!')
diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb
index 473ff4a..1cc0500 100644
--- a/src/ruby/spec/call_spec.rb
+++ b/src/ruby/spec/call_spec.rb
@@ -137,6 +137,39 @@
     end
   end
 
+  describe '#cancel' do
+    it 'completes ok' do
+      call = make_test_call
+      expect { call.cancel }.not_to raise_error
+    end
+
+    it 'completes ok when the call is closed' do
+      call = make_test_call
+      call.close
+      expect { call.cancel }.not_to raise_error
+    end
+  end
+
+  describe '#cancel_with_status' do
+    it 'completes ok' do
+      call = make_test_call
+      expect do
+        call.cancel_with_status(0, 'test status')
+      end.not_to raise_error
+      expect do
+        call.cancel_with_status(0, nil)
+      end.to raise_error(TypeError)
+    end
+
+    it 'completes ok when the call is closed' do
+      call = make_test_call
+      call.close
+      expect do
+        call.cancel_with_status(0, 'test status')
+      end.not_to raise_error
+    end
+  end
+
   def make_test_call
     @ch.create_call(nil, nil, 'dummy_method', nil, deadline)
   end
diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb
index b48b417..1a9b47e 100644
--- a/src/ruby/spec/client_server_spec.rb
+++ b/src/ruby/spec/client_server_spec.rb
@@ -226,6 +226,62 @@
     svr_batch = server_call.run_batch(server_ops)
     expect(svr_batch.send_close).to be true
   end
+
+  def client_cancel_test(cancel_proc, expected_code,
+                         expected_details)
+    call = new_client_call
+    server_call = nil
+
+    server_thread = Thread.new do
+      server_call = server_allows_client_to_proceed
+    end
+
+    client_ops = {
+      CallOps::SEND_INITIAL_METADATA => {},
+      CallOps::RECV_INITIAL_METADATA => nil
+    }
+    batch_result = call.run_batch(client_ops)
+    expect(batch_result.send_metadata).to be true
+    expect(batch_result.metadata).to eq({})
+
+    cancel_proc.call(call)
+
+    server_thread.join
+    server_ops = {
+      CallOps::RECV_CLOSE_ON_SERVER => nil
+    }
+    svr_batch = server_call.run_batch(server_ops)
+    expect(svr_batch.send_close).to be true
+
+    client_ops = {
+      CallOps::RECV_STATUS_ON_CLIENT => {}
+    }
+    batch_result = call.run_batch(client_ops)
+
+    expect(batch_result.status.code).to be expected_code
+    expect(batch_result.status.details).to eq expected_details
+  end
+
+  it 'clients can cancel a call on the server' do
+    expected_code = StatusCodes::CANCELLED
+    expected_details = 'Cancelled'
+    cancel_proc = proc { |call| call.cancel }
+    client_cancel_test(cancel_proc, expected_code, expected_details)
+  end
+
+  it 'cancel_with_status unknown status' do
+    code = StatusCodes::UNKNOWN
+    details = 'test unknown reason'
+    cancel_proc = proc { |call| call.cancel_with_status(code, details) }
+    client_cancel_test(cancel_proc, code, details)
+  end
+
+  it 'cancel_with_status unknown status' do
+    code = StatusCodes::FAILED_PRECONDITION
+    details = 'test failed precondition reason'
+    cancel_proc = proc { |call| call.cancel_with_status(code, details) }
+    client_cancel_test(cancel_proc, code, details)
+  end
 end
 
 shared_examples 'GRPC metadata delivery works OK' do
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index e1e7a53..9539e56 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -472,7 +472,7 @@
         host = "localhost:#{server_port}"
         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
         expect do
-          get_responses(stub)
+          get_responses(stub).collect { |r| r }
         end.to raise_error(ArgumentError,
                            /Header values must be of type string or array/)
       end
@@ -641,11 +641,101 @@
         expect(e.collect { |r| r }).to eq(@sent_msgs)
         th.join
       end
+
+      # Prompted by grpc/github #10526
+      describe 'surfacing of errors when sending requests' do
+        def run_server_bidi_send_one_then_read_indefinitely
+          @server.start
+          recvd_rpc = @server.request_call
+          recvd_call = recvd_rpc.call
+          server_call = GRPC::ActiveCall.new(
+            recvd_call, noop, noop, INFINITE_FUTURE,
+            metadata_received: true, started: false)
+          server_call.send_initial_metadata
+          server_call.remote_send('server response')
+          loop do
+            m = server_call.remote_read
+            break if m.nil?
+          end
+          # can't fail since initial metadata already sent
+          server_call.send_status(@pass, 'OK', true)
+        end
+
+        def verify_error_from_write_thread(stub, requests_to_push,
+                                           request_queue, expected_description)
+          # TODO: an improvement might be to raise the original exception from
+          # bidi call write loops instead of only cancelling the call
+          failing_marshal_proc = proc do |req|
+            fail req if req.is_a?(StandardError)
+            req
+          end
+          begin
+            e = get_responses(stub, marshal_proc: failing_marshal_proc)
+            first_response = e.next
+            expect(first_response).to eq('server response')
+            requests_to_push.each { |req| request_queue.push(req) }
+            e.collect { |r| r }
+          rescue GRPC::Unknown => e
+            exception = e
+          end
+          expect(exception.message.include?(expected_description)).to be(true)
+        end
+
+        # Provides an Enumerable view of a Queue
+        class BidiErrorTestingEnumerateForeverQueue
+          def initialize(queue)
+            @queue = queue
+          end
+
+          def each
+            loop do
+              msg = @queue.pop
+              yield msg
+            end
+          end
+        end
+
+        def run_error_in_client_request_stream_test(requests_to_push,
+                                                    expected_error_message)
+          # start a server that waits on a read indefinitely - it should
+          # see a cancellation and be able to break out
+          th = Thread.new { run_server_bidi_send_one_then_read_indefinitely }
+          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+
+          request_queue = Queue.new
+          @sent_msgs = BidiErrorTestingEnumerateForeverQueue.new(request_queue)
+
+          verify_error_from_write_thread(stub,
+                                         requests_to_push,
+                                         request_queue,
+                                         expected_error_message)
+          # the write loop errror should cancel the call and end the
+          # server's request stream
+          th.join
+        end
+
+        it 'non-GRPC errors from the write loop surface when raised ' \
+          'at the start of a request stream' do
+          expected_error_message = 'expect error on first request'
+          requests_to_push = [StandardError.new(expected_error_message)]
+          run_error_in_client_request_stream_test(requests_to_push,
+                                                  expected_error_message)
+        end
+
+        it 'non-GRPC errors from the write loop surface when raised ' \
+          'during the middle of a request stream' do
+          expected_error_message = 'expect error on last request'
+          requests_to_push = %w( one two )
+          requests_to_push << StandardError.new(expected_error_message)
+          run_error_in_client_request_stream_test(requests_to_push,
+                                                  expected_error_message)
+        end
+      end
     end
 
     describe 'without a call operation' do
-      def get_responses(stub, deadline: nil)
-        e = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
+      def get_responses(stub, deadline: nil, marshal_proc: noop)
+        e = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop,
                                metadata: @metadata, deadline: deadline)
         expect(e).to be_a(Enumerator)
         e
@@ -658,8 +748,9 @@
       after(:each) do
         @op.wait # make sure wait doesn't hang
       end
-      def get_responses(stub, run_start_call_first: false, deadline: nil)
-        @op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
+      def get_responses(stub, run_start_call_first: false, deadline: nil,
+                        marshal_proc: noop)
+        @op = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop,
                                  return_op: true,
                                  metadata: @metadata, deadline: deadline)
         expect(@op).to be_a(GRPC::ActiveCall::Operation)
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index e4fe594..b887eaa 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -178,6 +178,18 @@
 
 CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class
 
+# A service with a bidi streaming method.
+class BidiService
+  include GRPC::GenericService
+  rpc :server_sends_bad_input, stream(EchoMsg), stream(EchoMsg)
+
+  def server_sends_bad_input(_, _)
+    'bad response. (not an enumerable, client sees an error)'
+  end
+end
+
+BidiStub = BidiService.rpc_stub_class
+
 describe GRPC::RpcServer do
   RpcServer = GRPC::RpcServer
   StatusCodes = GRPC::Core::StatusCodes
@@ -520,6 +532,29 @@
         t.join
         expect(one_failed_as_unavailable).to be(true)
       end
+
+      it 'should send a status UNKNOWN with a relevant message when the' \
+        'servers response stream is not an enumerable' do
+        @srv.handle(BidiService)
+        t = Thread.new { @srv.run }
+        @srv.wait_till_running
+        stub = BidiStub.new(@host, :this_channel_is_insecure, **client_opts)
+        responses = stub.server_sends_bad_input([])
+        exception = nil
+        begin
+          responses.each { |r| r }
+        rescue GRPC::Unknown => e
+          exception = e
+        end
+        # Erroneous responses sent from the server handler should cause an
+        # exception on the client with relevant info.
+        expected_details = 'NoMethodError: undefined method `each\' for '\
+          '"bad response. (not an enumerable, client sees an error)"'
+
+        expect(exception.inspect.include?(expected_details)).to be true
+        @srv.stop
+        t.join
+      end
     end
 
     context 'with connect metadata' do
diff --git a/tools/run_tests/run_interop_tests.py b/tools/run_tests/run_interop_tests.py
index dbbf2ad..1537641 100755
--- a/tools/run_tests/run_interop_tests.py
+++ b/tools/run_tests/run_interop_tests.py
@@ -731,7 +731,7 @@
     if manual_cmd_log is not None:
       if manual_cmd_log == []:
         manual_cmd_log.append('echo "Testing ${docker_image:=%s}"' % docker_image)
-      manual_cmd_log.append(manual_cmdline(cmdline, docker_iamge))
+      manual_cmd_log.append(manual_cmdline(cmdline, docker_image))
     cwd = None
 
   test_job = jobset.JobSpec(
@@ -793,7 +793,7 @@
   if manual_cmd_log is not None:
       if manual_cmd_log == []:
         manual_cmd_log.append('echo "Testing ${docker_image:=%s}"' % docker_image)
-      manual_cmd_log.append(manual_cmdline(docker_cmdline, docker_iamge))
+      manual_cmd_log.append(manual_cmdline(docker_cmdline, docker_image))
   server_job = jobset.JobSpec(
           cmdline=docker_cmdline,
           environ=environ,