Merge pull request #11693 from ncteisen/refactor-thread-pool

Make CreateThreadPool Settable
diff --git a/BUILD b/BUILD
index 083639c..5c2e2f9 100644
--- a/BUILD
+++ b/BUILD
@@ -1543,6 +1543,7 @@
         ":grpc++",
         "//src/proto/grpc/reflection/v1alpha:reflection_proto",
     ],
+    alwayslink = 1,
 )
 
 grpc_cc_library(
diff --git a/bazel/grpc_build_system.bzl b/bazel/grpc_build_system.bzl
index 8057f27..f793cae 100644
--- a/bazel/grpc_build_system.bzl
+++ b/bazel/grpc_build_system.bzl
@@ -25,7 +25,8 @@
 
 def grpc_cc_library(name, srcs = [], public_hdrs = [], hdrs = [],
                     external_deps = [], deps = [], standalone = False,
-                    language = "C++", testonly = False, visibility = None):
+                    language = "C++", testonly = False, visibility = None,
+                    alwayslink = 0):
   copts = []
   if language.upper() == "C":
     copts = ["-std=c99"]
@@ -40,7 +41,8 @@
     linkopts = ["-pthread"],
     includes = [
         "include"
-    ]
+    ],
+    alwayslink = alwayslink,
   )
 
 def grpc_proto_plugin(name, srcs = [], deps = []):
diff --git a/src/compiler/php_generator.cc b/src/compiler/php_generator.cc
index 6d34761..38ec46e 100644
--- a/src/compiler/php_generator.cc
+++ b/src/compiler/php_generator.cc
@@ -97,13 +97,14 @@
 }
 
 // Prints out the service descriptor object
-void PrintService(const ServiceDescriptor *service, Printer *out) {
+void PrintService(const ServiceDescriptor *service,
+                  const grpc::string &parameter, Printer *out) {
   map<grpc::string, grpc::string> vars;
   out->Print("/**\n");
   out->Print(GetPHPComments(service, " *").c_str());
   out->Print(" */\n");
-  vars["name"] = service->name();
-  out->Print(vars, "class $name$Client extends \\Grpc\\BaseStub {\n\n");
+  vars["name"] = GetPHPServiceClassname(service, parameter);
+  out->Print(vars, "class $name$ extends \\Grpc\\BaseStub {\n\n");
   out->Indent();
   out->Indent();
   out->Print(
@@ -131,7 +132,8 @@
 }
 
 grpc::string GenerateFile(const FileDescriptor *file,
-                          const ServiceDescriptor *service) {
+                          const ServiceDescriptor *service,
+                          const grpc::string &parameter) {
   grpc::string output;
   {
     StringOutputStream output_stream(&output);
@@ -150,7 +152,7 @@
     vars["package"] = MessageIdentifierName(file->package());
     out.Print(vars, "namespace $package$;\n\n");
 
-    PrintService(service, &out);
+    PrintService(service, parameter, &out);
   }
   return output;
 }
diff --git a/src/compiler/php_generator.h b/src/compiler/php_generator.h
index 4518bc2..9a04bd3 100644
--- a/src/compiler/php_generator.h
+++ b/src/compiler/php_generator.h
@@ -24,7 +24,8 @@
 namespace grpc_php_generator {
 
 grpc::string GenerateFile(const grpc::protobuf::FileDescriptor *file,
-                          const grpc::protobuf::ServiceDescriptor *service);
+                          const grpc::protobuf::ServiceDescriptor *service,
+                          const grpc::string &parameter);
 
 }  // namespace grpc_php_generator
 
diff --git a/src/compiler/php_generator_helpers.h b/src/compiler/php_generator_helpers.h
index 3a5c08b..5edebf6 100644
--- a/src/compiler/php_generator_helpers.h
+++ b/src/compiler/php_generator_helpers.h
@@ -26,9 +26,22 @@
 
 namespace grpc_php_generator {
 
+inline grpc::string GetPHPServiceClassname(
+    const grpc::protobuf::ServiceDescriptor *service,
+    const grpc::string &parameter) {
+  grpc::string suffix;
+  if (parameter == "") {
+    suffix = "Client";
+  } else {
+    suffix = parameter;
+  }
+  return service->name() + suffix;
+}
+
 inline grpc::string GetPHPServiceFilename(
     const grpc::protobuf::FileDescriptor *file,
-    const grpc::protobuf::ServiceDescriptor *service) {
+    const grpc::protobuf::ServiceDescriptor *service,
+    const grpc::string &parameter) {
   std::vector<grpc::string> tokens =
       grpc_generator::tokenize(file->package(), ".");
   std::ostringstream oss;
@@ -36,7 +49,7 @@
     oss << (i == 0 ? "" : "/")
         << grpc_generator::CapitalizeFirstLetter(tokens[i]);
   }
-  return oss.str() + "/" + service->name() + "Client.php";
+  return oss.str() + "/" + GetPHPServiceClassname(service, parameter) + ".php";
 }
 
 // ReplaceAll replaces all instances of search with replace in s.
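
To illustrate the naming scheme introduced above: with no plugin parameter the generated class keeps the historical "Client" suffix, while a non-empty parameter is used verbatim as the suffix instead. A minimal standalone C++ sketch of the same rule (plain std::string in place of the grpc:: aliases, and a bare service name instead of a ServiceDescriptor; the function name here is illustrative):

    #include <iostream>
    #include <string>

    // Mirrors the suffix selection in GetPHPServiceClassname from the diff,
    // but takes the service name directly so the snippet is self-contained.
    std::string PHPServiceClassname(const std::string &service_name,
                                    const std::string &parameter) {
      const std::string suffix = parameter.empty() ? "Client" : parameter;
      return service_name + suffix;
    }

    int main() {
      std::cout << PHPServiceClassname("Greeter", "") << "\n";      // GreeterClient
      std::cout << PHPServiceClassname("Greeter", "Stub") << "\n";  // GreeterStub
      return 0;
    }

The same suffix is reused by GetPHPServiceFilename, so the generated file name always matches the class it contains.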
diff --git a/src/compiler/php_plugin.cc b/src/compiler/php_plugin.cc
index 7a581fd..bbe9165 100644
--- a/src/compiler/php_plugin.cc
+++ b/src/compiler/php_plugin.cc
@@ -41,10 +41,11 @@
     }
 
     for (int i = 0; i < file->service_count(); i++) {
-      grpc::string code = GenerateFile(file, file->service(i));
+      grpc::string code = GenerateFile(file, file->service(i), parameter);
 
       // Get output file name
-      grpc::string file_name = GetPHPServiceFilename(file, file->service(i));
+      grpc::string file_name =
+          GetPHPServiceFilename(file, file->service(i), parameter);
 
       std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> output(
           context->Open(file_name));
diff --git a/src/core/ext/filters/client_channel/http_proxy.c b/src/core/ext/filters/client_channel/http_proxy.c
index cfb5ec6..aa3f61c 100644
--- a/src/core/ext/filters/client_channel/http_proxy.c
+++ b/src/core/ext/filters/client_channel/http_proxy.c
@@ -22,6 +22,7 @@
 #include <string.h>
 
 #include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
 #include <grpc/support/log.h>
 #include <grpc/support/string_util.h>
 
@@ -30,6 +31,7 @@
 #include "src/core/ext/filters/client_channel/uri_parser.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/support/env.h"
+#include "src/core/lib/support/string.h"
 
 static char* grpc_get_http_proxy_server(grpc_exec_ctx* exec_ctx) {
   char* uri_str = gpr_getenv("http_proxy");
@@ -80,6 +82,50 @@
     grpc_uri_destroy(uri);
     return false;
   }
+  char* no_proxy_str = gpr_getenv("no_proxy");
+  if (no_proxy_str != NULL) {
+    static const char* NO_PROXY_SEPARATOR = ",";
+    bool use_proxy = true;
+    char* server_host;
+    char* server_port;
+    if (!gpr_split_host_port(uri->path[0] == '/' ? uri->path + 1 : uri->path,
+                             &server_host, &server_port)) {
+      gpr_log(GPR_INFO,
+              "unable to split host and port, not checking no_proxy list for "
+              "host '%s'",
+              server_uri);
+    } else {
+      size_t uri_len = strlen(server_host);
+      char** no_proxy_hosts;
+      size_t num_no_proxy_hosts;
+      gpr_string_split(no_proxy_str, NO_PROXY_SEPARATOR, &no_proxy_hosts,
+                       &num_no_proxy_hosts);
+      for (size_t i = 0; i < num_no_proxy_hosts; i++) {
+        char* no_proxy_entry = no_proxy_hosts[i];
+        size_t no_proxy_len = strlen(no_proxy_entry);
+        if (no_proxy_len <= uri_len &&
+            gpr_stricmp(no_proxy_entry, &server_host[uri_len - no_proxy_len]) ==
+                0) {
+          gpr_log(GPR_INFO, "not using proxy for host in no_proxy list '%s'",
+                  server_uri);
+          use_proxy = false;
+          break;
+        }
+      }
+      for (size_t i = 0; i < num_no_proxy_hosts; i++) {
+        gpr_free(no_proxy_hosts[i]);
+      }
+      gpr_free(no_proxy_hosts);
+      gpr_free(server_host);
+      gpr_free(server_port);
+      if (!use_proxy) {
+        grpc_uri_destroy(uri);
+        gpr_free(*name_to_resolve);
+        *name_to_resolve = NULL;
+        return false;
+      }
+    }
+  }
   grpc_arg new_arg = grpc_channel_arg_string_create(
       GRPC_ARG_HTTP_CONNECT_SERVER,
       uri->path[0] == '/' ? uri->path + 1 : uri->path);
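
The no_proxy handling added above performs a case-insensitive suffix match, so an entry such as example.com disables the proxy for foo.example.com as well as for example.com itself. A minimal standalone sketch of that comparison using the C++ standard library instead of the gpr string helpers (the helper name is illustrative):

    #include <algorithm>
    #include <cctype>
    #include <string>

    // True if `host` ends with `entry`, ignoring ASCII case; this mirrors the
    // gpr_stricmp-on-the-tail check in the hunk above.
    bool HostMatchesNoProxyEntry(const std::string &host,
                                 const std::string &entry) {
      if (entry.size() > host.size()) return false;
      return std::equal(entry.rbegin(), entry.rend(), host.rbegin(),
                        [](char a, char b) {
                          return std::tolower(static_cast<unsigned char>(a)) ==
                                 std::tolower(static_cast<unsigned char>(b));
                        });
    }

In the code above, the comma-separated no_proxy value is split with gpr_string_split and each entry is tested this way; on a match the function releases the name it had set, stores NULL into *name_to_resolve, and returns false so that no HTTP CONNECT proxy is used.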
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index cccc3e8..fdb18f6 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -1705,7 +1705,6 @@
 static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
                               const grpc_lb_policy_args *args) {
   glb_lb_policy *glb_policy = (glb_lb_policy *)policy;
-
   if (glb_policy->updating_lb_channel) {
     if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
       gpr_log(GPR_INFO,
@@ -1813,9 +1812,11 @@
         // lb_on_server_status_received will pick up the cancel and reinit
         // lb_call.
         if (glb_policy->pending_update_args != NULL) {
-          const grpc_lb_policy_args *args = glb_policy->pending_update_args;
+          grpc_lb_policy_args *args = glb_policy->pending_update_args;
           glb_policy->pending_update_args = NULL;
           glb_update_locked(exec_ctx, &glb_policy->base, args);
+          grpc_channel_args_destroy(exec_ctx, args->args);
+          gpr_free(args);
         }
       } else if (glb_policy->started_picking && !glb_policy->shutting_down) {
         if (glb_policy->retry_timer_active) {
diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c
index 4df64d8..1449802 100644
--- a/src/core/ext/transport/inproc/inproc_transport.c
+++ b/src/core/ext/transport/inproc/inproc_transport.c
@@ -190,8 +190,11 @@
 static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx,
                                           grpc_byte_stream *bs, size_t max,
                                           grpc_closure *on_complete) {
-  inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
-  return (stream->le->sb.count != 0);
+  // Because the inproc transport always provides the entire message
+  // atomically, the byte stream always has data available when this function
+  // is called. Thus, this function always returns true (unlike other
+  // transports), and there is never any need to schedule a closure.
+  return true;
 }
 
 static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c
index 99dc2f1..3f4145d 100644
--- a/src/core/lib/iomgr/sockaddr_utils.c
+++ b/src/core/lib/iomgr/sockaddr_utils.c
@@ -220,6 +220,11 @@
   return NULL;
 }
 
+int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr) {
+  const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr;
+  return addr->sa_family;
+}
+
 int grpc_sockaddr_get_port(const grpc_resolved_address *resolved_addr) {
   const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr;
   switch (addr->sa_family) {
diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h
index 7692b96..a589a19 100644
--- a/src/core/lib/iomgr/sockaddr_utils.h
+++ b/src/core/lib/iomgr/sockaddr_utils.h
@@ -75,4 +75,6 @@
 /* Returns the URI scheme corresponding to \a addr */
 const char *grpc_sockaddr_get_uri_scheme(const grpc_resolved_address *addr);
 
+int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr);
+
 #endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index 2ab836c..079c913 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -316,6 +316,7 @@
   unsigned port_index = 0;
   int status;
   grpc_error *error = GRPC_ERROR_NONE;
+  int family;
 
   if (s->tail != NULL) {
     port_index = s->tail->port_index + 1;
@@ -353,7 +354,18 @@
   }
 
   handle = gpr_malloc(sizeof(uv_tcp_t));
-  status = uv_tcp_init(uv_default_loop(), handle);
+
+  family = grpc_sockaddr_get_family(addr);
+  status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family);
+#if defined(GPR_LINUX) && defined(SO_REUSEPORT)
+  if (family == AF_INET || family == AF_INET6) {
+    int fd;
+    uv_fileno((uv_handle_t *)handle, &fd);
+    int enable = 1;
+    setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
+  }
+#endif /* GPR_LINUX && SO_REUSEPORT */
+
   if (status == 0) {
     error = add_socket_to_server(s, handle, addr, port_index, &sp);
   } else {
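
For reference, uv_tcp_init_ex (unlike uv_tcp_init) creates the underlying socket for the given address family immediately, which is what makes it possible to set SO_REUSEPORT before the later bind. A sketch of the same sequence with the error checks spelled out, written as a standalone helper rather than the in-place code above (Linux-only behind the same kind of guard; the helper name is illustrative):

    #include <sys/socket.h>
    #include <uv.h>

    // Initialize a TCP handle for `family` and opt into SO_REUSEPORT on Linux.
    // Returns a libuv status code (0 on success).
    static int init_tcp_handle_with_reuseport(uv_loop_t *loop, uv_tcp_t *handle,
                                              int family) {
      int status = uv_tcp_init_ex(loop, handle, (unsigned int)family);
      if (status != 0) return status;
    #if defined(__linux__) && defined(SO_REUSEPORT)
      if (family == AF_INET || family == AF_INET6) {
        uv_os_fd_t fd;
        if (uv_fileno((uv_handle_t *)handle, &fd) == 0) {
          int enable = 1;
          setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
        }
      }
    #endif
      return status;
    }

grpc_sockaddr_get_family, added in sockaddr_utils.c above, supplies the family argument from the resolved listen address.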
diff --git a/src/core/lib/support/env.h b/src/core/lib/support/env.h
index 18bc08a..e2c012a 100644
--- a/src/core/lib/support/env.h
+++ b/src/core/lib/support/env.h
@@ -36,6 +36,12 @@
 /* Sets the environment with the specified name to the specified value. */
 void gpr_setenv(const char *name, const char *value);
 
+/* This is a version of gpr_getenv that does not produce any output if it has to
+   use an insecure version of the function. It is ONLY to be used to solve the
+   problem in which we need to check an env variable to configure the verbosity
+   level of logging. So DO NOT USE THIS. */
+const char *gpr_getenv_silent(const char *name, char **dst);
+
 #ifdef __cplusplus
 }
 #endif
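
The intended call pattern for gpr_getenv_silent is the one adopted by log.c further down in this change: read the variable without logging, finish configuring verbosity, and only then emit the deferred warning about an insecure getenv fallback. A usage sketch of that pattern (the wrapper function name here is just for illustration):

    #include <grpc/support/alloc.h>
    #include <grpc/support/log.h>

    #include "src/core/lib/support/env.h"

    static void read_config_before_logging_is_ready(void) {
      char *value = NULL;
      const char *insecure_func = gpr_getenv_silent("GRPC_VERBOSITY", &value);
      /* ... use `value` to configure log verbosity here ... */
      gpr_free(value);
      /* Only after logging is configured is it safe to report the fallback. */
      if (insecure_func != NULL) {
        gpr_log(GPR_DEBUG,
                "Warning: insecure environment read function '%s' used",
                insecure_func);
      }
    }

On the POSIX and Windows implementations the function always returns NULL, so the warning path is only exercised by the Linux backwards-compatibility and plain-getenv branches.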
diff --git a/src/core/lib/support/env_linux.c b/src/core/lib/support/env_linux.c
index 0c79a2c..4c45a97 100644
--- a/src/core/lib/support/env_linux.c
+++ b/src/core/lib/support/env_linux.c
@@ -38,7 +38,9 @@
 
 #include "src/core/lib/support/string.h"
 
-char *gpr_getenv(const char *name) {
+const char *gpr_getenv_silent(const char *name, char **dst) {
+  const char *insecure_func_used = NULL;
+  char *result = NULL;
 #if defined(GPR_BACKWARDS_COMPATIBILITY_MODE)
   typedef char *(*getenv_type)(const char *);
   static getenv_type getenv_func = NULL;
@@ -48,22 +50,28 @@
   for (size_t i = 0; getenv_func == NULL && i < GPR_ARRAY_SIZE(names); i++) {
     getenv_func = (getenv_type)dlsym(RTLD_DEFAULT, names[i]);
     if (getenv_func != NULL && strstr(names[i], "secure") == NULL) {
-      gpr_log(GPR_DEBUG,
-              "Warning: insecure environment read function '%s' used",
-              names[i]);
+      insecure_func_used = names[i];
     }
   }
-  char *result = getenv_func(name);
-  return result == NULL ? result : gpr_strdup(result);
+  result = getenv_func(name);
 #elif __GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 17)
-  char *result = secure_getenv(name);
-  return result == NULL ? result : gpr_strdup(result);
+  result = secure_getenv(name);
 #else
-  gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used",
-          "getenv");
-  char *result = getenv(name);
-  return result == NULL ? result : gpr_strdup(result);
+  result = getenv(name);
+  insecure_func_used = "getenv";
 #endif
+  *dst = result == NULL ? result : gpr_strdup(result);
+  return insecure_func_used;
+}
+
+char *gpr_getenv(const char *name) {
+  char *result = NULL;
+  const char *insecure_func_used = gpr_getenv_silent(name, &result);
+  if (insecure_func_used != NULL) {
+    gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used",
+            insecure_func_used);
+  }
+  return result;
 }
 
 void gpr_setenv(const char *name, const char *value) {
diff --git a/src/core/lib/support/env_posix.c b/src/core/lib/support/env_posix.c
index bdbc4da..b88822c 100644
--- a/src/core/lib/support/env_posix.c
+++ b/src/core/lib/support/env_posix.c
@@ -29,6 +29,11 @@
 #include <grpc/support/string_util.h>
 #include "src/core/lib/support/string.h"
 
+const char *gpr_getenv_silent(const char *name, char **dst) {
+  *dst = gpr_getenv(name);
+  return NULL;
+}
+
 char *gpr_getenv(const char *name) {
   char *result = getenv(name);
   return result == NULL ? result : gpr_strdup(result);
diff --git a/src/core/lib/support/env_windows.c b/src/core/lib/support/env_windows.c
index c1d557e..652eeb6 100644
--- a/src/core/lib/support/env_windows.c
+++ b/src/core/lib/support/env_windows.c
@@ -30,6 +30,11 @@
 #include <grpc/support/log.h>
 #include <grpc/support/string_util.h>
 
+const char *gpr_getenv_silent(const char *name, char **dst) {
+  *dst = gpr_getenv(name);
+  return NULL;
+}
+
 char *gpr_getenv(const char *name) {
   char *result = NULL;
   DWORD size;
diff --git a/src/core/lib/support/log.c b/src/core/lib/support/log.c
index bcc336b..fadb4d9 100644
--- a/src/core/lib/support/log.c
+++ b/src/core/lib/support/log.c
@@ -64,7 +64,8 @@
 }
 
 void gpr_log_verbosity_init() {
-  char *verbosity = gpr_getenv("GRPC_VERBOSITY");
+  char *verbosity = NULL;
+  const char *insecure_getenv = gpr_getenv_silent("GRPC_VERBOSITY", &verbosity);
 
   gpr_atm min_severity_to_print = GPR_LOG_SEVERITY_ERROR;
   if (verbosity != NULL) {
@@ -81,6 +82,11 @@
       GPR_LOG_VERBOSITY_UNSET) {
     gpr_atm_no_barrier_store(&g_min_severity_to_print, min_severity_to_print);
   }
+
+  if (insecure_getenv != NULL) {
+    gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used",
+            insecure_getenv);
+  }
 }
 
 void gpr_set_log_function(gpr_log_func f) {
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index c90f96c..200e477 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -250,14 +250,6 @@
       has_sync_methods && num_frequently_polled_cqs > 0;
 
   if (has_sync_methods) {
-    // This is a Sync server
-    gpr_log(GPR_INFO,
-            "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
-            "%d, CQ timeout (msec): %d",
-            sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
-            sync_server_settings_.max_pollers,
-            sync_server_settings_.cq_timeout_msec);
-
     grpc_cq_polling_type polling_type =
         is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING;
 
@@ -272,6 +264,16 @@
       sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
       sync_server_settings_.cq_timeout_msec));
 
+  if (has_sync_methods) {
+    // This is a Sync server
+    gpr_log(GPR_INFO,
+            "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
+            "%d, CQ timeout (msec): %d",
+            sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
+            sync_server_settings_.max_pollers,
+            sync_server_settings_.cq_timeout_msec);
+  }
+
   ServerInitializer* initializer = server->initializer();
 
   // Register all the completion queues with the server. i.e
diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto
index 4801664..2ff2e4e 100644
--- a/src/proto/grpc/testing/control.proto
+++ b/src/proto/grpc/testing/control.proto
@@ -64,6 +64,7 @@
 message SecurityParams {
   bool use_test_ca = 1;
   string server_host_override = 2;
+  string cred_type = 3;
 }
 
 message ChannelArg {
@@ -240,6 +241,10 @@
   // Number of polls called inside completion queue per request
   double client_polls_per_request = 15;
   double server_polls_per_request = 16;
+
+  // Queries per CPU-sec over all servers or clients
+  double server_queries_per_cpu_sec = 17;
+  double client_queries_per_cpu_sec = 18;
 }
 
 // Results of a single benchmark scenario.
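
The two new fields report throughput normalized by CPU time rather than wall time. As a hypothetical illustration (not the driver's actual code), a metric of this shape can be derived from aggregate counters like so:

    #include <iostream>

    // Illustrative only: aggregate usage numbers for one side of a benchmark.
    struct AggregateUsage {
      double queries;          // total queries completed across all processes
      double system_cpu_secs;  // total system CPU time consumed, in seconds
      double user_cpu_secs;    // total user CPU time consumed, in seconds
    };

    double QueriesPerCpuSec(const AggregateUsage &u) {
      const double cpu_secs = u.system_cpu_secs + u.user_cpu_secs;
      return cpu_secs > 0 ? u.queries / cpu_secs : 0.0;
    }

    int main() {
      AggregateUsage servers{1.2e6, 30.0, 90.0};
      std::cout << QueriesPerCpuSec(servers) << " q/cpu-sec\n";  // prints 10000
      return 0;
    }
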
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 67c984a..87b29c2 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -40,13 +40,13 @@
 module GRPC
   # The ActiveCall class provides simple methods for sending marshallable
   # data to a call
-  class ActiveCall
+  class ActiveCall # rubocop:disable Metrics/ClassLength
     include Core::TimeConsts
     include Core::CallOps
     extend Forwardable
-    attr_reader :deadline, :metadata_sent, :metadata_to_send
+    attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert
     def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
-                   :peer, :peer_cert, :trailing_metadata
+                   :trailing_metadata, :status
 
     # client_invoke begins a client invocation.
     #
@@ -100,6 +100,18 @@
       fail(ArgumentError, 'Already sent md') if started && metadata_to_send
       @metadata_to_send = metadata_to_send || {} unless started
       @send_initial_md_mutex = Mutex.new
+
+      @output_stream_done = false
+      @input_stream_done = false
+      @call_finished = false
+      @call_finished_mu = Mutex.new
+
+      @client_call_executed = false
+      @client_call_executed_mu = Mutex.new
+
+      # set the peer now so that the accessor can still function
+      # after the server closes the call
+      @peer = call.peer
     end
 
     # Sends the initial metadata that has yet to be sent.
@@ -142,11 +154,9 @@
       Operation.new(self)
     end
 
-    # finished waits until a client call is completed.
-    #
-    # It blocks until the remote endpoint acknowledges by sending a status.
-    def finished
+    def receive_and_check_status
       batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
+      set_input_stream_done
       attach_status_results_and_complete_call(batch_result)
     end
 
@@ -155,8 +165,6 @@
         @call.trailing_metadata = recv_status_batch_result.status.metadata
       end
       @call.status = recv_status_batch_result.status
-      @call.close
-      op_is_done
 
       # The RECV_STATUS in run_batch always succeeds
       # Check the status for a bad status or failed run batch
@@ -193,9 +201,19 @@
       }
       ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
       @call.run_batch(ops)
+      set_output_stream_done
+
       nil
     end
 
+    # Intended for use on server-side calls when a single request from
+    # the client is expected (i.e., unary and server-streaming RPC types).
+    def read_unary_request
+      req = remote_read
+      set_input_stream_done
+      req
+    end
+
     def server_unary_response(req, trailing_metadata: {},
                               code: Core::StatusCodes::OK, details: 'OK')
       ops = {}
@@ -211,6 +229,7 @@
       ops[RECV_CLOSE_ON_SERVER] = nil
 
       @call.run_batch(ops)
+      set_output_stream_done
     end
 
     # remote_read reads a response from the remote endpoint.
@@ -241,6 +260,8 @@
 
     # each_remote_read passes each response to the given block or returns an
     # enumerator of the responses if no block is given.
+    # Used to generate the request enumerable for
+    # server-side client-streaming RPCs.
     #
     # == Enumerator ==
     #
@@ -258,10 +279,14 @@
     # @return [Enumerator] if no block was given
     def each_remote_read
       return enum_for(:each_remote_read) unless block_given?
-      loop do
-        resp = remote_read
-        break if resp.nil?  # the last response was received
-        yield resp
+      begin
+        loop do
+          resp = remote_read
+          break if resp.nil?  # the last response was received
+          yield resp
+        end
+      ensure
+        set_input_stream_done
       end
     end
 
@@ -287,13 +312,17 @@
     # @return [Enumerator] if no block was given
     def each_remote_read_then_finish
       return enum_for(:each_remote_read_then_finish) unless block_given?
-      loop do
-        resp = remote_read
-        if resp.nil?  # the last response was received, but not finished yet
-          finished
-          break
+      begin
+        loop do
+          resp = remote_read
+          if resp.nil?  # the last response was received
+            receive_and_check_status
+            break
+          end
+          yield resp
         end
-        yield resp
+      ensure
+        set_input_stream_done
       end
     end
 
@@ -305,6 +334,7 @@
     # a list, multiple metadata for its key are sent
     # @return [Object] the response received from the server
     def request_response(req, metadata: {})
+      raise_error_if_already_executed
       ops = {
         SEND_MESSAGE => @marshal.call(req),
         SEND_CLOSE_FROM_CLIENT => nil,
@@ -319,7 +349,15 @@
         end
         @metadata_sent = true
       end
-      batch_result = @call.run_batch(ops)
+
+      begin
+        batch_result = @call.run_batch(ops)
+        # no need to check for cancellation after a CallError because this
+        # batch contains a RECV_STATUS op
+      ensure
+        set_input_stream_done
+        set_output_stream_done
+      end
 
       @call.metadata = batch_result.metadata
       attach_status_results_and_complete_call(batch_result)
@@ -339,10 +377,20 @@
     # a list, multiple metadata for its key are sent
     # @return [Object] the response received from the server
     def client_streamer(requests, metadata: {})
-      # Metadata might have already been sent if this is an operation view
-      merge_metadata_and_send_if_not_already_sent(metadata)
+      raise_error_if_already_executed
+      begin
+        merge_metadata_and_send_if_not_already_sent(metadata)
+        requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
+      rescue GRPC::Core::CallError => e
+        receive_and_check_status # check for Cancelled
+        raise e
+      rescue => e
+        set_input_stream_done
+        raise e
+      ensure
+        set_output_stream_done
+      end
 
-      requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
       batch_result = @call.run_batch(
         SEND_CLOSE_FROM_CLIENT => nil,
         RECV_INITIAL_METADATA => nil,
@@ -350,12 +398,11 @@
         RECV_STATUS_ON_CLIENT => nil
       )
 
+      set_input_stream_done
+
       @call.metadata = batch_result.metadata
       attach_status_results_and_complete_call(batch_result)
       get_message_from_batch_result(batch_result)
-    rescue GRPC::Core::CallError => e
-      finished  # checks for Cancelled
-      raise e
     end
 
     # server_streamer sends one request to the GRPC server, which yields a
@@ -373,6 +420,7 @@
     # a list, multiple metadata for its key are sent
     # @return [Enumerator|nil] a response Enumerator
     def server_streamer(req, metadata: {})
+      raise_error_if_already_executed
       ops = {
         SEND_MESSAGE => @marshal.call(req),
         SEND_CLOSE_FROM_CLIENT => nil
@@ -384,13 +432,22 @@
         end
         @metadata_sent = true
       end
-      @call.run_batch(ops)
+
+      begin
+        @call.run_batch(ops)
+      rescue GRPC::Core::CallError => e
+        receive_and_check_status # checks for Cancelled
+        raise e
+      rescue => e
+        set_input_stream_done
+        raise e
+      ensure
+        set_output_stream_done
+      end
+
       replies = enum_for(:each_remote_read_then_finish)
       return replies unless block_given?
       replies.each { |r| yield r }
-    rescue GRPC::Core::CallError => e
-      finished  # checks for Cancelled
-      raise e
     end
 
     # bidi_streamer sends a stream of requests to the GRPC server, and yields
@@ -421,6 +478,7 @@
     # a list, multiple metadata for its key are sent
     # @return [Enumerator, nil] a response Enumerator
     def bidi_streamer(requests, metadata: {}, &blk)
+      raise_error_if_already_executed
       # Metadata might have already been sent if this is an operation view
       merge_metadata_and_send_if_not_already_sent(metadata)
       bd = BidiCall.new(@call,
@@ -428,7 +486,10 @@
                         @unmarshal,
                         metadata_received: @metadata_received)
 
-      bd.run_on_client(requests, @op_notifier, &blk)
+      bd.run_on_client(requests,
+                       proc { set_input_stream_done },
+                       proc { set_output_stream_done },
+                       &blk)
     end
 
     # run_server_bidi orchestrates a BiDi stream processing on a server.
@@ -449,7 +510,7 @@
                         metadata_received: @metadata_received,
                         req_view: MultiReqView.new(self))
 
-      bd.run_on_server(gen_each_reply)
+      bd.run_on_server(gen_each_reply, proc { set_input_stream_done })
     end
 
     # Waits till an operation completes
@@ -459,7 +520,8 @@
       @op_notifier.wait
     end
 
-    # Signals that an operation is done
+    # Signals that an operation is done.
+    # Only relevant on the client side (this is a no-op on the server side).
     def op_is_done
       return if @op_notifier.nil?
       @op_notifier.notify(self)
@@ -484,8 +546,40 @@
       end
     end
 
+    def attach_peer_cert(peer_cert)
+      @peer_cert = peer_cert
+    end
+
     private
 
+    # To be called once the "input stream" has been completelly
+    # read through (i.e, done reading from client or received status)
+    # note this is idempotent
+    def set_input_stream_done
+      @call_finished_mu.synchronize do
+        @input_stream_done = true
+        maybe_finish_and_close_call_locked
+      end
+    end
+
+    # To be called once the "output stream" has been completelly
+    # sent through (i.e, done sending from client or sent status)
+    # note this is idempotent
+    def set_output_stream_done
+      @call_finished_mu.synchronize do
+        @output_stream_done = true
+        maybe_finish_and_close_call_locked
+      end
+    end
+
+    def maybe_finish_and_close_call_locked
+      return unless @output_stream_done && @input_stream_done
+      return if @call_finished
+      @call_finished = true
+      op_is_done
+      @call.close
+    end
+
     # Starts the call if not already started
     # @param metadata [Hash] metadata to be sent to the server. If a value is
     # a list, multiple metadata for its key are sent
@@ -493,6 +587,15 @@
       merge_metadata_to_send(metadata) && send_initial_metadata
     end
 
+    def raise_error_if_already_executed
+      @client_call_executed_mu.synchronize do
+        if @client_call_executed
+          fail GRPC::Core::CallError, 'attempting to re-run a call'
+        end
+        @client_call_executed = true
+      end
+    end
+
     def self.view_class(*visible_methods)
       Class.new do
         extend ::Forwardable
@@ -518,6 +621,7 @@
     # server client_streamer handlers.
     MultiReqView = view_class(:cancelled?, :deadline,
                               :each_remote_read, :metadata, :output_metadata,
+                              :peer, :peer_cert,
                               :send_initial_metadata,
                               :metadata_to_send,
                               :merge_metadata_to_send,
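
The Ruby changes above replace the old finished/notify_done bookkeeping with two idempotent markers: the call is completed and closed exactly once, only after both the input stream and the output stream have been marked done, in whichever order that happens. The same pattern sketched in C++ for clarity (the class and method names are illustrative, not part of the Ruby API):

    #include <functional>
    #include <mutex>
    #include <utility>

    // Runs the finalizer exactly once, after both set_input_stream_done() and
    // set_output_stream_done() have been called (in either order).
    class CallCompletionTracker {
     public:
      explicit CallCompletionTracker(std::function<void()> on_finished)
          : on_finished_(std::move(on_finished)) {}

      void set_input_stream_done() { mark(&input_done_); }
      void set_output_stream_done() { mark(&output_done_); }

     private:
      void mark(bool *flag) {
        std::lock_guard<std::mutex> lock(mu_);
        *flag = true;
        if (input_done_ && output_done_ && !finished_) {
          finished_ = true;
          on_finished_();  // e.g. notify waiters and close the underlying call
        }
      }

      std::mutex mu_;
      bool input_done_ = false;
      bool output_done_ = false;
      bool finished_ = false;
      std::function<void()> on_finished_;
    };

Client-side entry points additionally guard against reuse via raise_error_if_already_executed, and the done callbacks are threaded into BidiCall's read and write loops in place of the removed notify_done/finished helpers.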
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index e54cf78..9e125cd 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -62,12 +62,19 @@
     # block that can be invoked with each response.
     #
     # @param requests the Enumerable of requests to send
-    # @param op_notifier a Notifier used to signal completion
+    # @param set_input_stream_done [Proc] called back when we're done
+    #   reading the input stream
+    # @param set_output_stream_done [Proc] called back when we're done
+    #   sending data on the output stream
     # @return an Enumerator of requests to yield
-    def run_on_client(requests, op_notifier, &blk)
-      @op_notifier = op_notifier
-      @enq_th = Thread.new { write_loop(requests) }
-      read_loop(&blk)
+    def run_on_client(requests,
+                      set_input_stream_done,
+                      set_output_stream_done,
+                      &blk)
+      @enq_th = Thread.new do
+        write_loop(requests, set_output_stream_done: set_output_stream_done)
+      end
+      read_loop(set_input_stream_done, &blk)
     end
 
     # Begins orchestration of the Bidi stream for a server generating replies.
@@ -81,12 +88,17 @@
     # produced by gen_each_reply could ignore the received_msgs
     #
     # @param gen_each_reply [Proc] generates the BiDi stream replies.
-    def run_on_server(gen_each_reply)
+    # @param set_input_stream_done [Proc] called back when the reads
+    #   have been completely read through.
+    def run_on_server(gen_each_reply, set_input_stream_done)
       # Pass in the optional call object parameter if possible
       if gen_each_reply.arity == 1
-        replys = gen_each_reply.call(read_loop(is_client: false))
+        replys = gen_each_reply.call(
+          read_loop(set_input_stream_done, is_client: false))
       elsif gen_each_reply.arity == 2
-        replys = gen_each_reply.call(read_loop(is_client: false), @req_view)
+        replys = gen_each_reply.call(
+          read_loop(set_input_stream_done, is_client: false),
+          @req_view)
       else
         fail 'Illegal arity of reply generator'
       end
@@ -99,22 +111,6 @@
     END_OF_READS = :end_of_reads
     END_OF_WRITES = :end_of_writes
 
-    # signals that bidi operation is complete
-    def notify_done
-      return unless @op_notifier
-      GRPC.logger.debug("bidi-notify-done: notifying  #{@op_notifier}")
-      @op_notifier.notify(self)
-    end
-
-    # signals that a bidi operation is complete (read + write)
-    def finished
-      @done_mutex.synchronize do
-        return unless @reads_complete && @writes_complete && !@complete
-        @call.close
-        @complete = true
-      end
-    end
-
     # performs a read using @call.run_batch, ensures metadata is set up
     def read_using_run_batch
       ops = { RECV_MESSAGE => nil }
@@ -127,7 +123,8 @@
       batch_result
     end
 
-    def write_loop(requests, is_client: true)
+    # set_output_stream_done is only relevant on the client side
+    def write_loop(requests, is_client: true, set_output_stream_done: nil)
       GRPC.logger.debug('bidi-write-loop: starting')
       count = 0
       requests.each do |req|
@@ -151,23 +148,20 @@
         GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
         @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
         GRPC.logger.debug('bidi-write-loop: done')
-        notify_done
-        @writes_complete = true
-        finished
       end
       GRPC.logger.debug('bidi-write-loop: finished')
     rescue StandardError => e
       GRPC.logger.warn('bidi-write-loop: failed')
       GRPC.logger.warn(e)
-      notify_done
-      @writes_complete = true
-      finished
       raise e
+    ensure
+      set_output_stream_done.call if is_client
     end
 
     # Provides an enumerator that yields results of remote reads
-    def read_loop(is_client: true)
+    def read_loop(set_input_stream_done, is_client: true)
       return enum_for(:read_loop,
+                      set_input_stream_done,
                       is_client: is_client) unless block_given?
       GRPC.logger.debug('bidi-read-loop: starting')
       begin
@@ -201,10 +195,10 @@
         GRPC.logger.warn('bidi: read-loop failed')
         GRPC.logger.warn(e)
         raise e
+      ensure
+        set_input_stream_done.call
       end
       GRPC.logger.debug('bidi-read-loop: finished')
-      @reads_complete = true
-      finished
       # Make sure that the write loop is done before finishing the call.
       # Note that blocking is ok at this point because we've already received
       # a status
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index ce00975..89cf8ff 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -48,7 +48,7 @@
     end
 
     def handle_request_response(active_call, mth)
-      req = active_call.remote_read
+      req = active_call.read_unary_request
       resp = mth.call(req, active_call.single_req_view)
       active_call.server_unary_response(
         resp, trailing_metadata: active_call.output_metadata)
@@ -61,7 +61,7 @@
     end
 
     def handle_server_streamer(active_call, mth)
-      req = active_call.remote_read
+      req = active_call.read_unary_request
       replys = mth.call(req, active_call.single_req_view)
       replys.each { |r| active_call.remote_send(r) }
       send_status(active_call, OK, 'OK', active_call.output_metadata)
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index ef2cc0c..33b3cea 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -418,6 +418,7 @@
                          metadata_received: true,
                          started: false,
                          metadata_to_send: connect_md)
+      c.attach_peer_cert(an_rpc.call.peer_cert)
       mth = an_rpc.method.to_sym
       [c, mth]
     end
diff --git a/src/ruby/spec/client_auth_spec.rb b/src/ruby/spec/client_auth_spec.rb
new file mode 100644
index 0000000..79c9192
--- /dev/null
+++ b/src/ruby/spec/client_auth_spec.rb
@@ -0,0 +1,137 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'grpc'
+
+def create_channel_creds
+  test_root = File.join(File.dirname(__FILE__), 'testdata')
+  files = ['ca.pem', 'client.key', 'client.pem']
+  creds = files.map { |f| File.open(File.join(test_root, f)).read }
+  GRPC::Core::ChannelCredentials.new(creds[0], creds[1], creds[2])
+end
+
+def client_cert
+  test_root = File.join(File.dirname(__FILE__), 'testdata')
+  cert = File.open(File.join(test_root, 'client.pem')).read
+  fail unless cert.is_a?(String)
+  cert
+end
+
+def create_server_creds
+  test_root = File.join(File.dirname(__FILE__), 'testdata')
+  p "test root: #{test_root}"
+  files = ['ca.pem', 'server1.key', 'server1.pem']
+  creds = files.map { |f| File.open(File.join(test_root, f)).read }
+  GRPC::Core::ServerCredentials.new(
+    creds[0],
+    [{ private_key: creds[1], cert_chain: creds[2] }],
+    true) # force client auth
+end
+
+# A test message
+class EchoMsg
+  def self.marshal(_o)
+    ''
+  end
+
+  def self.unmarshal(_o)
+    EchoMsg.new
+  end
+end
+
+# a test service that checks the cert of its peer
+class SslTestService
+  include GRPC::GenericService
+  rpc :an_rpc, EchoMsg, EchoMsg
+  rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
+  rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
+  rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
+
+  def check_peer_cert(call)
+    error_msg = "want:\n#{client_cert}\n\ngot:\n#{call.peer_cert}"
+    fail(error_msg) unless call.peer_cert == client_cert
+  end
+
+  def an_rpc(req, call)
+    check_peer_cert(call)
+    req
+  end
+
+  def a_client_streaming_rpc(call)
+    check_peer_cert(call)
+    call.each_remote_read.each { |r| p r }
+    EchoMsg.new
+  end
+
+  def a_server_streaming_rpc(_, call)
+    check_peer_cert(call)
+    [EchoMsg.new, EchoMsg.new]
+  end
+
+  def a_bidi_rpc(requests, call)
+    check_peer_cert(call)
+    requests.each { |r| p r }
+    [EchoMsg.new, EchoMsg.new]
+  end
+end
+
+SslTestServiceStub = SslTestService.rpc_stub_class
+
+describe 'client-server auth' do
+  RpcServer = GRPC::RpcServer
+
+  before(:all) do
+    server_opts = {
+      poll_period: 1
+    }
+    @srv = RpcServer.new(**server_opts)
+    port = @srv.add_http2_port('0.0.0.0:0', create_server_creds)
+    @srv.handle(SslTestService)
+    @srv_thd = Thread.new { @srv.run }
+    @srv.wait_till_running
+
+    client_opts = {
+      channel_args: {
+        GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr'
+      }
+    }
+    @stub = SslTestServiceStub.new("localhost:#{port}",
+                                   create_channel_creds,
+                                   **client_opts)
+  end
+
+  after(:all) do
+    expect(@srv.stopped?).to be(false)
+    @srv.stop
+    @srv_thd.join
+  end
+
+  it 'client-server auth with unary RPCs' do
+    @stub.an_rpc(EchoMsg.new)
+  end
+
+  it 'client-server auth with client streaming RPCs' do
+    @stub.a_client_streaming_rpc([EchoMsg.new, EchoMsg.new])
+  end
+
+  it 'client-server auth with server streaming RPCs' do
+    responses = @stub.a_server_streaming_rpc(EchoMsg.new)
+    responses.each { |r| p r }
+  end
+
+  it 'client-server auth with bidi RPCs' do
+    responses = @stub.a_bidi_rpc([EchoMsg.new, EchoMsg.new])
+    responses.each { |r| p r }
+  end
+end
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 72e55eb..ec0c294 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -473,7 +473,7 @@
       server_call.remote_send('server_response')
       expect(client_call.remote_read).to eq('server_response')
       server_call.send_status(OK, 'status code is OK')
-      expect { client_call.finished }.to_not raise_error
+      expect { client_call.receive_and_check_status }.to_not raise_error
     end
 
     it 'finishes ok if the server sends an early status response' do
@@ -490,7 +490,7 @@
       expect do
         call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
       end.to_not raise_error
-      expect { client_call.finished }.to_not raise_error
+      expect { client_call.receive_and_check_status }.to_not raise_error
     end
 
     it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 09b88c7..a8653e7 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -36,6 +36,53 @@
 include GRPC::Core::TimeConsts
 include GRPC::Core::CallOps
 
+# check that methods on a finished/closed call don't crash
+def check_op_view_of_finished_client_call(op_view,
+                                          expected_metadata,
+                                          expected_trailing_metadata)
+  # use read_response_stream to try to iterate through
+  # possible response stream
+  fail('need something to attempt reads') unless block_given?
+  expect do
+    resp = op_view.execute
+    yield resp
+  end.to raise_error(GRPC::Core::CallError)
+
+  expect { op_view.start_call }.to raise_error(RuntimeError)
+
+  sanity_check_values_of_accessors(op_view,
+                                   expected_metadata,
+                                   expected_trailing_metadata)
+
+  expect do
+    op_view.wait
+    op_view.cancel
+    op_view.write_flag = 1
+  end.to_not raise_error
+end
+
+def sanity_check_values_of_accessors(op_view,
+                                     expected_metadata,
+                                     expected_trailing_metadata)
+  expected_status = Struct::Status.new
+  expected_status.code = 0
+  expected_status.details = 'OK'
+  expected_status.metadata = expected_trailing_metadata
+
+  expect(op_view.status).to eq(expected_status)
+  expect(op_view.metadata).to eq(expected_metadata)
+  expect(op_view.trailing_metadata).to eq(expected_trailing_metadata)
+
+  expect(op_view.cancelled?).to be(false)
+  expect(op_view.write_flag).to be(nil)
+
+  # The deadline attribute of a call can be either
+  # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive.
+  # TODO: fix so that the accessor always returns the same type.
+  expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) ||
+         op_view.deadline.is_a?(Time)).to be(true)
+end
+
 describe 'ClientStub' do
   let(:noop) { proc { |x| x } }
 
@@ -45,6 +92,7 @@
     @method = 'an_rpc_method'
     @pass = OK
     @fail = INTERNAL
+    @metadata = { k1: 'v1', k2: 'v2' }
   end
 
   after(:each) do
@@ -107,7 +155,7 @@
     end
   end
 
-  describe '#request_response' do
+  describe '#request_response', request_response: true do
     before(:each) do
       @sent_msg, @resp = 'a_msg', 'a_reply'
     end
@@ -126,7 +174,7 @@
         server_port = create_test_server
         host = "localhost:#{server_port}"
         th = run_request_response(@sent_msg, @resp, @pass,
-                                  k1: 'v1', k2: 'v2')
+                                  expected_metadata: { k1: 'v1', k2: 'v2' })
         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
         expect(get_response(stub)).to eq(@resp)
         th.join
@@ -187,13 +235,24 @@
         # Kill the server thread so tests can complete
         th.kill
       end
+
+      it 'should raise ArgumentError if metadata contains invalid values' do
+        @metadata.merge!(k3: 3)
+        server_port = create_test_server
+        host = "localhost:#{server_port}"
+        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+        expect do
+          get_response(stub)
+        end.to raise_error(ArgumentError,
+                           /Header values must be of type string or array/)
+      end
     end
 
     describe 'without a call operation' do
       def get_response(stub, credentials: nil)
         puts credentials.inspect
         stub.request_response(@method, @sent_msg, noop, noop,
-                              metadata: { k1: 'v1', k2: 'v2' },
+                              metadata: @metadata,
                               credentials: credentials)
       end
 
@@ -201,40 +260,62 @@
     end
 
     describe 'via a call operation' do
+      after(:each) do
+        # make sure op.wait doesn't hang, even if there's a bad status
+        @op.wait
+      end
       def get_response(stub, run_start_call_first: false, credentials: nil)
-        op = stub.request_response(@method, @sent_msg, noop, noop,
-                                   return_op: true,
-                                   metadata: { k1: 'v1', k2: 'v2' },
-                                   deadline: from_relative_time(2),
-                                   credentials: credentials)
-        expect(op).to be_a(GRPC::ActiveCall::Operation)
-        op.start_call if run_start_call_first
-        result = op.execute
-        op.wait # make sure wait doesn't hang
+        @op = stub.request_response(@method, @sent_msg, noop, noop,
+                                    return_op: true,
+                                    metadata: @metadata,
+                                    deadline: from_relative_time(2),
+                                    credentials: credentials)
+        expect(@op).to be_a(GRPC::ActiveCall::Operation)
+        @op.start_call if run_start_call_first
+        result = @op.execute
         result
       end
 
       it_behaves_like 'request response'
 
-      it 'sends metadata to the server ok when running start_call first' do
+      def run_op_view_metadata_test(run_start_call_first)
         server_port = create_test_server
         host = "localhost:#{server_port}"
-        th = run_request_response(@sent_msg, @resp, @pass,
-                                  k1: 'v1', k2: 'v2')
+
+        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+        th = run_request_response(
+          @sent_msg, @resp, @pass,
+          expected_metadata: @metadata,
+          server_initial_md: @server_initial_md,
+          server_trailing_md: @server_trailing_md)
         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
-        expect(get_response(stub)).to eq(@resp)
+        expect(
+          get_response(stub,
+                       run_start_call_first: run_start_call_first)).to eq(@resp)
         th.join
       end
+
+      it 'sends metadata to the server ok when running start_call first' do
+        run_op_view_metadata_test(true)
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) { |r| p r }
+      end
+
+      it 'does not crash when used after the call has been finished' do
+        run_op_view_metadata_test(false)
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) { |r| p r }
+      end
     end
   end
 
-  describe '#client_streamer' do
+  describe '#client_streamer', client_streamer: true do
     before(:each) do
       Thread.abort_on_exception = true
       server_port = create_test_server
       host = "localhost:#{server_port}"
       @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
-      @metadata = { k1: 'v1', k2: 'v2' }
       @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
       @resp = 'a_reply'
     end
@@ -247,7 +328,8 @@
       end
 
       it 'should send metadata to the server ok' do
-        th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
+        th = run_client_streamer(@sent_msgs, @resp, @pass,
+                                 expected_metadata: @metadata)
         expect(get_response(@stub)).to eq(@resp)
         th.join
       end
@@ -278,27 +360,50 @@
     end
 
     describe 'via a call operation' do
+      after(:each) do
+        # make sure op.wait doesn't hang, even if there's a bad status
+        @op.wait
+      end
       def get_response(stub, run_start_call_first: false)
-        op = stub.client_streamer(@method, @sent_msgs, noop, noop,
-                                  return_op: true, metadata: @metadata)
-        expect(op).to be_a(GRPC::ActiveCall::Operation)
-        op.start_call if run_start_call_first
-        result = op.execute
-        op.wait # make sure wait doesn't hang
+        @op = stub.client_streamer(@method, @sent_msgs, noop, noop,
+                                   return_op: true, metadata: @metadata)
+        expect(@op).to be_a(GRPC::ActiveCall::Operation)
+        @op.start_call if run_start_call_first
+        result = @op.execute
         result
       end
 
       it_behaves_like 'client streaming'
 
-      it 'sends metadata to the server ok when running start_call first' do
-        th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
-        expect(get_response(@stub, run_start_call_first: true)).to eq(@resp)
+      def run_op_view_metadata_test(run_start_call_first)
+        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+        th = run_client_streamer(
+          @sent_msgs, @resp, @pass,
+          expected_metadata: @metadata,
+          server_initial_md: @server_initial_md,
+          server_trailing_md: @server_trailing_md)
+        expect(
+          get_response(@stub,
+                       run_start_call_first: run_start_call_first)).to eq(@resp)
         th.join
       end
+
+      it 'sends metadata to the server ok when running start_call first' do
+        run_op_view_metadata_test(true)
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) { |r| p r }
+      end
+
+      it 'does not crash when used after the call has been finished' do
+        run_op_view_metadata_test(false)
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) { |r| p r }
+      end
     end
   end
 
-  describe '#server_streamer' do
+  describe '#server_streamer', server_streamer: true do
     before(:each) do
       @sent_msg = 'a_msg'
       @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
@@ -328,18 +433,42 @@
         server_port = create_test_server
         host = "localhost:#{server_port}"
         th = run_server_streamer(@sent_msg, @replys, @fail,
-                                 k1: 'v1', k2: 'v2')
+                                 expected_metadata: { k1: 'v1', k2: 'v2' })
         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
         e = get_responses(stub)
         expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
         th.join
       end
+
+      it 'should raise ArgumentError if metadata contains invalid values' do
+        @metadata.merge!(k3: 3)
+        server_port = create_test_server
+        host = "localhost:#{server_port}"
+        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+        expect do
+          get_responses(stub)
+        end.to raise_error(ArgumentError,
+                           /Header values must be of type string or array/)
+      end
+
+      it 'the call terminates when there is an unmarshalling error' do
+        server_port = create_test_server
+        host = "localhost:#{server_port}"
+        th = run_server_streamer(@sent_msg, @replys, @pass)
+        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+
+        unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') }
+        expect do
+          get_responses(stub, unmarshal: unmarshal).collect { |r| r }
+        end.to raise_error(ArgumentError, 'test unmarshalling error')
+        th.join
+      end
     end
 
     describe 'without a call operation' do
-      def get_responses(stub)
-        e = stub.server_streamer(@method, @sent_msg, noop, noop,
-                                 metadata: { k1: 'v1', k2: 'v2' })
+      def get_responses(stub, unmarshal: noop)
+        e = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
+                                 metadata: @metadata)
         expect(e).to be_a(Enumerator)
         e
       end
@@ -351,10 +480,10 @@
       after(:each) do
         @op.wait # make sure wait doesn't hang
       end
-      def get_responses(stub, run_start_call_first: false)
-        @op = stub.server_streamer(@method, @sent_msg, noop, noop,
+      def get_responses(stub, run_start_call_first: false, unmarshal: noop)
+        @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
                                    return_op: true,
-                                   metadata: { k1: 'v1', k2: 'v2' })
+                                   metadata: @metadata)
         expect(@op).to be_a(GRPC::ActiveCall::Operation)
         @op.start_call if run_start_call_first
         e = @op.execute
@@ -364,20 +493,41 @@
 
       it_behaves_like 'server streaming'
 
-      it 'should send metadata to the server ok when start_call is run first' do
+      def run_op_view_metadata_test(run_start_call_first)
         server_port = create_test_server
         host = "localhost:#{server_port}"
-        th = run_server_streamer(@sent_msg, @replys, @fail,
-                                 k1: 'v1', k2: 'v2')
+        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+        th = run_server_streamer(
+          @sent_msg, @replys, @pass,
+          expected_metadata: @metadata,
+          server_initial_md: @server_initial_md,
+          server_trailing_md: @server_trailing_md)
         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
-        e = get_responses(stub, run_start_call_first: true)
-        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
+        e = get_responses(stub, run_start_call_first: run_start_call_first)
+        expect(e.collect { |r| r }).to eq(@replys)
         th.join
       end
+
+      it 'should send metadata to the server ok when start_call is run first' do
+        run_op_view_metadata_test(true)
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) do |responses|
+          responses.each { |r| p r }
+        end
+      end
+
+      it 'does not crash when used after the call has been finished' do
+        run_op_view_metadata_test(false)
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) do |responses|
+          responses.each { |r| p r }
+        end
+      end
     end
   end
 
-  describe '#bidi_streamer' do
+  describe '#bidi_streamer', bidi: true do
     before(:each) do
       @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
       @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
@@ -386,7 +536,7 @@
     end
 
     shared_examples 'bidi streaming' do
-      it 'supports sending all the requests first', bidi: true do
+      it 'supports sending all the requests first' do
         th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
                                                    @pass)
         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
@@ -395,7 +545,7 @@
         th.join
       end
 
-      it 'supports client-initiated ping pong', bidi: true do
+      it 'supports client-initiated ping pong' do
         th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
         e = get_responses(stub)
@@ -403,18 +553,39 @@
         th.join
       end
 
-      it 'supports a server-initiated ping pong', bidi: true do
+      it 'supports a server-initiated ping pong' do
         th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
         e = get_responses(stub)
         expect(e.collect { |r| r }).to eq(@sent_msgs)
         th.join
       end
+
+      it 'should raise an error if the status is not ok' do
+        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false)
+        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+        e = get_responses(stub)
+        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
+        th.join
+      end
+
+      # TODO: add test for metadata-related ArgumentError in a bidi call once
+      # the issue mentioned in https://github.com/grpc/grpc/issues/10526 is fixed
+
+      it 'should send metadata to the server ok' do
+        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
+                                              expected_metadata: @metadata)
+        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+        e = get_responses(stub)
+        expect(e.collect { |r| r }).to eq(@sent_msgs)
+        th.join
+      end
     end
 
     describe 'without a call operation' do
       def get_responses(stub)
-        e = stub.bidi_streamer(@method, @sent_msgs, noop, noop)
+        e = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
+                               metadata: @metadata)
         expect(e).to be_a(Enumerator)
         e
       end
@@ -428,7 +599,8 @@
       end
       def get_responses(stub, run_start_call_first: false)
         @op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
-                                 return_op: true)
+                                 return_op: true,
+                                 metadata: @metadata)
         expect(@op).to be_a(GRPC::ActiveCall::Operation)
         @op.start_call if run_start_call_first
         e = @op.execute
@@ -438,27 +610,53 @@
 
       it_behaves_like 'bidi streaming'
 
-      it 'can run start_call before executing the call' do
-        th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
-                                                   @pass)
+      def run_op_view_metadata_test(run_start_call_first)
+        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+        th = run_bidi_streamer_echo_ping_pong(
+          @sent_msgs, @pass, true,
+          expected_metadata: @metadata,
+          server_initial_md: @server_initial_md,
+          server_trailing_md: @server_trailing_md)
         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
-        e = get_responses(stub, run_start_call_first: true)
-        expect(e.collect { |r| r }).to eq(@replys)
+        e = get_responses(stub, run_start_call_first: run_start_call_first)
+        expect(e.collect { |r| r }).to eq(@sent_msgs)
         th.join
       end
+
+      it 'can run start_call before executing the call' do
+        run_op_view_metadata_test(true)
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) do |responses|
+          responses.each { |r| p r }
+        end
+      end
+
+      it 'does not crash when op_view is used after the call has finished' do
+        run_op_view_metadata_test(false)
+        check_op_view_of_finished_client_call(
+          @op, @server_initial_md, @server_trailing_md) do |responses|
+          responses.each { |r| p r }
+        end
+      end
     end
   end
 
-  def run_server_streamer(expected_input, replys, status, **kw)
-    wanted_metadata = kw.clone
+  def run_server_streamer(expected_input, replys, status,
+                          expected_metadata: {},
+                          server_initial_md: {},
+                          server_trailing_md: {})
+    wanted_metadata = expected_metadata.clone
     wakey_thread do |notifier|
-      c = expect_server_to_be_invoked(notifier)
+      c = expect_server_to_be_invoked(
+        notifier, metadata_to_send: server_initial_md)
       wanted_metadata.each do |k, v|
         expect(c.metadata[k.to_s]).to eq(v)
       end
       expect(c.remote_read).to eq(expected_input)
       replys.each { |r| c.remote_send(r) }
-      c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+                    metadata: server_trailing_md)
     end
   end
 
@@ -472,9 +670,17 @@
     end
   end
 
-  def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts)
+  def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts,
+                                       expected_metadata: {},
+                                       server_initial_md: {},
+                                       server_trailing_md: {})
+    wanted_metadata = expected_metadata.clone
     wakey_thread do |notifier|
-      c = expect_server_to_be_invoked(notifier)
+      c = expect_server_to_be_invoked(
+        notifier, metadata_to_send: server_initial_md)
+      wanted_metadata.each do |k, v|
+        expect(c.metadata[k.to_s]).to eq(v)
+      end
       expected_inputs.each do |i|
         if client_starts
           expect(c.remote_read).to eq(i)
@@ -484,33 +690,44 @@
           expect(c.remote_read).to eq(i)
         end
       end
-      c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+                    metadata: server_trailing_md)
     end
   end
 
-  def run_client_streamer(expected_inputs, resp, status, **kw)
-    wanted_metadata = kw.clone
+  def run_client_streamer(expected_inputs, resp, status,
+                          expected_metadata: {},
+                          server_initial_md: {},
+                          server_trailing_md: {})
+    wanted_metadata = expected_metadata.clone
     wakey_thread do |notifier|
-      c = expect_server_to_be_invoked(notifier)
+      c = expect_server_to_be_invoked(
+        notifier, metadata_to_send: server_initial_md)
       expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
       wanted_metadata.each do |k, v|
         expect(c.metadata[k.to_s]).to eq(v)
       end
       c.remote_send(resp)
-      c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+                    metadata: server_trailing_md)
     end
   end
 
-  def run_request_response(expected_input, resp, status, **kw)
-    wanted_metadata = kw.clone
+  def run_request_response(expected_input, resp, status,
+                           expected_metadata: {},
+                           server_initial_md: {},
+                           server_trailing_md: {})
+    wanted_metadata = expected_metadata.clone
     wakey_thread do |notifier|
-      c = expect_server_to_be_invoked(notifier)
+      c = expect_server_to_be_invoked(
+        notifier, metadata_to_send: server_initial_md)
       expect(c.remote_read).to eq(expected_input)
       wanted_metadata.each do |k, v|
         expect(c.metadata[k.to_s]).to eq(v)
       end
       c.remote_send(resp)
-      c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+                    metadata: server_trailing_md)
     end
   end
 
@@ -528,13 +745,13 @@
     @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
   end
 
-  def expect_server_to_be_invoked(notifier)
+  def expect_server_to_be_invoked(notifier, metadata_to_send: nil)
     @server.start
     notifier.notify(nil)
     recvd_rpc = @server.request_call
     recvd_call = recvd_rpc.call
     recvd_call.metadata = recvd_rpc.metadata
-    recvd_call.run_batch(SEND_INITIAL_METADATA => nil)
+    recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send)
     GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE,
                          metadata_received: true)
   end
diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb
index 100e9e8..be578c4 100644
--- a/src/ruby/spec/generic/rpc_desc_spec.rb
+++ b/src/ruby/spec/generic/rpc_desc_spec.rb
@@ -38,14 +38,14 @@
 
   shared_examples 'it handles errors' do
     it 'sends the specified status if BadStatus is raised' do
-      expect(@call).to receive(:remote_read).once.and_return(Object.new)
+      expect(@call).to receive(:read_unary_request).once.and_return(Object.new)
       expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false,
                                                        metadata: {})
       this_desc.run_server_method(@call, method(:bad_status))
     end
 
     it 'sends status UNKNOWN if other StandardErrors are raised' do
-      expect(@call).to receive(:remote_read).once.and_return(Object.new)
+      expect(@call).to receive(:read_unary_request).once.and_return(Object.new)
       expect(@call).to receive(:send_status).once.with(UNKNOWN,
                                                        arg_error_msg,
                                                        false, metadata: {})
@@ -53,7 +53,7 @@
     end
 
     it 'absorbs CallError with no further action' do
-      expect(@call).to receive(:remote_read).once.and_raise(CallError)
+      expect(@call).to receive(:read_unary_request).once.and_raise(CallError)
       blk = proc do
         this_desc.run_server_method(@call, method(:fake_reqresp))
       end
@@ -75,7 +75,7 @@
 
       it 'sends a response and closes the stream if there no errors' do
         req = Object.new
-        expect(@call).to receive(:remote_read).once.and_return(req)
+        expect(@call).to receive(:read_unary_request).once.and_return(req)
         expect(@call).to receive(:output_metadata).once.and_return(fake_md)
         expect(@call).to receive(:server_unary_response).once
           .with(@ok_response, trailing_metadata: fake_md)
@@ -133,7 +133,7 @@
 
       it 'sends a response and closes the stream if there no errors' do
         req = Object.new
-        expect(@call).to receive(:remote_read).once.and_return(req)
+        expect(@call).to receive(:read_unary_request).once.and_return(req)
         expect(@call).to receive(:remote_send).twice.with(@ok_response)
         expect(@call).to receive(:output_metadata).and_return(fake_md)
         expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 9633a82..e0646f4 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -111,6 +111,47 @@
 
 SlowStub = SlowService.rpc_stub_class
 
+# A test service that hangs onto call objects
+# and uses them after the server-side call
+# has finished
+class CheckCallAfterFinishedService
+  include GRPC::GenericService
+  rpc :an_rpc, EchoMsg, EchoMsg
+  rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
+  rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
+  rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
+  attr_reader :server_side_call
+
+  def an_rpc(req, call)
+    fail 'shouldnt reuse service' unless @server_side_call.nil?
+    @server_side_call = call
+    req
+  end
+
+  def a_client_streaming_rpc(call)
+    fail 'shouldnt reuse service' unless @server_side_call.nil?
+    @server_side_call = call
+    # iterate through requests so call can complete
+    call.each_remote_read.each { |r| p r }
+    EchoMsg.new
+  end
+
+  def a_server_streaming_rpc(_, call)
+    fail 'shouldnt reuse service' unless @server_side_call.nil?
+    @server_side_call = call
+    [EchoMsg.new, EchoMsg.new]
+  end
+
+  def a_bidi_rpc(requests, call)
+    fail 'shouldnt reuse service' unless @server_side_call.nil?
+    @server_side_call = call
+    requests.each { |r| p r }
+    [EchoMsg.new, EchoMsg.new]
+  end
+end
+
+CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class
+
 describe GRPC::RpcServer do
   RpcServer = GRPC::RpcServer
   StatusCodes = GRPC::Core::StatusCodes
@@ -505,5 +546,109 @@
         t.join
       end
     end
+
+    context 'when call objects are used after calls have completed' do
+      before(:each) do
+        server_opts = {
+          poll_period: 1
+        }
+        @srv = RpcServer.new(**server_opts)
+        alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
+        @alt_host = "0.0.0.0:#{alt_port}"
+
+        @service = CheckCallAfterFinishedService.new
+        @srv.handle(@service)
+        @srv_thd = Thread.new { @srv.run }
+        @srv.wait_till_running
+      end
+
+      # check that the server-side call is still in a usable state even
+      # after it has finished
+      def check_single_req_view_of_finished_call(call)
+        common_check_of_finished_server_call(call)
+
+        expect(call.peer).to be_a(String)
+        expect(call.peer_cert).to be(nil)
+      end
+
+      def check_multi_req_view_of_finished_call(call)
+        common_check_of_finished_server_call(call)
+
+        expect do
+          call.each_remote_read.each { |r| p r }
+        end.to raise_error(GRPC::Core::CallError)
+      end
+
+      def common_check_of_finished_server_call(call)
+        expect do
+          call.merge_metadata_to_send({})
+        end.to raise_error(RuntimeError)
+
+        expect do
+          call.send_initial_metadata
+        end.to_not raise_error
+
+        expect(call.cancelled?).to be(false)
+        expect(call.metadata).to be_a(Hash)
+        expect(call.metadata['user-agent']).to be_a(String)
+
+        expect(call.metadata_sent).to be(true)
+        expect(call.output_metadata).to eq({})
+        expect(call.metadata_to_send).to eq({})
+        expect(call.deadline.is_a?(Time)).to be(true)
+      end
+
+      it 'should not crash when call used after an unary call is finished' do
+        req = EchoMsg.new
+        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+                                                     :this_channel_is_insecure)
+        resp = stub.an_rpc(req)
+        expect(resp).to be_a(EchoMsg)
+        @srv.stop
+        @srv_thd.join
+
+        check_single_req_view_of_finished_call(@service.server_side_call)
+      end
+
+      it 'should not crash when call used after client streaming finished' do
+        requests = [EchoMsg.new, EchoMsg.new]
+        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+                                                     :this_channel_is_insecure)
+        resp = stub.a_client_streaming_rpc(requests)
+        expect(resp).to be_a(EchoMsg)
+        @srv.stop
+        @srv_thd.join
+
+        check_multi_req_view_of_finished_call(@service.server_side_call)
+      end
+
+      it 'should not crash when call used after server streaming finished' do
+        req = EchoMsg.new
+        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+                                                     :this_channel_is_insecure)
+        responses = stub.a_server_streaming_rpc(req)
+        responses.each do |r|
+          expect(r).to be_a(EchoMsg)
+        end
+        @srv.stop
+        @srv_thd.join
+
+        check_single_req_view_of_finished_call(@service.server_side_call)
+      end
+
+      it 'should not crash when call used after a bidi call is finished' do
+        requests = [EchoMsg.new, EchoMsg.new]
+        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+                                                     :this_channel_is_insecure)
+        responses = stub.a_bidi_rpc(requests)
+        responses.each do |r|
+          expect(r).to be_a(EchoMsg)
+        end
+        @srv.stop
+        @srv_thd.join
+
+        check_multi_req_view_of_finished_call(@service.server_side_call)
+      end
+    end
   end
 end
diff --git a/src/ruby/spec/testdata/client.key b/src/ruby/spec/testdata/client.key
new file mode 100644
index 0000000..f48d073
--- /dev/null
+++ b/src/ruby/spec/testdata/client.key
@@ -0,0 +1,16 @@
+-----BEGIN PRIVATE KEY-----
+MIICeQIBADANBgkqhkiG9w0BAQEFAASCAmMwggJfAgEAAoGBAOxUR9uhvhbeVUIM
+s5WbH0px0mehl2+6sZpNjzvE2KimZpHzMJHukVH0Ffkvhs0b8+S5Ut9VNUAqd3IM
+JCCAEGtRNoQhM1t9Yr2zAckSvbRacp+FL/Cj9eDmyo00KsVGaeefA4Dh4OW+ZhkT
+NKcldXqkSuj1sEf244JZYuqZp6/tAgMBAAECgYEAi2NSVqpZMafE5YYUTcMGe6QS
+k2jtpsqYgggI2RnLJ/2tNZwYI5pwP8QVSbnMaiF4gokD5hGdrNDfTnb2v+yIwYEH
+0w8+oG7Z81KodsiZSIDJfTGsAZhVNwOz9y0VD8BBZZ1/274Zh52AUKLjZS/ZwIbS
+W2ywya855dPnH/wj+0ECQQD9X8D920kByTNHhBG18biAEZ4pxs9f0OAG8333eVcI
+w2lJDLsYDZrCB2ocgA3lUdozlzPC7YDYw8reg0tkiRY5AkEA7sdNzOeQsQRn7++5
+0bP9DtT/iON1gbfxRzCfCfXdoOtfQWIzTePWtURt9X/5D9NofI0Rg5W2oGy/MLe5
+/sXHVQJBAIup5XrJDkQywNZyAUU2ecn2bCWBFjwtqd+LBmuMciI9fOKsZtEKZrz/
+U0lkeMRoSwvXE8wmGLjjrAbdfohrXFkCQQDZEx/LtIl6JINJQiswVe0tWr6k+ASP
+1WXoTm+HYpoF/XUvv9LccNF1IazFj34hwRQwhx7w/V52Ieb+p0jUMYGxAkEAjDhd
+9pBO1fKXWiXzi9ZKfoyTNcUq3eBSVKwPG2nItg5ycXengjT5sgcWDnciIzW7BIVI
+JiqOszq9GWESErAatg==
+-----END PRIVATE KEY-----
diff --git a/src/ruby/spec/testdata/client.pem b/src/ruby/spec/testdata/client.pem
new file mode 100644
index 0000000..e332091
--- /dev/null
+++ b/src/ruby/spec/testdata/client.pem
@@ -0,0 +1,14 @@
+-----BEGIN CERTIFICATE-----
+MIICHzCCAYgCAQEwDQYJKoZIhvcNAQEFBQAwVjELMAkGA1UEBhMCQVUxEzARBgNV
+BAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0
+ZDEPMA0GA1UEAwwGdGVzdGNhMB4XDTE0MDcxNzIzNTYwMloXDTI0MDcxNDIzNTYw
+MlowWjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDETMBEGA1UEAwwKdGVzdGNsaWVudDCB
+nzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA7FRH26G+Ft5VQgyzlZsfSnHSZ6GX
+b7qxmk2PO8TYqKZmkfMwke6RUfQV+S+GzRvz5LlS31U1QCp3cgwkIIAQa1E2hCEz
+W31ivbMByRK9tFpyn4Uv8KP14ObKjTQqxUZp558DgOHg5b5mGRM0pyV1eqRK6PWw
+R/bjglli6pmnr+0CAwEAATANBgkqhkiG9w0BAQUFAAOBgQAStSm5PM7ubROiKK6/
+T2FkKlhiTOx+Ryenm3Eio59emq+jXl+1nhPySX5G2PQzSR5vd1dIhwgZSR4Gyttk
+tRZ57k/NI1brUW8joiEOMJA/Mr7H7asx7wIRYDE91Fs8GkKWd5LhoPAQj+qdG35C
+OO+svdkmqH0KZo320ZUqdl2ooQ==
+-----END CERTIFICATE-----
diff --git a/test/cpp/qps/BUILD b/test/cpp/qps/BUILD
index 93fb534..b3348b7 100644
--- a/test/cpp/qps/BUILD
+++ b/test/cpp/qps/BUILD
@@ -81,6 +81,7 @@
         "//src/proto/grpc/testing:services_proto",
         "//test/core/util:gpr_test_util",
         "//test/core/util:grpc_test_util",
+        "//test/cpp/util:test_util",
     ],
 )
 
@@ -148,6 +149,7 @@
         ":driver_impl",
         "//:grpc++",
         "//test/cpp/util:test_config",
+        "//test/cpp/util:test_util",
     ],
     external_deps = [
         "gflags",
@@ -162,6 +164,7 @@
         ":driver_impl",
         ":qps_worker_impl",
         "//test/cpp/util:test_config",
+        "//test/cpp/util:test_util",
     ],
 )
 
@@ -173,6 +176,7 @@
         ":driver_impl",
         "//:grpc++",
         "//test/cpp/util:test_config",
+        "//test/cpp/util:test_util",
     ],
 )
 
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index ecbf9c3..6c4d92e 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -39,6 +39,7 @@
 #include "test/cpp/qps/interarrival.h"
 #include "test/cpp/qps/usage_timer.h"
 #include "test/cpp/util/create_test_channel.h"
+#include "test/cpp/util/test_credentials_provider.h"
 
 namespace grpc {
 namespace testing {
@@ -405,9 +406,18 @@
       ChannelArguments args;
       args.SetInt("shard_to_ensure_no_subchannel_merges", shard);
       set_channel_args(config, &args);
+
+      grpc::string type;
+      if (config.has_security_params() &&
+          config.security_params().cred_type().empty()) {
+        type = kTlsCredentialsType;
+      } else {
+        type = config.security_params().cred_type();
+      }
+
       channel_ = CreateTestChannel(
-          target, config.security_params().server_host_override(),
-          config.has_security_params(), !config.security_params().use_test_ca(),
+          target, type, config.security_params().server_host_override(),
+          !config.security_params().use_test_ca(),
           std::shared_ptr<CallCredentials>(), args);
       gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
       GPR_ASSERT(channel_->WaitForConnected(
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index fbd8d1b..4458e38 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -40,6 +40,7 @@
 #include "test/cpp/qps/histogram.h"
 #include "test/cpp/qps/qps_worker.h"
 #include "test/cpp/qps/stats.h"
+#include "test/cpp/util/test_credentials_provider.h"
 
 using std::list;
 using std::thread;
@@ -172,13 +173,26 @@
       sum(result->client_stats(), CliPollCount) / histogram.Count());
   result->mutable_summary()->set_server_polls_per_request(
       sum(result->server_stats(), SvrPollCount) / histogram.Count());
+
+  auto server_queries_per_cpu_sec =
+      histogram.Count() / (sum(result->server_stats(), ServerSystemTime) +
+                           sum(result->server_stats(), ServerUserTime));
+  auto client_queries_per_cpu_sec =
+      histogram.Count() / (sum(result->client_stats(), SystemTime) +
+                           sum(result->client_stats(), UserTime));
+
+  result->mutable_summary()->set_server_queries_per_cpu_sec(
+      server_queries_per_cpu_sec);
+  result->mutable_summary()->set_client_queries_per_cpu_sec(
+      client_queries_per_cpu_sec);
 }
 
 std::unique_ptr<ScenarioResult> RunScenario(
     const ClientConfig& initial_client_config, size_t num_clients,
     const ServerConfig& initial_server_config, size_t num_servers,
     int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
-    const char* qps_server_target_override) {
+    const grpc::string& qps_server_target_override,
+    const grpc::string& credential_type) {
   // Log everything from the driver
   gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
 
@@ -214,7 +228,7 @@
     }
 
     int driver_port = grpc_pick_unused_port_or_die();
-    local_workers.emplace_back(new QpsWorker(driver_port));
+    local_workers.emplace_back(new QpsWorker(driver_port, 0, credential_type));
     char addr[256];
     sprintf(addr, "localhost:%d", driver_port);
     if (spawn_local_worker_count < 0) {
@@ -246,12 +260,14 @@
   };
   std::vector<ServerData> servers(num_servers);
   std::unordered_map<string, std::deque<int>> hosts_cores;
+  ChannelArguments channel_args;
 
   for (size_t i = 0; i < num_servers; i++) {
     gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")",
             workers[i].c_str(), i);
-    servers[i].stub = WorkerService::NewStub(
-        CreateChannel(workers[i], InsecureChannelCredentials()));
+    servers[i].stub = WorkerService::NewStub(CreateChannel(
+        workers[i], GetCredentialsProvider()->GetChannelCredentials(
+                        credential_type, &channel_args)));
 
     ServerConfig server_config = initial_server_config;
     if (server_config.core_limit() != 0) {
@@ -269,8 +285,7 @@
     if (!servers[i].stream->Read(&init_status)) {
       gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
     }
-    if (qps_server_target_override != NULL &&
-        strlen(qps_server_target_override) > 0) {
+    if (qps_server_target_override.length() > 0) {
       // overriding the qps server target only works if there is 1 server
       GPR_ASSERT(num_servers == 1);
       client_config.add_server_targets(qps_server_target_override);
@@ -298,7 +313,8 @@
     gpr_log(GPR_INFO, "Starting client on %s (worker #%" PRIuPTR ")",
             worker.c_str(), i + num_servers);
     clients[i].stub = WorkerService::NewStub(
-        CreateChannel(worker, InsecureChannelCredentials()));
+        CreateChannel(worker, GetCredentialsProvider()->GetChannelCredentials(
+                                  credential_type, &channel_args)));
     ClientConfig per_client_config = client_config;
 
     if (initial_client_config.core_limit() != 0) {
@@ -483,16 +499,19 @@
   return result;
 }
 
-bool RunQuit() {
+bool RunQuit(const grpc::string& credential_type) {
   // Get client, server lists
   bool result = true;
   auto workers = get_workers("QPS_WORKERS");
   if (workers.size() == 0) {
     return false;
   }
+
+  ChannelArguments channel_args;
   for (size_t i = 0; i < workers.size(); i++) {
-    auto stub = WorkerService::NewStub(
-        CreateChannel(workers[i], InsecureChannelCredentials()));
+    auto stub = WorkerService::NewStub(CreateChannel(
+        workers[i], GetCredentialsProvider()->GetChannelCredentials(
+                        credential_type, &channel_args)));
     Void dummy;
     grpc::ClientContext ctx;
     ctx.set_wait_for_ready(true);
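
Note on the queries-per-CPU-second summary added in the driver.cc postprocessing hunk above: the metric is simply the total query count divided by the aggregate CPU seconds (system plus user) summed across the workers. A minimal, self-contained C++ sketch of the same arithmetic follows; the helper name and the sample numbers are illustrative only and are not part of this change.

#include <cstdio>

// Mirrors the formula introduced above:
//   queries_per_cpu_sec = total_queries / (system_cpu_secs + user_cpu_secs)
static double QueriesPerCpuSec(double total_queries, double system_cpu_secs,
                               double user_cpu_secs) {
  return total_queries / (system_cpu_secs + user_cpu_secs);
}

int main() {
  // Hypothetical run: 1,000,000 queries against 40s system + 160s user CPU
  // time summed over all server workers -> 5000.00 queries per CPU-second.
  std::printf("Server Queries/CPU-sec: %.2f\n",
              QueriesPerCpuSec(1e6, 40.0, 160.0));
  return 0;
}
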
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index def32c6..29f2776 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -31,9 +31,10 @@
     const grpc::testing::ClientConfig& client_config, size_t num_clients,
     const grpc::testing::ServerConfig& server_config, size_t num_servers,
     int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
-    const char* qps_server_target_override = "");
+    const grpc::string& qps_server_target_override,
+    const grpc::string& credential_type);
 
-bool RunQuit();
+bool RunQuit(const grpc::string& credential_type);
 }  // namespace testing
 }  // namespace grpc
 
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index e1e5802..cca59f6 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -31,6 +31,7 @@
 #include "test/cpp/qps/parse_json.h"
 #include "test/cpp/qps/report.h"
 #include "test/cpp/util/test_config.h"
+#include "test/cpp/util/test_credentials_provider.h"
 
 DEFINE_string(scenarios_file, "",
               "JSON file containing an array of Scenario objects");
@@ -61,6 +62,9 @@
 
 DEFINE_string(json_file_out, "", "File to write the JSON output to.");
 
+DEFINE_string(credential_type, grpc::testing::kInsecureCredentialsType,
+              "Credential type for communication with workers");
+
 namespace grpc {
 namespace testing {
 
@@ -72,7 +76,7 @@
                   scenario.server_config(), scenario.num_servers(),
                   scenario.warmup_seconds(), scenario.benchmark_seconds(),
                   scenario.spawn_local_worker_count(),
-                  FLAGS_qps_server_target_override.c_str());
+                  FLAGS_qps_server_target_override, FLAGS_credential_type);
 
   // Amend the result with scenario config. Eventually we should adjust
   // RunScenario contract so we don't need to touch the result here.
@@ -84,6 +88,7 @@
   GetReporter()->ReportTimes(*result);
   GetReporter()->ReportCpuUsage(*result);
   GetReporter()->ReportPollCount(*result);
+  GetReporter()->ReportQueriesPerCpuSec(*result);
 
   for (int i = 0; *success && i < result->client_success_size(); i++) {
     *success = result->client_success(i);
@@ -185,7 +190,7 @@
   } else if (scjson) {
     json = FLAGS_scenarios_json.c_str();
   } else if (FLAGS_quit) {
-    return RunQuit();
+    return RunQuit(FLAGS_credential_type);
   }
 
   // Parse into an array of scenarios
diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc
index 2f8a3d7..069b3fa 100644
--- a/test/cpp/qps/qps_openloop_test.cc
+++ b/test/cpp/qps/qps_openloop_test.cc
@@ -25,6 +25,7 @@
 #include "test/cpp/qps/driver.h"
 #include "test/cpp/qps/report.h"
 #include "test/cpp/util/test_config.h"
+#include "test/cpp/util/test_credentials_provider.h"
 
 namespace grpc {
 namespace testing {
@@ -48,8 +49,8 @@
   server_config.set_server_type(ASYNC_SERVER);
   server_config.set_async_server_threads(8);
 
-  const auto result =
-      RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
+  const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP,
+                                  BENCHMARK, -2, "", kInsecureCredentialsType);
 
   GetReporter()->ReportQPSPerCore(*result);
   GetReporter()->ReportLatency(*result);
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index 10bc542..d20bc1b 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -41,6 +41,7 @@
 #include "test/cpp/qps/client.h"
 #include "test/cpp/qps/server.h"
 #include "test/cpp/util/create_test_channel.h"
+#include "test/cpp/util/test_credentials_provider.h"
 
 namespace grpc {
 namespace testing {
@@ -263,7 +264,8 @@
   QpsWorker* worker_;
 };
 
-QpsWorker::QpsWorker(int driver_port, int server_port) {
+QpsWorker::QpsWorker(int driver_port, int server_port,
+                     const grpc::string& credential_type) {
   impl_.reset(new WorkerServiceImpl(server_port, this));
   gpr_atm_rel_store(&done_, static_cast<gpr_atm>(0));
 
@@ -271,7 +273,9 @@
   gpr_join_host_port(&server_address, "::", driver_port);
 
   ServerBuilder builder;
-  builder.AddListeningPort(server_address, InsecureServerCredentials());
+  builder.AddListeningPort(
+      server_address,
+      GetCredentialsProvider()->GetServerCredentials(credential_type));
   builder.RegisterService(impl_.get());
 
   gpr_free(server_address);
diff --git a/test/cpp/qps/qps_worker.h b/test/cpp/qps/qps_worker.h
index c8a7be9..360125f 100644
--- a/test/cpp/qps/qps_worker.h
+++ b/test/cpp/qps/qps_worker.h
@@ -21,6 +21,7 @@
 
 #include <memory>
 
+#include <grpc++/support/config.h>
 #include <grpc/support/atm.h>
 
 namespace grpc {
@@ -33,7 +34,8 @@
 
 class QpsWorker {
  public:
-  explicit QpsWorker(int driver_port, int server_port = 0);
+  explicit QpsWorker(int driver_port, int server_port,
+                     const grpc::string& credential_type);
   ~QpsWorker();
 
   bool Done() const;
diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc
index 809c563..a45b10b 100644
--- a/test/cpp/qps/report.cc
+++ b/test/cpp/qps/report.cc
@@ -71,6 +71,12 @@
   }
 }
 
+void CompositeReporter::ReportQueriesPerCpuSec(const ScenarioResult& result) {
+  for (size_t i = 0; i < reporters_.size(); ++i) {
+    reporters_[i]->ReportQueriesPerCpuSec(result);
+  }
+}
+
 void GprLogReporter::ReportQPS(const ScenarioResult& result) {
   gpr_log(GPR_INFO, "QPS: %.1f", result.summary().qps());
   if (result.summary().failed_requests_per_second() > 0) {
@@ -119,6 +125,13 @@
           result.summary().server_polls_per_request());
 }
 
+void GprLogReporter::ReportQueriesPerCpuSec(const ScenarioResult& result) {
+  gpr_log(GPR_INFO, "Server Queries/CPU-sec: %.2f",
+          result.summary().server_queries_per_cpu_sec());
+  gpr_log(GPR_INFO, "Client Queries/CPU-sec: %.2f",
+          result.summary().client_queries_per_cpu_sec());
+}
+
 void JsonReporter::ReportQPS(const ScenarioResult& result) {
   grpc::string json_string =
       SerializeJson(result, "type.googleapis.com/grpc.testing.ScenarioResult");
@@ -147,6 +160,10 @@
   // NOP - all reporting is handled by ReportQPS.
 }
 
+void JsonReporter::ReportQueriesPerCpuSec(const ScenarioResult& result) {
+  // NOP - all reporting is handled by ReportQPS.
+}
+
 void RpcReporter::ReportQPS(const ScenarioResult& result) {
   grpc::ClientContext context;
   grpc::Status status;
@@ -183,5 +200,9 @@
   // NOP - all reporting is handled by ReportQPS.
 }
 
+void RpcReporter::ReportQueriesPerCpuSec(const ScenarioResult& result) {
+  // NOP - all reporting is handled by ReportQPS.
+}
+
 }  // namespace testing
 }  // namespace grpc
diff --git a/test/cpp/qps/report.h b/test/cpp/qps/report.h
index 0bd398f..321be2a 100644
--- a/test/cpp/qps/report.h
+++ b/test/cpp/qps/report.h
@@ -64,6 +64,9 @@
   /** Reports client and server poll usage inside completion queue. */
   virtual void ReportPollCount(const ScenarioResult& result) = 0;
 
+  /** Reports queries per cpu-sec. */
+  virtual void ReportQueriesPerCpuSec(const ScenarioResult& result) = 0;
+
  private:
   const string name_;
 };
@@ -82,6 +85,7 @@
   void ReportTimes(const ScenarioResult& result) override;
   void ReportCpuUsage(const ScenarioResult& result) override;
   void ReportPollCount(const ScenarioResult& result) override;
+  void ReportQueriesPerCpuSec(const ScenarioResult& result) override;
 
  private:
   std::vector<std::unique_ptr<Reporter> > reporters_;
@@ -99,6 +103,7 @@
   void ReportTimes(const ScenarioResult& result) override;
   void ReportCpuUsage(const ScenarioResult& result) override;
   void ReportPollCount(const ScenarioResult& result) override;
+  void ReportQueriesPerCpuSec(const ScenarioResult& result) override;
 };
 
 /** Dumps the report to a JSON file. */
@@ -114,6 +119,7 @@
   void ReportTimes(const ScenarioResult& result) override;
   void ReportCpuUsage(const ScenarioResult& result) override;
   void ReportPollCount(const ScenarioResult& result) override;
+  void ReportQueriesPerCpuSec(const ScenarioResult& result) override;
 
   const string report_file_;
 };
@@ -130,6 +136,7 @@
   void ReportTimes(const ScenarioResult& result) override;
   void ReportCpuUsage(const ScenarioResult& result) override;
   void ReportPollCount(const ScenarioResult& result) override;
+  void ReportQueriesPerCpuSec(const ScenarioResult& result) override;
 
   std::unique_ptr<ReportQpsScenarioService::Stub> stub_;
 };
diff --git a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
index 1ee6e37..137b33e 100644
--- a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
+++ b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
@@ -24,6 +24,7 @@
 #include "test/cpp/qps/driver.h"
 #include "test/cpp/qps/report.h"
 #include "test/cpp/util/test_config.h"
+#include "test/cpp/util/test_credentials_provider.h"
 
 namespace grpc {
 namespace testing {
@@ -51,8 +52,8 @@
   client_config.mutable_security_params()->CopyFrom(security);
   server_config.mutable_security_params()->CopyFrom(security);
 
-  const auto result =
-      RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
+  const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP,
+                                  BENCHMARK, -2, "", kInsecureCredentialsType);
 
   GetReporter()->ReportQPS(*result);
   GetReporter()->ReportLatency(*result);
diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h
index 4b699e0..c0dac96 100644
--- a/test/cpp/qps/server.h
+++ b/test/cpp/qps/server.h
@@ -32,6 +32,7 @@
 #include "test/core/end2end/data/ssl_test_data.h"
 #include "test/core/util/port.h"
 #include "test/cpp/qps/usage_timer.h"
+#include "test/cpp/util/test_credentials_provider.h"
 
 namespace grpc {
 namespace testing {
@@ -89,12 +90,14 @@
   static std::shared_ptr<ServerCredentials> CreateServerCredentials(
       const ServerConfig& config) {
     if (config.has_security_params()) {
-      SslServerCredentialsOptions::PemKeyCertPair pkcp = {test_server1_key,
-                                                          test_server1_cert};
-      SslServerCredentialsOptions ssl_opts;
-      ssl_opts.pem_root_certs = "";
-      ssl_opts.pem_key_cert_pairs.push_back(pkcp);
-      return SslServerCredentials(ssl_opts);
+      grpc::string type;
+      if (config.security_params().cred_type().empty()) {
+        type = kTlsCredentialsType;
+      } else {
+        type = config.security_params().cred_type();
+      }
+
+      return GetCredentialsProvider()->GetServerCredentials(type);
     } else {
       return InsecureServerCredentials();
     }
diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc
index fd51d32..27010b7 100644
--- a/test/cpp/qps/worker.cc
+++ b/test/cpp/qps/worker.cc
@@ -27,9 +27,12 @@
 
 #include "test/cpp/qps/qps_worker.h"
 #include "test/cpp/util/test_config.h"
+#include "test/cpp/util/test_credentials_provider.h"
 
 DEFINE_int32(driver_port, 0, "Port for communication with driver");
 DEFINE_int32(server_port, 0, "Port for operation as a server");
+DEFINE_string(credential_type, grpc::testing::kInsecureCredentialsType,
+              "Credential type for communication with driver");
 
 static bool got_sigint = false;
 
@@ -39,7 +42,7 @@
 namespace testing {
 
 static void RunServer() {
-  QpsWorker worker(FLAGS_driver_port, FLAGS_server_port);
+  QpsWorker worker(FLAGS_driver_port, FLAGS_server_port, FLAGS_credential_type);
 
   while (!got_sigint && !worker.Done()) {
     gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
diff --git a/test/cpp/util/create_test_channel.cc b/test/cpp/util/create_test_channel.cc
index 68c6fe4..34b6d60 100644
--- a/test/cpp/util/create_test_channel.cc
+++ b/test/cpp/util/create_test_channel.cc
@@ -51,29 +51,31 @@
 
 }  // namespace
 
-// When ssl is enabled, if server is empty, override_hostname is used to
+// When cred_type is 'ssl', if server is empty, override_hostname is used to
 // create channel. Otherwise, connect to server and override hostname if
 // override_hostname is provided.
-// When ssl is not enabled, override_hostname is ignored.
+// When cred_type is not 'ssl', override_hostname is ignored.
 // Set use_prod_root to true to use the SSL root for connecting to google.
 // In this case, path to the roots pem file must be set via environment variable
 // GRPC_DEFAULT_SSL_ROOTS_FILE_PATH.
 // Otherwise, root for test SSL cert will be used.
-// creds will be used to create a channel when enable_ssl is true.
+// creds will be used to create a channel when cred_type is 'ssl'.
 // Use examples:
 //   CreateTestChannel(
-//       "1.1.1.1:12345", "override.hostname.com", true, false, creds);
-//   CreateTestChannel("test.google.com:443", "", true, true, creds);
+//       "1.1.1.1:12345", "ssl", "override.hostname.com", false, creds);
+//   CreateTestChannel("test.google.com:443", "ssl", "", true, creds);
 //   same as above
-//   CreateTestChannel("", "test.google.com:443", true, true, creds);
+//   CreateTestChannel("", "ssl", "test.google.com:443", true, creds);
 std::shared_ptr<Channel> CreateTestChannel(
-    const grpc::string& server, const grpc::string& override_hostname,
-    bool enable_ssl, bool use_prod_roots,
+    const grpc::string& server, const grpc::string& cred_type,
+    const grpc::string& override_hostname, bool use_prod_roots,
     const std::shared_ptr<CallCredentials>& creds,
     const ChannelArguments& args) {
   ChannelArguments channel_args(args);
   std::shared_ptr<ChannelCredentials> channel_creds;
-  if (enable_ssl) {
+  if (cred_type.empty()) {
+    return CreateChannel(server, InsecureChannelCredentials());
+  } else if (cred_type == testing::kTlsCredentialsType) {  // cred_type == "ssl"
     if (use_prod_roots) {
       gpr_once_init(&g_once_init_add_prod_ssl_provider, &AddProdSslType);
       channel_creds = testing::GetCredentialsProvider()->GetChannelCredentials(
@@ -95,13 +97,31 @@
     }
     return CreateCustomChannel(connect_to, channel_creds, channel_args);
   } else {
-    return CreateChannel(server, InsecureChannelCredentials());
+    channel_creds = testing::GetCredentialsProvider()->GetChannelCredentials(
+        cred_type, &channel_args);
+    GPR_ASSERT(channel_creds != nullptr);
+
+    return CreateChannel(server, channel_creds);
   }
 }
 
 std::shared_ptr<Channel> CreateTestChannel(
     const grpc::string& server, const grpc::string& override_hostname,
     bool enable_ssl, bool use_prod_roots,
+    const std::shared_ptr<CallCredentials>& creds,
+    const ChannelArguments& args) {
+  grpc::string type;
+  if (enable_ssl) {
+    type = testing::kTlsCredentialsType;
+  }
+
+  return CreateTestChannel(server, type, override_hostname, use_prod_roots,
+                           creds, args);
+}
+
+std::shared_ptr<Channel> CreateTestChannel(
+    const grpc::string& server, const grpc::string& override_hostname,
+    bool enable_ssl, bool use_prod_roots,
     const std::shared_ptr<CallCredentials>& creds) {
   return CreateTestChannel(server, override_hostname, enable_ssl,
                            use_prod_roots, creds, ChannelArguments());
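
For reference, the new CreateTestChannel overload above keys entirely off cred_type: an empty string short-circuits to an insecure channel, kTlsCredentialsType ("ssl") takes the TLS branch, and any other registered type is resolved through the test credentials provider. A minimal sketch of calling it is below; the target address and the override hostname are placeholder values, not part of this change.

#include <memory>

#include <grpc++/grpc++.h>

#include "test/cpp/util/create_test_channel.h"
#include "test/cpp/util/test_credentials_provider.h"

std::shared_ptr<grpc::Channel> MakeExampleChannel(bool secure) {
  grpc::ChannelArguments args;
  std::shared_ptr<grpc::CallCredentials> no_call_creds;  // no call creds needed
  if (!secure) {
    // Empty cred_type -> InsecureChannelCredentials(), per the first branch.
    return grpc::CreateTestChannel("localhost:10000", "", "", false,
                                   no_call_creds, args);
  }
  // "ssl" (kTlsCredentialsType) uses the TLS branch; the override hostname is
  // only an example value here.
  return grpc::CreateTestChannel(
      "localhost:10000", grpc::testing::kTlsCredentialsType,
      "foo.test.google.fr", false /* use_prod_roots */, no_call_creds, args);
}
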
diff --git a/test/cpp/util/create_test_channel.h b/test/cpp/util/create_test_channel.h
index 9b4b091..e2ca8f9 100644
--- a/test/cpp/util/create_test_channel.h
+++ b/test/cpp/util/create_test_channel.h
@@ -45,6 +45,12 @@
     const ChannelArguments& args);
 
 std::shared_ptr<Channel> CreateTestChannel(
+    const grpc::string& server, const grpc::string& cred_type,
+    const grpc::string& override_hostname, bool use_prod_roots,
+    const std::shared_ptr<CallCredentials>& creds,
+    const ChannelArguments& args);
+
+std::shared_ptr<Channel> CreateTestChannel(
     const grpc::string& server, const grpc::string& credential_type,
     const std::shared_ptr<CallCredentials>& creds);
 
diff --git a/tools/internal_ci/linux/grpc_build_artifacts.sh b/tools/internal_ci/linux/grpc_build_artifacts.sh
index bc29014..3997a13 100755
--- a/tools/internal_ci/linux/grpc_build_artifacts.sh
+++ b/tools/internal_ci/linux/grpc_build_artifacts.sh
@@ -20,9 +20,10 @@
 
 source tools/internal_ci/helper_scripts/prepare_build_linux_rc
 
-# TODO(jtattermusch): install ruby on the internal_ci worker
-gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3
-# TODO(jtattermusch): grep works around https://github.com/rvm/rvm/issues/4068
-curl -sSL https://get.rvm.io | grep -v __rvm_print_headline | bash -s stable --ruby
+set +ex
+[[ -s /etc/profile.d/rvm.sh ]] && . /etc/profile.d/rvm.sh
+set -e  # rvm commands are very verbose
+rvm --default use ruby-2.4.1
+set -ex
 
 tools/run_tests/task_runner.py -f artifact linux
diff --git a/tools/interop_matrix/client_matrix.py b/tools/interop_matrix/client_matrix.py
index 4394e32..4d1b5f0 100644
--- a/tools/interop_matrix/client_matrix.py
+++ b/tools/interop_matrix/client_matrix.py
@@ -30,9 +30,27 @@
 }
 
 # Dictionary of releases per language.  For each language, we need to provide
-# a tuple of release tag (used as the tag for the GCR image) and also github hash.
+# a list of release tags, each pointing to the latest build of its branch.
 LANG_RELEASE_MATRIX = {
-    'cxx': ['v1.0.1', 'v1.1.2'],
-    'go': ['v1.0.1-GA', 'v1.3.0'],
-    'java': ['v1.0.3', 'v1.1.2'],
+    'cxx': [
+        'v1.0.1',
+        'v1.1.4',
+        'v1.2.5',
+        'v1.3.9',
+        'v1.4.2',
+    ],
+    'go': [
+        'v1.0.5',
+        'v1.2.1',
+        'v1.3.0',
+        'v1.4.2',
+    ],
+    'java': [
+        'v1.0.3',
+        'v1.1.2',
+        'v1.2.0',
+        'v1.3.1',
+        'v1.4.0',
+        'v1.5.0',
+    ],
 }
diff --git a/tools/interop_matrix/run_interop_matrix_tests.py b/tools/interop_matrix/run_interop_matrix_tests.py
index 28126dd..4315c82 100755
--- a/tools/interop_matrix/run_interop_matrix_tests.py
+++ b/tools/interop_matrix/run_interop_matrix_tests.py
@@ -168,7 +168,7 @@
         _xml_report_tree,
         resultset,
         'grpc_interop_matrix',
-        '%s__%s:%s'%(lang,runtime,release),
+        '%s__%s %s'%(lang,runtime,release),
         str(uuid.uuid4()))
 
 _docker_images_cleanup = []
diff --git a/tools/profiling/microbenchmarks/bm_diff/bm_diff.py b/tools/profiling/microbenchmarks/bm_diff/bm_diff.py
index 809817a..1ac951f 100755
--- a/tools/profiling/microbenchmarks/bm_diff/bm_diff.py
+++ b/tools/profiling/microbenchmarks/bm_diff/bm_diff.py
@@ -144,7 +144,7 @@
 def fmt_dict(d):
   return ''.join(["    " + k + ": " + str(d[k]) + "\n" for k in d])
 
-def diff(bms, loops, track, old, new, counters):
+def diff(bms, loops, regex, track, old, new, counters):
   benchmarks = collections.defaultdict(Benchmark)
 
   badjson_files = {}
@@ -153,7 +153,8 @@
     for loop in range(0, loops):
       for line in subprocess.check_output(
         ['bm_diff_%s/opt/%s' % (old, bm),
-         '--benchmark_list_tests']).splitlines():
+         '--benchmark_list_tests',
+         '--benchmark_filter=%s' % regex]).splitlines():
         stripped_line = line.strip().replace("/", "_").replace(
           "<", "_").replace(">", "_").replace(", ", "_")
         js_new_opt = _read_json('%s.%s.opt.%s.%d.json' %
diff --git a/tools/profiling/microbenchmarks/bm_diff/bm_main.py b/tools/profiling/microbenchmarks/bm_diff/bm_main.py
index 8b4e0cb..5aa11ac 100755
--- a/tools/profiling/microbenchmarks/bm_diff/bm_main.py
+++ b/tools/profiling/microbenchmarks/bm_diff/bm_main.py
@@ -63,10 +63,10 @@
     help='Name of baseline run to compare to. Ususally just called "old"')
   argp.add_argument(
     '-r',
-    '--repetitions',
-    type=int,
-    default=1,
-    help='Number of repetitions to pass to the benchmarks')
+    '--regex',
+    type=str,
+    default="",
+    help='Regex to filter benchmarks run')
   argp.add_argument(
     '-l',
     '--loops',
@@ -125,10 +125,10 @@
       subprocess.check_call(['git', 'checkout', where_am_i])
       subprocess.check_call(['git', 'submodule', 'update'])
 
-  bm_run.run('new', args.benchmarks, args.jobs, args.loops, args.repetitions, args.counters)
-  bm_run.run(old, args.benchmarks, args.jobs, args.loops, args.repetitions, args.counters)
+  bm_run.run('new', args.benchmarks, args.jobs, args.loops, args.regex, args.counters)
+  bm_run.run(old, args.benchmarks, args.jobs, args.loops, args.regex, args.counters)
 
-  diff, note = bm_diff.diff(args.benchmarks, args.loops, args.track, old,
+  diff, note = bm_diff.diff(args.benchmarks, args.loops, args.regex, args.track, old,
                 'new', args.counters)
   if diff:
     text = '[%s] Performance differences noted:\n%s' % (args.pr_comment_name, diff)
diff --git a/tools/profiling/microbenchmarks/bm_diff/bm_run.py b/tools/profiling/microbenchmarks/bm_diff/bm_run.py
index 72b3d3c..206f7c5 100755
--- a/tools/profiling/microbenchmarks/bm_diff/bm_run.py
+++ b/tools/profiling/microbenchmarks/bm_diff/bm_run.py
@@ -56,10 +56,10 @@
   )
   argp.add_argument(
     '-r',
-    '--repetitions',
-    type=int,
-    default=1,
-    help='Number of repetitions to pass to the benchmarks')
+    '--regex',
+    type=str,
+    default="",
+    help='Regex to filter benchmarks run')
   argp.add_argument(
     '-l',
     '--loops',
@@ -77,18 +77,17 @@
   return args
 
 
-def _collect_bm_data(bm, cfg, name, reps, idx, loops):
+def _collect_bm_data(bm, cfg, name, regex, idx, loops):
   jobs_list = []
   for line in subprocess.check_output(
     ['bm_diff_%s/%s/%s' % (name, cfg, bm),
-     '--benchmark_list_tests']).splitlines():
+     '--benchmark_list_tests', '--benchmark_filter=%s' % regex]).splitlines():
     stripped_line = line.strip().replace("/", "_").replace(
       "<", "_").replace(">", "_").replace(", ", "_")
     cmd = [
       'bm_diff_%s/%s/%s' % (name, cfg, bm), '--benchmark_filter=^%s$' %
       line, '--benchmark_out=%s.%s.%s.%s.%d.json' %
       (bm, stripped_line, cfg, name, idx), '--benchmark_out_format=json',
-      '--benchmark_repetitions=%d' % (reps)
     ]
     jobs_list.append(
       jobset.JobSpec(
@@ -100,13 +99,13 @@
   return jobs_list
 
 
-def run(name, benchmarks, jobs, loops, reps, counters):
+def run(name, benchmarks, jobs, loops, regex, counters):
   jobs_list = []
   for loop in range(0, loops):
     for bm in benchmarks:
-      jobs_list += _collect_bm_data(bm, 'opt', name, reps, loop, loops)
+      jobs_list += _collect_bm_data(bm, 'opt', name, regex, loop, loops)
       if counters:
-        jobs_list += _collect_bm_data(bm, 'counters', name, reps, loop,
+        jobs_list += _collect_bm_data(bm, 'counters', name, regex, loop,
                         loops)
   random.shuffle(jobs_list, random.SystemRandom().random)
   jobset.run(jobs_list, maxjobs=jobs)
@@ -114,4 +113,4 @@
 
 if __name__ == '__main__':
   args = _args()
-  run(args.name, args.benchmarks, args.jobs, args.loops, args.repetitions, args.counters)
+  run(args.name, args.benchmarks, args.jobs, args.loops, args.regex, args.counters)
diff --git a/tools/run_tests/sanity/core_banned_functions.py b/tools/run_tests/sanity/core_banned_functions.py
index b394bbb..1f13905 100755
--- a/tools/run_tests/sanity/core_banned_functions.py
+++ b/tools/run_tests/sanity/core_banned_functions.py
@@ -41,6 +41,8 @@
     'grpc_closure_sched(' : ['src/core/lib/iomgr/closure.c'],
     'grpc_closure_run(' : ['src/core/lib/iomgr/closure.c'],
     'grpc_closure_list_sched(' : ['src/core/lib/iomgr/closure.c'],
+    'gpr_getenv_silent(' : ['src/core/lib/support/log.c', 'src/core/lib/support/env_linux.c',
+                            'src/core/lib/support/env_posix.c', 'src/core/lib/support/env_windows.c'],
 }
 
 errors = 0