Merge pull request #3754 from ctiller/latent-see
Latency profiling support
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 1bf2b16..3c8ca8a 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -585,7 +585,7 @@
"class Service : public ::grpc::SynchronousService {\n"
" public:\n");
printer->Indent();
- printer->Print("Service() : service_(nullptr) {}\n");
+ printer->Print("Service();\n");
printer->Print("virtual ~Service();\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderServerMethodSync(printer, service->method(i), vars);
@@ -594,7 +594,7 @@
printer->Outdent();
printer->Print(
" private:\n"
- " ::grpc::RpcService* service_;\n");
+ " std::unique_ptr< ::grpc::RpcService> service_;\n");
printer->Print("};\n");
// Server side - Asynchronous
@@ -1014,8 +1014,10 @@
"{}\n\n");
printer->Print(*vars,
+ "$ns$$Service$::Service::Service() {\n"
+ "}\n\n");
+ printer->Print(*vars,
"$ns$$Service$::Service::~Service() {\n"
- " delete service_;\n"
"}\n\n");
for (int i = 0; i < service->method_count(); ++i) {
(*vars)["Idx"] = as_string(i);
@@ -1026,10 +1028,10 @@
"::grpc::RpcService* $ns$$Service$::Service::service() {\n");
printer->Indent();
printer->Print(
- "if (service_ != nullptr) {\n"
- " return service_;\n"
+ "if (service_) {\n"
+ " return service_.get();\n"
"}\n");
- printer->Print("service_ = new ::grpc::RpcService();\n");
+ printer->Print("service_ = std::unique_ptr< ::grpc::RpcService>(new ::grpc::RpcService());\n");
for (int i = 0; i < service->method_count(); ++i) {
const grpc::protobuf::MethodDescriptor *method = service->method(i);
(*vars)["Idx"] = as_string(i);
@@ -1077,7 +1079,7 @@
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
}
}
- printer->Print("return service_;\n");
+ printer->Print("return service_.get();\n");
printer->Outdent();
printer->Print("}\n\n");
}
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index dfa71c9..9f85557 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -198,13 +198,12 @@
return 1;
}
-static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
call_data *calld = arg;
grpc_transport_stream_op op;
int have_waiting;
- gpr_mu_lock(&calld->mu_state);
if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
memset(&op, 0, sizeof(op));
op.cancel_with_status = GRPC_STATUS_CANCELLED;
@@ -232,10 +231,18 @@
}
}
+static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
+ call_data *calld = arg;
+ gpr_mu_lock(&calld->mu_state);
+ started_call_locked(exec_ctx, arg, iomgr_success);
+}
+
static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
call_data *calld = arg;
grpc_pollset *pollset;
+ grpc_subchannel_call_create_status call_creation_status;
GPR_TIMER_BEGIN("picked_target", 0);
@@ -252,11 +259,15 @@
GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
calld->state = CALL_WAITING_FOR_CALL;
pollset = calld->waiting_op.bind_pollset;
- gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->async_setup_task, started_call, calld);
- grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset,
- &calld->subchannel_call,
- &calld->async_setup_task);
+ call_creation_status = grpc_subchannel_create_call(
+ exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call,
+ &calld->async_setup_task);
+ if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) {
+ started_call_locked(exec_ctx, calld, iomgr_success);
+ } else {
+ gpr_mu_unlock(&calld->mu_state);
+ }
}
}
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index a2c521a..5e84dec 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -335,18 +335,20 @@
static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
+ grpc_subchannel_call_create_status call_creation_status;
waiting_for_connect *w4c = arg;
grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
- grpc_subchannel_create_call(exec_ctx, w4c->subchannel, w4c->pollset,
- w4c->target, w4c->notify);
+ call_creation_status = grpc_subchannel_create_call(
+ exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
+ GPR_ASSERT(call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY);
+ w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
gpr_free(w4c);
}
-void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
- grpc_pollset *pollset,
- grpc_subchannel_call **target,
- grpc_closure *notify) {
+grpc_subchannel_call_create_status grpc_subchannel_create_call(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset,
+ grpc_subchannel_call **target, grpc_closure *notify) {
connection *con;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
@@ -355,7 +357,7 @@
gpr_mu_unlock(&c->mu);
*target = create_call(exec_ctx, con);
- notify->cb(exec_ctx, notify->cb_arg, 1);
+ return GRPC_SUBCHANNEL_CALL_CREATE_READY;
} else {
waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
w4c->next = c->waiting;
@@ -380,6 +382,7 @@
} else {
gpr_mu_unlock(&c->mu);
}
+ return GRPC_SUBCHANNEL_CALL_CREATE_PENDING;
}
}
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 86b7fa5..a26d08f 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -75,12 +75,22 @@
grpc_subchannel_call *call
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-/** construct a call (possibly asynchronously) */
-void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- grpc_pollset *pollset,
- grpc_subchannel_call **target,
- grpc_closure *notify);
+typedef enum {
+ GRPC_SUBCHANNEL_CALL_CREATE_READY,
+ GRPC_SUBCHANNEL_CALL_CREATE_PENDING
+} grpc_subchannel_call_create_status;
+
+/** construct a subchannel call (possibly asynchronously).
+ *
+ * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will
+ * return immediately and \a target will point to a connected \a subchannel_call
+ * instance. Note that \a notify will \em not be invoked in this case.
+ * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the
+ * subchannel call will be created asynchronously, invoking the \a notify
+ * callback upon completion. */
+grpc_subchannel_call_create_status grpc_subchannel_create_call(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset,
+ grpc_subchannel_call **target, grpc_closure *notify);
/** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 231bc98..7ff80e6 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -286,7 +286,7 @@
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_lock(&fd->mu);
- GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown));
+ GPR_ASSERT(!fd->shutdown);
fd->shutdown = 1;
set_ready_locked(exec_ctx, fd, &fd->read_closure);
set_ready_locked(exec_ctx, fd, &fd->write_closure);
@@ -320,7 +320,7 @@
gpr_mu_lock(&fd->mu);
/* if we are shutdown, then don't add to the watcher set */
- if (gpr_atm_no_barrier_load(&fd->shutdown)) {
+ if (fd->shutdown) {
watcher->fd = NULL;
watcher->pollset = NULL;
watcher->worker = NULL;
diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c
index 05789f0..22b5796 100644
--- a/src/core/tsi/ssl_transport_security.c
+++ b/src/core/tsi/ssl_transport_security.c
@@ -319,8 +319,9 @@
/* TODO(jboeuf): Maybe add more properties. */
GENERAL_NAMES *subject_alt_names =
X509_get_ext_d2i(cert, NID_subject_alt_name, 0, 0);
- int subject_alt_name_count =
- (subject_alt_names != NULL) ? sk_GENERAL_NAME_num(subject_alt_names) : 0;
+ int subject_alt_name_count = (subject_alt_names != NULL)
+ ? (int)sk_GENERAL_NAME_num(subject_alt_names)
+ : 0;
size_t property_count;
tsi_result result;
GPR_ASSERT(subject_alt_name_count >= 0);
@@ -358,7 +359,7 @@
unsigned long err;
while ((err = ERR_get_error()) != 0) {
char details[256];
- ERR_error_string_n(err, details, sizeof(details));
+ ERR_error_string_n((uint32_t)err, details, sizeof(details));
gpr_log(GPR_ERROR, "%s", details);
}
}
@@ -668,7 +669,7 @@
tsi_result result = TSI_OK;
/* First see if we have some pending data in the SSL BIO. */
- int pending_in_ssl = BIO_pending(impl->from_ssl);
+ int pending_in_ssl = (int)BIO_pending(impl->from_ssl);
if (pending_in_ssl > 0) {
*unprotected_bytes_size = 0;
GPR_ASSERT(*protected_output_frames_size <= INT_MAX);
@@ -726,7 +727,7 @@
impl->buffer_offset = 0;
}
- pending = BIO_pending(impl->from_ssl);
+ pending = (int)BIO_pending(impl->from_ssl);
GPR_ASSERT(pending >= 0);
*still_pending_size = (size_t)pending;
if (*still_pending_size == 0) return TSI_OK;
@@ -739,7 +740,7 @@
return TSI_INTERNAL_ERROR;
}
*protected_output_frames_size = (size_t)read_from_ssl;
- pending = BIO_pending(impl->from_ssl);
+ pending = (int)BIO_pending(impl->from_ssl);
GPR_ASSERT(pending >= 0);
*still_pending_size = (size_t)pending;
return TSI_OK;
diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
index 714c2f7..073c502 100644
--- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
+++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
@@ -60,7 +60,7 @@
public void CompletionQueueCreateDestroyBenchmark()
{
BenchmarkUtil.RunBenchmark(
- 100000, 1000000,
+ 10, 10,
() =>
{
CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create();
diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php
index 381b114..0a3e1f7 100755
--- a/src/php/lib/Grpc/BaseStub.php
+++ b/src/php/lib/Grpc/BaseStub.php
@@ -114,7 +114,7 @@
return true;
}
if ($new_state == \Grpc\CHANNEL_FATAL_FAILURE) {
- throw new Exception('Failed to connect to server');
+ throw new \Exception('Failed to connect to server');
}
return false;
}
@@ -153,6 +153,25 @@
return array($metadata_copy, $timeout);
}
+ /**
+ * validate and normalize the metadata array
+ * @param $metadata The metadata map
+ * @return $metadata Validated and key-normalized metadata map
+ * @throw InvalidArgumentException if key contains invalid characters
+ */
+ private function _validate_and_normalize_metadata($metadata) {
+ $metadata_copy = array();
+ foreach ($metadata as $key => $value) {
+ if (!preg_match('/^[A-Za-z\d_-]+$/', $key)) {
+ throw new \InvalidArgumentException(
+ 'Metadata keys must be nonempty strings containing only '.
+ 'alphanumeric characters, hyphens and underscores');
+ }
+ $metadata_copy[strtolower($key)] = $value;
+ }
+ return $metadata_copy;
+ }
+
/* This class is intended to be subclassed by generated code, so all functions
begin with "_" to avoid name collisions. */
@@ -178,6 +197,7 @@
$actual_metadata,
$jwt_aud_uri);
}
+ $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata);
$call->start($argument, $actual_metadata, $options);
return $call;
}
@@ -204,6 +224,7 @@
$actual_metadata,
$jwt_aud_uri);
}
+ $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata);
$call->start($actual_metadata);
return $call;
}
@@ -231,6 +252,7 @@
$actual_metadata,
$jwt_aud_uri);
}
+ $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata);
$call->start($argument, $actual_metadata, $options);
return $call;
}
@@ -254,6 +276,7 @@
$actual_metadata,
$jwt_aud_uri);
}
+ $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata);
$call->start($actual_metadata);
return $call;
}
diff --git a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
index 9cee188..5cdba1e 100644
--- a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
+++ b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
@@ -51,6 +51,14 @@
$this->assertTrue(is_string(self::$client->getTarget()));
}
+ /**
+ * @expectedException InvalidArgumentException
+ */
+ public function testInvalidMetadata() {
+ $div_arg = new math\DivArgs();
+ $call = self::$client->Div($div_arg, array(' ' => 'abc123'));
+ }
+
public function testWriteFlags() {
$div_arg = new math\DivArgs();
$div_arg->setDividend(7);
diff --git a/tools/run_tests/port_server.py b/tools/run_tests/port_server.py
index b953df9..3b85486 100755
--- a/tools/run_tests/port_server.py
+++ b/tools/run_tests/port_server.py
@@ -42,7 +42,7 @@
# increment this number whenever making a change to ensure that
# the changes are picked up by running CI servers
# note that all changes must be backwards compatible
-_MY_VERSION = 2
+_MY_VERSION = 5
if len(sys.argv) == 2 and sys.argv[1] == 'dump_version':
@@ -52,8 +52,16 @@
argp = argparse.ArgumentParser(description='Server for httpcli_test')
argp.add_argument('-p', '--port', default=12345, type=int)
+argp.add_argument('-l', '--logfile', default=None, type=str)
args = argp.parse_args()
+if args.logfile is not None:
+ sys.stdin.close()
+ sys.stderr.close()
+ sys.stdout.close()
+ sys.stderr = open(args.logfile, 'w')
+ sys.stdout = sys.stderr
+
print 'port server running on port %d' % args.port
pool = []
@@ -119,9 +127,12 @@
self.send_header('Content-Type', 'text/plain')
self.end_headers()
p = int(self.path[6:])
- del in_use[p]
- pool.append(p)
- self.log_message('drop port %d' % p)
+ if p in in_use:
+ del in_use[p]
+ pool.append(p)
+ self.log_message('drop known port %d' % p)
+ else:
+ self.log_message('drop unknown port %d' % p)
elif self.path == '/version_number':
# fetch a version string and the current process pid
self.send_response(200)
@@ -146,6 +157,6 @@
httpd = BaseHTTPServer.HTTPServer(('', args.port), Handler)
while keep_running:
httpd.handle_request()
+ sys.stderr.flush()
print 'done'
-
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index 048ab90..e9ae9f4 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -43,6 +43,8 @@
import socket
import subprocess
import sys
+import tempfile
+import traceback
import time
import xml.etree.cElementTree as ET
import urllib2
@@ -577,7 +579,7 @@
build_configs = set(cfg.build_config for cfg in run_configs)
if args.travis:
- _FORCE_ENVIRON_FOR_WRAPPERS = {'GRPC_TRACE': 'surface,batch'}
+ _FORCE_ENVIRON_FOR_WRAPPERS = {'GRPC_TRACE': 'api'}
languages = set(_LANGUAGES[l]
for l in itertools.chain.from_iterable(
@@ -704,35 +706,62 @@
urllib2.urlopen('http://localhost:%d/quitquitquit' % port_server_port).read()
time.sleep(1)
if not running:
- print 'starting port_server'
- port_log = open('portlog.txt', 'w')
- port_server = subprocess.Popen(
- [sys.executable, 'tools/run_tests/port_server.py', '-p', '%d' % port_server_port],
- stderr=subprocess.STDOUT,
- stdout=port_log)
+ fd, logfile = tempfile.mkstemp()
+ os.close(fd)
+ print 'starting port_server, with log file %s' % logfile
+ args = [sys.executable, 'tools/run_tests/port_server.py', '-p', '%d' % port_server_port, '-l', logfile]
+ env = dict(os.environ)
+ env['BUILD_ID'] = 'pleaseDontKillMeJenkins'
+ if platform.system() == 'Windows':
+ port_server = subprocess.Popen(
+ args,
+ env=env,
+ creationflags = 0x00000008, # detached process
+ close_fds=True)
+ else:
+ port_server = subprocess.Popen(
+ args,
+ env=env,
+ preexec_fn=os.setsid,
+ close_fds=True)
+ time.sleep(1)
# ensure port server is up
waits = 0
while True:
if waits > 10:
+ print 'killing port server due to excessive start up waits'
port_server.kill()
if port_server.poll() is not None:
print 'port_server failed to start'
- port_log = open('portlog.txt', 'r').read()
- print port_log
- sys.exit(1)
+ # try one final time: maybe another build managed to start one
+ time.sleep(1)
+ try:
+ urllib2.urlopen('http://localhost:%d/get' % port_server_port,
+ timeout=1).read()
+ print 'last ditch attempt to contact port server succeeded'
+ break
+ except:
+ traceback.print_exc();
+ port_log = open(logfile, 'r').read()
+ print port_log
+ sys.exit(1)
try:
urllib2.urlopen('http://localhost:%d/get' % port_server_port,
timeout=1).read()
+ print 'port server is up and ready'
break
except socket.timeout:
print 'waiting for port_server: timeout'
- time.sleep(0.5)
+ traceback.print_exc();
+ time.sleep(1)
waits += 1
except urllib2.URLError:
print 'waiting for port_server: urlerror'
- time.sleep(0.5)
+ traceback.print_exc();
+ time.sleep(1)
waits += 1
except:
+ traceback.print_exc();
port_server.kill()
raise