Lock free deadline filter
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
index 720cfb4..fcc08c5 100644
--- a/src/core/lib/channel/deadline_filter.c
+++ b/src/core/lib/channel/deadline_filter.c
@@ -43,6 +43,8 @@
 #include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/slice/slice_internal.h"
 
+#define TOMBSTONE_TIMER 1
+
 //
 // grpc_deadline_state
 //
@@ -52,9 +54,6 @@
                            grpc_error* error) {
   grpc_call_element* elem = arg;
   grpc_deadline_state* deadline_state = elem->call_data;
-  gpr_mu_lock(&deadline_state->timer_mu);
-  deadline_state->timer_pending = false;
-  gpr_mu_unlock(&deadline_state->timer_mu);
   if (error != GRPC_ERROR_CANCELLED) {
     grpc_call_element_signal_error(
         exec_ctx, elem,
@@ -66,53 +65,54 @@
 }
 
 // Starts the deadline timer.
-static void start_timer_if_needed_locked(grpc_exec_ctx* exec_ctx,
-                                         grpc_call_element* elem,
-                                         gpr_timespec deadline) {
-  grpc_deadline_state* deadline_state = elem->call_data;
-  deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
-  // Note: We do not start the timer if there is already a timer
-  // pending.  This should be okay, because this is only called from two
-  // functions exported by this module: grpc_deadline_state_start(), which
-  // starts the initial timer, and grpc_deadline_state_reset(), which
-  // cancels any pre-existing timer before starting a new one.  In
-  // particular, we want to ensure that if grpc_deadline_state_start()
-  // winds up trying to start the timer after grpc_deadline_state_reset()
-  // has already done so, we ignore the value from the former.
-  if (!deadline_state->timer_pending &&
-      gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
-    // Take a reference to the call stack, to be owned by the timer.
-    GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
-    deadline_state->timer_pending = true;
-    grpc_closure_init(&deadline_state->timer_callback, timer_callback, elem,
-                      grpc_schedule_on_exec_ctx);
-    grpc_timer_init(exec_ctx, &deadline_state->timer, deadline,
-                    &deadline_state->timer_callback,
-                    gpr_now(GPR_CLOCK_MONOTONIC));
-  }
-}
 static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
                                   grpc_call_element* elem,
                                   gpr_timespec deadline) {
+  deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
+  if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
+    return;
+  }
   grpc_deadline_state* deadline_state = elem->call_data;
-  gpr_mu_lock(&deadline_state->timer_mu);
-  start_timer_if_needed_locked(exec_ctx, elem, deadline);
-  gpr_mu_unlock(&deadline_state->timer_mu);
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(deadline_state->timers); i++) {
+    if (gpr_atm_acq_load(&deadline_state->timers[i]) == 0) {
+      grpc_deadline_timer* timer = (i == 0 ? &deadline_state->inlined_timer
+                                           : gpr_malloc(sizeof(*timer)));
+      if (gpr_atm_rel_cas(&deadline_state->timers[i], 0, (gpr_atm)timer)) {
+        grpc_timer_init(
+            exec_ctx, &timer->timer, deadline,
+            grpc_closure_init(&timer->timer_callback, timer_callback, elem,
+                              grpc_schedule_on_exec_ctx),
+            gpr_now(GPR_CLOCK_MONOTONIC));
+      } else if (i != 0) {
+        gpr_free(timer);
+      }
+    }
+  }
+  GPR_UNREACHABLE_CODE(return;);
 }
 
 // Cancels the deadline timer.
-static void cancel_timer_if_needed_locked(grpc_exec_ctx* exec_ctx,
-                                          grpc_deadline_state* deadline_state) {
-  if (deadline_state->timer_pending) {
-    grpc_timer_cancel(exec_ctx, &deadline_state->timer);
-    deadline_state->timer_pending = false;
-  }
-}
 static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
                                    grpc_deadline_state* deadline_state) {
-  gpr_mu_lock(&deadline_state->timer_mu);
-  cancel_timer_if_needed_locked(exec_ctx, deadline_state);
-  gpr_mu_unlock(&deadline_state->timer_mu);
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(deadline_state->timers); i++) {
+    gpr_atm timer_val;
+    timer_val = gpr_atm_acq_load(&deadline_state->timers[i]);
+    switch (timer_val) {
+      case 0:
+        break;
+      case TOMBSTONE_TIMER:
+        break;
+      default:
+        if (!gpr_atm_rel_cas(&deadline_state->timers[i], timer_val,
+                             TOMBSTONE_TIMER)) {
+          break;  // must have become a tombstone
+        }
+        grpc_deadline_timer* timer = (grpc_deadline_timer*)timer_val;
+        grpc_timer_cancel(exec_ctx, &timer->timer);
+        if (i != 0) gpr_free(timer);
+        break;
+    }
+  }
 }
 
 // Callback run when the call is complete.
@@ -138,14 +138,12 @@
   grpc_deadline_state* deadline_state = elem->call_data;
   memset(deadline_state, 0, sizeof(*deadline_state));
   deadline_state->call_stack = call_stack;
-  gpr_mu_init(&deadline_state->timer_mu);
 }
 
 void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
                                  grpc_call_element* elem) {
   grpc_deadline_state* deadline_state = elem->call_data;
   cancel_timer_if_needed(exec_ctx, deadline_state);
-  gpr_mu_destroy(&deadline_state->timer_mu);
 }
 
 // Callback and associated state for starting the timer after call stack
@@ -187,10 +185,8 @@
 void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
                                gpr_timespec new_deadline) {
   grpc_deadline_state* deadline_state = elem->call_data;
-  gpr_mu_lock(&deadline_state->timer_mu);
-  cancel_timer_if_needed_locked(exec_ctx, deadline_state);
-  start_timer_if_needed_locked(exec_ctx, elem, new_deadline);
-  gpr_mu_unlock(&deadline_state->timer_mu);
+  cancel_timer_if_needed(exec_ctx, deadline_state);
+  start_timer_if_needed(exec_ctx, elem, new_deadline);
 }
 
 void grpc_deadline_state_client_start_transport_stream_op(