Merge pull request #3794 from ctiller/mf

Metadata atomic magic
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 1f8ffd8..cfd6f21 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -1528,7 +1528,6 @@
   grpc_metadata_array *dest;
   grpc_metadata *mdusr;
   int is_trailing;
-  grpc_mdctx *mdctx = call->metadata_context;
 
   is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
   for (l = md->list.head; l != NULL; l = l->next) {
@@ -1588,14 +1587,12 @@
     call->read_state = READ_STATE_GOT_INITIAL_METADATA;
   }
 
-  grpc_mdctx_lock(mdctx);
   for (l = md->list.head; l; l = l->next) {
-    if (l->md) GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md);
+    if (l->md) GRPC_MDELEM_UNREF(l->md);
   }
   for (l = md->garbage.head; l; l = l->next) {
-    GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md);
+    GRPC_MDELEM_UNREF(l->md);
   }
-  grpc_mdctx_unlock(mdctx);
 }
 
 grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 6c7f7a9..24a5d95 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -584,7 +584,6 @@
   size_t max_take_size;
   gpr_uint32 curop = 0;
   gpr_uint32 unref_op;
-  grpc_mdctx *mdctx = compressor->mdctx;
   grpc_linked_mdelem *l;
   int need_unref = 0;
   gpr_timespec deadline;
@@ -650,17 +649,15 @@
   finish_frame(&st, 1, eof);
 
   if (need_unref) {
-    grpc_mdctx_lock(mdctx);
     for (unref_op = 0; unref_op < curop; unref_op++) {
       op = &ops[unref_op];
       if (op->type != GRPC_OP_METADATA) continue;
       for (l = op->data.metadata.list.head; l; l = l->next) {
-        if (l->md) GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md);
+        if (l->md) GRPC_MDELEM_UNREF(l->md);
       }
       for (l = op->data.metadata.garbage.head; l; l = l->next) {
-        GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md);
+        GRPC_MDELEM_UNREF(l->md);
       }
     }
-    grpc_mdctx_unlock(mdctx);
   }
 }
diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c
index 3dbb9f0..68f2317 100644
--- a/src/core/transport/metadata.c
+++ b/src/core/transport/metadata.c
@@ -62,6 +62,8 @@
 #define REF_MD_LOCKED(s) ref_md_locked((s))
 #endif
 
+typedef void (*destroy_user_data_func)(void *user_data);
+
 typedef struct internal_string {
   /* must be byte compatible with grpc_mdstr */
   gpr_slice slice;
@@ -88,8 +90,8 @@
 
   /* private only data */
   gpr_mu mu_user_data;
-  void *user_data;
-  void (*destroy_user_data)(void *user_data);
+  gpr_atm destroy_user_data;
+  gpr_atm user_data;
 
   grpc_mdctx *context;
   struct internal_metadata *bucket_next;
@@ -158,8 +160,10 @@
           grpc_mdstr_as_c_string((grpc_mdstr *)md->key),
           grpc_mdstr_as_c_string((grpc_mdstr *)md->value));
 #endif
-  if (0 == gpr_atm_no_barrier_fetch_add(&md->refcnt, 1)) {
+  if (0 == gpr_atm_no_barrier_fetch_add(&md->refcnt, 2)) {
     md->context->mdtab_free--;
+  } else {
+    GPR_ASSERT(1 != gpr_atm_no_barrier_fetch_add(&md->refcnt, -1));
   }
 }
 
@@ -197,12 +201,14 @@
   for (i = 0; i < ctx->mdtab_capacity; i++) {
     cur = ctx->mdtab[i];
     while (cur) {
+      void *user_data = (void *)gpr_atm_no_barrier_load(&cur->user_data);
       GPR_ASSERT(gpr_atm_acq_load(&cur->refcnt) == 0);
       next = cur->bucket_next;
       INTERNAL_STRING_UNREF(cur->key);
       INTERNAL_STRING_UNREF(cur->value);
-      if (cur->user_data) {
-        cur->destroy_user_data(cur->user_data);
+      if (user_data != NULL) {
+        ((destroy_user_data_func)gpr_atm_no_barrier_load(
+            &cur->destroy_user_data))(user_data);
       }
       gpr_mu_destroy(&cur->mu_user_data);
       gpr_free(cur);
@@ -388,12 +394,14 @@
   for (i = 0; i < ctx->mdtab_capacity; i++) {
     prev_next = &ctx->mdtab[i];
     for (md = ctx->mdtab[i]; md; md = next) {
+      void *user_data = (void *)gpr_atm_no_barrier_load(&md->user_data);
       next = md->bucket_next;
       if (gpr_atm_acq_load(&md->refcnt) == 0) {
         INTERNAL_STRING_UNREF(md->key);
         INTERNAL_STRING_UNREF(md->value);
         if (md->user_data) {
-          md->destroy_user_data(md->user_data);
+          ((destroy_user_data_func)gpr_atm_no_barrier_load(
+              &md->destroy_user_data))(user_data);
         }
         gpr_free(md);
         *prev_next = next;
@@ -465,12 +473,12 @@
 
   /* not found: create a new pair */
   md = gpr_malloc(sizeof(internal_metadata));
-  gpr_atm_rel_store(&md->refcnt, 1);
+  gpr_atm_rel_store(&md->refcnt, 2);
   md->context = ctx;
   md->key = key;
   md->value = value;
-  md->user_data = NULL;
-  md->destroy_user_data = NULL;
+  md->user_data = 0;
+  md->destroy_user_data = 0;
   md->bucket_next = ctx->mdtab[hash % ctx->mdtab_capacity];
   gpr_mu_init(&md->mu_user_data);
 #ifdef GRPC_METADATA_REFCOUNT_DEBUG
@@ -527,15 +535,13 @@
      this function - meaning that no adjustment to mdtab_free is necessary,
      simplifying the logic here to be just an atomic increment */
   /* use C assert to have this removed in opt builds */
-  assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1);
+  assert(gpr_atm_no_barrier_load(&md->refcnt) >= 2);
   gpr_atm_no_barrier_fetch_add(&md->refcnt, 1);
   return gmd;
 }
 
 void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) {
   internal_metadata *md = (internal_metadata *)gmd;
-  grpc_mdctx *ctx = md->context;
-  lock(ctx);
 #ifdef GRPC_METADATA_REFCOUNT_DEBUG
   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
           "ELM UNREF:%p:%d->%d: '%s' = '%s'", md,
@@ -544,11 +550,15 @@
           grpc_mdstr_as_c_string((grpc_mdstr *)md->key),
           grpc_mdstr_as_c_string((grpc_mdstr *)md->value));
 #endif
-  assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1);
-  if (1 == gpr_atm_full_fetch_add(&md->refcnt, -1)) {
-    ctx->mdtab_free++;
+  if (2 == gpr_atm_full_fetch_add(&md->refcnt, -1)) {
+    grpc_mdctx *ctx = md->context;
+    lock(ctx);
+    if (1 == gpr_atm_no_barrier_load(&md->refcnt)) {
+      ctx->mdtab_free++;
+      gpr_atm_no_barrier_store(&md->refcnt, 0);
+    }
+    unlock(ctx);
   }
-  unlock(ctx);
 }
 
 const char *grpc_mdstr_as_c_string(grpc_mdstr *s) {
@@ -584,13 +594,14 @@
   return ctx->mdtab_free;
 }
 
-void *grpc_mdelem_get_user_data(grpc_mdelem *md,
-                                void (*if_destroy_func)(void *)) {
+void *grpc_mdelem_get_user_data(grpc_mdelem *md, void (*destroy_func)(void *)) {
   internal_metadata *im = (internal_metadata *)md;
   void *result;
-  gpr_mu_lock(&im->mu_user_data);
-  result = im->destroy_user_data == if_destroy_func ? im->user_data : NULL;
-  gpr_mu_unlock(&im->mu_user_data);
+  if (gpr_atm_acq_load(&im->destroy_user_data) == (gpr_atm)destroy_func) {
+    return (void *)gpr_atm_no_barrier_load(&im->user_data);
+  } else {
+    return NULL;
+  }
   return result;
 }
 
@@ -599,7 +610,7 @@
   internal_metadata *im = (internal_metadata *)md;
   GPR_ASSERT((user_data == NULL) == (destroy_func == NULL));
   gpr_mu_lock(&im->mu_user_data);
-  if (im->destroy_user_data) {
+  if (gpr_atm_no_barrier_load(&im->destroy_user_data)) {
     /* user data can only be set once */
     gpr_mu_unlock(&im->mu_user_data);
     if (destroy_func != NULL) {
@@ -607,8 +618,8 @@
     }
     return;
   }
-  im->destroy_user_data = destroy_func;
-  im->user_data = user_data;
+  gpr_atm_no_barrier_store(&im->user_data, (gpr_atm)user_data);
+  gpr_atm_rel_store(&im->destroy_user_data, (gpr_atm)destroy_func);
   gpr_mu_unlock(&im->mu_user_data);
 }
 
@@ -627,29 +638,6 @@
   return slice;
 }
 
-void grpc_mdctx_lock(grpc_mdctx *ctx) { lock(ctx); }
-
-void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx,
-                                    grpc_mdelem *gmd DEBUG_ARGS) {
-  internal_metadata *md = (internal_metadata *)gmd;
-  grpc_mdctx *elem_ctx = md->context;
-  GPR_ASSERT(ctx == elem_ctx);
-#ifdef GRPC_METADATA_REFCOUNT_DEBUG
-  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-          "ELM UNREF:%p:%d->%d: '%s' = '%s'", md,
-          gpr_atm_no_barrier_load(&md->refcnt),
-          gpr_atm_no_barrier_load(&md->refcnt) - 1,
-          grpc_mdstr_as_c_string((grpc_mdstr *)md->key),
-          grpc_mdstr_as_c_string((grpc_mdstr *)md->value));
-#endif
-  assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1);
-  if (1 == gpr_atm_full_fetch_add(&md->refcnt, -1)) {
-    ctx->mdtab_free++;
-  }
-}
-
-void grpc_mdctx_unlock(grpc_mdctx *ctx) { unlock(ctx); }
-
 static int conforms_to(grpc_mdstr *s, const gpr_uint8 *legal_bits) {
   const gpr_uint8 *p = GPR_SLICE_START_PTR(s->slice);
   const gpr_uint8 *e = GPR_SLICE_END_PTR(s->slice);
diff --git a/src/core/transport/metadata.h b/src/core/transport/metadata.h
index 136a65f..9a81640 100644
--- a/src/core/transport/metadata.h
+++ b/src/core/transport/metadata.h
@@ -155,28 +155,6 @@
 int grpc_mdstr_is_legal_nonbin_header(grpc_mdstr *s);
 int grpc_mdstr_is_bin_suffixed(grpc_mdstr *s);
 
-/* Batch mode metadata functions.
-   These API's have equivalents above, but allow taking the mdctx just once,
-   performing a bunch of work, and then leaving the mdctx. */
-
-/* Lock the metadata context: it's only safe to call _locked_ functions against
-   this context from the calling thread until grpc_mdctx_unlock is called */
-void grpc_mdctx_lock(grpc_mdctx *ctx);
-#ifdef GRPC_METADATA_REFCOUNT_DEBUG
-#define GRPC_MDCTX_LOCKED_MDELEM_UNREF(ctx, elem) \
-  grpc_mdctx_locked_mdelem_unref((ctx), (elem), __FILE__, __LINE__)
-/* Unref a metadata element */
-void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx, grpc_mdelem *elem,
-                                    const char *file, int line);
-#else
-#define GRPC_MDCTX_LOCKED_MDELEM_UNREF(ctx, elem) \
-  grpc_mdctx_locked_mdelem_unref((ctx), (elem))
-/* Unref a metadata element */
-void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx, grpc_mdelem *elem);
-#endif
-/* Unlock the metadata context */
-void grpc_mdctx_unlock(grpc_mdctx *ctx);
-
 #define GRPC_MDSTR_KV_HASH(k_hash, v_hash) (GPR_ROTL((k_hash), 2) ^ (v_hash))
 
 #endif /* GRPC_INTERNAL_CORE_TRANSPORT_METADATA_H */