Merge pull request #12343 from sreecha/enable-epoll1

Enable epoll1 polling engine
diff --git a/bazel/grpc_build_system.bzl b/bazel/grpc_build_system.bzl
index bff8988..ba7ffcc 100644
--- a/bazel/grpc_build_system.bzl
+++ b/bazel/grpc_build_system.bzl
@@ -106,7 +106,7 @@
     args = args,
     data = data)
 
-def grpc_package(name, visibility = "private"):
+def grpc_package(name, visibility = "private", features = []):
   if visibility == "tests":
     visibility = ["//test:__subpackages__"]
   elif visibility == "public":
@@ -118,5 +118,6 @@
 
   if len(visibility) != 0:
     native.package(
-      default_visibility = visibility
+      default_visibility = visibility,
+      features = features
     )
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index 0cb11b4..8e70225 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -272,7 +272,7 @@
 
 class CallOpSendMessage {
  public:
-  CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {}
+  CallOpSendMessage() : send_buf_(nullptr) {}
 
   /// Send \a message using \a options for the write. The \a options are cleared
   /// after use.
@@ -295,20 +295,25 @@
     write_options_.Clear();
   }
   void FinishOp(bool* status) {
-    if (own_buf_) g_core_codegen_interface->grpc_byte_buffer_destroy(send_buf_);
+    g_core_codegen_interface->grpc_byte_buffer_destroy(send_buf_);
     send_buf_ = nullptr;
   }
 
  private:
   grpc_byte_buffer* send_buf_;
   WriteOptions write_options_;
-  bool own_buf_;
 };
 
 template <class M>
 Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) {
   write_options_ = options;
-  return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_);
+  bool own_buf;
+  Status result =
+      SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf);
+  if (!own_buf) {
+    send_buf_ = g_core_codegen_interface->grpc_byte_buffer_copy(send_buf_);
+  }
+  return result;
 }
 
 template <class M>
diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h
index 2b15a01..c751c1e 100644
--- a/include/grpc++/impl/codegen/core_codegen.h
+++ b/include/grpc++/impl/codegen/core_codegen.h
@@ -68,6 +68,7 @@
   void grpc_call_unref(grpc_call* call) override;
   virtual void* grpc_call_arena_alloc(grpc_call* call, size_t length) override;
 
+  grpc_byte_buffer* grpc_byte_buffer_copy(grpc_byte_buffer* bb) override;
   void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override;
 
   int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h
index b4c771a..a4c50da 100644
--- a/include/grpc++/impl/codegen/core_codegen_interface.h
+++ b/include/grpc++/impl/codegen/core_codegen_interface.h
@@ -74,6 +74,7 @@
   virtual void gpr_cv_signal(gpr_cv* cv) = 0;
   virtual void gpr_cv_broadcast(gpr_cv* cv) = 0;
 
+  virtual grpc_byte_buffer* grpc_byte_buffer_copy(grpc_byte_buffer* bb) = 0;
   virtual void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) = 0;
 
   virtual int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 9079506..59b90af 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -511,6 +511,11 @@
       } maybe_stream_compression_level;
     } send_initial_metadata;
     struct grpc_op_send_message {
+      /** This op takes ownership of the slices in send_message.  After
+       * a call completes, the contents of send_message are not guaranteed
+       * and likely empty.  The original owner should still call
+       * grpc_byte_buffer_destroy() on this object however.
+       */
       struct grpc_byte_buffer *send_message;
     } send_message;
     struct grpc_op_send_status_from_server {
diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c
index fb03a10..08f6162 100644
--- a/src/core/lib/transport/byte_stream.c
+++ b/src/core/lib/transport/byte_stream.c
@@ -85,6 +85,7 @@
 static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx,
                                         grpc_byte_stream *byte_stream) {
   grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+  grpc_slice_buffer_reset_and_unref_internal(exec_ctx, stream->backing_buffer);
   GRPC_ERROR_UNREF(stream->shutdown_error);
 }
 
diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h
index 1e1e831..be2a352 100644
--- a/src/core/lib/transport/byte_stream.h
+++ b/src/core/lib/transport/byte_stream.h
@@ -81,7 +81,9 @@
 
 // grpc_slice_buffer_stream
 //
-// A grpc_byte_stream that wraps a slice buffer.
+// A grpc_byte_stream that wraps a slice buffer.  The stream takes
+// ownership of the slices in the buffer, and on destruction will
+// reset the contents of the buffer.
 
 typedef struct grpc_slice_buffer_stream {
   grpc_byte_stream base;
diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h
index 4b37c20..c3d187d 100644
--- a/src/cpp/common/channel_filter.h
+++ b/src/cpp/common/channel_filter.h
@@ -195,6 +195,15 @@
     op_->payload->send_message.send_message = send_message;
   }
 
+  grpc_byte_stream **recv_message() const {
+    return op_->recv_message ? op_->payload->recv_message.recv_message
+                             : nullptr;
+  }
+  void set_recv_message(grpc_byte_stream **recv_message) {
+    op_->recv_message = true;
+    op_->payload->recv_message.recv_message = recv_message;
+  }
+
   census_context *get_census_context() const {
     return (census_context *)op_->payload->context[GRPC_CONTEXT_TRACING].value;
   }
diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc
index c7c6b6b..6ea5f1d 100644
--- a/src/cpp/common/core_codegen.cc
+++ b/src/cpp/common/core_codegen.cc
@@ -89,6 +89,10 @@
 void CoreCodegen::gpr_cv_signal(gpr_cv* cv) { ::gpr_cv_signal(cv); }
 void CoreCodegen::gpr_cv_broadcast(gpr_cv* cv) { ::gpr_cv_broadcast(cv); }
 
+grpc_byte_buffer* CoreCodegen::grpc_byte_buffer_copy(grpc_byte_buffer* bb) {
+  return ::grpc_byte_buffer_copy(bb);
+}
+
 void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
   ::grpc_byte_buffer_destroy(bb);
 }
diff --git a/test/core/bad_client/BUILD b/test/core/bad_client/BUILD
index caf2210..99593dc 100644
--- a/test/core/bad_client/BUILD
+++ b/test/core/bad_client/BUILD
@@ -14,7 +14,7 @@
 
 load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_cc_binary", "grpc_package")
 
-grpc_package(name = "bad_client")
+grpc_package(name = "test/core/bad_client")
 
 licenses(["notice"])  # Apache v2
 
diff --git a/test/core/bad_ssl/BUILD b/test/core/bad_ssl/BUILD
index 51210b6..0ea1520 100644
--- a/test/core/bad_ssl/BUILD
+++ b/test/core/bad_ssl/BUILD
@@ -14,7 +14,7 @@
 
 load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_cc_binary", "grpc_package")
 
-grpc_package(name = "bad_ssl")
+grpc_package(name = "test/core/bad_ssl")
 
 licenses(["notice"])  # Apache v2
 
diff --git a/test/core/census/BUILD b/test/core/census/BUILD
index a22ffd1..24fd280 100644
--- a/test/core/census/BUILD
+++ b/test/core/census/BUILD
@@ -14,7 +14,7 @@
 
 load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_cc_binary", "grpc_package")
 
-grpc_package(name = "census")
+grpc_package(name = "test/core/census")
 
 licenses(["notice"])  # Apache v2
 
diff --git a/test/core/channel/BUILD b/test/core/channel/BUILD
index d6b1e11..ef861cc 100644
--- a/test/core/channel/BUILD
+++ b/test/core/channel/BUILD
@@ -14,7 +14,7 @@
 
 load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_cc_binary", "grpc_package")
 
-grpc_package(name = "core_test_channel")
+grpc_package(name = "test/core/channel")
 
 licenses(["notice"])  # Apache v2
 
diff --git a/test/core/client_channel/BUILD b/test/core/client_channel/BUILD
index 23e258b..c4a9323 100644
--- a/test/core/client_channel/BUILD
+++ b/test/core/client_channel/BUILD
@@ -14,7 +14,7 @@
 
 load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_cc_binary", "grpc_package")
 
-grpc_package(name = "core_test_client_channel")
+grpc_package(name = "test/core/client_channel")
 
 licenses(["notice"])  # Apache v2
 
diff --git a/test/core/end2end/BUILD b/test/core/end2end/BUILD
index f1ea4f5..49bfc43 100644
--- a/test/core/end2end/BUILD
+++ b/test/core/end2end/BUILD
@@ -16,7 +16,7 @@
 
 licenses(["notice"])  # Apache v2
 
-grpc_package(name = "core_end2end", visibility = "private")
+grpc_package(name = "test/core/end2end")
 
 load(":generate_tests.bzl", "grpc_end2end_tests")
 
diff --git a/test/core/end2end/fuzzers/BUILD b/test/core/end2end/fuzzers/BUILD
index c480131..4ed9a70 100644
--- a/test/core/end2end/fuzzers/BUILD
+++ b/test/core/end2end/fuzzers/BUILD
@@ -14,7 +14,7 @@
 
 load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_cc_binary", "grpc_package")
 
-grpc_package(name = "core_end2end_fuzzers")
+grpc_package(name = "test/core/end2end/fuzzers")
 
 licenses(["notice"])  # Apache v2
 
diff --git a/test/core/end2end/tests/cancel_after_round_trip.c b/test/core/end2end/tests/cancel_after_round_trip.c
index 0fc8b95..ad24b4e 100644
--- a/test/core/end2end/tests/cancel_after_round_trip.c
+++ b/test/core/end2end/tests/cancel_after_round_trip.c
@@ -114,7 +114,9 @@
       grpc_slice_from_copied_string("hello you");
   grpc_byte_buffer *request_payload =
       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
-  grpc_byte_buffer *response_payload =
+  grpc_byte_buffer *response_payload1 =
+      grpc_raw_byte_buffer_create(&response_payload_slice, 1);
+  grpc_byte_buffer *response_payload2 =
       grpc_raw_byte_buffer_create(&response_payload_slice, 1);
   int was_cancelled = 2;
 
@@ -199,7 +201,7 @@
   op->reserved = NULL;
   op++;
   op->op = GRPC_OP_SEND_MESSAGE;
-  op->data.send_message.send_message = response_payload;
+  op->data.send_message.send_message = response_payload1;
   op->flags = 0;
   op->reserved = NULL;
   op++;
@@ -242,7 +244,7 @@
   op->reserved = NULL;
   op++;
   op->op = GRPC_OP_SEND_MESSAGE;
-  op->data.send_message.send_message = response_payload;
+  op->data.send_message.send_message = response_payload2;
   op->flags = 0;
   op->reserved = NULL;
   op++;
@@ -262,7 +264,8 @@
   grpc_call_details_destroy(&call_details);
 
   grpc_byte_buffer_destroy(request_payload);
-  grpc_byte_buffer_destroy(response_payload);
+  grpc_byte_buffer_destroy(response_payload1);
+  grpc_byte_buffer_destroy(response_payload2);
   grpc_byte_buffer_destroy(request_payload_recv);
   grpc_byte_buffer_destroy(response_payload_recv);
   grpc_slice_unref(details);
diff --git a/test/core/end2end/tests/resource_quota_server.c b/test/core/end2end/tests/resource_quota_server.c
index 010e20c..34a6a80 100644
--- a/test/core/end2end/tests/resource_quota_server.c
+++ b/test/core/end2end/tests/resource_quota_server.c
@@ -143,6 +143,8 @@
       malloc(sizeof(grpc_call_details) * NUM_CALLS);
   grpc_status_code *status = malloc(sizeof(grpc_status_code) * NUM_CALLS);
   grpc_slice *details = malloc(sizeof(grpc_slice) * NUM_CALLS);
+  grpc_byte_buffer **request_payload =
+      malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS);
   grpc_byte_buffer **request_payload_recv =
       malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS);
   int *was_cancelled = malloc(sizeof(int) * NUM_CALLS);
@@ -156,9 +158,6 @@
   int deadline_exceeded = 0;
   int unavailable = 0;
 
-  grpc_byte_buffer *request_payload =
-      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
-
   grpc_op ops[6];
   grpc_op *op;
 
@@ -167,6 +166,7 @@
     grpc_metadata_array_init(&trailing_metadata_recv[i]);
     grpc_metadata_array_init(&request_metadata_recv[i]);
     grpc_call_details_init(&call_details[i]);
+    request_payload[i] = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
     request_payload_recv[i] = NULL;
     was_cancelled[i] = 0;
   }
@@ -195,7 +195,7 @@
     op->reserved = NULL;
     op++;
     op->op = GRPC_OP_SEND_MESSAGE;
-    op->data.send_message.send_message = request_payload;
+    op->data.send_message.send_message = request_payload[i];
     op->flags = 0;
     op->reserved = NULL;
     op++;
@@ -261,6 +261,7 @@
       grpc_metadata_array_destroy(&trailing_metadata_recv[call_id]);
       grpc_call_unref(client_calls[call_id]);
       grpc_slice_unref(details[call_id]);
+      grpc_byte_buffer_destroy(request_payload[call_id]);
 
       pending_client_calls--;
     } else if (ev_tag < SERVER_RECV_BASE_TAG) {
@@ -351,7 +352,6 @@
           NUM_CALLS, cancelled_calls_on_server, cancelled_calls_on_client,
           deadline_exceeded, unavailable);
 
-  grpc_byte_buffer_destroy(request_payload);
   grpc_slice_unref(request_payload_slice);
   grpc_resource_quota_unref(resource_quota);
 
@@ -366,6 +366,7 @@
   free(call_details);
   free(status);
   free(details);
+  free(request_payload);
   free(request_payload_recv);
   free(was_cancelled);
 }
diff --git a/test/core/network_benchmarks/BUILD b/test/core/network_benchmarks/BUILD
index dd0c75c..0e15393 100644
--- a/test/core/network_benchmarks/BUILD
+++ b/test/core/network_benchmarks/BUILD
@@ -14,17 +14,12 @@
 
 load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_cc_binary", "grpc_package")
 
-grpc_package(name = "test/core/network_benchmarks")
+grpc_package(name = "test/core/network_benchmarks",
+             features = ["-layering_check", "-parse_headers" ]
+)
 
 licenses(["notice"])  # Apache v2
 
-package(
-    features = [
-        "-layering_check",
-        "-parse_headers",
-    ],
-)
-
 grpc_cc_binary(
     name = "low_level_ping_pong",
     srcs = ["low_level_ping_pong.c"],
diff --git a/test/core/util/BUILD b/test/core/util/BUILD
index 09c006d..10eefe1 100644
--- a/test/core/util/BUILD
+++ b/test/core/util/BUILD
@@ -16,7 +16,7 @@
 
 licenses(["notice"])  # Apache v2
 
-grpc_package(name = "core_test_util", visibility = "public")
+grpc_package(name = "test/core/util", visibility = "public")
 
 grpc_cc_library(
     name = "gpr_test_util",