Completion queue binding for new requests API change
Move the completion queue binding for new requests to request time
(when the new request is issued), not server instantiation time.
diff --git a/include/grpc++/async_generic_service.h b/include/grpc++/async_generic_service.h
index 911d31c..b435c6e 100644
--- a/include/grpc++/async_generic_service.h
+++ b/include/grpc++/async_generic_service.h
@@ -65,10 +65,8 @@
void RequestCall(GenericServerContext* ctx,
GenericServerAsyncReaderWriter* reader_writer,
- CompletionQueue* cq, void* tag);
-
- // The new rpc event should be obtained from this completion queue.
- CompletionQueue* completion_queue();
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag);
private:
friend class Server;
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index 5c2b1cc..e8429c8 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -58,6 +58,7 @@
class CompletionQueue;
class Server;
+class ServerBuilder;
class ServerContext;
class CompletionQueueTag {
@@ -137,6 +138,12 @@
grpc_completion_queue* cq_; // owned
};
+class ServerCompletionQueue : public CompletionQueue {
+ private:
+ friend class ServerBuilder;
+ ServerCompletionQueue() {}
+};
+
} // namespace grpc
#endif // GRPCXX_COMPLETION_QUEUE_H
diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h
index 7cd3dda..bc39bb8 100644
--- a/include/grpc++/impl/service_type.h
+++ b/include/grpc++/impl/service_type.h
@@ -39,8 +39,10 @@
namespace grpc {
class Call;
+class CompletionQueue;
class RpcService;
class Server;
+class ServerCompletionQueue;
class ServerContext;
class Status;
@@ -70,52 +72,55 @@
ServerContext* context,
::grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) = 0;
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) = 0;
};
- AsynchronousService(CompletionQueue* cq, const char** method_names,
- size_t method_count)
- : cq_(cq),
- dispatch_impl_(nullptr),
+ AsynchronousService(const char** method_names, size_t method_count)
+ : dispatch_impl_(nullptr),
method_names_(method_names),
method_count_(method_count),
request_args_(nullptr) {}
~AsynchronousService() { delete[] request_args_; }
- CompletionQueue* completion_queue() const { return cq_; }
-
protected:
void RequestAsyncUnary(int index, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
void RequestClientStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
void RequestServerStreaming(int index, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
void RequestBidiStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
private:
friend class Server;
- CompletionQueue* const cq_;
DispatchImpl* dispatch_impl_;
const char** const method_names_;
size_t method_count_;
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index b2b9044..50a2416 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -101,11 +101,15 @@
void RequestAsyncCall(void* registered_method, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) GRPC_OVERRIDE;
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) GRPC_OVERRIDE;
void RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag);
+ CompletionQueue* cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag);
const int max_message_size_;
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index 7155c7f..ecee475 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -46,6 +46,7 @@
class CompletionQueue;
class RpcService;
class Server;
+class ServerCompletionQueue;
class ServerCredentials;
class SynchronousService;
class ThreadPoolInterface;
@@ -82,6 +83,11 @@
// Does not take ownership.
void SetThreadPool(ThreadPoolInterface* thread_pool);
+ // Add a completion queue for handling asynchronous services.
+ // Caller is required to keep this completion queue alive until calling
+ // BuildAndStart()
+ std::unique_ptr<ServerCompletionQueue> AddCompletionQueue();
+
// Return a running server which is ready for processing rpcs.
std::unique_ptr<Server> BuildAndStart();
@@ -96,6 +102,7 @@
std::vector<RpcService*> services_;
std::vector<AsynchronousService*> async_services_;
std::vector<Port> ports_;
+ std::vector<ServerCompletionQueue*> cqs_;
std::shared_ptr<ServerCredentials> creds_;
AsyncGenericService* generic_service_;
ThreadPoolInterface* thread_pool_;
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 9bb826f..be12356 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -460,7 +460,8 @@
grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call **call, grpc_call_details *details,
grpc_metadata_array *request_metadata,
- grpc_completion_queue *cq_bound_to_call, void *tag_new);
+ grpc_completion_queue *cq_bound_to_call,
+ grpc_completion_queue *cq_for_notification, void *tag_new);
/* Registers a method in the server.
Methods to this (host, method) pair will not be reported by
@@ -470,21 +471,26 @@
Must be called before grpc_server_start.
Returns NULL on failure. */
void *grpc_server_register_method(grpc_server *server, const char *method,
- const char *host,
- grpc_completion_queue *new_call_cq);
+ const char *host);
/* Request notification of a new pre-registered call */
grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *registered_method, grpc_call **call,
gpr_timespec *deadline, grpc_metadata_array *request_metadata,
grpc_byte_buffer **optional_payload,
- grpc_completion_queue *cq_bound_to_call, void *tag_new);
+ grpc_completion_queue *cq_bound_to_call,
+ grpc_completion_queue *cq_for_notification, void *tag_new);
/* Create a server. Additional configuration for each incoming channel can
be specified with args. If no additional configuration is needed, args can
be NULL. See grpc_channel_args for more. */
-grpc_server *grpc_server_create(grpc_completion_queue *cq,
- const grpc_channel_args *args);
+grpc_server *grpc_server_create(const grpc_channel_args *args);
+
+/* Register a completion queue with the server. Must be done for any completion
+ queue that is passed to a grpc_server_request_* call. Must be performed prior
+ to grpc_server_start. */
+void grpc_server_register_completion_queue(grpc_server *server,
+ grpc_completion_queue *cq);
/* Add a HTTP2 over plaintext over tcp listener.
Returns bound port number on success, 0 on failure.