blob: a171f9066688061dc7e195e91909e7e36627a4b0 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2015-2016 gRPC authors.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080016 *
17 */
Yash Tibrewal37fdb732017-09-25 16:45:02 -070018#include <grpc/support/port_platform.h>
19
Craig Tiller9533d042016-03-25 17:11:06 -070020#include "src/core/lib/surface/completion_queue.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080021
Yash Tibrewalfcd26bc2017-09-25 15:08:28 -070022#include <inttypes.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080023#include <stdio.h>
24#include <string.h>
25
Craig Tiller69b093b2016-02-25 19:04:07 -080026#include <grpc/support/alloc.h>
27#include <grpc/support/atm.h>
28#include <grpc/support/log.h>
Craig Tiller4e41e362016-08-19 13:12:54 -070029#include <grpc/support/string_util.h>
Craig Tiller69b093b2016-02-25 19:04:07 -080030#include <grpc/support/time.h>
Ken Payson42bd87e2017-10-20 10:32:30 -070031#include <grpc/support/tls.h>
Craig Tiller69b093b2016-02-25 19:04:07 -080032
Craig Tillerf1dc9c32017-09-13 14:21:27 -070033#include "src/core/lib/debug/stats.h"
Craig Tiller9533d042016-03-25 17:11:06 -070034#include "src/core/lib/iomgr/pollset.h"
35#include "src/core/lib/iomgr/timer.h"
36#include "src/core/lib/profiling/timers.h"
Sree Kuchibhotlafe5f2352017-04-18 13:46:10 -070037#include "src/core/lib/support/spinlock.h"
Craig Tiller9533d042016-03-25 17:11:06 -070038#include "src/core/lib/support/string.h"
39#include "src/core/lib/surface/api_trace.h"
40#include "src/core/lib/surface/call.h"
41#include "src/core/lib/surface/event_string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080042
/* Tracer flags controlling completion-queue debug logging (all off by
   default; enabled via GRPC_TRACE). */
grpc_tracer_flag grpc_trace_operation_failures =
    GRPC_TRACER_INITIALIZER(false, "op_failure");
#ifndef NDEBUG
grpc_tracer_flag grpc_trace_pending_tags =
    GRPC_TRACER_INITIALIZER(false, "pending_tags");
grpc_tracer_flag grpc_trace_cq_refcount =
    GRPC_TRACER_INITIALIZER(false, "cq_refcount");
#endif

// Specifies a cq thread local cache.
// The first event that occurs on a thread
// with a cq cache will go into that cache, and
// will only be returned on the thread that initialized the cache.
// NOTE: Only one event will ever be cached.
GPR_TLS_DECL(g_cached_event);
GPR_TLS_DECL(g_cached_cq);
59
/* Record of one caller blocked in pluck: the pollset worker to kick when the
   event for `tag` arrives. */
typedef struct {
  grpc_pollset_worker** worker;
  void* tag;
} plucker;
64
/* Polymorphic interface over the pollset implementation backing a cq; one
   instance per grpc_cq_polling_type (see g_poller_vtable_by_poller_type). */
typedef struct {
  /* NOTE(review): presumably whether grpc_cq_pollset() may expose the
     pollset / whether a server may listen through it — confirm at callers. */
  bool can_get_pollset;
  bool can_listen;
  size_t (*size)(void);
  void (*init)(grpc_pollset* pollset, gpr_mu** mu);
  grpc_error* (*kick)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
                      grpc_pollset_worker* specific_worker);
  grpc_error* (*work)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
                      grpc_pollset_worker** worker, grpc_millis deadline);
  void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
                   grpc_closure* closure);
  void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset);
} cq_poller_vtable;
78
/* One thread parked in non_polling_poller_work; nodes form a circular
   doubly-linked list rooted at non_polling_poller::root. */
typedef struct non_polling_worker {
  gpr_cv cv;
  bool kicked;
  struct non_polling_worker* next;
  struct non_polling_worker* prev;
} non_polling_worker;

/* Pollset state for GRPC_CQ_NON_POLLING cqs: no fd polling, just a mutex, a
   ring of parked workers, and the shutdown closure (set once shutdown is
   requested). */
typedef struct {
  gpr_mu mu;
  non_polling_worker* root;
  grpc_closure* shutdown;
} non_polling_poller;
91
92static size_t non_polling_poller_size(void) {
93 return sizeof(non_polling_poller);
94}
95
Craig Tillerbaa14a92017-11-03 09:09:36 -070096static void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
97 non_polling_poller* npp = (non_polling_poller*)pollset;
Craig Tiller2f40ff42017-04-11 16:01:19 -070098 gpr_mu_init(&npp->mu);
99 *mu = &npp->mu;
100}
101
Craig Tillerbaa14a92017-11-03 09:09:36 -0700102static void non_polling_poller_destroy(grpc_exec_ctx* exec_ctx,
103 grpc_pollset* pollset) {
104 non_polling_poller* npp = (non_polling_poller*)pollset;
Craig Tiller2f40ff42017-04-11 16:01:19 -0700105 gpr_mu_destroy(&npp->mu);
106}
107
/* Park the calling thread until it is kicked, the deadline expires, or the
   poller is shut down. Workers wait on per-worker condition variables linked
   into a circular doubly-linked list rooted at npp->root. The caller must
   hold npp->mu — it is passed to gpr_cv_wait below. */
static grpc_error* non_polling_poller_work(grpc_exec_ctx* exec_ctx,
                                           grpc_pollset* pollset,
                                           grpc_pollset_worker** worker,
                                           grpc_millis deadline) {
  non_polling_poller* npp = (non_polling_poller*)pollset;
  if (npp->shutdown) return GRPC_ERROR_NONE;
  /* Stack-allocated worker record: it is unlinked before this call returns,
     so it never outlives the stack frame. */
  non_polling_worker w;
  gpr_cv_init(&w.cv);
  if (worker != nullptr) *worker = (grpc_pollset_worker*)&w;
  /* Link w into the circular list of waiting workers. */
  if (npp->root == nullptr) {
    npp->root = w.next = w.prev = &w;
  } else {
    w.next = npp->root;
    w.prev = w.next->prev;
    w.next->prev = w.prev->next = &w;
  }
  w.kicked = false;
  gpr_timespec deadline_ts =
      grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME);
  /* Wait until kicked, shut down, or timed out (gpr_cv_wait returns non-zero
     on timeout); spurious wakeups just re-check the conditions. */
  while (!npp->shutdown && !w.kicked &&
         !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
    ;
  grpc_exec_ctx_invalidate_now(exec_ctx);
  if (&w == npp->root) {
    npp->root = w.next;
    if (&w == npp->root) {
      /* w was the last parked worker: if shutdown was requested while we
         waited, this worker schedules the shutdown closure. */
      if (npp->shutdown) {
        GRPC_CLOSURE_SCHED(exec_ctx, npp->shutdown, GRPC_ERROR_NONE);
      }
      npp->root = nullptr;
    }
  }
  /* Unlink w from the circular list. */
  w.next->prev = w.prev;
  w.prev->next = w.next;
  gpr_cv_destroy(&w.cv);
  if (worker != nullptr) *worker = nullptr;
  return GRPC_ERROR_NONE;
}
146
Craig Tillerbaa14a92017-11-03 09:09:36 -0700147static grpc_error* non_polling_poller_kick(
148 grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
149 grpc_pollset_worker* specific_worker) {
150 non_polling_poller* p = (non_polling_poller*)pollset;
Craig Tillerbe98d242017-11-10 15:26:57 -0800151 if (specific_worker == nullptr)
152 specific_worker = (grpc_pollset_worker*)p->root;
Craig Tiller4782d922017-11-10 09:53:21 -0800153 if (specific_worker != nullptr) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700154 non_polling_worker* w = (non_polling_worker*)specific_worker;
Craig Tiller2f40ff42017-04-11 16:01:19 -0700155 if (!w->kicked) {
156 w->kicked = true;
157 gpr_cv_signal(&w->cv);
158 }
159 }
160 return GRPC_ERROR_NONE;
161}
162
Craig Tillerbaa14a92017-11-03 09:09:36 -0700163static void non_polling_poller_shutdown(grpc_exec_ctx* exec_ctx,
164 grpc_pollset* pollset,
165 grpc_closure* closure) {
166 non_polling_poller* p = (non_polling_poller*)pollset;
Craig Tiller4782d922017-11-10 09:53:21 -0800167 GPR_ASSERT(closure != nullptr);
Craig Tiller2f40ff42017-04-11 16:01:19 -0700168 p->shutdown = closure;
Craig Tiller4782d922017-11-10 09:53:21 -0800169 if (p->root == nullptr) {
ncteisen274bbbe2017-06-08 14:57:11 -0700170 GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
Craig Tiller2f40ff42017-04-11 16:01:19 -0700171 } else {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700172 non_polling_worker* w = p->root;
Craig Tiller2f40ff42017-04-11 16:01:19 -0700173 do {
174 gpr_cv_signal(&w->cv);
175 w = w->next;
176 } while (w != p->root);
177 }
178}
179
/* Poller vtables indexed by grpc_cq_polling_type: default and non-listening
   use the real pollset; non-polling uses the condvar-based implementation
   above. */
static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
    /* GRPC_CQ_DEFAULT_POLLING */
    {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
     grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
    /* GRPC_CQ_NON_LISTENING */
    {true, false, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
     grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
    /* GRPC_CQ_NON_POLLING */
    {false, false, non_polling_poller_size, non_polling_poller_init,
     non_polling_poller_kick, non_polling_poller_work,
     non_polling_poller_shutdown, non_polling_poller_destroy},
};
192
/* Polymorphic interface over completion-queue behavior; one instance per
   grpc_cq_completion_type (see g_cq_vtable). data_size is the size of the
   per-type data laid out immediately after the grpc_completion_queue. */
typedef struct cq_vtable {
  grpc_cq_completion_type cq_completion_type;
  size_t data_size;
  void (*init)(void* data);
  void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq);
  void (*destroy)(void* data);
  bool (*begin_op)(grpc_completion_queue* cq, void* tag);
  void (*end_op)(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq, void* tag,
                 grpc_error* error,
                 void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg,
                              grpc_cq_completion* storage),
                 void* done_arg, grpc_cq_completion* storage);
  /* Exactly one of next/pluck is non-null per instance (see g_cq_vtable). */
  grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
                     void* reserved);
  grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
                      gpr_timespec deadline, void* reserved);
} cq_vtable;
210
/* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue
 * (a lockfree multiproducer single consumer queue). It uses a queue_lock
 * to support multiple consumers.
 * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */
typedef struct grpc_cq_event_queue {
  /* Spinlock to serialize consumers i.e pop() operations */
  gpr_spinlock queue_lock;

  gpr_mpscq queue;

  /* A lazy counter of number of items in the queue. This is NOT atomically
     incremented/decremented along with push/pop operations and hence is only
     eventually consistent */
  gpr_atm num_queue_items;
} grpc_cq_event_queue;
226
/* Per-cq data for completion queues of type GRPC_CQ_NEXT. */
typedef struct cq_next_data {
  /** Completed events for completion-queues of type GRPC_CQ_NEXT */
  grpc_cq_event_queue queue;

  /** Counter of how many things have ever been queued on this completion queue
      useful for avoiding locks to check the queue */
  gpr_atm things_queued_ever;

  /* Number of outstanding events (+1 if not shut down) */
  gpr_atm pending_events;

  /** 0 initially. 1 once we initiated shutdown */
  bool shutdown_called;
} cq_next_data;
241
/* Per-cq data for completion queues of type GRPC_CQ_PLUCK. */
typedef struct cq_pluck_data {
  /** Completed events: intrusive singly-linked ring through
      grpc_cq_completion::next, with a sentinel head node. */
  grpc_cq_completion completed_head;
  grpc_cq_completion* completed_tail;

  /** Number of pending events (+1 if we're not shutdown) */
  gpr_atm pending_events;

  /** Counter of how many things have ever been queued on this completion queue
      useful for avoiding locks to check the queue */
  gpr_atm things_queued_ever;

  /** 0 initially. 1 once we completed shutting */
  /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
   * (pending_events == 0). So consider removing this in future and use
   * pending_events */
  gpr_atm shutdown;

  /** 0 initially. 1 once we initiated shutdown */
  bool shutdown_called;

  /* Threads currently blocked in pluck(); bounded by
     GRPC_MAX_COMPLETION_QUEUE_PLUCKERS. */
  int num_pluckers;
  plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
} cq_pluck_data;
266
/* Completion queue structure */
struct grpc_completion_queue {
  /** Once owning_refs drops to zero, we will destroy the cq */
  gpr_refcount owning_refs;

  /* Points at the mutex owned by the poller (set by poller_vtable->init). */
  gpr_mu* mu;

  const cq_vtable* vtable;
  const cq_poller_vtable* poller_vtable;

#ifndef NDEBUG
  /* Debug-only tracking of tags with an outstanding begin_op
     (see cq_check_tag). */
  void** outstanding_tags;
  size_t outstanding_tag_count;
  size_t outstanding_tag_capacity;
#endif

  grpc_closure pollset_shutdown_done;
  int num_polls;
};
286
/* Forward declarations of the per-completion-type implementations wired into
   g_cq_vtable below. */
static void cq_finish_shutdown_next(grpc_exec_ctx* exec_ctx,
                                    grpc_completion_queue* cq);
static void cq_finish_shutdown_pluck(grpc_exec_ctx* exec_ctx,
                                     grpc_completion_queue* cq);
static void cq_shutdown_next(grpc_exec_ctx* exec_ctx,
                             grpc_completion_queue* cq);
static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx,
                              grpc_completion_queue* cq);

static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);

static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
                               grpc_completion_queue* cq, void* tag,
                               grpc_error* error,
                               void (*done)(grpc_exec_ctx* exec_ctx,
                                            void* done_arg,
                                            grpc_cq_completion* storage),
                               void* done_arg, grpc_cq_completion* storage);

static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
                                grpc_completion_queue* cq, void* tag,
                                grpc_error* error,
                                void (*done)(grpc_exec_ctx* exec_ctx,
                                             void* done_arg,
                                             grpc_cq_completion* storage),
                                void* done_arg, grpc_cq_completion* storage);

static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
                          void* reserved);

static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
                           gpr_timespec deadline, void* reserved);

static void cq_init_next(void* data);
static void cq_init_pluck(void* data);
static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data);
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700327/* Completion queue vtables based on the completion-type */
/* Completion queue vtables based on the completion-type */
static const cq_vtable g_cq_vtable[] = {
    /* GRPC_CQ_NEXT */
    {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
     cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
     nullptr},
    /* GRPC_CQ_PLUCK */
    {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
     cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
     cq_pluck},
};

/* The per-type data and the pollset are laid out contiguously after the
   grpc_completion_queue itself: [cq][data (vtable->data_size)][pollset]. */
#define DATA_FROM_CQ(cq) ((void*)(cq + 1))
#define POLLSET_FROM_CQ(cq) \
  ((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
Craig Tiller69b093b2016-02-25 19:04:07 -0800342
grpc_tracer_flag grpc_cq_pluck_trace =
    GRPC_TRACER_INITIALIZER(true, "queue_pluck");
grpc_tracer_flag grpc_cq_event_timeout_trace =
    GRPC_TRACER_INITIALIZER(true, "queue_timeout");

/* Log an event being returned to the application. Timeout events are only
   logged when queue_pluck tracing is also enabled. */
#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event)    \
  if (GRPC_TRACER_ON(grpc_api_trace) &&                 \
      (GRPC_TRACER_ON(grpc_cq_pluck_trace) ||           \
       (event)->type != GRPC_QUEUE_TIMEOUT)) {          \
    char* _ev = grpc_event_string(event);               \
    gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \
    gpr_free(_ev);                                      \
  }

static void on_pollset_shutdown_done(grpc_exec_ctx* exec_ctx, void* cq,
                                     grpc_error* error);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800359
Ken Payson42bd87e2017-10-20 10:32:30 -0700360void grpc_cq_global_init() {
361 gpr_tls_init(&g_cached_event);
362 gpr_tls_init(&g_cached_cq);
363}
364
Craig Tillerbaa14a92017-11-03 09:09:36 -0700365void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
366 if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == nullptr) {
Ken Payson42bd87e2017-10-20 10:32:30 -0700367 gpr_tls_set(&g_cached_event, (intptr_t)0);
368 gpr_tls_set(&g_cached_cq, (intptr_t)cq);
369 }
370}
371
/* Flush this thread's cached completion event if it belongs to `cq`:
   stores the event's tag in *tag, its success bit in *ok, runs the
   completion's done callback, and returns 1. Returns 0 when there is no
   cached event or it belongs to a different cq. The thread's cache slots
   are cleared in every case. The unconditional cast to cq_next_data* means
   this path is only meaningful for GRPC_CQ_NEXT queues. */
int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
                                                   void** tag, int* ok) {
  grpc_cq_completion* storage =
      (grpc_cq_completion*)gpr_tls_get(&g_cached_event);
  int ret = 0;
  if (storage != nullptr &&
      (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) {
    *tag = storage->tag;
    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    /* The low bit of storage->next encodes operation success. */
    *ok = (storage->next & (uintptr_t)(1)) == 1;
    storage->done(&exec_ctx, storage->done_arg, storage);
    ret = 1;
    cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
    /* Dropping the last pending event completes cq shutdown; hold a
       temporary ref so the cq survives cq_finish_shutdown_next. */
    if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
      GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
      gpr_mu_lock(cq->mu);
      cq_finish_shutdown_next(&exec_ctx, cq);
      gpr_mu_unlock(cq->mu);
      GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "shutting_down");
    }
    grpc_exec_ctx_finish(&exec_ctx);
  }
  gpr_tls_set(&g_cached_event, (intptr_t)0);
  gpr_tls_set(&g_cached_cq, (intptr_t)0);

  return ret;
}
399
Craig Tillerbaa14a92017-11-03 09:09:36 -0700400static void cq_event_queue_init(grpc_cq_event_queue* q) {
Sree Kuchibhotla078a3402017-04-11 21:23:03 -0700401 gpr_mpscq_init(&q->queue);
Sree Kuchibhotlafe5f2352017-04-18 13:46:10 -0700402 q->queue_lock = GPR_SPINLOCK_INITIALIZER;
Sree Kuchibhotla078a3402017-04-11 21:23:03 -0700403 gpr_atm_no_barrier_store(&q->num_queue_items, 0);
404}
405
Craig Tillerbaa14a92017-11-03 09:09:36 -0700406static void cq_event_queue_destroy(grpc_cq_event_queue* q) {
Sree Kuchibhotla078a3402017-04-11 21:23:03 -0700407 gpr_mpscq_destroy(&q->queue);
Sree Kuchibhotla078a3402017-04-11 21:23:03 -0700408}
409
Craig Tillerbaa14a92017-11-03 09:09:36 -0700410static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) {
411 gpr_mpscq_push(&q->queue, (gpr_mpscq_node*)c);
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700412 return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0;
Sree Kuchibhotla078a3402017-04-11 21:23:03 -0700413}
414
/* Pop one completion. Concurrent consumers are serialized by a trylock on
   q->queue_lock: if another consumer holds it we return nullptr immediately
   instead of spinning. Stats record trylock wins/losses and transient pop
   failures. */
static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) {
  grpc_cq_completion* c = nullptr;
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

  if (gpr_spinlock_trylock(&q->queue_lock)) {
    GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx);

    bool is_empty = false;
    c = (grpc_cq_completion*)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty);
    gpr_spinlock_unlock(&q->queue_lock);

    /* nullptr from a non-empty queue means a producer was mid-push — a
       transient mpsc race, not a real empty. */
    if (c == nullptr && !is_empty) {
      GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx);
    }
  } else {
    GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx);
  }

  grpc_exec_ctx_finish(&exec_ctx);

  if (c) {
    /* Lazy counter: not decremented atomically with the pop itself. */
    gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
  }

  return c;
}
441
442/* Note: The counter is not incremented/decremented atomically with push/pop.
443 * The count is only eventually consistent */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700444static long cq_event_queue_num_items(grpc_cq_event_queue* q) {
Sree Kuchibhotlafe5f2352017-04-18 13:46:10 -0700445 return (long)gpr_atm_no_barrier_load(&q->num_queue_items);
Sree Kuchibhotla078a3402017-04-11 21:23:03 -0700446}
447
/* Allocate and initialize a completion queue of the given completion type
   (NEXT/PLUCK) and polling type. The cq, its type-specific data and its
   pollset live in one contiguous zeroed allocation:
   [grpc_completion_queue][per-type data][pollset]
   (see DATA_FROM_CQ / POLLSET_FROM_CQ). */
grpc_completion_queue* grpc_completion_queue_create_internal(
    grpc_cq_completion_type completion_type,
    grpc_cq_polling_type polling_type) {
  grpc_completion_queue* cq;

  GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0);

  GRPC_API_TRACE(
      "grpc_completion_queue_create_internal(completion_type=%d, "
      "polling_type=%d)",
      2, (completion_type, polling_type));

  const cq_vtable* vtable = &g_cq_vtable[completion_type];
  const cq_poller_vtable* poller_vtable =
      &g_poller_vtable_by_poller_type[polling_type];

  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  GRPC_STATS_INC_CQS_CREATED(&exec_ctx);
  grpc_exec_ctx_finish(&exec_ctx);

  cq = (grpc_completion_queue*)gpr_zalloc(sizeof(grpc_completion_queue) +
                                          vtable->data_size +
                                          poller_vtable->size());

  cq->vtable = vtable;
  cq->poller_vtable = poller_vtable;

  /* One for destroy(), one for pollset_shutdown */
  gpr_ref_init(&cq->owning_refs, 2);

  /* Poller init also publishes the mutex pointer into cq->mu. */
  poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
  vtable->init(DATA_FROM_CQ(cq));

  GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
                    grpc_schedule_on_exec_ctx);

  GPR_TIMER_END("grpc_completion_queue_create_internal", 0);

  return cq;
}
488
Craig Tillerbaa14a92017-11-03 09:09:36 -0700489static void cq_init_next(void* ptr) {
490 cq_next_data* cqd = (cq_next_data*)ptr;
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -0700491 /* Initial count is dropped by grpc_completion_queue_shutdown */
Craig Tillercaf8ea92017-06-01 16:08:58 -0700492 gpr_atm_no_barrier_store(&cqd->pending_events, 1);
493 cqd->shutdown_called = false;
Craig Tillerbe290852017-06-01 12:42:09 -0700494 gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
495 cq_event_queue_init(&cqd->queue);
496}
497
Craig Tillerbaa14a92017-11-03 09:09:36 -0700498static void cq_destroy_next(void* ptr) {
499 cq_next_data* cqd = (cq_next_data*)ptr;
Craig Tillerbe290852017-06-01 12:42:09 -0700500 GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0);
501 cq_event_queue_destroy(&cqd->queue);
502}
503
Craig Tillerbaa14a92017-11-03 09:09:36 -0700504static void cq_init_pluck(void* ptr) {
505 cq_pluck_data* cqd = (cq_pluck_data*)ptr;
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -0700506 /* Initial count is dropped by grpc_completion_queue_shutdown */
507 gpr_atm_no_barrier_store(&cqd->pending_events, 1);
Craig Tillerbe290852017-06-01 12:42:09 -0700508 cqd->completed_tail = &cqd->completed_head;
509 cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
510 gpr_atm_no_barrier_store(&cqd->shutdown, 0);
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -0700511 cqd->shutdown_called = false;
Craig Tillerbe290852017-06-01 12:42:09 -0700512 cqd->num_pluckers = 0;
513 gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
514}
515
Craig Tillerbaa14a92017-11-03 09:09:36 -0700516static void cq_destroy_pluck(void* ptr) {
517 cq_pluck_data* cqd = (cq_pluck_data*)ptr;
Craig Tillerbe290852017-06-01 12:42:09 -0700518 GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
519}
520
Craig Tillerbaa14a92017-11-03 09:09:36 -0700521grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700522 return cq->vtable->cq_completion_type;
Sree Kuchibhotla321881d2017-02-27 11:25:28 -0800523}
524
Craig Tillerbaa14a92017-11-03 09:09:36 -0700525int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
Yuxuan Li87827e02017-05-05 14:12:42 -0700526 int cur_num_polls;
Craig Tillerbe290852017-06-01 12:42:09 -0700527 gpr_mu_lock(cq->mu);
528 cur_num_polls = cq->num_polls;
529 gpr_mu_unlock(cq->mu);
Yuxuan Li87827e02017-05-05 14:12:42 -0700530 return cur_num_polls;
Yuxuan Li999ac152017-05-03 21:36:36 -0700531}
532
/* Take an owning ref on `cq`. NOTE: the #ifndef below deliberately opens a
   function body that is closed after the #endif — the debug (with tracing)
   and release signatures share the final gpr_ref() line. */
#ifndef NDEBUG
void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
                          const char* file, int line) {
  if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
    gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1,
            reason);
  }
#else
void grpc_cq_internal_ref(grpc_completion_queue* cq) {
#endif
  gpr_ref(&cq->owning_refs);
}
547
Craig Tillerbaa14a92017-11-03 09:09:36 -0700548static void on_pollset_shutdown_done(grpc_exec_ctx* exec_ctx, void* arg,
549 grpc_error* error) {
550 grpc_completion_queue* cq = (grpc_completion_queue*)arg;
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700551 GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy");
Craig Tiller5717a982015-04-27 12:01:49 -0700552}
553
/* Drop an owning ref on `cq`; the final unref destroys the per-type data,
   the pollset, and the cq allocation itself. Uses the same unbalanced-brace
   preprocessor trick as grpc_cq_internal_ref: the debug and release
   signatures share one body tail. */
#ifndef NDEBUG
void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq,
                            const char* reason, const char* file, int line) {
  if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
    gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1,
            reason);
  }
#else
void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx,
                            grpc_completion_queue* cq) {
#endif
  if (gpr_unref(&cq->owning_refs)) {
    cq->vtable->destroy(DATA_FROM_CQ(cq));
    cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq));
#ifndef NDEBUG
    gpr_free(cq->outstanding_tags);
#endif
    /* Frees the whole contiguous [cq][data][pollset] allocation. */
    gpr_free(cq);
  }
}
576
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700577#ifndef NDEBUG
Craig Tillerbaa14a92017-11-03 09:09:36 -0700578static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700579 int found = 0;
580 if (lock_cq) {
Craig Tillerbe290852017-06-01 12:42:09 -0700581 gpr_mu_lock(cq->mu);
Craig Tillercfc8ae12016-05-10 20:49:54 -0700582 }
Craig Tillerce1f19e2015-10-09 16:16:04 -0700583
Craig Tillerbe290852017-06-01 12:42:09 -0700584 for (int i = 0; i < (int)cq->outstanding_tag_count; i++) {
585 if (cq->outstanding_tags[i] == tag) {
586 cq->outstanding_tag_count--;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700587 GPR_SWAP(void*, cq->outstanding_tags[i],
Craig Tillerbe290852017-06-01 12:42:09 -0700588 cq->outstanding_tags[cq->outstanding_tag_count]);
Craig Tiller4bf29282015-12-14 11:25:48 -0800589 found = 1;
590 break;
591 }
592 }
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700593
594 if (lock_cq) {
Craig Tillerbe290852017-06-01 12:42:09 -0700595 gpr_mu_unlock(cq->mu);
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700596 }
597
Craig Tiller4bf29282015-12-14 11:25:48 -0800598 GPR_ASSERT(found);
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700599}
600#else
Craig Tillerbaa14a92017-11-03 09:09:36 -0700601static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {}
Craig Tiller4bf29282015-12-14 11:25:48 -0800602#endif
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700603
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -0700604/* Atomically increments a counter only if the counter is not zero. Returns
605 * true if the increment was successful; false if the counter is zero */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700606static bool atm_inc_if_nonzero(gpr_atm* counter) {
yang-g533fbd32017-07-13 11:39:44 -0700607 while (true) {
Sree Kuchibhotlac7cebe82017-09-13 17:51:26 -0700608 gpr_atm count = gpr_atm_acq_load(counter);
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -0700609 /* If zero, we are done. If not, we must to a CAS (instead of an atomic
610 * increment) to maintain the contract: do not increment the counter if it
611 * is zero. */
yang-g533fbd32017-07-13 11:39:44 -0700612 if (count == 0) {
yang-g7d6b9142017-07-13 11:48:56 -0700613 return false;
Sree Kuchibhotlac7cebe82017-09-13 17:51:26 -0700614 } else if (gpr_atm_full_cas(counter, count, count + 1)) {
yang-g533fbd32017-07-13 11:39:44 -0700615 break;
616 }
617 }
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -0700618
yang-g7d6b9142017-07-13 11:48:56 -0700619 return true;
yang-g533fbd32017-07-13 11:39:44 -0700620}
621
Craig Tillerbaa14a92017-11-03 09:09:36 -0700622static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag) {
623 cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -0700624 return atm_inc_if_nonzero(&cqd->pending_events);
625}
626
Craig Tillerbaa14a92017-11-03 09:09:36 -0700627static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag) {
628 cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -0700629 return atm_inc_if_nonzero(&cqd->pending_events);
yang-g533fbd32017-07-13 11:39:44 -0700630}
631
Craig Tillerbaa14a92017-11-03 09:09:36 -0700632bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
yang-g533fbd32017-07-13 11:39:44 -0700633#ifndef NDEBUG
634 gpr_mu_lock(cq->mu);
635 if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
636 cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
Craig Tillerbaa14a92017-11-03 09:09:36 -0700637 cq->outstanding_tags = (void**)gpr_realloc(
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700638 cq->outstanding_tags,
639 sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity);
yang-g533fbd32017-07-13 11:39:44 -0700640 }
641 cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
642 gpr_mu_unlock(cq->mu);
643#endif
644 return cq->vtable->begin_op(cq, tag);
645}
646
Craig Tiller39a73272017-07-05 08:36:04 -0700647/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
648 * completion
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700649 * type of GRPC_CQ_NEXT) */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700650static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
651 grpc_completion_queue* cq, void* tag,
652 grpc_error* error,
653 void (*done)(grpc_exec_ctx* exec_ctx,
654 void* done_arg,
655 grpc_cq_completion* storage),
656 void* done_arg, grpc_cq_completion* storage) {
Sree Kuchibhotla736dd902017-04-25 18:26:53 -0700657 GPR_TIMER_BEGIN("cq_end_op_for_next", 0);
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700658
Craig Tiller84f75d42017-05-03 13:06:35 -0700659 if (GRPC_TRACER_ON(grpc_api_trace) ||
660 (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
661 error != GRPC_ERROR_NONE)) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700662 const char* errmsg = grpc_error_string(error);
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700663 GRPC_API_TRACE(
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700664 "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700665 "done=%p, done_arg=%p, storage=%p)",
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700666 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
Craig Tiller84f75d42017-05-03 13:06:35 -0700667 if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
668 error != GRPC_ERROR_NONE) {
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700669 gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
Craig Tiller5ddbb9d2015-07-29 15:58:11 -0700670 }
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700671 }
Craig Tillerbaa14a92017-11-03 09:09:36 -0700672 cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700673 int is_success = (error == GRPC_ERROR_NONE);
674
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700675 storage->tag = tag;
676 storage->done = done;
677 storage->done_arg = done_arg;
Sree Kuchibhotla453c6112017-04-11 23:25:42 -0700678 storage->next = (uintptr_t)(is_success);
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700679
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700680 cq_check_tag(cq, tag, true); /* Used in debug builds only */
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700681
Craig Tillerbaa14a92017-11-03 09:09:36 -0700682 if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq &&
683 (grpc_cq_completion*)gpr_tls_get(&g_cached_event) == nullptr) {
Ken Payson42bd87e2017-10-20 10:32:30 -0700684 gpr_tls_set(&g_cached_event, (intptr_t)storage);
685 } else {
686 /* Add the completion to the queue */
687 bool is_first = cq_event_queue_push(&cqd->queue, storage);
688 gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
Sree Kuchibhotla49d75e32017-09-15 15:25:50 -0700689
Ken Payson42bd87e2017-10-20 10:32:30 -0700690 /* Since we do not hold the cq lock here, it is important to do an 'acquire'
691 load here (instead of a 'no_barrier' load) to match with the release
692 store
693 (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next
694 */
695 bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1;
Sree Kuchibhotla5461a8b2017-04-10 09:52:40 -0700696
Ken Payson42bd87e2017-10-20 10:32:30 -0700697 if (!will_definitely_shutdown) {
698 /* Only kick if this is the first item queued */
699 if (is_first) {
700 gpr_mu_lock(cq->mu);
Craig Tillerbaa14a92017-11-03 09:09:36 -0700701 grpc_error* kick_error =
Craig Tiller4782d922017-11-10 09:53:21 -0800702 cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), nullptr);
Ken Payson42bd87e2017-10-20 10:32:30 -0700703 gpr_mu_unlock(cq->mu);
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700704
Ken Payson42bd87e2017-10-20 10:32:30 -0700705 if (kick_error != GRPC_ERROR_NONE) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700706 const char* msg = grpc_error_string(kick_error);
Ken Payson42bd87e2017-10-20 10:32:30 -0700707 gpr_log(GPR_ERROR, "Kick failed: %s", msg);
708 GRPC_ERROR_UNREF(kick_error);
709 }
Craig Tiller1e960fc2017-05-26 09:09:39 -0700710 }
Ken Payson42bd87e2017-10-20 10:32:30 -0700711 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
712 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
713 gpr_mu_lock(cq->mu);
714 cq_finish_shutdown_next(exec_ctx, cq);
715 gpr_mu_unlock(cq->mu);
716 GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
717 }
718 } else {
Craig Tillercaf8ea92017-06-01 16:08:58 -0700719 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
Ken Payson42bd87e2017-10-20 10:32:30 -0700720 gpr_atm_rel_store(&cqd->pending_events, 0);
Craig Tillercaf8ea92017-06-01 16:08:58 -0700721 gpr_mu_lock(cq->mu);
722 cq_finish_shutdown_next(exec_ctx, cq);
723 gpr_mu_unlock(cq->mu);
724 GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
725 }
Craig Tillera82950e2015-09-22 12:33:20 -0700726 }
Craig Tillerce1f19e2015-10-09 16:16:04 -0700727
Sree Kuchibhotla736dd902017-04-25 18:26:53 -0700728 GPR_TIMER_END("cq_end_op_for_next", 0);
Craig Tillercae4b1b2016-05-10 09:11:09 -0700729
730 GRPC_ERROR_UNREF(error);
Craig Tillercce17ac2015-01-20 09:29:28 -0800731}
732
Craig Tiller39a73272017-07-05 08:36:04 -0700733/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
734 * completion
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700735 * type of GRPC_CQ_PLUCK) */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700736static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
737 grpc_completion_queue* cq, void* tag,
738 grpc_error* error,
739 void (*done)(grpc_exec_ctx* exec_ctx,
740 void* done_arg,
741 grpc_cq_completion* storage),
742 void* done_arg, grpc_cq_completion* storage) {
743 cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700744 int is_success = (error == GRPC_ERROR_NONE);
745
Sree Kuchibhotla736dd902017-04-25 18:26:53 -0700746 GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0);
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700747
Sree Kuchibhotla8ac5c6d2017-05-11 14:13:11 -0700748 if (GRPC_TRACER_ON(grpc_api_trace) ||
749 (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
750 error != GRPC_ERROR_NONE)) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700751 const char* errmsg = grpc_error_string(error);
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700752 GRPC_API_TRACE(
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700753 "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700754 "done=%p, done_arg=%p, storage=%p)",
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700755 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
Sree Kuchibhotla8ac5c6d2017-05-11 14:13:11 -0700756 if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
757 error != GRPC_ERROR_NONE) {
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700758 gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
759 }
760 }
761
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700762 storage->tag = tag;
763 storage->done = done;
764 storage->done_arg = done_arg;
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700765 storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success));
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700766
Craig Tillerbe290852017-06-01 12:42:09 -0700767 gpr_mu_lock(cq->mu);
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700768 cq_check_tag(cq, tag, false); /* Used in debug builds only */
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700769
770 /* Add to the list of completions */
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700771 gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
772 cqd->completed_tail->next =
773 ((uintptr_t)storage) | (1u & (uintptr_t)cqd->completed_tail->next);
774 cqd->completed_tail = storage;
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700775
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -0700776 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
777 cq_finish_shutdown_pluck(exec_ctx, cq);
778 gpr_mu_unlock(cq->mu);
779 } else {
Craig Tiller4782d922017-11-10 09:53:21 -0800780 grpc_pollset_worker* pluck_worker = nullptr;
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700781 for (int i = 0; i < cqd->num_pluckers; i++) {
782 if (cqd->pluckers[i].tag == tag) {
783 pluck_worker = *cqd->pluckers[i].worker;
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700784 break;
785 }
786 }
787
Craig Tillerbaa14a92017-11-03 09:09:36 -0700788 grpc_error* kick_error =
Craig Tiller0ff222a2017-09-01 09:41:43 -0700789 cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), pluck_worker);
Sree Kuchibhotlaa72d79b2017-04-24 19:45:20 -0700790
Craig Tillerbe290852017-06-01 12:42:09 -0700791 gpr_mu_unlock(cq->mu);
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700792
793 if (kick_error != GRPC_ERROR_NONE) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700794 const char* msg = grpc_error_string(kick_error);
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700795 gpr_log(GPR_ERROR, "Kick failed: %s", msg);
796
797 GRPC_ERROR_UNREF(kick_error);
798 }
Sree Kuchibhotla5461a8b2017-04-10 09:52:40 -0700799 }
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700800
Sree Kuchibhotla736dd902017-04-25 18:26:53 -0700801 GPR_TIMER_END("cq_end_op_for_pluck", 0);
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700802
803 GRPC_ERROR_UNREF(error);
Sree Kuchibhotla5461a8b2017-04-10 09:52:40 -0700804}
805
Craig Tillerbaa14a92017-11-03 09:09:36 -0700806void grpc_cq_end_op(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq,
807 void* tag, grpc_error* error,
808 void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg,
809 grpc_cq_completion* storage),
810 void* done_arg, grpc_cq_completion* storage) {
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700811 cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage);
ctiller58393c22015-01-07 14:03:30 -0800812}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800813
Craig Tiller2f42fde2016-08-04 15:13:18 -0700814typedef struct {
Craig Tilleraef3a792016-08-24 15:13:53 -0700815 gpr_atm last_seen_things_queued_ever;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700816 grpc_completion_queue* cq;
Craig Tillerdc3998e2017-05-12 09:55:30 -0700817 grpc_millis deadline;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700818 grpc_cq_completion* stolen_completion;
819 void* tag; /* for pluck */
Craig Tiller2c4043b2016-09-05 14:50:16 -0700820 bool first_loop;
Craig Tiller2f42fde2016-08-04 15:13:18 -0700821} cq_is_finished_arg;
822
Craig Tillerbaa14a92017-11-03 09:09:36 -0700823static bool cq_is_next_finished(grpc_exec_ctx* exec_ctx, void* arg) {
824 cq_is_finished_arg* a = (cq_is_finished_arg*)arg;
825 grpc_completion_queue* cq = a->cq;
826 cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
Craig Tiller4782d922017-11-10 09:53:21 -0800827 GPR_ASSERT(a->stolen_completion == nullptr);
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700828
Craig Tilleraef3a792016-08-24 15:13:53 -0700829 gpr_atm current_last_seen_things_queued_ever =
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700830 gpr_atm_no_barrier_load(&cqd->things_queued_ever);
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700831
Craig Tilleraef3a792016-08-24 15:13:53 -0700832 if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
Craig Tilleraef3a792016-08-24 15:13:53 -0700833 a->last_seen_things_queued_ever =
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700834 gpr_atm_no_barrier_load(&cqd->things_queued_ever);
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700835
836 /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
Craig Tiller39a73272017-07-05 08:36:04 -0700837 * might return NULL in some cases even if the queue is not empty; but
838 * that
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700839 * is ok and doesn't affect correctness. Might effect the tail latencies a
840 * bit) */
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700841 a->stolen_completion = cq_event_queue_pop(&cqd->queue);
Craig Tiller4782d922017-11-10 09:53:21 -0800842 if (a->stolen_completion != nullptr) {
Craig Tilleraef3a792016-08-24 15:13:53 -0700843 return true;
Craig Tiller2f42fde2016-08-04 15:13:18 -0700844 }
Craig Tiller2f42fde2016-08-04 15:13:18 -0700845 }
Craig Tillerdc3998e2017-05-12 09:55:30 -0700846 return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
Craig Tiller2f42fde2016-08-04 15:13:18 -0700847}
848
Craig Tiller4e41e362016-08-19 13:12:54 -0700849#ifndef NDEBUG
Craig Tillerbaa14a92017-11-03 09:09:36 -0700850static void dump_pending_tags(grpc_completion_queue* cq) {
Craig Tiller84f75d42017-05-03 13:06:35 -0700851 if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return;
Craig Tiller4e41e362016-08-19 13:12:54 -0700852
853 gpr_strvec v;
854 gpr_strvec_init(&v);
855 gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
Craig Tillerbe290852017-06-01 12:42:09 -0700856 gpr_mu_lock(cq->mu);
857 for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700858 char* s;
Craig Tillerbe290852017-06-01 12:42:09 -0700859 gpr_asprintf(&s, " %p", cq->outstanding_tags[i]);
Craig Tiller4e41e362016-08-19 13:12:54 -0700860 gpr_strvec_add(&v, s);
861 }
Craig Tillerbe290852017-06-01 12:42:09 -0700862 gpr_mu_unlock(cq->mu);
Craig Tiller4782d922017-11-10 09:53:21 -0800863 char* out = gpr_strvec_flatten(&v, nullptr);
Craig Tiller4e41e362016-08-19 13:12:54 -0700864 gpr_strvec_destroy(&v);
865 gpr_log(GPR_DEBUG, "%s", out);
866 gpr_free(out);
867}
Craig Tiller49c644c2016-08-19 13:52:23 -0700868#else
Craig Tillerbaa14a92017-11-03 09:09:36 -0700869static void dump_pending_tags(grpc_completion_queue* cq) {}
Craig Tiller4e41e362016-08-19 13:12:54 -0700870#endif
871
Craig Tillerbaa14a92017-11-03 09:09:36 -0700872static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
873 void* reserved) {
Craig Tiller64be9f72015-05-04 14:53:51 -0700874 grpc_event ret;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700875 cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
Sree Kuchibhotla321881d2017-02-27 11:25:28 -0800876
Craig Tiller0ba432d2015-10-09 16:57:11 -0700877 GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
Craig Tiller86253ca2015-10-08 13:31:02 -0700878
Masood Malekghassemi76c3d742015-08-19 18:22:53 -0700879 GRPC_API_TRACE(
880 "grpc_completion_queue_next("
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700881 "cq=%p, "
Ken Payson62a6c922016-06-24 11:53:54 -0700882 "deadline=gpr_timespec { tv_sec: %" PRId64
883 ", tv_nsec: %d, clock_type: %d }, "
Craig Tiller4de3e4f2015-10-05 08:55:50 -0700884 "reserved=%p)",
Craig Tillerbaa14a92017-11-03 09:09:36 -0700885 5,
886 (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
887 reserved));
Craig Tillera82950e2015-09-22 12:33:20 -0700888 GPR_ASSERT(!reserved);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800889
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700890 dump_pending_tags(cq);
Craig Tiller4e41e362016-08-19 13:12:54 -0700891
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700892 GRPC_CQ_INTERNAL_REF(cq, "next");
Sree Kuchibhotlaa72d79b2017-04-24 19:45:20 -0700893
Craig Tiller9a8c3f32017-07-21 13:14:14 -0700894 grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
Craig Tilleraef3a792016-08-24 15:13:53 -0700895 cq_is_finished_arg is_finished_arg = {
Yash Tibrewald8b84a22017-09-25 13:38:03 -0700896 gpr_atm_no_barrier_load(&cqd->things_queued_ever),
897 cq,
Craig Tiller52910232017-10-05 10:28:15 -0700898 deadline_millis,
Craig Tiller4782d922017-11-10 09:53:21 -0800899 nullptr,
900 nullptr,
Yash Tibrewald8b84a22017-09-25 13:38:03 -0700901 true};
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800902 grpc_exec_ctx exec_ctx =
903 GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg);
Craig Tillera82950e2015-09-22 12:33:20 -0700904 for (;;) {
Craig Tillerfa5c6212017-07-17 14:10:44 -0700905 grpc_millis iteration_deadline = deadline_millis;
Sree Kuchibhotla55d0b492017-04-12 17:33:50 -0700906
Craig Tiller4782d922017-11-10 09:53:21 -0800907 if (is_finished_arg.stolen_completion != nullptr) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700908 grpc_cq_completion* c = is_finished_arg.stolen_completion;
Craig Tiller4782d922017-11-10 09:53:21 -0800909 is_finished_arg.stolen_completion = nullptr;
Craig Tiller2f42fde2016-08-04 15:13:18 -0700910 ret.type = GRPC_OP_COMPLETE;
911 ret.success = c->next & 1u;
912 ret.tag = c->tag;
913 c->done(&exec_ctx, c->done_arg, c);
914 break;
915 }
Sree Kuchibhotla94aff9e2017-04-10 10:25:03 -0700916
Craig Tillerbaa14a92017-11-03 09:09:36 -0700917 grpc_cq_completion* c = cq_event_queue_pop(&cqd->queue);
Sree Kuchibhotla94aff9e2017-04-10 10:25:03 -0700918
Craig Tiller4782d922017-11-10 09:53:21 -0800919 if (c != nullptr) {
Craig Tillera82950e2015-09-22 12:33:20 -0700920 ret.type = GRPC_OP_COMPLETE;
921 ret.success = c->next & 1u;
922 ret.tag = c->tag;
923 c->done(&exec_ctx, c->done_arg, c);
924 break;
Sree Kuchibhotla55d0b492017-04-12 17:33:50 -0700925 } else {
926 /* If c == NULL it means either the queue is empty OR in an transient
927 inconsistent state. If it is the latter, we shold do a 0-timeout poll
928 so that the thread comes back quickly from poll to make a second
Craig Tiller39a73272017-07-05 08:36:04 -0700929 attempt at popping. Not doing this can potentially deadlock this
Sree Kuchibhotlaa0bfba22017-07-27 15:24:04 -0700930 thread forever (if the deadline is infinity) */
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700931 if (cq_event_queue_num_items(&cqd->queue) > 0) {
Craig Tillerfa5c6212017-07-17 14:10:44 -0700932 iteration_deadline = 0;
Sree Kuchibhotla55d0b492017-04-12 17:33:50 -0700933 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800934 }
Sree Kuchibhotla94aff9e2017-04-10 10:25:03 -0700935
Sree Kuchibhotlac7cebe82017-09-13 17:51:26 -0700936 if (gpr_atm_acq_load(&cqd->pending_events) == 0) {
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700937 /* Before returning, check if the queue has any items left over (since
938 gpr_mpscq_pop() can sometimes return NULL even if the queue is not
939 empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700940 if (cq_event_queue_num_items(&cqd->queue) > 0) {
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700941 /* Go to the beginning of the loop. No point doing a poll because
Craig Tiller39a73272017-07-05 08:36:04 -0700942 (cq->shutdown == true) is only possible when there is no pending
Sree Kuchibhotlaa0bfba22017-07-27 15:24:04 -0700943 work (i.e cq->pending_events == 0) and any outstanding completion
944 events should have already been queued on this cq */
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700945 continue;
946 }
947
Craig Tillera82950e2015-09-22 12:33:20 -0700948 memset(&ret, 0, sizeof(ret));
949 ret.type = GRPC_QUEUE_SHUTDOWN;
950 break;
951 }
Sree Kuchibhotla94aff9e2017-04-10 10:25:03 -0700952
Craig Tillerdc3998e2017-05-12 09:55:30 -0700953 if (!is_finished_arg.first_loop &&
954 grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
Craig Tillera82950e2015-09-22 12:33:20 -0700955 memset(&ret, 0, sizeof(ret));
956 ret.type = GRPC_QUEUE_TIMEOUT;
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700957 dump_pending_tags(cq);
Craig Tillera82950e2015-09-22 12:33:20 -0700958 break;
959 }
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800960
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700961 /* The main polling work happens in grpc_pollset_work */
Craig Tillerbe290852017-06-01 12:42:09 -0700962 gpr_mu_lock(cq->mu);
963 cq->num_polls++;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700964 grpc_error* err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
Craig Tiller4782d922017-11-10 09:53:21 -0800965 nullptr, iteration_deadline);
Craig Tillerbe290852017-06-01 12:42:09 -0700966 gpr_mu_unlock(cq->mu);
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700967
Sree Kuchibhotla94aff9e2017-04-10 10:25:03 -0700968 if (err != GRPC_ERROR_NONE) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700969 const char* msg = grpc_error_string(err);
Sree Kuchibhotla94aff9e2017-04-10 10:25:03 -0700970 gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
971
972 GRPC_ERROR_UNREF(err);
973 memset(&ret, 0, sizeof(ret));
974 ret.type = GRPC_QUEUE_TIMEOUT;
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700975 dump_pending_tags(cq);
Sree Kuchibhotla94aff9e2017-04-10 10:25:03 -0700976 break;
Craig Tillerccdea192016-02-16 08:06:46 -0800977 }
Craig Tiller2c4043b2016-09-05 14:50:16 -0700978 is_finished_arg.first_loop = false;
Craig Tillera82950e2015-09-22 12:33:20 -0700979 }
Sree Kuchibhotla94aff9e2017-04-10 10:25:03 -0700980
Craig Tillerf809ca32017-05-26 09:45:16 -0700981 if (cq_event_queue_num_items(&cqd->queue) > 0 &&
Sree Kuchibhotlac7cebe82017-09-13 17:51:26 -0700982 gpr_atm_acq_load(&cqd->pending_events) > 0) {
Craig Tillerbe290852017-06-01 12:42:09 -0700983 gpr_mu_lock(cq->mu);
Craig Tiller4782d922017-11-10 09:53:21 -0800984 cq->poller_vtable->kick(&exec_ctx, POLLSET_FROM_CQ(cq), nullptr);
Craig Tillerbe290852017-06-01 12:42:09 -0700985 gpr_mu_unlock(cq->mu);
Craig Tiller6f0bd2c2017-05-26 08:15:35 -0700986 }
987
Sree Kuchibhotlaa0bfba22017-07-27 15:24:04 -0700988 GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
989 GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
990 grpc_exec_ctx_finish(&exec_ctx);
Craig Tiller4782d922017-11-10 09:53:21 -0800991 GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
Sree Kuchibhotlaa0bfba22017-07-27 15:24:04 -0700992
Craig Tiller0ba432d2015-10-09 16:57:11 -0700993 GPR_TIMER_END("grpc_completion_queue_next", 0);
Craig Tiller86253ca2015-10-08 13:31:02 -0700994
Craig Tiller64be9f72015-05-04 14:53:51 -0700995 return ret;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800996}
997
Craig Tillercaf8ea92017-06-01 16:08:58 -0700998/* Finishes the completion queue shutdown. This means that there are no more
999 completion events / tags expected from the completion queue
1000 - Must be called under completion queue lock
1001 - Must be called only once in completion queue's lifetime
1002 - grpc_completion_queue_shutdown() MUST have been called before calling
1003 this function */
Craig Tillerbaa14a92017-11-03 09:09:36 -07001004static void cq_finish_shutdown_next(grpc_exec_ctx* exec_ctx,
1005 grpc_completion_queue* cq) {
1006 cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
Craig Tillercaf8ea92017-06-01 16:08:58 -07001007
1008 GPR_ASSERT(cqd->shutdown_called);
1009 GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
1010
1011 cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
1012 &cq->pollset_shutdown_done);
1013}
1014
Craig Tillerbaa14a92017-11-03 09:09:36 -07001015static void cq_shutdown_next(grpc_exec_ctx* exec_ctx,
1016 grpc_completion_queue* cq) {
1017 cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
Craig Tillercaf8ea92017-06-01 16:08:58 -07001018
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -07001019 /* Need an extra ref for cq here because:
1020 * We call cq_finish_shutdown_next() below, that would call pollset shutdown.
1021 * Pollset shutdown decrements the cq ref count which can potentially destroy
1022 * the cq (if that happens to be the last ref).
1023 * Creating an extra ref here prevents the cq from getting destroyed while
1024 * this function is still active */
Craig Tillercaf8ea92017-06-01 16:08:58 -07001025 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
1026 gpr_mu_lock(cq->mu);
1027 if (cqd->shutdown_called) {
1028 gpr_mu_unlock(cq->mu);
Craig Tiller9c070bf2017-07-05 14:59:53 -07001029 GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
Craig Tillercaf8ea92017-06-01 16:08:58 -07001030 return;
1031 }
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -07001032 cqd->shutdown_called = true;
Sree Kuchibhotla0dbfde62017-09-18 10:11:37 -07001033 /* Doing a full_fetch_add (i.e acq/release) here to match with
1034 * cq_begin_op_for_next and and cq_end_op_for_next functions which read/write
1035 * on this counter without necessarily holding a lock on cq */
Craig Tillercaf8ea92017-06-01 16:08:58 -07001036 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
1037 cq_finish_shutdown_next(exec_ctx, cq);
1038 }
1039 gpr_mu_unlock(cq->mu);
1040 GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
1041}
1042
Craig Tillerbaa14a92017-11-03 09:09:36 -07001043grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
1044 gpr_timespec deadline, void* reserved) {
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001045 return cq->vtable->next(cq, deadline, reserved);
Sree Kuchibhotla736dd902017-04-25 18:26:53 -07001046}
1047
Craig Tillerbaa14a92017-11-03 09:09:36 -07001048static int add_plucker(grpc_completion_queue* cq, void* tag,
1049 grpc_pollset_worker** worker) {
1050 cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -07001051 if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
Craig Tillera82950e2015-09-22 12:33:20 -07001052 return 0;
1053 }
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -07001054 cqd->pluckers[cqd->num_pluckers].tag = tag;
1055 cqd->pluckers[cqd->num_pluckers].worker = worker;
1056 cqd->num_pluckers++;
Craig Tiller791e78a2015-08-01 16:20:17 -07001057 return 1;
Craig Tiller5ddbb9d2015-07-29 15:58:11 -07001058}
1059
Craig Tillerbaa14a92017-11-03 09:09:36 -07001060static void del_plucker(grpc_completion_queue* cq, void* tag,
1061 grpc_pollset_worker** worker) {
1062 cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -07001063 for (int i = 0; i < cqd->num_pluckers; i++) {
1064 if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
1065 cqd->num_pluckers--;
1066 GPR_SWAP(plucker, cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
Craig Tillera82950e2015-09-22 12:33:20 -07001067 return;
Craig Tiller5ddbb9d2015-07-29 15:58:11 -07001068 }
Craig Tillera82950e2015-09-22 12:33:20 -07001069 }
yang-gb063c872015-10-07 11:40:13 -07001070 GPR_UNREACHABLE_CODE(return );
Craig Tiller5ddbb9d2015-07-29 15:58:11 -07001071}
1072
Craig Tillerbaa14a92017-11-03 09:09:36 -07001073static bool cq_is_pluck_finished(grpc_exec_ctx* exec_ctx, void* arg) {
1074 cq_is_finished_arg* a = (cq_is_finished_arg*)arg;
1075 grpc_completion_queue* cq = a->cq;
1076 cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -07001077
Craig Tiller4782d922017-11-10 09:53:21 -08001078 GPR_ASSERT(a->stolen_completion == nullptr);
Craig Tilleraef3a792016-08-24 15:13:53 -07001079 gpr_atm current_last_seen_things_queued_ever =
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -07001080 gpr_atm_no_barrier_load(&cqd->things_queued_ever);
Craig Tilleraef3a792016-08-24 15:13:53 -07001081 if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
Craig Tillerbe290852017-06-01 12:42:09 -07001082 gpr_mu_lock(cq->mu);
Craig Tilleraef3a792016-08-24 15:13:53 -07001083 a->last_seen_things_queued_ever =
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -07001084 gpr_atm_no_barrier_load(&cqd->things_queued_ever);
Craig Tillerbaa14a92017-11-03 09:09:36 -07001085 grpc_cq_completion* c;
1086 grpc_cq_completion* prev = &cqd->completed_head;
1087 while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) !=
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -07001088 &cqd->completed_head) {
Craig Tilleraef3a792016-08-24 15:13:53 -07001089 if (c->tag == a->tag) {
1090 prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -07001091 if (c == cqd->completed_tail) {
1092 cqd->completed_tail = prev;
Craig Tilleraef3a792016-08-24 15:13:53 -07001093 }
Craig Tillerbe290852017-06-01 12:42:09 -07001094 gpr_mu_unlock(cq->mu);
Craig Tilleraef3a792016-08-24 15:13:53 -07001095 a->stolen_completion = c;
1096 return true;
Craig Tiller2f42fde2016-08-04 15:13:18 -07001097 }
Craig Tilleraef3a792016-08-24 15:13:53 -07001098 prev = c;
Craig Tiller2f42fde2016-08-04 15:13:18 -07001099 }
Craig Tillerbe290852017-06-01 12:42:09 -07001100 gpr_mu_unlock(cq->mu);
Craig Tiller2f42fde2016-08-04 15:13:18 -07001101 }
Craig Tillerdc3998e2017-05-12 09:55:30 -07001102 return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
Craig Tiller2f42fde2016-08-04 15:13:18 -07001103}
1104
Craig Tillerbaa14a92017-11-03 09:09:36 -07001105static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
1106 gpr_timespec deadline, void* reserved) {
Craig Tiller64be9f72015-05-04 14:53:51 -07001107 grpc_event ret;
Craig Tillerbaa14a92017-11-03 09:09:36 -07001108 grpc_cq_completion* c;
1109 grpc_cq_completion* prev;
Craig Tiller4782d922017-11-10 09:53:21 -08001110 grpc_pollset_worker* worker = nullptr;
Craig Tillerbaa14a92017-11-03 09:09:36 -07001111 cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001112
Craig Tiller0ba432d2015-10-09 16:57:11 -07001113 GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
Craig Tiller86253ca2015-10-08 13:31:02 -07001114
Craig Tiller84f75d42017-05-03 13:06:35 -07001115 if (GRPC_TRACER_ON(grpc_cq_pluck_trace)) {
murgatroid999f6a8052016-06-15 17:39:03 -07001116 GRPC_API_TRACE(
1117 "grpc_completion_queue_pluck("
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001118 "cq=%p, tag=%p, "
murgatroid99b2edc6c2016-06-27 14:24:18 -07001119 "deadline=gpr_timespec { tv_sec: %" PRId64
1120 ", tv_nsec: %d, clock_type: %d }, "
murgatroid999f6a8052016-06-15 17:39:03 -07001121 "reserved=%p)",
Craig Tillerbaa14a92017-11-03 09:09:36 -07001122 6,
1123 (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
1124 reserved));
murgatroid999f6a8052016-06-15 17:39:03 -07001125 }
Craig Tillera82950e2015-09-22 12:33:20 -07001126 GPR_ASSERT(!reserved);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001127
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001128 dump_pending_tags(cq);
Craig Tiller4e41e362016-08-19 13:12:54 -07001129
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001130 GRPC_CQ_INTERNAL_REF(cq, "pluck");
Craig Tillerbe290852017-06-01 12:42:09 -07001131 gpr_mu_lock(cq->mu);
Craig Tiller9a8c3f32017-07-21 13:14:14 -07001132 grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
Craig Tilleraef3a792016-08-24 15:13:53 -07001133 cq_is_finished_arg is_finished_arg = {
Yash Tibrewal37fdb732017-09-25 16:45:02 -07001134 gpr_atm_no_barrier_load(&cqd->things_queued_ever),
1135 cq,
Craig Tiller52910232017-10-05 10:28:15 -07001136 deadline_millis,
Craig Tiller4782d922017-11-10 09:53:21 -08001137 nullptr,
Yash Tibrewal37fdb732017-09-25 16:45:02 -07001138 tag,
1139 true};
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001140 grpc_exec_ctx exec_ctx =
1141 GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
Craig Tillera82950e2015-09-22 12:33:20 -07001142 for (;;) {
Craig Tiller4782d922017-11-10 09:53:21 -08001143 if (is_finished_arg.stolen_completion != nullptr) {
Craig Tillerbe290852017-06-01 12:42:09 -07001144 gpr_mu_unlock(cq->mu);
Craig Tiller5f70fc62016-08-04 16:00:00 -07001145 c = is_finished_arg.stolen_completion;
Craig Tiller4782d922017-11-10 09:53:21 -08001146 is_finished_arg.stolen_completion = nullptr;
Craig Tiller2f42fde2016-08-04 15:13:18 -07001147 ret.type = GRPC_OP_COMPLETE;
1148 ret.success = c->next & 1u;
1149 ret.tag = c->tag;
1150 c->done(&exec_ctx, c->done_arg, c);
1151 break;
1152 }
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -07001153 prev = &cqd->completed_head;
Craig Tillerbaa14a92017-11-03 09:09:36 -07001154 while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) !=
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -07001155 &cqd->completed_head) {
Craig Tillera82950e2015-09-22 12:33:20 -07001156 if (c->tag == tag) {
Craig Tiller7536af02015-12-22 13:49:30 -08001157 prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -07001158 if (c == cqd->completed_tail) {
1159 cqd->completed_tail = prev;
Craig Tillera82950e2015-09-22 12:33:20 -07001160 }
Craig Tillerbe290852017-06-01 12:42:09 -07001161 gpr_mu_unlock(cq->mu);
Craig Tillera82950e2015-09-22 12:33:20 -07001162 ret.type = GRPC_OP_COMPLETE;
1163 ret.success = c->next & 1u;
1164 ret.tag = c->tag;
1165 c->done(&exec_ctx, c->done_arg, c);
1166 goto done;
1167 }
1168 prev = c;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001169 }
Sree Kuchibhotla8e368452017-04-25 15:45:02 -07001170 if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
Craig Tillerbe290852017-06-01 12:42:09 -07001171 gpr_mu_unlock(cq->mu);
Craig Tillera82950e2015-09-22 12:33:20 -07001172 memset(&ret, 0, sizeof(ret));
1173 ret.type = GRPC_QUEUE_SHUTDOWN;
1174 break;
1175 }
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001176 if (!add_plucker(cq, tag, &worker)) {
Craig Tillera82950e2015-09-22 12:33:20 -07001177 gpr_log(GPR_DEBUG,
1178 "Too many outstanding grpc_completion_queue_pluck calls: maximum "
1179 "is %d",
1180 GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
Craig Tillerbe290852017-06-01 12:42:09 -07001181 gpr_mu_unlock(cq->mu);
Craig Tillera82950e2015-09-22 12:33:20 -07001182 memset(&ret, 0, sizeof(ret));
1183 /* TODO(ctiller): should we use a different result here */
1184 ret.type = GRPC_QUEUE_TIMEOUT;
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001185 dump_pending_tags(cq);
Craig Tillera82950e2015-09-22 12:33:20 -07001186 break;
1187 }
Craig Tillerdc3998e2017-05-12 09:55:30 -07001188 if (!is_finished_arg.first_loop &&
1189 grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001190 del_plucker(cq, tag, &worker);
Craig Tillerbe290852017-06-01 12:42:09 -07001191 gpr_mu_unlock(cq->mu);
Craig Tillera82950e2015-09-22 12:33:20 -07001192 memset(&ret, 0, sizeof(ret));
1193 ret.type = GRPC_QUEUE_TIMEOUT;
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001194 dump_pending_tags(cq);
Craig Tillera82950e2015-09-22 12:33:20 -07001195 break;
1196 }
Craig Tillerbe290852017-06-01 12:42:09 -07001197 cq->num_polls++;
Craig Tillerbaa14a92017-11-03 09:09:36 -07001198 grpc_error* err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
Craig Tillerdc3998e2017-05-12 09:55:30 -07001199 &worker, deadline_millis);
Craig Tillerc3571792017-05-02 12:33:38 -07001200 if (err != GRPC_ERROR_NONE) {
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001201 del_plucker(cq, tag, &worker);
Craig Tillerbe290852017-06-01 12:42:09 -07001202 gpr_mu_unlock(cq->mu);
Craig Tillerbaa14a92017-11-03 09:09:36 -07001203 const char* msg = grpc_error_string(err);
Sree Kuchibhotla467ad202017-05-11 13:15:07 -07001204 gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001205
Craig Tillerc3571792017-05-02 12:33:38 -07001206 GRPC_ERROR_UNREF(err);
1207 memset(&ret, 0, sizeof(ret));
1208 ret.type = GRPC_QUEUE_TIMEOUT;
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001209 dump_pending_tags(cq);
Craig Tillerc3571792017-05-02 12:33:38 -07001210 break;
Craig Tillerccdea192016-02-16 08:06:46 -08001211 }
Craig Tiller2c4043b2016-09-05 14:50:16 -07001212 is_finished_arg.first_loop = false;
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001213 del_plucker(cq, tag, &worker);
Craig Tillera82950e2015-09-22 12:33:20 -07001214 }
Craig Tiller97fc6a32015-07-08 15:31:35 -07001215done:
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001216 GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
1217 GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck");
Craig Tillera82950e2015-09-22 12:33:20 -07001218 grpc_exec_ctx_finish(&exec_ctx);
Craig Tiller4782d922017-11-10 09:53:21 -08001219 GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
Craig Tiller86253ca2015-10-08 13:31:02 -07001220
Craig Tiller0ba432d2015-10-09 16:57:11 -07001221 GPR_TIMER_END("grpc_completion_queue_pluck", 0);
Craig Tiller86253ca2015-10-08 13:31:02 -07001222
Craig Tiller64be9f72015-05-04 14:53:51 -07001223 return ret;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001224}
1225
Craig Tillerbaa14a92017-11-03 09:09:36 -07001226grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
1227 gpr_timespec deadline, void* reserved) {
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001228 return cq->vtable->pluck(cq, tag, deadline, reserved);
Sree Kuchibhotla8ac5c6d2017-05-11 14:13:11 -07001229}
Sree Kuchibhotla736dd902017-04-25 18:26:53 -07001230
Craig Tillerbaa14a92017-11-03 09:09:36 -07001231static void cq_finish_shutdown_pluck(grpc_exec_ctx* exec_ctx,
1232 grpc_completion_queue* cq) {
1233 cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
Craig Tillerbe290852017-06-01 12:42:09 -07001234
1235 GPR_ASSERT(cqd->shutdown_called);
1236 GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
1237 gpr_atm_no_barrier_store(&cqd->shutdown, 1);
1238
1239 cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
1240 &cq->pollset_shutdown_done);
1241}
1242
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -07001243/* NOTE: This function is almost exactly identical to cq_shutdown_next() but
1244 * merging them is a bit tricky and probably not worth it */
Craig Tillerbaa14a92017-11-03 09:09:36 -07001245static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx,
1246 grpc_completion_queue* cq) {
1247 cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
Craig Tillerbe290852017-06-01 12:42:09 -07001248
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -07001249 /* Need an extra ref for cq here because:
1250 * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
1251 * Pollset shutdown decrements the cq ref count which can potentially destroy
1252 * the cq (if that happens to be the last ref).
1253 * Creating an extra ref here prevents the cq from getting destroyed while
1254 * this function is still active */
1255 GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
Craig Tillerbe290852017-06-01 12:42:09 -07001256 gpr_mu_lock(cq->mu);
1257 if (cqd->shutdown_called) {
1258 gpr_mu_unlock(cq->mu);
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -07001259 GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
Craig Tillerbe290852017-06-01 12:42:09 -07001260 return;
1261 }
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -07001262 cqd->shutdown_called = true;
1263 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
Craig Tillerbe290852017-06-01 12:42:09 -07001264 cq_finish_shutdown_pluck(exec_ctx, cq);
1265 }
1266 gpr_mu_unlock(cq->mu);
Sree Kuchibhotlacfce4512017-08-02 23:30:40 -07001267 GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
Sree Kuchibhotla8ac5c6d2017-05-11 14:13:11 -07001268}
1269
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001270/* Shutdown simply drops a ref that we reserved at creation time; if we drop
1271 to zero here, then enter shutdown mode and wake up any waiters */
Craig Tillerbaa14a92017-11-03 09:09:36 -07001272void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
Craig Tillerf5768a62015-09-22 10:54:34 -07001273 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001274 GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001275 GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
Craig Tillerbe290852017-06-01 12:42:09 -07001276 cq->vtable->shutdown(&exec_ctx, cq);
Craig Tillera82950e2015-09-22 12:33:20 -07001277 grpc_exec_ctx_finish(&exec_ctx);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001278 GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001279}
1280
Craig Tillerbaa14a92017-11-03 09:09:36 -07001281void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001282 GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001283 GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001284 grpc_completion_queue_shutdown(cq);
Sree Kuchibhotla8ac5c6d2017-05-11 14:13:11 -07001285
Sree Kuchibhotla8ac5c6d2017-05-11 14:13:11 -07001286 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001287 GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy");
Sree Kuchibhotla8ac5c6d2017-05-11 14:13:11 -07001288 grpc_exec_ctx_finish(&exec_ctx);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001289 GPR_TIMER_END("grpc_completion_queue_destroy", 0);
David Klempnerb5056612015-02-24 14:22:50 -08001290}
1291
Craig Tillerbaa14a92017-11-03 09:09:36 -07001292grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {
Craig Tiller4782d922017-11-10 09:53:21 -08001293 return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : nullptr;
Craig Tiller190d3602015-02-18 09:23:38 -08001294}
Craig Tilleraec96aa2015-04-07 14:32:15 -07001295
Craig Tillerbaa14a92017-11-03 09:09:36 -07001296bool grpc_cq_can_listen(grpc_completion_queue* cq) {
Craig Tiller6f0bd2c2017-05-26 08:15:35 -07001297 return cq->poller_vtable->can_listen;
Craig Tiller11c58322017-04-12 08:21:17 -07001298}