blob: b04aee6c73b4d58e0e0a2169329adbdf36c759d1 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2015-2016 gRPC authors.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080016 *
17 */
Craig Tiller9533d042016-03-25 17:11:06 -070018#include "src/core/lib/surface/completion_queue.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080019
20#include <stdio.h>
21#include <string.h>
22
Craig Tiller69b093b2016-02-25 19:04:07 -080023#include <grpc/support/alloc.h>
24#include <grpc/support/atm.h>
25#include <grpc/support/log.h>
Craig Tiller4e41e362016-08-19 13:12:54 -070026#include <grpc/support/string_util.h>
Craig Tiller69b093b2016-02-25 19:04:07 -080027#include <grpc/support/time.h>
28
Craig Tiller9533d042016-03-25 17:11:06 -070029#include "src/core/lib/iomgr/pollset.h"
30#include "src/core/lib/iomgr/timer.h"
31#include "src/core/lib/profiling/timers.h"
Sree Kuchibhotlafe5f2352017-04-18 13:46:10 -070032#include "src/core/lib/support/spinlock.h"
Craig Tiller9533d042016-03-25 17:11:06 -070033#include "src/core/lib/support/string.h"
34#include "src/core/lib/surface/api_trace.h"
35#include "src/core/lib/surface/call.h"
36#include "src/core/lib/surface/event_string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080037
/* Tracer flags controlling completion-queue debug output.  The pending-tags
   and refcount tracers only exist in debug builds. */
grpc_tracer_flag grpc_trace_operation_failures = GRPC_TRACER_INITIALIZER(false);
#ifndef NDEBUG
grpc_tracer_flag grpc_trace_pending_tags = GRPC_TRACER_INITIALIZER(false);
grpc_tracer_flag grpc_trace_cq_refcount = GRPC_TRACER_INITIALIZER(false);
#endif
Craig Tillera286b042016-06-13 15:20:39 +000043
/* One registered pluck() caller: the tag being plucked for and the pollset
   worker that is waiting for it (so it can be kicked when the tag arrives). */
typedef struct {
  grpc_pollset_worker **worker;
  void *tag;
} plucker;
48
/* Polling-strategy vtable; one entry per grpc_cq_polling_type.  The pollset
   storage is co-allocated immediately after the grpc_completion_queue
   (see POLLSET_FROM_CQ), sized via size(). */
typedef struct {
  bool can_get_pollset; /* may callers obtain the underlying pollset? */
  bool can_listen;      /* may a server use this cq's pollset to listen? */
  size_t (*size)(void);
  void (*init)(grpc_pollset *pollset, gpr_mu **mu);
  grpc_error *(*kick)(grpc_pollset *pollset,
                      grpc_pollset_worker *specific_worker);
  grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                      grpc_pollset_worker **worker, gpr_timespec now,
                      gpr_timespec deadline);
  void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                   grpc_closure *closure);
  void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset);
} cq_poller_vtable;
63
/* A waiter in the non-polling poller: stack-allocated by
   non_polling_poller_work and linked into a circular doubly-linked list. */
typedef struct non_polling_worker {
  gpr_cv cv;
  bool kicked;
  struct non_polling_worker *next;
  struct non_polling_worker *prev;
} non_polling_worker;

/* State for GRPC_CQ_NON_POLLING: workers block on condition variables
   instead of doing I/O polling.  'root' is the circular list of current
   waiters (NULL when empty); 'shutdown' is the pending shutdown closure. */
typedef struct {
  gpr_mu mu;
  non_polling_worker *root;
  grpc_closure *shutdown;
} non_polling_poller;
76
/* Storage needed for a non-polling "pollset" (co-allocated after the cq). */
static size_t non_polling_poller_size(void) {
  return sizeof(non_polling_poller);
}
80
81static void non_polling_poller_init(grpc_pollset *pollset, gpr_mu **mu) {
82 non_polling_poller *npp = (non_polling_poller *)pollset;
83 gpr_mu_init(&npp->mu);
84 *mu = &npp->mu;
85}
86
Craig Tiller956920d2017-04-24 13:09:12 -070087static void non_polling_poller_destroy(grpc_exec_ctx *exec_ctx,
88 grpc_pollset *pollset) {
Craig Tiller2f40ff42017-04-11 16:01:19 -070089 non_polling_poller *npp = (non_polling_poller *)pollset;
90 gpr_mu_destroy(&npp->mu);
91}
92
/* "Poll" by blocking on a condition variable until kicked, shut down, or the
   deadline passes.  Called with the pollset mutex held (grpc_pollset_work
   contract).  The worker lives on this stack frame and is linked into the
   poller's circular list for the duration of the wait. */
static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
                                           grpc_pollset *pollset,
                                           grpc_pollset_worker **worker,
                                           gpr_timespec now,
                                           gpr_timespec deadline) {
  non_polling_poller *npp = (non_polling_poller *)pollset;
  if (npp->shutdown) return GRPC_ERROR_NONE;
  non_polling_worker w;
  gpr_cv_init(&w.cv);
  if (worker != NULL) *worker = (grpc_pollset_worker *)&w;
  /* Link w into the circular doubly-linked list of waiters. */
  if (npp->root == NULL) {
    npp->root = w.next = w.prev = &w;
  } else {
    w.next = npp->root;
    w.prev = w.next->prev;
    w.next->prev = w.prev->next = &w;
  }
  w.kicked = false;
  /* gpr_cv_wait returns non-zero on timeout; loop guards against spurious
     wakeups. */
  while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline))
    ;
  if (&w == npp->root) {
    npp->root = w.next;
    if (&w == npp->root) {
      /* w was the only waiter: the last worker out completes shutdown. */
      if (npp->shutdown) {
        GRPC_CLOSURE_SCHED(exec_ctx, npp->shutdown, GRPC_ERROR_NONE);
      }
      npp->root = NULL;
    }
  }
  /* Unlink w from the list before its stack frame disappears. */
  w.next->prev = w.prev;
  w.prev->next = w.next;
  gpr_cv_destroy(&w.cv);
  if (worker != NULL) *worker = NULL;
  return GRPC_ERROR_NONE;
}
128
129static grpc_error *non_polling_poller_kick(
130 grpc_pollset *pollset, grpc_pollset_worker *specific_worker) {
131 non_polling_poller *p = (non_polling_poller *)pollset;
132 if (specific_worker == NULL) specific_worker = (grpc_pollset_worker *)p->root;
133 if (specific_worker != NULL) {
134 non_polling_worker *w = (non_polling_worker *)specific_worker;
135 if (!w->kicked) {
136 w->kicked = true;
137 gpr_cv_signal(&w->cv);
138 }
139 }
140 return GRPC_ERROR_NONE;
141}
142
143static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx,
144 grpc_pollset *pollset,
145 grpc_closure *closure) {
146 non_polling_poller *p = (non_polling_poller *)pollset;
147 GPR_ASSERT(closure != NULL);
148 p->shutdown = closure;
149 if (p->root == NULL) {
ncteisen969b46e2017-06-08 14:57:11 -0700150 GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
Craig Tiller2f40ff42017-04-11 16:01:19 -0700151 } else {
152 non_polling_worker *w = p->root;
153 do {
154 gpr_cv_signal(&w->cv);
155 w = w->next;
156 } while (w != p->root);
157 }
158}
159
/* Poller vtables indexed by grpc_cq_polling_type.  DEFAULT and NON_LISTENING
   both use real pollsets (the latter refuses listeners); NON_POLLING uses the
   condition-variable based implementation above. */
static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
    /* GRPC_CQ_DEFAULT_POLLING */
    {.can_get_pollset = true,
     .can_listen = true,
     .size = grpc_pollset_size,
     .init = grpc_pollset_init,
     .kick = grpc_pollset_kick,
     .work = grpc_pollset_work,
     .shutdown = grpc_pollset_shutdown,
     .destroy = grpc_pollset_destroy},
    /* GRPC_CQ_NON_LISTENING */
    {.can_get_pollset = true,
     .can_listen = false,
     .size = grpc_pollset_size,
     .init = grpc_pollset_init,
     .kick = grpc_pollset_kick,
     .work = grpc_pollset_work,
     .shutdown = grpc_pollset_shutdown,
     .destroy = grpc_pollset_destroy},
    /* GRPC_CQ_NON_POLLING */
    {.can_get_pollset = false,
     .can_listen = false,
     .size = non_polling_poller_size,
     .init = non_polling_poller_init,
     .kick = non_polling_poller_kick,
     .work = non_polling_poller_work,
     .shutdown = non_polling_poller_shutdown,
     .destroy = non_polling_poller_destroy},
};
189
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700190typedef struct cq_vtable {
191 grpc_cq_completion_type cq_completion_type;
192 size_t (*size)();
193 void (*begin_op)(grpc_completion_queue *cc, void *tag);
194 void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag,
195 grpc_error *error,
196 void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
197 grpc_cq_completion *storage),
198 void *done_arg, grpc_cq_completion *storage);
199 grpc_event (*next)(grpc_completion_queue *cc, gpr_timespec deadline,
200 void *reserved);
201 grpc_event (*pluck)(grpc_completion_queue *cc, void *tag,
202 gpr_timespec deadline, void *reserved);
203} cq_vtable;
204
205/* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue
206 * (a lockfree multiproducer single consumer queue). It uses a queue_lock
207 * to support multiple consumers.
208 * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */
Sree Kuchibhotla078a3402017-04-11 21:23:03 -0700209typedef struct grpc_cq_event_queue {
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700210 /* Spinlock to serialize consumers i.e pop() operations */
Sree Kuchibhotlafe5f2352017-04-18 13:46:10 -0700211 gpr_spinlock queue_lock;
Sree Kuchibhotla078a3402017-04-11 21:23:03 -0700212
213 gpr_mpscq queue;
214
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700215 /* A lazy counter of number of items in the queue. This is NOT atomically
216 incremented/decremented along with push/pop operations and hence is only
217 eventually consistent */
Sree Kuchibhotla078a3402017-04-11 21:23:03 -0700218 gpr_atm num_queue_items;
219} grpc_cq_event_queue;
220
/* TODO: sreek Refactor this based on the completion_type. Put completion-type
 * specific data in a different structure (and co-allocate memory for it along
 * with completion queue + pollset )*/
typedef struct cq_data {
  /* Points into the co-allocated pollset's mutex (set by poller init()). */
  gpr_mu *mu;

  /** Completed events for completion-queues of type GRPC_CQ_PLUCK */
  grpc_cq_completion completed_head;
  grpc_cq_completion *completed_tail;

  /** Completed events for completion-queues of type GRPC_CQ_NEXT */
  grpc_cq_event_queue queue;

  /** Number of pending events (+1 if we're not shutdown) */
  gpr_refcount pending_events;

  /** Once owning_refs drops to zero, we will destroy the cq */
  gpr_refcount owning_refs;

  /** Counter of how many things have ever been queued on this completion queue
      useful for avoiding locks to check the queue */
  gpr_atm things_queued_ever;

  /** 0 initially, 1 once we've begun shutting down */
  gpr_atm shutdown;
  int shutdown_called;

  int is_server_cq;

  /* pluck() bookkeeping: active pluckers and a poll counter (see
     grpc_get_cq_poll_num). */
  int num_pluckers;
  int num_polls;
  plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
  grpc_closure pollset_shutdown_done;

#ifndef NDEBUG
  /* Debug-only tracking of tags passed to begin_op but not yet completed. */
  void **outstanding_tags;
  size_t outstanding_tag_count;
  size_t outstanding_tag_capacity;
#endif
} cq_data;
Craig Tiller4bf29282015-12-14 11:25:48 -0800261
/* Completion queue structure.  The pollset is allocated immediately after
   this struct (see POLLSET_FROM_CQ / cq_size). */
struct grpc_completion_queue {
  cq_data data;
  const cq_vtable *vtable;        /* completion-type dispatch (NEXT/PLUCK) */
  const cq_poller_vtable *poller_vtable; /* polling-strategy dispatch */
};
268
/* Forward declarations (implementations below; wired into g_cq_vtable). */
static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
                               grpc_completion_queue *cc);

static size_t cq_size(grpc_completion_queue *cc);

static void cq_begin_op(grpc_completion_queue *cc, void *tag);

static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
                               grpc_completion_queue *cc, void *tag,
                               grpc_error *error,
                               void (*done)(grpc_exec_ctx *exec_ctx,
                                            void *done_arg,
                                            grpc_cq_completion *storage),
                               void *done_arg, grpc_cq_completion *storage);

static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
                                grpc_completion_queue *cc, void *tag,
                                grpc_error *error,
                                void (*done)(grpc_exec_ctx *exec_ctx,
                                             void *done_arg,
                                             grpc_cq_completion *storage),
                                void *done_arg, grpc_cq_completion *storage);

static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
                          void *reserved);

static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
                           gpr_timespec deadline, void *reserved);
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700299/* Completion queue vtables based on the completion-type */
300static const cq_vtable g_cq_vtable[] = {
301 /* GRPC_CQ_NEXT */
302 {.cq_completion_type = GRPC_CQ_NEXT,
Sree Kuchibhotla736dd902017-04-25 18:26:53 -0700303 .size = cq_size,
304 .begin_op = cq_begin_op,
305 .end_op = cq_end_op_for_next,
306 .next = cq_next,
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700307 .pluck = NULL},
308 /* GRPC_CQ_PLUCK */
309 {.cq_completion_type = GRPC_CQ_PLUCK,
Sree Kuchibhotla736dd902017-04-25 18:26:53 -0700310 .size = cq_size,
311 .begin_op = cq_begin_op,
312 .end_op = cq_end_op_for_pluck,
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700313 .next = NULL,
Sree Kuchibhotla736dd902017-04-25 18:26:53 -0700314 .pluck = cq_pluck},
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800315};
316
/* The pollset is co-allocated immediately after the completion queue, so the
   two convert by pointer arithmetic.  Arguments are parenthesized so that
   expression arguments expand correctly (CERT PRE01-C). */
#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)((cq) + 1))
#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)(ps)) - 1)
Craig Tiller69b093b2016-02-25 19:04:07 -0800319
/* Tracers for pluck() calls and for timeout events (both on by default). */
grpc_tracer_flag grpc_cq_pluck_trace = GRPC_TRACER_INITIALIZER(true);
grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true);
murgatroid999f6a8052016-06-15 17:39:03 -0700322
/* Log an event about to be returned to the application, when API tracing is
   on (timeouts are only logged if the pluck tracer is also on).  Wrapped in
   do/while(0) so the macro behaves as a single statement and is safe inside
   an unbraced if/else (CERT PRE10-C); the original bare `if` was not. */
#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event)      \
  do {                                                    \
    if (GRPC_TRACER_ON(grpc_api_trace) &&                 \
        (GRPC_TRACER_ON(grpc_cq_pluck_trace) ||           \
         (event)->type != GRPC_QUEUE_TIMEOUT)) {          \
      char *_ev = grpc_event_string(event);               \
      gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \
      gpr_free(_ev);                                      \
    }                                                     \
  } while (0)
331
/* Closure run once the co-allocated pollset finishes shutting down. */
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
                                     grpc_error *error);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800334
Sree Kuchibhotla078a3402017-04-11 21:23:03 -0700335static void cq_event_queue_init(grpc_cq_event_queue *q) {
336 gpr_mpscq_init(&q->queue);
Sree Kuchibhotlafe5f2352017-04-18 13:46:10 -0700337 q->queue_lock = GPR_SPINLOCK_INITIALIZER;
Sree Kuchibhotla078a3402017-04-11 21:23:03 -0700338 gpr_atm_no_barrier_store(&q->num_queue_items, 0);
339}
340
/* Destroy the underlying mpsc queue (spinlock/counter need no teardown). */
static void cq_event_queue_destroy(grpc_cq_event_queue *q) {
  gpr_mpscq_destroy(&q->queue);
}
344
/* Producer side: lock-free push, then lazily bump the item counter (the
   counter is only eventually consistent with the queue contents). */
static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
  gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c);
  gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1);
}
349
/* Consumer side: pop one completion, or NULL.  The mpsc queue allows only one
   consumer at a time, so consumers race on a trylock; a loser simply returns
   NULL (callers treat that the same as "queue empty" and retry later). */
static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
  grpc_cq_completion *c = NULL;
  if (gpr_spinlock_trylock(&q->queue_lock)) {
    c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue);
    gpr_spinlock_unlock(&q->queue_lock);
  }

  if (c) {
    gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
  }

  return c;
}
363
/* Note: The counter is not incremented/decremented atomically with push/pop.
 * The count is only eventually consistent */
static long cq_event_queue_num_items(grpc_cq_event_queue *q) {
  return (long)gpr_atm_no_barrier_load(&q->num_queue_items);
}
369
static size_t cq_size(grpc_completion_queue *cc) {
  /* Size of the completion queue and the size of the pollset whose memory is
     allocated right after that of completion queue */
  return sizeof(grpc_completion_queue) + cc->poller_vtable->size();
}
375
/* Create a completion queue with the given completion type (NEXT/PLUCK) and
   polling strategy.  The cq and its pollset are allocated as one block;
   returns the new cq with two owning refs (destroy + pollset shutdown) and
   one pending-event ref that grpc_completion_queue_shutdown drops. */
grpc_completion_queue *grpc_completion_queue_create_internal(
    grpc_cq_completion_type completion_type,
    grpc_cq_polling_type polling_type) {
  grpc_completion_queue *cc;

  GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0);

  GRPC_API_TRACE(
      "grpc_completion_queue_create_internal(completion_type=%d, "
      "polling_type=%d)",
      2, (completion_type, polling_type));

  const cq_vtable *vtable = &g_cq_vtable[completion_type];
  const cq_poller_vtable *poller_vtable =
      &g_poller_vtable_by_poller_type[polling_type];

  /* One zeroed allocation covers the cq plus the trailing pollset. */
  cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size());
  cq_data *cqd = &cc->data;

  cc->vtable = vtable;
  cc->poller_vtable = poller_vtable;

  /* Pollset init also hands back the mutex that guards cq state. */
  poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->data.mu);

#ifndef NDEBUG
  cqd->outstanding_tags = NULL;
  cqd->outstanding_tag_capacity = 0;
#endif

  /* Initial ref is dropped by grpc_completion_queue_shutdown */
  gpr_ref_init(&cqd->pending_events, 1);
  /* One for destroy(), one for pollset_shutdown */
  gpr_ref_init(&cqd->owning_refs, 2);
  /* Empty PLUCK list: tail points at the sentinel head. */
  cqd->completed_tail = &cqd->completed_head;
  cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
  gpr_atm_no_barrier_store(&cqd->shutdown, 0);
  cqd->shutdown_called = 0;
  cqd->is_server_cq = 0;
  cqd->num_pluckers = 0;
  cqd->num_polls = 0;
  gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
#ifndef NDEBUG
  cqd->outstanding_tag_count = 0;
#endif
  cq_event_queue_init(&cqd->queue);
  GRPC_CLOSURE_INIT(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc,
                    grpc_schedule_on_exec_ctx);

  GPR_TIMER_END("grpc_completion_queue_create_internal", 0);

  return cc;
}
428
Sree Kuchibhotla321881d2017-02-27 11:25:28 -0800429grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700430 return cc->vtable->cq_completion_type;
Sree Kuchibhotla321881d2017-02-27 11:25:28 -0800431}
432
Yuxuan Li85d3a532017-05-08 06:18:52 +0000433int grpc_get_cq_poll_num(grpc_completion_queue *cc) {
Yuxuan Li87827e02017-05-05 14:12:42 -0700434 int cur_num_polls;
Yuxuan Li6ca796d2017-05-15 14:15:34 -0700435 gpr_mu_lock(cc->data.mu);
436 cur_num_polls = cc->data.num_polls;
437 gpr_mu_unlock(cc->data.mu);
Yuxuan Li87827e02017-05-05 14:12:42 -0700438 return cur_num_polls;
Yuxuan Li999ac152017-05-03 21:36:36 -0700439}
440
/* Take an owning ref on the cq.  Debug builds accept a reason/location and
   log the transition when the refcount tracer is enabled; note the function
   header itself is split across the #ifdef so both builds share one body. */
#ifndef NDEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
                          const char *file, int line) {
  cq_data *cqd = &cc->data;
  if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
    gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count);
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val + 1,
            reason);
  }
#else
void grpc_cq_internal_ref(grpc_completion_queue *cc) {
  cq_data *cqd = &cc->data;
#endif
  gpr_ref(&cqd->owning_refs);
}
457
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800458static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
Craig Tillerf51457b2016-05-03 17:06:32 -0700459 grpc_error *error) {
Craig Tiller5717a982015-04-27 12:01:49 -0700460 grpc_completion_queue *cc = arg;
Craig Tillerf8401102017-04-17 09:47:28 -0700461 GRPC_CQ_INTERNAL_UNREF(exec_ctx, cc, "pollset_destroy");
Craig Tiller5717a982015-04-27 12:01:49 -0700462}
463
/* Drop an owning ref; the last unref destroys the cq.  As with ref(), the
   function header is split across the #ifdef so both builds share one body. */
#ifndef NDEBUG
void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
                            const char *reason, const char *file, int line) {
  cq_data *cqd = &cc->data;
  if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
    gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count);
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val - 1,
            reason);
  }
#else
void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
                            grpc_completion_queue *cc) {
  cq_data *cqd = &cc->data;
#endif
  if (gpr_unref(&cqd->owning_refs)) {
    /* No completions may remain on the PLUCK list at destruction. */
    GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
    cc->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cc));
    cq_event_queue_destroy(&cqd->queue);
#ifndef NDEBUG
    gpr_free(cqd->outstanding_tags);
#endif
    gpr_free(cc);
  }
}
489
/* Register the start of an operation that will later complete on this cq.
   Takes a pending-events ref; in debug builds the tag is also recorded in
   outstanding_tags so cq_check_tag can validate the matching end_op. */
static void cq_begin_op(grpc_completion_queue *cc, void *tag) {
  cq_data *cqd = &cc->data;
#ifndef NDEBUG
  gpr_mu_lock(cqd->mu);
  GPR_ASSERT(!cqd->shutdown_called);
  if (cqd->outstanding_tag_count == cqd->outstanding_tag_capacity) {
    /* Grow geometrically, starting at 4 slots. */
    cqd->outstanding_tag_capacity =
        GPR_MAX(4, 2 * cqd->outstanding_tag_capacity);
    cqd->outstanding_tags =
        gpr_realloc(cqd->outstanding_tags, sizeof(*cqd->outstanding_tags) *
                                               cqd->outstanding_tag_capacity);
  }
  cqd->outstanding_tags[cqd->outstanding_tag_count++] = tag;
  gpr_mu_unlock(cqd->mu);
#endif
  gpr_ref(&cqd->pending_events);
}
507
/* Public entry point: dispatch to the completion-type implementation. */
void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
  cc->vtable->begin_op(cc, tag);
}
Craig Tiller97fc6a32015-07-08 15:31:35 -0700511
/* Debug-only: assert that 'tag' was previously registered via cq_begin_op,
   and remove it from the outstanding set (swap-with-last removal).  lock_cq
   says whether this call must take the cq mutex itself or the caller already
   holds it.  No-op in release builds. */
#ifndef NDEBUG
static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {
  cq_data *cqd = &cc->data;
  int found = 0;
  if (lock_cq) {
    gpr_mu_lock(cqd->mu);
  }

  for (int i = 0; i < (int)cqd->outstanding_tag_count; i++) {
    if (cqd->outstanding_tags[i] == tag) {
      cqd->outstanding_tag_count--;
      GPR_SWAP(void *, cqd->outstanding_tags[i],
               cqd->outstanding_tags[cqd->outstanding_tag_count]);
      found = 1;
      break;
    }
  }

  if (lock_cq) {
    gpr_mu_unlock(cqd->mu);
  }

  GPR_ASSERT(found);
}
#else
static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {}
#endif
Sree Kuchibhotlad7a1b8f2017-04-11 19:43:07 -0700539
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
 * type of GRPC_CQ_NEXT) */
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
                               grpc_completion_queue *cc, void *tag,
                               grpc_error *error,
                               void (*done)(grpc_exec_ctx *exec_ctx,
                                            void *done_arg,
                                            grpc_cq_completion *storage),
                               void *done_arg, grpc_cq_completion *storage) {
  GPR_TIMER_BEGIN("cq_end_op_for_next", 0);

  if (GRPC_TRACER_ON(grpc_api_trace) ||
      (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
       error != GRPC_ERROR_NONE)) {
    const char *errmsg = grpc_error_string(error);
    GRPC_API_TRACE(
        "cq_end_op_for_next(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
        "done=%p, done_arg=%p, storage=%p)",
        7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
    if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
        error != GRPC_ERROR_NONE) {
      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
    }
  }

  cq_data *cqd = &cc->data;
  int is_success = (error == GRPC_ERROR_NONE);

  /* The success bit rides in storage->next (there is no list link for
     NEXT-type queues; the mpsc queue provides the linkage). */
  storage->tag = tag;
  storage->done = done;
  storage->done_arg = done_arg;
  storage->next = (uintptr_t)(is_success);

  cq_check_tag(cc, tag, true); /* Used in debug builds only */

  /* Add the completion to the queue */
  cq_event_queue_push(&cqd->queue, storage);
  gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);

  gpr_mu_lock(cqd->mu);

  /* Drop the pending-events ref taken in cq_begin_op; the last one triggers
     shutdown completion. */
  int shutdown = gpr_unref(&cqd->pending_events);
  if (!shutdown) {
    /* Wake a poller so it can pick up the new event. */
    grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL);
    gpr_mu_unlock(cqd->mu);

    if (kick_error != GRPC_ERROR_NONE) {
      const char *msg = grpc_error_string(kick_error);
      gpr_log(GPR_ERROR, "Kick failed: %s", msg);

      GRPC_ERROR_UNREF(kick_error);
    }
  } else {
    cq_finish_shutdown(exec_ctx, cc);
    gpr_mu_unlock(cqd->mu);
  }

  GPR_TIMER_END("cq_end_op_for_next", 0);

  GRPC_ERROR_UNREF(error);
}
601
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
 * type of GRPC_CQ_PLUCK)
 *
 * `storage` is caller-provided memory that becomes the node in the cq's
 * intrusive completed list; `done(done_arg, storage)` is invoked when the
 * completion is consumed by a plucker. Consumes one ref on `error`. */
static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
                                grpc_completion_queue *cc, void *tag,
                                grpc_error *error,
                                void (*done)(grpc_exec_ctx *exec_ctx,
                                             void *done_arg,
                                             grpc_cq_completion *storage),
                                void *done_arg, grpc_cq_completion *storage) {
  cq_data *cqd = &cc->data;
  /* Success is encoded in the low bit of the intrusive `next` pointer below */
  int is_success = (error == GRPC_ERROR_NONE);

  GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0);

  if (GRPC_TRACER_ON(grpc_api_trace) ||
      (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
       error != GRPC_ERROR_NONE)) {
    const char *errmsg = grpc_error_string(error);
    GRPC_API_TRACE(
        "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
        "done=%p, done_arg=%p, storage=%p)",
        7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
    if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
        error != GRPC_ERROR_NONE) {
      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
    }
  }

  storage->tag = tag;
  storage->done = done;
  storage->done_arg = done_arg;
  /* Point at the list sentinel (completed_head) and stash the success flag in
     the low bit; pointers are sufficiently aligned that bit 0 is free */
  storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success));

  gpr_mu_lock(cqd->mu);
  cq_check_tag(cc, tag, false); /* Used in debug builds only */

  /* Add to the list of completions */
  gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
  /* Splice onto the tail, preserving the tail's existing low (success) bit */
  cqd->completed_tail->next =
      ((uintptr_t)storage) | (1u & (uintptr_t)cqd->completed_tail->next);
  cqd->completed_tail = storage;

  /* gpr_unref returns non-zero when the count hits zero, i.e. this was the
     last pending event after shutdown was requested */
  int shutdown = gpr_unref(&cqd->pending_events);
  if (!shutdown) {
    /* If a plucker is already waiting on this exact tag, kick its worker
       specifically; otherwise kick with a NULL worker */
    grpc_pollset_worker *pluck_worker = NULL;
    for (int i = 0; i < cqd->num_pluckers; i++) {
      if (cqd->pluckers[i].tag == tag) {
        pluck_worker = *cqd->pluckers[i].worker;
        break;
      }
    }

    grpc_error *kick_error =
        cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker);

    gpr_mu_unlock(cqd->mu);

    /* Log outside the lock; a failed kick is not fatal here */
    if (kick_error != GRPC_ERROR_NONE) {
      const char *msg = grpc_error_string(kick_error);
      gpr_log(GPR_ERROR, "Kick failed: %s", msg);

      GRPC_ERROR_UNREF(kick_error);
    }
  } else {
    /* Last pending event: finish the shutdown while still holding the lock */
    cq_finish_shutdown(exec_ctx, cc);
    gpr_mu_unlock(cqd->mu);
  }

  GPR_TIMER_END("cq_end_op_for_pluck", 0);

  GRPC_ERROR_UNREF(error);
}
674
/* Signal the completion of an operation on `cc`: dispatches to the
   completion-type-specific implementation (cq_end_op_for_next /
   cq_end_op_for_pluck) via the cq's vtable. Consumes one ref on `error`. */
void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
                    void *tag, grpc_error *error,
                    void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
                                 grpc_cq_completion *storage),
                    void *done_arg, grpc_cq_completion *storage) {
  cc->vtable->end_op(exec_ctx, cc, tag, error, done, done_arg, storage);
}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800682
/* Shared state for the exec_ctx "is finished" callbacks used by next/pluck
   polling loops (cq_is_next_finished / cq_is_pluck_finished) */
typedef struct {
  /* Snapshot of cqd->things_queued_ever at the last check; a change means new
     completions may be available to steal */
  gpr_atm last_seen_things_queued_ever;
  grpc_completion_queue *cq;
  gpr_timespec deadline;
  /* Completion stolen by the callback, consumed by the polling loop */
  grpc_cq_completion *stolen_completion;
  void *tag; /* for pluck */
  /* True until the first loop iteration completes; suppresses an immediate
     deadline-expired result */
  bool first_loop;
} cq_is_finished_arg;
691
/* exec_ctx "is finished" callback for cq_next: returns true when the polling
   loop should stop, either because a completion was stolen off the event
   queue (stored in a->stolen_completion) or because the deadline passed. */
static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
  cq_is_finished_arg *a = arg;
  grpc_completion_queue *cq = a->cq;
  cq_data *cqd = &cq->data;
  GPR_ASSERT(a->stolen_completion == NULL);

  gpr_atm current_last_seen_things_queued_ever =
      gpr_atm_no_barrier_load(&cqd->things_queued_ever);

  /* Only attempt a pop if something has been queued since we last looked */
  if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
    /* Re-load (rather than reuse the value above) to advance the snapshot as
       far as possible before popping */
    a->last_seen_things_queued_ever =
        gpr_atm_no_barrier_load(&cqd->things_queued_ever);

    /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
     * might return NULL in some cases even if the queue is not empty; but that
     * is ok and doesn't affect correctness. Might effect the tail latencies a
     * bit) */
    a->stolen_completion = cq_event_queue_pop(&cqd->queue);
    if (a->stolen_completion != NULL) {
      return true;
    }
  }
  /* No completion: finish only if the deadline has already passed (never on
     the very first iteration) */
  return !a->first_loop &&
         gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
}
717
Craig Tiller4e41e362016-08-19 13:12:54 -0700718#ifndef NDEBUG
719static void dump_pending_tags(grpc_completion_queue *cc) {
Craig Tiller84f75d42017-05-03 13:06:35 -0700720 if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return;
Craig Tiller4e41e362016-08-19 13:12:54 -0700721
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700722 cq_data *cqd = &cc->data;
Craig Tiller4e41e362016-08-19 13:12:54 -0700723
724 gpr_strvec v;
725 gpr_strvec_init(&v);
726 gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700727 gpr_mu_lock(cqd->mu);
728 for (size_t i = 0; i < cqd->outstanding_tag_count; i++) {
Craig Tiller4e41e362016-08-19 13:12:54 -0700729 char *s;
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700730 gpr_asprintf(&s, " %p", cqd->outstanding_tags[i]);
Craig Tiller4e41e362016-08-19 13:12:54 -0700731 gpr_strvec_add(&v, s);
732 }
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700733 gpr_mu_unlock(cqd->mu);
Craig Tiller4e41e362016-08-19 13:12:54 -0700734 char *out = gpr_strvec_flatten(&v, NULL);
735 gpr_strvec_destroy(&v);
736 gpr_log(GPR_DEBUG, "%s", out);
737 gpr_free(out);
738}
Craig Tiller49c644c2016-08-19 13:52:23 -0700739#else
740static void dump_pending_tags(grpc_completion_queue *cc) {}
Craig Tiller4e41e362016-08-19 13:12:54 -0700741#endif
742
/* GRPC_CQ_NEXT implementation of grpc_completion_queue_next: blocks (polling
   the cq's pollset) until a completion is available, the queue shuts down, or
   `deadline` expires. Returns the dequeued event, GRPC_QUEUE_SHUTDOWN, or
   GRPC_QUEUE_TIMEOUT (also used as the result when polling itself fails). */
static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
                          void *reserved) {
  grpc_event ret;
  gpr_timespec now;
  cq_data *cqd = &cc->data;

  GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);

  GRPC_API_TRACE(
      "grpc_completion_queue_next("
      "cc=%p, "
      "deadline=gpr_timespec { tv_sec: %" PRId64
      ", tv_nsec: %d, clock_type: %d }, "
      "reserved=%p)",
      5, (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
          reserved));
  GPR_ASSERT(!reserved);

  dump_pending_tags(cc);

  deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);

  /* Hold a ref for the duration of the call; released before returning */
  GRPC_CQ_INTERNAL_REF(cc, "next");

  cq_is_finished_arg is_finished_arg = {
      .last_seen_things_queued_ever =
          gpr_atm_no_barrier_load(&cqd->things_queued_ever),
      .cq = cc,
      .deadline = deadline,
      .stolen_completion = NULL,
      .tag = NULL,
      .first_loop = true};
  /* The exec_ctx may call cq_is_next_finished while flushing closures; it can
     steal a completion into is_finished_arg.stolen_completion */
  grpc_exec_ctx exec_ctx =
      GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg);

  for (;;) {
    gpr_timespec iteration_deadline = deadline;

    /* A completion stolen by cq_is_next_finished takes priority */
    if (is_finished_arg.stolen_completion != NULL) {
      grpc_cq_completion *c = is_finished_arg.stolen_completion;
      is_finished_arg.stolen_completion = NULL;
      ret.type = GRPC_OP_COMPLETE;
      ret.success = c->next & 1u;
      ret.tag = c->tag;
      c->done(&exec_ctx, c->done_arg, c);
      break;
    }

    grpc_cq_completion *c = cq_event_queue_pop(&cqd->queue);

    if (c != NULL) {
      ret.type = GRPC_OP_COMPLETE;
      ret.success = c->next & 1u; /* success flag lives in the low bit */
      ret.tag = c->tag;
      c->done(&exec_ctx, c->done_arg, c);
      break;
    } else {
      /* If c == NULL it means either the queue is empty OR in an transient
         inconsistent state. If it is the latter, we shold do a 0-timeout poll
         so that the thread comes back quickly from poll to make a second
         attempt at popping. Not doing this can potentially deadlock this thread
         forever (if the deadline is infinity) */
      if (cq_event_queue_num_items(&cqd->queue) > 0) {
        iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
      }
    }

    if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
      /* Before returning, check if the queue has any items left over (since
         gpr_mpscq_pop() can sometimes return NULL even if the queue is not
         empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
      if (cq_event_queue_num_items(&cqd->queue) > 0) {
        /* Go to the beginning of the loop. No point doing a poll because
           (cc->shutdown == true) is only possible when there is no pending work
           (i.e cc->pending_events == 0) and any outstanding grpc_cq_completion
           events are already queued on this cq */
        continue;
      }

      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_SHUTDOWN;
      break;
    }

    now = gpr_now(GPR_CLOCK_MONOTONIC);
    if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_TIMEOUT;
      dump_pending_tags(cc);
      break;
    }

    /* The main polling work happens in grpc_pollset_work */
    gpr_mu_lock(cqd->mu);
    cqd->num_polls++;
    grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
                                              NULL, now, iteration_deadline);
    gpr_mu_unlock(cqd->mu);

    if (err != GRPC_ERROR_NONE) {
      /* Polling failure is surfaced to the caller as a timeout */
      const char *msg = grpc_error_string(err);
      gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);

      GRPC_ERROR_UNREF(err);
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_TIMEOUT;
      dump_pending_tags(cc);
      break;
    }
    is_finished_arg.first_loop = false;
  }

  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "next");
  grpc_exec_ctx_finish(&exec_ctx);
  GPR_ASSERT(is_finished_arg.stolen_completion == NULL);

  GPR_TIMER_END("grpc_completion_queue_next", 0);

  return ret;
}
864
/* Public API entry point: dispatch to the completion-type-specific `next`
   implementation via the cq's vtable */
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
                                      gpr_timespec deadline, void *reserved) {
  return cc->vtable->next(cc, deadline, reserved);
}
869
Craig Tillera82950e2015-09-22 12:33:20 -0700870static int add_plucker(grpc_completion_queue *cc, void *tag,
Craig Tillerd0a8ae12016-02-18 08:01:19 -0800871 grpc_pollset_worker **worker) {
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700872 cq_data *cqd = &cc->data;
873 if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
Craig Tillera82950e2015-09-22 12:33:20 -0700874 return 0;
875 }
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700876 cqd->pluckers[cqd->num_pluckers].tag = tag;
877 cqd->pluckers[cqd->num_pluckers].worker = worker;
878 cqd->num_pluckers++;
Craig Tiller791e78a2015-08-01 16:20:17 -0700879 return 1;
Craig Tiller5ddbb9d2015-07-29 15:58:11 -0700880}
881
Craig Tillera82950e2015-09-22 12:33:20 -0700882static void del_plucker(grpc_completion_queue *cc, void *tag,
Craig Tillerd0a8ae12016-02-18 08:01:19 -0800883 grpc_pollset_worker **worker) {
Sree Kuchibhotlac2134c32017-04-25 12:10:04 -0700884 cq_data *cqd = &cc->data;
885 for (int i = 0; i < cqd->num_pluckers; i++) {
886 if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
887 cqd->num_pluckers--;
888 GPR_SWAP(plucker, cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
Craig Tillera82950e2015-09-22 12:33:20 -0700889 return;
Craig Tiller5ddbb9d2015-07-29 15:58:11 -0700890 }
Craig Tillera82950e2015-09-22 12:33:20 -0700891 }
yang-gb063c872015-10-07 11:40:13 -0700892 GPR_UNREACHABLE_CODE(return );
Craig Tiller5ddbb9d2015-07-29 15:58:11 -0700893}
894
/* exec_ctx "is finished" callback for cq_pluck: returns true when the polling
   loop should stop, either because a completion matching a->tag was unlinked
   from the completed list (stored in a->stolen_completion) or because the
   deadline passed. */
static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
  cq_is_finished_arg *a = arg;
  grpc_completion_queue *cq = a->cq;
  cq_data *cqd = &cq->data;

  GPR_ASSERT(a->stolen_completion == NULL);
  gpr_atm current_last_seen_things_queued_ever =
      gpr_atm_no_barrier_load(&cqd->things_queued_ever);
  /* Only walk the list if something has been queued since we last looked */
  if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
    gpr_mu_lock(cqd->mu);
    a->last_seen_things_queued_ever =
        gpr_atm_no_barrier_load(&cqd->things_queued_ever);
    grpc_cq_completion *c;
    grpc_cq_completion *prev = &cqd->completed_head;
    /* Walk the circular completed list (completed_head is the sentinel); the
       low bit of each `next` holds the success flag and must be masked off to
       recover the pointer */
    while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
           &cqd->completed_head) {
      if (c->tag == a->tag) {
        /* Unlink c, preserving prev's success bit */
        prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
        if (c == cqd->completed_tail) {
          cqd->completed_tail = prev;
        }
        gpr_mu_unlock(cqd->mu);
        a->stolen_completion = c;
        return true;
      }
      prev = c;
    }
    gpr_mu_unlock(cqd->mu);
  }
  /* No match: finish only if the deadline has already passed (never on the
     very first iteration) */
  return !a->first_loop &&
         gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
}
927
/* GRPC_CQ_PLUCK implementation of grpc_completion_queue_pluck: blocks until
   the completion for the specific `tag` is available, the queue shuts down,
   or `deadline` expires. The completed list is scanned under cqd->mu; while
   waiting, this thread registers itself as a plucker so producers can kick
   its pollset worker directly. */
static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
                           gpr_timespec deadline, void *reserved) {
  grpc_event ret;
  grpc_cq_completion *c;
  grpc_cq_completion *prev;
  grpc_pollset_worker *worker = NULL;
  gpr_timespec now;
  cq_data *cqd = &cc->data;

  GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);

  if (GRPC_TRACER_ON(grpc_cq_pluck_trace)) {
    GRPC_API_TRACE(
        "grpc_completion_queue_pluck("
        "cc=%p, tag=%p, "
        "deadline=gpr_timespec { tv_sec: %" PRId64
        ", tv_nsec: %d, clock_type: %d }, "
        "reserved=%p)",
        6, (cc, tag, deadline.tv_sec, deadline.tv_nsec,
            (int)deadline.clock_type, reserved));
  }
  GPR_ASSERT(!reserved);

  dump_pending_tags(cc);

  deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);

  /* Hold a ref for the duration of the call; released before returning */
  GRPC_CQ_INTERNAL_REF(cc, "pluck");
  /* NOTE: the loop below is entered with cqd->mu held; every exit path
     unlocks before breaking / jumping to done */
  gpr_mu_lock(cqd->mu);
  cq_is_finished_arg is_finished_arg = {
      .last_seen_things_queued_ever =
          gpr_atm_no_barrier_load(&cqd->things_queued_ever),
      .cq = cc,
      .deadline = deadline,
      .stolen_completion = NULL,
      .tag = tag,
      .first_loop = true};
  /* The exec_ctx may call cq_is_pluck_finished while flushing closures; it
     can steal the matching completion into is_finished_arg.stolen_completion */
  grpc_exec_ctx exec_ctx =
      GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
  for (;;) {
    /* A completion stolen by cq_is_pluck_finished takes priority */
    if (is_finished_arg.stolen_completion != NULL) {
      gpr_mu_unlock(cqd->mu);
      c = is_finished_arg.stolen_completion;
      is_finished_arg.stolen_completion = NULL;
      ret.type = GRPC_OP_COMPLETE;
      ret.success = c->next & 1u;
      ret.tag = c->tag;
      c->done(&exec_ctx, c->done_arg, c);
      break;
    }
    /* Walk the circular completed list (completed_head is the sentinel)
       looking for our tag; the low bit of each `next` holds the success flag
       and must be masked off to recover the pointer */
    prev = &cqd->completed_head;
    while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
           &cqd->completed_head) {
      if (c->tag == tag) {
        /* Unlink c, preserving prev's success bit */
        prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
        if (c == cqd->completed_tail) {
          cqd->completed_tail = prev;
        }
        gpr_mu_unlock(cqd->mu);
        ret.type = GRPC_OP_COMPLETE;
        ret.success = c->next & 1u;
        ret.tag = c->tag;
        c->done(&exec_ctx, c->done_arg, c);
        goto done;
      }
      prev = c;
    }
    if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
      gpr_mu_unlock(cqd->mu);
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_SHUTDOWN;
      break;
    }
    /* Register as a plucker so producers can target our worker with kicks */
    if (!add_plucker(cc, tag, &worker)) {
      gpr_log(GPR_DEBUG,
              "Too many outstanding grpc_completion_queue_pluck calls: maximum "
              "is %d",
              GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
      gpr_mu_unlock(cqd->mu);
      memset(&ret, 0, sizeof(ret));
      /* TODO(ctiller): should we use a different result here */
      ret.type = GRPC_QUEUE_TIMEOUT;
      dump_pending_tags(cc);
      break;
    }
    now = gpr_now(GPR_CLOCK_MONOTONIC);
    if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
      del_plucker(cc, tag, &worker);
      gpr_mu_unlock(cqd->mu);
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_TIMEOUT;
      dump_pending_tags(cc);
      break;
    }

    /* Poll with the lock held (the poller's work hook manages it) */
    cqd->num_polls++;
    grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
                                              &worker, now, deadline);
    if (err != GRPC_ERROR_NONE) {
      /* Polling failure is surfaced to the caller as a timeout */
      del_plucker(cc, tag, &worker);
      gpr_mu_unlock(cqd->mu);
      const char *msg = grpc_error_string(err);
      gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);

      GRPC_ERROR_UNREF(err);
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_TIMEOUT;
      dump_pending_tags(cc);
      break;
    }
    is_finished_arg.first_loop = false;
    /* Deregister before re-scanning; re-added next iteration if needed */
    del_plucker(cc, tag, &worker);
  }
done:
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "pluck");
  grpc_exec_ctx_finish(&exec_ctx);
  GPR_ASSERT(is_finished_arg.stolen_completion == NULL);

  GPR_TIMER_END("grpc_completion_queue_pluck", 0);

  return ret;
}
1051
/* Public API entry point: dispatch to the completion-type-specific `pluck`
   implementation via the cq's vtable */
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
                                       gpr_timespec deadline, void *reserved) {
  return cc->vtable->pluck(cc, tag, deadline, reserved);
}
Sree Kuchibhotla736dd902017-04-25 18:26:53 -07001056
/* Finishes the completion queue shutdown. This means that there are no more
   completion events / tags expected from the completion queue
   - Must be called under completion queue lock
   - Must be called only once in completion queue's lifetime
   - grpc_completion_queue_shutdown() MUST have been called before calling
   this function */
static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
                               grpc_completion_queue *cc) {
  cq_data *cqd = &cc->data;

  /* Enforce the "shutdown requested, not yet finished, only once" contract */
  GPR_ASSERT(cqd->shutdown_called);
  GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
  gpr_atm_no_barrier_store(&cqd->shutdown, 1);

  /* pollset_shutdown_done fires once the underlying pollset has shut down */
  cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
                              &cqd->pollset_shutdown_done);
}
1074
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
   to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
  GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
  cq_data *cqd = &cc->data;

  gpr_mu_lock(cqd->mu);
  /* Idempotent: a second shutdown call is a no-op */
  if (cqd->shutdown_called) {
    gpr_mu_unlock(cqd->mu);
    GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
    return;
  }
  cqd->shutdown_called = 1;
  /* If this was the last pending event, complete the shutdown now (under the
     lock, as cq_finish_shutdown requires); otherwise the producer that drops
     pending_events to zero will do it (see cq_end_op_for_*) */
  if (gpr_unref(&cqd->pending_events)) {
    cq_finish_shutdown(&exec_ctx, cc);
  }
  gpr_mu_unlock(cqd->mu);
  grpc_exec_ctx_finish(&exec_ctx);
  GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
}
1097
/* Destroy the completion queue: implicitly shuts it down (shutdown is
   idempotent), then drops the internal "destroy" ref */
void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
  GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc));
  GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
  grpc_completion_queue_shutdown(cc);

  /* TODO (sreek): This should not ideally be here. Refactor it into the
   * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */
  if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) {
    GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0);
  }

  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy");
  grpc_exec_ctx_finish(&exec_ctx);
  GPR_TIMER_END("grpc_completion_queue_destroy", 0);
}
1114
Craig Tillera82950e2015-09-22 12:33:20 -07001115grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
Craig Tiller75bfb972017-04-11 17:55:12 -07001116 return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL;
Craig Tiller190d3602015-02-18 09:23:38 -08001117}
Craig Tilleraec96aa2015-04-07 14:32:15 -07001118
/* Inverse of grpc_cq_pollset: recover the owning completion queue from its
   embedded pollset */
grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
  return CQ_FROM_POLLSET(ps);
}
1122
/* Flag `cc` as a server completion queue (queried via grpc_cq_is_server_cq) */
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) {
  cc->data.is_server_cq = 1;
}
1126
/* True iff grpc_cq_mark_server_cq was called on `cc` */
bool grpc_cq_is_server_cq(grpc_completion_queue *cc) {
  return cc->data.is_server_cq;
}
1130
/* True iff the cq's poller supports listening (per its vtable) */
bool grpc_cq_can_listen(grpc_completion_queue *cc) {
  return cc->poller_vtable->can_listen;
}