Revert "Implement call combiner"
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index b721ce4..9bf3f0c 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -26,15 +26,7 @@
#include "src/core/lib/security/transport/auth_filters.h"
#include "src/core/lib/slice/slice_internal.h"
-typedef enum {
- STATE_INIT = 0,
- STATE_DONE,
- STATE_CANCELLED,
-} async_state;
-
typedef struct call_data {
- grpc_call_combiner *call_combiner;
- grpc_call_stack *owning_call;
grpc_transport_stream_op_batch *recv_initial_metadata_batch;
grpc_closure *original_recv_initial_metadata_ready;
grpc_closure recv_initial_metadata_ready;
@@ -42,8 +34,6 @@
const grpc_metadata *consumed_md;
size_t num_consumed_md;
grpc_auth_context *auth_context;
- grpc_closure cancel_closure;
- gpr_atm state; // async_state
} call_data;
typedef struct channel_data {
@@ -88,92 +78,54 @@
return GRPC_FILTERED_MDELEM(md);
}
-static void on_md_processing_done_inner(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- const grpc_metadata *consumed_md,
- size_t num_consumed_md,
- const grpc_metadata *response_md,
- size_t num_response_md,
- grpc_error *error) {
- call_data *calld = elem->call_data;
- grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL);
- /* TODO(jboeuf): Implement support for response_md. */
- if (response_md != NULL && num_response_md > 0) {
- gpr_log(GPR_INFO,
- "response_md in auth metadata processing not supported for now. "
- "Ignoring...");
- }
- if (error == GRPC_ERROR_NONE) {
- calld->consumed_md = consumed_md;
- calld->num_consumed_md = num_consumed_md;
- error = grpc_metadata_batch_filter(
- exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata,
- remove_consumed_md, elem, "Response metadata filtering error");
- }
- GRPC_CLOSURE_SCHED(exec_ctx, calld->original_recv_initial_metadata_ready,
- error);
-}
-
-// Called from application code.
+/* called from application code */
static void on_md_processing_done(
void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
const grpc_metadata *response_md, size_t num_response_md,
grpc_status_code status, const char *error_details) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
+ grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- // If the call was not cancelled while we were in flight, process the result.
- if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT,
- (gpr_atm)STATE_DONE)) {
- grpc_error *error = GRPC_ERROR_NONE;
- if (status != GRPC_STATUS_OK) {
- if (error_details == NULL) {
- error_details = "Authentication metadata processing failed.";
- }
- error = grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
- GRPC_ERROR_INT_GRPC_STATUS, status);
- }
- on_md_processing_done_inner(&exec_ctx, elem, consumed_md, num_consumed_md,
- response_md, num_response_md, error);
+ /* TODO(jboeuf): Implement support for response_md. */
+ if (response_md != NULL && num_response_md > 0) {
+ gpr_log(GPR_INFO,
+ "response_md in auth metadata processing not supported for now. "
+ "Ignoring...");
}
- // Clean up.
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (status == GRPC_STATUS_OK) {
+ calld->consumed_md = consumed_md;
+ calld->num_consumed_md = num_consumed_md;
+ error = grpc_metadata_batch_filter(
+ &exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata,
+ remove_consumed_md, elem, "Response metadata filtering error");
+ } else {
+ if (error_details == NULL) {
+ error_details = "Authentication metadata processing failed.";
+ }
+ error =
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
+ GRPC_ERROR_INT_GRPC_STATUS, status);
+ }
for (size_t i = 0; i < calld->md.count; i++) {
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
}
grpc_metadata_array_destroy(&calld->md);
- GRPC_CALL_STACK_UNREF(&exec_ctx, calld->owning_call, "server_auth_metadata");
+ GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready,
+ error);
grpc_exec_ctx_finish(&exec_ctx);
}
-static void cancel_call(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_call_element *elem = (grpc_call_element *)arg;
- call_data *calld = elem->call_data;
- // If the result was not already processed, invoke the callback now.
- if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT,
- (gpr_atm)STATE_CANCELLED)) {
- on_md_processing_done_inner(exec_ctx, elem, NULL, 0, NULL, 0,
- GRPC_ERROR_REF(error));
- }
-}
-
static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- grpc_call_element *elem = (grpc_call_element *)arg;
+ grpc_call_element *elem = arg;
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
if (error == GRPC_ERROR_NONE) {
if (chand->creds != NULL && chand->creds->processor.process != NULL) {
- // We're calling out to the application, so we need to make sure
- // to drop the call combiner early if we get cancelled.
- GRPC_CLOSURE_INIT(&calld->cancel_closure, cancel_call, elem,
- grpc_schedule_on_exec_ctx);
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
- &calld->cancel_closure);
- GRPC_CALL_STACK_REF(calld->owning_call, "server_auth_metadata");
calld->md = metadata_batch_to_md_array(
batch->payload->recv_initial_metadata.recv_initial_metadata);
chand->creds->processor.process(
@@ -207,8 +159,6 @@
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- calld->call_combiner = args->call_combiner;
- calld->owning_call = args->call_stack;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
@@ -268,5 +218,6 @@
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"server-auth"};