Move client-side deadline handling to client_channel filter.
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
index 9dff1c4..e4f1326 100644
--- a/src/core/lib/channel/deadline_filter.c
+++ b/src/core/lib/channel/deadline_filter.c
@@ -40,26 +40,145 @@
 
 #include "src/core/lib/iomgr/timer.h"
 
+//
+// grpc_deadline_state
+//
+
+// Timer callback.  arg is the grpc_call_element that started the timer.
+static void timer_callback(grpc_exec_ctx *exec_ctx, void *arg,
+                           grpc_error *error) {
+  grpc_call_element* elem = arg;
+  grpc_deadline_state* deadline_state = elem->call_data;
+  // The timer is finished either way (fired or cancelled): clear the flag.
+  gpr_mu_lock(&deadline_state->timer_mu);
+  deadline_state->timer_pending = false;
+  gpr_mu_unlock(&deadline_state->timer_mu);
+  if (error != GRPC_ERROR_CANCELLED) {
+    // Not a cancellation, so the deadline expired: cancel the call.
+// FIXME: change grpc_call_element_send_cancel_with_message to not call close
+//    grpc_call_element_send_cancel(exec_ctx, elem);
+    grpc_transport_stream_op op;
+    memset(&op, 0, sizeof(op));
+    op.cancel_error = grpc_error_set_int(
+        GRPC_ERROR_CREATE("Deadline Exceeded"),
+        GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED);
+    elem->filter->start_transport_stream_op(exec_ctx, elem, &op);
+  }
+  GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
+}
+
+// Starts the deadline timer unless the deadline is infinite.
+static void start_timer_if_needed(grpc_exec_ctx *exec_ctx,
+                                  grpc_call_element* elem,
+                                  gpr_timespec deadline) {
+  grpc_deadline_state* deadline_state = elem->call_data;
+  // Compare on the monotonic clock; an infinite deadline needs no timer.
+  deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
+  if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
+    // Take a reference to the call stack, to be owned by the timer.
+    GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
+    gpr_mu_lock(&deadline_state->timer_mu);
+    // timer_pending and the timer are only touched with timer_mu held.
+    deadline_state->timer_pending = true;
+    grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback,
+                    elem, gpr_now(GPR_CLOCK_MONOTONIC));
+    gpr_mu_unlock(&deadline_state->timer_mu);
+  }
+}
+
+// Cancels the deadline timer if it is still pending; otherwise a no-op.
+static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
+                                   grpc_deadline_state* deadline_state) {
+  // timer_mu guards timer_pending and the timer itself.
+  gpr_mu_lock(&deadline_state->timer_mu);
+  if (deadline_state->timer_pending) {
+    // Only a timer that was started and has not yet run is cancelled.
+    grpc_timer_cancel(exec_ctx, &deadline_state->timer);
+    deadline_state->timer_pending = false;
+  }
+  gpr_mu_unlock(&deadline_state->timer_mu);
+}
+
+// Callback run when the call is complete.  arg is the grpc_deadline_state.
+static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+  grpc_deadline_state* deadline_state = arg;
+  // The call has finished, so the deadline timer is no longer needed.
+  cancel_timer_if_needed(exec_ctx, deadline_state);
+  // Invoke the next callback.
+  deadline_state->next_on_complete->cb(
+      exec_ctx, deadline_state->next_on_complete->cb_arg, error);
+}
+
+// Inject our own on_complete callback into op.
+static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
+                                  grpc_transport_stream_op* op) {
+  // Save the caller's closure so that on_complete() can chain to it.
+  deadline_state->next_on_complete = op->on_complete;
+  grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state);
+  op->on_complete = &deadline_state->on_complete;
+}
+
+void grpc_deadline_state_init(grpc_deadline_state* deadline_state,
+                              grpc_call_stack* call_stack) {
+  // Zero every field (so timer_pending starts false), then wire it up.
+  memset(deadline_state, 0, sizeof(*deadline_state));
+  deadline_state->call_stack = call_stack;
+  gpr_mu_init(&deadline_state->timer_mu);
+}
+
+void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
+                                 grpc_deadline_state* deadline_state) {
+  // Cancel a still-pending timer first; cancelling takes timer_mu.
+  cancel_timer_if_needed(exec_ctx, deadline_state);
+  gpr_mu_destroy(&deadline_state->timer_mu);
+}
+
+void grpc_deadline_state_client_start_transport_stream_op(
+    grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+    grpc_transport_stream_op* op) {
+  grpc_deadline_state* deadline_state = elem->call_data;
+  // If the call is cancelled or closed, cancel the timer.
+  if (op->cancel_error != GRPC_ERROR_NONE ||
+      op->close_error != GRPC_ERROR_NONE) {
+    cancel_timer_if_needed(exec_ctx, deadline_state);
+  } else {
+    // If we're sending initial metadata, get the deadline from the metadata
+    // and start the timer if needed.
+    if (op->send_initial_metadata != NULL) {
+      start_timer_if_needed(exec_ctx, elem,
+                            op->send_initial_metadata->deadline);
+    }
+    // Make sure we know when the call is complete, so that we can cancel
+    // the timer.
+    if (op->recv_trailing_metadata != NULL) {
+      inject_on_complete_cb(deadline_state, op);
+    }
+  }
+}
+
+//
+// filter code
+//
+
 // Used for both client and server filters.
 typedef struct channel_data {
 } channel_data;
 
+// Constructor for channel_data (client and server); asserts !args->is_last.
+static void init_channel_elem(grpc_exec_ctx* exec_ctx,
+                              grpc_channel_element* elem,
+                              grpc_channel_element_args* args) {
+  GPR_ASSERT(!args->is_last);
+}
+
+// Destructor for channel_data (client and server); currently a no-op.
+static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
+                                 grpc_channel_element* elem) {
+}
+
 // Call data used for both client and server filter.
 typedef struct base_call_data {
-  // We take a reference to the call stack for the timer callback.
-  grpc_call_stack* call_stack;
-  // Guards access to timer_pending and timer.
-  gpr_mu timer_mu;
-  // True if the timer callback is currently pending.
-  bool timer_pending;
-  // The deadline timer.
-  grpc_timer timer;
-  // Closure to invoke when the call is complete.
-  // We use this to cancel the timer.
-  grpc_closure on_complete;
-  // The original on_complete closure, which we chain to after our own
-  // closure is invoked.
-  grpc_closure* next_on_complete;
+  grpc_deadline_state deadline_state;  // Must be first; see timer_callback().
 } base_call_data;
 
 // Additional call data used only for the server filter.
@@ -74,27 +193,18 @@
   grpc_closure* next_recv_initial_metadata_ready;
 } server_call_data;
 
-// Constructor for channel_data.  Used for both client and server filters.
-static void init_channel_elem(grpc_exec_ctx* exec_ctx,
-                              grpc_channel_element* elem,
-                              grpc_channel_element_args* args) {
-  GPR_ASSERT(!args->is_last);
-}
-
-// Destructor for channel_data.  Used for both client and server filters.
-static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
-                                 grpc_channel_element* elem) {
-}
-
 // Constructor for call_data.  Used for both client and server filters.
 static grpc_error *init_call_elem(grpc_exec_ctx* exec_ctx,
                                   grpc_call_element* elem,
                                   grpc_call_element_args* args) {
+  // Relies on call_data beginning with base_call_data in both filters.
   base_call_data* calld = elem->call_data;
   // Note: size of call data is different between client and server.
   memset(calld, 0, elem->filter->sizeof_call_data);
-  calld->call_stack = args->call_stack;
-  gpr_mu_init(&calld->timer_mu);
+  grpc_deadline_state_init(&calld->deadline_state, args->call_stack);
+  // Only the client filter uses base_call_data as its entire call_data.
+  calld->deadline_state.is_client =
+      elem->filter->sizeof_call_data == sizeof(base_call_data);
   return GRPC_ERROR_NONE;
 }
 
@@ -103,83 +213,15 @@
                               const grpc_call_final_info* final_info,
                               void* and_free_memory) {
   base_call_data* calld = elem->call_data;
-  gpr_mu_destroy(&calld->timer_mu);
-}
-
-// Timer callback.
-static void timer_callback(grpc_exec_ctx *exec_ctx, void *arg,
-                           grpc_error *error) {
-  grpc_call_element* elem = arg;
-  base_call_data* calld = elem->call_data;
-  gpr_mu_lock(&calld->timer_mu);
-  calld->timer_pending = false;
-  gpr_mu_unlock(&calld->timer_mu);
-  if (error != GRPC_ERROR_CANCELLED) {
-    grpc_call_element_send_cancel(exec_ctx, elem);
-  }
-  GRPC_CALL_STACK_UNREF(exec_ctx, calld->call_stack, "deadline_timer");
-}
-
-// Starts the deadline timer.
-static void start_timer_if_needed(grpc_exec_ctx *exec_ctx,
-                                  grpc_call_element* elem,
-                                  gpr_timespec deadline) {
-  base_call_data* calld = elem->call_data;
-  deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
-  if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
-    // Take a reference to the call stack, to be owned by the timer.
-    GRPC_CALL_STACK_REF(calld->call_stack, "deadline_timer");
-    gpr_mu_lock(&calld->timer_mu);
-    calld->timer_pending = true;
-    grpc_timer_init(exec_ctx, &calld->timer, deadline, timer_callback, elem,
-                    gpr_now(GPR_CLOCK_MONOTONIC));
-    gpr_mu_unlock(&calld->timer_mu);
-  }
-}
-
-// Cancels the deadline timer.
-static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
-                                   base_call_data* calld) {
-  gpr_mu_lock(&calld->timer_mu);
-  if (calld->timer_pending) {
-    grpc_timer_cancel(exec_ctx, &calld->timer);
-    calld->timer_pending = false;
-  }
-  gpr_mu_unlock(&calld->timer_mu);
-}
-
-// Callback run when the call is complete.
-static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
-  base_call_data* calld = arg;
-  cancel_timer_if_needed(exec_ctx, calld);
-  // Invoke the next callback.
-  calld->next_on_complete->cb(exec_ctx, calld->next_on_complete->cb_arg, error);
+  grpc_deadline_state_destroy(exec_ctx, &calld->deadline_state);
 }
 
 // Method for starting a call op for client filter.
 static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
                                              grpc_call_element* elem,
                                              grpc_transport_stream_op* op) {
-  base_call_data* calld = elem->call_data;
-  // If the call is cancelled or closed, cancel the timer.
-  if (op->cancel_error != GRPC_ERROR_NONE ||
-      op->close_error != GRPC_ERROR_NONE) {
-    cancel_timer_if_needed(exec_ctx, calld);
-  } else {
-    // If we're sending initial metadata, get the deadline from the metadata
-    // and start the timer if needed.
-    if (op->send_initial_metadata != NULL) {
-      start_timer_if_needed(exec_ctx, elem,
-                            op->send_initial_metadata->deadline);
-    }
-    // Make sure we know when the call is complete, so that we can cancel
-    // the timer.
-    if (op->recv_trailing_metadata != NULL) {
-      calld->next_on_complete = op->on_complete;
-      grpc_closure_init(&calld->on_complete, on_complete, calld);
-      op->on_complete = &calld->on_complete;
-    }
-  }
+  // Let the shared deadline-state helper start or cancel the timer for op.
+  grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
   // Chain to next filter.
   grpc_call_next_op(exec_ctx, elem, op);
 }
@@ -201,11 +243,11 @@
 static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
                                              grpc_call_element* elem,
                                              grpc_transport_stream_op* op) {
+gpr_log(GPR_INFO, "==> %s(): op=%p {on_complete=%p, cancel_error=%p, close_error=%p, send_initial_metadata=%p, send_trailing_metadata=%p, send_message=%p, recv_initial_metadata_ready=%p, recv_trailing_metadata=%p}", __func__, op, op->on_complete, op->cancel_error, op->close_error, op->send_initial_metadata, op->send_trailing_metadata, op->send_message, op->recv_initial_metadata_ready, op->recv_trailing_metadata);
   server_call_data* calld = elem->call_data;
-  // If the call is cancelled or closed, cancel the timer.
   if (op->cancel_error != GRPC_ERROR_NONE ||
       op->close_error != GRPC_ERROR_NONE) {
-    cancel_timer_if_needed(exec_ctx, &calld->base);
+    cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
   } else {
     // If we're receiving initial metadata, we need to get the deadline
     // from the recv_initial_metadata_ready callback.  So we inject our
@@ -223,9 +265,7 @@
     // the client never sends trailing metadata, because this is the
     // hook that tells us when the call is complete on the server side.
     if (op->recv_trailing_metadata != NULL) {
-      calld->base.next_on_complete = op->on_complete;
-      grpc_closure_init(&calld->base.on_complete, on_complete, calld);
-      op->on_complete = &calld->base.on_complete;
+      inject_on_complete_cb(&calld->base.deadline_state, op);
     }
   }
   // Chain to next filter.