Merge branch 'master' into stream_corked_pr
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h
index 1a5cbbd..8f52989 100644
--- a/include/grpc++/impl/codegen/async_stream.h
+++ b/include/grpc++/impl/codegen/async_stream.h
@@ -101,6 +101,39 @@
   /// \param[in] msg The message to be written.
   /// \param[in] tag The tag identifying the operation.
   virtual void Write(const W& msg, void* tag) = 0;
+
+  /// Request the writing of \a msg using WriteOptions \a options with
+  /// identifying tag \a tag.
+  ///
+  /// Only one write may be outstanding at any given time. This means that
+  /// after calling Write, one must wait to receive \a tag from the completion
+  /// queue BEFORE calling Write again.
+  /// WriteOptions \a options is used to set the write options of this message.
+  /// This is thread-safe with respect to \a Read
+  ///
+  /// \param[in] msg The message to be written.
+  /// \param[in] options The WriteOptions to be used to write this message.
+  /// \param[in] tag The tag identifying the operation.
+  virtual void Write(const W& msg, WriteOptions options, void* tag) = 0;
+
+  /// Request the writing of \a msg and coalesce it with the writing
+  /// of trailing metadata, using WriteOptions \a options with identifying tag
+  /// \a tag.
+  ///
+  /// For client, WriteLast is equivalent of performing Write and WritesDone in
+  /// a single step.
+  /// For server, WriteLast buffers the \a msg. The writing of \a msg is held
+  /// until Finish is called, where \a msg and trailing metadata are coalesced
+  /// and write is initiated. Note that WriteLast can only buffer \a msg up to
+  /// the flow control window size. If \a msg size is larger than the window
+  /// size, it will be sent on wire without buffering.
+  ///
+  /// \param[in] msg The message to be written.
+  /// \param[in] options The WriteOptions to be used to write this message.
+  /// \param[in] tag The tag identifying the operation.
+  void WriteLast(const W& msg, WriteOptions options, void* tag) {
+    Write(msg, options.set_last_message(), tag);
+  }
 };
 
 template <class R>
@@ -183,11 +216,17 @@
       : context_(context), call_(channel->CreateCall(method, context, cq)) {
     finish_ops_.RecvMessage(response);
     finish_ops_.AllowNoMessage();
-
-    init_ops_.set_output_tag(tag);
-    init_ops_.SendInitialMetadata(context->send_initial_metadata_,
-                                  context->initial_metadata_flags());
-    call_.PerformOps(&init_ops_);
+    // if corked bit is set in context, we buffer up the initial metadata to
+    // coalesce with later message to be sent. No op is performed.
+    if (context_->initial_metadata_corked_) {
+      write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+                                     context->initial_metadata_flags());
+    } else {
+      write_ops_.set_output_tag(tag);
+      write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+                                     context->initial_metadata_flags());
+      call_.PerformOps(&write_ops_);
+    }
   }
 
   void ReadInitialMetadata(void* tag) override {
@@ -205,10 +244,21 @@
     call_.PerformOps(&write_ops_);
   }
 
+  void Write(const W& msg, WriteOptions options, void* tag) override {
+    write_ops_.set_output_tag(tag);
+    if (options.is_last_message()) {
+      options.set_buffer_hint();
+      write_ops_.ClientSendClose();
+    }
+    // TODO(ctiller): don't assert
+    GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+    call_.PerformOps(&write_ops_);
+  }
+
   void WritesDone(void* tag) override {
-    writes_done_ops_.set_output_tag(tag);
-    writes_done_ops_.ClientSendClose();
-    call_.PerformOps(&writes_done_ops_);
+    write_ops_.set_output_tag(tag);
+    write_ops_.ClientSendClose();
+    call_.PerformOps(&write_ops_);
   }
 
   void Finish(Status* status, void* tag) override {
@@ -223,10 +273,9 @@
  private:
   ClientContext* context_;
   Call call_;
-  CallOpSet<CallOpSendInitialMetadata> init_ops_;
   CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
-  CallOpSet<CallOpSendMessage> write_ops_;
-  CallOpSet<CallOpClientSendClose> writes_done_ops_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+      write_ops_;
   CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
             CallOpClientRecvStatus>
       finish_ops_;
@@ -253,10 +302,17 @@
                           const RpcMethod& method, ClientContext* context,
                           void* tag)
       : context_(context), call_(channel->CreateCall(method, context, cq)) {
-    init_ops_.set_output_tag(tag);
-    init_ops_.SendInitialMetadata(context->send_initial_metadata_,
-                                  context->initial_metadata_flags());
-    call_.PerformOps(&init_ops_);
+    if (context_->initial_metadata_corked_) {
+      // if corked bit is set in context, we buffer up the initial metadata to
+      // coalesce with later message to be sent. No op is performed.
+      write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+                                     context->initial_metadata_flags());
+    } else {
+      write_ops_.set_output_tag(tag);
+      write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+                                     context->initial_metadata_flags());
+      call_.PerformOps(&write_ops_);
+    }
   }
 
   void ReadInitialMetadata(void* tag) override {
@@ -283,10 +339,21 @@
     call_.PerformOps(&write_ops_);
   }
 
+  void Write(const W& msg, WriteOptions options, void* tag) override {
+    write_ops_.set_output_tag(tag);
+    if (options.is_last_message()) {
+      options.set_buffer_hint();
+      write_ops_.ClientSendClose();
+    }
+    // TODO(ctiller): don't assert
+    GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+    call_.PerformOps(&write_ops_);
+  }
+
   void WritesDone(void* tag) override {
-    writes_done_ops_.set_output_tag(tag);
-    writes_done_ops_.ClientSendClose();
-    call_.PerformOps(&writes_done_ops_);
+    write_ops_.set_output_tag(tag);
+    write_ops_.ClientSendClose();
+    call_.PerformOps(&write_ops_);
   }
 
   void Finish(Status* status, void* tag) override {
@@ -301,11 +368,10 @@
  private:
   ClientContext* context_;
   Call call_;
-  CallOpSet<CallOpSendInitialMetadata> init_ops_;
   CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
   CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
-  CallOpSet<CallOpSendMessage> write_ops_;
-  CallOpSet<CallOpClientSendClose> writes_done_ops_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+      write_ops_;
   CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
 };
 
@@ -395,6 +461,20 @@
                                    public AsyncWriterInterface<W> {
  public:
   virtual void Finish(const Status& status, void* tag) = 0;
+
+  /// Request the writing of \a msg and coalesce it with trailing metadata which
+  /// contains \a status, using WriteOptions options with identifying tag \a
+  /// tag.
+  ///
+  /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
+  /// single step.
+  ///
+  /// \param[in] msg The message to be written.
+  /// \param[in] options The WriteOptions to be used to write this message.
+  /// \param[in] status The Status that server returns to client.
+  /// \param[in] tag The tag identifying the operation.
+  virtual void WriteAndFinish(const W& msg, WriteOptions options,
+                              const Status& status, void* tag) = 0;
 };
 
 template <class W>
@@ -418,29 +498,37 @@
 
   void Write(const W& msg, void* tag) override {
     write_ops_.set_output_tag(tag);
-    if (!ctx_->sent_initial_metadata_) {
-      write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
-                                     ctx_->initial_metadata_flags());
-      if (ctx_->compression_level_set()) {
-        write_ops_.set_compression_level(ctx_->compression_level());
-      }
-      ctx_->sent_initial_metadata_ = true;
-    }
+    EnsureInitialMetadataSent(&write_ops_);
     // TODO(ctiller): don't assert
     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
     call_.PerformOps(&write_ops_);
   }
 
+  void Write(const W& msg, WriteOptions options, void* tag) override {
+    write_ops_.set_output_tag(tag);
+    if (options.is_last_message()) {
+      options.set_buffer_hint();
+    }
+
+    EnsureInitialMetadataSent(&write_ops_);
+    // TODO(ctiller): don't assert
+    GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+    call_.PerformOps(&write_ops_);
+  }
+
+  void WriteAndFinish(const W& msg, WriteOptions options, const Status& status,
+                      void* tag) override {
+    write_ops_.set_output_tag(tag);
+    EnsureInitialMetadataSent(&write_ops_);
+    options.set_buffer_hint();
+    GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+    write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+    call_.PerformOps(&write_ops_);
+  }
+
   void Finish(const Status& status, void* tag) override {
     finish_ops_.set_output_tag(tag);
-    if (!ctx_->sent_initial_metadata_) {
-      finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
-                                      ctx_->initial_metadata_flags());
-      if (ctx_->compression_level_set()) {
-        finish_ops_.set_compression_level(ctx_->compression_level());
-      }
-      ctx_->sent_initial_metadata_ = true;
-    }
+    EnsureInitialMetadataSent(&finish_ops_);
     finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
     call_.PerformOps(&finish_ops_);
   }
@@ -448,10 +536,24 @@
  private:
   void BindCall(Call* call) override { call_ = *call; }
 
+  template <class T>
+  void EnsureInitialMetadataSent(T* ops) {
+    if (!ctx_->sent_initial_metadata_) {
+      ops->SendInitialMetadata(ctx_->initial_metadata_,
+                               ctx_->initial_metadata_flags());
+      if (ctx_->compression_level_set()) {
+        ops->set_compression_level(ctx_->compression_level());
+      }
+      ctx_->sent_initial_metadata_ = true;
+    }
+  }
+
   Call call_;
   ServerContext* ctx_;
   CallOpSet<CallOpSendInitialMetadata> meta_ops_;
-  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+            CallOpServerSendStatus>
+      write_ops_;
   CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
 };
 
@@ -462,6 +564,20 @@
                                          public AsyncReaderInterface<R> {
  public:
   virtual void Finish(const Status& status, void* tag) = 0;
+
+  /// Request the writing of \a msg and coalesce it with trailing metadata which
+  /// contains \a status, using WriteOptions options with identifying tag \a
+  /// tag.
+  ///
+  /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
+  /// single step.
+  ///
+  /// \param[in] msg The message to be written.
+  /// \param[in] options The WriteOptions to be used to write this message.
+  /// \param[in] status The Status that server returns to client.
+  /// \param[in] tag The tag identifying the operation.
+  virtual void WriteAndFinish(const W& msg, WriteOptions options,
+                              const Status& status, void* tag) = 0;
 };
 
 template <class W, class R>
@@ -492,29 +608,36 @@
 
   void Write(const W& msg, void* tag) override {
     write_ops_.set_output_tag(tag);
-    if (!ctx_->sent_initial_metadata_) {
-      write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
-                                     ctx_->initial_metadata_flags());
-      if (ctx_->compression_level_set()) {
-        write_ops_.set_compression_level(ctx_->compression_level());
-      }
-      ctx_->sent_initial_metadata_ = true;
-    }
+    EnsureInitialMetadataSent(&write_ops_);
     // TODO(ctiller): don't assert
     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
     call_.PerformOps(&write_ops_);
   }
 
+  void Write(const W& msg, WriteOptions options, void* tag) override {
+    write_ops_.set_output_tag(tag);
+    if (options.is_last_message()) {
+      options.set_buffer_hint();
+    }
+    EnsureInitialMetadataSent(&write_ops_);
+    GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+    call_.PerformOps(&write_ops_);
+  }
+
+  void WriteAndFinish(const W& msg, WriteOptions options, const Status& status,
+                      void* tag) override {
+    write_ops_.set_output_tag(tag);
+    EnsureInitialMetadataSent(&write_ops_);
+    options.set_buffer_hint();
+    GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+    write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+    call_.PerformOps(&write_ops_);
+  }
+
   void Finish(const Status& status, void* tag) override {
     finish_ops_.set_output_tag(tag);
-    if (!ctx_->sent_initial_metadata_) {
-      finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
-                                      ctx_->initial_metadata_flags());
-      if (ctx_->compression_level_set()) {
-        finish_ops_.set_compression_level(ctx_->compression_level());
-      }
-      ctx_->sent_initial_metadata_ = true;
-    }
+    EnsureInitialMetadataSent(&finish_ops_);
+
     finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
     call_.PerformOps(&finish_ops_);
   }
@@ -524,11 +647,25 @@
 
   void BindCall(Call* call) override { call_ = *call; }
 
+  template <class T>
+  void EnsureInitialMetadataSent(T* ops) {
+    if (!ctx_->sent_initial_metadata_) {
+      ops->SendInitialMetadata(ctx_->initial_metadata_,
+                               ctx_->initial_metadata_flags());
+      if (ctx_->compression_level_set()) {
+        ops->set_compression_level(ctx_->compression_level());
+      }
+      ctx_->sent_initial_metadata_ = true;
+    }
+  }
+
   Call call_;
   ServerContext* ctx_;
   CallOpSet<CallOpSendInitialMetadata> meta_ops_;
   CallOpSet<CallOpRecvMessage<R>> read_ops_;
-  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+            CallOpServerSendStatus>
+      write_ops_;
   CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
 };
 
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index 19a5ca2..a3f2be6 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -84,8 +84,9 @@
 /// Per-message write options.
 class WriteOptions {
  public:
-  WriteOptions() : flags_(0) {}
-  WriteOptions(const WriteOptions& other) : flags_(other.flags_) {}
+  WriteOptions() : flags_(0), last_message_(false) {}
+  WriteOptions(const WriteOptions& other)
+      : flags_(other.flags_), last_message_(other.last_message_) {}
 
   /// Clear all flags.
   inline void Clear() { flags_ = 0; }
@@ -141,6 +142,43 @@
   /// \sa GRPC_WRITE_BUFFER_HINT
   inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
 
+  /// corked bit: aliases set_buffer_hint currently, with the intent that
+  /// set_buffer_hint will be removed in the future
+  inline WriteOptions& set_corked() {
+    SetBit(GRPC_WRITE_BUFFER_HINT);
+    return *this;
+  }
+
+  inline WriteOptions& clear_corked() {
+    ClearBit(GRPC_WRITE_BUFFER_HINT);
+    return *this;
+  }
+
+  inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
+
+  /// last-message bit: indicates this is the last message in a stream
+  /// client-side:  makes Write the equivalent of performing Write, WritesDone
+  /// in a single step
+  /// server-side:  hold the Write until the service handler returns (sync api)
+  /// or until Finish is called (async api)
+  inline WriteOptions& set_last_message() {
+    last_message_ = true;
+    return *this;
+  }
+
+  /// Clears flag indicating that this is the last message in a stream,
+  /// disabling coalescing.
+  inline WriteOptions& clear_last_messsage() {
+    last_message_ = false;
+    return *this;
+  }
+
+  /// Get value for the flag indicating that this is the last message, and
+  /// should be coalesced with trailing metadata.
+  ///
+  /// \sa GRPC_WRITE_LAST_MESSAGE
+  bool is_last_message() const { return last_message_; }
+
   WriteOptions& operator=(const WriteOptions& rhs) {
     flags_ = rhs.flags_;
     return *this;
@@ -154,6 +192,7 @@
   bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
 
   uint32_t flags_;
+  bool last_message_;
 };
 
 /// Default argument for CallOpSet. I is unused by the class, but can be
@@ -224,7 +263,7 @@
   /// after use.
   template <class M>
   Status SendMessage(const M& message,
-                     const WriteOptions& options) GRPC_MUST_USE_RESULT;
+                     WriteOptions options) GRPC_MUST_USE_RESULT;
 
   template <class M>
   Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
@@ -252,8 +291,7 @@
 };
 
 template <class M>
-Status CallOpSendMessage::SendMessage(const M& message,
-                                      const WriteOptions& options) {
+Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) {
   write_options_ = options;
   return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_);
 }
diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h
index b91c7f6..3c50e6b 100644
--- a/include/grpc++/impl/codegen/client_context.h
+++ b/include/grpc++/impl/codegen/client_context.h
@@ -281,6 +281,17 @@
   /// \param algorithm The compression algorithm used for the client call.
   void set_compression_algorithm(grpc_compression_algorithm algorithm);
 
+  /// Flag whether the initial metadata should be \a corked
+  ///
+  /// If \a corked is true, then the initial metadata will be colasced with the
+  /// write of first message in the stream.
+  ///
+  /// \param corked The flag indicating whether the initial metadata is to be
+  /// corked or not.
+  void set_initial_metadata_corked(bool corked) {
+    initial_metadata_corked_ = corked;
+  }
+
   /// Return the peer uri in a string.
   ///
   /// \warning This value is never authenticated or subject to any security
@@ -357,7 +368,8 @@
            (cacheable_ ? GRPC_INITIAL_METADATA_CACHEABLE_REQUEST : 0) |
            (wait_for_ready_explicitly_set_
                 ? GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
-                : 0);
+                : 0) |
+           (initial_metadata_corked_ ? GRPC_INITIAL_METADATA_CORKED : 0);
   }
 
   grpc::string authority() { return authority_; }
@@ -384,6 +396,7 @@
   PropagationOptions propagation_options_;
 
   grpc_compression_algorithm compression_algorithm_;
+  bool initial_metadata_corked_;
 };
 
 }  // namespace grpc
diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h
index 4d9b074..ae3b8e4 100644
--- a/include/grpc++/impl/codegen/sync_stream.h
+++ b/include/grpc++/impl/codegen/sync_stream.h
@@ -100,22 +100,40 @@
  public:
   virtual ~WriterInterface() {}
 
-  /// Blocking write \a msg to the stream with options.
+  /// Blocking write \a msg to the stream with WriteOptions \a options.
   /// This is thread-safe with respect to \a Read
   ///
   /// \param msg The message to be written to the stream.
-  /// \param options Options affecting the write operation.
+  /// \param options The WriteOptions affecting the write operation.
   ///
   /// \return \a true on success, \a false when the stream has been closed.
-  virtual bool Write(const W& msg, const WriteOptions& options) = 0;
+  virtual bool Write(const W& msg, WriteOptions options) = 0;
 
-  /// Blocking write \a msg to the stream with default options.
+  /// Blocking write \a msg to the stream with default write options.
   /// This is thread-safe with respect to \a Read
   ///
   /// \param msg The message to be written to the stream.
   ///
   /// \return \a true on success, \a false when the stream has been closed.
   inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
+
+  /// Write \a msg and coalesce it with the writing of trailing metadata, using
+  /// WriteOptions \a options.
+  ///
+  /// For client, WriteLast is equivalent of performing Write and WritesDone in
+  /// a single step. \a msg and trailing metadata are coalesced and sent on wire
+  /// by calling this function.
+  /// For server, WriteLast buffers the \a msg. The writing of \a msg is held
+  /// until the service handler returns, where \a msg and trailing metadata are
+  /// coalesced and sent on wire. Note that WriteLast can only buffer \a msg up
+  /// to the flow control window size. If \a msg size is larger than the window
+  /// size, it will be sent on wire without buffering.
+  ///
+  /// \param[in] msg The message to be written to the stream.
+  /// \param[in] options The WriteOptions to be used to write this message.
+  void WriteLast(const W& msg, WriteOptions options) {
+    Write(msg, options.set_last_message());
+  }
 };
 
 /// Client-side interface for streaming reads of message of type \a R.
@@ -213,11 +231,13 @@
     finish_ops_.RecvMessage(response);
     finish_ops_.AllowNoMessage();
 
-    CallOpSet<CallOpSendInitialMetadata> ops;
-    ops.SendInitialMetadata(context->send_initial_metadata_,
-                            context->initial_metadata_flags());
-    call_.PerformOps(&ops);
-    cq_.Pluck(&ops);
+    if (!context_->initial_metadata_corked_) {
+      CallOpSet<CallOpSendInitialMetadata> ops;
+      ops.SendInitialMetadata(context->send_initial_metadata_,
+                              context->initial_metadata_flags());
+      call_.PerformOps(&ops);
+      cq_.Pluck(&ops);
+    }
   }
 
   void WaitForInitialMetadata() {
@@ -230,11 +250,24 @@
   }
 
   using WriterInterface<W>::Write;
-  bool Write(const W& msg, const WriteOptions& options) override {
-    CallOpSet<CallOpSendMessage> ops;
+  bool Write(const W& msg, WriteOptions options) override {
+    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+              CallOpClientSendClose>
+        ops;
+
+    if (options.is_last_message()) {
+      options.set_buffer_hint();
+      ops.ClientSendClose();
+    }
+    if (context_->initial_metadata_corked_) {
+      ops.SendInitialMetadata(context_->send_initial_metadata_,
+                              context_->initial_metadata_flags());
+      context_->set_initial_metadata_corked(false);
+    }
     if (!ops.SendMessage(msg, options).ok()) {
       return false;
     }
+
     call_.PerformOps(&ops);
     return cq_.Pluck(&ops);
   }
@@ -293,11 +326,13 @@
   ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
                      ClientContext* context)
       : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
-    CallOpSet<CallOpSendInitialMetadata> ops;
-    ops.SendInitialMetadata(context->send_initial_metadata_,
-                            context->initial_metadata_flags());
-    call_.PerformOps(&ops);
-    cq_.Pluck(&ops);
+    if (!context_->initial_metadata_corked_) {
+      CallOpSet<CallOpSendInitialMetadata> ops;
+      ops.SendInitialMetadata(context->send_initial_metadata_,
+                              context->initial_metadata_flags());
+      call_.PerformOps(&ops);
+      cq_.Pluck(&ops);
+    }
   }
 
   void WaitForInitialMetadata() override {
@@ -325,9 +360,24 @@
   }
 
   using WriterInterface<W>::Write;
-  bool Write(const W& msg, const WriteOptions& options) override {
-    CallOpSet<CallOpSendMessage> ops;
-    if (!ops.SendMessage(msg, options).ok()) return false;
+  bool Write(const W& msg, WriteOptions options) override {
+    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+              CallOpClientSendClose>
+        ops;
+
+    if (options.is_last_message()) {
+      options.set_buffer_hint();
+      ops.ClientSendClose();
+    }
+    if (context_->initial_metadata_corked_) {
+      ops.SendInitialMetadata(context_->send_initial_metadata_,
+                              context_->initial_metadata_flags());
+      context_->set_initial_metadata_corked(false);
+    }
+    if (!ops.SendMessage(msg, options).ok()) {
+      return false;
+    }
+
     call_.PerformOps(&ops);
     return cq_.Pluck(&ops);
   }
@@ -423,7 +473,10 @@
   }
 
   using WriterInterface<W>::Write;
-  bool Write(const W& msg, const WriteOptions& options) override {
+  bool Write(const W& msg, WriteOptions options) override {
+    if (options.is_last_message()) {
+      options.set_buffer_hint();
+    }
     CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
     if (!ops.SendMessage(msg, options).ok()) {
       return false;
@@ -485,7 +538,10 @@
     return call_->cq()->Pluck(&ops) && ops.got_message;
   }
 
-  bool Write(const W& msg, const WriteOptions& options) {
+  bool Write(const W& msg, WriteOptions options) {
+    if (options.is_last_message()) {
+      options.set_buffer_hint();
+    }
     CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
     if (!ops.SendMessage(msg, options).ok()) {
       return false;
@@ -523,7 +579,7 @@
   bool Read(R* msg) override { return body_.Read(msg); }
 
   using WriterInterface<W>::Write;
-  bool Write(const W& msg, const WriteOptions& options) override {
+  bool Write(const W& msg, WriteOptions options) override {
     return body_.Write(msg, options);
   }
 
@@ -562,8 +618,7 @@
   }
 
   using WriterInterface<ResponseType>::Write;
-  bool Write(const ResponseType& response,
-             const WriteOptions& options) override {
+  bool Write(const ResponseType& response, WriteOptions options) override {
     if (write_done_ || !read_done_) {
       return false;
     }
@@ -604,8 +659,7 @@
   }
 
   using WriterInterface<ResponseType>::Write;
-  bool Write(const ResponseType& response,
-             const WriteOptions& options) override {
+  bool Write(const ResponseType& response, WriteOptions options) override {
     return read_done_ && body_.Write(response, options);
   }
 
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 887c176..baea961 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -316,13 +316,16 @@
 /** Signal that GRPC_INITIAL_METADATA_WAIT_FOR_READY was explicitly set
     by the calling application. */
 #define GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET (0x00000080u)
+/** Signal that the initial metadata should be corked */
+#define GRPC_INITIAL_METADATA_CORKED (0x00000100u)
 
 /** Mask of all valid flags */
-#define GRPC_INITIAL_METADATA_USED_MASK       \
-  (GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST | \
-   GRPC_INITIAL_METADATA_WAIT_FOR_READY |     \
-   GRPC_INITIAL_METADATA_CACHEABLE_REQUEST |  \
-   GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)
+#define GRPC_INITIAL_METADATA_USED_MASK                  \
+  (GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST |            \
+   GRPC_INITIAL_METADATA_WAIT_FOR_READY |                \
+   GRPC_INITIAL_METADATA_CACHEABLE_REQUEST |             \
+   GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET | \
+   GRPC_INITIAL_METADATA_CORKED)
 
 /** A single metadata element */
 typedef struct grpc_metadata {
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 8676a37..967470b 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -1210,8 +1210,13 @@
           }
         } else {
           GPR_ASSERT(s->id != 0);
-          grpc_chttp2_become_writable(exec_ctx, t, s,
-                                      GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+          grpc_chttp2_stream_write_type write_type =
+              GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED;
+          if (op->send_message != NULL &&
+              (op->send_message->flags & GRPC_WRITE_BUFFER_HINT)) {
+            write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK;
+          }
+          grpc_chttp2_become_writable(exec_ctx, t, s, write_type,
                                       "op.send_initial_metadata");
         }
       } else {
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index c073741..3d884cf 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -67,7 +67,8 @@
       call_canceled_(false),
       deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
       census_context_(nullptr),
-      propagate_from_call_(nullptr) {
+      propagate_from_call_(nullptr),
+      initial_metadata_corked_(false) {
   g_client_callbacks->DefaultConstructor(this);
 }
 
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 32e8a41..0b5215e 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -484,6 +484,81 @@
   EXPECT_TRUE(recv_status.ok());
 }
 
+// Two pings and a final pong.
+TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
+  ResetStub();
+
+  EchoRequest send_request;
+  EchoRequest recv_request;
+  EchoResponse send_response;
+  EchoResponse recv_response;
+  Status recv_status;
+  ClientContext cli_ctx;
+  ServerContext srv_ctx;
+  ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+  send_request.set_message(GetParam().message_content);
+  cli_ctx.set_initial_metadata_corked(true);
+  // tag:1 never comes up since no op is performed
+  std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
+      stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
+
+  service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+                                tag(2));
+
+  cli_stream->Write(send_request, tag(3));
+
+  // 65536(64KB) is the default flow control window size. Should change this
+  // number when default flow control window size changes. For the write of
+  // send_request larger than the flow control window size, tag:3 will not come
+  // up until server read is initiated. For write of send_request smaller than
+  // the flow control window size, the request can take the free ride with
+  // initial metadata due to coalescing, thus write tag:3 will come up here.
+  if (GetParam().message_content.length() < 65536) {
+    Verifier(GetParam().disable_blocking)
+        .Expect(2, true)
+        .Expect(3, true)
+        .Verify(cq_.get());
+  } else {
+    Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+  }
+
+  srv_stream.Read(&recv_request, tag(4));
+
+  if (GetParam().message_content.length() < 65536) {
+    Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+  } else {
+    Verifier(GetParam().disable_blocking)
+        .Expect(3, true)
+        .Expect(4, true)
+        .Verify(cq_.get());
+  }
+
+  EXPECT_EQ(send_request.message(), recv_request.message());
+
+  cli_stream->WriteLast(send_request, WriteOptions(), tag(5));
+  srv_stream.Read(&recv_request, tag(6));
+  Verifier(GetParam().disable_blocking)
+      .Expect(5, true)
+      .Expect(6, true)
+      .Verify(cq_.get());
+  EXPECT_EQ(send_request.message(), recv_request.message());
+
+  srv_stream.Read(&recv_request, tag(7));
+  Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get());
+
+  send_response.set_message(recv_request.message());
+  srv_stream.Finish(send_response, Status::OK, tag(8));
+  cli_stream->Finish(&recv_status, tag(9));
+  Verifier(GetParam().disable_blocking)
+      .Expect(8, true)
+      .Expect(9, true)
+      .Verify(cq_.get());
+
+  EXPECT_EQ(send_response.message(), recv_response.message());
+  EXPECT_TRUE(recv_status.ok());
+}
+
 // One ping, two pongs.
 TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
   ResetStub();
@@ -540,6 +615,112 @@
   EXPECT_TRUE(recv_status.ok());
 }
 
+// One ping, two pongs. Using WriteAndFinish API
+TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
+  ResetStub();
+
+  EchoRequest send_request;
+  EchoRequest recv_request;
+  EchoResponse send_response;
+  EchoResponse recv_response;
+  Status recv_status;
+  ClientContext cli_ctx;
+  ServerContext srv_ctx;
+  ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
+
+  send_request.set_message(GetParam().message_content);
+  std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
+      stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
+
+  service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+                                 cq_.get(), cq_.get(), tag(2));
+
+  Verifier(GetParam().disable_blocking)
+      .Expect(1, true)
+      .Expect(2, true)
+      .Verify(cq_.get());
+  EXPECT_EQ(send_request.message(), recv_request.message());
+
+  send_response.set_message(recv_request.message());
+  srv_stream.Write(send_response, tag(3));
+  cli_stream->Read(&recv_response, tag(4));
+  Verifier(GetParam().disable_blocking)
+      .Expect(3, true)
+      .Expect(4, true)
+      .Verify(cq_.get());
+  EXPECT_EQ(send_response.message(), recv_response.message());
+
+  srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5));
+  cli_stream->Read(&recv_response, tag(6));
+  Verifier(GetParam().disable_blocking)
+      .Expect(5, true)
+      .Expect(6, true)
+      .Verify(cq_.get());
+  EXPECT_EQ(send_response.message(), recv_response.message());
+
+  cli_stream->Read(&recv_response, tag(7));
+  Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get());
+
+  cli_stream->Finish(&recv_status, tag(8));
+  Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get());
+
+  EXPECT_TRUE(recv_status.ok());
+}
+
+// One ping, two pongs. Using WriteLast API
+TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
+  ResetStub();
+
+  EchoRequest send_request;
+  EchoRequest recv_request;
+  EchoResponse send_response;
+  EchoResponse recv_response;
+  Status recv_status;
+  ClientContext cli_ctx;
+  ServerContext srv_ctx;
+  ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
+
+  send_request.set_message(GetParam().message_content);
+  std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
+      stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
+
+  service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+                                 cq_.get(), cq_.get(), tag(2));
+
+  Verifier(GetParam().disable_blocking)
+      .Expect(1, true)
+      .Expect(2, true)
+      .Verify(cq_.get());
+  EXPECT_EQ(send_request.message(), recv_request.message());
+
+  send_response.set_message(recv_request.message());
+  srv_stream.Write(send_response, tag(3));
+  cli_stream->Read(&recv_response, tag(4));
+  Verifier(GetParam().disable_blocking)
+      .Expect(3, true)
+      .Expect(4, true)
+      .Verify(cq_.get());
+  EXPECT_EQ(send_response.message(), recv_response.message());
+
+  srv_stream.WriteLast(send_response, WriteOptions(), tag(5));
+  cli_stream->Read(&recv_response, tag(6));
+  srv_stream.Finish(Status::OK, tag(7));
+  Verifier(GetParam().disable_blocking)
+      .Expect(5, true)
+      .Expect(6, true)
+      .Expect(7, true)
+      .Verify(cq_.get());
+  EXPECT_EQ(send_response.message(), recv_response.message());
+
+  cli_stream->Read(&recv_response, tag(8));
+  Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
+
+  cli_stream->Finish(&recv_status, tag(9));
+  Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
+
+  EXPECT_TRUE(recv_status.ok());
+}
+
 // One ping, one pong.
 TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
   ResetStub();
@@ -599,6 +780,144 @@
   EXPECT_TRUE(recv_status.ok());
 }
 
+// One ping, one pong. Using server:WriteAndFinish api
+TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
+  ResetStub();
+
+  EchoRequest send_request;
+  EchoRequest recv_request;
+  EchoResponse send_response;
+  EchoResponse recv_response;
+  Status recv_status;
+  ClientContext cli_ctx;
+  ServerContext srv_ctx;
+  ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+  send_request.set_message(GetParam().message_content);
+  cli_ctx.set_initial_metadata_corked(true);
+  std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
+      cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
+
+  service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+                             tag(2));
+
+  cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
+
+  // 65536(64KB) is the default flow control window size. Should change this
+  // number when default flow control window size changes. For the write of
+  // send_request larger than the flow control window size, tag:3 will not come
+  // up until server read is initiated. For write of send_request smaller than
+  // the flow control window size, the request can take the free ride with
+  // initial metadata due to coalescing, thus write tag:3 will come up here.
+  if (GetParam().message_content.length() < 65536) {
+    Verifier(GetParam().disable_blocking)
+        .Expect(2, true)
+        .Expect(3, true)
+        .Verify(cq_.get());
+  } else {
+    Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+  }
+
+  srv_stream.Read(&recv_request, tag(4));
+
+  if (GetParam().message_content.length() < 65536) {
+    Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+  } else {
+    Verifier(GetParam().disable_blocking)
+        .Expect(3, true)
+        .Expect(4, true)
+        .Verify(cq_.get());
+  }
+  EXPECT_EQ(send_request.message(), recv_request.message());
+
+  srv_stream.Read(&recv_request, tag(5));
+  Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get());
+
+  send_response.set_message(recv_request.message());
+  srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6));
+  cli_stream->Read(&recv_response, tag(7));
+  Verifier(GetParam().disable_blocking)
+      .Expect(6, true)
+      .Expect(7, true)
+      .Verify(cq_.get());
+  EXPECT_EQ(send_response.message(), recv_response.message());
+
+  cli_stream->Finish(&recv_status, tag(8));
+  Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get());
+
+  EXPECT_TRUE(recv_status.ok());
+}
+
+// One ping, one pong. Using server:WriteLast api
+TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
+  ResetStub();
+
+  EchoRequest send_request;
+  EchoRequest recv_request;
+  EchoResponse send_response;
+  EchoResponse recv_response;
+  Status recv_status;
+  ClientContext cli_ctx;
+  ServerContext srv_ctx;
+  ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+  send_request.set_message(GetParam().message_content);
+  cli_ctx.set_initial_metadata_corked(true);
+  std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
+      cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
+
+  service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+                             tag(2));
+
+  cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
+
+  // 65536(64KB) is the default flow control window size. Should change this
+  // number when default flow control window size changes. For the write of
+  // send_request larger than the flow control window size, tag:3 will not come
+  // up until server read is initiated. For write of send_request smaller than
+  // the flow control window size, the request can take the free ride with
+  // initial metadata due to coalescing, thus write tag:3 will come up here.
+  if (GetParam().message_content.length() < 65536) {
+    Verifier(GetParam().disable_blocking)
+        .Expect(2, true)
+        .Expect(3, true)
+        .Verify(cq_.get());
+  } else {
+    Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+  }
+
+  srv_stream.Read(&recv_request, tag(4));
+
+  if (GetParam().message_content.length() < 65536) {
+    Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+  } else {
+    Verifier(GetParam().disable_blocking)
+        .Expect(3, true)
+        .Expect(4, true)
+        .Verify(cq_.get());
+  }
+  EXPECT_EQ(send_request.message(), recv_request.message());
+
+  srv_stream.Read(&recv_request, tag(5));
+  Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get());
+
+  send_response.set_message(recv_request.message());
+  srv_stream.WriteLast(send_response, WriteOptions(), tag(6));
+  srv_stream.Finish(Status::OK, tag(7));
+  cli_stream->Read(&recv_response, tag(8));
+  Verifier(GetParam().disable_blocking)
+      .Expect(6, true)
+      .Expect(7, true)
+      .Expect(8, true)
+      .Verify(cq_.get());
+  EXPECT_EQ(send_response.message(), recv_response.message());
+
+  cli_stream->Finish(&recv_status, tag(9));
+  Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
+
+  EXPECT_TRUE(recv_status.ok());
+}
+
 // Metadata tests
 TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
   ResetStub();
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index df78557..d3a83b1 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -702,6 +702,21 @@
   EXPECT_TRUE(s.ok());
 }
 
+TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  ClientContext context;
+
+  context.set_initial_metadata_corked(true);
+  auto stream = stub_->RequestStream(&context, &response);
+  request.set_message("hello");
+  stream->WriteLast(request, WriteOptions());
+  Status s = stream->Finish();
+  EXPECT_EQ(response.message(), request.message());
+  EXPECT_TRUE(s.ok());
+}
+
 TEST_P(End2endTest, RequestStreamTwoRequests) {
   ResetStub();
   EchoRequest request;
@@ -718,6 +733,22 @@
   EXPECT_TRUE(s.ok());
 }
 
+TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  ClientContext context;
+
+  context.set_initial_metadata_corked(true);
+  auto stream = stub_->RequestStream(&context, &response);
+  request.set_message("hello");
+  EXPECT_TRUE(stream->Write(request));
+  stream->WriteLast(request, WriteOptions());
+  Status s = stream->Finish();
+  EXPECT_EQ(response.message(), "hellohello");
+  EXPECT_TRUE(s.ok());
+}
+
 TEST_P(End2endTest, ResponseStream) {
   ResetStub();
   EchoRequest request;
@@ -738,6 +769,27 @@
   EXPECT_TRUE(s.ok());
 }
 
+TEST_P(End2endTest, ResponseStreamWithCoalescingApi) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  ClientContext context;
+  request.set_message("hello");
+  context.AddMetadata(kServerUseCoalescingApi, "1");
+
+  auto stream = stub_->ResponseStream(&context, request);
+  EXPECT_TRUE(stream->Read(&response));
+  EXPECT_EQ(response.message(), request.message() + "0");
+  EXPECT_TRUE(stream->Read(&response));
+  EXPECT_EQ(response.message(), request.message() + "1");
+  EXPECT_TRUE(stream->Read(&response));
+  EXPECT_EQ(response.message(), request.message() + "2");
+  EXPECT_FALSE(stream->Read(&response));
+
+  Status s = stream->Finish();
+  EXPECT_TRUE(s.ok());
+}
+
 TEST_P(End2endTest, BidiStream) {
   ResetStub();
   EchoRequest request;
@@ -770,6 +822,39 @@
   EXPECT_TRUE(s.ok());
 }
 
+TEST_P(End2endTest, BidiStreamWithCoalescingApi) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  ClientContext context;
+  context.AddMetadata(kServerFinishAfterNReads, "3");
+  context.set_initial_metadata_corked(true);
+  grpc::string msg("hello");
+
+  auto stream = stub_->BidiStream(&context);
+
+  request.set_message(msg + "0");
+  EXPECT_TRUE(stream->Write(request));
+  EXPECT_TRUE(stream->Read(&response));
+  EXPECT_EQ(response.message(), request.message());
+
+  request.set_message(msg + "1");
+  EXPECT_TRUE(stream->Write(request));
+  EXPECT_TRUE(stream->Read(&response));
+  EXPECT_EQ(response.message(), request.message());
+
+  request.set_message(msg + "2");
+  stream->WriteLast(request, WriteOptions());
+  EXPECT_TRUE(stream->Read(&response));
+  EXPECT_EQ(response.message(), request.message());
+
+  EXPECT_FALSE(stream->Read(&response));
+  EXPECT_FALSE(stream->Read(&response));
+
+  Status s = stream->Finish();
+  EXPECT_TRUE(s.ok());
+}
+
 // Talk to the two services with the same name but different package names.
 // The two stubs are created on the same channel.
 TEST_P(End2endTest, DiffPackageServices) {
diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc
index d6664da..fdb2732 100644
--- a/test/cpp/end2end/mock_test.cc
+++ b/test/cpp/end2end/mock_test.cc
@@ -89,7 +89,7 @@
     return true;
   }
 
-  bool Write(const EchoRequest& msg, const WriteOptions& options) override {
+  bool Write(const EchoRequest& msg, WriteOptions options) override {
     gpr_log(GPR_INFO, "mock recv msg %s", msg.message().c_str());
     last_message_ = msg.message();
     return true;
diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc
index 59d36e9..11729c4 100644
--- a/test/cpp/end2end/test_service_impl.cc
+++ b/test/cpp/end2end/test_service_impl.cc
@@ -246,6 +246,9 @@
   int server_try_cancel = GetIntValueFromMetadata(
       kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
 
+  int server_coalescing_api = GetIntValueFromMetadata(
+      kServerUseCoalescingApi, context->client_metadata(), 0);
+
   if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
     ServerTryCancel(context);
     return Status::CANCELLED;
@@ -260,7 +263,11 @@
 
   for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
     response.set_message(request->message() + grpc::to_string(i));
-    writer->Write(response);
+    if (i == kNumResponseStreamsMsgs - 1 && server_coalescing_api != 0) {
+      writer->WriteLast(response, WriteOptions());
+    } else {
+      writer->Write(response);
+    }
   }
 
   if (server_try_cancel_thd != nullptr) {
@@ -305,10 +312,21 @@
         new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
   }
 
+  // kServerFinishAfterNReads suggests after how many reads, the server should
+  // write the last message and send status (coalesced using WriteLast)
+  int server_write_last = GetIntValueFromMetadata(
+      kServerFinishAfterNReads, context->client_metadata(), 0);
+
+  int read_counts = 0;
   while (stream->Read(&request)) {
+    read_counts++;
     gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
     response.set_message(request.message());
-    stream->Write(response);
+    if (read_counts == server_write_last) {
+      stream->WriteLast(response, WriteOptions());
+    } else {
+      stream->Write(response);
+    }
   }
 
   if (server_try_cancel_thd != nullptr) {
diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h
index 88e0be7..b1f02f9 100644
--- a/test/cpp/end2end/test_service_impl.h
+++ b/test/cpp/end2end/test_service_impl.h
@@ -48,6 +48,8 @@
 const char* const kServerCancelAfterReads = "cancel_after_reads";
 const char* const kServerTryCancelRequest = "server_try_cancel";
 const char* const kDebugInfoTrailerKey = "debug-info-bin";
+const char* const kServerFinishAfterNReads = "server_finish_after_n_reads";
+const char* const kServerUseCoalescingApi = "server_use_coalescing_api";
 
 typedef enum {
   DO_NOT_CANCEL = 0,
diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
index 00e37f7..42a5381 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
@@ -240,6 +240,173 @@
   state.SetBytesProcessed(msg_size * state.iterations() * 2);
 }
 
+// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
+// messages in each call) in a loop on a single channel. Different from
+// BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast,
+// WriteAndFinish, set_initial_metadata_corked. These apis aim at saving
+// sendmsg syscalls for streaming by coalescing 1. initial metadata with first
+// message; 2. final streaming message with trailing metadata.
+//
+//  First parmeter (i.e state.range(0)):  Message size (in bytes) to use
+//  Second parameter (i.e state.range(1)): Number of ping pong messages.
+//      Note: One ping-pong means two messages (one from client to server and
+//      the other from server to client):
+//  Third parameter (i.e state.range(2)): Switch between using WriteAndFinish
+//  API and WriteLast API for server.
+template <class Fixture, class ClientContextMutator, class ServerContextMutator>
+static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) {
+  const int msg_size = state.range(0);
+  const int max_ping_pongs = state.range(1);
+  // This options is used to test out server API: WriteLast and WriteAndFinish
+  // respectively, since we can not use both of them on server side at the same
+  // time. Value 1 means we are testing out the WriteAndFinish API, and
+  // otherwise we are testing out the WriteLast API.
+  const int write_and_finish = state.range(2);
+
+  EchoTestService::AsyncService service;
+  std::unique_ptr<Fixture> fixture(new Fixture(&service));
+  {
+    EchoResponse send_response;
+    EchoResponse recv_response;
+    EchoRequest send_request;
+    EchoRequest recv_request;
+
+    if (msg_size > 0) {
+      send_request.set_message(std::string(msg_size, 'a'));
+      send_response.set_message(std::string(msg_size, 'b'));
+    }
+
+    std::unique_ptr<EchoTestService::Stub> stub(
+        EchoTestService::NewStub(fixture->channel()));
+
+    while (state.KeepRunning()) {
+      ServerContext svr_ctx;
+      ServerContextMutator svr_ctx_mut(&svr_ctx);
+      ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+      service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+                                fixture->cq(), tag(0));
+
+      ClientContext cli_ctx;
+      ClientContextMutator cli_ctx_mut(&cli_ctx);
+      cli_ctx.set_initial_metadata_corked(true);
+      // tag:1 here will never comes up, since we are not performing any op due
+      // to initial metadata coalescing.
+      auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+
+      void* t;
+      bool ok;
+      int need_tags;
+
+      // Send 'max_ping_pongs' number of ping pong messages
+      int ping_pong_cnt = 0;
+      while (ping_pong_cnt < max_ping_pongs) {
+        if (ping_pong_cnt == max_ping_pongs - 1) {
+          request_rw->WriteLast(send_request, WriteOptions(), tag(2));
+        } else {
+          request_rw->Write(send_request, tag(2));  // Start client send
+        }
+
+        need_tags = (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5);
+
+        if (ping_pong_cnt == 0) {
+          // wait for the server call structure (call_hook, etc.) to be
+          // initialized (async stream between client side and server side
+          // established). It is necessary when client init metadata is
+          // coalesced
+          GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+          while ((int)(intptr_t)t != 0) {
+            // In some cases tag:2 comes before tag:0 (write tag comes out
+            // first), this while loop is to make sure get tag:0.
+            int i = (int)(intptr_t)t;
+            GPR_ASSERT(need_tags & (1 << i));
+            need_tags &= ~(1 << i);
+            GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+          }
+        }
+
+        response_rw.Read(&recv_request, tag(3));   // Start server recv
+        request_rw->Read(&recv_response, tag(4));  // Start client recv
+
+        while (need_tags) {
+          GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+          GPR_ASSERT(ok);
+          int i = (int)(intptr_t)t;
+
+          // If server recv is complete, start the server send operation
+          if (i == 3) {
+            if (ping_pong_cnt == max_ping_pongs - 1) {
+              if (write_and_finish == 1) {
+                response_rw.WriteAndFinish(send_response, WriteOptions(),
+                                           Status::OK, tag(5));
+              } else {
+                response_rw.WriteLast(send_response, WriteOptions(), tag(5));
+                // WriteLast buffers the write, so neither server write op nor
+                // client read op will finish inside the while loop.
+                need_tags &= ~(1 << 4);
+                need_tags &= ~(1 << 5);
+              }
+            } else {
+              response_rw.Write(send_response, tag(5));
+            }
+          }
+
+          GPR_ASSERT(need_tags & (1 << i));
+          need_tags &= ~(1 << i);
+        }
+
+        ping_pong_cnt++;
+      }
+
+      if (max_ping_pongs == 0) {
+        need_tags = (1 << 6) | (1 << 7) | (1 << 8);
+      } else {
+        if (write_and_finish == 1) {
+          need_tags = (1 << 8);
+        } else {
+          // server's buffered write and the client's read of the buffered write
+          // tags should come up.
+          need_tags = (1 << 4) | (1 << 5) | (1 << 7) | (1 << 8);
+        }
+      }
+
+      // No message write or initial metadata write happened yet.
+      if (max_ping_pongs == 0) {
+        request_rw->WritesDone(tag(6));
+        // wait for server call data structure(call_hook, etc.) to be
+        // initialized, since initial metadata is corked.
+        GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+        while ((int)(intptr_t)t != 0) {
+          int i = (int)(intptr_t)t;
+          GPR_ASSERT(need_tags & (1 << i));
+          need_tags &= ~(1 << i);
+          GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+        }
+        response_rw.Finish(Status::OK, tag(7));
+      } else {
+        if (write_and_finish != 1) {
+          response_rw.Finish(Status::OK, tag(7));
+        }
+      }
+
+      Status recv_status;
+      request_rw->Finish(&recv_status, tag(8));
+
+      while (need_tags) {
+        GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+        int i = (int)(intptr_t)t;
+        GPR_ASSERT(need_tags & (1 << i));
+        need_tags &= ~(1 << i);
+      }
+
+      GPR_ASSERT(recv_status.ok());
+    }
+  }
+
+  fixture->Finish(state);
+  fixture.reset();
+  state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
+}
+
 /*******************************************************************************
  * CONFIGURATIONS
  */
@@ -270,6 +437,30 @@
 BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, TCP, NoOpMutator, NoOpMutator)
     ->Range(0, 128 * 1024 * 1024);
 
+// Generate Args for StreamingPingPongWithCoalescingApi benchmarks. Currently
+// generates args for only "small streams" (i.e streams with 0, 1 or 2 messages)
+static void StreamingPingPongWithCoalescingApiArgs(
+    benchmark::internal::Benchmark* b) {
+  int msg_size = 0;
+
+  b->Args(
+      {0, 0, 0});  // spl case: 0 ping-pong msgs (msg_size doesn't matter here)
+  b->Args(
+      {0, 0, 1});  // spl case: 0 ping-pong msgs (msg_size doesn't matter here)
+
+  for (msg_size = 0; msg_size <= 128 * 1024 * 1024;
+       msg_size == 0 ? msg_size++ : msg_size *= 8) {
+    b->Args({msg_size, 1, 0});
+    b->Args({msg_size, 2, 0});
+    b->Args({msg_size, 1, 1});
+    b->Args({msg_size, 2, 1});
+  }
+}
+
+BENCHMARK_TEMPLATE(BM_StreamingPingPongWithCoalescingApi, InProcessCHTTP2,
+                   NoOpMutator, NoOpMutator)
+    ->Apply(StreamingPingPongWithCoalescingApiArgs);
+
 }  // namespace testing
 }  // namespace grpc