/*
 *
 * Copyright 2014, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/surface/completion_queue.h"

#include <stdio.h>
#include <string.h>

#include "src/core/eventmanager/em.h"
#include "src/core/surface/call.h"
#include "src/core/surface/event_string.h"
#include "src/core/surface/surface_em.h"
#include "src/core/surface/surface_trace.h"
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/string.h>

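/* Number of buckets in the fixed size hash table used by
   grpc_completion_queue_pluck; tags are hashed by pointer value modulo this
   (prime) bucket count */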
#define NUM_TAG_BUCKETS 31

/* A single event: extends grpc_event to form a linked list with a destruction
   function (on_finish) that is hidden from outside this module */
typedef struct event {
  grpc_event base;
  grpc_event_finish_func on_finish;
  void *on_finish_user_data;
  struct event *queue_next;
  struct event *queue_prev;
  struct event *bucket_next;
  struct event *bucket_prev;
} event;

/* Completion queue structure */
struct grpc_completion_queue {
  grpc_em *em;
  int allow_polling;

  /* When refs drops to zero, we are in shutdown mode, and will be destroyable
     once all queued events are drained */
  gpr_refcount refs;
  /* 0 initially, 1 once we've begun shutting down */
  int shutdown;
  /* Head of a linked list of queued events (prev points to the last element) */
  event *queue;
  /* Fixed size chained hash table of events for pluck() */
  event *buckets[NUM_TAG_BUCKETS];

#ifndef NDEBUG
  /* Debug support: track which operations are in flight at any given time */
  gpr_atm pending_op_count[GRPC_COMPLETION_DO_NOT_USE];
#endif
};

/* Default do-nothing on_finish function */
static void null_on_finish(void *user_data, grpc_op_error error) {}

grpc_completion_queue *grpc_completion_queue_create() {
  grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
  memset(cc, 0, sizeof(*cc));
  /* Initial ref is dropped by grpc_completion_queue_shutdown */
  gpr_ref_init(&cc->refs, 1);
  cc->em = grpc_surface_em();
  cc->allow_polling = 1;
  return cc;
}

void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
  cc->allow_polling = 0;
}

/* Create and append an event to the queue. Returns the event so that its data
   members can be filled in.
   Requires cc->em->mu locked. */
static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
                         void *tag, grpc_call *call,
                         grpc_event_finish_func on_finish, void *user_data) {
  event *ev = gpr_malloc(sizeof(event));
  gpr_intptr bucket = ((gpr_intptr)tag) % NUM_TAG_BUCKETS;
  GPR_ASSERT(!cc->shutdown);
  ev->base.type = type;
  ev->base.tag = tag;
  ev->base.call = call;
  ev->on_finish = on_finish ? on_finish : null_on_finish;
  ev->on_finish_user_data = user_data;
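  /* append ev to the tail of the circular, doubly-linked queue of pending
     events (cc->queue points at the head) */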
  if (cc->queue == NULL) {
    cc->queue = ev->queue_next = ev->queue_prev = ev;
  } else {
    ev->queue_next = cc->queue;
    ev->queue_prev = cc->queue->queue_prev;
    ev->queue_next->queue_prev = ev->queue_prev->queue_next = ev;
  }
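  /* likewise append ev to the hash bucket for its tag, so that pluck() can
     find it without scanning the whole queue */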
  if (cc->buckets[bucket] == NULL) {
    cc->buckets[bucket] = ev->bucket_next = ev->bucket_prev = ev;
  } else {
    ev->bucket_next = cc->buckets[bucket];
    ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
    ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
  }
  gpr_cv_broadcast(&cc->em->cv);
  return ev;
}

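/* Mark the beginning of an operation that will eventually produce an event on
   this queue: take a ref on the queue (released by the matching grpc_cq_end_*
   call) and on the call, if any (released by grpc_event_finish once the event
   has been consumed) */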
void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
                      grpc_completion_type type) {
  gpr_ref(&cc->refs);
  if (call) grpc_call_internal_ref(call);
#ifndef NDEBUG
  gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1);
#endif
}

/* Signal the end of an operation - if this is the last waiting-to-be-queued
   event, then enter shutdown mode */
static void end_op_locked(grpc_completion_queue *cc,
                          grpc_completion_type type) {
#ifndef NDEBUG
  GPR_ASSERT(gpr_atm_full_fetch_add(&cc->pending_op_count[type], -1) > 0);
#endif
  if (gpr_unref(&cc->refs)) {
    GPR_ASSERT(!cc->shutdown);
    cc->shutdown = 1;
    gpr_cv_broadcast(&cc->em->cv);
  }
}

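/* Each grpc_cq_end_* function below queues a completed event of the matching
   type under the event manager lock, fills in its type-specific payload, and
   drops the ref taken by grpc_cq_begin_op via end_op_locked */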
void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
                      grpc_event_finish_func on_finish, void *user_data,
                      grpc_byte_buffer *read) {
  event *ev;
  gpr_mu_lock(&cc->em->mu);
  ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data);
  ev->base.data.read = read;
  end_op_locked(cc, GRPC_READ);
  gpr_mu_unlock(&cc->em->mu);
}

void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
                                 grpc_call *call,
                                 grpc_event_finish_func on_finish,
                                 void *user_data, grpc_op_error error) {
  event *ev;
  gpr_mu_lock(&cc->em->mu);
  ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data);
  ev->base.data.invoke_accepted = error;
  end_op_locked(cc, GRPC_INVOKE_ACCEPTED);
  gpr_mu_unlock(&cc->em->mu);
}

void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
                                grpc_call *call,
                                grpc_event_finish_func on_finish,
                                void *user_data, grpc_op_error error) {
  event *ev;
  gpr_mu_lock(&cc->em->mu);
  ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data);
  ev->base.data.write_accepted = error;
  end_op_locked(cc, GRPC_WRITE_ACCEPTED);
  gpr_mu_unlock(&cc->em->mu);
}

void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
                                 grpc_call *call,
                                 grpc_event_finish_func on_finish,
                                 void *user_data, grpc_op_error error) {
  event *ev;
  gpr_mu_lock(&cc->em->mu);
  ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data);
  ev->base.data.finish_accepted = error;
  end_op_locked(cc, GRPC_FINISH_ACCEPTED);
  gpr_mu_unlock(&cc->em->mu);
}

void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
                                      grpc_call *call,
                                      grpc_event_finish_func on_finish,
                                      void *user_data, size_t count,
                                      grpc_metadata *elements) {
  event *ev;
  gpr_mu_lock(&cc->em->mu);
  ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish,
                  user_data);
  ev->base.data.client_metadata_read.count = count;
  ev->base.data.client_metadata_read.elements = elements;
  end_op_locked(cc, GRPC_CLIENT_METADATA_READ);
  gpr_mu_unlock(&cc->em->mu);
}

void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
                          grpc_event_finish_func on_finish, void *user_data,
                          grpc_status status) {
  event *ev;
  gpr_mu_lock(&cc->em->mu);
  ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data);
  ev->base.data.finished = status;
  end_op_locked(cc, GRPC_FINISHED);
  gpr_mu_unlock(&cc->em->mu);
}

void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
                         grpc_event_finish_func on_finish, void *user_data,
                         const char *method, const char *host,
                         gpr_timespec deadline, size_t metadata_count,
                         grpc_metadata *metadata_elements) {
  event *ev;
  gpr_mu_lock(&cc->em->mu);
  ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data);
  ev->base.data.server_rpc_new.method = method;
  ev->base.data.server_rpc_new.host = host;
  ev->base.data.server_rpc_new.deadline = deadline;
  ev->base.data.server_rpc_new.metadata_count = metadata_count;
  ev->base.data.server_rpc_new.metadata_elements = metadata_elements;
  end_op_locked(cc, GRPC_SERVER_RPC_NEW);
  gpr_mu_unlock(&cc->em->mu);
}

/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
static event *create_shutdown_event() {
  event *ev = gpr_malloc(sizeof(event));
  ev->base.type = GRPC_QUEUE_SHUTDOWN;
  ev->base.call = NULL;
  ev->base.tag = NULL;
  ev->on_finish = null_on_finish;
  return ev;
}

grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
                                       gpr_timespec deadline) {
  event *ev = NULL;

  gpr_mu_lock(&cc->em->mu);
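  /* Wait loop: dequeue a ready event if there is one, report shutdown once
     the queue has shut down, otherwise do event manager work (if polling is
     allowed) or block on the condition variable until the deadline */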
  for (;;) {
    if (cc->queue != NULL) {
      gpr_intptr bucket;
      ev = cc->queue;
      bucket = ((gpr_intptr)ev->base.tag) % NUM_TAG_BUCKETS;
      cc->queue = ev->queue_next;
      ev->queue_next->queue_prev = ev->queue_prev;
      ev->queue_prev->queue_next = ev->queue_next;
      ev->bucket_next->bucket_prev = ev->bucket_prev;
      ev->bucket_prev->bucket_next = ev->bucket_next;
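      /* if ev was the head of its bucket, advance the head; if it was also
         the only element, the bucket becomes empty */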
      if (ev == cc->buckets[bucket]) {
        cc->buckets[bucket] = ev->bucket_next;
        if (ev == cc->buckets[bucket]) {
          cc->buckets[bucket] = NULL;
        }
      }
      if (cc->queue == ev) {
        cc->queue = NULL;
      }
      break;
    }
    if (cc->shutdown) {
      ev = create_shutdown_event();
      break;
    }
    if (cc->allow_polling && grpc_em_work(cc->em, deadline)) {
      continue;
    }
    if (gpr_cv_wait(&cc->em->cv, &cc->em->mu, deadline)) {
      gpr_mu_unlock(&cc->em->mu);
      return NULL;
    }
  }
  gpr_mu_unlock(&cc->em->mu);
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
  return &ev->base;
}

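/* Find an event carrying the given tag by walking its hash bucket; if found,
   unlink it from both the queue and the bucket and return it, else return
   NULL. Requires cc->em->mu locked. */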
static event *pluck_event(grpc_completion_queue *cc, void *tag) {
  gpr_intptr bucket = ((gpr_intptr)tag) % NUM_TAG_BUCKETS;
  event *ev = cc->buckets[bucket];
  if (ev == NULL) return NULL;
  do {
    if (ev->base.tag == tag) {
      ev->queue_next->queue_prev = ev->queue_prev;
      ev->queue_prev->queue_next = ev->queue_next;
      ev->bucket_next->bucket_prev = ev->bucket_prev;
      ev->bucket_prev->bucket_next = ev->bucket_next;
      if (ev == cc->buckets[bucket]) {
        cc->buckets[bucket] = ev->bucket_next;
        if (ev == cc->buckets[bucket]) {
          cc->buckets[bucket] = NULL;
        }
      }
      if (cc->queue == ev) {
        cc->queue = ev->queue_next;
        if (cc->queue == ev) {
          cc->queue = NULL;
        }
      }
      return ev;
    }
    ev = ev->bucket_next;
  } while (ev != cc->buckets[bucket]);
  return NULL;
}

grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
                                        gpr_timespec deadline) {
  event *ev = NULL;

  gpr_mu_lock(&cc->em->mu);
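  /* Same wait loop as grpc_completion_queue_next, but only an event carrying
     the requested tag (or queue shutdown) terminates the wait */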
  for (;;) {
    if ((ev = pluck_event(cc, tag))) {
      break;
    }
    if (cc->shutdown) {
      ev = create_shutdown_event();
      break;
    }
    if (cc->allow_polling && grpc_em_work(cc->em, deadline)) {
      continue;
    }
    if (gpr_cv_wait(&cc->em->cv, &cc->em->mu, deadline)) {
      gpr_mu_unlock(&cc->em->mu);
      return NULL;
    }
  }
  gpr_mu_unlock(&cc->em->mu);
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
  return &ev->base;
}

/* Shutdown simply drops a ref that we reserved at creation time; if we drop
   to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
  if (gpr_unref(&cc->refs)) {
    gpr_mu_lock(&cc->em->mu);
    GPR_ASSERT(!cc->shutdown);
    cc->shutdown = 1;
    gpr_cv_broadcast(&cc->em->cv);
    gpr_mu_unlock(&cc->em->mu);
  }
}

void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
  GPR_ASSERT(cc->queue == NULL);
  gpr_free(cc);
}

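/* Callers hand back events obtained from next/pluck here: run the hidden
   on_finish callback, drop the call ref taken in grpc_cq_begin_op, and free
   the event */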
void grpc_event_finish(grpc_event *base) {
  event *ev = (event *)base;
  ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK);
  if (ev->base.call) {
    grpc_call_internal_unref(ev->base.call);
  }
  gpr_free(ev);
}

void grpc_cq_dump_pending_ops(grpc_completion_queue *cc) {
#ifndef NDEBUG
  char tmp[256];
  char *p = tmp;
  int i;

  for (i = 0; i < GRPC_COMPLETION_DO_NOT_USE; i++) {
    p += sprintf(p, " %d", (int)cc->pending_op_count[i]);
  }

  gpr_log(GPR_INFO, "pending ops:%s", tmp);
#endif
}