Progress commit on fixing up C++
diff --git a/BUILD b/BUILD
index bc8fa55..adad959 100644
--- a/BUILD
+++ b/BUILD
@@ -576,7 +576,6 @@
"src/cpp/client/channel.cc",
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
- "src/cpp/client/client_unary_call.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",
@@ -656,7 +655,6 @@
"src/cpp/client/channel.cc",
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
- "src/cpp/client/client_unary_call.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",
diff --git a/Makefile b/Makefile
index 1647daa..cfea847 100644
--- a/Makefile
+++ b/Makefile
@@ -3325,7 +3325,6 @@
src/cpp/client/channel.cc \
src/cpp/client/channel_arguments.cc \
src/cpp/client/client_context.cc \
- src/cpp/client/client_unary_call.cc \
src/cpp/client/create_channel.cc \
src/cpp/client/credentials.cc \
src/cpp/client/generic_stub.cc \
@@ -3613,7 +3612,6 @@
src/cpp/client/channel.cc \
src/cpp/client/channel_arguments.cc \
src/cpp/client/client_context.cc \
- src/cpp/client/client_unary_call.cc \
src/cpp/client/create_channel.cc \
src/cpp/client/credentials.cc \
src/cpp/client/generic_stub.cc \
diff --git a/build.json b/build.json
index 0ff85a5..710475c 100644
--- a/build.json
+++ b/build.json
@@ -72,7 +72,6 @@
"src/cpp/client/channel.cc",
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
- "src/cpp/client/client_unary_call.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",
diff --git a/include/grpc++/async_unary_call.h b/include/grpc++/async_unary_call.h
index a296102..d009ac6 100644
--- a/include/grpc++/async_unary_call.h
+++ b/include/grpc++/async_unary_call.h
@@ -51,7 +51,6 @@
virtual ~ClientAsyncResponseReaderInterface() {}
virtual void ReadInitialMetadata(void* tag) = 0;
virtual void Finish(R* msg, Status* status, void* tag) = 0;
-
};
template <class R>
@@ -72,15 +71,15 @@
void ReadInitialMetadata(void* tag) {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_buf_.SetOutputTag(tag);
+ meta_buf_.set_output_tag(tag);
meta_buf_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
}
void Finish(R* msg, Status* status, void* tag) {
- finish_buf_.SetOutputTag(tag);
+ finish_buf_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.AddRecvInitialMetadata(context_);
+ finish_buf_.RecvInitialMetadata(context_);
}
finish_buf_.RecvMessage(msg);
finish_buf_.ClientRecvStatus(context_, status);
@@ -92,7 +91,7 @@
Call call_;
SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> init_buf_;
CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
- CallOpSet<CallOpRecvMessage<R>, CallOpClientRecvStatus> finish_buf_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, CallOpClientRecvStatus> finish_buf_;
};
template <class W>
@@ -105,14 +104,14 @@
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_buf_.SetOutputTag(tag);
+ meta_buf_.set_output_tag(tag);
meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
void Finish(const W& msg, const Status& status, void* tag) {
- finish_buf_.SetOutputTag(tag);
+ finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
@@ -127,7 +126,7 @@
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.IsOk());
- finish_buf_.SetOutputTag(tag);
+ finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
diff --git a/include/grpc++/byte_buffer.h b/include/grpc++/byte_buffer.h
index ceb6262..80dedda 100644
--- a/include/grpc++/byte_buffer.h
+++ b/include/grpc++/byte_buffer.h
@@ -38,6 +38,8 @@
#include <grpc/support/log.h>
#include <grpc++/config.h>
#include <grpc++/slice.h>
+#include <grpc++/status.h>
+#include <grpc++/impl/serialization_traits.h>
#include <vector>
@@ -60,9 +62,6 @@
void Clear();
size_t Length();
- private:
- friend class CallOpBuffer;
-
// takes ownership
void set_buffer(grpc_byte_buffer* buf) {
if (buffer_) {
@@ -72,11 +71,23 @@
buffer_ = buf;
}
+ private:
+ friend class CallOpBuffer;
+
grpc_byte_buffer* buffer() const { return buffer_; }
grpc_byte_buffer* buffer_;
};
+template <>
+class SerializationTraits<ByteBuffer, void> {
+ public:
+ static Status Deserialize(grpc_byte_buffer* byte_buffer, ByteBuffer* dest, int max_message_size) {
+ dest->set_buffer(byte_buffer);
+ return Status::OK;
+ }
+};
+
} // namespace grpc
#endif // GRPCXX_BYTE_BUFFER_H
diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h
index 6d9015f..d444c90 100644
--- a/include/grpc++/client_context.h
+++ b/include/grpc++/client_context.h
@@ -131,6 +131,12 @@
friend class ::grpc::ClientAsyncReaderWriter;
template <class R>
friend class ::grpc::ClientAsyncResponseReader;
+ template <class InputMessage, class OutputMessage>
+ friend Status BlockingUnaryCall(ChannelInterface* channel,
+ const RpcMethod& method,
+ ClientContext* context,
+ const InputMessage& request,
+ OutputMessage* result);
grpc_call* call() { return call_; }
void set_call(grpc_call* call,
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index b45b5e2..0490073 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -54,6 +54,14 @@
class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
+template <class ServiceType, class RequestType, class ResponseType>
+class RpcMethodHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class ClientStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class ServerStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class BidiStreamingHandler;
class ChannelInterface;
class ClientContext;
@@ -62,6 +70,7 @@
class Server;
class ServerBuilder;
class ServerContext;
+class Status;
class CompletionQueueTag {
public:
@@ -120,6 +129,14 @@
friend class ::grpc::ServerWriter;
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class RpcMethodHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class ClientStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class ServerStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class BidiStreamingHandler;
friend class ::grpc::Server;
friend class ::grpc::ServerContext;
template <class InputMessage, class OutputMessage>
diff --git a/include/grpc++/config_protobuf.h b/include/grpc++/config_protobuf.h
index 5ef1be1..f6938b0 100644
--- a/include/grpc++/config_protobuf.h
+++ b/include/grpc++/config_protobuf.h
@@ -34,8 +34,6 @@
#ifndef GRPCXX_CONFIG_PROTOBUF_H
#define GRPCXX_CONFIG_PROTOBUF_H
-#include <grpc++/impl/serialization_traits.h>
-
#ifndef GRPC_CUSTOM_PROTOBUF_INT64
#include <google/protobuf/stubs/common.h>
#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index f8b290a..3701e40 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -38,6 +38,7 @@
#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
#include <grpc++/status.h>
+#include <grpc++/impl/serialization_traits.h>
#include <memory>
#include <map>
@@ -53,7 +54,7 @@
class CallNoOp {
protected:
void AddOp(grpc_op* ops, size_t* nops) {}
- void FinishOp(void* tag, bool* status) {}
+ void FinishOp(void* tag, bool* status, int max_message_size) {}
};
class CallOpSendInitialMetadata {
@@ -62,24 +63,71 @@
protected:
void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpSendMessage {
public:
+ CallOpSendMessage() : send_buf_(nullptr) {}
+
template <class M>
- void SendMessage(const M& message);
+ bool SendMessage(const M& message) {
+ return SerializationTraits<M>::Serialize(message, &send_buf_);
+ }
protected:
- void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (send_buf_ == nullptr) return;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = send_buf_;
+ }
+ void FinishOp(void* tag, bool* status, int max_message_size) {
+ grpc_byte_buffer_destroy(send_buf_);
+ }
+
+ private:
+ grpc_byte_buffer* send_buf_;
};
-template <class M>
+template <class R>
class CallOpRecvMessage {
+ public:
+ CallOpRecvMessage() : got_message(false), message_(nullptr) {}
+
+ void RecvMessage(R* message) {
+ message_ = message;
+ }
+
+ bool got_message;
+
protected:
- void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (message_ == nullptr) return;
+ grpc_op *op = &ops[(*nops)++];
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &recv_buf_;
+ }
+
+ void FinishOp(void* tag, bool* status, int max_message_size) {
+ if (message_ == nullptr) return;
+ if (recv_buf_) {
+ if (*status) {
+ got_message = true;
+ *status = SerializationTraits<R>::Deserialize(recv_buf_, message_, max_message_size).IsOk();
+ } else {
+ got_message = false;
+ grpc_byte_buffer_destroy(recv_buf_);
+ }
+ } else {
+ got_message = false;
+ *status = false;
+ }
+ }
+
+ private:
+ R* message_;
+ grpc_byte_buffer* recv_buf_;
};
class CallOpGenericRecvMessage {
@@ -87,9 +135,11 @@
template <class R>
void RecvMessage(R* message);
+ bool got_message;
+
protected:
void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpClientSendClose {
@@ -98,7 +148,7 @@
protected:
void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpServerSendStatus {
@@ -107,7 +157,7 @@
protected:
void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpRecvInitialMetadata {
@@ -116,7 +166,7 @@
protected:
void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpClientRecvStatus {
@@ -125,12 +175,18 @@
protected:
void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpSetInterface : public CompletionQueueTag {
public:
+ CallOpSetInterface() : max_message_size_(0) {}
virtual void FillOps(grpc_op* ops, size_t* nops) = 0;
+
+ void set_max_message_size(int max_message_size) { max_message_size_ = max_message_size; }
+
+ protected:
+ int max_message_size_;
};
template <class T, int I>
@@ -145,27 +201,28 @@
public WrapAndDerive<Op5, 5>,
public WrapAndDerive<Op6, 6> {
public:
+ CallOpSet() : return_tag_(this) {}
void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE {
- this->Op1::AddOp(ops, nops);
- this->Op2::AddOp(ops, nops);
- this->Op3::AddOp(ops, nops);
- this->Op4::AddOp(ops, nops);
- this->Op5::AddOp(ops, nops);
- this->Op6::AddOp(ops, nops);
+ this->WrapAndDerive<Op1, 1>::AddOp(ops, nops);
+ this->WrapAndDerive<Op2, 2>::AddOp(ops, nops);
+ this->WrapAndDerive<Op3, 3>::AddOp(ops, nops);
+ this->WrapAndDerive<Op4, 4>::AddOp(ops, nops);
+ this->WrapAndDerive<Op5, 5>::AddOp(ops, nops);
+ this->WrapAndDerive<Op6, 6>::AddOp(ops, nops);
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
- this->Op1::FinishOp(*tag, status);
- this->Op2::FinishOp(*tag, status);
- this->Op3::FinishOp(*tag, status);
- this->Op4::FinishOp(*tag, status);
- this->Op5::FinishOp(*tag, status);
- this->Op6::FinishOp(*tag, status);
+ this->WrapAndDerive<Op1, 1>::FinishOp(*tag, status, max_message_size_);
+ this->WrapAndDerive<Op2, 2>::FinishOp(*tag, status, max_message_size_);
+ this->WrapAndDerive<Op3, 3>::FinishOp(*tag, status, max_message_size_);
+ this->WrapAndDerive<Op4, 4>::FinishOp(*tag, status, max_message_size_);
+ this->WrapAndDerive<Op5, 5>::FinishOp(*tag, status, max_message_size_);
+ this->WrapAndDerive<Op6, 6>::FinishOp(*tag, status, max_message_size_);
*tag = return_tag_;
return true;
}
- void SetOutputTag(void* return_tag) { return_tag_ = return_tag; }
+ void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
private:
void *return_tag_;
diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h
index 561c472..8c42fb4 100644
--- a/include/grpc++/impl/client_unary_call.h
+++ b/include/grpc++/impl/client_unary_call.h
@@ -62,12 +62,12 @@
CallOpClientSendClose,
CallOpClientRecvStatus> ops;
Status status;
- ops.AddSendInitialMetadata(context);
- ops.AddSendMessage(request);
- ops.AddRecvInitialMetadata(context);
- ops.AddRecvMessage(result);
- ops.AddClientSendClose();
- ops.AddClientRecvStatus(context, &status);
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ ops.SendMessage(request);
+ ops.RecvInitialMetadata(context);
+ ops.RecvMessage(result);
+ ops.ClientSendClose();
+ ops.ClientRecvStatus(context, &status);
call.PerformOps(&ops);
GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.IsOk());
return status;
diff --git a/src/cpp/proto/proto_utils.h b/include/grpc++/impl/proto_utils.h
similarity index 74%
rename from src/cpp/proto/proto_utils.h
rename to include/grpc++/impl/proto_utils.h
index 67a775b..1a0cc31 100644
--- a/src/cpp/proto/proto_utils.h
+++ b/include/grpc++/impl/proto_utils.h
@@ -34,7 +34,11 @@
#ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
#define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
-#include <grpc++/config.h>
+#include <type_traits>
+
+#include <grpc++/impl/serialization_traits.h>
+#include <grpc++/config_protobuf.h>
+#include <grpc++/status.h>
struct grpc_byte_buffer;
@@ -47,8 +51,19 @@
grpc_byte_buffer** buffer);
// The caller keeps ownership of buffer and msg.
-bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
- int max_message_size);
+Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
+ int max_message_size);
+
+template <class T>
+class SerializationTraits<T, typename std::enable_if<std::is_base_of<grpc::protobuf::Message, T>::value>::type> {
+ public:
+ static bool Serialize(const grpc::protobuf::Message& msg, grpc_byte_buffer** buffer) {
+ return SerializeProto(msg, buffer);
+ }
+ static Status Deserialize(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, int max_message_size) {
+ return DeserializeProto(buffer, msg, max_message_size);
+ }
+};
} // namespace grpc
diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h
index 50204d2..05bba6e 100644
--- a/include/grpc++/impl/rpc_service_method.h
+++ b/include/grpc++/impl/rpc_service_method.h
@@ -56,15 +56,15 @@
virtual ~MethodHandler() {}
struct HandlerParameter {
HandlerParameter(Call* c, ServerContext* context,
- const grpc::protobuf::Message* req,
- grpc::protobuf::Message* resp)
- : call(c), server_context(context), request(req), response(resp) {}
+ grpc_byte_buffer* req, int max_size)
+ : call(c), server_context(context), request(req), max_message_size(max_size) {}
Call* call;
ServerContext* server_context;
- const grpc::protobuf::Message* request;
- grpc::protobuf::Message* response;
+ // Handler required to grpc_byte_buffer_destroy this
+ grpc_byte_buffer* request;
+ int max_message_size;
};
- virtual Status RunHandler(const HandlerParameter& param) = 0;
+ virtual void RunHandler(const HandlerParameter& param) = 0;
};
// A wrapper class of an application provided rpc method handler.
@@ -77,11 +77,23 @@
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
- // Invoke application function, cast proto messages to their actual types.
- return func_(service_, param.server_context,
- dynamic_cast<const RequestType*>(param.request),
- dynamic_cast<ResponseType*>(param.response));
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ RequestType req;
+ Status status = SerializationTraits<RequestType>::Deserialize(param.request, &req, param.max_message_size);
+ ResponseType rsp;
+ if (status.IsOk()) {
+ status = func_(service_, param.server_context, &req, &rsp);
+ }
+
+ GPR_ASSERT(!param.server_context->sent_initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> ops;
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ if (status.IsOk()) {
+ ops.SendMessage(rsp);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -102,10 +114,20 @@
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReader<RequestType> reader(param.call, param.server_context);
- return func_(service_, param.server_context, &reader,
- dynamic_cast<ResponseType*>(param.response));
+ ResponseType rsp;
+ Status status = func_(service_, param.server_context, &reader, &rsp);
+
+ GPR_ASSERT(!param.server_context->sent_initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> ops;
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ if (status.IsOk()) {
+ ops.SendMessage(rsp);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -124,10 +146,22 @@
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
- ServerWriter<ResponseType> writer(param.call, param.server_context);
- return func_(service_, param.server_context,
- dynamic_cast<const RequestType*>(param.request), &writer);
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ RequestType req;
+ Status status = SerializationTraits<RequestType>::Deserialize(param.request, &req, param.max_message_size);
+
+ if (status.IsOk()) {
+ ServerWriter<ResponseType> writer(param.call, param.server_context);
+ status = func_(service_, param.server_context, &req, &writer);
+ }
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+ if (!param.server_context->sent_initial_metadata_) {
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -147,10 +181,18 @@
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReaderWriter<ResponseType, RequestType> stream(param.call,
param.server_context);
- return func_(service_, param.server_context, &stream);
+ Status status = func_(service_, param.server_context, &stream);
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+ if (!param.server_context->sent_initial_metadata_) {
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -162,29 +204,16 @@
// Server side rpc method class
class RpcServiceMethod : public RpcMethod {
public:
- // Takes ownership of the handler and two prototype objects.
+ // Takes ownership of the handler
RpcServiceMethod(const char* name, RpcMethod::RpcType type,
- MethodHandler* handler,
- grpc::protobuf::Message* request_prototype,
- grpc::protobuf::Message* response_prototype)
+ MethodHandler* handler)
: RpcMethod(name, type, nullptr),
- handler_(handler),
- request_prototype_(request_prototype),
- response_prototype_(response_prototype) {}
+ handler_(handler) {}
MethodHandler* handler() { return handler_.get(); }
- grpc::protobuf::Message* AllocateRequestProto() {
- return request_prototype_->New();
- }
- grpc::protobuf::Message* AllocateResponseProto() {
- return response_prototype_->New();
- }
-
private:
std::unique_ptr<MethodHandler> handler_;
- std::unique_ptr<grpc::protobuf::Message> request_prototype_;
- std::unique_ptr<grpc::protobuf::Message> response_prototype_;
};
// This class contains all the method information for an rpc service. It is
diff --git a/include/grpc++/impl/serialization_traits.h b/include/grpc++/impl/serialization_traits.h
index d21ad92..4648bbf 100644
--- a/include/grpc++/impl/serialization_traits.h
+++ b/include/grpc++/impl/serialization_traits.h
@@ -38,12 +38,9 @@
namespace grpc {
-template <class Message>
+template <class Message, class UnusedButHereForPartialTemplateSpecialization = void>
class SerializationTraits;
-typedef bool (*SerializationTraitsReadFunction)(grpc_byte_buffer* src, void* dest);
-typedef bool (*SerializationTraitsWriteFunction)(const void* src, grpc_byte_buffer* dst);
-
} // namespace grpc
#endif // GRPCXX_IMPL_SERIALIZATION_TRAITS_H
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index e0599ee..4784bac 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -41,13 +41,13 @@
#include <grpc++/config.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/grpc_library.h>
-#include <grpc++/impl/service_type.h>
#include <grpc++/impl/sync.h>
#include <grpc++/status.h>
struct grpc_server;
namespace grpc {
+
class AsynchronousService;
class GenericServerContext;
class AsyncGenericService;
@@ -99,6 +99,44 @@
void PerformOpsOnCall(CallOpSetInterface *ops, Call* call) GRPC_OVERRIDE;
+ class BaseAsyncRequest : public CompletionQueueTag {
+ public:
+ BaseAsyncRequest(Server* server,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag);
+
+ private:
+ };
+
+ class RegisteredAsyncRequest : public BaseAsyncRequest {
+ public:
+ RegisteredAsyncRequest(Server* server, ServerContext* context,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag)
+ : BaseAsyncRequest(server, stream, call_cq, notification_cq, tag) {}
+ };
+
+ class NoPayloadAsyncRequest : public RegisteredAsyncRequest {
+ public:
+ NoPayloadAsyncRequest(Server* server, ServerContext* context,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag)
+ : RegisteredAsyncRequest(server, context, stream, call_cq, notification_cq, tag) {
+ }
+ };
+
+ template <class Message>
+ class PayloadAsyncRequest : public RegisteredAsyncRequest {
+ PayloadAsyncRequest(Server* server, ServerContext* context,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag)
+ : RegisteredAsyncRequest(server, context, stream, call_cq, notification_cq, tag) {
+ }
+ };
+
+ class GenericAsyncRequest : public BaseAsyncRequest {
+ };
+
template <class Message>
void RequestAsyncCall(void* registered_method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
@@ -139,8 +177,6 @@
ThreadPoolInterface* thread_pool_;
// Whether the thread pool is created and owned by the server.
bool thread_pool_owned_;
- private:
- Server() : max_message_size_(-1), server_(NULL) { abort(); }
};
} // namespace grpc
diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h
index d88a3ae..326b6a1 100644
--- a/include/grpc++/server_context.h
+++ b/include/grpc++/server_context.h
@@ -60,6 +60,14 @@
class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
+template <class ServiceType, class RequestType, class ResponseType>
+class RpcMethodHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class ClientStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class ServerStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class BidiStreamingHandler;
class Call;
class CallOpBuffer;
@@ -105,6 +113,14 @@
friend class ::grpc::ServerWriter;
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class RpcMethodHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class ClientStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class ServerStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class BidiStreamingHandler;
// Prevent copying.
ServerContext(const ServerContext&);
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index 32ba03f..39a1cc8 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -161,8 +161,8 @@
call_(channel->CreateCall(method, context, &cq_)) {
finish_ops_.RecvMessage(response);
- CallOpSet<CallOpRecvMessage<R>> ops;
- ops.AddSendInitialMetadata(&context->send_initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
@@ -413,7 +413,7 @@
const RpcMethod& method, ClientContext* context,
const W& request, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_ops_.SetOutputTag(tag);
+ init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
init_ops_.SendMessage(request);
init_ops_.ClientSendClose();
@@ -423,13 +423,13 @@
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_ops_.SetOutputTag(tag);
+ meta_ops_.set_output_tag(tag);
meta_ops_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_ops_.SetOutputTag(tag);
+ read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
read_ops_.RecvInitialMetadata(context_);
}
@@ -438,7 +438,7 @@
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
- finish_ops_.SetOutputTag(tag);
+ finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
finish_ops_.RecvInitialMetadata(context_);
}
@@ -473,7 +473,7 @@
call_(channel->CreateCall(method, context, cq)) {
finish_ops_.RecvMessage(response);
- init_ops_.SetOutputTag(tag);
+ init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&init_ops_);
}
@@ -481,25 +481,25 @@
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_ops_.SetOutputTag(tag);
+ meta_ops_.set_output_tag(tag);
meta_ops_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_ops_.SetOutputTag(tag);
+ write_ops_.set_output_tag(tag);
write_ops_.SendMessage(msg);
call_.PerformOps(&write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
- writes_done_ops_.SetOutputTag(tag);
+ writes_done_ops_.set_output_tag(tag);
writes_done_ops_.ClientSendClose();
call_.PerformOps(&writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
- finish_ops_.SetOutputTag(tag);
+ finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
finish_ops_.RecvInitialMetadata(context_);
}
@@ -534,7 +534,7 @@
const RpcMethod& method, ClientContext* context,
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_ops_.SetOutputTag(tag);
+ init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&init_ops_);
}
@@ -542,34 +542,34 @@
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_ops_.SetOutputTag(tag);
+ meta_ops_.set_output_tag(tag);
meta_ops_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_ops_.SetOutputTag(tag);
+ read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
read_ops_.RecvInitialMetadata(context_);
}
- read_ops_.AddRecvMessage(msg);
+ read_ops_.RecvMessage(msg);
call_.PerformOps(&read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_ops_.SetOutputTag(tag);
+ write_ops_.set_output_tag(tag);
write_ops_.SendMessage(msg);
call_.PerformOps(&write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
- writes_done_ops_.SetOutputTag(tag);
+ writes_done_ops_.set_output_tag(tag);
writes_done_ops_.ClientSendClose();
call_.PerformOps(&writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
- finish_ops_.SetOutputTag(tag);
+ finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
finish_ops_.RecvInitialMetadata(context_);
}
@@ -598,20 +598,20 @@
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_ops_.SetOutputTag(tag);
+ meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_ops_.SetOutputTag(tag);
+ read_ops_.set_output_tag(tag);
read_ops_.RecvMessage(msg);
call_.PerformOps(&read_ops_);
}
void Finish(const W& msg, const Status& status, void* tag) {
- finish_ops_.SetOutputTag(tag);
+ finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
@@ -626,7 +626,7 @@
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.IsOk());
- finish_ops_.SetOutputTag(tag);
+ finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
@@ -655,14 +655,14 @@
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_ops_.SetOutputTag(tag);
+ meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_ops_.SetOutputTag(tag);
+ write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
@@ -672,7 +672,7 @@
}
void Finish(const Status& status, void* tag) {
- finish_ops_.SetOutputTag(tag);
+ finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
@@ -703,20 +703,20 @@
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_ops_.SetOutputTag(tag);
+ meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_ops_.SetOutputTag(tag);
- read_ops_.AddRecvMessage(msg);
+ read_ops_.set_output_tag(tag);
+ read_ops_.RecvMessage(msg);
call_.PerformOps(&read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_ops_.SetOutputTag(tag);
+ write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
@@ -726,7 +726,7 @@
}
void Finish(const Status& status, void* tag) {
- finish_ops_.SetOutputTag(tag);
+ finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index b0d2b5d..d03eadd 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -113,6 +113,7 @@
grpc::string temp =
"#include <grpc++/impl/internal_stub.h>\n"
"#include <grpc++/impl/rpc_method.h>\n"
+ "#include <grpc++/impl/proto_utils.h>\n"
"#include <grpc++/impl/service_type.h>\n"
"#include <grpc++/async_unary_call.h>\n"
"#include <grpc++/status.h>\n"
@@ -1045,8 +1046,7 @@
" new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
"$Request$, "
"$Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
@@ -1055,8 +1055,7 @@
" ::grpc::RpcMethod::CLIENT_STREAMING,\n"
" new ::grpc::ClientStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
@@ -1065,8 +1064,7 @@
" ::grpc::RpcMethod::SERVER_STREAMING,\n"
" new ::grpc::ServerStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
@@ -1075,8 +1073,7 @@
" ::grpc::RpcMethod::BIDI_STREAMING,\n"
" new ::grpc::BidiStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
}
}
printer->Print("return service_;\n");
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 475a20d..6e6278c 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -41,7 +41,6 @@
#include <grpc/support/slice.h>
#include "src/core/profiling/timers.h"
-#include "src/cpp/proto/proto_utils.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
@@ -75,14 +74,14 @@
return Call(c_call, this, cq);
}
-void Channel::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
+void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
- size_t nops = MAX_OPS;
- grpc_op ops[MAX_OPS];
+ size_t nops = 0;
+ grpc_op cops[MAX_OPS];
GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
- buf->FillOps(ops, &nops);
+ ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), ops, nops, buf));
+ grpc_call_start_batch(call->call(), cops, nops, ops));
GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
}
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h
index cd23924..69baa41 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/client/channel.h
@@ -44,7 +44,7 @@
namespace grpc {
class Call;
-class CallOpBuffer;
+class CallOpSetInterface;
class ChannelArguments;
class CompletionQueue;
class Credentials;
@@ -59,7 +59,7 @@
virtual void *RegisterMethod(const char *method) GRPC_OVERRIDE;
virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) GRPC_OVERRIDE;
- virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
+ virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
private:
const grpc::string target_;
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index 1068111..dc3d36f 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -39,10 +39,10 @@
#include <grpc++/channel_interface.h>
#include "src/core/profiling/timers.h"
-#include "src/cpp/proto/proto_utils.h"
namespace grpc {
+#if 0
CallOpBuffer::CallOpBuffer()
: return_tag_(this),
send_initial_metadata_(false),
@@ -338,6 +338,7 @@
}
return true;
}
+#endif
Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
: call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {}
@@ -349,11 +350,11 @@
call_(call),
max_message_size_(max_message_size) {}
-void Call::PerformOps(CallOpBuffer* buffer) {
+void Call::PerformOps(CallOpSetInterface* ops) {
if (max_message_size_ > 0) {
- buffer->set_max_message_size(max_message_size_);
+ ops->set_max_message_size(max_message_size_);
}
- call_hook_->PerformOpsOnCall(buffer, this);
+ call_hook_->PerformOpsOnCall(ops, this);
}
} // namespace grpc
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index 7a7e73b..a4a37c7 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -31,7 +31,7 @@
*
*/
-#include "src/cpp/proto/proto_utils.h"
+#include <grpc++/impl/proto_utils.h>
#include <grpc++/config.h>
#include <grpc/grpc.h>
@@ -157,15 +157,23 @@
return msg.SerializeToZeroCopyStream(&writer);
}
-bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
+Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
int max_message_size) {
- if (!buffer) return false;
+ if (!buffer) {
+ return Status(INVALID_ARGUMENT, "No payload");
+ }
GrpcBufferReader reader(buffer);
::grpc::protobuf::io::CodedInputStream decoder(&reader);
if (max_message_size > 0) {
decoder.SetTotalBytesLimit(max_message_size, max_message_size);
}
- return msg->ParseFromCodedStream(&decoder) && decoder.ConsumedEntireMessage();
+ if (!msg->ParseFromCodedStream(&decoder)) {
+ return Status(INVALID_ARGUMENT, msg->InitializationErrorString());
+ }
+ if (!decoder.ConsumedEntireMessage()) {
+ return Status(INVALID_ARGUMENT, "Did not read entire message");
+ }
+ return Status::OK;
}
} // namespace grpc
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index dbd88c5..c08506c 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -48,7 +48,6 @@
#include <grpc++/time.h>
#include "src/core/profiling/timers.h"
-#include "src/cpp/proto/proto_utils.h"
namespace grpc {
@@ -68,10 +67,7 @@
in_flight_(false),
has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() ==
- RpcMethod::SERVER_STREAMING),
- has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
- method->method_type() ==
- RpcMethod::CLIENT_STREAMING) {
+ RpcMethod::SERVER_STREAMING) {
grpc_metadata_array_init(&request_metadata_);
}
@@ -116,7 +112,6 @@
ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
mrd->request_metadata_.count),
has_request_payload_(mrd->has_request_payload_),
- has_response_payload_(mrd->has_response_payload_),
request_payload_(mrd->request_payload_),
method_(mrd->method_) {
ctx_.call_ = mrd->call_;
@@ -133,35 +128,9 @@
}
void Run() {
- std::unique_ptr<grpc::protobuf::Message> req;
- std::unique_ptr<grpc::protobuf::Message> res;
- if (has_request_payload_) {
- GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
- req.reset(method_->AllocateRequestProto());
- if (!DeserializeProto(request_payload_, req.get(),
- call_.max_message_size())) {
- // FIXME(yangg) deal with deserialization failure
- cq_.Shutdown();
- return;
- }
- GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
- }
- if (has_response_payload_) {
- res.reset(method_->AllocateResponseProto());
- }
ctx_.BeginCompletionOp(&call_);
- auto status = method_->handler()->RunHandler(
- MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
- CallOpBuffer buf;
- if (!ctx_.sent_initial_metadata_) {
- buf.AddSendInitialMetadata(&ctx_.initial_metadata_);
- }
- if (has_response_payload_) {
- buf.AddSendMessage(*res);
- }
- buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf); /* status ignored */
+ method_->handler()->RunHandler(
+ MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_, call_.max_message_size()));
void* ignored_tag;
bool ignored_ok;
cq_.Shutdown();
@@ -173,7 +142,6 @@
Call call_;
ServerContext ctx_;
const bool has_request_payload_;
- const bool has_response_payload_;
grpc_byte_buffer* request_payload_;
RpcServiceMethod* const method_;
};
@@ -183,7 +151,6 @@
void* const tag_;
bool in_flight_;
const bool has_request_payload_;
- const bool has_response_payload_;
grpc_call* call_;
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
@@ -251,9 +218,9 @@
}
bool Server::RegisterAsyncService(AsynchronousService* service) {
- GPR_ASSERT(service->dispatch_impl_ == nullptr &&
+ GPR_ASSERT(service->server_ == nullptr &&
"Can only register an asynchronous service against one server.");
- service->dispatch_impl_ = this;
+ service->server_ = this;
service->request_args_ = new void*[service->method_count_];
for (size_t i = 0; i < service->method_count_; ++i) {
void* tag = grpc_server_register_method(server_, service->method_names_[i],
@@ -318,15 +285,16 @@
}
}
-void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
+void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
- size_t nops = MAX_OPS;
- grpc_op ops[MAX_OPS];
- buf->FillOps(ops, &nops);
+ size_t nops = 0;
+ grpc_op cops[MAX_OPS];
+ ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), ops, nops, buf));
+ grpc_call_start_batch(call->call(), cops, nops, ops));
}
+#if 0
class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
AsyncRequest(Server* server, void* registered_method, ServerContext* ctx,
@@ -352,9 +320,7 @@
notification_cq->cq(), this);
}
- AsyncRequest(Server* server, GenericServerContext* ctx,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq, void* tag)
+ AsyncRequest()
: tag_(tag),
request_(nullptr),
stream_(stream),
@@ -454,6 +420,7 @@
void* tag) {
new AsyncRequest(this, context, stream, call_cq, notification_cq, tag);
}
+#endif
void Server::ScheduleCallback() {
{
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 6b5e41d..eea9645 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -43,12 +43,12 @@
// CompletionOp
-class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer {
+class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
- CompletionOp() : refs_(2), finalized_(false), cancelled_(false) {
- AddServerRecvClose(&cancelled_);
- }
+ CompletionOp() : refs_(2), finalized_(false), cancelled_(0) {}
+
+ void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
bool CheckCancelled(CompletionQueue* cq);
@@ -59,7 +59,7 @@
grpc::mutex mu_;
int refs_;
bool finalized_;
- bool cancelled_;
+ int cancelled_;
};
void ServerContext::CompletionOp::Unref() {
@@ -73,14 +73,19 @@
bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
cq->TryPluck(this);
grpc::lock_guard<grpc::mutex> g(mu_);
- return finalized_ ? cancelled_ : false;
+ return finalized_ ? cancelled_ != 0 : false;
+}
+
+void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
+ ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ ops->data.recv_close_on_server.cancelled = &cancelled_;
+ *nops = 1;
}
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
- GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status));
grpc::unique_lock<grpc::mutex> lock(mu_);
finalized_ = true;
- if (!*status) cancelled_ = true;
+ if (!*status) cancelled_ = 1;
if (--refs_ == 0) {
lock.unlock();
delete this;
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index feac024..70bd0b6 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -760,7 +760,7 @@
# spaces.
# Note: If this tag is empty the current directory is searched.
-INPUT = include/grpc++/async_generic_service.h include/grpc++/async_unary_call.h include/grpc++/byte_buffer.h include/grpc++/channel_arguments.h include/grpc++/channel_interface.h include/grpc++/client_context.h include/grpc++/completion_queue.h include/grpc++/config.h include/grpc++/create_channel.h include/grpc++/credentials.h include/grpc++/generic_stub.h include/grpc++/impl/call.h include/grpc++/impl/client_unary_call.h include/grpc++/impl/grpc_library.h include/grpc++/impl/internal_stub.h include/grpc++/impl/rpc_method.h include/grpc++/impl/rpc_service_method.h include/grpc++/impl/service_type.h include/grpc++/impl/sync.h include/grpc++/impl/sync_cxx11.h include/grpc++/impl/sync_no_cxx11.h include/grpc++/impl/thd.h include/grpc++/impl/thd_cxx11.h include/grpc++/impl/thd_no_cxx11.h include/grpc++/server.h include/grpc++/server_builder.h include/grpc++/server_context.h include/grpc++/server_credentials.h include/grpc++/slice.h include/grpc++/status.h include/grpc++/status_code_enum.h include/grpc++/stream.h include/grpc++/thread_pool_interface.h include/grpc++/time.h src/cpp/client/secure_credentials.h src/cpp/server/secure_server_credentials.h src/cpp/client/channel.h src/cpp/proto/proto_utils.h src/cpp/server/thread_pool.h src/cpp/client/secure_credentials.cc src/cpp/server/secure_server_credentials.cc src/cpp/client/channel.cc src/cpp/client/channel_arguments.cc src/cpp/client/client_context.cc src/cpp/client/client_unary_call.cc src/cpp/client/create_channel.cc src/cpp/client/credentials.cc src/cpp/client/generic_stub.cc src/cpp/client/insecure_credentials.cc src/cpp/client/internal_stub.cc src/cpp/common/call.cc src/cpp/common/completion_queue.cc src/cpp/common/rpc_method.cc src/cpp/proto/proto_utils.cc src/cpp/server/async_generic_service.cc src/cpp/server/create_default_thread_pool.cc src/cpp/server/insecure_server_credentials.cc src/cpp/server/server.cc src/cpp/server/server_builder.cc src/cpp/server/server_context.cc src/cpp/server/server_credentials.cc src/cpp/server/thread_pool.cc src/cpp/util/byte_buffer.cc src/cpp/util/slice.cc src/cpp/util/status.cc src/cpp/util/time.cc
+INPUT = include/grpc++/async_generic_service.h include/grpc++/async_unary_call.h include/grpc++/byte_buffer.h include/grpc++/channel_arguments.h include/grpc++/channel_interface.h include/grpc++/client_context.h include/grpc++/completion_queue.h include/grpc++/config.h include/grpc++/create_channel.h include/grpc++/credentials.h include/grpc++/generic_stub.h include/grpc++/impl/call.h include/grpc++/impl/client_unary_call.h include/grpc++/impl/grpc_library.h include/grpc++/impl/internal_stub.h include/grpc++/impl/rpc_method.h include/grpc++/impl/rpc_service_method.h include/grpc++/impl/service_type.h include/grpc++/impl/sync.h include/grpc++/impl/sync_cxx11.h include/grpc++/impl/sync_no_cxx11.h include/grpc++/impl/thd.h include/grpc++/impl/thd_cxx11.h include/grpc++/impl/thd_no_cxx11.h include/grpc++/server.h include/grpc++/server_builder.h include/grpc++/server_context.h include/grpc++/server_credentials.h include/grpc++/slice.h include/grpc++/status.h include/grpc++/status_code_enum.h include/grpc++/stream.h include/grpc++/thread_pool_interface.h include/grpc++/time.h src/cpp/client/secure_credentials.h src/cpp/server/secure_server_credentials.h src/cpp/client/channel.h src/cpp/proto/proto_utils.h src/cpp/server/thread_pool.h src/cpp/client/secure_credentials.cc src/cpp/server/secure_server_credentials.cc src/cpp/client/channel.cc src/cpp/client/channel_arguments.cc src/cpp/client/client_context.cc src/cpp/client/create_channel.cc src/cpp/client/credentials.cc src/cpp/client/generic_stub.cc src/cpp/client/insecure_credentials.cc src/cpp/client/internal_stub.cc src/cpp/common/call.cc src/cpp/common/completion_queue.cc src/cpp/common/rpc_method.cc src/cpp/proto/proto_utils.cc src/cpp/server/async_generic_service.cc src/cpp/server/create_default_thread_pool.cc src/cpp/server/insecure_server_credentials.cc src/cpp/server/server.cc src/cpp/server/server_builder.cc src/cpp/server/server_context.cc src/cpp/server/server_credentials.cc src/cpp/server/thread_pool.cc src/cpp/util/byte_buffer.cc src/cpp/util/slice.cc src/cpp/util/status.cc src/cpp/util/time.cc
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses
diff --git a/vsprojects/grpc++/grpc++.vcxproj b/vsprojects/grpc++/grpc++.vcxproj
index d233f9e..ee39df2 100644
--- a/vsprojects/grpc++/grpc++.vcxproj
+++ b/vsprojects/grpc++/grpc++.vcxproj
@@ -199,8 +199,6 @@
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
</ClCompile>
- <ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
- </ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\credentials.cc">
diff --git a/vsprojects/grpc++/grpc++.vcxproj.filters b/vsprojects/grpc++/grpc++.vcxproj.filters
index dd375c7..19c7c93 100644
--- a/vsprojects/grpc++/grpc++.vcxproj.filters
+++ b/vsprojects/grpc++/grpc++.vcxproj.filters
@@ -16,9 +16,6 @@
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
- <ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
- <Filter>src\cpp\client</Filter>
- </ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
diff --git a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj
index 9b2ef91..6148517 100644
--- a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj
+++ b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj
@@ -193,8 +193,6 @@
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
</ClCompile>
- <ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
- </ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\credentials.cc">
diff --git a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
index d616e33..6d37054 100644
--- a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
+++ b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
@@ -10,9 +10,6 @@
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
- <ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
- <Filter>src\cpp\client</Filter>
- </ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>