Merge pull request #3754 from ctiller/latent-see

Latency profiling support
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 1bf2b16..3c8ca8a 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -585,7 +585,7 @@
       "class Service : public ::grpc::SynchronousService {\n"
       " public:\n");
   printer->Indent();
-  printer->Print("Service() : service_(nullptr) {}\n");
+  printer->Print("Service();\n");
   printer->Print("virtual ~Service();\n");
   for (int i = 0; i < service->method_count(); ++i) {
     PrintHeaderServerMethodSync(printer, service->method(i), vars);
@@ -594,7 +594,7 @@
   printer->Outdent();
   printer->Print(
       " private:\n"
-      "  ::grpc::RpcService* service_;\n");
+      "  std::unique_ptr< ::grpc::RpcService> service_;\n");
   printer->Print("};\n");
 
   // Server side - Asynchronous
@@ -1014,8 +1014,10 @@
                  "{}\n\n");
 
   printer->Print(*vars,
+                 "$ns$$Service$::Service::Service() {\n"
+                 "}\n\n");
+  printer->Print(*vars,
                  "$ns$$Service$::Service::~Service() {\n"
-                 "  delete service_;\n"
                  "}\n\n");
   for (int i = 0; i < service->method_count(); ++i) {
     (*vars)["Idx"] = as_string(i);
@@ -1026,10 +1028,10 @@
                  "::grpc::RpcService* $ns$$Service$::Service::service() {\n");
   printer->Indent();
   printer->Print(
-      "if (service_ != nullptr) {\n"
-      "  return service_;\n"
+      "if (service_) {\n"
+      "  return service_.get();\n"
       "}\n");
-  printer->Print("service_ = new ::grpc::RpcService();\n");
+  printer->Print("service_.reset(new ::grpc::RpcService());\n");
   for (int i = 0; i < service->method_count(); ++i) {
     const grpc::protobuf::MethodDescriptor *method = service->method(i);
     (*vars)["Idx"] = as_string(i);
@@ -1077,7 +1079,7 @@
           "        std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
     }
   }
-  printer->Print("return service_;\n");
+  printer->Print("return service_.get();\n");
   printer->Outdent();
   printer->Print("}\n\n");
 }
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index dfa71c9..9f85557 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -198,13 +198,12 @@
   return 1;
 }
 
-static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
-                         int iomgr_success) {
+static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                                int iomgr_success) {
   call_data *calld = arg;
   grpc_transport_stream_op op;
   int have_waiting;
 
-  gpr_mu_lock(&calld->mu_state);
   if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
     memset(&op, 0, sizeof(op));
     op.cancel_with_status = GRPC_STATUS_CANCELLED;
@@ -232,10 +231,18 @@
   }
 }
 
+static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
+                         int iomgr_success) {
+  call_data *calld = arg;
+  gpr_mu_lock(&calld->mu_state); /* started_call_locked must unlock */
+  started_call_locked(exec_ctx, arg, iomgr_success);
+}
+
 static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
                           int iomgr_success) {
   call_data *calld = arg;
   grpc_pollset *pollset;
+  grpc_subchannel_call_create_status call_creation_status;
 
   GPR_TIMER_BEGIN("picked_target", 0);
 
@@ -252,11 +259,15 @@
       GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
       calld->state = CALL_WAITING_FOR_CALL;
       pollset = calld->waiting_op.bind_pollset;
-      gpr_mu_unlock(&calld->mu_state);
       grpc_closure_init(&calld->async_setup_task, started_call, calld);
-      grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset,
-                                  &calld->subchannel_call,
-                                  &calld->async_setup_task);
+      call_creation_status = grpc_subchannel_create_call(
+          exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call,
+          &calld->async_setup_task);
+      if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) {
+        started_call_locked(exec_ctx, calld, iomgr_success);
+      } else { /* PENDING: started_call will run via notify */
+        gpr_mu_unlock(&calld->mu_state);
+      }
     }
   }
 
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index a2c521a..5e84dec 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -335,18 +335,23 @@
 
 static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
                                    int iomgr_success) {
+  grpc_subchannel_call_create_status call_creation_status;
   waiting_for_connect *w4c = arg;
   grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
-  grpc_subchannel_create_call(exec_ctx, w4c->subchannel, w4c->pollset,
-                              w4c->target, w4c->notify);
+  call_creation_status = grpc_subchannel_create_call(
+      exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
+  /* On PENDING, create_call has queued a new waiter that will invoke notify
+   * later; only invoke it here when the call was created synchronously. */
+  if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) {
+    w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
+  }
   GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
   gpr_free(w4c);
 }
 
-void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
-                                 grpc_pollset *pollset,
-                                 grpc_subchannel_call **target,
-                                 grpc_closure *notify) {
+grpc_subchannel_call_create_status grpc_subchannel_create_call(
+    grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset,
+    grpc_subchannel_call **target, grpc_closure *notify) {
   connection *con;
   gpr_mu_lock(&c->mu);
   if (c->active != NULL) {
@@ -355,7 +360,7 @@
     gpr_mu_unlock(&c->mu);
 
     *target = create_call(exec_ctx, con);
-    notify->cb(exec_ctx, notify->cb_arg, 1);
+    return GRPC_SUBCHANNEL_CALL_CREATE_READY;
   } else {
     waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
     w4c->next = c->waiting;
@@ -380,6 +385,7 @@
     } else {
       gpr_mu_unlock(&c->mu);
     }
+    return GRPC_SUBCHANNEL_CALL_CREATE_PENDING;
   }
 }
 
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 86b7fa5..a26d08f 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -75,12 +75,22 @@
                                 grpc_subchannel_call *call
                                     GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
 
-/** construct a call (possibly asynchronously) */
-void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx,
-                                 grpc_subchannel *subchannel,
-                                 grpc_pollset *pollset,
-                                 grpc_subchannel_call **target,
-                                 grpc_closure *notify);
+typedef enum {
+  GRPC_SUBCHANNEL_CALL_CREATE_READY,
+  GRPC_SUBCHANNEL_CALL_CREATE_PENDING
+} grpc_subchannel_call_create_status;
+
+/** Construct a subchannel call (possibly asynchronously).
+ *
+ * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, \a *target
+ * already points to a connected \a grpc_subchannel_call when this function
+ * returns. Note that \a notify will \em not be invoked in this case.
+ * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the
+ * subchannel call will be created asynchronously, invoking the \a notify
+ * callback upon completion. */
+grpc_subchannel_call_create_status grpc_subchannel_create_call(
+    grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset,
+    grpc_subchannel_call **target, grpc_closure *notify);
 
 /** process a transport level op */
 void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 231bc98..7ff80e6 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -286,7 +286,7 @@
 
 void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
   gpr_mu_lock(&fd->mu);
-  GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown));
+  GPR_ASSERT(!fd->shutdown); /* fd->mu is held here */
   fd->shutdown = 1;
   set_ready_locked(exec_ctx, fd, &fd->read_closure);
   set_ready_locked(exec_ctx, fd, &fd->write_closure);
@@ -320,7 +320,7 @@
   gpr_mu_lock(&fd->mu);
 
   /* if we are shutdown, then don't add to the watcher set */
-  if (gpr_atm_no_barrier_load(&fd->shutdown)) {
+  if (fd->shutdown) {
     watcher->fd = NULL;
     watcher->pollset = NULL;
     watcher->worker = NULL;
diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c
index 05789f0..22b5796 100644
--- a/src/core/tsi/ssl_transport_security.c
+++ b/src/core/tsi/ssl_transport_security.c
@@ -319,8 +319,9 @@
   /* TODO(jboeuf): Maybe add more properties. */
   GENERAL_NAMES *subject_alt_names =
       X509_get_ext_d2i(cert, NID_subject_alt_name, 0, 0);
-  int subject_alt_name_count =
-      (subject_alt_names != NULL) ? sk_GENERAL_NAME_num(subject_alt_names) : 0;
+  int subject_alt_name_count = (subject_alt_names != NULL)
+                                   ? (int)sk_GENERAL_NAME_num(subject_alt_names)
+                                   : 0;
   size_t property_count;
   tsi_result result;
   GPR_ASSERT(subject_alt_name_count >= 0);
@@ -358,7 +359,7 @@
   unsigned long err;
   while ((err = ERR_get_error()) != 0) {
     char details[256];
-    ERR_error_string_n(err, details, sizeof(details));
+    ERR_error_string_n((uint32_t)err, details, sizeof(details));
     gpr_log(GPR_ERROR, "%s", details);
   }
 }
@@ -668,7 +669,7 @@
   tsi_result result = TSI_OK;
 
   /* First see if we have some pending data in the SSL BIO. */
-  int pending_in_ssl = BIO_pending(impl->from_ssl);
+  int pending_in_ssl = (int)BIO_pending(impl->from_ssl);
   if (pending_in_ssl > 0) {
     *unprotected_bytes_size = 0;
     GPR_ASSERT(*protected_output_frames_size <= INT_MAX);
@@ -726,7 +727,7 @@
     impl->buffer_offset = 0;
   }
 
-  pending = BIO_pending(impl->from_ssl);
+  pending = (int)BIO_pending(impl->from_ssl);
   GPR_ASSERT(pending >= 0);
   *still_pending_size = (size_t)pending;
   if (*still_pending_size == 0) return TSI_OK;
@@ -739,7 +740,7 @@
     return TSI_INTERNAL_ERROR;
   }
   *protected_output_frames_size = (size_t)read_from_ssl;
-  pending = BIO_pending(impl->from_ssl);
+  pending = (int)BIO_pending(impl->from_ssl);
   GPR_ASSERT(pending >= 0);
   *still_pending_size = (size_t)pending;
   return TSI_OK;
diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
index 714c2f7..073c502 100644
--- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
+++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
@@ -60,7 +60,7 @@
         public void CompletionQueueCreateDestroyBenchmark()
         {
             BenchmarkUtil.RunBenchmark(
-                100000, 1000000,
+                10, 10,  // NOTE(review): was 100000, 1000000; confirm intent
                 () =>
                 {
                     CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create();
diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php
index 381b114..0a3e1f7 100755
--- a/src/php/lib/Grpc/BaseStub.php
+++ b/src/php/lib/Grpc/BaseStub.php
@@ -114,7 +114,7 @@
       return true;
     }
     if ($new_state == \Grpc\CHANNEL_FATAL_FAILURE) {
-      throw new Exception('Failed to connect to server');
+      throw new \Exception('Failed to connect to server');
     }
     return false;
   }
@@ -153,6 +153,31 @@
     return array($metadata_copy, $timeout);
   }
 
+  /**
+   * Validate the metadata keys and normalize them to lowercase.
+   *
+   * Keys that differ only in case collapse into one entry; the value of the
+   * key that appears last in $metadata wins.
+   *
+   * @param $metadata The metadata map
+   * @return array Validated, key-lowercased copy of $metadata
+   * @throws \InvalidArgumentException if a key is empty or contains
+   *         characters other than ASCII letters, digits, '-' and '_'
+   */
+  private function _validate_and_normalize_metadata($metadata) {
+    $metadata_copy = array();
+    foreach ($metadata as $key => $value) {
+      // \z rather than $: in PCRE, $ also matches before a trailing "\n".
+      if (!preg_match('/^[A-Za-z\d_-]+\z/', $key)) {
+        throw new \InvalidArgumentException(
+            'Metadata keys must be nonempty strings containing only '.
+            'alphanumeric characters, hyphens and underscores');
+      }
+      $metadata_copy[strtolower($key)] = $value;
+    }
+    return $metadata_copy;
+  }
+
   /* This class is intended to be subclassed by generated code, so all functions
      begin with "_" to avoid name collisions. */
 
@@ -178,6 +203,7 @@
                                         $actual_metadata,
                                         $jwt_aud_uri);
     }
+    $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata);
     $call->start($argument, $actual_metadata, $options);
     return $call;
   }
@@ -204,6 +230,7 @@
                                         $actual_metadata,
                                         $jwt_aud_uri);
     }
+    $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata);
     $call->start($actual_metadata);
     return $call;
   }
@@ -231,6 +258,7 @@
                                         $actual_metadata,
                                         $jwt_aud_uri);
     }
+    $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata);
     $call->start($argument, $actual_metadata, $options);
     return $call;
   }
@@ -254,6 +282,7 @@
                                         $actual_metadata,
                                         $jwt_aud_uri);
     }
+    $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata);
     $call->start($actual_metadata);
     return $call;
   }
diff --git a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
index 9cee188..5cdba1e 100644
--- a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
+++ b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
@@ -51,6 +51,14 @@
     $this->assertTrue(is_string(self::$client->getTarget()));
   }
 
+  /**
+   * @expectedException InvalidArgumentException
+   */
+  public function testInvalidMetadata() {
+    $div_arg = new math\DivArgs();
+    self::$client->Div($div_arg, array(' ' => 'abc123'));
+  }
+
   public function testWriteFlags() {
     $div_arg = new math\DivArgs();
     $div_arg->setDividend(7);
diff --git a/tools/run_tests/port_server.py b/tools/run_tests/port_server.py
index b953df9..3b85486 100755
--- a/tools/run_tests/port_server.py
+++ b/tools/run_tests/port_server.py
@@ -42,7 +42,7 @@
 # increment this number whenever making a change to ensure that
 # the changes are picked up by running CI servers
 # note that all changes must be backwards compatible
-_MY_VERSION = 2
+_MY_VERSION = 5
 
 
 if len(sys.argv) == 2 and sys.argv[1] == 'dump_version':
@@ -52,8 +52,16 @@
 
 argp = argparse.ArgumentParser(description='Server for httpcli_test')
 argp.add_argument('-p', '--port', default=12345, type=int)
+argp.add_argument('-l', '--logfile', default=None, type=str)
 args = argp.parse_args()
 
+if args.logfile is not None:
+  sys.stdin.close()
+  sys.stderr.close()
+  sys.stdout.close()
+  sys.stderr = open(args.logfile, 'w')
+  sys.stdout = sys.stderr
+
 print 'port server running on port %d' % args.port
 
 pool = []
@@ -119,9 +127,12 @@
       self.send_header('Content-Type', 'text/plain')
       self.end_headers()
       p = int(self.path[6:])
-      del in_use[p]
-      pool.append(p)
-      self.log_message('drop port %d' % p)
+      if p not in in_use:
+        self.log_message('drop unknown port %d', p)
+      else:
+        del in_use[p]
+        pool.append(p)
+        self.log_message('drop known port %d', p)
     elif self.path == '/version_number':
       # fetch a version string and the current process pid
       self.send_response(200)
@@ -146,6 +157,6 @@
 httpd = BaseHTTPServer.HTTPServer(('', args.port), Handler)
 while keep_running:
   httpd.handle_request()
+  sys.stderr.flush()
 
 print 'done'
-
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index 048ab90..e9ae9f4 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -43,6 +43,8 @@
 import socket
 import subprocess
 import sys
+import tempfile
+import traceback
 import time
 import xml.etree.cElementTree as ET
 import urllib2
@@ -577,7 +579,7 @@
 build_configs = set(cfg.build_config for cfg in run_configs)
 
 if args.travis:
-  _FORCE_ENVIRON_FOR_WRAPPERS = {'GRPC_TRACE': 'surface,batch'}
+  _FORCE_ENVIRON_FOR_WRAPPERS = {'GRPC_TRACE': 'api'}
 
 languages = set(_LANGUAGES[l]
                 for l in itertools.chain.from_iterable(
@@ -704,35 +706,62 @@
       urllib2.urlopen('http://localhost:%d/quitquitquit' % port_server_port).read()
       time.sleep(1)
   if not running:
-    print 'starting port_server'
-    port_log = open('portlog.txt', 'w')
-    port_server = subprocess.Popen(
-        [sys.executable, 'tools/run_tests/port_server.py', '-p', '%d' % port_server_port],
-        stderr=subprocess.STDOUT,
-        stdout=port_log)
+    fd, logfile = tempfile.mkstemp()
+    os.close(fd)
+    print 'starting port_server, with log file %s' % logfile
+    port_server_args = [sys.executable, 'tools/run_tests/port_server.py', '-p', '%d' % port_server_port, '-l', logfile]
+    env = dict(os.environ)
+    env['BUILD_ID'] = 'pleaseDontKillMeJenkins'
+    if platform.system() == 'Windows':
+      port_server = subprocess.Popen(
+          port_server_args,
+          env=env,
+          creationflags=0x00000008,  # DETACHED_PROCESS
+          close_fds=True)
+    else:
+      port_server = subprocess.Popen(
+          port_server_args,
+          env=env,
+          preexec_fn=os.setsid,
+          close_fds=True)
+    time.sleep(1)
     # ensure port server is up
     waits = 0
     while True:
       if waits > 10:
+        print 'killing port server due to excessive start up waits'
         port_server.kill()
       if port_server.poll() is not None:
         print 'port_server failed to start'
-        port_log = open('portlog.txt', 'r').read()
-        print port_log
-        sys.exit(1)
+        # try one final time: maybe another build managed to start one
+        time.sleep(1)
+        try:
+          urllib2.urlopen('http://localhost:%d/get' % port_server_port,
+                          timeout=1).read()
+          print 'last ditch attempt to contact port server succeeded'
+          break
+        except:
+          traceback.print_exc()
+          port_log = open(logfile, 'r').read()
+          print port_log
+          sys.exit(1)
       try:
         urllib2.urlopen('http://localhost:%d/get' % port_server_port,
                         timeout=1).read()
+        print 'port server is up and ready'
         break
       except socket.timeout:
         print 'waiting for port_server: timeout'
-        time.sleep(0.5)
+        traceback.print_exc()
+        time.sleep(1)
         waits += 1
       except urllib2.URLError:
         print 'waiting for port_server: urlerror'
-        time.sleep(0.5)
+        traceback.print_exc()
+        time.sleep(1)
         waits += 1
       except:
+        traceback.print_exc()
         port_server.kill()
         raise