Merge pull request #27 from ctiller/faster_make
Build all targets for a config at once
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index 49f88a6..b8982f4 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -96,7 +96,7 @@
virtual bool Read(R* msg) { return context_->Read(msg); }
- virtual void Cancel() { context_->FinishStream(Status::Cancelled, true); }
+ virtual void Cancel() { context_->Cancel(); }
virtual const Status& Wait() { return context_->Wait(); }
@@ -122,7 +122,7 @@
virtual void WritesDone() { context_->Write(nullptr, true); }
- virtual void Cancel() { context_->FinishStream(Status::Cancelled, true); }
+ virtual void Cancel() { context_->Cancel(); }
// Read the final response and wait for the final status.
virtual const Status& Wait() {
@@ -165,7 +165,7 @@
virtual void WritesDone() { context_->Write(nullptr, true); }
- virtual void Cancel() { context_->FinishStream(Status::Cancelled, true); }
+ virtual void Cancel() { context_->Cancel(); }
virtual const Status& Wait() { return context_->Wait(); }
diff --git a/include/grpc++/stream_context_interface.h b/include/grpc++/stream_context_interface.h
index 535c004..a841198 100644
--- a/include/grpc++/stream_context_interface.h
+++ b/include/grpc++/stream_context_interface.h
@@ -53,7 +53,7 @@
virtual bool Read(google::protobuf::Message* msg) = 0;
virtual bool Write(const google::protobuf::Message* msg, bool is_last) = 0;
virtual const Status& Wait() = 0;
- virtual void FinishStream(const Status& status, bool send) = 0;
+ virtual void Cancel() = 0;
virtual google::protobuf::Message* request() = 0;
virtual google::protobuf::Message* response() = 0;
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index d8fd03b..45915cb 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -365,6 +365,16 @@
Can be called multiple times, from any thread. */
grpc_call_error grpc_call_cancel(grpc_call *call);
+/* Called by clients to cancel an RPC on the server.
+ Can be called multiple times, from any thread.
+ If a status has not yet been received for the call, its status code and
+ description are set to the values passed in.
+ Importantly, this function sends neither the status nor the description
+ to the remote endpoint. */
+grpc_call_error grpc_call_cancel_with_status(grpc_call *call,
+ grpc_status_code status,
+ const char *description);
+
/* Queue a byte buffer for writing.
flags is a bit-field combination of the write flags defined above.
A write with byte_buffer null is allowed, and will not send any bytes on the
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 1116049..94e56d7 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -143,16 +143,16 @@
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
- "::grpc::ClientWriter<$Request$>* $Method$("
+ "::grpc::ClientWriter< $Request$>* $Method$("
"::grpc::ClientContext* context, $Response$* response);\n\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
- "::grpc::ClientReader<$Response$>* $Method$("
+ "::grpc::ClientReader< $Response$>* $Method$("
"::grpc::ClientContext* context, const $Request$* request);\n\n");
} else if (BidiStreaming(method)) {
printer->Print(*vars,
- "::grpc::ClientReaderWriter<$Request$, $Response$>* "
+ "::grpc::ClientReaderWriter< $Request$, $Response$>* "
"$Method$(::grpc::ClientContext* context);\n\n");
}
}
@@ -174,19 +174,20 @@
printer->Print(*vars,
"virtual ::grpc::Status $Method$("
"::grpc::ServerContext* context, "
- "::grpc::ServerReader<$Request$>* reader, "
+ "::grpc::ServerReader< $Request$>* reader, "
"$Response$* response);\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(*vars,
"virtual ::grpc::Status $Method$("
"::grpc::ServerContext* context, const $Request$* request, "
- "::grpc::ServerWriter<$Response$>* writer);\n");
+ "::grpc::ServerWriter< $Response$>* writer);\n");
} else if (BidiStreaming(method)) {
- printer->Print(*vars,
- "virtual ::grpc::Status $Method$("
- "::grpc::ServerContext* context, "
- "::grpc::ServerReaderWriter<$Response$, $Request$>* stream);"
- "\n");
+ printer->Print(
+ *vars,
+ "virtual ::grpc::Status $Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerReaderWriter< $Response$, $Request$>* stream);"
+ "\n");
}
}
@@ -211,7 +212,7 @@
printer->Outdent();
printer->Print("};\n");
printer->Print(
- "static Stub* NewStub(const std::shared_ptr<::grpc::ChannelInterface>& "
+ "static Stub* NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& "
"channel);\n");
printer->Print("\n");
@@ -269,11 +270,12 @@
"context, request, response);\n"
"}\n\n");
} else if (ClientOnlyStreaming(method)) {
+ printer->Print(
+ *vars,
+ "::grpc::ClientWriter< $Request$>* $Service$::Stub::$Method$("
+ "::grpc::ClientContext* context, $Response$* response) {\n");
printer->Print(*vars,
- "::grpc::ClientWriter<$Request$>* $Service$::Stub::$Method$("
- "::grpc::ClientContext* context, $Response$* response) {\n");
- printer->Print(*vars,
- " return new ::grpc::ClientWriter<$Request$>("
+ " return new ::grpc::ClientWriter< $Request$>("
"channel()->CreateStream("
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
"::grpc::RpcMethod::RpcType::CLIENT_STREAMING), "
@@ -282,10 +284,10 @@
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
- "::grpc::ClientReader<$Response$>* $Service$::Stub::$Method$("
+ "::grpc::ClientReader< $Response$>* $Service$::Stub::$Method$("
"::grpc::ClientContext* context, const $Request$* request) {\n");
printer->Print(*vars,
- " return new ::grpc::ClientReader<$Response$>("
+ " return new ::grpc::ClientReader< $Response$>("
"channel()->CreateStream("
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
"::grpc::RpcMethod::RpcType::SERVER_STREAMING), "
@@ -294,11 +296,11 @@
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
- "::grpc::ClientReaderWriter<$Request$, $Response$>* "
+ "::grpc::ClientReaderWriter< $Request$, $Response$>* "
"$Service$::Stub::$Method$(::grpc::ClientContext* context) {\n");
printer->Print(
*vars,
- " return new ::grpc::ClientReaderWriter<$Request$, $Response$>("
+ " return new ::grpc::ClientReaderWriter< $Request$, $Response$>("
"channel()->CreateStream("
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
"::grpc::RpcMethod::RpcType::BIDI_STREAMING), "
@@ -328,7 +330,7 @@
printer->Print(*vars,
"::grpc::Status $Service$::Service::$Method$("
"::grpc::ServerContext* context, "
- "::grpc::ServerReader<$Request$>* reader, "
+ "::grpc::ServerReader< $Request$>* reader, "
"$Response$* response) {\n");
printer->Print(
" return ::grpc::Status("
@@ -339,7 +341,7 @@
"::grpc::Status $Service$::Service::$Method$("
"::grpc::ServerContext* context, "
"const $Request$* request, "
- "::grpc::ServerWriter<$Response$>* writer) {\n");
+ "::grpc::ServerWriter< $Response$>* writer) {\n");
printer->Print(
" return ::grpc::Status("
"::grpc::StatusCode::UNIMPLEMENTED);\n");
@@ -348,7 +350,7 @@
printer->Print(*vars,
"::grpc::Status $Service$::Service::$Method$("
"::grpc::ServerContext* context, "
- "::grpc::ServerReaderWriter<$Response$, $Request$>* "
+ "::grpc::ServerReaderWriter< $Response$, $Request$>* "
"stream) {\n");
printer->Print(
" return ::grpc::Status("
@@ -361,13 +363,14 @@
const google::protobuf::ServiceDescriptor* service,
map<string, string>* vars) {
(*vars)["Service"] = service->name();
- printer->Print(*vars,
- "$Service$::Stub* $Service$::NewStub("
- "const std::shared_ptr<::grpc::ChannelInterface>& channel) {\n"
- " $Service$::Stub* stub = new $Service$::Stub();\n"
- " stub->set_channel(channel);\n"
- " return stub;\n"
- "};\n\n");
+ printer->Print(
+ *vars,
+ "$Service$::Stub* $Service$::NewStub("
+ "const std::shared_ptr< ::grpc::ChannelInterface>& channel) {\n"
+ " $Service$::Stub* stub = new $Service$::Stub();\n"
+ " stub->set_channel(channel);\n"
+ " return stub;\n"
+ "};\n\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintSourceClientMethod(printer, service->method(i), vars);
}
@@ -400,9 +403,9 @@
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
" \"/$Package$$Service$/$Method$\",\n"
" ::grpc::RpcMethod::NORMAL_RPC,\n"
- " new ::grpc::RpcMethodHandler<$Service$::Service, $Request$, "
+ " new ::grpc::RpcMethodHandler< $Service$::Service, $Request$, "
"$Response$>(\n"
- " std::function<::grpc::Status($Service$::Service*, "
+ " std::function< ::grpc::Status($Service$::Service*, "
"::grpc::ServerContext*, const $Request$*, $Response$*)>("
"&$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
@@ -412,11 +415,11 @@
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
" \"/$Package$$Service$/$Method$\",\n"
" ::grpc::RpcMethod::CLIENT_STREAMING,\n"
- " new ::grpc::ClientStreamingHandler<"
+ " new ::grpc::ClientStreamingHandler< "
"$Service$::Service, $Request$, $Response$>(\n"
- " std::function<::grpc::Status($Service$::Service*, "
+ " std::function< ::grpc::Status($Service$::Service*, "
"::grpc::ServerContext*, "
- "::grpc::ServerReader<$Request$>*, $Response$*)>("
+ "::grpc::ServerReader< $Request$>*, $Response$*)>("
"&$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
} else if (ServerOnlyStreaming(method)) {
@@ -425,11 +428,11 @@
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
" \"/$Package$$Service$/$Method$\",\n"
" ::grpc::RpcMethod::SERVER_STREAMING,\n"
- " new ::grpc::ServerStreamingHandler<"
+ " new ::grpc::ServerStreamingHandler< "
"$Service$::Service, $Request$, $Response$>(\n"
- " std::function<::grpc::Status($Service$::Service*, "
+ " std::function< ::grpc::Status($Service$::Service*, "
"::grpc::ServerContext*, "
- "const $Request$*, ::grpc::ServerWriter<$Response$>*)>("
+ "const $Request$*, ::grpc::ServerWriter< $Response$>*)>("
"&$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
} else if (BidiStreaming(method)) {
@@ -438,11 +441,11 @@
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
" \"/$Package$$Service$/$Method$\",\n"
" ::grpc::RpcMethod::BIDI_STREAMING,\n"
- " new ::grpc::BidiStreamingHandler<"
+ " new ::grpc::BidiStreamingHandler< "
"$Service$::Service, $Request$, $Response$>(\n"
- " std::function<::grpc::Status($Service$::Service*, "
+ " std::function< ::grpc::Status($Service$::Service*, "
"::grpc::ServerContext*, "
- "::grpc::ServerReaderWriter<$Response$, $Request$>*)>("
+ "::grpc::ServerReaderWriter< $Response$, $Request$>*)>("
"&$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
}
diff --git a/src/core/support/log_posix.c b/src/core/support/log_posix.c
index 55a38b1..ee2705a 100644
--- a/src/core/support/log_posix.c
+++ b/src/core/support/log_posix.c
@@ -31,7 +31,6 @@
*
*/
-
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200112L
#endif
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 4ea8378..3579a5b 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -178,6 +178,7 @@
gpr_uint8 received_metadata;
gpr_uint8 have_read;
gpr_uint8 have_alarm;
+ gpr_uint8 got_status_code;
/* The current outstanding read message tag (only valid if have_read == 1) */
void *read_tag;
void *metadata_tag;
@@ -225,6 +226,7 @@
call->have_write = 0;
call->have_alarm = 0;
call->received_metadata = 0;
+ call->got_status_code = 0;
call->status_code =
server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN;
call->status_details = NULL;
@@ -268,6 +270,19 @@
grpc_call_internal_unref(c);
}
+/* Record `status` as the call's status code unless a code has already been
+   recorded through this helper; only the first code set this way sticks. */
+static void maybe_set_status_code(grpc_call *call, gpr_uint32 status) {
+  if (call->got_status_code) {
+    return;
+  }
+  call->got_status_code = 1;
+  call->status_code = status;
+}
+
+/* Store a new reference to `status` as the call's status details, but only
+   when no details are present yet; existing details are left untouched. */
+static void maybe_set_status_details(grpc_call *call, grpc_mdstr *status) {
+  if (call->status_details != NULL) {
+    return;
+  }
+  call->status_details = grpc_mdstr_ref(status);
+}
+
grpc_call_error grpc_call_cancel(grpc_call *c) {
grpc_call_element *elem;
grpc_call_op op;
@@ -284,6 +299,21 @@
return GRPC_CALL_OK;
}
+/* Cancel the call after recording a local status for it.
+   `status` and `description` are applied only where the call does not already
+   have a status code / status details; both updates happen under read_mu.
+   `description` may be NULL, in which case only the code is considered.
+   Returns whatever grpc_call_cancel returns for this call. */
+grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
+                                             grpc_status_code status,
+                                             const char *description) {
+  grpc_mdstr *details =
+      description ? grpc_mdstr_from_string(c->metadata_context, description)
+                  : NULL;
+  gpr_mu_lock(&c->read_mu);
+  maybe_set_status_code(c, status);
+  if (details) {
+    maybe_set_status_details(c, details);
+  }
+  gpr_mu_unlock(&c->read_mu);
+  if (details) {
+    /* maybe_set_status_details takes its own reference when it stores the
+       string (and stores nothing if details were already present), so the
+       reference obtained above must be dropped here or it is leaked.
+       NOTE(review): assumes grpc_mdstr_from_string returns a reference owned
+       by the caller - confirm against the metadata API. */
+    grpc_mdstr_unref(details);
+  }
+  return grpc_call_cancel(c);
+}
+
void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
grpc_call_element *elem;
GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
@@ -799,15 +829,14 @@
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
grpc_mdelem *md = op->data.metadata;
grpc_mdstr *key = md->key;
+ gpr_log(GPR_DEBUG, "call %p got metadata %s %s", call,
+ grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value));
if (key == grpc_channel_get_status_string(call->channel)) {
- call->status_code = decode_status(md);
+ maybe_set_status_code(call, decode_status(md));
grpc_mdelem_unref(md);
op->done_cb(op->user_data, GRPC_OP_OK);
} else if (key == grpc_channel_get_message_string(call->channel)) {
- if (call->status_details) {
- grpc_mdstr_unref(call->status_details);
- }
- call->status_details = grpc_mdstr_ref(md->value);
+ maybe_set_status_details(call, md->value);
grpc_mdelem_unref(md);
op->done_cb(op->user_data, GRPC_OP_OK);
} else {
diff --git a/src/cpp/stream/stream_context.cc b/src/cpp/stream/stream_context.cc
index 7936a30..5ccf8c9 100644
--- a/src/cpp/stream/stream_context.cc
+++ b/src/cpp/stream/stream_context.cc
@@ -112,9 +112,8 @@
if (read_ev->data.read) {
if (!DeserializeProto(read_ev->data.read, msg)) {
ret = false;
- FinishStream(
- Status(StatusCode::DATA_LOSS, "Failed to parse incoming proto"),
- true);
+ grpc_call_cancel_with_status(call(), GRPC_STATUS_DATA_LOSS,
+ "Failed to parse incoming proto");
}
} else {
ret = false;
@@ -132,9 +131,8 @@
if (msg) {
grpc_byte_buffer* out_buf = nullptr;
if (!SerializeProto(*msg, &out_buf)) {
- FinishStream(Status(StatusCode::INVALID_ARGUMENT,
- "Failed to serialize outgoing proto"),
- true);
+ grpc_call_cancel_with_status(call(), GRPC_STATUS_INVALID_ARGUMENT,
+ "Failed to serialize outgoing proto");
return false;
}
int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0;
@@ -172,29 +170,18 @@
grpc_event_finish(metadata_ev);
// TODO(yangg) protect states by a mutex, including other places.
if (!self_halfclosed_ || !peer_halfclosed_) {
- FinishStream(Status::Cancelled, true);
- } else {
- grpc_event* finish_ev =
- grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future);
- GPR_ASSERT(finish_ev->type == GRPC_FINISHED);
- final_status_ = Status(
- static_cast<StatusCode>(finish_ev->data.finished.status),
- finish_ev->data.finished.details ? finish_ev->data.finished.details
- : "");
- grpc_event_finish(finish_ev);
- }
- return final_status_;
-}
-
-void StreamContext::FinishStream(const Status& status, bool send) {
- if (send) {
- grpc_call_cancel(call());
+ Cancel();
}
grpc_event* finish_ev =
grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future);
GPR_ASSERT(finish_ev->type == GRPC_FINISHED);
+ final_status_ = Status(
+ static_cast<StatusCode>(finish_ev->data.finished.status),
+ finish_ev->data.finished.details ? finish_ev->data.finished.details : "");
grpc_event_finish(finish_ev);
- final_status_ = status;
+ return final_status_;
}
+// Cancels the stream by forwarding to grpc_call_cancel on the underlying
+// call; the result of that call is not inspected.
+void StreamContext::Cancel() {
+  grpc_call_cancel(call());
+}
+
} // namespace grpc
diff --git a/src/cpp/stream/stream_context.h b/src/cpp/stream/stream_context.h
index f70fe6d..4781f27 100644
--- a/src/cpp/stream/stream_context.h
+++ b/src/cpp/stream/stream_context.h
@@ -48,7 +48,7 @@
class ClientContext;
class RpcMethod;
-class StreamContext : public StreamContextInterface {
+class StreamContext final : public StreamContextInterface {
public:
StreamContext(const RpcMethod& method, ClientContext* context,
const google::protobuf::Message* request,
@@ -63,7 +63,7 @@
bool Read(google::protobuf::Message* msg) override;
bool Write(const google::protobuf::Message* msg, bool is_last) override;
const Status& Wait() override;
- void FinishStream(const Status& status, bool send) override;
+ void Cancel() override;
google::protobuf::Message* request() override { return request_; }
google::protobuf::Message* response() override { return result_; }
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 7fd2567..6af3ded 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -36,7 +36,6 @@
#include <errno.h>
#include <fcntl.h>
#include <string.h>
-#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
@@ -491,8 +490,6 @@
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_iomgr_init();
- /* disable SIGPIPE */
- signal(SIGPIPE, SIG_IGN);
run_tests();
grpc_endpoint_tests(configs[0]);
grpc_iomgr_shutdown();
diff --git a/test/core/transport/chttp2_transport_end2end_test.c b/test/core/transport/chttp2_transport_end2end_test.c
index 30d2a17..8b0f9aa 100644
--- a/test/core/transport/chttp2_transport_end2end_test.c
+++ b/test/core/transport/chttp2_transport_end2end_test.c
@@ -107,9 +107,6 @@
int main(int argc, char **argv) {
size_t i;
- /* disable SIGPIPE */
- signal(SIGPIPE, SIG_IGN);
-
grpc_test_init(argc, argv);
grpc_iomgr_init();
diff --git a/test/core/util/test_config.c b/test/core/util/test_config.c
index ab2c0d8..44ab35f 100644
--- a/test/core/util/test_config.c
+++ b/test/core/util/test_config.c
@@ -35,22 +35,21 @@
#include <grpc/support/port_platform.h>
#include <stdlib.h>
+#include <signal.h>
#if GPR_GETPID_IN_UNISTD_H
#include <unistd.h>
-static int seed() {
- return getpid();
-}
+/* RNG seed for tests: the current process id. Declared with (void) so it is
+   a proper C prototype, matching the process.h variant of seed. */
+static int seed(void) { return getpid(); }
#endif
#if GPR_GETPID_IN_PROCESS_H
#include <process.h>
-static int seed(void) {
- return _getpid();
-}
+static int seed(void) { return _getpid(); }
#endif
void grpc_test_init(int argc, char **argv) {
+ /* disable SIGPIPE */
+ signal(SIGPIPE, SIG_IGN);
/* seed rng with pid, so we don't end up with the same random numbers as a
concurrently running test binary */
srand(seed());
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index e01a6ef..3a1da68 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -34,6 +34,7 @@
#include <chrono>
#include <thread>
+#include "test/core/util/test_config.h"
#include "test/cpp/util/echo_duplicate.pb.h"
#include "test/cpp/util/echo.pb.h"
#include "src/cpp/util/time.h"
@@ -435,6 +436,7 @@
} // namespace grpc
int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
grpc_init();
::testing::InitGoogleTest(&argc, argv);
int result = RUN_ALL_TESTS();
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index 84193a1..428f6c4 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -78,17 +78,20 @@
'CONFIG=%s' % cfg] + _MAKE_TEST_TARGETS
for cfg in build_configs),
check_cancelled, maxjobs=1):
- sys.exit(1)
+ return 1
# run all the tests
- jobset.run((
+ if not jobset.run((
config.run_command(x)
for config in run_configs
for filt in filters
for x in itertools.chain.from_iterable(itertools.repeat(
glob.glob('bins/%s/%s_test' % (
config.build_config, filt)),
- runs_per_test))), check_cancelled)
+ runs_per_test))), check_cancelled):
+ return 2
+
+ return 0
if forever:
@@ -100,5 +103,5 @@
while not have_files_changed():
time.sleep(1)
else:
- _build_and_run(lambda: False)
+ sys.exit(_build_and_run(lambda: False))