Adds an explicit Cancellation exception

- uses Forwardable to provide access the @call within an ActiveCall
- removes redundant methods from ActiveCall
diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb
index b237937..35e9c02 100644
--- a/src/ruby/lib/grpc/errors.rb
+++ b/src/ruby/lib/grpc/errors.rb
@@ -54,4 +54,8 @@
       Status.new(code, details)
     end
   end
+
+  # Cancelled is an exception class that indicates that an rpc was cancelled.
+  class Cancelled < StandardError
+  end
 end
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 489349c..8d63de4 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -30,6 +30,22 @@
 require 'forwardable'
 require 'grpc/generic/bidi_call'
 
+class Struct
+  # BatchResult is the struct returned by calls to call#start_batch.
+  class BatchResult
+    # check_status returns the status, raising an error if the status
+    # is non-nil and not OK.
+    def check_status
+      return nil if status.nil?
+      fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED
+      if status.code != GRPC::Core::StatusCodes::OK
+        fail GRPC::BadStatus.new(status.code, status.details)
+      end
+      status
+    end
+  end
+end
+
 # GRPC contains the General RPC module.
 module GRPC
   # The ActiveCall class provides simple methods for sending marshallable
@@ -38,7 +54,9 @@
     include Core::StatusCodes
     include Core::TimeConsts
     include Core::CallOps
+    extend Forwardable
     attr_reader(:deadline)
+    def_delegators :@call, :cancel, :metadata
 
     # client_invoke begins a client invocation.
     #
@@ -101,50 +119,6 @@
       @metadata_tag = metadata_tag
     end
 
-    # Obtains the status of the call.
-    #
-    # this value is nil until the call completes
-    # @return this call's status
-    def status
-      @call.status
-    end
-
-    # Obtains the metadata of the call.
-    #
-    # At the start of the call this will be nil.  During the call this gets
-    # some values as soon as the other end of the connection acknowledges the
-    # request.
-    #
-    # @return this calls's metadata
-    def metadata
-      @call.metadata
-    end
-
-    # Cancels the call.
-    #
-    # Cancels the call.  The call does not return any result, but once this it
-    # has been called, the call should eventually terminate.  Due to potential
-    # races between the execution of the cancel and the in-flight request, the
-    # result of the call after calling #cancel is indeterminate:
-    #
-    # - the call may terminate with a BadStatus exception, with code=CANCELLED
-    # - the call may terminate with OK Status, and return a response
-    # - the call may terminate with a different BadStatus exception if that
-    #   was happening
-    def cancel
-      @call.cancel
-    end
-
-    # indicates if the call is shutdown
-    def shutdown
-      @shutdown ||= false
-    end
-
-    # indicates if the call is cancelled.
-    def cancelled
-      @cancelled ||= false
-    end
-
     # multi_req_view provides a restricted view of this ActiveCall for use
     # in a server client-streaming handler.
     def multi_req_view
@@ -176,9 +150,9 @@
         SEND_CLOSE_FROM_CLIENT => nil
       }
       ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
-      @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+      batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
       return unless assert_finished
-      @call.status
+      batch_result.check_status
     end
 
     # finished waits until a client call is completed.
@@ -192,17 +166,12 @@
       elsif !batch_result.metadata.nil?
         @call.metadata.merge!(batch_result.metadata)
       end
-      if batch_result.status.code != Core::StatusCodes::OK
-        fail BadStatus.new(batch_result.status.code,
-                           batch_result.status.details)
-      end
-      batch_result
+      batch_result.check_status
     end
 
     # remote_send sends a request to the remote endpoint.
     #
-    # It blocks until the remote endpoint acknowledges by sending a
-    # WRITE_ACCEPTED.  req can be marshalled already.
+    # It blocks until the remote endpoint accepts the message.
     #
     # @param req [Object, String] the object to send or it's marshal form.
     # @param marshalled [false, true] indicates if the object is already
@@ -332,6 +301,9 @@
       response = remote_read
       finished unless response.is_a? Struct::Status
       response
+    rescue GRPC::Core::CallError => e
+      finished  # checks for Cancelled
+      raise e
     end
 
     # client_streamer sends a stream of requests to a GRPC server, and
@@ -355,6 +327,9 @@
       response = remote_read
       finished unless response.is_a? Struct::Status
       response
+    rescue GRPC::Core::CallError => e
+      finished  # checks for Cancelled
+      raise e
     end
 
     # server_streamer sends one request to the GRPC server, which yields a
@@ -381,6 +356,9 @@
       replies = enum_for(:each_remote_read_then_finish)
       return replies unless block_given?
       replies.each { |r| yield r }
+    rescue GRPC::Core::CallError => e
+      finished  # checks for Cancelled
+      raise e
     end
 
     # bidi_streamer sends a stream of requests to the GRPC server, and yields
@@ -416,6 +394,9 @@
       start_call(**kw) unless @started
       bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
       bd.run_on_client(requests, &blk)
+    rescue GRPC::Core::CallError => e
+      finished  # checks for Cancelled
+      raise e
     end
 
     # run_server_bidi orchestrates a BiDi stream processing on a server.
@@ -436,9 +417,10 @@
 
     private
 
+    # Starts the call if not already started
     def start_call(**kw)
-      tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
-      @finished_tag, @read_metadata_tag = tags
+      return if @started
+      @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
       @started = true
     end
 
@@ -466,6 +448,6 @@
     # Operation limits access to an ActiveCall's methods for use as
     # a Operation on the client.
     Operation = view_class(:cancel, :cancelled, :deadline, :execute,
-                           :metadata, :status)
+                           :metadata, :status, :start_call)
   end
 end
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 25ad6f7..1323bac 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -396,11 +396,11 @@
         req = EchoMsg.new
         stub = SlowStub.new(@host, **@client_opts)
         op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
-        cancel_thread = Thread.new do 
+        Thread.new do  # cancel the call
           sleep 0.1
           op.cancel
         end
-        expect{op.execute}.to raise_error GRPC::Cancelled
+        expect { op.execute }.to raise_error GRPC::Cancelled
         @srv.stop
         t.join
       end