Merge pull request #9529 from ctiller/racey

Fix race detected by TSAN
diff --git a/doc/environment_variables.md b/doc/environment_variables.md
index 832762a..2c83972 100644
--- a/doc/environment_variables.md
+++ b/doc/environment_variables.md
@@ -35,6 +35,7 @@
   A comma separated list of tracers that provide additional insight into how
   gRPC C core is processing requests via debug logs. Available tracers include:
   - api - traces api calls to the C core
+  - call_error - traces the possible errors contributing to final call status
   - channel - traces operations on the C core channel stack
   - combiner - traces combiner lock state
   - compression - traces compression operations
@@ -55,10 +56,10 @@
   - secure_endpoint - traces bytes flowing through encrypted channels
   - transport_security - traces metadata about secure channel establishment
   - tcp - traces bytes in and out of a channel
-  
+
   'all' can additionally be used to turn all traces on.
   Individual traces can be disabled by prefixing them with '-'.
- 
+
   Example:
   export GRPC_TRACE=all,-pending_tags
 
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index d64c241..70bab4c 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -86,8 +86,11 @@
   /* Status came from 'the wire' - or somewhere below the surface
      layer */
   STATUS_FROM_WIRE,
-  /* Status was created by some internal channel stack operation */
+  /* Status was created by some internal channel stack operation: must come via
+     add_batch_error */
   STATUS_FROM_CORE,
+  /* Status was created by some surface error */
+  STATUS_FROM_SURFACE,
   /* Status came from the server sending status */
   STATUS_FROM_SERVER_STATUS,
   STATUS_SOURCE_COUNT
@@ -212,6 +215,8 @@
   void *saved_receiving_stream_ready_bctlp;
 };
 
+int grpc_call_error_trace = 0;
+
 #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
 #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
 #define CALL_ELEM_FROM_CALL(call, idx) \
@@ -221,11 +226,11 @@
 
 static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
                        grpc_transport_stream_op *op);
-static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
-                                          grpc_status_code status,
-                                          const char *description);
+static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
+                               status_source source, grpc_status_code status,
+                               const char *description);
 static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
-                              grpc_error *error);
+                              status_source source, grpc_error *error);
 static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
                          grpc_error *error);
 static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
@@ -352,7 +357,8 @@
                                               call->start_time, send_deadline,
                                               CALL_STACK_FROM_CALL(call)));
   if (error != GRPC_ERROR_NONE) {
-    cancel_with_error(exec_ctx, call, GRPC_ERROR_REF(error));
+    cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
+                      GRPC_ERROR_REF(error));
   }
   if (args->cq != NULL) {
     GPR_ASSERT(
@@ -527,7 +533,6 @@
                                              grpc_status_code status,
                                              const char *description,
                                              void *reserved) {
-  grpc_call_error r;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   GRPC_API_TRACE(
       "grpc_call_cancel_with_status("
@@ -535,16 +540,16 @@
       4, (c, (int)status, description, reserved));
   GPR_ASSERT(reserved == NULL);
   gpr_mu_lock(&c->mu);
-  r = cancel_with_status(&exec_ctx, c, status, description);
+  cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status,
+                     description);
   gpr_mu_unlock(&c->mu);
   grpc_exec_ctx_finish(&exec_ctx);
-  return r;
+  return GRPC_CALL_OK;
 }
 
 typedef struct termination_closure {
   grpc_closure closure;
   grpc_call *call;
-  grpc_error *error;
   grpc_transport_stream_op op;
 } termination_closure;
 
@@ -559,36 +564,27 @@
                              grpc_error *error) {
   termination_closure *tc = tcp;
   memset(&tc->op, 0, sizeof(tc->op));
-  tc->op.cancel_error = tc->error;
+  tc->op.cancel_error = GRPC_ERROR_REF(error);
   /* reuse closure to catch completion */
-  grpc_closure_init(&tc->closure, done_termination, tc,
-                    grpc_schedule_on_exec_ctx);
-  tc->op.on_complete = &tc->closure;
+  tc->op.on_complete = grpc_closure_init(&tc->closure, done_termination, tc,
+                                         grpc_schedule_on_exec_ctx);
   execute_op(exec_ctx, tc->call, &tc->op);
 }
 
-static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
-                                             termination_closure *tc) {
-  set_status_from_error(exec_ctx, tc->call, STATUS_FROM_API_OVERRIDE,
-                        GRPC_ERROR_REF(tc->error));
-  grpc_closure_init(&tc->closure, send_termination, tc,
-                    grpc_schedule_on_exec_ctx);
-  GRPC_CALL_INTERNAL_REF(tc->call, "termination");
-  grpc_closure_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE);
-  return GRPC_CALL_OK;
-}
-
-static grpc_call_error terminate_with_error(grpc_exec_ctx *exec_ctx,
-                                            grpc_call *c, grpc_error *error) {
+static void terminate_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
+                                 grpc_error *error) {
   termination_closure *tc = gpr_malloc(sizeof(*tc));
   memset(tc, 0, sizeof(*tc));
   tc->call = c;
-  tc->error = error;
-  return terminate_with_status(exec_ctx, tc);
+  GRPC_CALL_INTERNAL_REF(tc->call, "termination");
+  grpc_closure_sched(exec_ctx, grpc_closure_init(&tc->closure, send_termination,
+                                                 tc, grpc_schedule_on_exec_ctx),
+                     error);
 }
 
 static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
-                              grpc_error *error) {
+                              status_source source, grpc_error *error) {
+  set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
   terminate_with_error(exec_ctx, c, error);
 }
 
@@ -600,32 +596,35 @@
       GRPC_ERROR_INT_GRPC_STATUS, status);
 }
 
-static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
-                                          grpc_status_code status,
-                                          const char *description) {
-  return terminate_with_error(exec_ctx, c,
-                              error_from_status(status, description));
+static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
+                               status_source source, grpc_status_code status,
+                               const char *description) {
+  cancel_with_error(exec_ctx, c, source,
+                    error_from_status(status, description));
 }
 
 /*******************************************************************************
  * FINAL STATUS CODE MANIPULATION
  */
 
-static void get_final_status_from(grpc_call *call, status_source from_source,
-                                  void (*set_value)(grpc_status_code code,
-                                                    void *user_data),
-                                  void *set_value_user_data,
-                                  grpc_slice *details) {
+static bool get_final_status_from(
+    grpc_call *call, status_source from_source, bool allow_ok_status,
+    void (*set_value)(grpc_status_code code, void *user_data),
+    void *set_value_user_data, grpc_slice *details) {
   grpc_status_code code;
   const char *msg = NULL;
   grpc_error_get_status(call->status[from_source].error, call->send_deadline,
                         &code, &msg, NULL);
+  if (code == GRPC_STATUS_OK && !allow_ok_status) {
+    return false;
+  }
 
   set_value(code, set_value_user_data);
   if (details != NULL) {
     *details =
         msg == NULL ? grpc_empty_slice() : grpc_slice_from_copied_string(msg);
   }
+  return true;
 }
 
 static void get_final_status(grpc_call *call,
@@ -633,22 +632,37 @@
                                                void *user_data),
                              void *set_value_user_data, grpc_slice *details) {
   int i;
-  /* search for the best status we can present: ideally the error we use has a
-     clearly defined grpc-status, and we'll prefer that. */
-  for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
-    if (call->status[i].is_set &&
-        grpc_error_has_clear_grpc_status(call->status[i].error)) {
-      get_final_status_from(call, (status_source)i, set_value,
-                            set_value_user_data, details);
-      return;
+  if (grpc_call_error_trace) {
+    gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR");
+    for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
+      if (call->status[i].is_set) {
+        gpr_log(GPR_DEBUG, "  %d: %s", i,
+                grpc_error_string(call->status[i].error));
+      }
     }
   }
-  /* If no clearly defined status exists, search for 'anything' */
-  for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
-    if (call->status[i].is_set) {
-      get_final_status_from(call, (status_source)i, set_value,
-                            set_value_user_data, details);
-      return;
+  /* first search through ignoring "OK" statuses: if something went wrong,
+   * ensure we report it */
+  for (int allow_ok_status = 0; allow_ok_status < 2; allow_ok_status++) {
+    /* search for the best status we can present: ideally the error we use has a
+       clearly defined grpc-status, and we'll prefer that. */
+    for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
+      if (call->status[i].is_set &&
+          grpc_error_has_clear_grpc_status(call->status[i].error)) {
+        if (get_final_status_from(call, (status_source)i, allow_ok_status != 0,
+                                  set_value, set_value_user_data, details)) {
+          return;
+        }
+      }
+    }
+    /* If no clearly defined status exists, search for 'anything' */
+    for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
+      if (call->status[i].is_set) {
+        if (get_final_status_from(call, (status_source)i, allow_ok_status != 0,
+                                  set_value, set_value_user_data, details)) {
+          return;
+        }
+      }
     }
   }
   /* If nothing exists, set some default */
@@ -1029,11 +1043,6 @@
 
   gpr_mu_lock(&call->mu);
 
-  if (error != GRPC_ERROR_NONE) {
-    set_status_from_error(exec_ctx, call, STATUS_FROM_CORE,
-                          GRPC_ERROR_REF(error));
-  }
-
   if (bctl->send_initial_metadata) {
     grpc_metadata_batch_destroy(
         exec_ctx,
@@ -1176,7 +1185,8 @@
   grpc_call *call = bctl->call;
   gpr_mu_lock(&bctl->call->mu);
   if (error != GRPC_ERROR_NONE) {
-    cancel_with_error(exec_ctx, call, GRPC_ERROR_REF(error));
+    cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
+                      GRPC_ERROR_REF(error));
   }
   if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
       call->receiving_stream == NULL) {
@@ -1203,7 +1213,8 @@
       gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
                    algo);
       gpr_log(GPR_ERROR, "%s", error_msg);
-      cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
+      cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
+                         GRPC_STATUS_UNIMPLEMENTED, error_msg);
     } else if (grpc_compression_options_is_algorithm_enabled(
                    &compression_options, algo) == 0) {
       /* check if algorithm is supported by current channel config */
@@ -1212,7 +1223,8 @@
       gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
                    algo_name);
       gpr_log(GPR_ERROR, "%s", error_msg);
-      cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
+      cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
+                         GRPC_STATUS_UNIMPLEMENTED, error_msg);
     } else {
       call->incoming_compression_algorithm = algo;
     }
@@ -1242,7 +1254,10 @@
                             grpc_error *error) {
   if (error == GRPC_ERROR_NONE) return;
   int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1);
-  if (idx == 0) cancel_with_error(exec_ctx, bctl->call, GRPC_ERROR_REF(error));
+  if (idx == 0) {
+    cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE,
+                      GRPC_ERROR_REF(error));
+  }
   bctl->errors[idx] = error;
 }
 
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index 8c46a83..b70343d 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -125,6 +125,8 @@
 grpc_compression_algorithm grpc_call_compression_for_level(
     grpc_call *call, grpc_compression_level level);
 
+extern int grpc_call_error_trace;
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index cfa1882..787e4d0 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -199,6 +199,7 @@
     grpc_cq_event_timeout_trace = 1;
     grpc_register_tracer("op_failure", &grpc_trace_operation_failures);
     grpc_register_tracer("resource_quota", &grpc_resource_quota_trace);
+    grpc_register_tracer("call_error", &grpc_call_error_trace);
 #ifndef NDEBUG
     grpc_register_tracer("pending_tags", &grpc_trace_pending_tags);
 #endif