blob: 5360d7c6c383b5e60582d3ede4747d3a1f4724d0 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
3 * Copyright 2014, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/surface/completion_queue.h"
35
36#include <math.h>
37#include <stdio.h>
38
39#include <grpc/support/alloc.h>
40#include <grpc/support/log.h>
41#include <grpc/support/thd.h>
42#include <grpc/support/time.h>
43
44typedef struct test_thread_options {
45 gpr_event on_started;
46 gpr_event *start;
47 gpr_event on_finished;
48 grpc_completion_queue *cc;
49 int iterations;
50} test_thread_options;
51
52static void producer_thread(void *arg) {
53 test_thread_options *opt = arg;
54 int i;
55
56 gpr_event_set(&opt->on_started, (void *)(gpr_intptr) 1);
57 GPR_ASSERT(gpr_event_wait(opt->start, gpr_inf_future));
58
59 for (i = 0; i < opt->iterations; i++) {
60 grpc_cq_begin_op(opt->cc, NULL, GRPC_WRITE_ACCEPTED);
61 grpc_cq_end_write_accepted(opt->cc, (void *)(gpr_intptr) 1, NULL, NULL,
62 NULL, GRPC_OP_OK);
63 }
64
65 gpr_event_set(&opt->on_finished, (void *)(gpr_intptr) 1);
66}
67
68static void consumer_thread(void *arg) {
69 test_thread_options *opt = arg;
70 grpc_event *ev;
71
72 gpr_event_set(&opt->on_started, (void *)(gpr_intptr) 1);
73 GPR_ASSERT(gpr_event_wait(opt->start, gpr_inf_future));
74
75 for (;;) {
76 ev = grpc_completion_queue_next(opt->cc, gpr_inf_future);
77 switch (ev->type) {
78 case GRPC_WRITE_ACCEPTED:
79 break;
80 case GRPC_QUEUE_SHUTDOWN:
81 gpr_event_set(&opt->on_finished, (void *)(gpr_intptr) 1);
82 return;
83 default:
84 gpr_log(GPR_ERROR, "Invalid event received: %d", ev->type);
85 abort();
86 }
87 grpc_event_finish(ev);
88 }
89}
90
91double ops_per_second(int consumers, int producers, int iterations) {
92 test_thread_options *options =
93 gpr_malloc((producers + consumers) * sizeof(test_thread_options));
94 gpr_event start = GPR_EVENT_INIT;
95 grpc_completion_queue *cc = grpc_completion_queue_create();
96 int i;
97 gpr_timespec t_start, t_end, t_delta;
98
99 /* start all threads: they will wait for phase1 */
100 for (i = 0; i < producers + consumers; i++) {
101 gpr_thd_id id;
102 gpr_event_init(&options[i].on_started);
103 gpr_event_init(&options[i].on_finished);
104 options[i].start = &start;
105 options[i].cc = cc;
106 options[i].iterations = iterations;
107 GPR_ASSERT(gpr_thd_new(&id,
108 i < producers ? producer_thread : consumer_thread,
109 options + i, NULL));
110 gpr_event_wait(&options[i].on_started, gpr_inf_future);
111 }
112
113 /* start the benchmark */
114 t_start = gpr_now();
115 gpr_event_set(&start, (void *)(gpr_intptr) 1);
116
117 /* wait for producers to finish */
118 for (i = 0; i < producers; i++) {
119 GPR_ASSERT(gpr_event_wait(&options[i].on_finished, gpr_inf_future));
120 }
121
122 /* in parallel, we shutdown the completion channel - all events should still
123 be consumed */
124 grpc_completion_queue_shutdown(cc);
125
126 /* join all threads */
127 for (i = producers; i < producers + consumers; i++) {
128 GPR_ASSERT(gpr_event_wait(&options[i].on_finished, gpr_inf_future));
129 }
130 t_end = gpr_now();
131
132 /* destroy the completion channel */
133 grpc_completion_queue_destroy(cc);
134
135 gpr_free(options);
136
137 t_delta = gpr_time_sub(t_end, t_start);
138 return (t_delta.tv_sec + 1e-9 * t_delta.tv_nsec) / (producers * iterations);
139}
140
/* Convenience wrapper around ops_per_second: splits a budget of 1000000
   operations across the producers (integer division, remainder dropped) and
   returns whatever ops_per_second returns for that configuration.
   producers is used as a divisor and must be non-zero. */
double ops_per_second_top(int consumers, int producers) {
  const int total_operations = 1000000;
  int per_producer = total_operations / producers;

  return ops_per_second(consumers, producers, per_producer);
}
144
/* Prints a CSV table to stdout. The header row is an empty quoted cell
   followed by the producer counts; each following row starts with a consumer
   count and holds the value returned by ops_per_second_top for that
   consumer count paired with every producer count. */
int main(void) {
  const int counts[] = {1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 40, 64};
  const int ncounts = sizeof(counts) / sizeof(*counts);
  const int last = ncounts - 1;
  int row, col;

  /* Header: one column per producer count. */
  printf("\"\",");
  for (col = 0; col < ncounts; col++) {
    printf("%d%s", counts[col], col == last ? "\n" : ",");
  }

  /* Body: one row per consumer count. */
  for (row = 0; row < ncounts; row++) {
    printf("%d,", counts[row]);
    for (col = 0; col < ncounts; col++) {
      double result = ops_per_second_top(counts[row], counts[col]);
      const char *terminator = (col == last) ? "\n" : ",";

      printf("%f%s", result, terminator);
      /* Flush after every cell so partial results are visible while the
         remaining configurations run. */
      fflush(stdout);
    }
  }

  return 0;
}