Merge pull request #12851 from AspirinSJL/cancel_fallback_timer

Cancel fallback timer in glb_shutdown()
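
Among the generator changes below, the Objective-C plugin now wraps the
proto imports in the generated .pbrpc.h: when
GPB_GRPC_FORWARD_DECLARE_MESSAGE_PROTO is defined (as the updated
RemoteTest.podspec now does), the header only forward-declares the
request/response message classes, and the .pbobjc.h imports are emitted in
the .pbrpc.m instead. A rough sketch of the emitted header block, with
hypothetical proto/class names:

    #if GPB_GRPC_FORWARD_DECLARE_MESSAGE_PROTO
      // Illustrative names; the real list is derived from the methods'
      // request/response message types.
      @class HLWHelloReply;
      @class HLWHelloRequest;
    #else
      #import "HelloWorld.pbobjc.h"
    #endif
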
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 7ad6812..ca5301a 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -84,6 +84,7 @@
     # build.
     'USE_HEADERMAP' => 'NO',
     'ALWAYS_SEARCH_USER_PATHS' => 'NO',
+    'GCC_PREPROCESSOR_DEFINITIONS' => '"$(inherited)" "COCOAPODS=1" "PB_NO_PACKED_STRUCTS=1"',
   }
 
   s.default_subspecs = 'Interface', 'Implementation'
diff --git a/src/compiler/objective_c_generator.cc b/src/compiler/objective_c_generator.cc
index c05d153..33b5fed 100644
--- a/src/compiler/objective_c_generator.cc
+++ b/src/compiler/objective_c_generator.cc
@@ -17,6 +17,7 @@
  */
 
 #include <map>
+#include <set>
 #include <sstream>
 
 #include "src/compiler/config.h"
@@ -29,7 +30,9 @@
 using ::grpc::protobuf::io::Printer;
 using ::grpc::protobuf::MethodDescriptor;
 using ::grpc::protobuf::ServiceDescriptor;
+using ::grpc::protobuf::FileDescriptor;
 using ::std::map;
+using ::std::set;
 
 namespace grpc_objective_c_generator {
 namespace {
@@ -190,6 +193,24 @@
 
 }  // namespace
 
+::grpc::string GetAllMessageClasses(const FileDescriptor *file) {
+  ::grpc::string output;
+  set< ::grpc::string> classes;
+  for (int i = 0; i < file->service_count(); i++) {
+    const auto service = file->service(i);
+    for (int j = 0; j < service->method_count(); j++) {
+      const auto method = service->method(j);
+      classes.insert(ClassName(method->input_type()));
+      classes.insert(ClassName(method->output_type()));
+    }
+  }
+  for (auto one_class : classes) {
+    output += "  @class " + one_class + ";\n";
+  }
+
+  return output;
+}
+
 ::grpc::string GetHeader(const ServiceDescriptor *service) {
   ::grpc::string output;
   {
diff --git a/src/compiler/objective_c_generator.h b/src/compiler/objective_c_generator.h
index edbee7f..e912a52 100644
--- a/src/compiler/objective_c_generator.h
+++ b/src/compiler/objective_c_generator.h
@@ -24,8 +24,12 @@
 namespace grpc_objective_c_generator {
 
 using ::grpc::protobuf::ServiceDescriptor;
+using ::grpc::protobuf::FileDescriptor;
 using ::grpc::string;
 
+// Returns forward declarations of the message classes used in the generated header file.
+string GetAllMessageClasses(const FileDescriptor *file);
+
 // Returns the content to be included in the "global_scope" insertion point of
 // the generated header file.
 string GetHeader(const ServiceDescriptor *service);
diff --git a/src/compiler/objective_c_plugin.cc b/src/compiler/objective_c_plugin.cc
index 96a3375..e751d05 100644
--- a/src/compiler/objective_c_plugin.cc
+++ b/src/compiler/objective_c_plugin.cc
@@ -58,9 +58,10 @@
                                "#import <RxLibrary/GRXWriteable.h>\n"
                                "#import <RxLibrary/GRXWriter.h>\n";
 
-      // TODO(jcanizales): Instead forward-declare the input and output types
-      // and import the files in the .pbrpc.m
       ::grpc::string proto_imports;
+      proto_imports += "#if GPB_GRPC_FORWARD_DECLARE_MESSAGE_PROTO\n" +
+                       grpc_objective_c_generator::GetAllMessageClasses(file) +
+                       "#else\n";
       for (int i = 0; i < file->dependency_count(); i++) {
         ::grpc::string header =
             grpc_objective_c_generator::MessageHeaderName(file->dependency(i));
@@ -70,19 +71,20 @@
           grpc_generator::StripPrefix(&base_name, "google/protobuf/");
           // create the import code snippet
           proto_imports +=
-              "#if GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS\n"
-              "  #import <" +
+              "  #if GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS\n"
+              "    #import <" +
               ::grpc::string(ProtobufLibraryFrameworkName) + "/" + base_name +
               ">\n"
-              "#else\n"
-              "  #import \"" +
+              "  #else\n"
+              "    #import \"" +
               header +
               "\"\n"
-              "#endif\n";
+              "  #endif\n";
         } else {
-          proto_imports += ::grpc::string("#import \"") + header + "\"\n";
+          proto_imports += ::grpc::string("  #import \"") + header + "\"\n";
         }
       }
+      proto_imports += "#endif\n";
 
       ::grpc::string declarations;
       for (int i = 0; i < file->service_count(); i++) {
@@ -106,6 +108,28 @@
                                ".pbrpc.h\"\n\n"
                                "#import <ProtoRPC/ProtoRPC.h>\n"
                                "#import <RxLibrary/GRXWriter+Immediate.h>\n";
+      for (int i = 0; i < file->dependency_count(); i++) {
+        ::grpc::string header =
+            grpc_objective_c_generator::MessageHeaderName(file->dependency(i));
+        const grpc::protobuf::FileDescriptor *dependency = file->dependency(i);
+        if (IsProtobufLibraryBundledProtoFile(dependency)) {
+          ::grpc::string base_name = header;
+          grpc_generator::StripPrefix(&base_name, "google/protobuf/");
+          // create the import code snippet
+          imports +=
+              "#if GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS\n"
+              "  #import <" +
+              ::grpc::string(ProtobufLibraryFrameworkName) + "/" + base_name +
+              ">\n"
+              "#else\n"
+              "  #import \"" +
+              header +
+              "\"\n"
+              "#endif\n";
+        } else {
+          imports += ::grpc::string("#import \"") + header + "\"\n";
+        }
+      }
 
       ::grpc::string definitions;
       for (int i = 0; i < file->service_count(); i++) {
diff --git a/src/core/lib/support/log_posix.cc b/src/core/lib/support/log_posix.cc
index 38a136e..29530c8 100644
--- a/src/core/lib/support/log_posix.cc
+++ b/src/core/lib/support/log_posix.cc
@@ -58,7 +58,7 @@
 }
 
 extern "C" void gpr_default_log(gpr_log_func_args *args) {
-  char *final_slash;
+  const char *final_slash;
   const char *display_file;
   char time_buffer[64];
   time_t timer;
diff --git a/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec b/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec
index 1796c6d..ad83481 100644
--- a/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec
+++ b/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec
@@ -47,7 +47,7 @@
 
   s.pod_target_xcconfig = {
     # This is needed by all pods that depend on Protobuf:
-    'GCC_PREPROCESSOR_DEFINITIONS' => '$(inherited) GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS=1',
+    'GCC_PREPROCESSOR_DEFINITIONS' => '$(inherited) GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS=1 GPB_GRPC_FORWARD_DECLARE_MESSAGE_PROTO=1',
     # This is needed by all pods that depend on gRPC-RxLibrary:
     'CLANG_ALLOW_NON_MODULAR_INCLUDES_IN_FRAMEWORK_MODULES' => 'YES',
   }
diff --git a/src/ruby/end2end/load_grpc_with_gc_stress_driver.rb b/src/ruby/end2end/load_grpc_with_gc_stress_driver.rb
new file mode 100755
index 0000000..3668b95
--- /dev/null
+++ b/src/ruby/end2end/load_grpc_with_gc_stress_driver.rb
@@ -0,0 +1,32 @@
+#!/usr/bin/env ruby
+#
+# Copyright 2016 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+this_dir = File.expand_path(File.dirname(__FILE__))
+protos_lib_dir = File.join(this_dir, 'lib')
+grpc_lib_dir = File.join(File.dirname(this_dir), 'lib')
+$LOAD_PATH.unshift(grpc_lib_dir) unless $LOAD_PATH.include?(grpc_lib_dir)
+$LOAD_PATH.unshift(protos_lib_dir) unless $LOAD_PATH.include?(protos_lib_dir)
+$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
+
+GC.stress = 0x04
+
+require 'grpc'
+
+GRPC::Core::Channel.new('dummy_host', nil, :this_channel_is_insecure)
+GRPC::Core::Server.new({})
+GRPC::Core::ChannelCredentials.new
+GRPC::Core::CallCredentials.new(proc { |noop| noop })
+GRPC::Core::CompressionOptions.new
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index 933a3ac..b53f09e 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -315,8 +315,8 @@
     return;
   }
 
-  bg_thread_init_rb_mu = rb_mutex_new();
   rb_global_variable(&bg_thread_init_rb_mu);
+  bg_thread_init_rb_mu = rb_mutex_new();
 
   grpc_rb_mGRPC = rb_define_module("GRPC");
   grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core");
diff --git a/templates/gRPC-Core.podspec.template b/templates/gRPC-Core.podspec.template
index 77191aa..f3ac30c 100644
--- a/templates/gRPC-Core.podspec.template
+++ b/templates/gRPC-Core.podspec.template
@@ -119,6 +119,7 @@
       # build.
       'USE_HEADERMAP' => 'NO',
       'ALWAYS_SEARCH_USER_PATHS' => 'NO',
+      'GCC_PREPROCESSOR_DEFINITIONS' => '"$(inherited)" "COCOAPODS=1" "PB_NO_PACKED_STRUCTS=1"',
     }
 
     s.default_subspecs = 'Interface', 'Implementation'
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 41090d1..bbefbac 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -223,11 +223,8 @@
   bool disable_blocking;
   bool inproc;
   bool health_check_service;
-  // Although the below grpc::string's are logically const, we can't declare
-  // them const because of a limitation in the way old compilers (e.g., gcc-4.4)
-  // manage vector insertion using a copy constructor
-  grpc::string credentials_type;
-  grpc::string message_content;
+  const grpc::string credentials_type;
+  const grpc::string message_content;
 };
 
 static std::ostream& operator<<(std::ostream& out,
@@ -537,33 +534,18 @@
   service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
                                  tag(2));
 
-  cli_stream->Write(send_request, tag(3));
+  auto verif = Verifier(GetParam().disable_blocking);
+  verif.Expect(2, true);
 
-  // 65536(64KB) is the default flow control window size. Should change this
-  // number when default flow control window size changes. For the write of
-  // send_request larger than the flow control window size, tag:3 will not come
-  // up until server read is initiated. For write of send_request smaller than
-  // the flow control window size, the request can take the free ride with
-  // initial metadata due to coalescing, thus write tag:3 will come up here.
-  if (GetParam().message_content.length() < 65536 || GetParam().inproc) {
-    Verifier(GetParam().disable_blocking)
-        .Expect(2, true)
-        .Expect(3, true)
-        .Verify(cq_.get());
-  } else {
-    Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+  cli_stream->Write(send_request, tag(3));
+  verif.Expect(3, true);
+
+  // Drain until tag 2 completes; tag 3 may or may not complete at this point
+  while (verif.Next(cq_.get(), false) != 2) {
   }
 
   srv_stream.Read(&recv_request, tag(4));
-
-  if (GetParam().message_content.length() < 65536 || GetParam().inproc) {
-    Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
-  } else {
-    Verifier(GetParam().disable_blocking)
-        .Expect(3, true)
-        .Expect(4, true)
-        .Verify(cq_.get());
-  }
+  verif.Expect(4, true).Verify(cq_.get());
 
   EXPECT_EQ(send_request.message(), recv_request.message());
 
@@ -832,33 +814,19 @@
   service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
                               tag(2));
 
-  cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
+  auto verif = Verifier(GetParam().disable_blocking);
+  verif.Expect(2, true);
 
-  // 65536(64KB) is the default flow control window size. Should change this
-  // number when default flow control window size changes. For the write of
-  // send_request larger than the flow control window size, tag:3 will not come
-  // up until server read is initiated. For write of send_request smaller than
-  // the flow control window size, the request can take the free ride with
-  // initial metadata due to coalescing, thus write tag:3 will come up here.
-  if (GetParam().message_content.length() < 65536 || GetParam().inproc) {
-    Verifier(GetParam().disable_blocking)
-        .Expect(2, true)
-        .Expect(3, true)
-        .Verify(cq_.get());
-  } else {
-    Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+  cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
+  verif.Expect(3, true);
+
+  // Drain until tag 2 completes; tag 3 may or may not complete at this point
+  while (verif.Next(cq_.get(), false) != 2) {
   }
 
   srv_stream.Read(&recv_request, tag(4));
+  verif.Expect(4, true).Verify(cq_.get());
 
-  if (GetParam().message_content.length() < 65536 || GetParam().inproc) {
-    Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
-  } else {
-    Verifier(GetParam().disable_blocking)
-        .Expect(3, true)
-        .Expect(4, true)
-        .Verify(cq_.get());
-  }
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   srv_stream.Read(&recv_request, tag(5));
@@ -900,33 +868,19 @@
   service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
                               tag(2));
 
-  cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
+  auto verif = Verifier(GetParam().disable_blocking);
+  verif.Expect(2, true);
 
-  // 65536(64KB) is the default flow control window size. Should change this
-  // number when default flow control window size changes. For the write of
-  // send_request larger than the flow control window size, tag:3 will not come
-  // up until server read is initiated. For write of send_request smaller than
-  // the flow control window size, the request can take the free ride with
-  // initial metadata due to coalescing, thus write tag:3 will come up here.
-  if (GetParam().message_content.length() < 65536 || GetParam().inproc) {
-    Verifier(GetParam().disable_blocking)
-        .Expect(2, true)
-        .Expect(3, true)
-        .Verify(cq_.get());
-  } else {
-    Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+  cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
+  verif.Expect(3, true);
+
+  // Drain until tag 2 completes; tag 3 may or may not complete at this point
+  while (verif.Next(cq_.get(), false) != 2) {
   }
 
   srv_stream.Read(&recv_request, tag(4));
+  verif.Expect(4, true).Verify(cq_.get());
 
-  if (GetParam().message_content.length() < 65536 || GetParam().inproc) {
-    Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
-  } else {
-    Verifier(GetParam().disable_blocking)
-        .Expect(3, true)
-        .Expect(4, true)
-        .Verify(cq_.get());
-  }
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   srv_stream.Read(&recv_request, tag(5));
@@ -1788,7 +1742,7 @@
   GPR_ASSERT(!credentials_types.empty());
 
   messages.push_back("Hello");
-  for (int sz = 1; sz < test_big_limit; sz *= 2) {
+  for (int sz = 1; sz <= test_big_limit; sz *= 32) {
     grpc::string big_msg;
     for (int i = 0; i < sz * 1024; i++) {
       char c = 'a' + (i % 26);
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 5dae5b0..810ee30 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -198,10 +198,7 @@
   void Log() const;
   bool use_proxy;
   bool inproc;
-  // Although the below grpc::string is logically const, we can't declare
-  // them const because of a limitation in the way old compilers (e.g., gcc-4.4)
-  // manage vector insertion using a copy constructor
-  grpc::string credentials_type;
+  const grpc::string credentials_type;
 };
 
 static std::ostream& operator<<(std::ostream& out,
diff --git a/test/cpp/interop/stress_test.cc b/test/cpp/interop/stress_test.cc
index 9cc5a81..c6d3600 100644
--- a/test/cpp/interop/stress_test.cc
+++ b/test/cpp/interop/stress_test.cc
@@ -257,6 +257,7 @@
   gpr_log(GPR_INFO, "Starting test(s)..");
 
   std::vector<std::thread> test_threads;
+  std::vector<std::unique_ptr<StressTestInteropClient>> clients;
 
   // Create and start the test threads.
   // Note that:
@@ -282,9 +283,9 @@
       // Create stub(s) for each channel
       for (int stub_idx = 0; stub_idx < FLAGS_num_stubs_per_channel;
            stub_idx++) {
-        StressTestInteropClient* client = new StressTestInteropClient(
+        clients.emplace_back(new StressTestInteropClient(
             ++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs,
-            FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures);
+            FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures));
 
         bool is_already_created = false;
         // QpsGauge name
@@ -293,7 +294,7 @@
                       server_idx, channel_idx, stub_idx);
 
         test_threads.emplace_back(std::thread(
-            &StressTestInteropClient::MainLoop, client,
+            &StressTestInteropClient::MainLoop, clients.back().get(),
             metrics_service.CreateQpsGauge(buffer, &is_already_created)));
 
         // The QpsGauge should not have been already created
diff --git a/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh
index 7914b0e..5cfab14 100755
--- a/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh
+++ b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh
@@ -27,4 +27,5 @@
 ruby src/ruby/end2end/forking_client_driver.rb || EXIT_CODE=1
 ruby src/ruby/end2end/grpc_class_init_driver.rb || EXIT_CODE=1
 ruby src/ruby/end2end/multiple_killed_watching_threads_driver.rb || EXIT_CODE=1
+ruby src/ruby/end2end/load_grpc_with_gc_stress_driver.rb || EXIT_CODE=1
 exit $EXIT_CODE
diff --git a/tools/run_tests/python_utils/jobset.py b/tools/run_tests/python_utils/jobset.py
index 82a3bc1..d523095 100755
--- a/tools/run_tests/python_utils/jobset.py
+++ b/tools/run_tests/python_utils/jobset.py
@@ -306,8 +306,8 @@
         else:
           self._state = _FAILURE
           if not self._suppress_failure_message:
-            message('FAILED', '%s [ret=%d, pid=%d]' % (
-                self._spec.shortname, self._process.returncode, self._process.pid),
+            message('FAILED', '%s [ret=%d, pid=%d, time=%.1fsec]' % (
+                self._spec.shortname, self._process.returncode, self._process.pid, elapsed),
                 stdout(), do_newline=True)
           self.result.state = 'FAILED'
           self.result.num_failures += 1
@@ -326,7 +326,7 @@
             self.result.cpu_estimated = float('%.01f' % self._spec.cpu_cost)
             measurement = '; cpu_cost=%.01f; estimated=%.01f' % (self.result.cpu_measured, self.result.cpu_estimated)
         if not self._quiet_success:
-          message('PASSED', '%s [time=%.1fsec; retries=%d:%d%s]' % (
+          message('PASSED', '%s [time=%.1fsec, retries=%d:%d%s]' % (
               self._spec.shortname, elapsed, self._retries, self._timeout_retries, measurement),
               stdout() if self._spec.verbose_success else None,
               do_newline=self._newline_on_success or self._travis)
@@ -334,6 +334,8 @@
     elif (self._state == _RUNNING and
           self._spec.timeout_seconds is not None and
           time.time() - self._start > self._spec.timeout_seconds):
+      elapsed = time.time() - self._start
+      self.result.elapsed_time = elapsed
       if self._timeout_retries < self._spec.timeout_retries:
         message('TIMEOUT_FLAKE', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True)
         self._timeout_retries += 1
@@ -344,7 +346,7 @@
         self._process.terminate()
         self.start()
       else:
-        message('TIMEOUT', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True)
+        message('TIMEOUT', '%s [pid=%d, time=%.1fsec]' % (self._spec.shortname, self._process.pid, elapsed), stdout(), do_newline=True)
         self.kill()
         self.result.state = 'TIMEOUT'
         self.result.num_failures += 1
diff --git a/tools/run_tests/python_utils/upload_test_results.py b/tools/run_tests/python_utils/upload_test_results.py
index 580e7f7..15e8277 100644
--- a/tools/run_tests/python_utils/upload_test_results.py
+++ b/tools/run_tests/python_utils/upload_test_results.py
@@ -102,6 +102,15 @@
       test_results['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S')
 
       row = big_query_utils.make_row(str(uuid.uuid4()), test_results)
-      if not big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID, bq_table, [row]):
-        print('Error uploading result to bigquery.')
-        sys.exit(1)
+
+      # TODO(jtattermusch): rows are inserted one by one, very inefficient
+      max_retries = 3
+      for attempt in range(max_retries):
+        if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID, bq_table, [row]):
+          break
+        else:
+          if attempt < max_retries - 1:
+            print('Error uploading result to bigquery, will retry.')
+          else:
+            print('Error uploading result to bigquery, all attempts failed.')
+            sys.exit(1)