Change the API for binding completion queues to new requests
Bind the completion queue for a new request at the time the request is
made, not at server instantiation time.
diff --git a/include/grpc++/async_generic_service.h b/include/grpc++/async_generic_service.h
index 911d31c..b435c6e 100644
--- a/include/grpc++/async_generic_service.h
+++ b/include/grpc++/async_generic_service.h
@@ -65,10 +65,8 @@
void RequestCall(GenericServerContext* ctx,
GenericServerAsyncReaderWriter* reader_writer,
- CompletionQueue* cq, void* tag);
-
- // The new rpc event should be obtained from this completion queue.
- CompletionQueue* completion_queue();
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag);
private:
friend class Server;
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index 5c2b1cc..e8429c8 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -58,6 +58,7 @@
class CompletionQueue;
class Server;
+class ServerBuilder;
class ServerContext;
class CompletionQueueTag {
@@ -137,6 +138,12 @@
grpc_completion_queue* cq_; // owned
};
+class ServerCompletionQueue : public CompletionQueue {  // CompletionQueue subtype taken as notification_cq by the async request methods.
+ private:
+ friend class ServerBuilder;  // Grants ServerBuilder access to the private constructor below.
+ ServerCompletionQueue() {}  // Private: outside code obtains instances via ServerBuilder::AddCompletionQueue().
+};
+
} // namespace grpc
#endif // GRPCXX_COMPLETION_QUEUE_H
diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h
index 7cd3dda..bc39bb8 100644
--- a/include/grpc++/impl/service_type.h
+++ b/include/grpc++/impl/service_type.h
@@ -39,8 +39,10 @@
namespace grpc {
class Call;
+class CompletionQueue;
class RpcService;
class Server;
+class ServerCompletionQueue;
class ServerContext;
class Status;
@@ -70,52 +72,55 @@
ServerContext* context,
::grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) = 0;
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) = 0;
};
- AsynchronousService(CompletionQueue* cq, const char** method_names,
- size_t method_count)
- : cq_(cq),
- dispatch_impl_(nullptr),
+ AsynchronousService(const char** method_names, size_t method_count)
+ : dispatch_impl_(nullptr),
method_names_(method_names),
method_count_(method_count),
request_args_(nullptr) {}
~AsynchronousService() { delete[] request_args_; }
- CompletionQueue* completion_queue() const { return cq_; }
-
protected:
void RequestAsyncUnary(int index, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
void RequestClientStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
void RequestServerStreaming(int index, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
void RequestBidiStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
private:
friend class Server;
- CompletionQueue* const cq_;
DispatchImpl* dispatch_impl_;
const char** const method_names_;
size_t method_count_;
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index b2b9044..50a2416 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -101,11 +101,15 @@
void RequestAsyncCall(void* registered_method, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) GRPC_OVERRIDE;
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) GRPC_OVERRIDE;
void RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag);
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag);
const int max_message_size_;
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index 7155c7f..ecee475 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -46,6 +46,7 @@
class CompletionQueue;
class RpcService;
class Server;
+class ServerCompletionQueue;
class ServerCredentials;
class SynchronousService;
class ThreadPoolInterface;
@@ -82,6 +83,11 @@
// Does not take ownership.
void SetThreadPool(ThreadPoolInterface* thread_pool);
+ // Add a completion queue for handling asynchronous services.
+ // Caller is required to keep this completion queue alive until calling
+ // BuildAndStart().
+ std::unique_ptr<ServerCompletionQueue> AddCompletionQueue();
+
// Return a running server which is ready for processing rpcs.
std::unique_ptr<Server> BuildAndStart();
@@ -96,6 +102,7 @@
std::vector<RpcService*> services_;
std::vector<AsynchronousService*> async_services_;
std::vector<Port> ports_;
+ std::vector<ServerCompletionQueue*> cqs_;
std::shared_ptr<ServerCredentials> creds_;
AsyncGenericService* generic_service_;
ThreadPoolInterface* thread_pool_;