blob: e0135d9fb952396f7ad0a71ef9ae748847347c18 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/surface/completion_queue.h"
35
36#include <stdio.h>
37#include <string.h>
38
ctiller58393c22015-01-07 14:03:30 -080039#include "src/core/iomgr/pollset.h"
Craig Tiller485d7762015-01-23 12:54:05 -080040#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080041#include "src/core/surface/call.h"
42#include "src/core/surface/event_string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043#include "src/core/surface/surface_trace.h"
44#include <grpc/support/alloc.h>
45#include <grpc/support/atm.h>
46#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080047
48#define NUM_TAG_BUCKETS 31
49
/* A single event: extends grpc_event to form a linked list with a destruction
   function (on_finish) that is hidden from outside this module */
typedef struct event {
  /* Must be the first member: grpc_event_finish casts (grpc_event *) back to
     (event *) */
  grpc_event base;
  /* Invoked by grpc_event_finish when the consumer is done with the event */
  grpc_event_finish_func on_finish;
  void *on_finish_user_data;
  /* Links in the circular doubly-linked FIFO queue rooted at cc->queue */
  struct event *queue_next;
  struct event *queue_prev;
  /* Links in the circular doubly-linked hash chain rooted at cc->buckets[] */
  struct event *bucket_next;
  struct event *bucket_prev;
} event;
61
/* Completion queue structure */
struct grpc_completion_queue {
  /* TODO(ctiller): see if this can be removed */
  int allow_polling;

  /* When refs drops to zero, we are in shutdown mode, and will be destroyable
     once all queued events are drained */
  gpr_refcount refs;
  /* Once owning_refs drops to zero, we will destroy the cq */
  gpr_refcount owning_refs;
  /* the set of low level i/o things that concern this cq */
  grpc_pollset pollset;
  /* 0 initially, 1 once we've begun shutting down */
  int shutdown;
  /* 1 once grpc_completion_queue_shutdown has been called; checked by
     end_op_locked before it flips the shutdown flag */
  int shutdown_called;
  /* Head of a linked list of queued events (prev points to the last element) */
  event *queue;
  /* Fixed size chained hash table of events for pluck() */
  event *buckets[NUM_TAG_BUCKETS];

#ifndef NDEBUG
  /* Debug support: track which operations are in flight at any given time */
  gpr_atm pending_op_count[GRPC_COMPLETION_DO_NOT_USE];
#endif
};
87
88/* Default do-nothing on_finish function */
89static void null_on_finish(void *user_data, grpc_op_error error) {}
90
Craig Tiller32946d32015-01-15 11:37:30 -080091grpc_completion_queue *grpc_completion_queue_create(void) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080092 grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
93 memset(cc, 0, sizeof(*cc));
94 /* Initial ref is dropped by grpc_completion_queue_shutdown */
95 gpr_ref_init(&cc->refs, 1);
Craig Tiller5717a982015-04-27 12:01:49 -070096 gpr_ref_init(&cc->owning_refs, 1);
ctillerd79b4862014-12-17 16:36:59 -080097 grpc_pollset_init(&cc->pollset);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080098 cc->allow_polling = 1;
99 return cc;
100}
101
Craig Tiller5717a982015-04-27 12:01:49 -0700102void grpc_cq_internal_ref(grpc_completion_queue *cc) {
103 gpr_ref(&cc->owning_refs);
104}
105
106static void on_pollset_destroy_done(void *arg) {
107 grpc_completion_queue *cc = arg;
108 grpc_pollset_destroy(&cc->pollset);
109 gpr_free(cc);
110}
111
112void grpc_cq_internal_unref(grpc_completion_queue *cc) {
113 if (gpr_unref(&cc->owning_refs)) {
114 GPR_ASSERT(cc->queue == NULL);
115 grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
116 }
117}
118
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800119void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
120 cc->allow_polling = 0;
121}
122
/* Create and append an event to the queue. Returns the event so that its data
   members can be filled in.
   Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
                         void *tag, grpc_call *call,
                         grpc_event_finish_func on_finish, void *user_data) {
  event *ev = gpr_malloc(sizeof(event));
  /* Hash the tag to one of the fixed pluck() buckets */
  gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
  ev->base.type = type;
  ev->base.tag = tag;
  ev->base.call = call;
  /* Substitute a no-op finisher so callers of ev->on_finish never need a
     NULL check */
  ev->on_finish = on_finish ? on_finish : null_on_finish;
  ev->on_finish_user_data = user_data;
  /* Link at the tail of the circular FIFO queue (cc->queue is the head;
     head->queue_prev is the tail) */
  if (cc->queue == NULL) {
    cc->queue = ev->queue_next = ev->queue_prev = ev;
  } else {
    ev->queue_next = cc->queue;
    ev->queue_prev = cc->queue->queue_prev;
    ev->queue_next->queue_prev = ev->queue_prev->queue_next = ev;
  }
  /* Link at the tail of the (also circular) bucket chain for pluck() */
  if (cc->buckets[bucket] == NULL) {
    cc->buckets[bucket] = ev->bucket_next = ev->bucket_prev = ev;
  } else {
    ev->bucket_next = cc->buckets[bucket];
    ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
    ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
  }
  /* Wake both condvar waiters and pollers blocked in next()/pluck() */
  gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
  grpc_pollset_kick(&cc->pollset);
  return ev;
}
154
155void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
156 grpc_completion_type type) {
Craig Tiller5717a982015-04-27 12:01:49 -0700157 gpr_ref(&cc->refs);
Craig Tillerf63fed72015-01-29 10:49:34 -0800158 if (call) grpc_call_internal_ref(call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800159#ifndef NDEBUG
160 gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1);
161#endif
162}
163
/* Signal the end of an operation - if this is the last waiting-to-be-queued
   event, then enter shutdown mode */
static void end_op_locked(grpc_completion_queue *cc,
                          grpc_completion_type type) {
#ifndef NDEBUG
  /* Balance the increment made by grpc_cq_begin_op */
  GPR_ASSERT(gpr_atm_full_fetch_add(&cc->pending_op_count[type], -1) > 0);
#endif
  if (gpr_unref(&cc->refs)) {
    /* Last pending op: shutdown must already have been requested, and it is
       our job to flip the shutdown flag and wake all waiters */
    GPR_ASSERT(!cc->shutdown);
    GPR_ASSERT(cc->shutdown_called);
    cc->shutdown = 1;
    gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
  }
}
178
/* Queue a GRPC_SERVER_SHUTDOWN event for tag (no call, no finish callback) */
void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag) {
  gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
  add_locked(cc, GRPC_SERVER_SHUTDOWN, tag, NULL, NULL, NULL);
  end_op_locked(cc, GRPC_SERVER_SHUTDOWN);
  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
185
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800186void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
187 grpc_event_finish_func on_finish, void *user_data,
188 grpc_byte_buffer *read) {
189 event *ev;
ctiller58393c22015-01-07 14:03:30 -0800190 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800191 ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data);
192 ev->base.data.read = read;
193 end_op_locked(cc, GRPC_READ);
ctiller58393c22015-01-07 14:03:30 -0800194 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800195}
196
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800197void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
198 grpc_call *call,
199 grpc_event_finish_func on_finish,
200 void *user_data, grpc_op_error error) {
201 event *ev;
ctiller58393c22015-01-07 14:03:30 -0800202 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800203 ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data);
204 ev->base.data.write_accepted = error;
205 end_op_locked(cc, GRPC_WRITE_ACCEPTED);
ctiller58393c22015-01-07 14:03:30 -0800206 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800207}
208
Craig Tiller166e2502015-02-03 20:14:41 -0800209void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag,
210 grpc_call *call, grpc_event_finish_func on_finish,
211 void *user_data, grpc_op_error error) {
212 event *ev;
213 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
214 ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data);
215 ev->base.data.write_accepted = error;
216 end_op_locked(cc, GRPC_OP_COMPLETE);
217 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
218}
219
Craig Tillerfb189f82015-02-03 12:07:07 -0800220void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
221 grpc_event_finish_func on_finish, void *user_data,
222 grpc_op_error error) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800223 event *ev;
224 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
Craig Tillerfb189f82015-02-03 12:07:07 -0800225 ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data);
Craig Tillercce17ac2015-01-20 09:29:28 -0800226 ev->base.data.write_accepted = error;
Craig Tillerfb189f82015-02-03 12:07:07 -0800227 end_op_locked(cc, GRPC_OP_COMPLETE);
Craig Tillercce17ac2015-01-20 09:29:28 -0800228 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
229}
230
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800231void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
232 grpc_call *call,
233 grpc_event_finish_func on_finish,
234 void *user_data, grpc_op_error error) {
235 event *ev;
ctiller58393c22015-01-07 14:03:30 -0800236 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800237 ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data);
238 ev->base.data.finish_accepted = error;
239 end_op_locked(cc, GRPC_FINISH_ACCEPTED);
ctiller58393c22015-01-07 14:03:30 -0800240 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800241}
242
243void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
244 grpc_call *call,
245 grpc_event_finish_func on_finish,
246 void *user_data, size_t count,
247 grpc_metadata *elements) {
248 event *ev;
ctiller58393c22015-01-07 14:03:30 -0800249 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800250 ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish,
251 user_data);
252 ev->base.data.client_metadata_read.count = count;
253 ev->base.data.client_metadata_read.elements = elements;
254 end_op_locked(cc, GRPC_CLIENT_METADATA_READ);
ctiller58393c22015-01-07 14:03:30 -0800255 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800256}
257
258void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
259 grpc_event_finish_func on_finish, void *user_data,
ctiller2845cad2014-12-15 15:14:12 -0800260 grpc_status_code status, const char *details,
261 grpc_metadata *metadata_elements,
262 size_t metadata_count) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800263 event *ev;
ctiller58393c22015-01-07 14:03:30 -0800264 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800265 ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data);
ctiller2845cad2014-12-15 15:14:12 -0800266 ev->base.data.finished.status = status;
267 ev->base.data.finished.details = details;
268 ev->base.data.finished.metadata_count = metadata_count;
269 ev->base.data.finished.metadata_elements = metadata_elements;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800270 end_op_locked(cc, GRPC_FINISHED);
ctiller58393c22015-01-07 14:03:30 -0800271 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800272}
273
274void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
275 grpc_event_finish_func on_finish, void *user_data,
276 const char *method, const char *host,
277 gpr_timespec deadline, size_t metadata_count,
278 grpc_metadata *metadata_elements) {
279 event *ev;
ctiller58393c22015-01-07 14:03:30 -0800280 gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800281 ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data);
282 ev->base.data.server_rpc_new.method = method;
283 ev->base.data.server_rpc_new.host = host;
284 ev->base.data.server_rpc_new.deadline = deadline;
285 ev->base.data.server_rpc_new.metadata_count = metadata_count;
286 ev->base.data.server_rpc_new.metadata_elements = metadata_elements;
287 end_op_locked(cc, GRPC_SERVER_RPC_NEW);
ctiller58393c22015-01-07 14:03:30 -0800288 gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800289}
290
291/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
Craig Tiller32946d32015-01-15 11:37:30 -0800292static event *create_shutdown_event(void) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800293 event *ev = gpr_malloc(sizeof(event));
294 ev->base.type = GRPC_QUEUE_SHUTDOWN;
295 ev->base.call = NULL;
296 ev->base.tag = NULL;
297 ev->on_finish = null_on_finish;
298 return ev;
299}
300
/* Dequeue the next event in FIFO order, blocking (and optionally polling for
   i/o) until one is available, shutdown completes, or the deadline passes.
   Returns NULL on timeout; the caller releases the event via
   grpc_event_finish. */
grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
                                       gpr_timespec deadline) {
  event *ev = NULL;

  gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
  for (;;) {
    if (cc->queue != NULL) {
      gpr_uintptr bucket;
      /* Take the head event and unlink it from both the FIFO queue and its
         pluck() hash bucket (both lists are circular and doubly linked) */
      ev = cc->queue;
      bucket = ((gpr_uintptr)ev->base.tag) % NUM_TAG_BUCKETS;
      cc->queue = ev->queue_next;
      ev->queue_next->queue_prev = ev->queue_prev;
      ev->queue_prev->queue_next = ev->queue_next;
      ev->bucket_next->bucket_prev = ev->bucket_prev;
      ev->bucket_prev->bucket_next = ev->bucket_next;
      if (ev == cc->buckets[bucket]) {
        cc->buckets[bucket] = ev->bucket_next;
        if (ev == cc->buckets[bucket]) {
          /* ev was the only element in its bucket */
          cc->buckets[bucket] = NULL;
        }
      }
      if (cc->queue == ev) {
        /* ev was the only queued event */
        cc->queue = NULL;
      }
      break;
    }
    if (cc->shutdown) {
      ev = create_shutdown_event();
      break;
    }
    /* Nothing queued: do i/o work if polling is allowed ... */
    if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
      continue;
    }
    /* ... otherwise wait for a signal; a non-zero return means timeout */
    if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
                    GRPC_POLLSET_MU(&cc->pollset), deadline)) {
      gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
      return NULL;
    }
  }
  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
  return &ev->base;
}
344
/* Find and dequeue the first queued event whose tag equals 'tag', searching
   only the hash bucket that tag maps to. Returns NULL if none is queued.
   Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
static event *pluck_event(grpc_completion_queue *cc, void *tag) {
  gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
  event *ev = cc->buckets[bucket];
  if (ev == NULL) return NULL;
  /* Walk the circular bucket chain exactly once */
  do {
    if (ev->base.tag == tag) {
      /* Unlink from both the FIFO queue and the bucket chain */
      ev->queue_next->queue_prev = ev->queue_prev;
      ev->queue_prev->queue_next = ev->queue_next;
      ev->bucket_next->bucket_prev = ev->bucket_prev;
      ev->bucket_prev->bucket_next = ev->bucket_next;
      if (ev == cc->buckets[bucket]) {
        cc->buckets[bucket] = ev->bucket_next;
        if (ev == cc->buckets[bucket]) {
          /* ev was the only element in this bucket */
          cc->buckets[bucket] = NULL;
        }
      }
      if (cc->queue == ev) {
        cc->queue = ev->queue_next;
        if (cc->queue == ev) {
          /* ev was the only queued event */
          cc->queue = NULL;
        }
      }
      return ev;
    }
    ev = ev->bucket_next;
  } while (ev != cc->buckets[bucket]);
  return NULL;
}
373
/* Dequeue the event matching 'tag', blocking (and optionally polling for
   i/o) until it appears, shutdown completes, or the deadline passes.
   Returns NULL on timeout; the caller releases the event via
   grpc_event_finish. */
grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
                                        gpr_timespec deadline) {
  event *ev = NULL;

  gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
  for (;;) {
    if ((ev = pluck_event(cc, tag))) {
      break;
    }
    if (cc->shutdown) {
      ev = create_shutdown_event();
      break;
    }
    /* No matching event yet: do i/o work if polling is allowed ... */
    if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
      continue;
    }
    /* ... otherwise wait for a signal; a non-zero return means timeout */
    if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
                    GRPC_POLLSET_MU(&cc->pollset), deadline)) {
      gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
      return NULL;
    }
  }
  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
  return &ev->base;
}
400
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
   to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
  /* Record that shutdown was requested; end_op_locked asserts this before
     it flips the shutdown flag on behalf of the last pending op */
  gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
  cc->shutdown_called = 1;
  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));

  if (gpr_unref(&cc->refs)) {
    /* No pending ops remained: enter shutdown mode right away and wake all
       waiters so they can observe it */
    gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
    GPR_ASSERT(!cc->shutdown);
    cc->shutdown = 1;
    gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
    gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
  }
}
416
David Klempnerb5056612015-02-24 14:22:50 -0800417void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
Craig Tiller5717a982015-04-27 12:01:49 -0700418 grpc_cq_internal_unref(cc);
David Klempnerb5056612015-02-24 14:22:50 -0800419}
420
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800421void grpc_event_finish(grpc_event *base) {
422 event *ev = (event *)base;
423 ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK);
424 if (ev->base.call) {
Craig Tilleraef25da2015-01-29 17:19:45 -0800425 grpc_call_internal_unref(ev->base.call, 1);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800426 }
427 gpr_free(ev);
428}
429
430void grpc_cq_dump_pending_ops(grpc_completion_queue *cc) {
431#ifndef NDEBUG
Craig Tillerd09f8802015-01-23 13:09:21 -0800432 char tmp[GRPC_COMPLETION_DO_NOT_USE * (1 + GPR_LTOA_MIN_BUFSIZE)];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800433 char *p = tmp;
434 int i;
435
436 for (i = 0; i < GRPC_COMPLETION_DO_NOT_USE; i++) {
Craig Tillerd09f8802015-01-23 13:09:21 -0800437 *p++ = ' ';
438 p += gpr_ltoa(cc->pending_op_count[i], p);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800439 }
440
441 gpr_log(GPR_INFO, "pending ops:%s", tmp);
442#endif
443}
ctillerd79b4862014-12-17 16:36:59 -0800444
445grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
446 return &cc->pollset;
Craig Tiller190d3602015-02-18 09:23:38 -0800447}
Craig Tilleraec96aa2015-04-07 14:32:15 -0700448
/* Kick the pollset and let it do i/o work for up to 100ms.
   NOTE(review): the "hack" name suggests this is a temporary workaround;
   confirm intent with callers before relying on its timing. */
void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
  gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
  grpc_pollset_kick(&cc->pollset);
  grpc_pollset_work(&cc->pollset,
                    gpr_time_add(gpr_now(), gpr_time_from_millis(100)));
  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}