Introduce a new memory reclamation scheme for channel stacks

Allows the bottom member of the stack to schedule the actual deallocation, allowing a final transport lock to be entered when destroying a call.
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c
index c8aaf31..987407f 100644
--- a/src/core/census/grpc_filter.c
+++ b/src/core/census/grpc_filter.c
@@ -134,7 +134,7 @@
 }
 
 static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                                     grpc_call_element *elem) {
+                                     grpc_call_element *elem, void *ignored) {
   call_data *d = elem->call_data;
   GPR_ASSERT(d != NULL);
   /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
@@ -152,7 +152,7 @@
 }
 
 static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                                     grpc_call_element *elem) {
+                                     grpc_call_element *elem, void *ignored) {
   call_data *d = elem->call_data;
   GPR_ASSERT(d != NULL);
   /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 3e61688..1869597 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -213,14 +213,16 @@
                                         grpc_call_element *elem,
                                         grpc_pollset *pollset) {}
 
-void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) {
+void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
+                             void *and_free_memory) {
   grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack);
   size_t count = stack->count;
   size_t i;
 
   /* destroy per-filter data */
   for (i = 0; i < count; i++) {
-    elems[i].filter->destroy_call_elem(exec_ctx, &elems[i]);
+    elems[i].filter->destroy_call_elem(exec_ctx, &elems[i],
+                                       i == count - 1 ? and_free_memory : NULL);
   }
 }
 
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index 52362f0..4407333 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -104,8 +104,12 @@
   void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                       grpc_pollset *pollset);
   /* Destroy per call data.
-     The filter does not need to do any chaining */
-  void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
+     The filter does not need to do any chaining.
+     The bottom filter of a stack will be passed a non-NULL pointer to
+     \a and_free_memory that should be passed to gpr_free when destruction
+     is complete. */
+  void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                            void *and_free_memory);
 
   /* sizeof(per channel data) */
   size_t sizeof_channel_data;
@@ -223,7 +227,8 @@
 #endif
 
 /* Destroy a call stack */
-void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack);
+void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
+                             void *and_free_memory);
 
 /* Ignore set pollset - used by filters to implement the set_pollset method
    if they don't care about pollsets at all. Does nothing. */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index f021a8a..27d6e03 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -379,9 +379,10 @@
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *and_free_memory) {
   grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
+  gpr_free(and_free_memory);
 }
 
 /* Constructor for channel_data */
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 3e7ca08..d5d5fc2 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -246,8 +246,8 @@
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {
   /* grab pointers to our data from the call element */
   call_data *calld = elem->call_data;
   gpr_slice_buffer_destroy(&calld->slices);
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index e7ed3cc..e6e9daa 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -37,13 +37,13 @@
 #include <stdio.h>
 #include <string.h>
 
-#include "src/core/support/string.h"
-#include "src/core/transport/transport.h"
-#include "src/core/profiling/timers.h"
 #include <grpc/byte_buffer.h>
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 #include <grpc/support/slice_buffer.h>
+#include "src/core/profiling/timers.h"
+#include "src/core/support/string.h"
+#include "src/core/transport/transport.h"
 
 #define MAX_BUFFER_LENGTH 8192
 
@@ -102,12 +102,13 @@
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *and_free_memory) {
   call_data *calld = elem->call_data;
   channel_data *chand = elem->channel_data;
   grpc_transport_destroy_stream(exec_ctx, chand->transport,
-                                TRANSPORT_STREAM_FROM_CALL_DATA(calld));
+                                TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+                                and_free_memory);
 }
 
 /* Constructor for channel_data */
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 1aa2720..26f1110 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -151,8 +151,8 @@
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {}
 
 static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) {
   unsigned i;
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 370f8db..6c97a07 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -212,8 +212,8 @@
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {}
 
 /* Constructor for channel_data */
 static void init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 8f150a8..7a61df4 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -620,9 +620,9 @@
                                     bool success) {
   grpc_subchannel_call *c = call;
   GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
-  grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
-  GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, c->connection, "subchannel_call");
-  gpr_free(c);
+  grpc_connected_subchannel *connection = c->connection;
+  grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), c);
+  GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call");
   GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
 }
 
diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c
index 332d425..c72bfc4 100644
--- a/src/core/security/client_auth_filter.c
+++ b/src/core/security/client_auth_filter.c
@@ -277,8 +277,8 @@
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {
   call_data *calld = elem->call_data;
   grpc_call_credentials_unref(calld->creds);
   if (calld->host != NULL) {
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index 3d8e5e8..3d348c8 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -224,8 +224,8 @@
                         grpc_pollset *pollset) {}
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {}
 
 /* Constructor for channel_data */
 static void init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index d9808e6..e7861b8 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -375,7 +375,6 @@
   if (c->receiving_stream != NULL) {
     grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
   }
-  grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c));
   GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
   gpr_mu_destroy(&c->mu);
   for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
@@ -394,7 +393,7 @@
   if (c->cq) {
     GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
   }
-  gpr_free(c);
+  grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), c);
   GPR_TIMER_END("destroy_call", 0);
 }
 
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 58f8994..9ef88bb 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -37,13 +37,13 @@
 
 #include <string.h>
 
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
 #include "src/core/channel/channel_stack.h"
 #include "src/core/support/string.h"
 #include "src/core/surface/api_trace.h"
-#include "src/core/surface/channel.h"
 #include "src/core/surface/call.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
+#include "src/core/surface/channel.h"
 
 typedef struct {
   grpc_linked_mdelem status;
@@ -104,8 +104,10 @@
 static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                            grpc_call_element_args *args) {}
 
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *and_free_memory) {
+  gpr_free(and_free_memory);
+}
 
 static void init_channel_elem(grpc_exec_ctx *exec_ctx,
                               grpc_channel_element *elem,
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index da93474..88554e4 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -692,8 +692,8 @@
   server_ref(chand->server);
 }
 
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {
   channel_data *chand = elem->channel_data;
   call_data *calld = elem->call_data;
 
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 182c713..3a73826 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -512,26 +512,20 @@
   return 0;
 }
 
-static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
-                           grpc_stream *gs) {
-  grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
-  grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
+static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
+                                  grpc_chttp2_transport *t,
+                                  grpc_chttp2_stream *s, void *arg) {
   grpc_byte_stream *bs;
 
-#if 0
-  int i;
-
   GPR_TIMER_BEGIN("destroy_stream", 0);
 
-  gpr_mu_lock(&t->mu);
-
   GPR_ASSERT((s->global.write_closed && s->global.read_closed) ||
              s->global.id == 0);
   GPR_ASSERT(!s->global.in_stream_map);
   if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
-    close_transport_locked(exec_ctx, t);
+    close_transport_locked(exec_ctx, t, NULL, NULL);
   }
-  if (!t->parsing_active && s->global.id) {
+  if (!t->executor.parsing_active && s->global.id) {
     GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
                                            s->global.id) == NULL);
   }
@@ -539,11 +533,11 @@
   grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
                                                                 &s->global);
   grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global);
-#endif
 
-  int i;
+  /* TODO(ctiller): the remainder of this function could be done without the
+                    global lock */
 
-  for (i = 0; i < STREAM_LIST_COUNT; i++) {
+  for (int i = 0; i < STREAM_LIST_COUNT; i++) {
     if (s->included[i]) {
       gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
               t->global.is_client ? "client" : "server", s->global.id, i);
@@ -574,6 +568,17 @@
   UNREF_TRANSPORT(exec_ctx, t, "stream");
 
   GPR_TIMER_END("destroy_stream", 0);
+
+  gpr_free(arg);
+}
+
+static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+                           grpc_stream *gs, void *and_free_memory) {
+  grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+  grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
+
+  grpc_chttp2_run_with_global_lock(exec_ctx, t, s, destroy_stream_locked,
+                                   and_free_memory, 0);
 }
 
 grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
@@ -1509,8 +1514,8 @@
   GPR_TIMER_BEGIN("reading_action.parse", 0);
   size_t i = 0;
   for (; i < t->read_buffer.count &&
-         grpc_chttp2_perform_read(exec_ctx, &t->parsing,
-                                  t->read_buffer.slices[i]);
+             grpc_chttp2_perform_read(exec_ctx, &t->parsing,
+                                      t->read_buffer.slices[i]);
        i++)
     ;
   if (i != t->read_buffer.count) {
@@ -1602,10 +1607,9 @@
     grpc_connectivity_state state, const char *reason) {
   GRPC_CHTTP2_IF_TRACING(
       gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
-  grpc_connectivity_state_set(
-      exec_ctx,
-      &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
-      state, reason);
+  grpc_connectivity_state_set(exec_ctx, &TRANSPORT_FROM_GLOBAL(transport_global)
+                                             ->channel_callback.state_tracker,
+                              state, reason);
 }
 
 /*******************************************************************************
@@ -1915,15 +1919,10 @@
   return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string);
 }
 
-static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
-                                             "chttp2",
-                                             init_stream,
-                                             set_pollset,
-                                             perform_stream_op,
-                                             perform_transport_op,
-                                             destroy_stream,
-                                             destroy_transport,
-                                             chttp2_get_peer};
+static const grpc_transport_vtable vtable = {
+    sizeof(grpc_chttp2_stream), "chttp2", init_stream, set_pollset,
+    perform_stream_op, perform_transport_op, destroy_stream, destroy_transport,
+    chttp2_get_peer};
 
 grpc_transport *grpc_create_chttp2_transport(
     grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args,
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 5b5af0e..b429804 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -133,8 +133,9 @@
 
 void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
                                    grpc_transport *transport,
-                                   grpc_stream *stream) {
-  transport->vtable->destroy_stream(exec_ctx, transport, stream);
+                                   grpc_stream *stream, void *and_free_memory) {
+  transport->vtable->destroy_stream(exec_ctx, transport, stream,
+                                    and_free_memory);
 }
 
 char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 908ef0b..ddb81f9 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -208,7 +208,7 @@
                  caller, but any child memory must be cleaned up) */
 void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
                                    grpc_transport *transport,
-                                   grpc_stream *stream);
+                                   grpc_stream *stream, void *and_free_memory);
 
 void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
                                                   grpc_transport_stream_op *op);
diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h
index d9ecc4d..9d53971 100644
--- a/src/core/transport/transport_impl.h
+++ b/src/core/transport/transport_impl.h
@@ -63,7 +63,7 @@
 
   /* implementation of grpc_transport_destroy_stream */
   void (*destroy_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
-                         grpc_stream *stream);
+                         grpc_stream *stream, void *and_free_memory);
 
   /* implementation of grpc_transport_destroy */
   void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self);