Merge pull request #6667 from kpayson64/ruby_eventfd_bug

Fixed ruby fd bug
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 1b06273..b436057 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -101,30 +101,14 @@
 static VALUE sym_status;
 static VALUE sym_cancelled;
 
-/* hash_all_calls is a hash of Call address -> reference count that is used to
- * track the creation and destruction of rb_call instances.
- */
-static VALUE hash_all_calls;
-
 /* Destroys a Call. */
 static void grpc_rb_call_destroy(void *p) {
-  grpc_call *call = NULL;
-  VALUE ref_count = Qnil;
+  grpc_call* call = NULL;
   if (p == NULL) {
     return;
-  };
-  call = (grpc_call *)p;
-
-  ref_count = rb_hash_aref(hash_all_calls, OFFT2NUM((VALUE)call));
-  if (ref_count == Qnil) {
-    return; /* No longer in the hash, so already deleted */
-  } else if (NUM2UINT(ref_count) == 1) {
-    rb_hash_delete(hash_all_calls, OFFT2NUM((VALUE)call));
-    grpc_call_destroy(call);
-  } else {
-    rb_hash_aset(hash_all_calls, OFFT2NUM((VALUE)call),
-                 UINT2NUM(NUM2UINT(ref_count) - 1));
   }
+  call = (grpc_call *)p;
+  grpc_call_destroy(call);
 }
 
 static size_t md_ary_datasize(const void *p) {
@@ -151,7 +135,7 @@
      * touches a hash object.
      * TODO(yugui) Directly use st_table and call the free function earlier?
      */
-    0,
+     0,
 #endif
 };
 
@@ -163,12 +147,7 @@
     NULL,
     NULL,
 #ifdef RUBY_TYPED_FREE_IMMEDIATELY
-    /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because
-     * grpc_rb_call_destroy
-     * touches a hash object.
-     * TODO(yugui) Directly use st_table and call the free function earlier?
-     */
-    0,
+    RUBY_TYPED_FREE_IMMEDIATELY
 #endif
 };
 
@@ -190,6 +169,11 @@
 static VALUE grpc_rb_call_cancel(VALUE self) {
   grpc_call *call = NULL;
   grpc_call_error err;
+  if (RTYPEDDATA_DATA(self) == NULL) {
+    //This call has been closed
+    return Qnil;
+  }
+
   TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
   err = grpc_call_cancel(call, NULL);
   if (err != GRPC_CALL_OK) {
@@ -200,11 +184,29 @@
   return Qnil;
 }
 
+/* Releases the c-level resources associated with a call
+   Once a call has been closed, no further requests can be
+   processed.
+*/
+static VALUE grpc_rb_call_close(VALUE self) {
+  grpc_call *call = NULL;
+  TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
+  if(call != NULL) {
+    grpc_call_destroy(call);
+    RTYPEDDATA_DATA(self) = NULL;
+  }
+  return Qnil;
+}
+
 /* Called to obtain the peer that this call is connected to. */
 static VALUE grpc_rb_call_get_peer(VALUE self) {
   VALUE res = Qnil;
   grpc_call *call = NULL;
   char *peer = NULL;
+  if (RTYPEDDATA_DATA(self) == NULL) {
+    rb_raise(grpc_rb_eCallError, "Cannot get peer value on closed call");
+    return Qnil;
+  }
   TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
   peer = grpc_call_get_peer(call);
   res = rb_str_new2(peer);
@@ -218,6 +220,10 @@
   grpc_call *call = NULL;
   VALUE res = Qnil;
   grpc_auth_context *ctx = NULL;
+  if (RTYPEDDATA_DATA(self) == NULL) {
+    rb_raise(grpc_rb_eCallError, "Cannot get peer cert on closed call");
+    return Qnil;
+  }
   TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
 
   ctx = grpc_call_auth_context(call);
@@ -323,6 +329,10 @@
   grpc_call *call = NULL;
   grpc_call_credentials *creds;
   grpc_call_error err;
+  if (RTYPEDDATA_DATA(self) == NULL) {
+    rb_raise(grpc_rb_eCallError, "Cannot set credentials of closed call");
+    return Qnil;
+  }
   TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
   creds = grpc_rb_get_wrapped_call_credentials(credentials);
   err = grpc_call_set_credentials(call, creds);
@@ -731,7 +741,7 @@
    }
    tag = Object.new
    timeout = 10
-   call.start_batch(cqueue, tag, timeout, ops)
+   call.start_batch(cq, tag, timeout, ops)
 
    Start a batch of operations defined in the array ops; when complete, post a
    completion of type 'tag' to the completion queue bound to the call.
@@ -749,6 +759,10 @@
   VALUE result = Qnil;
   VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
   unsigned write_flag = 0;
+  if (RTYPEDDATA_DATA(self) == NULL) {
+    rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call");
+    return Qnil;
+  }
   TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
 
   /* Validate the ops args, adding them to a ruby array */
@@ -888,6 +902,7 @@
   /* Add ruby analogues of the Call methods. */
   rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 4);
   rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0);
+  rb_define_method(grpc_rb_cCall, "close", grpc_rb_call_close, 0);
   rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0);
   rb_define_method(grpc_rb_cCall, "peer_cert", grpc_rb_call_get_peer_cert, 0);
   rb_define_method(grpc_rb_cCall, "status", grpc_rb_call_get_status, 0);
@@ -925,11 +940,6 @@
       "BatchResult", "send_message", "send_metadata", "send_close",
       "send_status", "message", "metadata", "status", "cancelled", NULL);
 
-  /* The hash for reference counting calls, to ensure they can't be destroyed
-   * more than once */
-  hash_all_calls = rb_hash_new();
-  rb_define_const(grpc_rb_cCall, "INTERNAL_ALL_CALLs", hash_all_calls);
-
   Init_grpc_error_codes();
   Init_grpc_op_codes();
   Init_grpc_write_flags();
@@ -944,16 +954,8 @@
 
 /* Obtains the wrapped object for a given call */
 VALUE grpc_rb_wrap_call(grpc_call *c) {
-  VALUE obj = Qnil;
   if (c == NULL) {
     return Qnil;
   }
-  obj = rb_hash_aref(hash_all_calls, OFFT2NUM((VALUE)c));
-  if (obj == Qnil) { /* Not in the hash add it */
-    rb_hash_aset(hash_all_calls, OFFT2NUM((VALUE)c), UINT2NUM(1));
-  } else {
-    rb_hash_aset(hash_all_calls, OFFT2NUM((VALUE)c),
-                 UINT2NUM(NUM2UINT(obj) + 1));
-  }
   return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, c);
 }
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index b6ddbe8..9466402 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -150,6 +150,14 @@
 #endif
 };
 
+/* Releases the c-level resources associated with a completion queue */
+static VALUE grpc_rb_completion_queue_close(VALUE self) {
+  grpc_completion_queue* cq = grpc_rb_get_wrapped_completion_queue(self);
+  grpc_rb_completion_queue_destroy(cq);
+  RTYPEDDATA_DATA(self) = NULL;
+  return Qnil;
+}
+
 /* Allocates a completion queue. */
 static VALUE grpc_rb_completion_queue_alloc(VALUE cls) {
   grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
@@ -212,6 +220,11 @@
      this func, so no separate initialization step is necessary. */
   rb_define_alloc_func(grpc_rb_cCompletionQueue,
                        grpc_rb_completion_queue_alloc);
+
+  /* close: Provides a way to close the underlying file descriptor without
+     waiting for ruby garbage collection. */
+  rb_define_method(grpc_rb_cCompletionQueue, "close",
+                   grpc_rb_completion_queue_close, 0);
 }
 
 /* Gets the wrapped completion queue from the ruby wrapper */
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index 06a07ac..9246893 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -318,7 +318,7 @@
   grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core");
   grpc_rb_sNewServerRpc =
       rb_struct_define("NewServerRpc", "method", "host",
-                       "deadline", "metadata", "call", NULL);
+                       "deadline", "metadata", "call", "cq", NULL);
   grpc_rb_sStatus =
       rb_struct_define("Status", "code", "details", "metadata", NULL);
   sym_code = ID2SYM(rb_intern("code"));
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 0899feb..f108b8a 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -234,7 +234,7 @@
     err = grpc_server_request_call(
         s->wrapped, &call, &st.details, &st.md_ary,
         grpc_rb_get_wrapped_completion_queue(cqueue),
-        grpc_rb_get_wrapped_completion_queue(cqueue),
+        grpc_rb_get_wrapped_completion_queue(s->mark),
         ROBJECT(tag_new));
     if (err != GRPC_CALL_OK) {
       grpc_request_call_stack_cleanup(&st);
@@ -244,7 +244,7 @@
       return Qnil;
     }
 
-    ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout);
+    ev = grpc_rb_completion_queue_pluck_event(s->mark, tag_new, timeout);
     if (ev.type == GRPC_QUEUE_TIMEOUT) {
       grpc_request_call_stack_cleanup(&st);
       return Qnil;
@@ -262,7 +262,7 @@
         rb_str_new2(st.details.host),
         rb_funcall(rb_cTime, id_at, 2, INT2NUM(deadline.tv_sec),
                    INT2NUM(deadline.tv_nsec)),
-        grpc_rb_md_ary_to_h(&st.md_ary), grpc_rb_wrap_call(call), NULL);
+        grpc_rb_md_ary_to_h(&st.md_ary), grpc_rb_wrap_call(call), cqueue, NULL);
     grpc_request_call_stack_cleanup(&st);
     return result;
   }
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index e449e89..b03ddbc 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -103,7 +103,7 @@
     #
     # @param call [Call] the call used by the ActiveCall
     # @param q [CompletionQueue] the completion queue used to accept
-    #          the call
+    #          the call.  This queue will be closed on call completion.
     # @param marshal [Function] f(obj)->string that marshal requests
     # @param unmarshal [Function] f(string)->obj that unmarshals responses
     # @param deadline [Fixnum] the deadline for the call to complete
@@ -191,6 +191,8 @@
       @call.status = batch_result.status
       op_is_done
       batch_result.check_status
+      @call.close
+      @cq.close
     end
 
     # remote_send sends a request to the remote endpoint.
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 1f6d5f3..238f409 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -69,6 +69,10 @@
       @readq = Queue.new
       @unmarshal = unmarshal
       @metadata_tag = metadata_tag
+      @reads_complete = false
+      @writes_complete = false
+      @complete = false
+      @done_mutex = Mutex.new
     end
 
     # Begins orchestration of the Bidi stream for a client sending requests.
@@ -115,6 +119,16 @@
       @op_notifier.notify(self)
     end
 
+    # signals that a bidi operation is complete (read + write)
+    def finished
+      @done_mutex.synchronize do
+        return unless @reads_complete && @writes_complete && !@complete
+        @call.close
+        @cq.close
+        @complete = true
+      end
+    end
+
     # performs a read using @call.run_batch, ensures metadata is set up
     def read_using_run_batch
       ops = { RECV_MESSAGE => nil }
@@ -163,12 +177,16 @@
                         SEND_CLOSE_FROM_CLIENT => nil)
         GRPC.logger.debug('bidi-write-loop: done')
         notify_done
+        @writes_complete = true
+        finished
       end
       GRPC.logger.debug('bidi-write-loop: finished')
     rescue StandardError => e
       GRPC.logger.warn('bidi-write-loop: failed')
       GRPC.logger.warn(e)
       notify_done
+      @writes_complete = true
+      finished
       raise e
     end
 
@@ -212,6 +230,8 @@
           @readq.push(e)  # let each_queued_msg terminate with this error
         end
         GRPC.logger.debug('bidi-read-loop: finished')
+        @reads_complete = true
+        finished
       end
     end
   end
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 6b0b4ce..ab7333d 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -355,7 +355,7 @@
       return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
       GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
       noop = proc { |x| x }
-      c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
+      c = ActiveCall.new(an_rpc.call, an_rpc.cq, noop, noop, an_rpc.deadline)
       c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '')
       nil
     end
@@ -366,7 +366,7 @@
       return an_rpc if rpc_descs.key?(mth)
       GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
       noop = proc { |x| x }
-      c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
+      c = ActiveCall.new(an_rpc.call, an_rpc.cq, noop, noop, an_rpc.deadline)
       c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
       nil
     end
@@ -377,7 +377,8 @@
       loop_tag = Object.new
       while running_state == :running
         begin
-          an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE)
+          comp_queue = Core::CompletionQueue.new
+          an_rpc = @server.request_call(comp_queue, loop_tag, INFINITE_FUTURE)
           break if (!an_rpc.nil?) && an_rpc.call.nil?
           active_call = new_active_server_call(an_rpc)
           unless active_call.nil?
@@ -416,15 +417,16 @@
       unless @connect_md_proc.nil?
         connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
       end
-      an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE,
+      an_rpc.call.run_batch(an_rpc.cq, handle_call_tag, INFINITE_FUTURE,
                             SEND_INITIAL_METADATA => connect_md)
+
       return nil unless available?(an_rpc)
       return nil unless implemented?(an_rpc)
 
       # Create the ActiveCall
       GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
       rpc_desc = rpc_descs[an_rpc.method.to_sym]
-      c = ActiveCall.new(an_rpc.call, @cq,
+      c = ActiveCall.new(an_rpc.call, an_rpc.cq,
                          rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
                          an_rpc.deadline)
       mth = an_rpc.method.to_sym