/*
 *
 * Copyright 2014, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
34#include "test/core/transport/transport_end2end_tests.h"
35
36#include <stdarg.h>
37#include <stdio.h>
38#include <string.h>
39
40#include "src/core/transport/transport.h"
41#include <grpc/support/alloc.h>
42#include <grpc/support/log.h>
43#include <grpc/support/string.h>
44#include <grpc/support/thd.h>
45#include <grpc/support/useful.h>
46
47enum { REQUEST_DEADLINE = 200000 }; /* valgrind need a large value */
48
49static grpc_mdctx *g_metadata_context;
50
51static gpr_once g_pending_ops_init = GPR_ONCE_INIT;
52static gpr_mu g_mu;
53static gpr_cv g_cv;
54static int g_pending_ops;
55
56/* Defines a suite of tests that all GRPC transports should be able to pass */
57
58/******************************************************************************
59 * Testing framework
60 */
61
62/* Forward declarations */
63typedef struct test_fixture test_fixture;
64
65/* User data passed to the transport and handed to each callback */
66typedef struct test_user_data { test_fixture *fixture; } test_user_data;
67
68/* A message we expect to receive (forms a singly linked list with next) */
69typedef struct expected_message {
70 /* The next message expected */
71 struct expected_message *next;
72 /* The (owned) data that we expect to receive */
73 gpr_uint8 *data;
74 /* The length of the expected message */
75 size_t length;
76 /* How many bytes of the expected message have we received? */
77 size_t read_pos;
78 /* Have we received the GRPC_OP_BEGIN for this message */
79 int begun;
80} expected_message;
81
82/* Metadata we expect to receive */
83typedef struct expected_metadata {
84 struct expected_metadata *next;
85 struct expected_metadata *prev;
86 grpc_mdelem *metadata;
87} expected_metadata;
88
89/* Tracks a stream for a test. Forms a doubly-linked list with (prev, next) */
90typedef struct test_stream {
91 /* The owning fixture */
92 test_fixture *fixture;
93 /* The transport client stream */
94 grpc_stream *client_stream;
95 /* The transport server stream */
96 grpc_stream *server_stream;
97 /* Linked lists of messages expected on client and server */
98 expected_message *client_expected_messages;
99 expected_message *server_expected_messages;
100 expected_metadata *client_expected_metadata;
101 expected_metadata *server_expected_metadata;
102
103 /* Test streams are linked in the fixture */
104 struct test_stream *next;
105 struct test_stream *prev;
106} test_stream;
107
108/* A test_fixture tracks all transport state and expectations for a test */
109struct test_fixture {
110 gpr_mu mu;
111 gpr_cv cv; /* broadcast when expectation state has changed */
112
113 /* The transport instances */
114 grpc_transport *client_transport;
115 grpc_transport *server_transport;
116 /* User data for the transport instances - pointers to these are passed
117 to the transport. */
118 test_user_data client_ud;
119 test_user_data server_ud;
120
121 /* A pointer to the head of the tracked streams list, or NULL if no streams
122 are open */
123 test_stream *streams;
124};
125
126static void expect_metadata(test_stream *s, int from_client, const char *key,
127 const char *value);
128
129/* Convert some number of seconds into a gpr_timespec that many seconds in the
130 future */
131static gpr_timespec deadline_from_seconds(double deadline_seconds) {
132 return gpr_time_add(gpr_now(), gpr_time_from_micros(deadline_seconds * 1e6));
133}
134
135/* Init a test_user_data instance */
136static void init_user_data(test_user_data *ud, test_fixture *f,
137 grpc_transport_test_config *config, int is_client) {
138 ud->fixture = f;
139}
140
141/* Implements the alloc_recv_buffer transport callback */
142static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport,
143 grpc_stream *stream, size_t size_hint) {
144 return gpr_slice_malloc(size_hint);
145}
146
147static void pending_ops_cleanup() {
148 gpr_mu_destroy(&g_mu);
149 gpr_cv_destroy(&g_cv);
150}
151
152static void pending_ops_init() {
153 gpr_mu_init(&g_mu);
154 gpr_cv_init(&g_cv);
155 atexit(pending_ops_cleanup);
156}
157
158static void use_pending_ops() {
159 gpr_once_init(&g_pending_ops_init, pending_ops_init);
160}
161
162static void add_pending_op() {
163 use_pending_ops();
164 gpr_mu_lock(&g_mu);
165 g_pending_ops++;
166 gpr_mu_unlock(&g_mu);
167}
168
169static void end_pending_op() {
170 gpr_mu_lock(&g_mu);
171 g_pending_ops--;
172 gpr_cv_broadcast(&g_cv);
173 gpr_mu_unlock(&g_mu);
174}
175
176static void wait_pending_ops() {
177 use_pending_ops();
178 gpr_mu_lock(&g_mu);
179 while (g_pending_ops > 0) {
180 gpr_cv_wait(&g_cv, &g_mu, gpr_inf_future);
181 }
182 gpr_mu_unlock(&g_mu);
183}
184
185/* Implements the create_stream transport callback */
186static void create_stream(void *user_data, grpc_transport *transport,
187 const void *server_data) {
188 test_user_data *ud = user_data;
189 test_fixture *f = ud->fixture;
190 test_stream *stream;
191
192 GPR_ASSERT(ud == &f->server_ud);
193 GPR_ASSERT(transport == f->server_transport);
194
195 gpr_mu_lock(&f->mu);
196
197 /* Search streams for the peer to this stream */
198 if (!f->streams) goto done;
199 /* found the expecting stream */
200 stream = f->streams;
201 stream->server_stream = gpr_malloc(grpc_transport_stream_size(transport));
202 grpc_transport_init_stream(transport, stream->server_stream, server_data);
203
204done:
205 /* wakeup begin_stream, and maybe wait_and_verify */
206 gpr_cv_broadcast(&f->cv);
207 gpr_mu_unlock(&f->mu);
208}
209
210/* Search fixture streams for the test_stream instance holding a given transport
211 stream */
212static test_stream *find_test_stream(test_fixture *f, grpc_stream *stream) {
213 test_stream *s;
214
215 GPR_ASSERT(f->streams);
216 s = f->streams;
217 do {
218 if (s->client_stream == stream || s->server_stream == stream) {
219 return s;
220 }
221 } while (s != f->streams);
222
223 GPR_ASSERT(0 && "found");
224 return NULL;
225}
226
227/* Stringify a grpc_stream_state for debugging */
228static const char *state_name(grpc_stream_state state) {
229 switch (state) {
230 case GRPC_STREAM_OPEN:
231 return "GRPC_STREAM_OPEN";
232 case GRPC_STREAM_RECV_CLOSED:
233 return "GRPC_STREAM_RECV_CLOSED";
234 case GRPC_STREAM_SEND_CLOSED:
235 return "GRPC_STREAM_SEND_CLOSED";
236 case GRPC_STREAM_CLOSED:
237 return "GRPC_STREAM_CLOSED";
238 }
239 GPR_ASSERT(0 && "reachable");
240 return NULL;
241}
242
243typedef struct {
244 grpc_transport *transport;
245 grpc_stream *stream;
246} destroy_stream_args;
247
248static void destroy_stream(void *p) {
249 destroy_stream_args *a = p;
250 grpc_transport_destroy_stream(a->transport, a->stream);
251 gpr_free(a->stream);
252 gpr_free(a);
253 end_pending_op();
254}
255
256static void recv_batch(void *user_data, grpc_transport *transport,
257 grpc_stream *stream, grpc_stream_op *ops,
258 size_t ops_count, grpc_stream_state final_state) {
259 test_user_data *ud = user_data;
260 test_fixture *f = ud->fixture;
261 test_stream *s;
262 /* Pointer to the root pointer of either client or server expected messages;
263 not a simple pointer as we may need to manipulate the list (on receipt
264 of messages */
265 expected_message **expect_root_message;
266 expected_metadata **expect_root_metadata;
267 expected_metadata *emd;
268 size_t i, j;
269 char *hexstr1, *hexstr2;
270 int repeats = 0;
271
272 gpr_mu_lock(&f->mu);
273
274 s = find_test_stream(f, stream);
275 expect_root_message = s->client_stream == stream
276 ? &s->client_expected_messages
277 : &s->server_expected_messages;
278 expect_root_metadata = s->client_stream == stream
279 ? &s->client_expected_metadata
280 : &s->server_expected_metadata;
281
282 /* Debug log */
283 gpr_log(GPR_DEBUG, "recv_batch: %d ops on %s final_state=%s", ops_count,
284 s->client_stream == stream ? "client" : "server",
285 state_name(final_state));
286#define CLEAR_REPEATS \
287 if (repeats) { \
288 gpr_log(GPR_DEBUG, " + %d more", repeats); \
289 repeats = 0; \
290 }
291 for (i = 0; i < ops_count; i++) {
292 switch (ops[i].type) {
293 case GRPC_NO_OP:
294 CLEAR_REPEATS;
295 gpr_log(GPR_DEBUG, " [%02d] GRPC_NO_OP", i);
296 break;
297 case GRPC_OP_METADATA_BOUNDARY:
298 CLEAR_REPEATS;
299 gpr_log(GPR_DEBUG, " [%02d] GRPC_OP_METADATA_BOUNDARY", i);
300 break;
301 case GRPC_OP_METADATA:
302 CLEAR_REPEATS;
303 hexstr1 =
304 gpr_hexdump(grpc_mdstr_as_c_string(ops[i].data.metadata->key),
305 GPR_SLICE_LENGTH(ops[i].data.metadata->key->slice),
306 GPR_HEXDUMP_PLAINTEXT);
307 hexstr2 =
308 gpr_hexdump(grpc_mdstr_as_c_string(ops[i].data.metadata->value),
309 GPR_SLICE_LENGTH(ops[i].data.metadata->value->slice),
310 GPR_HEXDUMP_PLAINTEXT);
311 gpr_log(GPR_DEBUG, " [%02d] GRPC_OP_METADATA key=%s value=%s", i,
312 hexstr1, hexstr2);
313 gpr_free(hexstr1);
314 gpr_free(hexstr2);
315 break;
316 case GRPC_OP_BEGIN_MESSAGE:
317 CLEAR_REPEATS;
318 gpr_log(GPR_DEBUG, " [%02d] GRPC_OP_BEGIN_MESSAGE len=%d", i,
319 ops[i].data.begin_message.length);
320 break;
321 case GRPC_OP_DEADLINE:
322 CLEAR_REPEATS;
323 gpr_log(GPR_DEBUG, " [%02d] GRPC_OP_DEADLINE value=%d.%09d", i,
324 ops[i].data.deadline.tv_sec, ops[i].data.deadline.tv_nsec);
325 break;
326 case GRPC_OP_SLICE:
327 if (i && ops[i - 1].type == GRPC_OP_SLICE &&
328 GPR_SLICE_LENGTH(ops[i - 1].data.slice) ==
329 GPR_SLICE_LENGTH(ops[i].data.slice)) {
330 repeats++;
331 } else {
332 CLEAR_REPEATS;
333 gpr_log(GPR_DEBUG, " [%02d] GRPC_OP_SLICE len=%d", i,
334 GPR_SLICE_LENGTH(ops[i].data.slice));
335 }
336 break;
337 case GRPC_OP_FLOW_CTL_CB:
338 CLEAR_REPEATS;
339 gpr_log(GPR_DEBUG, " [%02d] GRPC_OP_FLOW_CTL_CB", i);
340 break;
341 }
342 }
343 CLEAR_REPEATS;
344
345 /* Iterate over operations, and verify them against expectations */
346 for (i = 0; i < ops_count; i++) {
347 switch (ops[i].type) {
348 case GRPC_NO_OP:
349 break;
350 case GRPC_OP_METADATA_BOUNDARY:
351 break;
352 case GRPC_OP_METADATA:
353 GPR_ASSERT(*expect_root_metadata && "must be expecting metadata");
354 emd = *expect_root_metadata;
355 if (emd == NULL) {
356 gpr_log(GPR_ERROR, "metadata not found");
357 abort();
358 }
359 do {
360 if (emd->metadata == ops[i].data.metadata) {
361 if (emd == *expect_root_metadata) {
362 if (emd->next == emd) {
363 *expect_root_metadata = NULL;
364 } else {
365 *expect_root_metadata = emd->next;
366 }
367 }
368 emd->next->prev = emd->prev;
369 emd->prev->next = emd->next;
370 grpc_mdelem_unref(emd->metadata);
371 grpc_mdelem_unref(ops[i].data.metadata);
372 gpr_free(emd);
373 emd = NULL;
374 break;
375 }
376 emd = emd->next;
377 } while (emd != *expect_root_metadata);
378 if (emd) {
379 gpr_log(GPR_ERROR, "metadata not found");
380 abort();
381 }
382 break;
383 case GRPC_OP_BEGIN_MESSAGE:
384 GPR_ASSERT(*expect_root_message && "must be expecting a message");
385 GPR_ASSERT((*expect_root_message)->read_pos == 0 &&
386 "must be at the start of a message");
387 GPR_ASSERT((*expect_root_message)->begun == 0 &&
388 "can only BEGIN a message once");
389 GPR_ASSERT((*expect_root_message)->length ==
390 ops[i].data.begin_message.length &&
391 "message lengths must match");
392 (*expect_root_message)->begun = 1;
393 break;
394 case GRPC_OP_SLICE:
395 GPR_ASSERT(*expect_root_message && "must be expecting a message");
396 GPR_ASSERT((*expect_root_message)->begun == 1 &&
397 "must have begun a message");
398 GPR_ASSERT((*expect_root_message)->read_pos +
399 GPR_SLICE_LENGTH(ops[i].data.slice) <=
400 (*expect_root_message)->length &&
401 "must not send more data than expected");
402 for (j = 0; j < GPR_SLICE_LENGTH(ops[i].data.slice); j++) {
403 GPR_ASSERT((*expect_root_message)
404 ->data[(*expect_root_message)->read_pos + j] ==
405 GPR_SLICE_START_PTR(ops[i].data.slice)[j] &&
406 "must send the correct message");
407 }
408 (*expect_root_message)->read_pos += GPR_SLICE_LENGTH(ops[i].data.slice);
409 if ((*expect_root_message)->read_pos ==
410 (*expect_root_message)->length) {
411 expected_message *great_success = *expect_root_message;
412 *expect_root_message = great_success->next;
413 gpr_free(great_success->data);
414 gpr_free(great_success);
415 }
416 gpr_slice_unref(ops[i].data.slice);
417 break;
418 case GRPC_OP_FLOW_CTL_CB:
419 GPR_ASSERT(0 && "allowed");
420 break;
421 case GRPC_OP_DEADLINE:
422 GPR_ASSERT(0 && "implemented");
423 break;
424 }
425 }
426
427 /* If the stream has become fully closed then we must destroy the transport
428 part of the stream */
429 if (final_state == GRPC_STREAM_CLOSED) {
430 destroy_stream_args *dsa = gpr_malloc(sizeof(destroy_stream_args));
431 gpr_thd_id id;
432 dsa->transport = transport;
433 dsa->stream = stream;
434 /* start a thread after incrementing a pending op counter (so we can wait
435 at test completion */
436 add_pending_op();
437 gpr_thd_new(&id, destroy_stream, dsa, NULL);
438 if (stream == s->client_stream) {
439 GPR_ASSERT(s->client_expected_messages == NULL &&
440 "must receive all expected messages");
441 s->client_stream = NULL;
442 } else {
443 GPR_ASSERT(s->server_expected_messages == NULL &&
444 "must receive all expected messages");
445 s->server_stream = NULL;
446 }
447 /* And if both the client and the server report fully closed, we can
448 unlink the stream object entirely */
449 if (s->client_stream == NULL && s->server_stream == NULL) {
450 s->next->prev = s->prev;
451 s->prev->next = s->next;
452 if (s == f->streams) {
453 if (s->next == f->streams) {
454 f->streams = NULL;
455 } else {
456 f->streams = s->next;
457 }
458 }
459 }
460 }
461
462 /* wakeup wait_and_verify */
463 gpr_cv_broadcast(&f->cv);
464 gpr_mu_unlock(&f->mu);
465}
466
467static void close_transport(void *user_data, grpc_transport *transport) {}
468
nnoble0c475f02014-12-05 15:37:39 -0800469static void recv_goaway(void *user_data, grpc_transport *transport,
470 grpc_status_code status, gpr_slice debug) {
471 gpr_slice_unref(debug);
472}
473
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800474static grpc_transport_callbacks transport_callbacks = {
nnoble0c475f02014-12-05 15:37:39 -0800475 alloc_recv_buffer, create_stream, recv_batch, recv_goaway, close_transport};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800476
477/* Helper for tests to create a stream.
478 Arguments:
479 s - uninitialized test_stream struct to begin
480 f - test fixture to associate this stream with
481 method, host, deadline_seconds - header fields for the stream */
482static void begin_stream(test_stream *s, test_fixture *f, const char *method,
483 const char *host, double deadline_seconds) {
484 /* Deadline to initiate the stream (prevents the tests from hanging
485 forever) */
486 gpr_timespec deadline = deadline_from_seconds(10.0);
487 grpc_stream_op_buffer sopb;
488
489 grpc_sopb_init(&sopb);
490
491 gpr_mu_lock(&f->mu);
492
493 s->fixture = f;
494 s->client_stream =
495 gpr_malloc(grpc_transport_stream_size(f->client_transport));
496 /* server stream will be set once it's received by the peer transport */
497 s->server_stream = NULL;
498 s->client_expected_messages = NULL;
499 s->server_expected_messages = NULL;
500 s->client_expected_metadata = NULL;
501 s->server_expected_metadata = NULL;
502
503 if (f->streams) {
504 s->next = f->streams;
505 s->prev = s->next->prev;
506 s->next->prev = s->prev->next = s;
507 } else {
508 s->next = s->prev = s;
509 }
510 f->streams = s;
511
512 gpr_mu_unlock(&f->mu);
513
514 GPR_ASSERT(0 == grpc_transport_init_stream(f->client_transport,
515 s->client_stream, NULL));
516
517#define ADDMD(k, v) \
518 do { \
519 grpc_mdelem *md = grpc_mdelem_from_strings(g_metadata_context, (k), (v)); \
520 grpc_sopb_add_metadata(&sopb, md); \
521 expect_metadata(s, 1, (k), (v)); \
522 } while (0)
523
524 ADDMD(":path", method);
525 ADDMD(":authority", host);
526 ADDMD(":method", "POST");
527 grpc_transport_send_batch(f->client_transport, s->client_stream, sopb.ops,
528 sopb.nops, 0);
529 sopb.nops = 0;
530
531 grpc_sopb_destroy(&sopb);
532
533 /* wait for the server side stream to be created */
534 gpr_mu_lock(&f->mu);
535 while (s->server_stream == NULL) {
536 GPR_ASSERT(0 == gpr_cv_wait(&f->cv, &f->mu, deadline));
537 }
538 gpr_mu_unlock(&f->mu);
539}
540
541static grpc_transport_setup_result setup_transport(
542 test_fixture *f, grpc_transport **set_transport, void *user_data,
543 grpc_transport *transport) {
544 grpc_transport_setup_result result;
545
546 gpr_mu_lock(&f->mu);
547 *set_transport = transport;
548 gpr_cv_broadcast(&f->cv);
549 gpr_mu_unlock(&f->mu);
550
551 result.callbacks = &transport_callbacks;
552 result.user_data = user_data;
553 return result;
554}
555
556static grpc_transport_setup_result setup_server_transport(
557 void *arg, grpc_transport *transport, grpc_mdctx *mdctx) {
558 test_fixture *f = arg;
559 return setup_transport(f, &f->server_transport, &f->server_ud, transport);
560}
561
562static grpc_transport_setup_result setup_client_transport(
563 void *arg, grpc_transport *transport, grpc_mdctx *mdctx) {
564 test_fixture *f = arg;
565 return setup_transport(f, &f->client_transport, &f->client_ud, transport);
566}
567
568/* Begin a test
569
570 Arguments:
571 f - uninitialized test_fixture struct
572 config - test configuration for this test
573 name - the name of this test */
574static void begin_test(test_fixture *f, grpc_transport_test_config *config,
575 const char *name) {
576 gpr_timespec timeout = gpr_time_add(gpr_now(), gpr_time_from_micros(100e6));
577
578 gpr_log(GPR_INFO, "BEGIN: %s/%s", name, config->name);
579
580 gpr_mu_init(&f->mu);
581 gpr_cv_init(&f->cv);
582
583 f->streams = NULL;
584
585 init_user_data(&f->client_ud, f, config, 1);
586 init_user_data(&f->server_ud, f, config, 0);
587
588 f->client_transport = NULL;
589 f->server_transport = NULL;
590
591 GPR_ASSERT(0 ==
592 config->create_transport(setup_client_transport, f,
593 setup_server_transport, f,
594 g_metadata_context));
595
596 gpr_mu_lock(&f->mu);
597 while (!f->client_transport || !f->server_transport) {
598 GPR_ASSERT(gpr_cv_wait(&f->cv, &f->mu, timeout));
599 }
600 gpr_mu_unlock(&f->mu);
601}
602
603/* Enumerate expected messages on a stream */
604static void enumerate_expected_messages(
605 test_stream *s, expected_message *root, const char *stream_tag,
606 void (*cb)(void *user, const char *fmt, ...), void *user) {
607 expected_message *msg;
608
609 for (msg = root; msg; msg = msg->next) {
610 cb(user,
611 "Waiting for message to finish: "
612 "length=%zu read_pos=%zu begun=%d",
613 msg->length, msg->read_pos);
614 }
615}
616
617/* Walk through everything that is still waiting to happen, and call 'cb' with
618 userdata 'user' for that expectation. */
619static void enumerate_expectations(test_fixture *f,
620 void (*cb)(void *user, const char *fmt, ...),
621 void *user) {
622 test_stream *stream;
623
624 if (f->streams) {
625 stream = f->streams;
626 do {
627 cb(user,
628 "Waiting for request to close: "
629 "client=%p, server=%p",
630 stream->client_stream, stream->server_stream);
631 enumerate_expected_messages(stream, stream->client_expected_messages,
632 "client", cb, user);
633 enumerate_expected_messages(stream, stream->server_expected_messages,
634 "server", cb, user);
635 stream = stream->next;
636 } while (stream != f->streams);
637 }
638}
639
/* enumerate_expectations callback that bumps the int pointed to by 'p' once
   per reported expectation; the format and its arguments are ignored */
static void increment_expectation_count(void *p, const char *fmt, ...) {
  int *counter = p;
  (void)fmt;
  *counter += 1;
}
645
646/* Returns the count of pending expectations in a fixture. Requires mu taken */
647static int count_expectations(test_fixture *f) {
648 int n = 0;
649 enumerate_expectations(f, increment_expectation_count, &n);
650 return n;
651}
652
653/* Callback for enumerate_expectations that adds an expectation to the log */
654static void dump_expectation(void *p, const char *fmt, ...) {
jtattermusch97fb3f62014-12-08 15:13:41 -0800655 char *str;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800656 va_list args;
657 va_start(args, fmt);
658
jtattermusch97fb3f62014-12-08 15:13:41 -0800659 gpr_asprintf(&str, fmt, args);
660 gpr_log(GPR_INFO, "EXPECTED: %s", str);
661 gpr_free(str);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800662
663 va_end(args);
664}
665
666/* Add all pending expectations to the log */
667static void dump_expectations(test_fixture *f) {
668 enumerate_expectations(f, dump_expectation, NULL);
669}
670
671/* Wait until all expectations are completed, or crash */
672static void wait_and_verify(test_fixture *f) {
673 gpr_timespec deadline = deadline_from_seconds(10.0);
674
675 gpr_mu_lock(&f->mu);
676 while (count_expectations(f) > 0) {
677 gpr_log(GPR_INFO, "waiting for expectations to complete");
678 if (gpr_cv_wait(&f->cv, &f->mu, deadline)) {
679 gpr_log(GPR_ERROR, "Timeout waiting for expectation completion");
680 dump_expectations(f);
681 gpr_mu_unlock(&f->mu);
682 abort();
683 }
684 }
685 gpr_mu_unlock(&f->mu);
686}
687
688/* Finish a test */
689static void end_test(test_fixture *f) {
690 wait_and_verify(f);
691
692 grpc_transport_close(f->client_transport);
693 grpc_transport_close(f->server_transport);
694 grpc_transport_destroy(f->client_transport);
695 grpc_transport_destroy(f->server_transport);
696
697 wait_pending_ops();
698}
699
700/* Generate a test slice filled with {0,1,2,3,...,255,0,1,2,3,4,...} */
701static gpr_slice generate_test_data(size_t length) {
702 gpr_slice slice = gpr_slice_malloc(length);
jtattermusch57c6f0c2014-12-11 12:28:56 -0800703 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800704 for (i = 0; i < length; i++) {
705 GPR_SLICE_START_PTR(slice)[i] = i;
706 }
707 return slice;
708}
709
710/* Add an expected message to the end of a list with root root */
711static void append_expected_message(expected_message **root,
712 expected_message *message) {
713 expected_message *end;
714
715 if (!*root) {
716 *root = message;
717 return;
718 }
719
720 for (end = *root; end->next; end = end->next)
721 ;
722 end->next = message;
723}
724
725/* Add an expected message on stream 's''.
726 If from_client==1, expect it on the server, otherwise expect it on the client
727 Variadic parameters are a NULL-terminated list of pointers to slices that
728 should be expected as payload */
729static void expect_message(test_stream *s, int from_client,
730 /* gpr_slice* */...) {
731 va_list args;
732 gpr_slice *slice;
733 size_t capacity = 32;
734 size_t length = 0;
735 gpr_uint8 *buffer = gpr_malloc(capacity);
736 expected_message *e;
737
738 va_start(args, from_client);
739 while ((slice = va_arg(args, gpr_slice *))) {
740 while (GPR_SLICE_LENGTH(*slice) + length > capacity) {
741 capacity *= 2;
742 buffer = gpr_realloc(buffer, capacity);
743 }
744 memcpy(buffer + length, GPR_SLICE_START_PTR(*slice),
745 GPR_SLICE_LENGTH(*slice));
746 length += GPR_SLICE_LENGTH(*slice);
747 }
748 va_end(args);
749
750 e = gpr_malloc(sizeof(expected_message));
751 e->data = buffer;
752 e->length = length;
753 e->read_pos = 0;
754 e->begun = 0;
755 e->next = NULL;
756
757 gpr_mu_lock(&s->fixture->mu);
758 append_expected_message(
759 from_client ? &s->server_expected_messages : &s->client_expected_messages,
760 e);
761 gpr_mu_unlock(&s->fixture->mu);
762}
763
764static void expect_metadata(test_stream *s, int from_client, const char *key,
765 const char *value) {
766 expected_metadata *e = gpr_malloc(sizeof(expected_metadata));
767 expected_metadata **root =
768 from_client ? &s->server_expected_metadata : &s->client_expected_metadata;
769 e->metadata = grpc_mdelem_from_strings(g_metadata_context, key, value);
770 gpr_mu_lock(&s->fixture->mu);
771 if (!*root) {
772 *root = e;
773 e->next = e->prev = e;
774 } else {
775 e->next = *root;
776 e->prev = e->next->prev;
777 e->next->prev = e->prev->next = e;
778 }
779 gpr_mu_unlock(&s->fixture->mu);
780}
781
782/******************************************************************************
783 * Actual unit tests
784 */
785
786/* Test that we can create, begin, and end a test */
787static void test_no_op(grpc_transport_test_config *config) {
788 test_fixture f;
789 begin_test(&f, config, __FUNCTION__);
790 end_test(&f);
791}
792
793/* Test that a request can be initiated and terminated normally */
794static void test_simple_request(grpc_transport_test_config *config) {
795 test_fixture f;
796 test_stream s;
797
798 begin_test(&f, config, __FUNCTION__);
799 begin_stream(&s, &f, "/Test", "foo.google.com", 10);
800 grpc_transport_send_batch(f.client_transport, s.client_stream, NULL, 0, 1);
801 grpc_transport_send_batch(f.server_transport, s.server_stream, NULL, 0, 1);
802 end_test(&f);
803}
804
805/* Test that a request can be aborted by the client */
806static void test_can_abort_client(grpc_transport_test_config *config) {
807 test_fixture f;
808 test_stream s;
809
810 begin_test(&f, config, __FUNCTION__);
811 begin_stream(&s, &f, "/Test", "foo.google.com", 10);
812 expect_metadata(&s, 0, "grpc-status", "1");
813 expect_metadata(&s, 1, "grpc-status", "1");
814 grpc_transport_abort_stream(f.client_transport, s.client_stream,
815 GRPC_STATUS_CANCELLED);
816 end_test(&f);
817}
818
819/* Test that a request can be aborted by the server */
820static void test_can_abort_server(grpc_transport_test_config *config) {
821 test_fixture f;
822 test_stream s;
823
824 begin_test(&f, config, __FUNCTION__);
825 begin_stream(&s, &f, "/Test", "foo.google.com", 10);
826 expect_metadata(&s, 0, "grpc-status", "1");
827 expect_metadata(&s, 1, "grpc-status", "1");
828 grpc_transport_abort_stream(f.server_transport, s.server_stream,
829 GRPC_STATUS_CANCELLED);
830 end_test(&f);
831}
832
833/* Test that a request can be sent with payload */
834static void test_request_with_data(grpc_transport_test_config *config,
835 size_t message_length) {
836 test_fixture f;
837 test_stream s;
838 gpr_slice data = generate_test_data(message_length);
839 grpc_stream_op_buffer sopb;
840
841 grpc_sopb_init(&sopb);
842 begin_test(&f, config, __FUNCTION__);
843 gpr_log(GPR_INFO, "message_length = %d", message_length);
844 begin_stream(&s, &f, "/Test", "foo.google.com", 10);
845 expect_message(&s, 1, &data, NULL);
846 grpc_sopb_add_begin_message(&sopb, message_length, 0);
847 grpc_sopb_add_slice(&sopb, data);
848 grpc_transport_set_allow_window_updates(f.server_transport, s.server_stream,
849 1);
850 grpc_transport_send_batch(f.client_transport, s.client_stream, sopb.ops,
851 sopb.nops, 1);
852 sopb.nops = 0;
853 grpc_transport_send_batch(f.server_transport, s.server_stream, NULL, 0, 1);
854 end_test(&f);
855 grpc_sopb_destroy(&sopb);
856}
857
858/* Increment an integer pointed to by x - used for verifying flow control */
859static void increment_int(void *x, grpc_op_error error) { ++*(int *)x; }
860
861/* Test that flow control callbacks are made at appropriate times */
862static void test_request_with_flow_ctl_cb(grpc_transport_test_config *config,
863 size_t message_length) {
864 test_fixture f;
865 test_stream s;
866 int flow_ctl_called = 0;
867 gpr_slice data = generate_test_data(message_length);
868 grpc_stream_op_buffer sopb;
869
870 grpc_sopb_init(&sopb);
871 begin_test(&f, config, __FUNCTION__);
872 gpr_log(GPR_INFO, "length=%d", message_length);
873 begin_stream(&s, &f, "/Test", "foo.google.com", 10);
874 expect_message(&s, 1, &data, NULL);
875 grpc_sopb_add_begin_message(&sopb, message_length, 0);
876 grpc_sopb_add_slice(&sopb, data);
877 grpc_sopb_add_flow_ctl_cb(&sopb, increment_int, &flow_ctl_called);
878 grpc_transport_set_allow_window_updates(f.server_transport, s.server_stream,
879 1);
880 grpc_transport_send_batch(f.client_transport, s.client_stream, sopb.ops,
881 sopb.nops, 1);
882 sopb.nops = 0;
883 grpc_transport_send_batch(f.server_transport, s.server_stream, NULL, 0, 1);
884 end_test(&f);
885 GPR_ASSERT(flow_ctl_called == 1);
886 grpc_sopb_destroy(&sopb);
887}
888
/* Ping completion callback: 'p' is the gpr_event to set */
static void ping_cb(void *p) {
  gpr_event_set(p, (void *)1);
}
891
892/* Test that pinging gets a response */
893static void test_ping(grpc_transport_test_config *config) {
894 test_fixture f;
895 gpr_event ev;
896
897 begin_test(&f, config, __FUNCTION__);
898 gpr_event_init(&ev);
899
900 grpc_transport_ping(f.client_transport, ping_cb, &ev);
901 GPR_ASSERT(gpr_event_wait(&ev, deadline_from_seconds(10)));
902
903 end_test(&f);
904}
905
906/******************************************************************************
907 * Test driver
908 */
909
/* Payload sizes (in bytes) exercised by the data-carrying tests */
static const size_t interesting_message_lengths[] = {
    1,
    100,
    10000,
    100000,
    1000000,
};
913
914void grpc_transport_end2end_tests(grpc_transport_test_config *config) {
915 int i;
916
917 g_metadata_context = grpc_mdctx_create();
918
919 test_no_op(config);
920 test_simple_request(config);
921 test_can_abort_client(config);
922 test_can_abort_server(config);
923 test_ping(config);
924 for (i = 0; i < GPR_ARRAY_SIZE(interesting_message_lengths); i++) {
925 test_request_with_data(config, interesting_message_lengths[i]);
926 test_request_with_flow_ctl_cb(config, interesting_message_lengths[i]);
927 }
928
929 grpc_mdctx_orphan(g_metadata_context);
930
931 gpr_log(GPR_INFO, "tests completed ok");
932}