blob: 6a1d83ce5d4ffee0f82d204a9d3653cbe5d1cd5d [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/surface/completion_queue.h"
35
36#include <stdio.h>
37#include <string.h>
38
ctiller58393c22015-01-07 14:03:30 -080039#include "src/core/iomgr/pollset.h"
Craig Tiller485d7762015-01-23 12:54:05 -080040#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080041#include "src/core/surface/call.h"
42#include "src/core/surface/event_string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043#include "src/core/surface/surface_trace.h"
44#include <grpc/support/alloc.h>
45#include <grpc/support/atm.h>
46#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080047
#define NUM_TAG_BUCKETS 31

/* A single event: extends grpc_event to form a linked list with a destruction
   function (on_finish) that is hidden from outside this module */
typedef struct event {
  grpc_event base;
  /* destruction callback run by grpc_event_finish; never NULL — add_locked
     substitutes null_on_finish when the caller passes NULL */
  grpc_event_finish_func on_finish;
  void *on_finish_user_data;
  /* links in the circular doubly-linked list of all queued events
     (head: cc->queue; the head's prev is the tail) */
  struct event *queue_next;
  struct event *queue_prev;
  /* links in the circular doubly-linked chain for this tag's hash bucket
     (cc->buckets[tag % NUM_TAG_BUCKETS]), used by pluck_event */
  struct event *bucket_next;
  struct event *bucket_prev;
} event;
61
/* Completion queue structure */
struct grpc_completion_queue {
  /* TODO(ctiller): see if this can be removed */
  int allow_polling;

  /* When refs drops to zero, we are in shutdown mode, and will be destroyable
     once all queued events are drained */
  gpr_refcount refs;
  /* the set of low level i/o things that concern this cq */
  grpc_pollset pollset;
  /* 0 initially, 1 once we've begun shutting down */
  int shutdown;
  /* 1 once grpc_completion_queue_shutdown has been requested; end_op_locked
     asserts this before flipping shutdown, so the last ref can only vanish
     after a shutdown request */
  int shutdown_called;
  /* Head of a linked list of queued events (prev points to the last element) */
  event *queue;
  /* Fixed size chained hash table of events for pluck() */
  event *buckets[NUM_TAG_BUCKETS];

#ifndef NDEBUG
  /* Debug support: track which operations are in flight at any given time */
  gpr_atm pending_op_count[GRPC_COMPLETION_DO_NOT_USE];
#endif
};
85
/* Default do-nothing on_finish function; installed by add_locked when the
   caller supplies no callback, so ev->on_finish is always safe to invoke */
static void null_on_finish(void *user_data, grpc_op_error error) {}
88
Craig Tiller32946d32015-01-15 11:37:30 -080089grpc_completion_queue *grpc_completion_queue_create(void) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080090 grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
91 memset(cc, 0, sizeof(*cc));
92 /* Initial ref is dropped by grpc_completion_queue_shutdown */
93 gpr_ref_init(&cc->refs, 1);
ctillerd79b4862014-12-17 16:36:59 -080094 grpc_pollset_init(&cc->pollset);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080095 cc->allow_polling = 1;
96 return cc;
97}
98
/* Test-only hook: disable polling on this queue, forcing next()/pluck()
   waiters onto the condition-variable path */
void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
  cc->allow_polling = 0;
}
102
/* Create and append an event to the queue. Returns the event so that its data
   members can be filled in.
   Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
                         void *tag, grpc_call *call,
                         grpc_event_finish_func on_finish, void *user_data) {
  event *ev = gpr_malloc(sizeof(event));
  gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
  ev->base.type = type;
  ev->base.tag = tag;
  ev->base.call = call;
  /* substitute the do-nothing callback so on_finish is always callable */
  ev->on_finish = on_finish ? on_finish : null_on_finish;
  ev->on_finish_user_data = user_data;
  /* insert at the tail of the circular queue list (the head's prev is the
     tail), or self-link when the list is empty */
  if (cc->queue == NULL) {
    cc->queue = ev->queue_next = ev->queue_prev = ev;
  } else {
    ev->queue_next = cc->queue;
    ev->queue_prev = cc->queue->queue_prev;
    ev->queue_next->queue_prev = ev->queue_prev->queue_next = ev;
  }
  /* likewise insert into the tag's circular hash-bucket chain for pluck() */
  if (cc->buckets[bucket] == NULL) {
    cc->buckets[bucket] = ev->bucket_next = ev->bucket_prev = ev;
  } else {
    ev->bucket_next = cc->buckets[bucket];
    ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
    ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
  }
  /* wake condition-variable waiters; grpc_pollset_kick presumably interrupts
     a thread blocked in poll — confirm against iomgr/pollset.h */
  gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
  grpc_pollset_kick(&cc->pollset);
  return ev;
}
134
/* Note the start of an operation that will later complete on cc: takes a
   queue reference (released by end_op_locked) and, when a call is supplied,
   a call reference (released by grpc_event_finish) */
void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
                      grpc_completion_type type) {
  gpr_ref(&cc->refs);
  if (call) grpc_call_internal_ref(call);
#ifndef NDEBUG
  /* debug-only: count in-flight operations per completion type */
  gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1);
#endif
}
143
/* Signal the end of an operation - if this is the last waiting-to-be-queued
   event, then enter shutdown mode */
static void end_op_locked(grpc_completion_queue *cc,
                          grpc_completion_type type) {
#ifndef NDEBUG
  /* debug-only: the in-flight counter must have been positive */
  GPR_ASSERT(gpr_atm_full_fetch_add(&cc->pending_op_count[type], -1) > 0);
#endif
  if (gpr_unref(&cc->refs)) {
    /* refs can only reach zero after the creation-time ref was dropped by
       grpc_completion_queue_shutdown, hence shutdown_called must be set */
    GPR_ASSERT(!cc->shutdown);
    GPR_ASSERT(cc->shutdown_called);
    cc->shutdown = 1;
    /* wake blocked next()/pluck() callers so they observe shutdown */
    gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
  }
}
158
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800159void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag) {
160 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
161 add_locked(cc, GRPC_SERVER_SHUTDOWN, tag, NULL, NULL, NULL);
162 end_op_locked(cc, GRPC_SERVER_SHUTDOWN);
163 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
164}
165
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800166void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
167 grpc_event_finish_func on_finish, void *user_data,
168 grpc_byte_buffer *read) {
169 event *ev;
ctiller58393c22015-01-07 14:03:30 -0800170 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800171 ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data);
172 ev->base.data.read = read;
173 end_op_locked(cc, GRPC_READ);
ctiller58393c22015-01-07 14:03:30 -0800174 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800175}
176
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800177void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
178 grpc_call *call,
179 grpc_event_finish_func on_finish,
180 void *user_data, grpc_op_error error) {
181 event *ev;
ctiller58393c22015-01-07 14:03:30 -0800182 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800183 ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data);
184 ev->base.data.write_accepted = error;
185 end_op_locked(cc, GRPC_WRITE_ACCEPTED);
ctiller58393c22015-01-07 14:03:30 -0800186 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800187}
188
Craig Tiller166e2502015-02-03 20:14:41 -0800189void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag,
190 grpc_call *call, grpc_event_finish_func on_finish,
191 void *user_data, grpc_op_error error) {
192 event *ev;
193 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
194 ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data);
195 ev->base.data.write_accepted = error;
196 end_op_locked(cc, GRPC_OP_COMPLETE);
197 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
198}
199
Craig Tillerfb189f82015-02-03 12:07:07 -0800200void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
201 grpc_event_finish_func on_finish, void *user_data,
202 grpc_op_error error) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800203 event *ev;
204 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
Craig Tillerfb189f82015-02-03 12:07:07 -0800205 ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data);
Craig Tillercce17ac2015-01-20 09:29:28 -0800206 ev->base.data.write_accepted = error;
Craig Tillerfb189f82015-02-03 12:07:07 -0800207 end_op_locked(cc, GRPC_OP_COMPLETE);
Craig Tillercce17ac2015-01-20 09:29:28 -0800208 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
209}
210
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800211void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
212 grpc_call *call,
213 grpc_event_finish_func on_finish,
214 void *user_data, grpc_op_error error) {
215 event *ev;
ctiller58393c22015-01-07 14:03:30 -0800216 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800217 ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data);
218 ev->base.data.finish_accepted = error;
219 end_op_locked(cc, GRPC_FINISH_ACCEPTED);
ctiller58393c22015-01-07 14:03:30 -0800220 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800221}
222
223void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
224 grpc_call *call,
225 grpc_event_finish_func on_finish,
226 void *user_data, size_t count,
227 grpc_metadata *elements) {
228 event *ev;
ctiller58393c22015-01-07 14:03:30 -0800229 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800230 ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish,
231 user_data);
232 ev->base.data.client_metadata_read.count = count;
233 ev->base.data.client_metadata_read.elements = elements;
234 end_op_locked(cc, GRPC_CLIENT_METADATA_READ);
ctiller58393c22015-01-07 14:03:30 -0800235 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800236}
237
238void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
239 grpc_event_finish_func on_finish, void *user_data,
ctiller2845cad2014-12-15 15:14:12 -0800240 grpc_status_code status, const char *details,
241 grpc_metadata *metadata_elements,
242 size_t metadata_count) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800243 event *ev;
ctiller58393c22015-01-07 14:03:30 -0800244 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800245 ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data);
ctiller2845cad2014-12-15 15:14:12 -0800246 ev->base.data.finished.status = status;
247 ev->base.data.finished.details = details;
248 ev->base.data.finished.metadata_count = metadata_count;
249 ev->base.data.finished.metadata_elements = metadata_elements;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800250 end_op_locked(cc, GRPC_FINISHED);
ctiller58393c22015-01-07 14:03:30 -0800251 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800252}
253
254void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
255 grpc_event_finish_func on_finish, void *user_data,
256 const char *method, const char *host,
257 gpr_timespec deadline, size_t metadata_count,
258 grpc_metadata *metadata_elements) {
259 event *ev;
ctiller58393c22015-01-07 14:03:30 -0800260 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800261 ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data);
262 ev->base.data.server_rpc_new.method = method;
263 ev->base.data.server_rpc_new.host = host;
264 ev->base.data.server_rpc_new.deadline = deadline;
265 ev->base.data.server_rpc_new.metadata_count = metadata_count;
266 ev->base.data.server_rpc_new.metadata_elements = metadata_elements;
267 end_op_locked(cc, GRPC_SERVER_RPC_NEW);
ctiller58393c22015-01-07 14:03:30 -0800268 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800269}
270
271/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
Craig Tiller32946d32015-01-15 11:37:30 -0800272static event *create_shutdown_event(void) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800273 event *ev = gpr_malloc(sizeof(event));
274 ev->base.type = GRPC_QUEUE_SHUTDOWN;
275 ev->base.call = NULL;
276 ev->base.tag = NULL;
277 ev->on_finish = null_on_finish;
278 return ev;
279}
280
/* Block until an event is available (returning the oldest queued event), the
   queue is shut down (returning a fresh GRPC_QUEUE_SHUTDOWN event), or the
   deadline passes (returning NULL). The caller releases the returned event
   with grpc_event_finish. */
grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
                                       gpr_timespec deadline) {
  event *ev = NULL;

  gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
  for (;;) {
    if (cc->queue != NULL) {
      gpr_uintptr bucket;
      /* dequeue the head event and unlink it from both circular lists */
      ev = cc->queue;
      bucket = ((gpr_uintptr)ev->base.tag) % NUM_TAG_BUCKETS;
      cc->queue = ev->queue_next;
      ev->queue_next->queue_prev = ev->queue_prev;
      ev->queue_prev->queue_next = ev->queue_next;
      ev->bucket_next->bucket_prev = ev->bucket_prev;
      ev->bucket_prev->bucket_next = ev->bucket_next;
      if (ev == cc->buckets[bucket]) {
        cc->buckets[bucket] = ev->bucket_next;
        /* bucket_next still points at ev when it was the only element */
        if (ev == cc->buckets[bucket]) {
          cc->buckets[bucket] = NULL;
        }
      }
      /* queue_next still points at ev when it was the only element */
      if (cc->queue == ev) {
        cc->queue = NULL;
      }
      break;
    }
    if (cc->shutdown) {
      ev = create_shutdown_event();
      break;
    }
    /* if polling is allowed and grpc_pollset_work reports progress, loop
       around and re-check the queue */
    if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
      continue;
    }
    /* a nonzero result from gpr_cv_wait is treated as deadline expiry:
       give up and report timeout with NULL */
    if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
                    GRPC_POLLSET_MU(&cc->pollset), deadline)) {
      gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
      return NULL;
    }
  }
  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
  return &ev->base;
}
324
/* Remove and return the first queued event whose tag matches, searching only
   the tag's hash bucket; returns NULL when no queued event carries this tag.
   Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
static event *pluck_event(grpc_completion_queue *cc, void *tag) {
  gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
  event *ev = cc->buckets[bucket];
  if (ev == NULL) return NULL;
  do {
    if (ev->base.tag == tag) {
      /* unlink from both circular lists (queue order and bucket chain) */
      ev->queue_next->queue_prev = ev->queue_prev;
      ev->queue_prev->queue_next = ev->queue_next;
      ev->bucket_next->bucket_prev = ev->bucket_prev;
      ev->bucket_prev->bucket_next = ev->bucket_next;
      if (ev == cc->buckets[bucket]) {
        cc->buckets[bucket] = ev->bucket_next;
        /* bucket_next still points at ev when it was the only element */
        if (ev == cc->buckets[bucket]) {
          cc->buckets[bucket] = NULL;
        }
      }
      if (cc->queue == ev) {
        cc->queue = ev->queue_next;
        /* queue_next still points at ev when it was the only element */
        if (cc->queue == ev) {
          cc->queue = NULL;
        }
      }
      return ev;
    }
    ev = ev->bucket_next;
  } while (ev != cc->buckets[bucket]);
  return NULL;
}
353
/* Block until an event with the given tag is available (returning it), the
   queue is shut down (returning a fresh GRPC_QUEUE_SHUTDOWN event), or the
   deadline passes (returning NULL). The caller releases the returned event
   with grpc_event_finish. */
grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
                                        gpr_timespec deadline) {
  event *ev = NULL;

  gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
  for (;;) {
    if ((ev = pluck_event(cc, tag))) {
      break;
    }
    if (cc->shutdown) {
      ev = create_shutdown_event();
      break;
    }
    /* if polling is allowed and grpc_pollset_work reports progress, loop
       around and re-check for the tag */
    if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
      continue;
    }
    /* a nonzero result from gpr_cv_wait is treated as deadline expiry:
       give up and report timeout with NULL */
    if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
                    GRPC_POLLSET_MU(&cc->pollset), deadline)) {
      gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
      return NULL;
    }
  }
  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
  return &ev->base;
}
380
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
   to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
  /* record the request under the lock first, so that end_op_locked's
     GPR_ASSERT(cc->shutdown_called) holds whenever refs hits zero */
  gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
  cc->shutdown_called = 1;
  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));

  if (gpr_unref(&cc->refs)) {
    gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
    GPR_ASSERT(!cc->shutdown);
    cc->shutdown = 1;
    /* wake blocked next()/pluck() callers so they observe shutdown */
    gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
    gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
  }
}
396
David Klempnerb5056612015-02-24 14:22:50 -0800397static void on_pollset_destroy_done(void *arg) {
398 grpc_completion_queue *cc = arg;
ctillerd79b4862014-12-17 16:36:59 -0800399 grpc_pollset_destroy(&cc->pollset);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800400 gpr_free(cc);
401}
402
/* Begin destroying cc: requires that every queued event has already been
   drained (cc->queue == NULL). The memory is freed asynchronously by
   on_pollset_destroy_done once the pollset shutdown completes, so cc must
   not be touched after this call. */
void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
  GPR_ASSERT(cc->queue == NULL);
  grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
}
407
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800408void grpc_event_finish(grpc_event *base) {
409 event *ev = (event *)base;
410 ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK);
411 if (ev->base.call) {
Craig Tilleraef25da2015-01-29 17:19:45 -0800412 grpc_call_internal_unref(ev->base.call, 1);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800413 }
414 gpr_free(ev);
415}
416
/* Debug helper: log the per-type counts of in-flight operations (maintained
   by grpc_cq_begin_op / end_op_locked). Compiles to an empty body when
   NDEBUG is defined. */
void grpc_cq_dump_pending_ops(grpc_completion_queue *cc) {
#ifndef NDEBUG
  /* one separator space plus up to GPR_LTOA_MIN_BUFSIZE chars per counter */
  char tmp[GRPC_COMPLETION_DO_NOT_USE * (1 + GPR_LTOA_MIN_BUFSIZE)];
  char *p = tmp;
  int i;

  for (i = 0; i < GRPC_COMPLETION_DO_NOT_USE; i++) {
    *p++ = ' ';
    /* gpr_ltoa writes the digits at p and returns the length written */
    p += gpr_ltoa(cc->pending_op_count[i], p);
  }

  gpr_log(GPR_INFO, "pending ops:%s", tmp);
#endif
}
ctillerd79b4862014-12-17 16:36:59 -0800431
/* Accessor for the pollset embedded in cc; the pollset remains owned by the
   completion queue and lives until on_pollset_destroy_done runs. */
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
  return &cc->pollset;
}