Merge branch 'master' into stream_corked_pr
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h
index 1a5cbbd..8f52989 100644
--- a/include/grpc++/impl/codegen/async_stream.h
+++ b/include/grpc++/impl/codegen/async_stream.h
@@ -101,6 +101,39 @@
/// \param[in] msg The message to be written.
/// \param[in] tag The tag identifying the operation.
virtual void Write(const W& msg, void* tag) = 0;
+
+ /// Request the writing of \a msg using WriteOptions \a options with
+ /// identifying tag \a tag.
+ ///
+ /// Only one write may be outstanding at any given time. This means that
+ /// after calling Write, one must wait to receive \a tag from the completion
+ /// queue BEFORE calling Write again.
+ /// WriteOptions \a options is used to set the write options of this message.
+ /// This is thread-safe with respect to \a Read
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] tag The tag identifying the operation.
+ virtual void Write(const W& msg, WriteOptions options, void* tag) = 0;
+
+ /// Request the writing of \a msg and coalesce it with the writing
+ /// of trailing metadata, using WriteOptions \a options with identifying tag
+ /// \a tag.
+ ///
+ /// For client, WriteLast is equivalent of performing Write and WritesDone in
+ /// a single step.
+ /// For server, WriteLast buffers the \a msg. The writing of \a msg is held
+ /// until Finish is called, where \a msg and trailing metadata are coalesced
+ /// and write is initiated. Note that WriteLast can only buffer \a msg up to
+ /// the flow control window size. If \a msg size is larger than the window
+ /// size, it will be sent on wire without buffering.
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] tag The tag identifying the operation.
+ void WriteLast(const W& msg, WriteOptions options, void* tag) {
+ Write(msg, options.set_last_message(), tag);
+ }
};
template <class R>
@@ -183,11 +216,17 @@
: context_(context), call_(channel->CreateCall(method, context, cq)) {
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
-
- init_ops_.set_output_tag(tag);
- init_ops_.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&init_ops_);
+ // if corked bit is set in context, we buffer up the initial metadata to
+ // coalesce with later message to be sent. No op is performed.
+ if (context_->initial_metadata_corked_) {
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ } else {
+ write_ops_.set_output_tag(tag);
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&write_ops_);
+ }
}
void ReadInitialMetadata(void* tag) override {
@@ -205,10 +244,21 @@
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ write_ops_.ClientSendClose();
+ }
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
void WritesDone(void* tag) override {
- writes_done_ops_.set_output_tag(tag);
- writes_done_ops_.ClientSendClose();
- call_.PerformOps(&writes_done_ops_);
+ write_ops_.set_output_tag(tag);
+ write_ops_.ClientSendClose();
+ call_.PerformOps(&write_ops_);
}
void Finish(Status* status, void* tag) override {
@@ -223,10 +273,9 @@
private:
ClientContext* context_;
Call call_;
- CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
- CallOpSet<CallOpSendMessage> write_ops_;
- CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ write_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
CallOpClientRecvStatus>
finish_ops_;
@@ -253,10 +302,17 @@
const RpcMethod& method, ClientContext* context,
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_ops_.set_output_tag(tag);
- init_ops_.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&init_ops_);
+ if (context_->initial_metadata_corked_) {
+ // if corked bit is set in context, we buffer up the initial metadata to
+ // coalesce with later message to be sent. No op is performed.
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ } else {
+ write_ops_.set_output_tag(tag);
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&write_ops_);
+ }
}
void ReadInitialMetadata(void* tag) override {
@@ -283,10 +339,21 @@
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ write_ops_.ClientSendClose();
+ }
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
void WritesDone(void* tag) override {
- writes_done_ops_.set_output_tag(tag);
- writes_done_ops_.ClientSendClose();
- call_.PerformOps(&writes_done_ops_);
+ write_ops_.set_output_tag(tag);
+ write_ops_.ClientSendClose();
+ call_.PerformOps(&write_ops_);
}
void Finish(Status* status, void* tag) override {
@@ -301,11 +368,10 @@
private:
ClientContext* context_;
Call call_;
- CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendMessage> write_ops_;
- CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ write_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
@@ -395,6 +461,20 @@
public AsyncWriterInterface<W> {
public:
virtual void Finish(const Status& status, void* tag) = 0;
+
+ /// Request the writing of \a msg and coalesce it with trailing metadata which
+ /// contains \a status, using WriteOptions options with identifying tag \a
+ /// tag.
+ ///
+ /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
+ /// single step.
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] status The Status that server returns to client.
+ /// \param[in] tag The tag identifying the operation.
+ virtual void WriteAndFinish(const W& msg, WriteOptions options,
+ const Status& status, void* tag) = 0;
};
template <class W>
@@ -418,29 +498,37 @@
void Write(const W& msg, void* tag) override {
write_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- write_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&write_ops_);
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
+
+ EnsureInitialMetadataSent(&write_ops_);
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
+ void WriteAndFinish(const W& msg, WriteOptions options, const Status& status,
+ void* tag) override {
+ write_ops_.set_output_tag(tag);
+ EnsureInitialMetadataSent(&write_ops_);
+ options.set_buffer_hint();
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&write_ops_);
+ }
+
void Finish(const Status& status, void* tag) override {
finish_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- finish_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&finish_ops_);
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_);
}
@@ -448,10 +536,24 @@
private:
void BindCall(Call* call) override { call_ = *call; }
+ template <class T>
+ void EnsureInitialMetadataSent(T* ops) {
+ if (!ctx_->sent_initial_metadata_) {
+ ops->SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ ops->set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ }
+
Call call_;
ServerContext* ctx_;
CallOpSet<CallOpSendInitialMetadata> meta_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ write_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
@@ -462,6 +564,20 @@
public AsyncReaderInterface<R> {
public:
virtual void Finish(const Status& status, void* tag) = 0;
+
+ /// Request the writing of \a msg and coalesce it with trailing metadata which
+ /// contains \a status, using WriteOptions options with identifying tag \a
+ /// tag.
+ ///
+ /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
+ /// single step.
+ ///
+ /// \param[in] msg The message to be written.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ /// \param[in] status The Status that server returns to client.
+ /// \param[in] tag The tag identifying the operation.
+ virtual void WriteAndFinish(const W& msg, WriteOptions options,
+ const Status& status, void* tag) = 0;
};
template <class W, class R>
@@ -492,29 +608,36 @@
void Write(const W& msg, void* tag) override {
write_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- write_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&write_ops_);
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
+ void Write(const W& msg, WriteOptions options, void* tag) override {
+ write_ops_.set_output_tag(tag);
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
+ EnsureInitialMetadataSent(&write_ops_);
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
+ void WriteAndFinish(const W& msg, WriteOptions options, const Status& status,
+ void* tag) override {
+ write_ops_.set_output_tag(tag);
+ EnsureInitialMetadataSent(&write_ops_);
+ options.set_buffer_hint();
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
+ write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&write_ops_);
+ }
+
void Finish(const Status& status, void* tag) override {
finish_ops_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- finish_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
+ EnsureInitialMetadataSent(&finish_ops_);
+
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_);
}
@@ -524,11 +647,25 @@
void BindCall(Call* call) override { call_ = *call; }
+ template <class T>
+ void EnsureInitialMetadataSent(T* ops) {
+ if (!ctx_->sent_initial_metadata_) {
+ ops->SendInitialMetadata(ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ ops->set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ }
+
Call call_;
ServerContext* ctx_;
CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ write_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index 19a5ca2..a3f2be6 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -84,8 +84,9 @@
/// Per-message write options.
class WriteOptions {
public:
- WriteOptions() : flags_(0) {}
- WriteOptions(const WriteOptions& other) : flags_(other.flags_) {}
+ WriteOptions() : flags_(0), last_message_(false) {}
+ WriteOptions(const WriteOptions& other)
+ : flags_(other.flags_), last_message_(other.last_message_) {}
/// Clear all flags.
inline void Clear() { flags_ = 0; }
@@ -141,6 +142,43 @@
/// \sa GRPC_WRITE_BUFFER_HINT
inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
+ /// corked bit: aliases set_buffer_hint currently, with the intent that
+ /// set_buffer_hint will be removed in the future
+ inline WriteOptions& set_corked() {
+ SetBit(GRPC_WRITE_BUFFER_HINT);
+ return *this;
+ }
+
+ inline WriteOptions& clear_corked() {
+ ClearBit(GRPC_WRITE_BUFFER_HINT);
+ return *this;
+ }
+
+ inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
+
+ /// last-message bit: indicates this is the last message in a stream
+ /// client-side: makes Write the equivalent of performing Write, WritesDone
+ /// in a single step
+ /// server-side: hold the Write until the service handler returns (sync api)
+ /// or until Finish is called (async api)
+ inline WriteOptions& set_last_message() {
+ last_message_ = true;
+ return *this;
+ }
+
+ /// Clears flag indicating that this is the last message in a stream,
+ /// disabling coalescing.
+ inline WriteOptions& clear_last_messsage() {
+ last_message_ = false;
+ return *this;
+ }
+
+ /// Get value for the flag indicating that this is the last message, and
+ /// should be coalesced with trailing metadata.
+ ///
+ /// \sa GRPC_WRITE_LAST_MESSAGE
+ bool is_last_message() const { return last_message_; }
+
WriteOptions& operator=(const WriteOptions& rhs) {
flags_ = rhs.flags_;
return *this;
@@ -154,6 +192,7 @@
bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
uint32_t flags_;
+ bool last_message_;
};
/// Default argument for CallOpSet. I is unused by the class, but can be
@@ -224,7 +263,7 @@
/// after use.
template <class M>
Status SendMessage(const M& message,
- const WriteOptions& options) GRPC_MUST_USE_RESULT;
+ WriteOptions options) GRPC_MUST_USE_RESULT;
template <class M>
Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
@@ -252,8 +291,7 @@
};
template <class M>
-Status CallOpSendMessage::SendMessage(const M& message,
- const WriteOptions& options) {
+Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) {
write_options_ = options;
return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_);
}
diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h
index b91c7f6..3c50e6b 100644
--- a/include/grpc++/impl/codegen/client_context.h
+++ b/include/grpc++/impl/codegen/client_context.h
@@ -281,6 +281,17 @@
/// \param algorithm The compression algorithm used for the client call.
void set_compression_algorithm(grpc_compression_algorithm algorithm);
+ /// Flag whether the initial metadata should be \a corked
+ ///
+ /// If \a corked is true, then the initial metadata will be colasced with the
+ /// write of first message in the stream.
+ ///
+ /// \param corked The flag indicating whether the initial metadata is to be
+ /// corked or not.
+ void set_initial_metadata_corked(bool corked) {
+ initial_metadata_corked_ = corked;
+ }
+
/// Return the peer uri in a string.
///
/// \warning This value is never authenticated or subject to any security
@@ -357,7 +368,8 @@
(cacheable_ ? GRPC_INITIAL_METADATA_CACHEABLE_REQUEST : 0) |
(wait_for_ready_explicitly_set_
? GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
- : 0);
+ : 0) |
+ (initial_metadata_corked_ ? GRPC_INITIAL_METADATA_CORKED : 0);
}
grpc::string authority() { return authority_; }
@@ -384,6 +396,7 @@
PropagationOptions propagation_options_;
grpc_compression_algorithm compression_algorithm_;
+ bool initial_metadata_corked_;
};
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h
index 4d9b074..ae3b8e4 100644
--- a/include/grpc++/impl/codegen/sync_stream.h
+++ b/include/grpc++/impl/codegen/sync_stream.h
@@ -100,22 +100,40 @@
public:
virtual ~WriterInterface() {}
- /// Blocking write \a msg to the stream with options.
+ /// Blocking write \a msg to the stream with WriteOptions \a options.
/// This is thread-safe with respect to \a Read
///
/// \param msg The message to be written to the stream.
- /// \param options Options affecting the write operation.
+ /// \param options The WriteOptions affecting the write operation.
///
/// \return \a true on success, \a false when the stream has been closed.
- virtual bool Write(const W& msg, const WriteOptions& options) = 0;
+ virtual bool Write(const W& msg, WriteOptions options) = 0;
- /// Blocking write \a msg to the stream with default options.
+ /// Blocking write \a msg to the stream with default write options.
/// This is thread-safe with respect to \a Read
///
/// \param msg The message to be written to the stream.
///
/// \return \a true on success, \a false when the stream has been closed.
inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
+
+ /// Write \a msg and coalesce it with the writing of trailing metadata, using
+ /// WriteOptions \a options.
+ ///
+ /// For client, WriteLast is equivalent of performing Write and WritesDone in
+ /// a single step. \a msg and trailing metadata are coalesced and sent on wire
+ /// by calling this function.
+ /// For server, WriteLast buffers the \a msg. The writing of \a msg is held
+ /// until the service handler returns, where \a msg and trailing metadata are
+ /// coalesced and sent on wire. Note that WriteLast can only buffer \a msg up
+ /// to the flow control window size. If \a msg size is larger than the window
+ /// size, it will be sent on wire without buffering.
+ ///
+ /// \param[in] msg The message to be written to the stream.
+ /// \param[in] options The WriteOptions to be used to write this message.
+ void WriteLast(const W& msg, WriteOptions options) {
+ Write(msg, options.set_last_message());
+ }
};
/// Client-side interface for streaming reads of message of type \a R.
@@ -213,11 +231,13 @@
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
+ if (!context_->initial_metadata_corked_) {
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
}
void WaitForInitialMetadata() {
@@ -230,11 +250,24 @@
}
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
- CallOpSet<CallOpSendMessage> ops;
+ bool Write(const W& msg, WriteOptions options) override {
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose>
+ ops;
+
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ ops.ClientSendClose();
+ }
+ if (context_->initial_metadata_corked_) {
+ ops.SendInitialMetadata(context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ context_->set_initial_metadata_corked(false);
+ }
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
+
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
@@ -293,11 +326,13 @@
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
+ if (!context_->initial_metadata_corked_) {
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
}
void WaitForInitialMetadata() override {
@@ -325,9 +360,24 @@
}
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
- CallOpSet<CallOpSendMessage> ops;
- if (!ops.SendMessage(msg, options).ok()) return false;
+ bool Write(const W& msg, WriteOptions options) override {
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose>
+ ops;
+
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ ops.ClientSendClose();
+ }
+ if (context_->initial_metadata_corked_) {
+ ops.SendInitialMetadata(context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ context_->set_initial_metadata_corked(false);
+ }
+ if (!ops.SendMessage(msg, options).ok()) {
+ return false;
+ }
+
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
@@ -423,7 +473,10 @@
}
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
+ bool Write(const W& msg, WriteOptions options) override {
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
@@ -485,7 +538,10 @@
return call_->cq()->Pluck(&ops) && ops.got_message;
}
- bool Write(const W& msg, const WriteOptions& options) {
+ bool Write(const W& msg, WriteOptions options) {
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
@@ -523,7 +579,7 @@
bool Read(R* msg) override { return body_.Read(msg); }
using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) override {
+ bool Write(const W& msg, WriteOptions options) override {
return body_.Write(msg, options);
}
@@ -562,8 +618,7 @@
}
using WriterInterface<ResponseType>::Write;
- bool Write(const ResponseType& response,
- const WriteOptions& options) override {
+ bool Write(const ResponseType& response, WriteOptions options) override {
if (write_done_ || !read_done_) {
return false;
}
@@ -604,8 +659,7 @@
}
using WriterInterface<ResponseType>::Write;
- bool Write(const ResponseType& response,
- const WriteOptions& options) override {
+ bool Write(const ResponseType& response, WriteOptions options) override {
return read_done_ && body_.Write(response, options);
}
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 887c176..baea961 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -316,13 +316,16 @@
/** Signal that GRPC_INITIAL_METADATA_WAIT_FOR_READY was explicitly set
by the calling application. */
#define GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET (0x00000080u)
+/** Signal that the initial metadata should be corked */
+#define GRPC_INITIAL_METADATA_CORKED (0x00000100u)
/** Mask of all valid flags */
-#define GRPC_INITIAL_METADATA_USED_MASK \
- (GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST | \
- GRPC_INITIAL_METADATA_WAIT_FOR_READY | \
- GRPC_INITIAL_METADATA_CACHEABLE_REQUEST | \
- GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)
+#define GRPC_INITIAL_METADATA_USED_MASK \
+ (GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST | \
+ GRPC_INITIAL_METADATA_WAIT_FOR_READY | \
+ GRPC_INITIAL_METADATA_CACHEABLE_REQUEST | \
+ GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET | \
+ GRPC_INITIAL_METADATA_CORKED)
/** A single metadata element */
typedef struct grpc_metadata {
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 8676a37..967470b 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -1210,8 +1210,13 @@
}
} else {
GPR_ASSERT(s->id != 0);
- grpc_chttp2_become_writable(exec_ctx, t, s,
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ grpc_chttp2_stream_write_type write_type =
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED;
+ if (op->send_message != NULL &&
+ (op->send_message->flags & GRPC_WRITE_BUFFER_HINT)) {
+ write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK;
+ }
+ grpc_chttp2_become_writable(exec_ctx, t, s, write_type,
"op.send_initial_metadata");
}
} else {
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index c073741..3d884cf 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -67,7 +67,8 @@
call_canceled_(false),
deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
census_context_(nullptr),
- propagate_from_call_(nullptr) {
+ propagate_from_call_(nullptr),
+ initial_metadata_corked_(false) {
g_client_callbacks->DefaultConstructor(this);
}
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 32e8a41..0b5215e 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -484,6 +484,81 @@
EXPECT_TRUE(recv_status.ok());
}
+// Two pings and a final pong.
+TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ cli_ctx.set_initial_metadata_corked(true);
+ // tag:1 never comes up since no op is performed
+ std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
+ stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
+
+ service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
+
+ cli_stream->Write(send_request, tag(3));
+
+ // 65536(64KB) is the default flow control window size. Should change this
+ // number when default flow control window size changes. For the write of
+ // send_request larger than the flow control window size, tag:3 will not come
+ // up until server read is initiated. For write of send_request smaller than
+ // the flow control window size, the request can take the free ride with
+ // initial metadata due to coalescing, thus write tag:3 will come up here.
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Expect(3, true)
+ .Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ }
+
+ srv_stream.Read(&recv_request, tag(4));
+
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ }
+
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ cli_stream->WriteLast(send_request, WriteOptions(), tag(5));
+ srv_stream.Read(&recv_request, tag(6));
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ srv_stream.Read(&recv_request, tag(7));
+ Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.Finish(send_response, Status::OK, tag(8));
+ cli_stream->Finish(&recv_status, tag(9));
+ Verifier(GetParam().disable_blocking)
+ .Expect(8, true)
+ .Expect(9, true)
+ .Verify(cq_.get());
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.ok());
+}
+
// One ping, two pongs.
TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
ResetStub();
@@ -540,6 +615,112 @@
EXPECT_TRUE(recv_status.ok());
}
+// One ping, two pongs. Using WriteAndFinish API
+TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
+ stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
+
+ service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
+
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.Write(send_response, tag(3));
+ cli_stream->Read(&recv_response, tag(4));
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5));
+ cli_stream->Read(&recv_response, tag(6));
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Read(&recv_response, tag(7));
+ Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get());
+
+ cli_stream->Finish(&recv_status, tag(8));
+ Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
+// One ping, two pongs. Using WriteLast API
+TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
+ stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
+
+ service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
+
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.Write(send_response, tag(3));
+ cli_stream->Read(&recv_response, tag(4));
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ srv_stream.WriteLast(send_response, WriteOptions(), tag(5));
+ cli_stream->Read(&recv_response, tag(6));
+ srv_stream.Finish(Status::OK, tag(7));
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Expect(7, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Read(&recv_response, tag(8));
+ Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
+
+ cli_stream->Finish(&recv_status, tag(9));
+ Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
// One ping, one pong.
TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
ResetStub();
@@ -599,6 +780,144 @@
EXPECT_TRUE(recv_status.ok());
}
+// One ping, one pong. Using server:WriteAndFinish api
+TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ cli_ctx.set_initial_metadata_corked(true);
+ std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
+ cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
+
+ service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
+
+ cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
+
+ // 65536(64KB) is the default flow control window size. Should change this
+ // number when default flow control window size changes. For the write of
+ // send_request larger than the flow control window size, tag:3 will not come
+ // up until server read is initiated. For write of send_request smaller than
+ // the flow control window size, the request can take the free ride with
+ // initial metadata due to coalescing, thus write tag:3 will come up here.
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Expect(3, true)
+ .Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ }
+
+ srv_stream.Read(&recv_request, tag(4));
+
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ }
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ srv_stream.Read(&recv_request, tag(5));
+ Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6));
+ cli_stream->Read(&recv_response, tag(7));
+ Verifier(GetParam().disable_blocking)
+ .Expect(6, true)
+ .Expect(7, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Finish(&recv_status, tag(8));
+ Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
+// One ping, one pong. Using server:WriteLast api
+TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ cli_ctx.set_initial_metadata_corked(true);
+ std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
+ cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
+
+ service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
+
+ cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
+
+ // 65536(64KB) is the default flow control window size. Should change this
+ // number when default flow control window size changes. For the write of
+ // send_request larger than the flow control window size, tag:3 will not come
+ // up until server read is initiated. For write of send_request smaller than
+ // the flow control window size, the request can take the free ride with
+ // initial metadata due to coalescing, thus write tag:3 will come up here.
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Expect(3, true)
+ .Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ }
+
+ srv_stream.Read(&recv_request, tag(4));
+
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ }
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ srv_stream.Read(&recv_request, tag(5));
+ Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.WriteLast(send_response, WriteOptions(), tag(6));
+ srv_stream.Finish(Status::OK, tag(7));
+ cli_stream->Read(&recv_response, tag(8));
+ Verifier(GetParam().disable_blocking)
+ .Expect(6, true)
+ .Expect(7, true)
+ .Expect(8, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Finish(&recv_status, tag(9));
+ Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
// Metadata tests
TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
ResetStub();
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index df78557..d3a83b1 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -702,6 +702,21 @@
EXPECT_TRUE(s.ok());
}
+TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+
+ context.set_initial_metadata_corked(true);
+ auto stream = stub_->RequestStream(&context, &response);
+ request.set_message("hello");
+ stream->WriteLast(request, WriteOptions());
+ Status s = stream->Finish();
+ EXPECT_EQ(response.message(), request.message());
+ EXPECT_TRUE(s.ok());
+}
+
TEST_P(End2endTest, RequestStreamTwoRequests) {
ResetStub();
EchoRequest request;
@@ -718,6 +733,22 @@
EXPECT_TRUE(s.ok());
}
+TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+
+ context.set_initial_metadata_corked(true);
+ auto stream = stub_->RequestStream(&context, &response);
+ request.set_message("hello");
+ EXPECT_TRUE(stream->Write(request));
+ stream->WriteLast(request, WriteOptions());
+ Status s = stream->Finish();
+ EXPECT_EQ(response.message(), "hellohello");
+ EXPECT_TRUE(s.ok());
+}
+
TEST_P(End2endTest, ResponseStream) {
ResetStub();
EchoRequest request;
@@ -738,6 +769,27 @@
EXPECT_TRUE(s.ok());
}
+TEST_P(End2endTest, ResponseStreamWithCoalescingApi) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ request.set_message("hello");
+ context.AddMetadata(kServerUseCoalescingApi, "1");
+
+ auto stream = stub_->ResponseStream(&context, request);
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message() + "0");
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message() + "1");
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message() + "2");
+ EXPECT_FALSE(stream->Read(&response));
+
+ Status s = stream->Finish();
+ EXPECT_TRUE(s.ok());
+}
+
TEST_P(End2endTest, BidiStream) {
ResetStub();
EchoRequest request;
@@ -770,6 +822,39 @@
EXPECT_TRUE(s.ok());
}
+TEST_P(End2endTest, BidiStreamWithCoalescingApi) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ context.AddMetadata(kServerFinishAfterNReads, "3");
+ context.set_initial_metadata_corked(true);
+ grpc::string msg("hello");
+
+ auto stream = stub_->BidiStream(&context);
+
+ request.set_message(msg + "0");
+ EXPECT_TRUE(stream->Write(request));
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message());
+
+ request.set_message(msg + "1");
+ EXPECT_TRUE(stream->Write(request));
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message());
+
+ request.set_message(msg + "2");
+ stream->WriteLast(request, WriteOptions());
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message());
+
+ EXPECT_FALSE(stream->Read(&response));
+ EXPECT_FALSE(stream->Read(&response));
+
+ Status s = stream->Finish();
+ EXPECT_TRUE(s.ok());
+}
+
// Talk to the two services with the same name but different package names.
// The two stubs are created on the same channel.
TEST_P(End2endTest, DiffPackageServices) {
diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc
index d6664da..fdb2732 100644
--- a/test/cpp/end2end/mock_test.cc
+++ b/test/cpp/end2end/mock_test.cc
@@ -89,7 +89,7 @@
return true;
}
- bool Write(const EchoRequest& msg, const WriteOptions& options) override {
+ bool Write(const EchoRequest& msg, WriteOptions options) override {
gpr_log(GPR_INFO, "mock recv msg %s", msg.message().c_str());
last_message_ = msg.message();
return true;
diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc
index 59d36e9..11729c4 100644
--- a/test/cpp/end2end/test_service_impl.cc
+++ b/test/cpp/end2end/test_service_impl.cc
@@ -246,6 +246,9 @@
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+ int server_coalescing_api = GetIntValueFromMetadata(
+ kServerUseCoalescingApi, context->client_metadata(), 0);
+
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancel(context);
return Status::CANCELLED;
@@ -260,7 +263,11 @@
for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
response.set_message(request->message() + grpc::to_string(i));
- writer->Write(response);
+ if (i == kNumResponseStreamsMsgs - 1 && server_coalescing_api != 0) {
+ writer->WriteLast(response, WriteOptions());
+ } else {
+ writer->Write(response);
+ }
}
if (server_try_cancel_thd != nullptr) {
@@ -305,10 +312,21 @@
new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
}
+ // kServerFinishAfterNReads suggests after how many reads, the server should
+ // write the last message and send status (coalesced using WriteLast)
+ int server_write_last = GetIntValueFromMetadata(
+ kServerFinishAfterNReads, context->client_metadata(), 0);
+
+ int read_counts = 0;
while (stream->Read(&request)) {
+ read_counts++;
gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
response.set_message(request.message());
- stream->Write(response);
+ if (read_counts == server_write_last) {
+ stream->WriteLast(response, WriteOptions());
+ } else {
+ stream->Write(response);
+ }
}
if (server_try_cancel_thd != nullptr) {
diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h
index 88e0be7..b1f02f9 100644
--- a/test/cpp/end2end/test_service_impl.h
+++ b/test/cpp/end2end/test_service_impl.h
@@ -48,6 +48,8 @@
const char* const kServerCancelAfterReads = "cancel_after_reads";
const char* const kServerTryCancelRequest = "server_try_cancel";
const char* const kDebugInfoTrailerKey = "debug-info-bin";
+const char* const kServerFinishAfterNReads = "server_finish_after_n_reads";
+const char* const kServerUseCoalescingApi = "server_use_coalescing_api";
typedef enum {
DO_NOT_CANCEL = 0,
diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
index 00e37f7..42a5381 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
@@ -240,6 +240,173 @@
state.SetBytesProcessed(msg_size * state.iterations() * 2);
}
+// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
+// messages in each call) in a loop on a single channel. Different from
+// BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast,
+// WriteAndFinish, set_initial_metadata_corked. These apis aim at saving
+// sendmsg syscalls for streaming by coalescing 1. initial metadata with first
+// message; 2. final streaming message with trailing metadata.
+//
+// First parmeter (i.e state.range(0)): Message size (in bytes) to use
+// Second parameter (i.e state.range(1)): Number of ping pong messages.
+// Note: One ping-pong means two messages (one from client to server and
+// the other from server to client):
+// Third parameter (i.e state.range(2)): Switch between using WriteAndFinish
+// API and WriteLast API for server.
+template <class Fixture, class ClientContextMutator, class ServerContextMutator>
+static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) {
+ const int msg_size = state.range(0);
+ const int max_ping_pongs = state.range(1);
+ // This options is used to test out server API: WriteLast and WriteAndFinish
+ // respectively, since we can not use both of them on server side at the same
+ // time. Value 1 means we are testing out the WriteAndFinish API, and
+ // otherwise we are testing out the WriteLast API.
+ const int write_and_finish = state.range(2);
+
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ {
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ EchoRequest send_request;
+ EchoRequest recv_request;
+
+ if (msg_size > 0) {
+ send_request.set_message(std::string(msg_size, 'a'));
+ send_response.set_message(std::string(msg_size, 'b'));
+ }
+
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+
+ while (state.KeepRunning()) {
+ ServerContext svr_ctx;
+ ServerContextMutator svr_ctx_mut(&svr_ctx);
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+
+ ClientContext cli_ctx;
+ ClientContextMutator cli_ctx_mut(&cli_ctx);
+ cli_ctx.set_initial_metadata_corked(true);
+ // tag:1 here will never comes up, since we are not performing any op due
+ // to initial metadata coalescing.
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+
+ void* t;
+ bool ok;
+ int need_tags;
+
+ // Send 'max_ping_pongs' number of ping pong messages
+ int ping_pong_cnt = 0;
+ while (ping_pong_cnt < max_ping_pongs) {
+ if (ping_pong_cnt == max_ping_pongs - 1) {
+ request_rw->WriteLast(send_request, WriteOptions(), tag(2));
+ } else {
+ request_rw->Write(send_request, tag(2)); // Start client send
+ }
+
+ need_tags = (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5);
+
+ if (ping_pong_cnt == 0) {
+ // wait for the server call structure (call_hook, etc.) to be
+ // initialized (async stream between client side and server side
+ // established). It is necessary when client init metadata is
+ // coalesced
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ while ((int)(intptr_t)t != 0) {
+ // In some cases tag:2 comes before tag:0 (write tag comes out
+ // first), this while loop is to make sure get tag:0.
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ }
+ }
+
+ response_rw.Read(&recv_request, tag(3)); // Start server recv
+ request_rw->Read(&recv_response, tag(4)); // Start client recv
+
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+
+ // If server recv is complete, start the server send operation
+ if (i == 3) {
+ if (ping_pong_cnt == max_ping_pongs - 1) {
+ if (write_and_finish == 1) {
+ response_rw.WriteAndFinish(send_response, WriteOptions(),
+ Status::OK, tag(5));
+ } else {
+ response_rw.WriteLast(send_response, WriteOptions(), tag(5));
+ // WriteLast buffers the write, so neither server write op nor
+ // client read op will finish inside the while loop.
+ need_tags &= ~(1 << 4);
+ need_tags &= ~(1 << 5);
+ }
+ } else {
+ response_rw.Write(send_response, tag(5));
+ }
+ }
+
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ ping_pong_cnt++;
+ }
+
+ if (max_ping_pongs == 0) {
+ need_tags = (1 << 6) | (1 << 7) | (1 << 8);
+ } else {
+ if (write_and_finish == 1) {
+ need_tags = (1 << 8);
+ } else {
+ // server's buffered write and the client's read of the buffered write
+ // tags should come up.
+ need_tags = (1 << 4) | (1 << 5) | (1 << 7) | (1 << 8);
+ }
+ }
+
+ // No message write or initial metadata write happened yet.
+ if (max_ping_pongs == 0) {
+ request_rw->WritesDone(tag(6));
+ // wait for server call data structure(call_hook, etc.) to be
+ // initialized, since initial metadata is corked.
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ while ((int)(intptr_t)t != 0) {
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ }
+ response_rw.Finish(Status::OK, tag(7));
+ } else {
+ if (write_and_finish != 1) {
+ response_rw.Finish(Status::OK, tag(7));
+ }
+ }
+
+ Status recv_status;
+ request_rw->Finish(&recv_status, tag(8));
+
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ GPR_ASSERT(recv_status.ok());
+ }
+ }
+
+ fixture->Finish(state);
+ fixture.reset();
+ state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
+}
+
/*******************************************************************************
* CONFIGURATIONS
*/
@@ -270,6 +437,30 @@
BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, TCP, NoOpMutator, NoOpMutator)
->Range(0, 128 * 1024 * 1024);
+// Generate Args for StreamingPingPongWithCoalescingApi benchmarks. Currently
+// generates args for only "small streams" (i.e streams with 0, 1 or 2 messages)
+static void StreamingPingPongWithCoalescingApiArgs(
+ benchmark::internal::Benchmark* b) {
+ int msg_size = 0;
+
+ b->Args(
+ {0, 0, 0}); // spl case: 0 ping-pong msgs (msg_size doesn't matter here)
+ b->Args(
+ {0, 0, 1}); // spl case: 0 ping-pong msgs (msg_size doesn't matter here)
+
+ for (msg_size = 0; msg_size <= 128 * 1024 * 1024;
+ msg_size == 0 ? msg_size++ : msg_size *= 8) {
+ b->Args({msg_size, 1, 0});
+ b->Args({msg_size, 2, 0});
+ b->Args({msg_size, 1, 1});
+ b->Args({msg_size, 2, 1});
+ }
+}
+
+BENCHMARK_TEMPLATE(BM_StreamingPingPongWithCoalescingApi, InProcessCHTTP2,
+ NoOpMutator, NoOpMutator)
+ ->Apply(StreamingPingPongWithCoalescingApiArgs);
+
} // namespace testing
} // namespace grpc