/*
 *
 * Copyright 2015-2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 * * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 * * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
34#include "src/core/census/log.h"
35#include <grpc/support/cpu.h>
36#include <grpc/support/log.h>
37#include <grpc/support/port_platform.h>
38#include <grpc/support/sync.h>
39#include <grpc/support/thd.h>
40#include <grpc/support/time.h>
41#include <grpc/support/useful.h>
42#include <stdio.h>
43#include <stdlib.h>
44#include <string.h>
45#include "test/core/util/test_config.h"
46
/* Set to a non-zero value to make the tests print progress details. */
#define VERBOSE 0

/* Size of the log used by the tests, expressed in megabytes and in bytes. */
#define LOG_SIZE_IN_MB 1
#define LOG_SIZE_IN_BYTES (LOG_SIZE_IN_MB * 1024 * 1024)
53
/* Stamps all 'size' bytes of the buffer at 'record' with one fill value. The
   fill value is derived from the buffer's own address (address modulo 255),
   so a reader that knows where the record lives can recompute and check it. */
static void write_record(char* record, size_t size) {
  const uintptr_t address = (uintptr_t)record;
  const char fill = (char)(address % 255);
  memset(record, fill, size);
}
60
/* Verifies a buffer holding back-to-back fixed-size records.
   'buffer' must contain a whole number (at least one) of records of
   'record_size' bytes each, and 'record_size' must be non-zero. Every byte of
   every record is expected to equal the fill value write_record() derives
   from that record's address (address modulo 255); any mismatch trips
   GPR_ASSERT. The number of records found is stored in '*num_records'. */
static void read_records(size_t record_size, const char* buffer,
                         size_t buffer_size, int* num_records) {
  /* A zero record size would make the modulo and division below undefined. */
  GPR_ASSERT(record_size > 0);
  GPR_ASSERT(buffer_size >= record_size);
  GPR_ASSERT(buffer_size % record_size == 0);
  const size_t count = buffer_size / record_size;
  *num_records = (int)count;
  for (size_t i = 0; i < count; ++i) {
    const char* record = buffer + (record_size * i);
    /* Recompute the value the writer stored, from the record's address. */
    const char data = (char)((uintptr_t)record % 255);
    for (size_t j = 0; j < record_size; ++j) {
      GPR_ASSERT(data == record[j]);
    }
  }
}
76
77/* Tries to write the specified number of records. Stops when the log gets
78 full. Returns the number of records written. Spins for random
79 number of times, up to 'max_spin_count', between writes. */
80static int write_records_to_log(int writer_id, size_t record_size,
81 int num_records, int max_spin_count) {
82 int counter = 0;
83 for (int i = 0; i < num_records; ++i) {
84 int spin_count = max_spin_count ? rand() % max_spin_count : 0;
85 if (VERBOSE && (counter++ == num_records / 10)) {
86 printf(" Writer %d: %d out of %d written\n", writer_id, i, num_records);
87 counter = 0;
88 }
89 char* record = (char*)(census_log_start_write(record_size));
90 if (record == NULL) {
91 return i;
92 }
93 write_record(record, record_size);
94 census_log_end_write(record, record_size);
95 for (int j = 0; j < spin_count; ++j) {
96 GPR_ASSERT(j >= 0);
97 }
98 }
99 return num_records;
100}
101
/* Runs one complete read pass over the log: resets the reader, then consumes
   and verifies every buffer census_log_read_next() hands back until it
   returns NULL. Each buffer is treated as a run of 'record_size'-byte
   records. Returns the total number of records seen in this pass. */
static int perform_read_iteration(size_t record_size) {
  int total = 0;
  size_t chunk_bytes;
  census_log_init_reader();
  for (;;) {
    const void* chunk = census_log_read_next(&chunk_bytes);
    if (chunk == NULL) {
      break;
    }
    int chunk_records = 0;
    read_records(record_size, (const char*)chunk, chunk_bytes,
                 &chunk_records);
    total += chunk_records;
  }
  return total;
}
116
/* Restarts the reader and asserts that it has nothing to return, i.e. the
   very first census_log_read_next() call yields NULL. */
static void assert_log_empty(void) {
  size_t bytes_available;
  census_log_init_reader();
  GPR_ASSERT(census_log_read_next(&bytes_available) == NULL);
}
123
124/* Fills the log and verifies data. If 'no fragmentation' is true, records
125 are sized such that CENSUS_LOG_2_MAX_RECORD_SIZE is a multiple of record
126 size. If not a circular log, verifies that the number of records written
127 match the number of records read. */
128static void fill_log(size_t log_size, int no_fragmentation, int circular_log) {
129 size_t size;
130 if (no_fragmentation) {
131 int log2size = rand() % (CENSUS_LOG_2_MAX_RECORD_SIZE + 1);
132 size = ((size_t)1 << log2size);
133 } else {
134 while (1) {
135 size = 1 + ((size_t)rand() % CENSUS_LOG_MAX_RECORD_SIZE);
136 if (CENSUS_LOG_MAX_RECORD_SIZE % size) {
137 break;
138 }
139 }
140 }
141 int records_written =
142 write_records_to_log(0 /* writer id */, size,
143 (int)((log_size / size) * 2), 0 /* spin count */);
144 int records_read = perform_read_iteration(size);
145 if (!circular_log) {
146 GPR_ASSERT(records_written == records_read);
147 }
148 assert_log_empty();
149}
150
151/* Structure to pass args to writer_thread */
152typedef struct writer_thread_args {
153 /* Index of this thread in the writers vector. */
154 int index;
155 /* Record size. */
156 size_t record_size;
157 /* Number of records to write. */
158 int num_records;
159 /* Used to signal when writer is complete */
160 gpr_cv* done;
161 gpr_mu* mu;
162 int* count;
163} writer_thread_args;
164
165/* Writes the given number of records of random size (up to kMaxRecordSize) and
166 random data to the specified log. */
167static void writer_thread(void* arg) {
168 writer_thread_args* args = (writer_thread_args*)arg;
169 /* Maximum number of times to spin between writes. */
170 static const int MAX_SPIN_COUNT = 50;
171 int records_written = 0;
172 if (VERBOSE) {
173 printf(" Writer %d starting\n", args->index);
174 }
175 while (records_written < args->num_records) {
176 records_written += write_records_to_log(args->index, args->record_size,
177 args->num_records - records_written,
178 MAX_SPIN_COUNT);
179 if (records_written < args->num_records) {
180 /* Ran out of log space. Sleep for a bit and let the reader catch up.
181 This should never happen for circular logs. */
182 if (VERBOSE) {
183 printf(" Writer stalled due to out-of-space: %d out of %d written\n",
184 records_written, args->num_records);
185 }
186 gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
187 }
188 }
189 /* Done. Decrement count and signal. */
190 gpr_mu_lock(args->mu);
191 (*args->count)--;
192 gpr_cv_broadcast(args->done);
193 if (VERBOSE) {
194 printf(" Writer %d done\n", args->index);
195 }
196 gpr_mu_unlock(args->mu);
197}
198
199/* struct to pass args to reader_thread */
200typedef struct reader_thread_args {
201 /* Record size. */
202 size_t record_size;
203 /* Interval between read iterations. */
204 int read_iteration_interval_in_msec;
205 /* Total number of records. */
206 int total_records;
207 /* Signalled when reader should stop. */
208 gpr_cv stop;
209 int stop_flag;
210 /* Used to signal when reader has finished */
211 gpr_cv* done;
212 gpr_mu* mu;
213 int running;
214} reader_thread_args;
215
216/* Reads and verifies the specified number of records. Reader can also be
217 stopped via gpr_cv_signal(&args->stop). Sleeps for 'read_interval_in_msec'
218 between read iterations. */
219static void reader_thread(void* arg) {
220 reader_thread_args* args = (reader_thread_args*)arg;
221 if (VERBOSE) {
222 printf(" Reader starting\n");
223 }
224 gpr_timespec interval = gpr_time_from_micros(
225 args->read_iteration_interval_in_msec * 1000, GPR_TIMESPAN);
226 gpr_mu_lock(args->mu);
227 int records_read = 0;
228 int num_iterations = 0;
229 int counter = 0;
230 while (!args->stop_flag && records_read < args->total_records) {
231 gpr_cv_wait(&args->stop, args->mu, interval);
232 if (!args->stop_flag) {
233 records_read += perform_read_iteration(args->record_size);
234 GPR_ASSERT(records_read <= args->total_records);
235 if (VERBOSE && (counter++ == 100000)) {
236 printf(" Reader: %d out of %d read\n", records_read,
237 args->total_records);
238 counter = 0;
239 }
240 ++num_iterations;
241 }
242 }
243 /* Done */
244 args->running = 0;
245 gpr_cv_broadcast(args->done);
246 if (VERBOSE) {
247 printf(" Reader: records: %d, iterations: %d\n", records_read,
248 num_iterations);
249 }
250 gpr_mu_unlock(args->mu);
251}
252
253/* Creates NUM_WRITERS writers where each writer writes NUM_RECORDS_PER_WRITER
254 records. Also, starts a reader that iterates over and reads blocks every
255 READ_ITERATION_INTERVAL_IN_MSEC. */
256/* Number of writers. */
257#define NUM_WRITERS 5
258static void multiple_writers_single_reader(int circular_log) {
259 /* Sleep interval between read iterations. */
260 static const int READ_ITERATION_INTERVAL_IN_MSEC = 10;
261 /* Maximum record size. */
262 static const size_t MAX_RECORD_SIZE = 20;
263 /* Number of records written by each writer. This is sized such that we
264 will write through the entire log ~10 times. */
265 const int NUM_RECORDS_PER_WRITER =
266 (int)((10 * census_log_remaining_space()) / (MAX_RECORD_SIZE / 2)) /
267 NUM_WRITERS;
268 size_t record_size = ((size_t)rand() % MAX_RECORD_SIZE) + 1;
269 /* Create and start writers. */
270 writer_thread_args writers[NUM_WRITERS];
271 int writers_count = NUM_WRITERS;
272 gpr_cv writers_done;
273 gpr_mu writers_mu; /* protects writers_done and writers_count */
274 gpr_cv_init(&writers_done);
275 gpr_mu_init(&writers_mu);
276 gpr_thd_id id;
277 for (int i = 0; i < NUM_WRITERS; ++i) {
278 writers[i].index = i;
279 writers[i].record_size = record_size;
280 writers[i].num_records = NUM_RECORDS_PER_WRITER;
281 writers[i].done = &writers_done;
282 writers[i].count = &writers_count;
283 writers[i].mu = &writers_mu;
284 gpr_thd_new(&id, &writer_thread, &writers[i], NULL);
285 }
286 /* Start reader. */
287 gpr_cv reader_done;
288 gpr_mu reader_mu; /* protects reader_done and reader.running */
289 reader_thread_args reader;
290 reader.record_size = record_size;
291 reader.read_iteration_interval_in_msec = READ_ITERATION_INTERVAL_IN_MSEC;
292 reader.total_records = NUM_WRITERS * NUM_RECORDS_PER_WRITER;
293 reader.stop_flag = 0;
294 gpr_cv_init(&reader.stop);
295 gpr_cv_init(&reader_done);
296 reader.done = &reader_done;
297 gpr_mu_init(&reader_mu);
298 reader.mu = &reader_mu;
299 reader.running = 1;
300 gpr_thd_new(&id, &reader_thread, &reader, NULL);
301 /* Wait for writers to finish. */
302 gpr_mu_lock(&writers_mu);
303 while (writers_count != 0) {
304 gpr_cv_wait(&writers_done, &writers_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
305 }
306 gpr_mu_unlock(&writers_mu);
307 gpr_mu_destroy(&writers_mu);
308 gpr_cv_destroy(&writers_done);
309 gpr_mu_lock(&reader_mu);
310 if (circular_log) {
311 /* Stop reader. */
312 reader.stop_flag = 1;
313 gpr_cv_signal(&reader.stop);
314 }
315 /* wait for reader to finish */
316 while (reader.running) {
317 gpr_cv_wait(&reader_done, &reader_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
318 }
319 if (circular_log) {
320 /* Assert that there were no out-of-space errors. */
321 GPR_ASSERT(0 == census_log_out_of_space_count());
322 }
323 gpr_mu_unlock(&reader_mu);
324 gpr_mu_destroy(&reader_mu);
325 gpr_cv_destroy(&reader_done);
326 if (VERBOSE) {
327 printf(" Reader: finished\n");
328 }
329}
330
331static void setup_test(int circular_log) {
332 census_log_initialize(LOG_SIZE_IN_MB, circular_log);
333 GPR_ASSERT(census_log_remaining_space() == LOG_SIZE_IN_BYTES);
334}
335
336/* Attempts to create a record of invalid size (size >
337 CENSUS_LOG_MAX_RECORD_SIZE). */
338void test_invalid_record_size(void) {
339 static const size_t INVALID_SIZE = CENSUS_LOG_MAX_RECORD_SIZE + 1;
340 static const size_t VALID_SIZE = 1;
341 printf("Starting test: invalid record size\n");
342 setup_test(0);
343 void* record = census_log_start_write(INVALID_SIZE);
344 GPR_ASSERT(record == NULL);
345 /* Now try writing a valid record. */
346 record = census_log_start_write(VALID_SIZE);
347 GPR_ASSERT(record != NULL);
348 census_log_end_write(record, VALID_SIZE);
349 /* Verifies that available space went down by one block. In theory, this
350 check can fail if the thread is context switched to a new CPU during the
351 start_write execution (multiple blocks get allocated), but this has not
352 been observed in practice. */
353 GPR_ASSERT(LOG_SIZE_IN_BYTES - CENSUS_LOG_MAX_RECORD_SIZE ==
354 census_log_remaining_space());
355 census_log_shutdown();
356}
357
/* Checks that when end_write() is called with a smaller size than the one
   given to start_write(), the reader sees the record at the same address
   with the size passed to end_write(). */
void test_end_write_with_different_size(void) {
  const size_t START_WRITE_SIZE = 10;
  const size_t END_WRITE_SIZE = 7;
  size_t bytes_available;
  printf("Starting test: end write with different size\n");
  setup_test(0);
  /* Reserve 10 bytes but commit only 7 of them. */
  void* record_written = census_log_start_write(START_WRITE_SIZE);
  GPR_ASSERT(record_written != NULL);
  census_log_end_write(record_written, END_WRITE_SIZE);
  /* Read it back. */
  census_log_init_reader();
  const void* record_read = census_log_read_next(&bytes_available);
  GPR_ASSERT(record_written == record_read);
  GPR_ASSERT(END_WRITE_SIZE == bytes_available);
  assert_log_empty();
  census_log_shutdown();
}
376
/* Checks that a record whose write has been started but not yet ended is
   invisible to read_next(), and becomes readable once end_write() is
   called. */
void test_read_pending_record(void) {
  const size_t PR_RECORD_SIZE = 1024;
  size_t bytes_available;
  const void* record_read;
  printf("Starting test: read pending record\n");
  setup_test(0);
  /* Begin a write and leave it open. */
  void* record_written = census_log_start_write(PR_RECORD_SIZE);
  GPR_ASSERT(record_written != NULL);
  /* Nothing must be readable while the write is pending. */
  census_log_init_reader();
  record_read = census_log_read_next(&bytes_available);
  GPR_ASSERT(record_read == NULL);
  /* After end_write() a fresh read must return the record. */
  census_log_end_write(record_written, PR_RECORD_SIZE);
  census_log_init_reader();
  record_read = census_log_read_next(&bytes_available);
  GPR_ASSERT(record_written == record_read);
  GPR_ASSERT(PR_RECORD_SIZE == bytes_available);
  assert_log_empty();
  census_log_shutdown();
}
399
/* Checks reader behaviour around a pending write: with one record still
   pending and a later one completed, the reader returns only the completed
   record; the first record shows up only after it is completed AND the
   reader is re-initialized. */
void test_read_beyond_pending_record(void) {
  const size_t incomplete_record_size = 10;
  const size_t complete_record_size = 20;
  size_t bytes_available;
  const void* record_read;
  printf("Starting test: read beyond pending record\n");
  setup_test(0);
  /* Open the first write and leave it pending. */
  void* incomplete_record = census_log_start_write(incomplete_record_size);
  GPR_ASSERT(incomplete_record != NULL);
  /* Open and complete a second, distinct write. */
  void* complete_record = census_log_start_write(complete_record_size);
  GPR_ASSERT(complete_record != NULL);
  GPR_ASSERT(complete_record != incomplete_record);
  census_log_end_write(complete_record, complete_record_size);
  /* The reader should hand back the completed record only. */
  census_log_init_reader();
  record_read = census_log_read_next(&bytes_available);
  GPR_ASSERT(complete_record == record_read);
  GPR_ASSERT(complete_record_size == bytes_available);
  /* Now finish the first record. */
  census_log_end_write(incomplete_record, incomplete_record_size);
  /* The reader has already moved past the first record, so read_next() is
     expected to return NULL. */
  /* NB: this also assumes our thread was not moved to a different CPU
     between the two start_write calls. */
  record_read = census_log_read_next(&bytes_available);
  GPR_ASSERT(record_read == NULL);
  /* Restarting the reader makes the newly completed record visible. */
  census_log_init_reader();
  record_read = census_log_read_next(&bytes_available);
  GPR_ASSERT(incomplete_record == record_read);
  GPR_ASSERT(incomplete_record_size == bytes_available);
  assert_log_empty();
  census_log_shutdown();
}
434
/* Exercises the case where the block currently being read gets detached from
   its core and moved to the dirty list because the log fills up while the
   read is in progress. */
void test_detached_while_reading(void) {
  const size_t DWR_RECORD_SIZE = 10;
  size_t bytes_available;
  printf("Starting test: detached while reading\n");
  setup_test(0);
  /* Write a single record. */
  void* record_written = census_log_start_write(DWR_RECORD_SIZE);
  GPR_ASSERT(record_written != NULL);
  census_log_end_write(record_written, DWR_RECORD_SIZE);
  /* Read that record back. */
  census_log_init_reader();
  const void* record_read = census_log_read_next(&bytes_available);
  GPR_ASSERT(record_read != NULL);
  GPR_ASSERT(DWR_RECORD_SIZE == bytes_available);
  /* Fill the log completely. This moves the block being read from the
     core-local array to the dirty list. */
  for (;;) {
    record_written = census_log_start_write(DWR_RECORD_SIZE);
    if (record_written == NULL) {
      break;
    }
    census_log_end_write(record_written, DWR_RECORD_SIZE);
  }

  /* During this iteration read_next() should only walk blocks in the
     core-local array, so at most gpr_cpu_num_cores() further blocks are
     expected. Because the log is full, walking the dirty list instead would
     yield more blocks than that. */
  int block_read = 0;
  while ((record_read = census_log_read_next(&bytes_available))) {
    ++block_read;
    GPR_ASSERT(block_read <= (int)gpr_cpu_num_cores());
  }
  census_log_shutdown();
}
468
469/* Fills non-circular log with records sized such that size is a multiple of
470 CENSUS_LOG_MAX_RECORD_SIZE (no per-block fragmentation). */
471void test_fill_log_no_fragmentation(void) {
472 printf("Starting test: fill log no fragmentation\n");
473 const int circular = 0;
474 setup_test(circular);
475 fill_log(LOG_SIZE_IN_BYTES, 1 /* no fragmentation */, circular);
476 census_log_shutdown();
477}
478
479/* Fills circular log with records sized such that size is a multiple of
480 CENSUS_LOG_MAX_RECORD_SIZE (no per-block fragmentation). */
481void test_fill_circular_log_no_fragmentation(void) {
482 printf("Starting test: fill circular log no fragmentation\n");
483 const int circular = 1;
484 setup_test(circular);
485 fill_log(LOG_SIZE_IN_BYTES, 1 /* no fragmentation */, circular);
486 census_log_shutdown();
487}
488
489/* Fills non-circular log with records that may straddle end of a block. */
490void test_fill_log_with_straddling_records(void) {
491 printf("Starting test: fill log with straddling records\n");
492 const int circular = 0;
493 setup_test(circular);
494 fill_log(LOG_SIZE_IN_BYTES, 0 /* block straddling records */, circular);
495 census_log_shutdown();
496}
497
498/* Fills circular log with records that may straddle end of a block. */
499void test_fill_circular_log_with_straddling_records(void) {
500 printf("Starting test: fill circular log with straddling records\n");
501 const int circular = 1;
502 setup_test(circular);
503 fill_log(LOG_SIZE_IN_BYTES, 0 /* block straddling records */, circular);
504 census_log_shutdown();
505}
506
/* Runs the multiple-writers / single-reader scenario on a circular log, i.e.
   a log configured to discard old records. */
void test_multiple_writers_circular_log(void) {
  enum { CIRCULAR = 1 };
  printf("Starting test: multiple writers circular log\n");
  setup_test(CIRCULAR);
  multiple_writers_single_reader(CIRCULAR);
  census_log_shutdown();
}
516
/* Runs the multiple-writers / single-reader scenario on a non-circular log,
   where writers have to wait for the reader whenever the log is full. */
void test_multiple_writers(void) {
  enum { NON_CIRCULAR = 0 };
  printf("Starting test: multiple writers\n");
  setup_test(NON_CIRCULAR);
  multiple_writers_single_reader(NON_CIRCULAR);
  census_log_shutdown();
}
526
/* Repeats the straddling-records fill test and the multiple-writers test on
   a non-circular log initialized with a requested size of 0 MB. The log is
   re-initialized between the two parts. */
void test_small_log(void) {
  enum { NON_CIRCULAR = 0, STRADDLING_RECORDS = 0 };
  printf("Starting test: small log\n");
  /* Part 1: fill the log using whatever space it reports as available. */
  census_log_initialize(0, NON_CIRCULAR);
  size_t log_size = census_log_remaining_space();
  GPR_ASSERT(log_size > 0);
  fill_log(log_size, STRADDLING_RECORDS, NON_CIRCULAR);
  census_log_shutdown();
  /* Part 2: concurrent writers and a reader on a fresh log. */
  census_log_initialize(0, NON_CIRCULAR);
  multiple_writers_single_reader(NON_CIRCULAR);
  census_log_shutdown();
}
540
541void test_performance(void) {
542 for (size_t write_size = 1; write_size < CENSUS_LOG_MAX_RECORD_SIZE;
543 write_size *= 2) {
544 setup_test(0);
545 gpr_timespec start_time = gpr_now(GPR_CLOCK_REALTIME);
546 int nrecords = 0;
547 while (1) {
548 void* record = census_log_start_write(write_size);
549 if (record == NULL) {
550 break;
551 }
552 census_log_end_write(record, write_size);
553 nrecords++;
554 }
555 gpr_timespec write_time =
556 gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), start_time);
557 double write_time_micro =
558 (double)write_time.tv_sec * 1000000 + (double)write_time.tv_nsec / 1000;
559 census_log_shutdown();
560 printf(
561 "Wrote %d %d byte records in %.3g microseconds: %g records/us "
562 "(%g ns/record), %g gigabytes/s\n",
563 nrecords, (int)write_size, write_time_micro,
564 nrecords / write_time_micro, 1000 * write_time_micro / nrecords,
565 (double)((int)write_size * nrecords) / write_time_micro / 1000);
566 }
567}
568
569int main(int argc, char** argv) {
570 grpc_test_init(argc, argv);
571 srand((unsigned)gpr_now(GPR_CLOCK_REALTIME).tv_nsec);
572 test_invalid_record_size();
573 test_end_write_with_different_size();
574 test_read_pending_record();
575 test_read_beyond_pending_record();
576 test_detached_while_reading();
577 test_fill_log_no_fragmentation();
578 test_fill_circular_log_no_fragmentation();
579 test_fill_log_with_straddling_records();
580 test_fill_circular_log_with_straddling_records();
581 test_small_log();
582 test_multiple_writers();
583 test_multiple_writers_circular_log();
584 return 0;
585}