Merge pull request #13297 from yang-g/pollcv

Use same cq for client and server
diff --git a/include/grpc++/impl/codegen/config_protobuf.h b/include/grpc++/impl/codegen/config_protobuf.h
index c5e5bdf..7387fa2 100644
--- a/include/grpc++/impl/codegen/config_protobuf.h
+++ b/include/grpc++/impl/codegen/config_protobuf.h
@@ -19,6 +19,8 @@
 #ifndef GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H
 #define GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H
 
+#define GRPC_OPEN_SOURCE_PROTO
+
 #ifndef GRPC_CUSTOM_PROTOBUF_INT64
 #include <google/protobuf/stubs/common.h>
 #define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h
index c751c1e..d7c57be 100644
--- a/include/grpc++/impl/codegen/core_codegen.h
+++ b/include/grpc++/impl/codegen/core_codegen.h
@@ -50,6 +50,9 @@
   void* gpr_malloc(size_t size) override;
   void gpr_free(void* p) override;
 
+  void grpc_init() override;
+  void grpc_shutdown() override;
+
   void gpr_mu_init(gpr_mu* mu) override;
   void gpr_mu_destroy(gpr_mu* mu) override;
   void gpr_mu_lock(gpr_mu* mu) override;
@@ -89,6 +92,7 @@
   grpc_slice grpc_slice_ref(grpc_slice slice) override;
   grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) override;
   grpc_slice grpc_slice_split_head(grpc_slice* s, size_t split) override;
+  grpc_slice grpc_slice_sub(grpc_slice s, size_t begin, size_t end) override;
   void grpc_slice_buffer_add(grpc_slice_buffer* sb, grpc_slice slice) override;
   void grpc_slice_buffer_pop(grpc_slice_buffer* sb) override;
   grpc_slice grpc_slice_from_static_buffer(const void* buffer,
diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h
index a4c50da..1949cda 100644
--- a/include/grpc++/impl/codegen/core_codegen_interface.h
+++ b/include/grpc++/impl/codegen/core_codegen_interface.h
@@ -63,6 +63,13 @@
   virtual void* gpr_malloc(size_t size) = 0;
   virtual void gpr_free(void* p) = 0;
 
+  // These are only to be used to fix edge cases involving grpc_init and
+  // grpc_shutdown. Calling grpc_init from the codegen interface before
+  // the real grpc_init is called will cause a crash, so if you use this
+  // function, ensure that it is not the first call to grpc_init.
+  virtual void grpc_init() = 0;
+  virtual void grpc_shutdown() = 0;
+
   virtual void gpr_mu_init(gpr_mu* mu) = 0;
   virtual void gpr_mu_destroy(gpr_mu* mu) = 0;
   virtual void gpr_mu_lock(gpr_mu* mu) = 0;
@@ -103,6 +110,7 @@
   virtual grpc_slice grpc_slice_ref(grpc_slice slice) = 0;
   virtual grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) = 0;
   virtual grpc_slice grpc_slice_split_head(grpc_slice* s, size_t split) = 0;
+  virtual grpc_slice grpc_slice_sub(grpc_slice s, size_t begin, size_t end) = 0;
   virtual void grpc_slice_buffer_add(grpc_slice_buffer* sb,
                                      grpc_slice slice) = 0;
   virtual void grpc_slice_buffer_pop(grpc_slice_buffer* sb) = 0;
diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h
index 67e8f71..0f7e115 100644
--- a/include/grpc++/impl/codegen/proto_utils.h
+++ b/include/grpc++/impl/codegen/proto_utils.h
@@ -39,8 +39,7 @@
 
 const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024;
 
-class GrpcBufferWriter final
-    : public ::grpc::protobuf::io::ZeroCopyOutputStream {
+class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
  public:
   explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
       : block_size_(block_size), byte_count_(0), have_backup_(false) {
@@ -88,7 +87,7 @@
 
   grpc::protobuf::int64 ByteCount() const override { return byte_count_; }
 
- private:
+ protected:
   friend class GrpcBufferWriterPeer;
   const int block_size_;
   int64_t byte_count_;
@@ -98,8 +97,7 @@
   grpc_slice slice_;
 };
 
-class GrpcBufferReader final
-    : public ::grpc::protobuf::io::ZeroCopyInputStream {
+class GrpcBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream {
  public:
   explicit GrpcBufferReader(grpc_byte_buffer* buffer)
       : byte_count_(0), backup_count_(0), status_() {
@@ -160,7 +158,7 @@
     return byte_count_ - backup_count_;
   }
 
- private:
+ protected:
   int64_t byte_count_;
   int64_t backup_count_;
   grpc_byte_buffer_reader reader_;
@@ -168,57 +166,85 @@
   Status status_;
 };
 
+// BufferWriter must be a subclass of io::ZeroCopyOutputStream.
+template <class BufferWriter, class T>
+Status GenericSerialize(const grpc::protobuf::Message& msg,
+                        grpc_byte_buffer** bp, bool* own_buffer) {
+  static_assert(
+      std::is_base_of<protobuf::io::ZeroCopyOutputStream, BufferWriter>::value,
+      "BufferWriter must be a subclass of io::ZeroCopyOutputStream");
+  *own_buffer = true;
+  int byte_size = msg.ByteSize();
+  if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
+    grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
+    GPR_CODEGEN_ASSERT(
+        GRPC_SLICE_END_PTR(slice) ==
+        msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
+    *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
+    g_core_codegen_interface->grpc_slice_unref(slice);
+    return g_core_codegen_interface->ok();
+  } else {
+    BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
+    return msg.SerializeToZeroCopyStream(&writer)
+               ? g_core_codegen_interface->ok()
+               : Status(StatusCode::INTERNAL, "Failed to serialize message");
+  }
+}
+
+// BufferReader must be a subclass of io::ZeroCopyInputStream.
+template <class BufferReader, class T>
+Status GenericDeserialize(grpc_byte_buffer* buffer,
+                          grpc::protobuf::Message* msg) {
+  static_assert(
+      std::is_base_of<protobuf::io::ZeroCopyInputStream, BufferReader>::value,
+      "BufferReader must be a subclass of io::ZeroCopyInputStream");
+  if (buffer == nullptr) {
+    return Status(StatusCode::INTERNAL, "No payload");
+  }
+  Status result = g_core_codegen_interface->ok();
+  {
+    BufferReader reader(buffer);
+    if (!reader.status().ok()) {
+      return reader.status();
+    }
+    ::grpc::protobuf::io::CodedInputStream decoder(&reader);
+    decoder.SetTotalBytesLimit(INT_MAX, INT_MAX);
+    if (!msg->ParseFromCodedStream(&decoder)) {
+      result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
+    }
+    if (!decoder.ConsumedEntireMessage()) {
+      result = Status(StatusCode::INTERNAL, "Did not read entire message");
+    }
+  }
+  g_core_codegen_interface->grpc_byte_buffer_destroy(buffer);
+  return result;
+}
+
 }  // namespace internal
 
+// this is needed so the following class does not conflict with protobuf
+// serializers that utilize internal-only tools.
+#ifdef GRPC_OPEN_SOURCE_PROTO
+// This class provides a protobuf serializer. It translates between protobuf
+// objects and grpc_byte_buffers. More information about SerializationTraits can
+// be found in include/grpc++/impl/codegen/serialization_traits.h.
 template <class T>
 class SerializationTraits<T, typename std::enable_if<std::is_base_of<
                                  grpc::protobuf::Message, T>::value>::type> {
  public:
   static Status Serialize(const grpc::protobuf::Message& msg,
                           grpc_byte_buffer** bp, bool* own_buffer) {
-    *own_buffer = true;
-    int byte_size = msg.ByteSize();
-    if (byte_size <= internal::kGrpcBufferWriterMaxBufferLength) {
-      grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
-      GPR_CODEGEN_ASSERT(
-          GRPC_SLICE_END_PTR(slice) ==
-          msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
-      *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
-      g_core_codegen_interface->grpc_slice_unref(slice);
-      return g_core_codegen_interface->ok();
-    } else {
-      internal::GrpcBufferWriter writer(
-          bp, internal::kGrpcBufferWriterMaxBufferLength);
-      return msg.SerializeToZeroCopyStream(&writer)
-                 ? g_core_codegen_interface->ok()
-                 : Status(StatusCode::INTERNAL, "Failed to serialize message");
-    }
+    return internal::GenericSerialize<internal::GrpcBufferWriter, T>(
+        msg, bp, own_buffer);
   }
 
   static Status Deserialize(grpc_byte_buffer* buffer,
                             grpc::protobuf::Message* msg) {
-    if (buffer == nullptr) {
-      return Status(StatusCode::INTERNAL, "No payload");
-    }
-    Status result = g_core_codegen_interface->ok();
-    {
-      internal::GrpcBufferReader reader(buffer);
-      if (!reader.status().ok()) {
-        return reader.status();
-      }
-      ::grpc::protobuf::io::CodedInputStream decoder(&reader);
-      decoder.SetTotalBytesLimit(INT_MAX, INT_MAX);
-      if (!msg->ParseFromCodedStream(&decoder)) {
-        result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
-      }
-      if (!decoder.ConsumedEntireMessage()) {
-        result = Status(StatusCode::INTERNAL, "Did not read entire message");
-      }
-    }
-    g_core_codegen_interface->grpc_byte_buffer_destroy(buffer);
-    return result;
+    return internal::GenericDeserialize<internal::GrpcBufferReader, T>(buffer,
+                                                                       msg);
   }
 };
+#endif
 
 }  // namespace grpc
 
diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc
index 6ea5f1d..3cbf08a 100644
--- a/src/cpp/common/core_codegen.cc
+++ b/src/cpp/common/core_codegen.cc
@@ -76,6 +76,9 @@
 
 void CoreCodegen::gpr_free(void* p) { return ::gpr_free(p); }
 
+void CoreCodegen::grpc_init() { ::grpc_init(); }
+void CoreCodegen::grpc_shutdown() { ::grpc_shutdown(); }
+
 void CoreCodegen::gpr_mu_init(gpr_mu* mu) { ::gpr_mu_init(mu); };
 void CoreCodegen::gpr_mu_destroy(gpr_mu* mu) { ::gpr_mu_destroy(mu); }
 void CoreCodegen::gpr_mu_lock(gpr_mu* mu) { ::gpr_mu_lock(mu); }
@@ -156,6 +159,10 @@
   return ::grpc_slice_split_head(s, split);
 }
 
+grpc_slice CoreCodegen::grpc_slice_sub(grpc_slice s, size_t begin, size_t end) {
+  return ::grpc_slice_sub(s, begin, end);
+}
+
 grpc_slice CoreCodegen::grpc_slice_from_static_buffer(const void* buffer,
                                                       size_t length) {
   return ::grpc_slice_from_static_buffer(buffer, length);