/*
 *
 * Copyright 2014, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/surface/completion_queue.h"

#include <stdio.h>
#include <string.h>

#include "src/core/iomgr/iomgr_completion_queue_interface.h"
#include "src/core/surface/call.h"
#include "src/core/surface/event_string.h"
#include "src/core/surface/surface_trace.h"
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/string.h>

#define NUM_TAG_BUCKETS 31

/* A single event: extends grpc_event to form a linked list with a destruction
   function (on_finish) that is hidden from outside this module */
typedef struct event {
  grpc_event base;
  grpc_event_finish_func on_finish;
  void *on_finish_user_data;
  struct event *queue_next;
  struct event *queue_prev;
  struct event *bucket_next;
  struct event *bucket_prev;
} event;
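
/* Every queued event sits on two circular doubly-linked lists at once:
   queue_next/queue_prev chain the FIFO of all events (anchored at cc->queue,
   consumed by grpc_completion_queue_next), while bucket_next/bucket_prev
   chain the hash bucket for the event's tag (anchored at
   cc->buckets[tag % NUM_TAG_BUCKETS], consumed by
   grpc_completion_queue_pluck). An illustrative sketch, not code from this
   module: with events A and B queued, both hashing to bucket h,

       cc->queue:      A <-> B <-> A ...     (circular FIFO)
       cc->buckets[h]: A <-> B <-> A ...     (circular bucket chain)

   so an event can be unlinked from either list in O(1) given its pointer. */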

/* Completion queue structure */
struct grpc_completion_queue {
  int allow_polling;

  /* When refs drops to zero, we are in shutdown mode, and will be destroyable
     once all queued events are drained */
  gpr_refcount refs;
  /* the set of low level i/o things that concern this cq */
  grpc_pollset pollset;
  /* 0 initially, 1 once we've begun shutting down */
  int shutdown;
  /* Head of a linked list of queued events (prev points to the last element) */
  event *queue;
  /* Fixed size chained hash table of events for pluck() */
  event *buckets[NUM_TAG_BUCKETS];

#ifndef NDEBUG
  /* Debug support: track which operations are in flight at any given time */
  gpr_atm pending_op_count[GRPC_COMPLETION_DO_NOT_USE];
#endif
};

/* Default do-nothing on_finish function */
static void null_on_finish(void *user_data, grpc_op_error error) {}

grpc_completion_queue *grpc_completion_queue_create() {
  grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
  memset(cc, 0, sizeof(*cc));
  /* Initial ref is dropped by grpc_completion_queue_shutdown */
  gpr_ref_init(&cc->refs, 1);
  grpc_pollset_init(&cc->pollset);
  cc->allow_polling = 1;
  return cc;
}
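
/* A minimal lifecycle sketch for callers (illustrative only; 'deadline' is a
   caller-chosen gpr_timespec, and the functions used are all defined in this
   file):

       grpc_completion_queue *cq = grpc_completion_queue_create();
       ... hand cq to calls or a server, then drive it ...
       grpc_event *ev = grpc_completion_queue_next(cq, deadline);
       if (ev) grpc_event_finish(ev);   <- every returned event must be
                                           finished, including the final
                                           GRPC_QUEUE_SHUTDOWN event
       grpc_completion_queue_shutdown(cq);
       ... drain with next() until GRPC_QUEUE_SHUTDOWN is returned ...
       grpc_completion_queue_destroy(cq);  <- requires the queue be drained */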

void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
  cc->allow_polling = 0;
}

/* Create and append an event to the queue. Returns the event so that its data
   members can be filled in.
   Requires grpc_iomgr_mu locked. */
static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
                         void *tag, grpc_call *call,
                         grpc_event_finish_func on_finish, void *user_data) {
  event *ev = gpr_malloc(sizeof(event));
  gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
  GPR_ASSERT(!cc->shutdown);
  ev->base.type = type;
  ev->base.tag = tag;
  ev->base.call = call;
  ev->on_finish = on_finish ? on_finish : null_on_finish;
  ev->on_finish_user_data = user_data;
  /* splice onto the tail of the event FIFO (queue->queue_prev is the tail) */
  if (cc->queue == NULL) {
    cc->queue = ev->queue_next = ev->queue_prev = ev;
  } else {
    ev->queue_next = cc->queue;
    ev->queue_prev = cc->queue->queue_prev;
    ev->queue_next->queue_prev = ev->queue_prev->queue_next = ev;
  }
  /* splice onto the tail of the tag's hash bucket chain */
  if (cc->buckets[bucket] == NULL) {
    cc->buckets[bucket] = ev->bucket_next = ev->bucket_prev = ev;
  } else {
    ev->bucket_next = cc->buckets[bucket];
    ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
    ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
  }
  /* wake any threads blocked in next() or pluck() */
  gpr_cv_broadcast(&grpc_iomgr_cv);
  return ev;
}

/* Signal the start of an operation that will eventually post to cc: takes a
   queue ref (and a call ref) that the matching grpc_cq_end_* call releases */
void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
                      grpc_completion_type type) {
  gpr_ref(&cc->refs);
  if (call) grpc_call_internal_ref(call);
#ifndef NDEBUG
  gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1);
#endif
}

/* Signal the end of an operation - if this is the last waiting-to-be-queued
   event, then enter shutdown mode */
static void end_op_locked(grpc_completion_queue *cc,
                          grpc_completion_type type) {
#ifndef NDEBUG
  GPR_ASSERT(gpr_atm_full_fetch_add(&cc->pending_op_count[type], -1) > 0);
#endif
  if (gpr_unref(&cc->refs)) {
    GPR_ASSERT(!cc->shutdown);
    cc->shutdown = 1;
    gpr_cv_broadcast(&grpc_iomgr_cv);
  }
}

void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
                      grpc_event_finish_func on_finish, void *user_data,
                      grpc_byte_buffer *read) {
  event *ev;
  gpr_mu_lock(&grpc_iomgr_mu);
  ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data);
  ev->base.data.read = read;
  end_op_locked(cc, GRPC_READ);
  gpr_mu_unlock(&grpc_iomgr_mu);
}
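
/* Producer-side pairing, sketched for orientation (the byte buffer 'buf' is a
   hypothetical placeholder): a call that wants to report a completed read
   first reserves the completion, performs the I/O, then publishes it:

       grpc_cq_begin_op(cc, call, GRPC_READ);
       ... read into grpc_byte_buffer *buf ...
       grpc_cq_end_read(cc, tag, call, on_finish, user_data, buf);

   begin_op takes the queue and call refs; the end_* call queues the event and
   drops the queue ref via end_op_locked, which may flip cc into shutdown. */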

void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
                                 grpc_call *call,
                                 grpc_event_finish_func on_finish,
                                 void *user_data, grpc_op_error error) {
  event *ev;
  gpr_mu_lock(&grpc_iomgr_mu);
  ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data);
  ev->base.data.invoke_accepted = error;
  end_op_locked(cc, GRPC_INVOKE_ACCEPTED);
  gpr_mu_unlock(&grpc_iomgr_mu);
}

void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
                                grpc_call *call,
                                grpc_event_finish_func on_finish,
                                void *user_data, grpc_op_error error) {
  event *ev;
  gpr_mu_lock(&grpc_iomgr_mu);
  ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data);
  ev->base.data.write_accepted = error;
  end_op_locked(cc, GRPC_WRITE_ACCEPTED);
  gpr_mu_unlock(&grpc_iomgr_mu);
}

void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
                                 grpc_call *call,
                                 grpc_event_finish_func on_finish,
                                 void *user_data, grpc_op_error error) {
  event *ev;
  gpr_mu_lock(&grpc_iomgr_mu);
  ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data);
  ev->base.data.finish_accepted = error;
  end_op_locked(cc, GRPC_FINISH_ACCEPTED);
  gpr_mu_unlock(&grpc_iomgr_mu);
}

void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
                                      grpc_call *call,
                                      grpc_event_finish_func on_finish,
                                      void *user_data, size_t count,
                                      grpc_metadata *elements) {
  event *ev;
  gpr_mu_lock(&grpc_iomgr_mu);
  ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish,
                  user_data);
  ev->base.data.client_metadata_read.count = count;
  ev->base.data.client_metadata_read.elements = elements;
  end_op_locked(cc, GRPC_CLIENT_METADATA_READ);
  gpr_mu_unlock(&grpc_iomgr_mu);
}

void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
                          grpc_event_finish_func on_finish, void *user_data,
                          grpc_status_code status, const char *details,
                          grpc_metadata *metadata_elements,
                          size_t metadata_count) {
  event *ev;
  gpr_mu_lock(&grpc_iomgr_mu);
  ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data);
  ev->base.data.finished.status = status;
  ev->base.data.finished.details = details;
  ev->base.data.finished.metadata_count = metadata_count;
  ev->base.data.finished.metadata_elements = metadata_elements;
  end_op_locked(cc, GRPC_FINISHED);
  gpr_mu_unlock(&grpc_iomgr_mu);
}

void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
                         grpc_event_finish_func on_finish, void *user_data,
                         const char *method, const char *host,
                         gpr_timespec deadline, size_t metadata_count,
                         grpc_metadata *metadata_elements) {
  event *ev;
  gpr_mu_lock(&grpc_iomgr_mu);
  ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data);
  ev->base.data.server_rpc_new.method = method;
  ev->base.data.server_rpc_new.host = host;
  ev->base.data.server_rpc_new.deadline = deadline;
  ev->base.data.server_rpc_new.metadata_count = metadata_count;
  ev->base.data.server_rpc_new.metadata_elements = metadata_elements;
  end_op_locked(cc, GRPC_SERVER_RPC_NEW);
  gpr_mu_unlock(&grpc_iomgr_mu);
}
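
/* Each grpc_cq_end_* above follows one locked skeleton; a hypothetical new
   completion type GRPC_FOO with payload field 'foo' (placeholder names, not
   real identifiers) would be published the same way:

       gpr_mu_lock(&grpc_iomgr_mu);
       ev = add_locked(cc, GRPC_FOO, tag, call, on_finish, user_data);
       ev->base.data.foo = ...;       <- type-specific payload
       end_op_locked(cc, GRPC_FOO);   <- drops the ref from grpc_cq_begin_op
       gpr_mu_unlock(&grpc_iomgr_mu);
 */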

/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
static event *create_shutdown_event() {
  event *ev = gpr_malloc(sizeof(event));
  ev->base.type = GRPC_QUEUE_SHUTDOWN;
  ev->base.call = NULL;
  ev->base.tag = NULL;
  ev->on_finish = null_on_finish;
  return ev;
}

grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
                                       gpr_timespec deadline) {
  event *ev = NULL;

  gpr_mu_lock(&grpc_iomgr_mu);
  for (;;) {
    if (cc->queue != NULL) {
      /* take the FIFO head and unlink it from both lists */
      gpr_uintptr bucket;
      ev = cc->queue;
      bucket = ((gpr_uintptr)ev->base.tag) % NUM_TAG_BUCKETS;
      cc->queue = ev->queue_next;
      ev->queue_next->queue_prev = ev->queue_prev;
      ev->queue_prev->queue_next = ev->queue_next;
      ev->bucket_next->bucket_prev = ev->bucket_prev;
      ev->bucket_prev->bucket_next = ev->bucket_next;
      if (ev == cc->buckets[bucket]) {
        cc->buckets[bucket] = ev->bucket_next;
        if (ev == cc->buckets[bucket]) {
          /* ev was the only element in its bucket */
          cc->buckets[bucket] = NULL;
        }
      }
      if (cc->queue == ev) {
        /* ev was the only element in the queue */
        cc->queue = NULL;
      }
      break;
    }
    if (cc->shutdown) {
      ev = create_shutdown_event();
      break;
    }
    /* nothing queued: do polling work if allowed, otherwise block until an
       event arrives or the deadline expires */
    if (cc->allow_polling && grpc_iomgr_work(deadline)) {
      continue;
    }
    if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) {
      /* timed out */
      gpr_mu_unlock(&grpc_iomgr_mu);
      return NULL;
    }
  }
  gpr_mu_unlock(&grpc_iomgr_mu);
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
  return &ev->base;
}
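
/* Consumer-side sketch (illustrative; the deadline helpers are assumed from
   gpr time support rather than taken from this file):

       gpr_timespec deadline = gpr_time_add(gpr_now(), delta);
       grpc_event *ev;
       while ((ev = grpc_completion_queue_next(cq, deadline)) != NULL) {
         ... dispatch on ev->type and ev->tag ...
         grpc_event_finish(ev);   <- mandatory for every returned event
       }
       ... NULL means the deadline passed with nothing to deliver ... */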

/* Find, unlink, and return the first queued event whose tag matches 'tag',
   or NULL if none is queued.
   Requires grpc_iomgr_mu locked. */
static event *pluck_event(grpc_completion_queue *cc, void *tag) {
  gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
  event *ev = cc->buckets[bucket];
  if (ev == NULL) return NULL;
  do {
    if (ev->base.tag == tag) {
      /* unlink from both the FIFO and the bucket chain */
      ev->queue_next->queue_prev = ev->queue_prev;
      ev->queue_prev->queue_next = ev->queue_next;
      ev->bucket_next->bucket_prev = ev->bucket_prev;
      ev->bucket_prev->bucket_next = ev->bucket_next;
      if (ev == cc->buckets[bucket]) {
        cc->buckets[bucket] = ev->bucket_next;
        if (ev == cc->buckets[bucket]) {
          cc->buckets[bucket] = NULL;
        }
      }
      if (cc->queue == ev) {
        cc->queue = ev->queue_next;
        if (cc->queue == ev) {
          cc->queue = NULL;
        }
      }
      return ev;
    }
    ev = ev->bucket_next;
  } while (ev != cc->buckets[bucket]);
  return NULL;
}

grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
                                        gpr_timespec deadline) {
  event *ev = NULL;

  gpr_mu_lock(&grpc_iomgr_mu);
  for (;;) {
    if ((ev = pluck_event(cc, tag))) {
      break;
    }
    if (cc->shutdown) {
      ev = create_shutdown_event();
      break;
    }
    if (cc->allow_polling && grpc_iomgr_work(deadline)) {
      continue;
    }
    if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) {
      gpr_mu_unlock(&grpc_iomgr_mu);
      return NULL;
    }
  }
  gpr_mu_unlock(&grpc_iomgr_mu);
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
  return &ev->base;
}
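
/* pluck() differs from next() only in its selection rule: it waits for the
   event carrying one specific tag instead of taking the FIFO head, so a
   synchronous caller can block on its own completion (sketch):

       grpc_event *ev = grpc_completion_queue_pluck(cq, my_tag, deadline);
       ... NULL on timeout; otherwise ev->tag == my_tag or ev->type ==
           GRPC_QUEUE_SHUTDOWN ...
       if (ev) grpc_event_finish(ev); */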

/* Shutdown simply drops a ref that we reserved at creation time; if we drop
   to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
  if (gpr_unref(&cc->refs)) {
    gpr_mu_lock(&grpc_iomgr_mu);
    GPR_ASSERT(!cc->shutdown);
    cc->shutdown = 1;
    gpr_cv_broadcast(&grpc_iomgr_cv);
    gpr_mu_unlock(&grpc_iomgr_mu);
  }
}

void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
  GPR_ASSERT(cc->queue == NULL);
  grpc_pollset_destroy(&cc->pollset);
  gpr_free(cc);
}

/* Release an event returned by next() or pluck(): runs its hidden on_finish
   callback, drops the call ref taken in grpc_cq_begin_op, and frees the
   event's storage */
void grpc_event_finish(grpc_event *base) {
  event *ev = (event *)base;
  ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK);
  if (ev->base.call) {
    grpc_call_internal_unref(ev->base.call);
  }
  gpr_free(ev);
}

void grpc_cq_dump_pending_ops(grpc_completion_queue *cc) {
#ifndef NDEBUG
  char tmp[256];
  char *p = tmp;
  int i;

  for (i = 0; i < GRPC_COMPLETION_DO_NOT_USE; i++) {
    p += sprintf(p, " %d", (int)cc->pending_op_count[i]);
  }

  gpr_log(GPR_INFO, "pending ops:%s", tmp);
#endif
}

grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
  return &cc->pollset;
}