blob: 10e59a99de187b4b433d73e3729b84ae60ffda8a [file] [log] [blame]
/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
20
21#include <grpc/support/atm.h>
22#include <grpc/support/log.h>
23
24#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
25#include "src/core/lib/iomgr/error.h"
26#include "src/core/lib/profiling/timers.h"
27
28static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
29 grpc_channel_element *elem,
30 grpc_channel_element_args *args) {
31 return GRPC_ERROR_NONE;
32}
33
34static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
35 grpc_channel_element *elem) {}
36
37typedef struct {
38 // Stats object to update.
39 grpc_grpclb_client_stats *client_stats;
40 // State for intercepting send_initial_metadata.
41 grpc_closure on_complete_for_send;
42 grpc_closure *original_on_complete_for_send;
43 bool send_initial_metadata_succeeded;
44 // State for intercepting recv_initial_metadata.
45 grpc_closure recv_initial_metadata_ready;
46 grpc_closure *original_recv_initial_metadata_ready;
47 bool recv_initial_metadata_succeeded;
48} call_data;
49
50static void on_complete_for_send(grpc_exec_ctx *exec_ctx, void *arg,
51 grpc_error *error) {
52 call_data *calld = arg;
53 if (error == GRPC_ERROR_NONE) {
54 calld->send_initial_metadata_succeeded = true;
55 }
56 grpc_closure_run(exec_ctx, calld->original_on_complete_for_send,
57 GRPC_ERROR_REF(error));
58}
59
60static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg,
61 grpc_error *error) {
62 call_data *calld = arg;
63 if (error == GRPC_ERROR_NONE) {
64 calld->recv_initial_metadata_succeeded = true;
65 }
66 grpc_closure_run(exec_ctx, calld->original_recv_initial_metadata_ready,
67 GRPC_ERROR_REF(error));
68}
69
70static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
71 grpc_call_element *elem,
72 const grpc_call_element_args *args) {
73 call_data *calld = elem->call_data;
74 // Get stats object from context and take a ref.
75 GPR_ASSERT(args->context != NULL);
76 GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != NULL);
77 calld->client_stats = grpc_grpclb_client_stats_ref(
78 args->context[GRPC_GRPCLB_CLIENT_STATS].value);
79 // Record call started.
80 grpc_grpclb_client_stats_add_call_started(calld->client_stats);
81 return GRPC_ERROR_NONE;
82}
83
84static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
85 const grpc_call_final_info *final_info,
86 grpc_closure *ignored) {
87 call_data *calld = elem->call_data;
88 // Record call finished, optionally setting client_failed_to_send and
89 // received.
90 grpc_grpclb_client_stats_add_call_finished(
91 false /* drop_for_rate_limiting */, false /* drop_for_load_balancing */,
92 !calld->send_initial_metadata_succeeded /* client_failed_to_send */,
93 calld->recv_initial_metadata_succeeded /* known_received */,
94 calld->client_stats);
95 // All done, so unref the stats object.
96 grpc_grpclb_client_stats_unref(calld->client_stats);
97}
98
99static void start_transport_stream_op_batch(
100 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
101 grpc_transport_stream_op_batch *batch) {
102 call_data *calld = elem->call_data;
103 GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0);
104 // Intercept send_initial_metadata.
105 if (batch->send_initial_metadata) {
106 calld->original_on_complete_for_send = batch->on_complete;
107 grpc_closure_init(&calld->on_complete_for_send, on_complete_for_send, calld,
108 grpc_schedule_on_exec_ctx);
109 batch->on_complete = &calld->on_complete_for_send;
110 }
111 // Intercept recv_initial_metadata.
112 if (batch->recv_initial_metadata) {
113 calld->original_recv_initial_metadata_ready =
114 batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
115 grpc_closure_init(&calld->recv_initial_metadata_ready,
116 recv_initial_metadata_ready, calld,
117 grpc_schedule_on_exec_ctx);
118 batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
119 &calld->recv_initial_metadata_ready;
120 }
121 // Chain to next filter.
122 grpc_call_next_op(exec_ctx, elem, batch);
123 GPR_TIMER_END("clr_start_transport_stream_op_batch", 0);
124}
125
126const grpc_channel_filter grpc_client_load_reporting_filter = {
127 start_transport_stream_op_batch,
128 grpc_channel_next_op,
129 sizeof(call_data),
130 init_call_elem,
131 grpc_call_stack_ignore_set_pollset_or_pollset_set,
132 destroy_call_elem,
133 0, // sizeof(channel_data)
134 init_channel_elem,
135 destroy_channel_elem,
136 grpc_call_next_get_peer,
137 grpc_channel_next_get_info,
138 "client_load_reporting"};