/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
20
21#include <grpc/support/atm.h>
22#include <grpc/support/log.h>
23
24#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
25#include "src/core/lib/iomgr/error.h"
26#include "src/core/lib/profiling/timers.h"
27
Craig Tillerbaa14a92017-11-03 09:09:36 -070028static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
29 grpc_channel_element* elem,
30 grpc_channel_element_args* args) {
Mark D. Roth09e458c2017-05-02 08:13:26 -070031 return GRPC_ERROR_NONE;
32}
33
Craig Tillerbaa14a92017-11-03 09:09:36 -070034static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
35 grpc_channel_element* elem) {}
Mark D. Roth09e458c2017-05-02 08:13:26 -070036
37typedef struct {
38 // Stats object to update.
Craig Tillerbaa14a92017-11-03 09:09:36 -070039 grpc_grpclb_client_stats* client_stats;
Mark D. Roth09e458c2017-05-02 08:13:26 -070040 // State for intercepting send_initial_metadata.
41 grpc_closure on_complete_for_send;
Craig Tillerbaa14a92017-11-03 09:09:36 -070042 grpc_closure* original_on_complete_for_send;
Mark D. Roth09e458c2017-05-02 08:13:26 -070043 bool send_initial_metadata_succeeded;
44 // State for intercepting recv_initial_metadata.
45 grpc_closure recv_initial_metadata_ready;
Craig Tillerbaa14a92017-11-03 09:09:36 -070046 grpc_closure* original_recv_initial_metadata_ready;
Mark D. Roth09e458c2017-05-02 08:13:26 -070047 bool recv_initial_metadata_succeeded;
48} call_data;
49
Craig Tillerbaa14a92017-11-03 09:09:36 -070050static void on_complete_for_send(grpc_exec_ctx* exec_ctx, void* arg,
51 grpc_error* error) {
52 call_data* calld = (call_data*)arg;
Mark D. Roth09e458c2017-05-02 08:13:26 -070053 if (error == GRPC_ERROR_NONE) {
54 calld->send_initial_metadata_succeeded = true;
55 }
ncteisen274bbbe2017-06-08 14:57:11 -070056 GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete_for_send,
Mark D. Roth09e458c2017-05-02 08:13:26 -070057 GRPC_ERROR_REF(error));
58}
59
Craig Tillerbaa14a92017-11-03 09:09:36 -070060static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
61 grpc_error* error) {
62 call_data* calld = (call_data*)arg;
Mark D. Roth09e458c2017-05-02 08:13:26 -070063 if (error == GRPC_ERROR_NONE) {
64 calld->recv_initial_metadata_succeeded = true;
65 }
ncteisen274bbbe2017-06-08 14:57:11 -070066 GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready,
Mark D. Roth09e458c2017-05-02 08:13:26 -070067 GRPC_ERROR_REF(error));
68}
69
Craig Tillerbaa14a92017-11-03 09:09:36 -070070static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
71 grpc_call_element* elem,
72 const grpc_call_element_args* args) {
73 call_data* calld = (call_data*)elem->call_data;
Mark D. Roth09e458c2017-05-02 08:13:26 -070074 // Get stats object from context and take a ref.
Craig Tiller4782d922017-11-10 09:53:21 -080075 GPR_ASSERT(args->context != nullptr);
76 GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr);
Mark D. Roth09e458c2017-05-02 08:13:26 -070077 calld->client_stats = grpc_grpclb_client_stats_ref(
Craig Tillerbaa14a92017-11-03 09:09:36 -070078 (grpc_grpclb_client_stats*)args->context[GRPC_GRPCLB_CLIENT_STATS].value);
Mark D. Roth09e458c2017-05-02 08:13:26 -070079 // Record call started.
80 grpc_grpclb_client_stats_add_call_started(calld->client_stats);
81 return GRPC_ERROR_NONE;
82}
83
Craig Tillerbaa14a92017-11-03 09:09:36 -070084static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
85 const grpc_call_final_info* final_info,
86 grpc_closure* ignored) {
87 call_data* calld = (call_data*)elem->call_data;
Mark D. Roth09e458c2017-05-02 08:13:26 -070088 // Record call finished, optionally setting client_failed_to_send and
89 // received.
90 grpc_grpclb_client_stats_add_call_finished(
Mark D. Roth09e458c2017-05-02 08:13:26 -070091 !calld->send_initial_metadata_succeeded /* client_failed_to_send */,
92 calld->recv_initial_metadata_succeeded /* known_received */,
93 calld->client_stats);
94 // All done, so unref the stats object.
95 grpc_grpclb_client_stats_unref(calld->client_stats);
96}
97
98static void start_transport_stream_op_batch(
Craig Tillerbaa14a92017-11-03 09:09:36 -070099 grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
100 grpc_transport_stream_op_batch* batch) {
101 call_data* calld = (call_data*)elem->call_data;
Mark D. Roth09e458c2017-05-02 08:13:26 -0700102 GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0);
103 // Intercept send_initial_metadata.
104 if (batch->send_initial_metadata) {
105 calld->original_on_complete_for_send = batch->on_complete;
ncteisen274bbbe2017-06-08 14:57:11 -0700106 GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send, calld,
Mark D. Roth09e458c2017-05-02 08:13:26 -0700107 grpc_schedule_on_exec_ctx);
108 batch->on_complete = &calld->on_complete_for_send;
109 }
110 // Intercept recv_initial_metadata.
111 if (batch->recv_initial_metadata) {
112 calld->original_recv_initial_metadata_ready =
113 batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
ncteisen274bbbe2017-06-08 14:57:11 -0700114 GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
Mark D. Roth09e458c2017-05-02 08:13:26 -0700115 recv_initial_metadata_ready, calld,
116 grpc_schedule_on_exec_ctx);
117 batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
118 &calld->recv_initial_metadata_ready;
119 }
120 // Chain to next filter.
121 grpc_call_next_op(exec_ctx, elem, batch);
122 GPR_TIMER_END("clr_start_transport_stream_op_batch", 0);
123}
124
125const grpc_channel_filter grpc_client_load_reporting_filter = {
126 start_transport_stream_op_batch,
127 grpc_channel_next_op,
128 sizeof(call_data),
129 init_call_elem,
130 grpc_call_stack_ignore_set_pollset_or_pollset_set,
131 destroy_call_elem,
132 0, // sizeof(channel_data)
133 init_channel_elem,
134 destroy_channel_elem,
Mark D. Roth09e458c2017-05-02 08:13:26 -0700135 grpc_channel_next_get_info,
136 "client_load_reporting"};