Exposes event#finish as #close.

- ensures that it's a runtime error if an event is used after it's finished

- updates all calls where the completion_queue is used to ensure the events
retrieved are explicitly finished
	Change on 2014/12/18 by temiola <temiola@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=82445748
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 872f8e3..bf292fa 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -38,7 +38,6 @@
 #include <grpc/grpc.h>
 #include "rb_byte_buffer.h"
 #include "rb_completion_queue.h"
-#include "rb_event.h"
 #include "rb_metadata.h"
 #include "rb_grpc.h"
 
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index dfde442..dc95838 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -141,8 +141,7 @@
   if (next_call.event == NULL) {
     return Qnil;
   }
-  return Data_Wrap_Struct(rb_cEvent, GC_NOT_MARKED, grpc_rb_event_finish,
-                          next_call.event);
+  return grpc_rb_new_event(next_call.event);
 }
 
 /* Blocks until the next event for given tag is available, and returns the
@@ -160,8 +159,7 @@
   if (next_call.event == NULL) {
     return Qnil;
   }
-  return Data_Wrap_Struct(rb_cEvent, GC_NOT_MARKED, grpc_rb_event_finish,
-                          next_call.event);
+  return grpc_rb_new_event(next_call.event);
 }
 
 /* rb_cCompletionQueue is the ruby class that proxies grpc_completion_queue. */
diff --git a/src/ruby/ext/grpc/rb_event.c b/src/ruby/ext/grpc/rb_event.c
index 76ea6ad..9200f92 100644
--- a/src/ruby/ext/grpc/rb_event.c
+++ b/src/ruby/ext/grpc/rb_event.c
@@ -41,12 +41,49 @@
 #include "rb_call.h"
 #include "rb_metadata.h"
 
+/* grpc_rb_event wraps a grpc_event.  It provides a peer ruby object,
+ * 'mark' to minimize copying when an event is created from ruby. */
+typedef struct grpc_rb_event {
+  /* Holder of ruby objects involved in constructing the event */
+  VALUE mark;
+  /* The actual event */
+  grpc_event *wrapped;
+} grpc_rb_event;
+
+
 /* rb_mCompletionType is a ruby module that holds the completion type values */
 VALUE rb_mCompletionType = Qnil;
 
-/* Helper function to free an event. */
-void grpc_rb_event_finish(void *p) {
-  grpc_event_finish(p);
+/* Destroys Event instances. */
+static void grpc_rb_event_free(void *p) {
+  grpc_rb_event *ev = NULL;
+  if (p == NULL) {
+    return;
+  };
+  ev = (grpc_rb_event *)p;
+
+  /* Deletes the wrapped object if the mark object is Qnil, which indicates
+   * that no other object is the actual owner. */
+  if (ev->wrapped != NULL && ev->mark == Qnil) {
+    grpc_event_finish(ev->wrapped);
+    rb_warning("event gc: destroyed the c event");
+  } else {
+    rb_warning("event gc: did not destroy the c event");
+  }
+
+  xfree(p);
+}
+
+/* Protects the mark object from GC */
+static void grpc_rb_event_mark(void *p) {
+  grpc_rb_event *event = NULL;
+  if (p == NULL) {
+    return;
+  }
+  event = (grpc_rb_event *)p;
+  if (event->mark != Qnil) {
+    rb_gc_mark(event->mark);
+  }
 }
 
 static VALUE grpc_rb_event_result(VALUE self);
@@ -54,7 +91,14 @@
 /* Obtains the type of an event. */
 static VALUE grpc_rb_event_type(VALUE self) {
   grpc_event *event = NULL;
-  Data_Get_Struct(self, grpc_event, event);
+  grpc_rb_event *wrapper = NULL;
+  Data_Get_Struct(self, grpc_rb_event, wrapper);
+  if (wrapper->wrapped == NULL) {
+    rb_raise(rb_eRuntimeError, "finished!");
+    return Qnil;
+  }
+
+  event = wrapper->wrapped;
   switch (event->type) {
     case GRPC_QUEUE_SHUTDOWN:
       return rb_const_get(rb_mCompletionType, rb_intern("QUEUE_SHUTDOWN"));
@@ -94,7 +138,14 @@
 /* Obtains the tag associated with an event. */
 static VALUE grpc_rb_event_tag(VALUE self) {
   grpc_event *event = NULL;
-  Data_Get_Struct(self, grpc_event, event);
+  grpc_rb_event *wrapper = NULL;
+  Data_Get_Struct(self, grpc_rb_event, wrapper);
+  if (wrapper->wrapped == NULL) {
+    rb_raise(rb_eRuntimeError, "finished!");
+    return Qnil;
+  }
+
+  event = wrapper->wrapped;
   if (event->tag == NULL) {
     return Qnil;
   }
@@ -103,10 +154,17 @@
 
 /* Obtains the call associated with an event. */
 static VALUE grpc_rb_event_call(VALUE self) {
-  grpc_event *ev = NULL;
-  Data_Get_Struct(self, grpc_event, ev);
-  if (ev->call != NULL) {
-    return grpc_rb_wrap_call(ev->call);
+  grpc_event *event = NULL;
+  grpc_rb_event *wrapper = NULL;
+  Data_Get_Struct(self, grpc_rb_event, wrapper);
+  if (wrapper->wrapped == NULL) {
+    rb_raise(rb_eRuntimeError, "finished!");
+    return Qnil;
+  }
+
+  event = wrapper->wrapped;
+  if (event->call != NULL) {
+    return grpc_rb_wrap_call(event->call);
   }
   return Qnil;
 }
@@ -114,6 +172,7 @@
 /* Obtains the metadata associated with an event. */
 static VALUE grpc_rb_event_metadata(VALUE self) {
   grpc_event *event = NULL;
+  grpc_rb_event *wrapper = NULL;
   grpc_metadata *metadata = NULL;
   VALUE key = Qnil;
   VALUE new_ary = Qnil;
@@ -121,9 +180,14 @@
   VALUE value = Qnil;
   size_t count = 0;
   size_t i = 0;
+  Data_Get_Struct(self, grpc_rb_event, wrapper);
+  if (wrapper->wrapped == NULL) {
+    rb_raise(rb_eRuntimeError, "finished!");
+    return Qnil;
+  }
 
   /* Figure out which metadata to read. */
-  Data_Get_Struct(self, grpc_event, event);
+  event = wrapper->wrapped;
   switch (event->type) {
 
     case GRPC_CLIENT_METADATA_READ:
@@ -179,7 +243,13 @@
 /* Obtains the data associated with an event. */
 static VALUE grpc_rb_event_result(VALUE self) {
   grpc_event *event = NULL;
-  Data_Get_Struct(self, grpc_event, event);
+  grpc_rb_event *wrapper = NULL;
+  Data_Get_Struct(self, grpc_rb_event, wrapper);
+  if (wrapper->wrapped == NULL) {
+    rb_raise(rb_eRuntimeError, "finished!");
+    return Qnil;
+  }
+  event = wrapper->wrapped;
 
   switch (event->type) {
 
@@ -245,11 +315,19 @@
   return Qfalse;
 }
 
-/* rb_sNewServerRpc is the struct that holds new server rpc details. */
-VALUE rb_sNewServerRpc = Qnil;
-
-/* rb_sStatus is the struct that holds status details. */
-VALUE rb_sStatus = Qnil;
+static VALUE grpc_rb_event_finish(VALUE self) {
+  grpc_event *event = NULL;
+  grpc_rb_event *wrapper = NULL;
+  Data_Get_Struct(self, grpc_rb_event, wrapper);
+  if (wrapper->wrapped == NULL) {  /* already closed  */
+    return Qnil;
+  }
+  event = wrapper->wrapped;
+  grpc_event_finish(event);
+  wrapper->wrapped = NULL;
+  wrapper->mark = Qnil;
+  return Qnil;
+}
 
 /* rb_cEvent is the Event class whose instances proxy grpc_event */
 VALUE rb_cEvent = Qnil;
@@ -262,9 +340,6 @@
   rb_eEventError = rb_define_class_under(rb_mGoogleRpcCore, "EventError",
                                          rb_eStandardError);
   rb_cEvent = rb_define_class_under(rb_mGoogleRpcCore, "Event", rb_cObject);
-  rb_sNewServerRpc = rb_struct_define("NewServerRpc", "method", "host",
-                                      "deadline", "metadata", NULL);
-  rb_sStatus = rb_struct_define("Status", "code", "details", "metadata", NULL);
 
   /* Prevent allocation or inialization from ruby. */
   rb_define_alloc_func(rb_cEvent, grpc_rb_cannot_alloc);
@@ -276,6 +351,8 @@
   rb_define_method(rb_cEvent, "result", grpc_rb_event_result, 0);
   rb_define_method(rb_cEvent, "tag", grpc_rb_event_tag, 0);
   rb_define_method(rb_cEvent, "type", grpc_rb_event_type, 0);
+  rb_define_method(rb_cEvent, "finish", grpc_rb_event_finish, 0);
+  rb_define_alias(rb_cEvent, "close", "finish");
 
   /* Constants representing the completion types */
   rb_mCompletionType = rb_define_module_under(rb_mGoogleRpcCore,
@@ -298,3 +375,11 @@
   rb_define_const(rb_mCompletionType, "RESERVED",
                   INT2NUM(GRPC_COMPLETION_DO_NOT_USE));
 }
+
+VALUE grpc_rb_new_event(grpc_event *ev) {
+  grpc_rb_event *wrapper = ALLOC(grpc_rb_event);
+  wrapper->wrapped = ev;
+  wrapper->mark = Qnil;
+  return Data_Wrap_Struct(rb_cEvent, grpc_rb_event_mark, grpc_rb_event_free,
+                          wrapper);
+}
diff --git a/src/ruby/ext/grpc/rb_event.h b/src/ruby/ext/grpc/rb_event.h
index 459502c..e4e4a79 100644
--- a/src/ruby/ext/grpc/rb_event.h
+++ b/src/ruby/ext/grpc/rb_event.h
@@ -35,12 +35,7 @@
 #define GRPC_RB_EVENT_H_
 
 #include <ruby.h>
-
-/* rb_sNewServerRpc is the struct that holds new server rpc details. */
-extern VALUE rb_sNewServerRpc;
-
-/* rb_sStruct is the struct that holds status details. */
-extern VALUE rb_sStatus;
+#include <grpc/grpc.h>
 
 /* rb_cEvent is the Event class whose instances proxy grpc_event. */
 extern VALUE rb_cEvent;
@@ -49,8 +44,8 @@
    event processing. */
 extern VALUE rb_eEventError;
 
-/* Helper function to free an event. */
-void grpc_rb_event_finish(void *p);
+/* Used to create new ruby event objects */
+VALUE grpc_rb_new_event(grpc_event *ev);
 
 /* Initializes the Event and EventError classes. */
 void Init_google_rpc_event();
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index f0e432a..eae011d 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -245,16 +245,27 @@
   grpc_shutdown();
 }
 
+/* Initialize the Google RPC module structs */
+
+/* rb_sNewServerRpc is the struct that holds new server rpc details. */
+VALUE rb_sNewServerRpc = Qnil;
+/* rb_sStatus is the struct that holds status details. */
+VALUE rb_sStatus = Qnil;
+
 /* Initialize the Google RPC module. */
 VALUE rb_mGoogle = Qnil;
 VALUE rb_mGoogleRPC = Qnil;
 VALUE rb_mGoogleRpcCore = Qnil;
+
 void Init_grpc() {
   grpc_init();
   ruby_vm_at_exit(grpc_rb_shutdown);
   rb_mGoogle = rb_define_module("Google");
   rb_mGoogleRPC = rb_define_module_under(rb_mGoogle, "RPC");
   rb_mGoogleRpcCore = rb_define_module_under(rb_mGoogleRPC, "Core");
+  rb_sNewServerRpc = rb_struct_define("NewServerRpc", "method", "host",
+                                      "deadline", "metadata", NULL);
+  rb_sStatus = rb_struct_define("Status", "code", "details", "metadata", NULL);
 
   Init_google_rpc_byte_buffer();
   Init_google_rpc_event();
diff --git a/src/ruby/ext/grpc/rb_grpc.h b/src/ruby/ext/grpc/rb_grpc.h
index 68f8a06..c2c8942 100644
--- a/src/ruby/ext/grpc/rb_grpc.h
+++ b/src/ruby/ext/grpc/rb_grpc.h
@@ -47,6 +47,12 @@
 /* Class used to wrap timeval structs. */
 extern VALUE rb_cTimeVal;
 
+/* rb_sNewServerRpc is the struct that holds new server rpc details. */
+extern VALUE rb_sNewServerRpc;
+
+/* rb_sStatus is the struct that holds status details. */
+extern VALUE rb_sStatus;
+
 /* GC_NOT_MARKED is used in calls to Data_Wrap_Struct to indicate that the
    wrapped struct does not need to participate in ruby gc. */
 extern const RUBY_DATA_FUNC GC_NOT_MARKED;
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index b16c8f8..288ea08 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -73,6 +73,7 @@
       # wait for the invocation to be accepted
       ev = q.pluck(invoke_accepted, INFINITE_FUTURE)
       raise OutOfTime if ev.nil?
+      ev.close
 
       [finished_tag, client_metadata_read]
     end
@@ -191,11 +192,17 @@
     def writes_done(assert_finished=true)
       @call.writes_done(self)
       ev = @cq.pluck(self, INFINITE_FUTURE)
-      assert_event_type(ev, FINISH_ACCEPTED)
-      logger.debug("Writes done: waiting for finish? #{assert_finished}")
+      begin
+        assert_event_type(ev, FINISH_ACCEPTED)
+        logger.debug("Writes done: waiting for finish? #{assert_finished}")
+      ensure
+        ev.close
+      end
+
       if assert_finished
         ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
         raise "unexpected event: #{ev.inspect}" if ev.nil?
+        ev.close
         return @call.status
       end
     end
@@ -206,22 +213,21 @@
     # event.
     def finished
       ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
-      raise "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
-      if @call.metadata.nil?
-        @call.metadata = ev.result.metadata
-      else
-        @call.metadata.merge!(ev.result.metadata)
-      end
+      begin
+        raise "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
+        if @call.metadata.nil?
+          @call.metadata = ev.result.metadata
+        else
+          @call.metadata.merge!(ev.result.metadata)
+        end
 
-      if ev.result.code != Core::StatusCodes::OK
-        raise BadStatus.new(ev.result.code, ev.result.details)
+        if ev.result.code != Core::StatusCodes::OK
+          raise BadStatus.new(ev.result.code, ev.result.details)
+        end
+        res = ev.result
+      ensure
+        ev.close
       end
-      res = ev.result
-
-      # NOTE(temiola): This is necessary to allow the C call struct wrapped
-      # within the active_call to be GCed; this is necessary so that other
-      # C-level destructors get called in the required order.
-      ev = nil  # allow the event to be GCed
       res
     end
 
@@ -246,8 +252,11 @@
       # call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
       # until the flow control allows another send on this call.
       ev = @cq.pluck(self, INFINITE_FUTURE)
-      assert_event_type(ev, WRITE_ACCEPTED)
-      ev = nil
+      begin
+        assert_event_type(ev, WRITE_ACCEPTED)
+      ensure
+        ev.close
+      end
     end
 
     # send_status sends a status to the remote endpoint
@@ -260,7 +269,11 @@
       assert_queue_is_ready
       @call.start_write_status(code, details, self)
       ev = @cq.pluck(self, INFINITE_FUTURE)
-      assert_event_type(ev, FINISH_ACCEPTED)
+      begin
+        assert_event_type(ev, FINISH_ACCEPTED)
+      ensure
+        ev.close
+      end
       logger.debug("Status sent: #{code}:'#{details}'")
       if assert_finished
         return finished
@@ -283,13 +296,17 @@
 
       @call.start_read(self)
       ev = @cq.pluck(self, INFINITE_FUTURE)
-      assert_event_type(ev, READ)
-      logger.debug("received req: #{ev.result.inspect}")
-      if !ev.result.nil?
-        logger.debug("received req.to_s: #{ev.result.to_s}")
-        res = @unmarshal.call(ev.result.to_s)
-        logger.debug("received_req (unmarshalled): #{res.inspect}")
-        return res
+      begin
+        assert_event_type(ev, READ)
+        logger.debug("received req: #{ev.result.inspect}")
+        if !ev.result.nil?
+          logger.debug("received req.to_s: #{ev.result.to_s}")
+          res = @unmarshal.call(ev.result.to_s)
+          logger.debug("received_req (unmarshalled): #{res.inspect}")
+          return res
+        end
+      ensure
+        ev.close
       end
       logger.debug('found nil; the final response has been sent')
       nil
@@ -515,12 +532,15 @@
     # confirms that no events are enqueued, and that the queue is not
     # shutdown.
     def assert_queue_is_ready
+      ev = nil
       begin
         ev = @cq.pluck(self, ZERO)
         raise "unexpected event #{ev.inspect}" unless ev.nil?
       rescue OutOfTime
         # expected, nothing should be on the queue and the deadline was ZERO,
         # except things using another tag
+      ensure
+        ev.close unless ev.nil?
       end
     end
 
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index fc9bb85..066ec85 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -149,15 +149,27 @@
             payload = @marshal.call(req)
             @call.start_write(Core::ByteBuffer.new(payload), write_tag)
             ev = @cq.pluck(write_tag, INFINITE_FUTURE)
-            assert_event_type(ev, WRITE_ACCEPTED)
+            begin
+              assert_event_type(ev, WRITE_ACCEPTED)
+            ensure
+              ev.close
+            end
           end
           if is_client
             @call.writes_done(write_tag)
             ev = @cq.pluck(write_tag, INFINITE_FUTURE)
-            assert_event_type(ev, FINISH_ACCEPTED)
+            begin
+              assert_event_type(ev, FINISH_ACCEPTED)
+            ensure
+              ev.close
+            end
             logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
             ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
-            assert_event_type(ev, FINISHED)
+            begin
+              assert_event_type(ev, FINISHED)
+            ensure
+              ev.close
+            end
             logger.debug('bidi-client: finished received')
           end
         rescue StandardError => e
@@ -180,19 +192,23 @@
             count += 1
             @call.start_read(read_tag)
             ev = @cq.pluck(read_tag, INFINITE_FUTURE)
-            assert_event_type(ev, READ)
+            begin
+              assert_event_type(ev, READ)
 
-            # handle the next event.
-            if ev.result.nil?
-              @readq.push(END_OF_READS)
-              logger.debug('done reading!')
-              break
+              # handle the next event.
+              if ev.result.nil?
+                @readq.push(END_OF_READS)
+                logger.debug('done reading!')
+                break
+              end
+
+              # push the latest read onto the queue and continue reading
+              logger.debug("received req.to_s: #{ev.result.to_s}")
+              res = @unmarshal.call(ev.result.to_s)
+              @readq.push(res)
+            ensure
+              ev.close
             end
-
-            # push the latest read onto the queue and continue reading
-            logger.debug("received req.to_s: #{ev.result.to_s}")
-            res = @unmarshal.call(ev.result.to_s)
-            @readq.push(res)
           end
 
         rescue StandardError => e
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 2054d73..81db688 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -217,18 +217,13 @@
         next if ev.nil?
         if ev.type != SERVER_RPC_NEW
           logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}")
+          ev.close
           next
         end
         c = new_active_server_call(ev.call, ev.result)
         if !c.nil?
           mth = ev.result.method.to_sym
-
-          # NOTE(temiola): This is necessary to allow the C call struct wrapped
-          # within the active_call created by the event to be GCed; this is
-          # necessary so that other C-level destructors get called in the
-          # required order.
-          ev = nil
-
+          ev.close
           @pool.schedule(c) do |call|
             rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
           end