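Convert byte_stream API to C++ (grpc_byte_stream -> grpc_core::ByteStream, grpc_byte_stream_cache -> grpc_core::ByteStreamCache, grpc_caching_byte_stream -> grpc_core::ByteStreamCache::CachingByteStream).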
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 90b93fb..bbc5160 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -798,7 +798,8 @@
   grpc_linked_mdelem* send_initial_metadata_storage;
   grpc_metadata_batch send_initial_metadata;
   // For send_message.
-  grpc_caching_byte_stream send_message;
+  grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
+      send_message;
   // For send_trailing_metadata.
   grpc_linked_mdelem* send_trailing_metadata_storage;
   grpc_metadata_batch send_trailing_metadata;
@@ -808,7 +809,7 @@
   bool trailing_metadata_available;
   // For intercepting recv_message.
   grpc_closure recv_message_ready;
-  grpc_byte_stream* recv_message;
+  grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
   // For intercepting recv_trailing_metadata.
   grpc_metadata_batch recv_trailing_metadata;
   grpc_transport_stream_stats collect_stats;
@@ -914,12 +915,12 @@
   gpr_atm* peer_string;
   // send_message
   // When we get a send_message op, we replace the original byte stream
-  // with a grpc_caching_byte_stream that caches the slices to a
-  // local buffer for use in retries.
+  // with a CachingByteStream that caches the slices to a local buffer for
+  // use in retries.
   // Note: We inline the cache for the first 3 send_message ops and use
   // dynamic allocation after that.  This number was essentially picked
   // at random; it could be changed in the future to tune performance.
-  grpc_core::InlinedVector<grpc_byte_stream_cache*, 3> send_messages;
+  grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages;
   // send_trailing_metadata
   bool seen_send_trailing_metadata;
   grpc_linked_mdelem* send_trailing_metadata_storage;
@@ -964,10 +965,11 @@
   }
   // Set up cache for send_message ops.
   if (batch->send_message) {
-    grpc_byte_stream_cache* cache = (grpc_byte_stream_cache*)gpr_arena_alloc(
-        calld->arena, sizeof(grpc_byte_stream_cache));
-    grpc_byte_stream_cache_init(cache,
-                                batch->payload->send_message.send_message);
+    grpc_core::ByteStreamCache* cache =
+        static_cast<grpc_core::ByteStreamCache*>(
+            gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
+    new (cache) grpc_core::ByteStreamCache(
+        std::move(batch->payload->send_message.send_message));
     calld->send_messages.push_back(cache);
   }
   // Save metadata batch for send_trailing_metadata ops.
@@ -1002,7 +1004,7 @@
               "]",
               chand, calld, i);
     }
-    grpc_byte_stream_cache_destroy(calld->send_messages[i]);
+    calld->send_messages[i]->Destroy();
   }
   if (retry_state->completed_send_trailing_metadata) {
     grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
@@ -1026,8 +1028,8 @@
               "]",
               chand, calld, retry_state->completed_send_message_count - 1);
     }
-    grpc_byte_stream_cache_destroy(
-        calld->send_messages[retry_state->completed_send_message_count - 1]);
+    calld->send_messages[retry_state->completed_send_message_count - 1]
+        ->Destroy();
   }
   if (batch_data->batch.send_trailing_metadata) {
     grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
@@ -1079,7 +1081,7 @@
     if (batch->send_message) {
       calld->pending_send_message = true;
       calld->bytes_buffered_for_retry +=
-          batch->payload->send_message.send_message->length;
+          batch->payload->send_message.send_message->length();
     }
     if (batch->send_trailing_metadata) {
       calld->pending_send_trailing_metadata = true;
@@ -1680,7 +1682,7 @@
   GPR_ASSERT(pending != nullptr);
   // Return payload.
   *pending->batch->payload->recv_message.recv_message =
-      batch_data->recv_message;
+      std::move(batch_data->recv_message);
   // Update bookkeeping.
   // Note: Need to do this before invoking the callback, since invoking
   // the callback will result in yielding the call combiner.
@@ -2124,13 +2126,13 @@
             "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
             chand, calld, retry_state->started_send_message_count);
   }
-  grpc_byte_stream_cache* cache =
+  grpc_core::ByteStreamCache* cache =
       calld->send_messages[retry_state->started_send_message_count];
   ++retry_state->started_send_message_count;
-  grpc_caching_byte_stream_init(&batch_data->send_message, cache);
+  batch_data->send_message.Init(cache);
   batch_data->batch.send_message = true;
-  batch_data->batch.payload->send_message.send_message =
-      &batch_data->send_message.base;
+  batch_data->batch.payload->send_message.send_message.reset(
+      batch_data->send_message.get());
 }
 
 // Adds retriable send_trailing_metadata op to batch_data.