Merge pull request #1296 from yugui/fix/static

Correct storage classes of variables an functions
diff --git a/src/node/examples/pubsub/empty.proto b/src/node/examples/pubsub/empty.proto
deleted file mode 100644
index 5d6eb10..0000000
--- a/src/node/examples/pubsub/empty.proto
+++ /dev/null
@@ -1,44 +0,0 @@
-// This file will be moved to a new location.
-
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-//     * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-//     * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-//     * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-syntax = "proto2";
-
-package proto2;
-
-// An empty message that you can re-use to avoid defining duplicated empty
-// messages in your project. A typical example is to use it as argument or the
-// return value of a service API. For instance:
-//
-//   service Foo {
-//     rpc Bar (proto2.Empty) returns (proto2.Empty) { };
-//   };
-//
-message Empty {}
diff --git a/src/node/examples/pubsub/label.proto b/src/node/examples/pubsub/label.proto
deleted file mode 100644
index 0af15a2..0000000
--- a/src/node/examples/pubsub/label.proto
+++ /dev/null
@@ -1,79 +0,0 @@
-// This file will be moved to a new location.
-
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-//     * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-//     * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-//     * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-// Labels provide a way to associate user-defined metadata with various
-// objects.  Labels may be used to organize objects into non-hierarchical
-// groups; think metadata tags attached to mp3s.
-
-syntax = "proto2";
-
-package tech.label;
-
-// A key-value pair applied to a given object.
-message Label {
-  // The key of a label is a syntactically valid URL (as per RFC 1738) with
-  // the "scheme" and initial slashes omitted and with the additional
-  // restrictions noted below.  Each key should be globally unique.  The
-  // "host" portion is called the "namespace" and is not necessarily
-  // resolvable to a network endpoint.  Instead, the namespace indicates what
-  // system or entity defines the semantics of the label.  Namespaces do not
-  // restrict the set of objects to which a label may be associated.
-  //
-  // Keys are defined by the following grammar:
-  //
-  //   key          = hostname "/" kpath
-  //   kpath        = ksegment *[ "/" ksegment ]
-  //   ksegment     = alphadigit | *[ alphadigit | "-" | "_" | "." ]
-  //
-  // where "hostname" and "alphadigit" are defined as in RFC 1738.
-  //
-  // Example key:
-  //   spanner.google.com/universe
-  required string key = 1;
-
-  // The value of the label.
-  oneof value {
-    // A string value.
-    string str_value = 2;
-    // An integer value.
-    int64 num_value = 3;
-  }
-}
-
-// A collection of labels, such as the set of all labels attached to an
-// object.  Each label in the set must have a different key.
-//
-// Users should prefer to embed "repeated Label" directly when possible.
-// This message should only be used in cases where that isn't possible (e.g.
-// with oneof).
-message Labels {
-  repeated Label label = 1;
-}
diff --git a/src/node/examples/pubsub/pubsub.proto b/src/node/examples/pubsub/pubsub.proto
deleted file mode 100644
index 41a3547..0000000
--- a/src/node/examples/pubsub/pubsub.proto
+++ /dev/null
@@ -1,734 +0,0 @@
-// This file will be moved to a new location.
-
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-//     * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-//     * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-//     * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
-// Specification of the Pubsub API.
-
-syntax = "proto2";
-
-import "empty.proto";
-import "label.proto";
-
-package tech.pubsub;
-
-// -----------------------------------------------------------------------------
-// Overview of the Pubsub API
-// -----------------------------------------------------------------------------
-
-// This file describes an API for a Pubsub system.  This system provides a
-// reliable many-to-many communication mechanism between independently written
-// publishers and subscribers where the publisher publishes messages to "topics"
-// and each subscriber creates a "subscription" and consumes messages from it.
-//
-// (a) The pubsub system maintains bindings between topics and subscriptions.
-// (b) A publisher publishes messages into a topic.
-// (c) The pubsub system delivers messages from topics into relevant
-//     subscriptions.
-// (d) A subscriber receives pending messages from its subscription and
-//     acknowledges or nacks each one to the pubsub system.
-// (e) The pubsub system removes acknowledged messages from that subscription.
-
-// -----------------------------------------------------------------------------
-// Data Model
-// -----------------------------------------------------------------------------
-
-// The data model consists of the following:
-//
-// * Topic: A topic is a resource to which messages are published by publishers.
-//     Topics are named, and the name of the topic is unique within the pubsub
-//     system.
-//
-// * Subscription: A subscription records the subscriber's interest in a topic.
-//     It can optionally include a query to select a subset of interesting
-//     messages.  The pubsub system maintains a logical cursor tracking the
-//     matching messages which still need to be delivered and acked so that
-//     they can retried as needed.  The set of messages that have not been
-//     acknowledged is called the subscription backlog.
-//
-// * Message: A message is a unit of data that flows in the system.  It contains
-//     opaque data from the publisher along with its labels.
-//
-// * Message Labels (optional): A set of opaque key, value pairs assigned
-//     by the publisher which the subscriber can use for filtering out messages
-//     in the topic.  For example, a label with key "foo.com/device_type" and
-//     value "mobile" may be added for messages that are only relevant for a
-//     mobile subscriber; a subscriber on a phone may decide to create a
-//     subscription only for messages that have this label.
-
-// -----------------------------------------------------------------------------
-// Publisher Flow
-// -----------------------------------------------------------------------------
-
-// A publisher publishes messages to the topic using the Publish request:
-//
-//   PubsubMessage message;
-//   message.set_data("....");
-//   Label label;
-//   label.set_key("foo.com/key1");
-//   label.set_str_value("value1");
-//   message.add_label(label);
-//   PublishRequest request;
-//   request.set_topic("topicName");
-//   request.set_message(message);
-//   PublisherService.Publish(request);
-
-// -----------------------------------------------------------------------------
-// Subscriber Flow
-// -----------------------------------------------------------------------------
-
-// The subscriber part of the API is richer than the publisher part and has a
-// number of concepts w.r.t. subscription creation and monitoring:
-//
-// (1) A subscriber creates a subscription using the CreateSubscription call.
-//     It may specify an optional "query" to indicate that it wants to receive
-//     only messages with a certain set of labels using the label query syntax.
-//     It may also specify an optional truncation policy to indicate when old
-//     messages from the subcription can be removed.
-//
-// (2) A subscriber receives messages in one of two ways: via push or pull.
-//
-// (a) To receive messages via push, the PushConfig field must be specified in
-//     the Subscription parameter when creating a subscription.  The PushConfig
-//     specifies an endpoint at which the subscriber must expose the
-//     PushEndpointService.  Messages are received via the HandlePubsubEvent
-//     method.  The push subscriber responds to the HandlePubsubEvent method
-//     with a result code that indicates one of three things: Ack (the message
-//     has been successfully processed and the Pubsub system may delete it),
-//     Nack (the message has been rejected, the Pubsub system should resend it
-//     at a later time), or Push-Back (this is a Nack with the additional
-//     semantics that the subscriber is overloaded and the pubsub system should
-//     back off on the rate at which it is invoking HandlePubsubEvent).  The
-//     endpoint may be a load balancer for better scalability.
-//
-// (b) To receive messages via pull a subscriber calls the Pull method on the
-//     SubscriberService to get messages from the subscription.  For each
-//     individual message, the subscriber may use the ack_id received in the
-//     PullResponse to Ack the message, Nack the message, or modify the ack
-//     deadline with ModifyAckDeadline.  See the
-//     Subscription.ack_deadline_seconds field documentation for details on the
-//     ack deadline behavior.
-//
-//     Note: Messages may be consumed in parallel by multiple subscribers making
-//       Pull calls to the same subscription; this will result in the set of
-//       messages from the subscription being shared and each subscriber
-//       receiving a subset of the messages.
-//
-// (4) The subscriber can explicitly truncate the current subscription.
-//
-// (5) "Truncated" events are delivered when a subscription is
-//     truncated, whether due to the subscription's truncation policy
-//     or an explicit request from the subscriber.
-//
-// Subscription creation:
-//
-//   Subscription subscription;
-//   subscription.set_topic("topicName");
-//   subscription.set_name("subscriptionName");
-//   subscription.push_config().set_push_endpoint("machinename:8888");
-//   SubscriberService.CreateSubscription(subscription);
-//
-// Consuming messages via push:
-//
-//  TODO(eschapira): Add HTTP push example.
-//
-//  The port 'machinename:8888' must be bound to a stubby server that implements
-//  the PushEndpointService with the following method:
-//
-//   int HandlePubsubEvent(PubsubEvent event) {
-//     if (event.subscription().equals("subscriptionName")) {
-//       if (event.has_message()) {
-//         Process(event.message().data());
-//       } else if (event.truncated()) {
-//         ProcessTruncatedEvent();
-//       }
-//     }
-//     return OK;  // This return code implies an acknowledgment
-//   }
-//
-// Consuming messages via pull:
-//
-//  The subscription must be created without setting the push_config field.
-//
-//   PullRequest pull_request;
-//   pull_request.set_subscription("subscriptionName");
-//   pull_request.set_return_immediately(false);
-//   while (true) {
-//     PullResponse pull_response;
-//     if (SubscriberService.Pull(pull_request, pull_response) == OK) {
-//       PubsubEvent event = pull_response.pubsub_event();
-//       if (event.has_message()) {
-//         Process(event.message().data());
-//       } else if (event.truncated()) {
-//         ProcessTruncatedEvent();
-//       }
-//       AcknowledgeRequest ack_request;
-//       ackRequest.set_subscription("subscriptionName");
-//       ackRequest.set_ack_id(pull_response.ack_id());
-//       SubscriberService.Acknowledge(ack_request);
-//     }
-//   }
-
-// -----------------------------------------------------------------------------
-// Reliability Semantics
-// -----------------------------------------------------------------------------
-
-// When a subscriber successfully creates a subscription using
-// Subscriber.CreateSubscription, it establishes a "subscription point" with
-// respect to that subscription - the subscriber is guaranteed to receive any
-// message published after this subscription point that matches the
-// subscription's query.  Note that messages published before the Subscription
-// point may or may not be delivered.
-//
-// If the system truncates the subscription according to the specified
-// truncation policy, the system delivers a subscription status event with the
-// "truncated" field set to true.  We refer to such events as "truncation
-// events".  A truncation event:
-//
-// * Informs the subscriber that part of the subscription messages have been
-//   discarded.  The subscriber may want to recover from the message loss, e.g.,
-//   by resyncing its state with its backend.
-// * Establishes a new subscription point, i.e., the subscriber is guaranteed to
-//   receive all changes published after the trunction event is received (or
-//   until another truncation event is received).
-//
-// Note that messages are not delivered in any particular order by the pubsub
-// system.  Furthermore, the system guarantees at-least-once delivery
-// of each message or truncation events until acked.
-
-// -----------------------------------------------------------------------------
-// Deletion
-// -----------------------------------------------------------------------------
-
-// Both topics and subscriptions may be deleted.  Deletion of a topic implies
-// deletion of all attached subscriptions.
-//
-// When a subscription is deleted directly by calling DeleteSubscription, all
-// messages are immediately dropped.  If it is a pull subscriber, future pull
-// requests will return NOT_FOUND.
-//
-// When a topic is deleted all corresponding subscriptions are immediately
-// deleted, and subscribers experience the same behavior as directly deleting
-// the subscription.
-
-// -----------------------------------------------------------------------------
-// The Publisher service and its protos.
-// -----------------------------------------------------------------------------
-
-// The service that an application uses to manipulate topics, and to send
-// messages to a topic.
-service PublisherService {
-
-  // Creates the given topic with the given name.
-  rpc CreateTopic(Topic) returns (Topic) {
-  }
-
-  // Adds a message to the topic.  Returns NOT_FOUND if the topic does not
-  // exist.
-  // (-- For different error code values returned via Stubby, see
-  // util/task/codes.proto. --)
-  rpc Publish(PublishRequest) returns (proto2.Empty) {
-  }
-
-  // Adds one or more messages to the topic. Returns NOT_FOUND if the topic does
-  // not exist.
-  rpc PublishBatch(PublishBatchRequest) returns (PublishBatchResponse) {
-  }
-
-  // Gets the configuration of a topic. Since the topic only has the name
-  // attribute, this method is only useful to check the existence of a topic.
-  // If other attributes are added in the future, they will be returned here.
-  rpc GetTopic(GetTopicRequest) returns (Topic) {
-  }
-
-  // Lists matching topics.
-  rpc ListTopics(ListTopicsRequest) returns (ListTopicsResponse) {
-  }
-
-  // Deletes the topic with the given name.  All subscriptions to this topic
-  // are also deleted. Returns NOT_FOUND if the topic does not exist.
-  // After a topic is deleted, a new topic may be created with the same name.
-  rpc DeleteTopic(DeleteTopicRequest) returns (proto2.Empty)  {
-  }
-}
-
-// A topic resource.
-message Topic {
-  // Name of the topic.
-  optional string name = 1;
-}
-
-// A message data and its labels.
-message PubsubMessage {
-  // The message payload.
-  optional bytes data = 1;
-
-  // Optional list of labels for this message. Keys in this collection must
-  // be unique.
-  //(-- TODO(eschapira): Define how key namespace may be scoped to the topic.--)
-  repeated tech.label.Label label = 2;
-
-  // ID of this message assigned by the server at publication time. Guaranteed
-  // to be unique within the topic. This value may be read by a subscriber
-  // that receives a PubsubMessage via a Pull call or a push delivery. It must
-  // not be populated by a publisher in a Publish call.
-  optional string message_id = 3;
-}
-
-// Request for the GetTopic method.
-message GetTopicRequest {
-  // The name of the topic to get.
-  optional string topic = 1;
-}
-
-// Request for the Publish method.
-message PublishRequest {
-  // The message in the request will be published on this topic.
-  optional string topic = 1;
-
-  // The message to publish.
-  optional PubsubMessage message = 2;
-}
-
-// Request for the PublishBatch method.
-message PublishBatchRequest {
-  // The messages in the request will be published on this topic.
-  optional string topic = 1;
-
-  // The messages to publish.
-  repeated PubsubMessage messages = 2;
-}
-
-// Response for the PublishBatch method.
-message PublishBatchResponse {
-  // The server-assigned ID of each published message, in the same order as
-  // the messages in the request. IDs are guaranteed to be unique within
-  // the topic.
-  repeated string message_ids = 1;
-}
-
-// Request for the ListTopics method.
-message ListTopicsRequest {
-  // A valid label query expression.
-  //
-  optional string query = 1;
-
-  // Maximum number of topics to return.
-  // (-- If not specified or <= 0, the implementation will select a reasonable
-  // value. --)
-  optional int32 max_results = 2;
-
-  // The value obtained in the last <code>ListTopicsResponse</code>
-  // for continuation.
-  optional string page_token = 3;
-
-}
-
-// Response for the ListTopics method.
-message ListTopicsResponse {
-  // The resulting topics.
-  repeated Topic topic = 1;
-
-  // If not empty, indicates that there are more topics that match the request,
-  // and this value should be passed to the next <code>ListTopicsRequest</code>
-  // to continue.
-  optional string next_page_token = 2;
-}
-
-// Request for the Delete method.
-message DeleteTopicRequest {
-  // Name of the topic to delete.
-  optional string topic = 1;
-}
-
-// -----------------------------------------------------------------------------
-// The Subscriber service and its protos.
-// -----------------------------------------------------------------------------
-
-// The service that an application uses to manipulate subscriptions and to
-// consume messages from a subscription via the pull method.
-service SubscriberService {
-
-  // Creates a subscription on a given topic for a given subscriber.
-  // If the subscription already exists, returns ALREADY_EXISTS.
-  // If the corresponding topic doesn't exist, returns NOT_FOUND.
-  //
-  // If the name is not provided in the request, the server will assign a random
-  // name for this subscription on the same project as the topic.
-  rpc CreateSubscription(Subscription) returns (Subscription) {
-  }
-
-  // Gets the configuration details of a subscription.
-  rpc GetSubscription(GetSubscriptionRequest) returns (Subscription) {
-  }
-
-  // Lists matching subscriptions.
-  rpc ListSubscriptions(ListSubscriptionsRequest)
-      returns (ListSubscriptionsResponse) {
-  }
-
-  // Deletes an existing subscription. All pending messages in the subscription
-  // are immediately dropped. Calls to Pull after deletion will return
-  // NOT_FOUND.
-  rpc DeleteSubscription(DeleteSubscriptionRequest) returns (proto2.Empty) {
-  }
-
-  // Removes all the pending messages in the subscription and releases the
-  // storage associated with them. Results in a truncation event to be sent to
-  // the subscriber. Messages added after this call returns are stored in the
-  // subscription as before.
-  rpc TruncateSubscription(TruncateSubscriptionRequest) returns (proto2.Empty) {
-  }
-
-  //
-  // Push subscriber calls.
-  //
-
-  // Modifies the <code>PushConfig</code> for a specified subscription.
-  // This method can be used to suspend the flow of messages to an endpoint
-  // by clearing the <code>PushConfig</code> field in the request. Messages
-  // will be accumulated for delivery even if no push configuration is
-  // defined or while the configuration is modified.
-  rpc ModifyPushConfig(ModifyPushConfigRequest) returns (proto2.Empty) {
-  }
-
-  //
-  // Pull Subscriber calls
-  //
-
-  // Pulls a single message from the server.
-  // If return_immediately is true, and no messages are available in the
-  // subscription, this method returns FAILED_PRECONDITION. The system is free
-  // to return an UNAVAILABLE error if no messages are available in a
-  // reasonable amount of time (to reduce system load).
-  rpc Pull(PullRequest) returns (PullResponse) {
-  }
-
-  // Pulls messages from the server. Returns an empty list if there are no
-  // messages available in the backlog. The system is free to return UNAVAILABLE
-  // if there are too many pull requests outstanding for the given subscription.
-  rpc PullBatch(PullBatchRequest) returns (PullBatchResponse) {
-  }
-
-  // Modifies the Ack deadline for a message received from a pull request.
-  rpc ModifyAckDeadline(ModifyAckDeadlineRequest) returns (proto2.Empty) {
-  }
-
-  // Acknowledges a particular received message: the Pub/Sub system can remove
-  // the given message from the subscription. Acknowledging a message whose
-  // Ack deadline has expired may succeed, but the message could have been
-  // already redelivered. Acknowledging a message more than once will not
-  // result in an error. This is only used for messages received via pull.
-  rpc Acknowledge(AcknowledgeRequest) returns (proto2.Empty) {
-  }
-
-  // Refuses processing a particular received message. The system will
-  // redeliver this message to some consumer of the subscription at some
-  // future time. This is only used for messages received via pull.
-  rpc Nack(NackRequest) returns (proto2.Empty) {
-  }
-}
-
-// A subscription resource.
-message Subscription {
-  // Name of the subscription.
-  optional string name = 1;
-
-  // The name of the topic from which this subscription is receiving messages.
-  optional string topic = 2;
-
-  // If <code>query</code> is non-empty, only messages on the subscriber's
-  // topic whose labels match the query will be returned. Otherwise all
-  // messages on the topic will be returned.
-  //
-  optional string query = 3;
-
-  // The subscriber may specify requirements for truncating unacknowledged
-  // subscription entries. The system will honor the
-  // <code>CreateSubscription</code> request only if it can meet these
-  // requirements. If this field is not specified, messages are never truncated
-  // by the system.
-  optional TruncationPolicy truncation_policy = 4;
-
-  // Specifies which messages can be truncated by the system.
-  message TruncationPolicy {
-    oneof policy {
-      // If <code>max_bytes</code> is specified, the system is allowed to drop
-      // old messages to keep the combined size of stored messages under
-      // <code>max_bytes</code>. This is a hint; the system may keep more than
-      // this many bytes, but will make a best effort to keep the size from
-      // growing much beyond this parameter.
-      int64 max_bytes = 1;
-
-      // If <code>max_age_seconds</code> is specified, the system is allowed to
-      // drop messages that have been stored for at least this many seconds.
-      // This is a hint; the system may keep these messages, but will make a
-      // best effort to remove them when their maximum age is reached.
-      int64 max_age_seconds = 2;
-    }
-  }
-
-  // If push delivery is used with this subscription, this field is
-  // used to configure it.
-  optional PushConfig push_config = 5;
-
-  // For either push or pull delivery, the value is the maximum time after a
-  // subscriber receives a message before the subscriber should acknowledge or
-  // Nack the message. If the Ack deadline for a message passes without an
-  // Ack or a Nack, the Pub/Sub system will eventually redeliver the message.
-  // If a subscriber acknowledges after the deadline, the Pub/Sub system may
-  // accept the Ack, but it is possible that the message has been already
-  // delivered again. Multiple Acks to the message are allowed and will
-  // succeed.
-  //
-  // For push delivery, this value is used to set the request timeout for
-  // the call to the push endpoint.
-  //
-  // For pull delivery, this value is used as the initial value for the Ack
-  // deadline. It may be overridden for a specific pull request (message) with
-  // <code>ModifyAckDeadline</code>.
-  // While a message is outstanding (i.e. it has been delivered to a pull
-  // subscriber and the subscriber has not yet Acked or Nacked), the Pub/Sub
-  // system will not deliver that message to another pull subscriber
-  // (on a best-effort basis).
-  optional int32 ack_deadline_seconds = 6;
-
-  // If this parameter is set to n, the system is allowed to (but not required
-  // to) delete the subscription when at least n seconds have elapsed since the
-  // client presence was detected. (Presence is detected through any
-  // interaction using the subscription ID, including Pull(), Get(), or
-  // acknowledging a message.)
-  //
-  // If this parameter is not set, the subscription will stay live until
-  // explicitly deleted.
-  //
-  // Clients can detect such garbage collection when a Get call or a Pull call
-  // (for pull subscribers only) returns NOT_FOUND.
-  optional int64 garbage_collect_seconds = 7;
-}
-
-// Configuration for a push delivery endpoint.
-message PushConfig {
-  // A URL locating the endpoint to which messages should be pushed.
-  // For example, a Webhook endpoint might use "https://example.com/push".
-  // (-- An Android application might use "gcm:<REGID>", where <REGID> is a
-  // GCM registration id allocated for pushing messages to the application. --)
-  optional string push_endpoint = 1;
-}
-
-// An event indicating a received message or truncation event.
-message PubsubEvent {
-  // The subscription that received the event.
-  optional string subscription = 1;
-
-  oneof type {
-    // A received message.
-    PubsubMessage message = 2;
-
-    // Indicates that this subscription has been truncated.
-    bool truncated = 3;
-
-    // Indicates that this subscription has been deleted. (Note that pull
-    // subscribers will always receive NOT_FOUND in response in their pull
-    // request on the subscription, rather than seeing this boolean.)
-    bool deleted = 4;
-  }
-}
-
-// Request for the GetSubscription method.
-message GetSubscriptionRequest {
-  // The name of the subscription to get.
-  optional string subscription = 1;
-}
-
-// Request for the ListSubscriptions method.
-message ListSubscriptionsRequest {
-  // A valid label query expression.
-  // (-- Which labels are required or supported is implementation-specific.
-  // TODO(eschapira): This method must support to query by topic. We must
-  // define the key URI for the "topic" label. --)
-  optional string query = 1;
-
-  // Maximum number of subscriptions to return.
-  // (-- If not specified or <= 0, the implementation will select a reasonable
-  // value. --)
-  optional int32 max_results = 3;
-
-  // The value obtained in the last <code>ListSubscriptionsResponse</code>
-  // for continuation.
-  optional string page_token = 4;
-}
-
-// Response for the ListSubscriptions method.
-message ListSubscriptionsResponse {
-  // The subscriptions that match the request.
-  repeated Subscription subscription = 1;
-
-  // If not empty, indicates that there are more subscriptions that match the
-  // request and this value should be passed to the next
-  // <code>ListSubscriptionsRequest</code> to continue.
-  optional string next_page_token = 2;
-}
-
-// Request for the TruncateSubscription method.
-message TruncateSubscriptionRequest {
-  // The subscription that is being truncated.
-  optional string subscription = 1;
-}
-
-// Request for the DeleteSubscription method.
-message DeleteSubscriptionRequest {
-  // The subscription to delete.
-  optional string subscription = 1;
-}
-
-// Request for the ModifyPushConfig method.
-message ModifyPushConfigRequest {
-  // The name of the subscription.
-  optional string subscription = 1;
-
-  // An empty <code>push_config</code> indicates that the Pub/Sub system should
-  // pause pushing messages from the given subscription.
-  optional PushConfig push_config = 2;
-}
-
-// -----------------------------------------------------------------------------
-// The protos used by a pull subscriber.
-// -----------------------------------------------------------------------------
-
-// Request for the Pull method.
-message PullRequest {
-  // The subscription from which a message should be pulled.
-  optional string subscription = 1;
-
-  // If this is specified as true the system will respond immediately even if
-  // it is not able to return a message in the Pull response. Otherwise the
-  // system is allowed to wait until at least one message is available rather
-  // than returning FAILED_PRECONDITION. The client may cancel the request if
-  // it does not wish to wait any longer for the response.
-  optional bool return_immediately = 2;
-}
-
-// Either a <code>PubsubMessage</code> or a truncation event. One of these two
-// must be populated.
-message PullResponse {
-  // This ID must be used to acknowledge the received event or message.
-  optional string ack_id = 1;
-
-  // A pubsub message or truncation event.
-  optional PubsubEvent pubsub_event = 2;
-}
-
-// Request for the PullBatch method.
-message PullBatchRequest {
-  // The subscription from which messages should be pulled.
-  optional string subscription = 1;
-
-  // If this is specified as true the system will respond immediately even if
-  // it is not able to return a message in the Pull response. Otherwise the
-  // system is allowed to wait until at least one message is available rather
-  // than returning no messages. The client may cancel the request if it does
-  // not wish to wait any longer for the response.
-  optional bool return_immediately = 2;
-
-  // The maximum number of PubsubEvents returned for this request. The Pub/Sub
-  // system may return fewer than the number of events specified.
-  optional int32 max_events = 3;
-}
-
-// Response for the PullBatch method.
-message PullBatchResponse {
-
-  // Received Pub/Sub messages or status events. The Pub/Sub system will return
-  // zero messages if there are no more messages available in the backlog. The
-  // Pub/Sub system may return fewer than the max_events requested even if
-  // there are more messages available in the backlog.
-  repeated PullResponse pull_responses = 2;
-}
-
-// Request for the ModifyAckDeadline method.
-message ModifyAckDeadlineRequest {
-  // The name of the subscription from which messages are being pulled.
-  optional string subscription = 1;
-
-  // The acknowledgment ID.
-  optional string ack_id = 2;
-
-  // The new Ack deadline. Must be >= 0.
-  optional int32 ack_deadline_seconds = 3;
-}
-
-// Request for the Acknowledge method.
-message AcknowledgeRequest {
-  // The subscription whose message is being acknowledged.
-  optional string subscription = 1;
-
-  // The acknowledgment ID for the message being acknowledged. This was
-  // returned by the Pub/Sub system in the Pull response.
-  repeated string ack_id = 2;
-}
-
-// Request for the Nack method.
-message NackRequest {
-  // The subscription whose message is being Nacked.
-  optional string subscription = 1;
-
-  // The acknowledgment ID for the message being refused. This was returned by
-  // the Pub/Sub system in the Pull response.
-  repeated string ack_id = 2;
-}
-
-// -----------------------------------------------------------------------------
-// The service and protos used by a push subscriber.
-// -----------------------------------------------------------------------------
-
-// The service that a subscriber uses to handle messages sent via push
-// delivery.
-// This service is not currently exported for HTTP clients.
-// TODO(eschapira): Explain HTTP subscribers.
-service PushEndpointService {
-  // Sends a <code>PubsubMessage</code> or a subscription status event to a
-  // push endpoint.
-  // The push endpoint responds with an empty message and a code from
-  // util/task/codes.proto. The following codes have a particular meaning to the
-  // Pub/Sub system:
-  // OK          - This is interpreted by Pub/Sub as Ack.
-  // ABORTED     - This is intepreted by Pub/Sub as a Nack, without implying
-  //               pushback for congestion control.  The Pub/Sub system will
-  //               retry this message at a later time.
-  // UNAVAILABLE - This is intepreted by Pub/Sub as a Nack, with the additional
-  //               semantics of push-back.  The Pub/Sub system will use an AIMD
-  //               congestion control algorithm to backoff the rate of sending
-  //               messages from this subscription.
-  // Any other code, or a failure to respond, will be interpreted in the same
-  // way as ABORTED; i.e. the system will retry the message at a later time to
-  // ensure reliable delivery.
-  rpc HandlePubsubEvent(PubsubEvent) returns (proto2.Empty);
-}
diff --git a/src/node/examples/pubsub/pubsub_demo.js b/src/node/examples/pubsub/pubsub_demo.js
deleted file mode 100644
index 2630151..0000000
--- a/src/node/examples/pubsub/pubsub_demo.js
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- *     * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *     * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *     * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-'use strict';
-
-var async = require('async');
-var fs = require('fs');
-var GoogleAuth = require('google-auth-library');
-var parseArgs = require('minimist');
-var strftime = require('strftime');
-var _ = require('underscore');
-var grpc = require('../..');
-var PROTO_PATH = __dirname + '/pubsub.proto';
-var pubsub = grpc.load(PROTO_PATH).tech.pubsub;
-
-function PubsubRunner(pub, sub, args) {
-  this.pub = pub;
-  this.sub = sub;
-  this.args = args;
-}
-
-PubsubRunner.prototype.getTestTopicName = function() {
-  var base_name = '/topics/' + this.args.project_id + '/';
-  if (this.args.topic_name) {
-    return base_name + this.args.topic_name;
-  }
-  var now_text = strftime('%Y%m%d%H%M%S%L');
-  return base_name + process.env.USER + '-' + now_text;
-};
-
-PubsubRunner.prototype.getTestSubName = function() {
-  var base_name = '/subscriptions/' + this.args.project_id + '/';
-  if (this.args.sub_name) {
-    return base_name + this.args.sub_name;
-  }
-  var now_text = strftime('%Y%m%d%H%M%S%L');
-  return base_name + process.env.USER + '-' + now_text;
-};
-
-PubsubRunner.prototype.listProjectTopics = function(callback) {
-  var q = ('cloud.googleapis.com/project in (/projects/' +
-      this.args.project_id + ')');
-  this.pub.listTopics({query: q}, callback);
-};
-
-PubsubRunner.prototype.topicExists = function(name, callback) {
-  this.listProjectTopics(function(err, response) {
-    if (err) {
-      callback(err);
-    } else {
-      callback(null, _.some(response.topic, function(t) {
-        return t.name === name;
-      }));
-    }
-  });
-};
-
-PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) {
-  var self = this;
-  this.topicExists(name, function(err, exists) {
-    if (err) {
-      callback(err);
-    } else{
-      if (exists) {
-        callback(null);
-      } else {
-        self.pub.createTopic({name: name}, callback);
-      }
-    }
-  });
-};
-
-PubsubRunner.prototype.removeTopic = function(callback) {
-  var name = this.getTestTopicName();
-  console.log('... removing Topic', name);
-  this.pub.deleteTopic({topic: name}, function(err, value) {
-    if (err) {
-      console.log('Could not delete a topic: rpc failed with', err);
-      callback(err);
-    } else {
-      console.log('removed Topic', name, 'OK');
-      callback(null);
-    }
-  });
-};
-
-PubsubRunner.prototype.createTopic = function(callback) {
-  var name = this.getTestTopicName();
-  console.log('... creating Topic', name);
-  this.pub.createTopic({name: name}, function(err, value) {
-    if (err) {
-      console.log('Could not create a topic: rpc failed with', err);
-      callback(err);
-    } else {
-      console.log('created Topic', name, 'OK');
-      callback(null);
-    }
-  });
-};
-
-PubsubRunner.prototype.listSomeTopics = function(callback) {
-  console.log('Listing topics');
-  console.log('-------------_');
-  this.listProjectTopics(function(err, response) {
-    if (err) {
-      console.log('Could not list topic: rpc failed with', err);
-      callback(err);
-    } else {
-      _.each(response.topic, function(t) {
-        console.log(t.name);
-      });
-      callback(null);
-    }
-  });
-};
-
-PubsubRunner.prototype.checkExists = function(callback) {
-  var name = this.getTestTopicName();
-  console.log('... checking for topic', name);
-  this.topicExists(name, function(err, exists) {
-    if (err) {
-      console.log('Could not check for a topics: rpc failed with', err);
-      callback(err);
-    } else {
-      if (exists) {
-        console.log(name, 'is a topic');
-      } else {
-        console.log(name, 'is not a topic');
-      }
-      callback(null);
-    }
-  });
-};
-
-PubsubRunner.prototype.randomPubSub = function(callback) {
-  var self = this;
-  var topic_name = this.getTestTopicName();
-  var sub_name = this.getTestSubName();
-  var subscription = {name: sub_name, topic: topic_name};
-  async.waterfall([
-    _.bind(this.createTopicIfNeeded, this, topic_name),
-    _.bind(this.sub.createSubscription, this.sub, subscription),
-    function(resp, cb) {
-      var msg_count = _.random(10, 30);
-      // Set up msg_count messages to publish
-      var message_senders = _.times(msg_count, function(n) {
-        return _.bind(self.pub.publish, self.pub, {
-          topic: topic_name,
-          message: {data: new Buffer('message ' + n)}
-        });
-      });
-      async.parallel(message_senders, function(err, result) {
-        cb(err, result, msg_count);
-      });
-    },
-    function(result, msg_count, cb) {
-      console.log('Sent', msg_count, 'messages to', topic_name + ',',
-                  'checking for them now.');
-      var batch_request = {
-        subscription: sub_name,
-        max_events: msg_count
-      };
-      self.sub.pullBatch(batch_request, cb);
-    },
-    function(batch, cb) {
-      var ack_id = _.pluck(batch.pull_responses, 'ack_id');
-      console.log('Got', ack_id.length, 'messages, acknowledging them...');
-      var ack_request = {
-        subscription: sub_name,
-        ack_id: ack_id
-      };
-      self.sub.acknowledge(ack_request, cb);
-    },
-    function(result, cb) {
-      console.log(
-          'Test messages were acknowledged OK, deleting the subscription');
-      self.sub.deleteSubscription({subscription: sub_name}, cb);
-    }
-  ], function (err, result) {
-    if (err) {
-      console.log('Could not do random pub sub: rpc failed with', err);
-    }
-    callback(err, result);
-  });
-};
-
-function main(callback) {
-  var argv = parseArgs(process.argv, {
-    string: [
-      'host',
-      'oauth_scope',
-      'port',
-      'action',
-      'project_id',
-      'topic_name',
-      'sub_name'
-    ],
-    default: {
-      host: 'pubsub-staging.googleapis.com',
-      oauth_scope: 'https://www.googleapis.com/auth/pubsub',
-      port: 443,
-      action: 'listSomeTopics',
-      project_id: 'stoked-keyword-656'
-    }
-  });
-  var valid_actions = [
-    'createTopic',
-    'removeTopic',
-    'listSomeTopics',
-    'checkExists',
-    'randomPubSub'
-  ];
-  if (_.some(valid_actions, function(action) {
-    return action === argv.action;
-  })) {
-    callback(new Error('Action was not valid'));
-  }
-  var address = argv.host + ':' + argv.port;
-  (new GoogleAuth()).getApplicationDefault(function(err, credential) {
-    if (err) {
-      callback(err);
-      return;
-    }
-    if (credential.createScopedRequired()) {
-      credential = credential.createScoped(argv.oauth_scope);
-    }
-    var updateMetadata = grpc.getGoogleAuthDelegate(credential);
-    var ca_path = process.env.SSL_CERT_FILE;
-    fs.readFile(ca_path, function(err, ca_data) {
-      if (err) {
-        callback(err);
-        return;
-      }
-      var ssl_creds = grpc.Credentials.createSsl(ca_data);
-      var options = {
-        credentials: ssl_creds,
-        'grpc.ssl_target_name_override': argv.host
-      };
-      var pub = new pubsub.PublisherService(address, options, updateMetadata);
-      var sub = new pubsub.SubscriberService(address, options, updateMetadata);
-      var runner = new PubsubRunner(pub, sub, argv);
-      runner[argv.action](callback);
-    });
-  });
-}
-
-if (require.main === module) {
-  main(function(err) {
-    if (err) {
-      throw err;
-    }
-  });
-}
-
-module.exports = PubsubRunner;
diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb
index af7a1d5..6f1fe26 100755
--- a/src/ruby/bin/interop/interop_client.rb
+++ b/src/ruby/bin/interop/interop_client.rb
@@ -136,12 +136,14 @@
   include Grpc::Testing::PayloadType
   attr_accessor :assertions # required by Minitest::Assertions
   attr_accessor :queue
+  attr_accessor :canceller_op
 
   # reqs is the enumerator over the requests
   def initialize(msg_sizes)
     @queue = Queue.new
     @msg_sizes = msg_sizes
     @assertions = 0  # required by Minitest::Assertions
+    @canceller_op = nil  # used to cancel after the first response
   end
 
   def each_item
@@ -155,12 +157,15 @@
                         response_parameters: [p_cls.new(size: resp_size)])
       yield req
       resp = @queue.pop
-      assert_equal(:COMPRESSABLE, resp.payload.type,
-                   'payload type is wrong')
+      assert_equal(:COMPRESSABLE, resp.payload.type, 'payload type is wrong')
       assert_equal(resp_size, resp.payload.body.length,
-                   'payload body #{i} has the wrong length')
+                   "payload body #{count} has the wrong length")
       p "OK: ping_pong #{count}"
       count += 1
+      unless @canceller_op.nil?
+        canceller_op.cancel
+        break
+      end
     end
   end
 end
@@ -260,6 +265,27 @@
     p 'OK: ping_pong'
   end
 
+  def cancel_after_begin
+    msg_sizes = [27_182, 8, 1828, 45_904]
+    reqs = msg_sizes.map do |x|
+      req = Payload.new(body: nulls(x))
+      StreamingInputCallRequest.new(payload: req)
+    end
+    op = @stub.streaming_input_call(reqs, return_op: true)
+    op.cancel
+    assert_raises(GRPC::Cancelled) { op.execute }
+    p 'OK: cancel_after_begin'
+  end
+
+  def cancel_after_first
+    msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]]
+    ppp = PingPongPlayer.new(msg_sizes)
+    op = @stub.full_duplex_call(ppp.each_item, return_op: true)
+    ppp.canceller_op = op  # causes ppp to cancel after the 1st message
+    assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } }
+    p 'OK: cancel_after_first'
+  end
+
   def all
     all_methods = NamedTests.instance_methods(false).map(&:to_s)
     all_methods.each do |m|
diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb
index b237937..35e9c02 100644
--- a/src/ruby/lib/grpc/errors.rb
+++ b/src/ruby/lib/grpc/errors.rb
@@ -54,4 +54,8 @@
       Status.new(code, details)
     end
   end
+
+  # Cancelled is an exception class that indicates that an rpc was cancelled.
+  class Cancelled < StandardError
+  end
 end
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 489349c..8d63de4 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -30,6 +30,22 @@
 require 'forwardable'
 require 'grpc/generic/bidi_call'
 
+class Struct
+  # BatchResult is the struct returned by calls to call#start_batch.
+  class BatchResult
+    # check_status returns the status, raising an error if the status
+    # is non-nil and not OK.
+    def check_status
+      return nil if status.nil?
+      fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED
+      if status.code != GRPC::Core::StatusCodes::OK
+        fail GRPC::BadStatus.new(status.code, status.details)
+      end
+      status
+    end
+  end
+end
+
 # GRPC contains the General RPC module.
 module GRPC
   # The ActiveCall class provides simple methods for sending marshallable
@@ -38,7 +54,9 @@
     include Core::StatusCodes
     include Core::TimeConsts
     include Core::CallOps
+    extend Forwardable
     attr_reader(:deadline)
+    def_delegators :@call, :cancel, :metadata
 
     # client_invoke begins a client invocation.
     #
@@ -101,50 +119,6 @@
       @metadata_tag = metadata_tag
     end
 
-    # Obtains the status of the call.
-    #
-    # this value is nil until the call completes
-    # @return this call's status
-    def status
-      @call.status
-    end
-
-    # Obtains the metadata of the call.
-    #
-    # At the start of the call this will be nil.  During the call this gets
-    # some values as soon as the other end of the connection acknowledges the
-    # request.
-    #
-    # @return this calls's metadata
-    def metadata
-      @call.metadata
-    end
-
-    # Cancels the call.
-    #
-    # Cancels the call.  The call does not return any result, but once this it
-    # has been called, the call should eventually terminate.  Due to potential
-    # races between the execution of the cancel and the in-flight request, the
-    # result of the call after calling #cancel is indeterminate:
-    #
-    # - the call may terminate with a BadStatus exception, with code=CANCELLED
-    # - the call may terminate with OK Status, and return a response
-    # - the call may terminate with a different BadStatus exception if that
-    #   was happening
-    def cancel
-      @call.cancel
-    end
-
-    # indicates if the call is shutdown
-    def shutdown
-      @shutdown ||= false
-    end
-
-    # indicates if the call is cancelled.
-    def cancelled
-      @cancelled ||= false
-    end
-
     # multi_req_view provides a restricted view of this ActiveCall for use
     # in a server client-streaming handler.
     def multi_req_view
@@ -176,9 +150,9 @@
         SEND_CLOSE_FROM_CLIENT => nil
       }
       ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
-      @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+      batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
       return unless assert_finished
-      @call.status
+      batch_result.check_status
     end
 
     # finished waits until a client call is completed.
@@ -192,17 +166,12 @@
       elsif !batch_result.metadata.nil?
         @call.metadata.merge!(batch_result.metadata)
       end
-      if batch_result.status.code != Core::StatusCodes::OK
-        fail BadStatus.new(batch_result.status.code,
-                           batch_result.status.details)
-      end
-      batch_result
+      batch_result.check_status
     end
 
     # remote_send sends a request to the remote endpoint.
     #
-    # It blocks until the remote endpoint acknowledges by sending a
-    # WRITE_ACCEPTED.  req can be marshalled already.
+    # It blocks until the remote endpoint accepts the message.
     #
     # @param req [Object, String] the object to send or it's marshal form.
     # @param marshalled [false, true] indicates if the object is already
@@ -332,6 +301,9 @@
       response = remote_read
       finished unless response.is_a? Struct::Status
       response
+    rescue GRPC::Core::CallError => e
+      finished  # checks for Cancelled
+      raise e
     end
 
     # client_streamer sends a stream of requests to a GRPC server, and
@@ -355,6 +327,9 @@
       response = remote_read
       finished unless response.is_a? Struct::Status
       response
+    rescue GRPC::Core::CallError => e
+      finished  # checks for Cancelled
+      raise e
     end
 
     # server_streamer sends one request to the GRPC server, which yields a
@@ -381,6 +356,9 @@
       replies = enum_for(:each_remote_read_then_finish)
       return replies unless block_given?
       replies.each { |r| yield r }
+    rescue GRPC::Core::CallError => e
+      finished  # checks for Cancelled
+      raise e
     end
 
     # bidi_streamer sends a stream of requests to the GRPC server, and yields
@@ -416,6 +394,9 @@
       start_call(**kw) unless @started
       bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
       bd.run_on_client(requests, &blk)
+    rescue GRPC::Core::CallError => e
+      finished  # checks for Cancelled
+      raise e
     end
 
     # run_server_bidi orchestrates a BiDi stream processing on a server.
@@ -436,9 +417,10 @@
 
     private
 
+    # Starts the call if not already started
     def start_call(**kw)
-      tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
-      @finished_tag, @read_metadata_tag = tags
+      return if @started
+      @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
       @started = true
     end
 
@@ -466,6 +448,6 @@
     # Operation limits access to an ActiveCall's methods for use as
     # a Operation on the client.
     Operation = view_class(:cancel, :cancelled, :deadline, :execute,
-                           :metadata, :status)
+                           :metadata, :status, :start_call)
   end
 end
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 1c1b3b0..b813ab5 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -78,13 +78,11 @@
     # @param requests the Enumerable of requests to send
     # @return an Enumerator of requests to yield
     def run_on_client(requests, &blk)
-      enq_th = start_write_loop(requests)
-      loop_th = start_read_loop
+      @enq_th = start_write_loop(requests)
+      @loop_th = start_read_loop
       replies = each_queued_msg
       return replies if blk.nil?
       replies.each { |r| blk.call(r) }
-      enq_th.join
-      loop_th.join
     end
 
     # Begins orchestration of the Bidi stream for a server generating replies.
@@ -100,10 +98,8 @@
     # @param gen_each_reply [Proc] generates the BiDi stream replies.
     def run_on_server(gen_each_reply)
       replys = gen_each_reply.call(each_queued_msg)
-      enq_th = start_write_loop(replys, is_client: false)
-      loop_th = start_read_loop
-      loop_th.join
-      enq_th.join
+      @enq_th = start_write_loop(replys, is_client: false)
+      @loop_th = start_read_loop
     end
 
     private
@@ -122,10 +118,13 @@
         logger.debug("each_queued_msg: msg##{count}")
         count += 1
         req = @readq.pop
+        logger.debug("each_queued_msg: req = #{req}")
         throw req if req.is_a? StandardError
         break if req.equal?(END_OF_READS)
         yield req
       end
+      @loop_th.join
+      @enq_th.join
     end
 
     # during bidi-streaming, read the requests to send from a separate thread
@@ -136,20 +135,23 @@
         begin
           count = 0
           requests.each do |req|
+            logger.debug("bidi-write_loop: #{count}")
             count += 1
             payload = @marshal.call(req)
             @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
                             SEND_MESSAGE => payload)
           end
           if is_client
-            logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
-            @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
-                            SEND_CLOSE_FROM_CLIENT => nil,
-                            RECV_STATUS_ON_CLIENT => nil)
+            logger.debug("bidi-write-loop: sent #{count}, waiting to finish")
+            batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+                                           SEND_CLOSE_FROM_CLIENT => nil,
+                                           RECV_STATUS_ON_CLIENT => nil)
+            batch_result.check_status
           end
         rescue StandardError => e
-          logger.warn('bidi: write_loop failed')
+          logger.warn('bidi-write_loop: failed')
           logger.warn(e)
+          raise e
         end
       end
     end
@@ -163,7 +165,7 @@
 
           # queue the initial read before beginning the loop
           loop do
-            logger.debug("waiting for read #{count}")
+            logger.debug("bidi-read_loop: #{count}")
             count += 1
             # TODO: ensure metadata is read if available, currently it's not
             batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
@@ -171,7 +173,7 @@
             # handle the next message
             if batch_result.message.nil?
               @readq.push(END_OF_READS)
-              logger.debug('done reading!')
+              logger.debug('bidi-read-loop: done reading!')
               break
             end
 
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 245999e..1323bac 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -388,6 +388,23 @@
         t.join
       end
 
+      it 'should handle cancellation correctly', server: true do
+        service = SlowService.new
+        @srv.handle(service)
+        t = Thread.new { @srv.run }
+        @srv.wait_till_running
+        req = EchoMsg.new
+        stub = SlowStub.new(@host, **@client_opts)
+        op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
+        Thread.new do  # cancel the call
+          sleep 0.1
+          op.cancel
+        end
+        expect { op.execute }.to raise_error GRPC::Cancelled
+        @srv.stop
+        t.join
+      end
+
       it 'should receive updated metadata', server: true do
         service = EchoService.new
         @srv.handle(service)